From 01418b84e3b5f05663e50d808006c4890bbf360e Mon Sep 17 00:00:00 2001 From: Zachary Seguin Date: Thu, 14 Jul 2022 09:50:50 -0400 Subject: [PATCH] feat: Synchronize load balancer status from the Kubernetes Service associated with the gateway (#23) Resolves #12 --- .dockerignore | 1 + .gitignore | 1 + main.go | 6 +- pkg/controller/controller.go | 22 +++++- pkg/controller/handler.go | 24 +++---- pkg/controller/status.go | 101 +++++++++++++++++++++++++++ scripts/run-test.sh | 5 ++ scripts/setup-test-infrastructure.sh | 49 +++++++++++++ scripts/setup-test-resources.sh | 48 +++++++++++++ 9 files changed, 241 insertions(+), 16 deletions(-) create mode 100644 pkg/controller/status.go create mode 100755 scripts/run-test.sh create mode 100755 scripts/setup-test-infrastructure.sh create mode 100755 scripts/setup-test-resources.sh diff --git a/.dockerignore b/.dockerignore index 904ccac..78b63e4 100644 --- a/.dockerignore +++ b/.dockerignore @@ -9,3 +9,4 @@ deploy/ Makefile .github/ bin/ +_out/ diff --git a/.gitignore b/.gitignore index d93e88c..1dfc91e 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ ingress-istio-controller .vscode __debug_bin bin/ +_out/ diff --git a/main.go b/main.go index 1960e7a..413c1f1 100644 --- a/main.go +++ b/main.go @@ -19,6 +19,7 @@ var ( kubeconfig string clusterDomain string defaultGateway string + scopedGateways bool ingressClass string defaultWeight int ) @@ -52,12 +53,14 @@ func main() { istioclient, clusterDomain, defaultGateway, + scopedGateways, ingressClass, defaultWeight, kubeInformerFactory.Networking().V1().Ingresses(), kubeInformerFactory.Networking().V1().IngressClasses(), kubeInformerFactory.Core().V1().Services(), - istioInformerFactory.Networking().V1beta1().VirtualServices()) + istioInformerFactory.Networking().V1beta1().VirtualServices(), + istioInformerFactory.Networking().V1beta1().Gateways()) kubeInformerFactory.Start(stopCh) istioInformerFactory.Start(stopCh) @@ -72,6 +75,7 @@ func init() { flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.StringVar(&clusterDomain, "cluster-domain", "cluster.local", "The cluster domain.") flag.StringVar(&defaultGateway, "default-gateway", "istio-system/istio-autogenerated-k8s-ingress", "The default Istio gateway used when no existing VirtualService is located matching the host.") + flag.BoolVar(&scopedGateways, "scoped-gateways", false, "Gateways are scoped to the same namespace they exist within. This will limit the Service search for Load Balancer status. In istiod, this is controlled via the PILOT_SCOPE_GATEWAY_TO_NAMESPACE environment variable.") flag.StringVar(&ingressClass, "ingress-class", "", "The ingress class annotation to monitor (empty string to skip checking annotation)") flag.IntVar(&defaultWeight, "virtual-service-weight", 100, "The weight of the Virtual Service destination.") } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 69eb3c2..df56434 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -36,6 +36,7 @@ type Controller struct { clusterDomain string defaultGateway string + scopedGateways bool ingressClass string defaultWeight int @@ -51,6 +52,9 @@ type Controller struct { virtualServicesListers istionetworkinglisters.VirtualServiceLister virtualServicesSynched cache.InformerSynced + gatewaysListers istionetworkinglisters.GatewayLister + gatewaysSynched cache.InformerSynced + workqueue workqueue.RateLimitingInterface recorder record.EventRecorder } @@ -61,12 +65,14 @@ func NewController( istioclientset istio.Interface, clusterDomain string, defaultGateway string, + scopedGateways bool, ingressClass string, defaultWeight int, ingressesInformer networkinginformers.IngressInformer, ingressClassesInformer networkinginformers.IngressClassInformer, servicesInformer corev1informers.ServiceInformer, - virtualServicesInformer istionetworkinginformers.VirtualServiceInformer) *Controller { + virtualServicesInformer istionetworkinginformers.VirtualServiceInformer, + gatewaysInformer istionetworkinginformers.GatewayInformer) *Controller { klog.Infof("setting up controller %s: %s", controllerAgentName, controllerAgentVersion) // Create event broadcaster @@ -82,6 +88,7 @@ func NewController( clusterDomain: clusterDomain, defaultGateway: defaultGateway, ingressClass: ingressClass, + scopedGateways: scopedGateways, defaultWeight: defaultWeight, ingressesLister: ingressesInformer.Lister(), ingressesSynched: ingressesInformer.Informer().HasSynced, @@ -91,6 +98,8 @@ func NewController( servicesSynched: servicesInformer.Informer().HasSynced, virtualServicesListers: virtualServicesInformer.Lister(), virtualServicesSynched: virtualServicesInformer.Informer().HasSynced, + gatewaysListers: gatewaysInformer.Lister(), + gatewaysSynched: gatewaysInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "IngressIstio"), recorder: recorder, } @@ -129,7 +138,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { klog.Info("starting controller") klog.Info("waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched); !ok { + if ok := cache.WaitForCacheSync(stopCh, c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok { return fmt.Errorf("failed to wait for caches to sync") } @@ -205,12 +214,19 @@ func (c *Controller) syncHandler(key string) error { } // Handle the VirtualService - err = c.handleVirtualService(ingress) + vs, err := c.handleVirtualServiceForIngress(ingress) if err != nil { klog.Errorf("failed to handle virtual service: %v", err) return err } + // Handle the Ingress status + _, err = c.handleIngressStatus(ingress, vs) + if err != nil { + klog.Errorf("failed to handle Ingress status: %v", err) + return err + } + return nil } diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 01263a9..0a66ba6 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -42,13 +42,13 @@ func (c *Controller) findExistingVirtualServiceForIngress(ingress *networkingv1. return nil, nil } -func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error { +func (c *Controller) handleVirtualServiceForIngress(ingress *networkingv1.Ingress) (*istionetworkingv1beta1.VirtualService, error) { ctx := context.Background() // Find an existing virtual service of the same name vs, err := c.findExistingVirtualServiceForIngress(ingress) if err != nil { - return err + return nil, err } // Check for conditions which cause us to handle the Ingress @@ -69,7 +69,7 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error { ingressClass, err := c.ingressClassesLister.Get(*ingress.Spec.IngressClassName) if err != nil { klog.Error("error getting IngressClass %q", *ingress.Spec.IngressClassName) - return err + return nil, err } if ingressClass.Spec.Controller == IngressIstioController { @@ -82,7 +82,7 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error { if val, ok := ingress.Annotations[IgnoreAnnotation]; ok { bval, err := strconv.ParseBool(val) if err != nil { - return fmt.Errorf("error parsing %s (%t): %v", IgnoreAnnotation, bval, err) + return nil, fmt.Errorf("error parsing %s (%t): %v", IgnoreAnnotation, bval, err) } handle = handle && !bval } @@ -92,11 +92,11 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error { if vs != nil { klog.Infof("removing owned virtualservice: \"%s/%s\"", vs.Namespace, vs.Name) err := c.istioclientset.NetworkingV1beta1().VirtualServices(vs.Namespace).Delete(ctx, vs.Name, metav1.DeleteOptions{}) - return err + return nil, err } klog.Infof("skipping ingress: \"%s/%s\"", ingress.Namespace, ingress.Name) - return nil + return nil, nil } // Identify the gateway to attach the ingress to @@ -109,14 +109,14 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error { nvs, err := c.generateVirtualService(ingress, vs, gateways) if err != nil { - return err + return nil, err } // If we don't have virtual service, then let's make one if vs == nil { - _, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Create(ctx, nvs, metav1.CreateOptions{}) + vs, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Create(ctx, nvs, metav1.CreateOptions{}) if err != nil { - return err + return nil, err } } else if !reflect.DeepEqual(vs.ObjectMeta.Labels, nvs.ObjectMeta.Labels) || !reflect.DeepEqual(vs.ObjectMeta.Annotations, nvs.ObjectMeta.Annotations) || !reflect.DeepEqual(vs.Spec, nvs.Spec) { klog.Infof("updating virtual service \"%s/%s\"", vs.Namespace, vs.Name) @@ -128,13 +128,13 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error { uvs.ObjectMeta.Annotations = nvs.ObjectMeta.Annotations uvs.Spec = nvs.Spec - _, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Update(ctx, uvs, metav1.UpdateOptions{}) + vs, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Update(ctx, uvs, metav1.UpdateOptions{}) if err != nil { - return err + return nil, err } } - return nil + return vs, nil } func generateObjectMetadata(ingress *networkingv1.Ingress, existingVirtualService *istionetworkingv1beta1.VirtualService) (labels map[string]string, annotations map[string]string) { diff --git a/pkg/controller/status.go b/pkg/controller/status.go new file mode 100644 index 0000000..c14f458 --- /dev/null +++ b/pkg/controller/status.go @@ -0,0 +1,101 @@ +package controller + +import ( + "context" + "fmt" + "reflect" + "strings" + + istionetworkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1" + corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog" +) + +// handleIngressStatus will synchronize the status of the Load Balancer +// of the Service the Gateway is associated with. +func (c *Controller) handleIngressStatus(ingress *networkingv1.Ingress, vs *istionetworkingv1beta1.VirtualService) (*networkingv1.Ingress, error) { + ctx := context.Background() + + gateways, err := c.getGatewaysForVirtualService(vs) + if err != nil { + return ingress, err + } + + loadBalancerStatus := corev1.LoadBalancerStatus{} + + for _, gateway := range gateways { + services, err := c.getServicesForGateway(gateway) + if err != nil { + return ingress, err + } + + for _, service := range services { + loadBalancerStatus.Ingress = append(loadBalancerStatus.Ingress, service.Status.LoadBalancer.Ingress...) + } + } + + // Compare the current status to the newly generated status + // and if they differ, apply the change. + if !reflect.DeepEqual(ingress.Status.LoadBalancer, loadBalancerStatus) { + klog.Infof("updating ingress status for \"%s/%s\"", ingress.Namespace, ingress.Name) + updatedIngress := ingress.DeepCopy() + updatedIngress.Status.LoadBalancer = loadBalancerStatus + + ingress, err = c.kubeclientset.NetworkingV1().Ingresses(updatedIngress.Namespace).UpdateStatus(ctx, updatedIngress, metav1.UpdateOptions{}) + if err != nil { + return ingress, err + } + } + + return ingress, nil +} + +// getGatewaysForVirtualService will get the gateways associated with the Virtual Service. +func (c *Controller) getGatewaysForVirtualService(vs *istionetworkingv1beta1.VirtualService) ([]*istionetworkingv1beta1.Gateway, error) { + gateways := []*istionetworkingv1beta1.Gateway{} + + for _, gatewayId := range vs.Spec.Gateways { + var gateway *istionetworkingv1beta1.Gateway + var err error + + // Split the gatewayId into [namespace, name] + idParts := strings.Split(gatewayId, "/") + + switch len(idParts) { + case 1: + gateway, err = c.gatewaysListers.Gateways(vs.Namespace).Get(idParts[0]) + case 2: + gateway, err = c.gatewaysListers.Gateways(idParts[0]).Get(idParts[1]) + default: + return nil, fmt.Errorf("unexpected number of parts in Gateway identifier %q: %d", gatewayId, len(idParts)) + } + + // If the Gateway is not found, then ignore the error. + // Otherwise, this is an unexpected error and return it. + if err != nil && errors.IsNotFound(err) { + klog.Errorf("failed to load gateway %q: %v", gatewayId, err) + continue + } else if err != nil { + return nil, err + } + + gateways = append(gateways, gateway) + } + + return gateways, nil +} + +// getServicesForGateway returns Services associated with the given Gateway. +func (c *Controller) getServicesForGateway(gateway *istionetworkingv1beta1.Gateway) ([]*corev1.Service, error) { + selector := labels.SelectorFromSet(gateway.Spec.Selector) + + if c.scopedGateways { + return c.servicesLister.Services(gateway.Namespace).List(selector) + } + + return c.servicesLister.List(selector) +} diff --git a/scripts/run-test.sh b/scripts/run-test.sh new file mode 100755 index 0000000..b617c65 --- /dev/null +++ b/scripts/run-test.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -euxo pipefail + +go run main.go -kubeconfig="$HOME/.kube/config" -default-gateway=istio-ingress/default diff --git a/scripts/setup-test-infrastructure.sh b/scripts/setup-test-infrastructure.sh new file mode 100755 index 0000000..257d8f4 --- /dev/null +++ b/scripts/setup-test-infrastructure.sh @@ -0,0 +1,49 @@ +#!/bin/bash + +set -euxo pipefail + +KIND_VERSION=0.14.0 +KUBERNETES_VERSION=1.21.12 +ISTIO_VERSION=1.14.1 +METALLB_VERSION=0.12.1 + +# Setup temp directory +mkdir -p _out + +# Download kind +curl -Lo _out/kind https://github.com/kubernetes-sigs/kind/releases/download/v$KIND_VERSION/kind-linux-amd64 && chmod +x _out/kind + +# Setup kind cluster +_out/kind create cluster --image kindest/node:v$KUBERNETES_VERSION --name ingress-istio + +# Install metallb (for Load Balancer IPs) +kubectl create namespace metallb-system +kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v$METALLB_VERSION/manifests/metallb.yaml +cat <