Skip to content

Commit

Permalink
optimisation of service definition reconciliation loop
Browse files Browse the repository at this point in the history
  • Loading branch information
grs committed Jun 12, 2023
1 parent ad3a1f9 commit 826d37a
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 70 deletions.
178 changes: 108 additions & 70 deletions cmd/service-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ type Controller struct {
externalBridges cache.SharedIndexInformer

// control loop state:
events workqueue.RateLimitingInterface
bindings map[string]*service.ServiceBindings
ports *FreePorts
events workqueue.RateLimitingInterface
bindings map[string]*service.ServiceBindings
ports *FreePorts
previousDefinitions map[string]string

// service_sync state:
disableServiceSync bool
Expand Down Expand Up @@ -452,28 +453,6 @@ func (c *Controller) updateExternalBridges() {
}
}

func (c *Controller) updateActualServices() {
for k, v := range c.bindings {
event.Recordf(ServiceControllerEvent, "Checking service for: %s", k)
err := v.RealiseIngress()
if err != nil {
event.Recordf(ServiceControllerError, "Error updating services: %s", err)
}
}
services := c.svcInformer.GetStore().List()
for _, v := range services {
svc := v.(*corev1.Service)
_, deleteSvc := c.getBindingsForService(svc)
if deleteSvc {
event.Recordf(ServiceControllerDeleteEvent, "No service binding found for %s", svc.ObjectMeta.Name)
c.DeleteService(svc)
} else if !isOwned(svc) && hasOriginalAnnotations(*svc) {
event.Recordf(ServiceControllerDeleteEvent, "Restoring service definitions for %s", svc.ObjectMeta.Name)
restoreServiceDefinitions(c.vanClient, svc.Name)
}
}
}

func (c *Controller) getBindingsForService(svc *corev1.Service) (*service.ServiceBindings, bool) {
owned := isOwned(svc)
if owned && svc.Spec.ClusterIP == "None" {
Expand All @@ -491,6 +470,29 @@ func (c *Controller) getBindingsForService(svc *corev1.Service) (*service.Servic
return nil, owned
}

func (c *Controller) deleteServiceForBindings(bindings *service.ServiceBindings) error {
obj, exists, err := c.svcInformer.GetStore().GetByKey(c.namespaced(bindings.Address))
if err != nil {
return err
}
if !exists {
return nil
}
svc, ok := obj.(*corev1.Service)
if !ok {
return fmt.Errorf("Expected Service but got %#v", obj)
}
owned := isOwned(svc)
if owned {
event.Recordf(ServiceControllerDeleteEvent, "No service binding found for %s", svc.ObjectMeta.Name)
return c.DeleteService(svc)
} else if hasOriginalAnnotations(*svc) {
event.Recordf(ServiceControllerDeleteEvent, "Restoring service definitions for %s", svc.ObjectMeta.Name)
return restoreServiceDefinitions(c.vanClient, svc.Name)
}
return nil
}

// TODO: move to pkg
func equalOwnerRefs(a, b []metav1.OwnerReference) bool {
if len(a) != len(b) {
Expand Down Expand Up @@ -716,6 +718,22 @@ func (c *Controller) getBindingsForHeadlessProxy(statefulset *appsv1.StatefulSet
return bindings
}

// returns a map with only the entries which have changed since the last call to this function
func (c *Controller) changedServiceDefinitions(latest map[string]string) map[string]string {
if c.previousDefinitions == nil {
c.previousDefinitions = latest
return latest
}
changed := map[string]string{}
for k, v2 := range latest {
if v1, ok := c.previousDefinitions[k]; !ok || v1 != v2 {
changed[k] = v2
}
}
c.previousDefinitions = latest
return changed
}

func (c *Controller) processNextEvent() bool {

obj, shutdown := c.events.Get()
Expand All @@ -742,64 +760,85 @@ func (c *Controller) processNextEvent() bool {
obj, exists, err := c.svcDefInformer.GetStore().GetByKey(name)
if err != nil {
return fmt.Errorf("Error reading skupper-services from cache: %s", err)
} else if exists {
var portAllocations map[string][]int
if c.bindings == nil {
portAllocations, err = c.initialiseServiceBindingsMap()
} else if !exists {
log.Println("skupper-services has been deleted!")
return nil
}
var portAllocations map[string][]int
if c.bindings == nil {
portAllocations, err = c.initialiseServiceBindingsMap()
if err != nil {
return err
}
}
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
return fmt.Errorf("Expected ConfigMap for %s but got %#v", name, obj)
}
c.updateServiceSync(cm)
var updated []*service.ServiceBindings
if len(cm.Data) > 0 {
// filter out any entires that have not changed since the last change event
entries := c.changedServiceDefinitions(cm.Data)
for k, v := range entries {
si := types.ServiceInterface{}
err := jsonencoding.Unmarshal([]byte(v), &si)
if err != nil {
return err
event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err)
continue
}
}
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
return fmt.Errorf("Expected ConfigMap for %s but got %#v", name, obj)
}
c.updateServiceSync(cm)
if cm.Data != nil && len(cm.Data) > 0 {
for k, v := range cm.Data {
si := types.ServiceInterface{}
err := jsonencoding.Unmarshal([]byte(v), &si)
if err == nil {
c.updateServiceBindings(si, portAllocations)

tlsSupport := kubeqdr.TlsServiceSupport{Address: si.Address, Credentials: si.TlsCredentials}
err = c.tlsManager.EnableTlsSupport(tlsSupport)
if err != nil {
event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err)
}
} else {
event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err)
}
c.updateServiceBindings(si, portAllocations)
if bindings, ok := c.bindings[si.Address]; ok {
updated = append(updated, bindings)
}
for k, v := range c.bindings {
_, ok := cm.Data[k]
if !ok {
c.deleteServiceBindings(k, v)
serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO())
tlsSupport := kubeqdr.TlsServiceSupport{Address: v.Address, Credentials: v.TlsCredentials}
err = c.tlsManager.DisableTlsSupport(tlsSupport, serviceList)
if err != nil {
event.Recordf(ServiceControllerError, "Disabling TLS support for Skupper credentials has failed: %s", err)
}
}

tlsSupport := kubeqdr.TlsServiceSupport{Address: si.Address, Credentials: si.TlsCredentials}
err = c.tlsManager.EnableTlsSupport(tlsSupport)
if err != nil {
event.Recordf(ServiceControllerError, "Could not parse service definition for %s: %s", k, err)
}
} else if len(c.bindings) > 0 {
for k, v := range c.bindings {
}
for k, v := range c.bindings {
if _, ok := cm.Data[k]; !ok {
c.deleteServiceBindings(k, v)
serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO())
tlsSupport := kubeqdr.TlsServiceSupport{
Address: v.Address,
Credentials: v.TlsCredentials,
err = c.deleteServiceForBindings(v)
if err != nil {
event.Recordf(ServiceControllerError, "Error deleting service for binding with address %s: %s", v.Address, err)
}
serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO())
tlsSupport := kubeqdr.TlsServiceSupport{Address: v.Address, Credentials: v.TlsCredentials}
err = c.tlsManager.DisableTlsSupport(tlsSupport, serviceList)
if err != nil {
event.Recordf(ServiceControllerError, "Disabling TLS support for Skupper credentials has failed: %s", err)
}
}
}
} else if len(c.bindings) > 0 {
c.changedServiceDefinitions(map[string]string{})
for k, v := range c.bindings {
c.deleteServiceBindings(k, v)
err = c.deleteServiceForBindings(v)
if err != nil {
event.Recordf(ServiceControllerError, "Error deleting service for binding with address %s: %s", v.Address, err)
}
serviceList, err := c.vanClient.ServiceInterfaceList(context.TODO())
tlsSupport := kubeqdr.TlsServiceSupport{
Address: v.Address,
Credentials: v.TlsCredentials,
}
err = c.tlsManager.DisableTlsSupport(tlsSupport, serviceList)
if err != nil {
event.Recordf(ServiceControllerError, "Disabling TLS support for Skupper credentials has failed: %s", err)
}
}
}
c.updateBridgeConfig(c.namespaced(types.TransportConfigMapName))
c.updateActualServices()
for _, bindings := range updated {
err = bindings.RealiseIngress()
if err != nil {
return err
}
}
c.updateHeadlessProxies()
c.updateExternalBridges()
case "bridges":
Expand Down Expand Up @@ -856,7 +895,6 @@ func (c *Controller) processNextEvent() bool {
event.Recordf(ServiceControllerEvent, "Got targetpods event %s", name)
// name is the address of the skupper service
c.updateBridgeConfig(c.namespaced(types.TransportConfigMapName))
c.updateActualServices()
case "statefulset":
event.Recordf(ServiceControllerEvent, "Got statefulset proxy event %s", name)
obj, exists, err := c.headlessInformer.GetStore().GetByKey(name)
Expand Down
3 changes: 3 additions & 0 deletions pkg/service/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ func NewServiceBindings(required types.ServiceInterface, ports []int, bindingCon
}

func (bindings *ServiceBindings) RealiseIngress() error {
if bindings.ingressBinding == nil {
return nil
}
return bindings.ingressBinding.Realise(bindings)
}

Expand Down

0 comments on commit 826d37a

Please sign in to comment.