Skip to content

Commit

Permalink
Stop crashing when losing connection (#14)
Browse files Browse the repository at this point in the history
This PR contains two changes:

1. Stop crashing when losing connection with some cluster.
2. In cross-cluster synchronization, the artifact on the target cluster
is now deleted when this cluster is removed from the configuration
annotation on the source cluster.
  • Loading branch information
mvleandro authored Aug 10, 2023
1 parent 9f0c288 commit ac9216a
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 21 deletions.
2 changes: 1 addition & 1 deletion application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func New() *cli.App {
app := cli.NewApp()
app.Name = "Keess"
app.Version = "v0.1.8"
app.Version = "v0.1.9"
app.Usage = "Keep stuff synchronized."
app.Description = "Keep secrets and configmaps synchronized."
app.Suggest = true
Expand Down
4 changes: 2 additions & 2 deletions chart/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.8
version: 0.1.9

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v0.1.8"
appVersion: "v0.1.9"
3 changes: 3 additions & 0 deletions kube_syncer/abstractions/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ var EntitiesToAllNamespaces map[string]map[string]runtime.Object = make(map[stri
// A map containing the ConfigMaps that sould be present in every Namespace that matches with the configured label
var EntitiesToLabeledNamespaces map[string]map[string]runtime.Object = make(map[string]map[string]runtime.Object)

// Slice containing the connected clusters.
var ConnectedClusters []string = []string{}

// === Functions === //

// Check if exists a valid annotation in an annotation map.
Expand Down
23 changes: 22 additions & 1 deletion kube_syncer/abstractions/configmap_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,18 @@ func (c ConfigMapEvent) Sync(sourceContext string, kubeClients *map[string]*kube

if namespace.Labels[label] == strings.Trim(value, "\"") {
namespaces = append(namespaces, namespaceName)
Logger.Debugf("The namespace '%s' contains the synchronization label '%s'. The configmap '%s' will be synchronized.", namespaceName, namespaceLabelAnnotation, configMap.Name)
Logger.Debugf("The namespace '%s' contains the synchronization label '%s'. The configMap '%s' will be synchronized.", namespaceName, namespaceLabelAnnotation, configMap.Name)
}

}
EntitiesToLabeledNamespaces["ConfigMaps"][configMap.Name] = configMap
}
}

if c.Type == Deleted {
delete(EntitiesToAllNamespaces["ConfigMaps"], configMap.Name)
}

for _, destinationNamespace := range namespaces {
if configMap.Namespace == destinationNamespace {
continue
Expand All @@ -87,6 +92,17 @@ func (c ConfigMapEvent) Sync(sourceContext string, kubeClients *map[string]*kube
annotation := configMap.Annotations[ClusterAnnotation]
clusters := StringToSlice(annotation)

var removedClusters []string = []string{}
for _, destinationContext := range ConnectedClusters {
contains := false
for _, cluster := range clusters {
contains = contains || cluster == destinationContext
}
if !contains {
removedClusters = append(removedClusters, destinationContext)
}
}

for _, destinationContext := range clusters {
if sourceContext == destinationContext {
continue
Expand All @@ -103,6 +119,11 @@ func (c ConfigMapEvent) Sync(sourceContext string, kubeClients *map[string]*kube
kubeEntity.Delete()
}
}

for _, removedCluster := range removedClusters {
kubeEntity := NewKubernetesEntity(*kubeClients, configMap, ConfigMapEntity, sourceNamespace, sourceNamespace, sourceContext, removedCluster)
kubeEntity.Delete()
}
}

if c.Type == Modified {
Expand Down
16 changes: 16 additions & 0 deletions kube_syncer/abstractions/secret_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ func (c SecretEvent) Sync(sourceContext string, kubeClients *map[string]*kuberne
annotation := secret.Annotations[ClusterAnnotation]
clusters := StringToSlice(annotation)

var removedClusters []string = []string{}
for _, destinationContext := range ConnectedClusters {
contains := false
for _, cluster := range clusters {
contains = contains || cluster == destinationContext
}
if !contains {
removedClusters = append(removedClusters, destinationContext)
}
}

for _, destinationContext := range clusters {
if sourceContext == destinationContext {
continue
Expand All @@ -108,6 +119,11 @@ func (c SecretEvent) Sync(sourceContext string, kubeClients *map[string]*kuberne
kubeEntity.Delete()
}
}

for _, removedCluster := range removedClusters {
kubeEntity := NewKubernetesEntity(*kubeClients, secret, SecretEntity, sourceNamespace, sourceNamespace, sourceContext, removedCluster)
kubeEntity.Delete()
}
}

if c.Type == Modified {
Expand Down
38 changes: 21 additions & 17 deletions kube_syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"k8s.io/client-go/util/homedir"
)

// Slice containing the connected clusters.
var ConnectedClusters []string = []string{}

// Represents a base structure for any syncer.
type Syncer struct {
kubeClients map[string]*kubernetes.Clientset
Expand Down Expand Up @@ -56,7 +59,7 @@ func (s *Syncer) Start(kubeConfigPath string, developmentMode bool, sourceContex

zapLogger, err := loggerConfig.Build()
if err != nil {
return err
s.logger.Error(err)
}
abstractions.Logger = zapLogger.Sugar()
s.logger = abstractions.Logger
Expand Down Expand Up @@ -87,7 +90,7 @@ func (s *Syncer) Start(kubeConfigPath string, developmentMode bool, sourceContex
// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
s.logger.Error(err)
}

inClusterConfig, err := rest.InClusterConfig()
Expand All @@ -97,31 +100,32 @@ func (s *Syncer) Start(kubeConfigPath string, developmentMode bool, sourceContex
// create the clientset
client, err = kubernetes.NewForConfig(inClusterConfig)
if err != nil {
return err
s.logger.Error(err)
}
s.logger.Info("Config loaded from service account.")
} else {
// create the clientset
client, err = kubernetes.NewForConfig(config)
if err != nil {
return err
s.logger.Error(err)
}
s.logger.Info("Config loaded from kube config.")
}

s.kubeClients = map[string]*kubernetes.Clientset{}
s.kubeClients[s.sourceContext] = client
abstractions.ConnectedClusters = destinationContexts

for _, context := range destinationContexts {
config, err := buildConfigWithContextFromFlags(context, *kubeconfig)
if err != nil {
panic(err)
s.logger.Error(err)
}

// create the clientset
client, err := kubernetes.NewForConfig(config)
if err != nil {
return err
s.logger.Error(err)
}

s.kubeClients[context] = client
Expand Down Expand Up @@ -157,7 +161,7 @@ func (s *Syncer) Run() error {
// First of all we need to load all namespaces.
namespaceList, err := kubeClient.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
s.logger.Error(err)
}

for _, namespace := range namespaceList.Items {
Expand All @@ -169,7 +173,7 @@ func (s *Syncer) Run() error {
LabelSelector: abstractions.LabelSelector,
})
if err != nil {
return err
s.logger.Error(err)
}

for _, configMap := range configMapList.Items {
Expand All @@ -187,7 +191,7 @@ func (s *Syncer) Run() error {
LabelSelector: abstractions.LabelSelector,
})
if err != nil {
return err
s.logger.Error(err)
}

for _, secret := range secretList.Items {
Expand All @@ -207,7 +211,7 @@ func (s *Syncer) Run() error {
LabelSelector: abstractions.ManagedLabelSelector,
})
if err != nil {
return err
s.logger.Error(err)
}

for _, configMap := range managedConfigMapList.Items {
Expand All @@ -226,7 +230,7 @@ func (s *Syncer) Run() error {
sourceConfigMap, err := sourceKubeClient.CoreV1().ConfigMaps(sourceNamespace).Get(context.TODO(), configMap.Name, metav1.GetOptions{})

if err != nil && !errorsTypes.IsNotFound(err) {
return err
s.logger.Error(err)
}

// Check if source configmap was deleted.
Expand All @@ -235,7 +239,7 @@ func (s *Syncer) Run() error {

err := entity.Delete()
if err != nil && !errorsTypes.IsNotFound(err) {
return err
s.logger.Error(err)
} else {
s.logger.Infof("The ConfigMap '%s' was deleted in namespace '%s' on context '%s' because it was deleted in the source namespace '%s' on the source context '%s'.", configMap.Name, configMap.Namespace, currentContext, sourceNamespace, sourceContext)
}
Expand All @@ -247,7 +251,7 @@ func (s *Syncer) Run() error {
entity = abstractions.NewKubernetesEntity(s.kubeClients, sourceConfigMap, abstractions.ConfigMapEntity, sourceNamespace, configMap.Namespace, sourceContext, currentContext)
err := entity.Update()
if err != nil {
return err
s.logger.Error(err)
} else {
s.logger.Infof("The ConfigMap '%s' was updated in namespace '%s' on context '%s' because It was updated in the source namespace '%s' on the source context '%s'.", configMap.Name, configMap.Namespace, currentContext, sourceNamespace, sourceContext)
}
Expand All @@ -260,7 +264,7 @@ func (s *Syncer) Run() error {
LabelSelector: abstractions.ManagedLabelSelector,
})
if err != nil {
return err
s.logger.Error(err)
}

for _, secret := range managedSecretList.Items {
Expand All @@ -279,7 +283,7 @@ func (s *Syncer) Run() error {
sourceSecret, err := sourceKubeClient.CoreV1().Secrets(sourceNamespace).Get(context.TODO(), secret.Name, metav1.GetOptions{})

if err != nil && !errorsTypes.IsNotFound(err) {
return err
s.logger.Error(err)
}

// Check if source secret was deleted.
Expand All @@ -288,7 +292,7 @@ func (s *Syncer) Run() error {

err := entity.Delete()
if err != nil && !errorsTypes.IsNotFound(err) {
return err
s.logger.Error(err)
} else {
s.logger.Infof("The Secret '%s' was deleted in namespace '%s' on context '%s' because It was deleted in the source namespace '%s' on the source context '%s'.", secret.Name, secret.Namespace, currentContext, sourceNamespace, sourceContext)
}
Expand All @@ -300,7 +304,7 @@ func (s *Syncer) Run() error {
entity = abstractions.NewKubernetesEntity(s.kubeClients, sourceSecret, abstractions.SecretEntity, sourceNamespace, secret.Namespace, sourceContext, currentContext)
err := entity.Update()
if err != nil {
return err
s.logger.Error(err)
} else {
s.logger.Infof("The Secret '%s' was updated in namespace '%s' on context '%s' because It was updated in the source namespace '%s' on the source context '%s'.", secret.Name, secret.Namespace, currentContext, sourceNamespace, sourceContext)
}
Expand Down

0 comments on commit ac9216a

Please sign in to comment.