diff --git a/cmd/service-controller/controller.go b/cmd/service-controller/controller.go index 40b5bb895..f16a8bef5 100644 --- a/cmd/service-controller/controller.go +++ b/cmd/service-controller/controller.go @@ -50,9 +50,10 @@ type Controller struct { externalBridges cache.SharedIndexInformer // control loop state: - events workqueue.RateLimitingInterface - bindings map[string]*service.ServiceBindings - ports *FreePorts + events workqueue.RateLimitingInterface + bindings map[string]*service.ServiceBindings + ports *FreePorts + previousDefinitions map[string]string // service_sync state: disableServiceSync bool @@ -452,28 +453,6 @@ func (c *Controller) updateExternalBridges() { } } -func (c *Controller) updateActualServices() { - for k, v := range c.bindings { - event.Recordf(ServiceControllerEvent, "Checking service for: %s", k) - err := v.RealiseIngress() - if err != nil { - event.Recordf(ServiceControllerError, "Error updating services: %s", err) - } - } - services := c.svcInformer.GetStore().List() - for _, v := range services { - svc := v.(*corev1.Service) - _, deleteSvc := c.getBindingsForService(svc) - if deleteSvc { - event.Recordf(ServiceControllerDeleteEvent, "No service binding found for %s", svc.ObjectMeta.Name) - c.DeleteService(svc) - } else if !isOwned(svc) && hasOriginalAnnotations(*svc) { - event.Recordf(ServiceControllerDeleteEvent, "Restoring service definitions for %s", svc.ObjectMeta.Name) - restoreServiceDefinitions(c.vanClient, svc.Name) - } - } -} - func (c *Controller) getBindingsForService(svc *corev1.Service) (*service.ServiceBindings, bool) { owned := isOwned(svc) if owned && svc.Spec.ClusterIP == "None" { @@ -491,6 +470,29 @@ func (c *Controller) getBindingsForService(svc *corev1.Service) (*service.Servic return nil, owned } +func (c *Controller) deleteServiceForBindings(bindings *service.ServiceBindings) error { + obj, exists, err := c.svcInformer.GetStore().GetByKey(c.namespaced(bindings.Address)) + if err != nil { + return err + } + if !exists { + return nil + } + svc, ok := obj.(*corev1.Service) + if !ok { + return fmt.Errorf("Expected Service but got %#v", obj) + } + owned := isOwned(svc) + if owned { + event.Recordf(ServiceControllerDeleteEvent, "No service binding found for %s", svc.ObjectMeta.Name) + return c.DeleteService(svc) + } else if hasOriginalAnnotations(*svc) { + event.Recordf(ServiceControllerDeleteEvent, "Restoring service definitions for %s", svc.ObjectMeta.Name) + return restoreServiceDefinitions(c.vanClient, svc.Name) + } + return nil +} + // TODO: move to pkg func equalOwnerRefs(a, b []metav1.OwnerReference) bool { if len(a) != len(b) { @@ -716,6 +718,22 @@ func (c *Controller) getBindingsForHeadlessProxy(statefulset *appsv1.StatefulSet return bindings } +// returns a map with only the entries which have changed since the last call to this function +func (c *Controller) changedServiceDefinitions(latest map[string]string) map[string]string { + if c.previousDefinitions == nil { + c.previousDefinitions = latest + return latest + } + changed := map[string]string{} + for k, v2 := range latest { + if v1, ok := c.previousDefinitions[k]; !ok || v1 != v2 { + changed[k] = v2 + } + } + c.previousDefinitions = latest + return changed +} + func (c *Controller) processNextEvent() bool { obj, shutdown := c.events.Get() @@ -742,64 +760,85 @@ func (c *Controller) processNextEvent() bool { obj, exists, err := c.svcDefInformer.GetStore().GetByKey(name) if err != nil { return fmt.Errorf("Error reading skupper-services from cache: %s", err) - } else if exists { - var portAllocations map[string][]int - if c.bindings == nil { - portAllocations, err = c.initialiseServiceBindingsMap() + } else if !exists { + log.Println("skupper-services has been deleted!") + return nil + } + var portAllocations map[string][]int + if c.bindings == nil { + portAllocations, err = c.initialiseServiceBindingsMap() + if err != nil { + return err + } + } + cm, ok := obj.(*corev1.ConfigMap) + if !ok { + return fmt.Errorf("Expected ConfigMap for %s but got %#v", name, obj) + } + c.updateServiceSync(cm) + var updated []*service.ServiceBindings + if len(cm.Data) > 0 { + // filter out any entires that have not changed since the last change event + entries := c.changedServiceDefinitions(cm.Data) + for k, v := range entries { + si := types.ServiceInterface{} + err := jsonencoding.Unmarshal([]byte(v), &si) if err != nil { - return err + event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err) + continue } - } - cm, ok := obj.(*corev1.ConfigMap) - if !ok { - return fmt.Errorf("Expected ConfigMap for %s but got %#v", name, obj) - } - c.updateServiceSync(cm) - if cm.Data != nil && len(cm.Data) > 0 { - for k, v := range cm.Data { - si := types.ServiceInterface{} - err := jsonencoding.Unmarshal([]byte(v), &si) - if err == nil { - c.updateServiceBindings(si, portAllocations) - - tlsSupport := kubeqdr.TlsServiceSupport{Address: si.Address, Credentials: si.TlsCredentials} - err = c.tlsManager.EnableTlsSupport(tlsSupport) - if err != nil { - event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err) - } - } else { - event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err) - } + c.updateServiceBindings(si, portAllocations) + if bindings, ok := c.bindings[si.Address]; ok { + updated = append(updated, bindings) } - for k, v := range c.bindings { - _, ok := cm.Data[k] - if !ok { - c.deleteServiceBindings(k, v) - serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO()) - tlsSupport := kubeqdr.TlsServiceSupport{Address: v.Address, Credentials: v.TlsCredentials} - err = c.tlsManager.DisableTlsSupport(tlsSupport, serviceList) - if err != nil { - event.Recordf(ServiceControllerError, "Disabling TLS support for Skupper credentials has failed: %s", err) - } - } + + tlsSupport := kubeqdr.TlsServiceSupport{Address: si.Address, Credentials: si.TlsCredentials} + err = c.tlsManager.EnableTlsSupport(tlsSupport) + if err != nil { + event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err) } - } else if len(c.bindings) > 0 { - for k, v := range c.bindings { + } + for k, v := range c.bindings { + if _, ok := cm.Data[k]; !ok { c.deleteServiceBindings(k, v) - serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO()) - tlsSupport := kubeqdr.TlsServiceSupport{ - Address: v.Address, - Credentials: v.TlsCredentials, + err = c.deleteServiceForBindings(v) + if err != nil { + event.Recordf(ServiceControllerError, "Error deleting service for binding with address %s: %s", v.Address, err) } + serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO()) + tlsSupport := kubeqdr.TlsServiceSupport{Address: v.Address, Credentials: v.TlsCredentials} err = c.tlsManager.DisableTlsSupport(tlsSupport, serviceList) if err != nil { event.Recordf(ServiceControllerError, "Disabling TLS support for Skupper credentials has failed: %s", err) } } } + } else if len(c.bindings) > 0 { + c.changedServiceDefinitions(map[string]string{}) + for k, v := range c.bindings { + c.deleteServiceBindings(k, v) + err = c.deleteServiceForBindings(v) + if err != nil { + event.Recordf(ServiceControllerError, "Error deleting service for binding with address %s: %s", v.Address, err) + } + serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO()) + tlsSupport := kubeqdr.TlsServiceSupport{ + Address: v.Address, + Credentials: v.TlsCredentials, + } + err = c.tlsManager.DisableTlsSupport(tlsSupport, serviceList) + if err != nil { + event.Recordf(ServiceControllerError, "Disabling TLS support for Skupper credentials has failed: %s", err) + } + } } c.updateBridgeConfig(c.namespaced(types.TransportConfigMapName)) - c.updateActualServices() + for _, bindings := range updated { + err = bindings.RealiseIngress() + if err != nil { + return err + } + } c.updateHeadlessProxies() c.updateExternalBridges() case "bridges": @@ -856,7 +895,6 @@ func (c *Controller) processNextEvent() bool { event.Recordf(ServiceControllerEvent, "Got targetpods event %s", name) // name is the address of the skupper service c.updateBridgeConfig(c.namespaced(types.TransportConfigMapName)) - c.updateActualServices() case "statefulset": event.Recordf(ServiceControllerEvent, "Got statefulset proxy event %s", name) obj, exists, err := c.headlessInformer.GetStore().GetByKey(name) diff --git a/pkg/service/bindings.go b/pkg/service/bindings.go index 994800794..a657116d6 100644 --- a/pkg/service/bindings.go +++ b/pkg/service/bindings.go @@ -197,6 +197,9 @@ func NewServiceBindings(required types.ServiceInterface, ports []int, bindingCon } func (bindings *ServiceBindings) RealiseIngress() error { + if bindings.ingressBinding == nil { + return nil + } return bindings.ingressBinding.Realise(bindings) }