Skip to content

Commit

Permalink
Move controller packages under internal
Browse files Browse the repository at this point in the history
  • Loading branch information
grs committed Jan 9, 2025
1 parent 998432f commit 136d668
Show file tree
Hide file tree
Showing 67 changed files with 314 additions and 915 deletions.
15 changes: 4 additions & 11 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"flag"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"syscall"
Expand All @@ -13,18 +12,12 @@ import (

iflag "github.com/skupperproject/skupper/internal/flag"
internalclient "github.com/skupperproject/skupper/internal/kube/client"
"github.com/skupperproject/skupper/pkg/kube/grants"
"github.com/skupperproject/skupper/pkg/kube/securedaccess"
"github.com/skupperproject/skupper/internal/kube/controller"
"github.com/skupperproject/skupper/internal/kube/grants"
"github.com/skupperproject/skupper/internal/kube/securedaccess"
"github.com/skupperproject/skupper/pkg/version"
)

var controllerLogLevel = new(slog.LevelVar) // defaults to Info

func init() {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: controllerLogLevel}))
slog.SetDefault(logger)
}

func describe(i interface{}) {
fmt.Printf("(%v, %T)\n", i, i)
fmt.Println()
Expand Down Expand Up @@ -95,7 +88,7 @@ func main() {
log.Fatal("Error getting van client ", err.Error())
}

controller, err := NewController(cli, grantConfig, securedAccessConfig, watchNamespace, cli.Namespace)
controller, err := controller.NewController(cli, grantConfig, securedAccessConfig, watchNamespace, cli.Namespace)
if err != nil {
log.Fatal("Error getting new site controller ", err.Error())
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/kube-adaptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
iflag "github.com/skupperproject/skupper/internal/flag"
"github.com/skupperproject/skupper/internal/kube/adaptor"
internalclient "github.com/skupperproject/skupper/internal/kube/client"
"github.com/skupperproject/skupper/pkg/kube"
"github.com/skupperproject/skupper/pkg/version"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func main() {
}

log.Println("Waiting for Skupper router to be ready")
_, err = kube.WaitForPodsSelectorStatus(cli.GetNamespace(), cli.Kube, "skupper.io/component=router", corev1.PodRunning, time.Second*180, time.Second*5)
_, err = internalclient.WaitForPodsSelectorStatus(cli.GetNamespace(), cli.Kube, "skupper.io/component=router", corev1.PodRunning, time.Second*180, time.Second*5)
if err != nil {
log.Fatal("Error waiting for router pods to be ready ", err.Error())
}
Expand Down
51 changes: 49 additions & 2 deletions internal/kube/adaptor/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"github.com/skupperproject/skupper/api/types"
"github.com/skupperproject/skupper/internal/config"
internalclient "github.com/skupperproject/skupper/internal/kube/client"
"github.com/skupperproject/skupper/pkg/kube"
kubeflow "github.com/skupperproject/skupper/pkg/kube/flow"
"github.com/skupperproject/skupper/pkg/vanflow"
"github.com/skupperproject/skupper/pkg/vanflow/session"
"github.com/skupperproject/skupper/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1informer "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
Expand Down Expand Up @@ -55,7 +57,7 @@ func siteCollector(ctx context.Context, cli *internalclient.KubeClient) {
UID: current.ObjectMeta.UID,
}

existing, err := kube.NewConfigMap(types.NetworkStatusConfigMapName, &siteData, nil, nil, &owner, cli.Namespace, cli.Kube)
existing, err := newConfigMap(types.NetworkStatusConfigMapName, &siteData, nil, nil, &owner, cli.Namespace, cli.Kube)
if err != nil && existing == nil {
log.Fatal("Failed to create site status config map ", err.Error())
}
Expand Down Expand Up @@ -169,3 +171,48 @@ func deploymentName() string {
return deployment

}

func newConfigMap(name string, data *map[string]string, labels *map[string]string, annotations *map[string]string, owner *metav1.OwnerReference, namespace string, kubeclient kubernetes.Interface) (*corev1.ConfigMap, error) {
configMaps := kubeclient.CoreV1().ConfigMaps(namespace)
existing, err := configMaps.Get(context.TODO(), name, metav1.GetOptions{})
if err == nil {
//TODO: already exists
return existing, nil
} else if errors.IsNotFound(err) {
cm := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}

if data != nil {
cm.Data = *data
}
if labels != nil {
cm.ObjectMeta.Labels = *labels
}
if annotations != nil {
cm.ObjectMeta.Annotations = *annotations
}
if owner != nil {
cm.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
*owner,
}
}

created, err := configMaps.Create(context.TODO(), cm, metav1.CreateOptions{})

if err != nil {
return nil, fmt.Errorf("Failed to create config map: %w", err)
} else {
return created, nil
}
} else {
cm := &corev1.ConfigMap{}
return cm, fmt.Errorf("Failed to check existing config maps: %w", err)
}
}
11 changes: 5 additions & 6 deletions internal/kube/adaptor/config_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,26 @@ import (
corev1 "k8s.io/api/core/v1"

internalclient "github.com/skupperproject/skupper/internal/kube/client"
"github.com/skupperproject/skupper/pkg/kube"
"github.com/skupperproject/skupper/pkg/qdr"
)

// Syncs the live router config with the configmap (bridge configuration,
// secrets for services with TLS enabled, and secrets and connectors for links)
type ConfigSync struct {
agentPool *qdr.AgentPool
controller *kube.Controller
controller *internalclient.Controller
namespace string
profileSyncer *SslProfileSyncer
config *kube.ConfigMapWatcher
secrets *kube.SecretWatcher
config *internalclient.ConfigMapWatcher
secrets *internalclient.SecretWatcher
path string
routerConfigMap string
}

func NewConfigSync(cli internalclient.Clients, namespace string, path string, routerConfigMap string) *ConfigSync {
configSync := &ConfigSync{
agentPool: qdr.NewAgentPool("amqp://localhost:5672", nil),
controller: kube.NewController("config-sync", cli),
controller: internalclient.NewController("config-sync", cli),
namespace: namespace,
profileSyncer: newSslProfileSyncer(path),
path: path,
Expand All @@ -41,7 +40,7 @@ func (c *ConfigSync) Start(stopCh <-chan struct{}) error {
if err := mkdir(c.path); err != nil {
return err
}
c.config = c.controller.WatchConfigMaps(kube.ByName(c.routerConfigMap), c.namespace, c.configEvent)
c.config = c.controller.WatchConfigMaps(internalclient.ByName(c.routerConfigMap), c.namespace, c.configEvent)
c.secrets = c.controller.WatchAllSecrets(c.namespace, c.secretEvent)
c.controller.StartWatchers(stopCh)
log.Printf("CONFIG_SYNC: Waiting for informers to sync...")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

internalclient "github.com/skupperproject/skupper/internal/kube/client"
skupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1"
"github.com/skupperproject/skupper/pkg/certs"
"github.com/skupperproject/skupper/pkg/kube"
)

type CertificateManager interface {
Expand All @@ -26,12 +26,12 @@ type CertificateManager interface {
type CertificateManagerImpl struct {
definitions map[string]*skupperv2alpha1.Certificate
secrets map[string]*corev1.Secret
certificateWatcher *kube.CertificateWatcher
secretWatcher *kube.SecretWatcher
controller *kube.Controller
certificateWatcher *internalclient.CertificateWatcher
secretWatcher *internalclient.SecretWatcher
controller *internalclient.Controller
}

func NewCertificateManager(controller *kube.Controller) *CertificateManagerImpl {
func NewCertificateManager(controller *internalclient.Controller) *CertificateManagerImpl {
return &CertificateManagerImpl{
definitions: map[string]*skupperv2alpha1.Certificate{},
secrets: map[string]*corev1.Secret{},
Expand Down
19 changes: 2 additions & 17 deletions pkg/kube/controller.go → internal/kube/client/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package kube
package client

import (
"context"
Expand Down Expand Up @@ -45,7 +45,6 @@ import (
routev1interfaces "github.com/openshift/client-go/route/informers/externalversions/internalinterfaces"
routev1informer "github.com/openshift/client-go/route/informers/externalversions/route/v1"

internalclient "github.com/skupperproject/skupper/internal/kube/client"
"github.com/skupperproject/skupper/internal/kube/resource"
skupperv2alpha1 "github.com/skupperproject/skupper/pkg/apis/skupper/v2alpha1"
skupperclient "github.com/skupperproject/skupper/pkg/generated/client/clientset/versioned"
Expand Down Expand Up @@ -94,7 +93,7 @@ type Controller struct {
watchers []Watcher
}

func NewController(name string, clients internalclient.Clients) *Controller {
func NewController(name string, clients Clients) *Controller {
return &Controller{
eventKey: name + "Event",
errorKey: name + "Error",
Expand Down Expand Up @@ -160,15 +159,6 @@ func (c *Controller) GetDeploymentForPod(podName string, namespace string) (*app
return c.GetKubeClient().AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
}

func (c *Controller) NewWatchers(client kubernetes.Interface) Watchers {
return &Controller{
eventKey: c.eventKey,
errorKey: c.errorKey,
client: client,
queue: c.queue,
}
}

func (c *Controller) AddEvent(o interface{}) {
c.queue.Add(o)
}
Expand Down Expand Up @@ -284,11 +274,6 @@ func (c *Controller) HaveWatchersSynced() []cache.InformerSynced {
return combined
}

type Watchers interface {
WatchConfigMaps(options internalinterfaces.TweakListOptionsFunc, namespace string, handler ConfigMapHandler) *ConfigMapWatcher
WatchSecrets(options internalinterfaces.TweakListOptionsFunc, namespace string, handler SecretHandler) *SecretWatcher
}

func (c *Controller) WatchConfigMaps(options internalinterfaces.TweakListOptionsFunc, namespace string, handler ConfigMapHandler) *ConfigMapWatcher {
watcher := &ConfigMapWatcher{
handler: handler,
Expand Down
72 changes: 72 additions & 0 deletions internal/kube/client/pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/skupperproject/skupper/pkg/utils"
)

func IsPodReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady {
return c.Status == corev1.ConditionTrue
}
}
return false
}

func IsPodRunning(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodRunning
}

func getPods(selector string, namespace string, cli kubernetes.Interface) ([]corev1.Pod, error) {
options := metav1.ListOptions{LabelSelector: selector}
podList, err := cli.CoreV1().Pods(namespace).List(context.TODO(), options)
if err != nil {
return nil, err
}
return podList.Items, err
}

func WaitForPodsSelectorStatus(namespace string, clientset kubernetes.Interface, selector string, status corev1.PodPhase, timeout time.Duration, interval time.Duration) ([]corev1.Pod, error) {
var pods []corev1.Pod
var pod corev1.Pod
var err error

ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
err = utils.RetryWithContext(ctx, interval, func() (bool, error) {
pods, err = getPods(selector, namespace, clientset)
if err != nil {
// pod does not exist yet
return false, nil
}
for _, pod = range pods {
if pod.Status.Phase != status {
return false, nil
}
}
return true, nil
})

return pods, err
}
Loading

0 comments on commit 136d668

Please sign in to comment.