Skip to content

Commit

Permalink
feat: Synchronize load balancer status from the Kubernetes Service as…
Browse files Browse the repository at this point in the history
…sociated with the gateway (#23)

Resolves #12
  • Loading branch information
Zachary Seguin authored Jul 14, 2022
1 parent c93a316 commit 01418b8
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 16 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ deploy/
Makefile
.github/
bin/
_out/
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ ingress-istio-controller
.vscode
__debug_bin
bin/
_out/
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
kubeconfig string
clusterDomain string
defaultGateway string
scopedGateways bool
ingressClass string
defaultWeight int
)
Expand Down Expand Up @@ -52,12 +53,14 @@ func main() {
istioclient,
clusterDomain,
defaultGateway,
scopedGateways,
ingressClass,
defaultWeight,
kubeInformerFactory.Networking().V1().Ingresses(),
kubeInformerFactory.Networking().V1().IngressClasses(),
kubeInformerFactory.Core().V1().Services(),
istioInformerFactory.Networking().V1beta1().VirtualServices())
istioInformerFactory.Networking().V1beta1().VirtualServices(),
istioInformerFactory.Networking().V1beta1().Gateways())

kubeInformerFactory.Start(stopCh)
istioInformerFactory.Start(stopCh)
Expand All @@ -72,6 +75,7 @@ func init() {
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&clusterDomain, "cluster-domain", "cluster.local", "The cluster domain.")
flag.StringVar(&defaultGateway, "default-gateway", "istio-system/istio-autogenerated-k8s-ingress", "The default Istio gateway used when no existing VirtualService is located matching the host.")
flag.BoolVar(&scopedGateways, "scoped-gateways", false, "Gateways are scoped to the same namespace they exist within. This will limit the Service search for Load Balancer status. In istiod, this is controlled via the PILOT_SCOPE_GATEWAY_TO_NAMESPACE environment variable.")
flag.StringVar(&ingressClass, "ingress-class", "", "The ingress class annotation to monitor (empty string to skip checking annotation)")
flag.IntVar(&defaultWeight, "virtual-service-weight", 100, "The weight of the Virtual Service destination.")
}
22 changes: 19 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Controller struct {

clusterDomain string
defaultGateway string
scopedGateways bool
ingressClass string
defaultWeight int

Expand All @@ -51,6 +52,9 @@ type Controller struct {
virtualServicesListers istionetworkinglisters.VirtualServiceLister
virtualServicesSynched cache.InformerSynced

gatewaysListers istionetworkinglisters.GatewayLister
gatewaysSynched cache.InformerSynced

workqueue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
Expand All @@ -61,12 +65,14 @@ func NewController(
istioclientset istio.Interface,
clusterDomain string,
defaultGateway string,
scopedGateways bool,
ingressClass string,
defaultWeight int,
ingressesInformer networkinginformers.IngressInformer,
ingressClassesInformer networkinginformers.IngressClassInformer,
servicesInformer corev1informers.ServiceInformer,
virtualServicesInformer istionetworkinginformers.VirtualServiceInformer) *Controller {
virtualServicesInformer istionetworkinginformers.VirtualServiceInformer,
gatewaysInformer istionetworkinginformers.GatewayInformer) *Controller {
klog.Infof("setting up controller %s: %s", controllerAgentName, controllerAgentVersion)

// Create event broadcaster
Expand All @@ -82,6 +88,7 @@ func NewController(
clusterDomain: clusterDomain,
defaultGateway: defaultGateway,
ingressClass: ingressClass,
scopedGateways: scopedGateways,
defaultWeight: defaultWeight,
ingressesLister: ingressesInformer.Lister(),
ingressesSynched: ingressesInformer.Informer().HasSynced,
Expand All @@ -91,6 +98,8 @@ func NewController(
servicesSynched: servicesInformer.Informer().HasSynced,
virtualServicesListers: virtualServicesInformer.Lister(),
virtualServicesSynched: virtualServicesInformer.Informer().HasSynced,
gatewaysListers: gatewaysInformer.Lister(),
gatewaysSynched: gatewaysInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "IngressIstio"),
recorder: recorder,
}
Expand Down Expand Up @@ -129,7 +138,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
klog.Info("starting controller")

klog.Info("waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched); !ok {
if ok := cache.WaitForCacheSync(stopCh, c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

Expand Down Expand Up @@ -205,12 +214,19 @@ func (c *Controller) syncHandler(key string) error {
}

// Handle the VirtualService
err = c.handleVirtualService(ingress)
vs, err := c.handleVirtualServiceForIngress(ingress)
if err != nil {
klog.Errorf("failed to handle virtual service: %v", err)
return err
}

// Handle the Ingress status
_, err = c.handleIngressStatus(ingress, vs)
if err != nil {
klog.Errorf("failed to handle Ingress status: %v", err)
return err
}

return nil
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func (c *Controller) findExistingVirtualServiceForIngress(ingress *networkingv1.
return nil, nil
}

func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error {
func (c *Controller) handleVirtualServiceForIngress(ingress *networkingv1.Ingress) (*istionetworkingv1beta1.VirtualService, error) {
ctx := context.Background()

// Find an existing virtual service of the same name
vs, err := c.findExistingVirtualServiceForIngress(ingress)
if err != nil {
return err
return nil, err
}

// Check for conditions which cause us to handle the Ingress
Expand All @@ -69,7 +69,7 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error {
ingressClass, err := c.ingressClassesLister.Get(*ingress.Spec.IngressClassName)
if err != nil {
klog.Error("error getting IngressClass %q", *ingress.Spec.IngressClassName)
return err
return nil, err
}

if ingressClass.Spec.Controller == IngressIstioController {
Expand All @@ -82,7 +82,7 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error {
if val, ok := ingress.Annotations[IgnoreAnnotation]; ok {
bval, err := strconv.ParseBool(val)
if err != nil {
return fmt.Errorf("error parsing %s (%t): %v", IgnoreAnnotation, bval, err)
return nil, fmt.Errorf("error parsing %s (%t): %v", IgnoreAnnotation, bval, err)
}
handle = handle && !bval
}
Expand All @@ -92,11 +92,11 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error {
if vs != nil {
klog.Infof("removing owned virtualservice: \"%s/%s\"", vs.Namespace, vs.Name)
err := c.istioclientset.NetworkingV1beta1().VirtualServices(vs.Namespace).Delete(ctx, vs.Name, metav1.DeleteOptions{})
return err
return nil, err
}

klog.Infof("skipping ingress: \"%s/%s\"", ingress.Namespace, ingress.Name)
return nil
return nil, nil
}

// Identify the gateway to attach the ingress to
Expand All @@ -109,14 +109,14 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error {

nvs, err := c.generateVirtualService(ingress, vs, gateways)
if err != nil {
return err
return nil, err
}

// If we don't have virtual service, then let's make one
if vs == nil {
_, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Create(ctx, nvs, metav1.CreateOptions{})
vs, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Create(ctx, nvs, metav1.CreateOptions{})
if err != nil {
return err
return nil, err
}
} else if !reflect.DeepEqual(vs.ObjectMeta.Labels, nvs.ObjectMeta.Labels) || !reflect.DeepEqual(vs.ObjectMeta.Annotations, nvs.ObjectMeta.Annotations) || !reflect.DeepEqual(vs.Spec, nvs.Spec) {
klog.Infof("updating virtual service \"%s/%s\"", vs.Namespace, vs.Name)
Expand All @@ -128,13 +128,13 @@ func (c *Controller) handleVirtualService(ingress *networkingv1.Ingress) error {
uvs.ObjectMeta.Annotations = nvs.ObjectMeta.Annotations
uvs.Spec = nvs.Spec

_, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Update(ctx, uvs, metav1.UpdateOptions{})
vs, err = c.istioclientset.NetworkingV1beta1().VirtualServices(ingress.Namespace).Update(ctx, uvs, metav1.UpdateOptions{})
if err != nil {
return err
return nil, err
}
}

return nil
return vs, nil
}

func generateObjectMetadata(ingress *networkingv1.Ingress, existingVirtualService *istionetworkingv1beta1.VirtualService) (labels map[string]string, annotations map[string]string) {
Expand Down
101 changes: 101 additions & 0 deletions pkg/controller/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package controller

import (
"context"
"fmt"
"reflect"
"strings"

istionetworkingv1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog"
)

// handleIngressStatus will synchronize the status of the Load Balancer
// of the Service the Gateway is associated with.
func (c *Controller) handleIngressStatus(ingress *networkingv1.Ingress, vs *istionetworkingv1beta1.VirtualService) (*networkingv1.Ingress, error) {
ctx := context.Background()

gateways, err := c.getGatewaysForVirtualService(vs)
if err != nil {
return ingress, err
}

loadBalancerStatus := corev1.LoadBalancerStatus{}

for _, gateway := range gateways {
services, err := c.getServicesForGateway(gateway)
if err != nil {
return ingress, err
}

for _, service := range services {
loadBalancerStatus.Ingress = append(loadBalancerStatus.Ingress, service.Status.LoadBalancer.Ingress...)
}
}

// Compare the current status to the newly generated status
// and if they differ, apply the change.
if !reflect.DeepEqual(ingress.Status.LoadBalancer, loadBalancerStatus) {
klog.Infof("updating ingress status for \"%s/%s\"", ingress.Namespace, ingress.Name)
updatedIngress := ingress.DeepCopy()
updatedIngress.Status.LoadBalancer = loadBalancerStatus

ingress, err = c.kubeclientset.NetworkingV1().Ingresses(updatedIngress.Namespace).UpdateStatus(ctx, updatedIngress, metav1.UpdateOptions{})
if err != nil {
return ingress, err
}
}

return ingress, nil
}

// getGatewaysForVirtualService will get the gateways associated with the Virtual Service.
func (c *Controller) getGatewaysForVirtualService(vs *istionetworkingv1beta1.VirtualService) ([]*istionetworkingv1beta1.Gateway, error) {
gateways := []*istionetworkingv1beta1.Gateway{}

for _, gatewayId := range vs.Spec.Gateways {
var gateway *istionetworkingv1beta1.Gateway
var err error

// Split the gatewayId into [namespace, name]
idParts := strings.Split(gatewayId, "/")

switch len(idParts) {
case 1:
gateway, err = c.gatewaysListers.Gateways(vs.Namespace).Get(idParts[0])
case 2:
gateway, err = c.gatewaysListers.Gateways(idParts[0]).Get(idParts[1])
default:
return nil, fmt.Errorf("unexpected number of parts in Gateway identifier %q: %d", gatewayId, len(idParts))
}

// If the Gateway is not found, then ignore the error.
// Otherwise, this is an unexpected error and return it.
if err != nil && errors.IsNotFound(err) {
klog.Errorf("failed to load gateway %q: %v", gatewayId, err)
continue
} else if err != nil {
return nil, err
}

gateways = append(gateways, gateway)
}

return gateways, nil
}

// getServicesForGateway returns Services associated with the given Gateway.
func (c *Controller) getServicesForGateway(gateway *istionetworkingv1beta1.Gateway) ([]*corev1.Service, error) {
selector := labels.SelectorFromSet(gateway.Spec.Selector)

if c.scopedGateways {
return c.servicesLister.Services(gateway.Namespace).List(selector)
}

return c.servicesLister.List(selector)
}
5 changes: 5 additions & 0 deletions scripts/run-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

set -euxo pipefail

go run main.go -kubeconfig="$HOME/.kube/config" -default-gateway=istio-ingress/default
49 changes: 49 additions & 0 deletions scripts/setup-test-infrastructure.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/bin/bash

set -euxo pipefail

KIND_VERSION=0.14.0
KUBERNETES_VERSION=1.21.12
ISTIO_VERSION=1.14.1
METALLB_VERSION=0.12.1

# Setup temp directory
mkdir -p _out

# Download kind
curl -Lo _out/kind https://github.com/kubernetes-sigs/kind/releases/download/v$KIND_VERSION/kind-linux-amd64 && chmod +x _out/kind

# Setup kind cluster
_out/kind create cluster --image kindest/node:v$KUBERNETES_VERSION --name ingress-istio

# Install metallb (for Load Balancer IPs)
kubectl create namespace metallb-system
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v$METALLB_VERSION/manifests/metallb.yaml
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: config
namespace: metallb-system
data:
config: |
address-pools:
- name: default
protocol: layer2
addresses:
- 172.19.255.200-172.19.255.250
EOF

# Install istio
helm repo add istio https://istio-release.storage.googleapis.com/charts
helm repo update

kubectl create namespace istio-system
helm install istio-base istio/base -n istio-system --version $ISTIO_VERSION
helm install istiod istio/istiod -n istio-system --wait --version $ISTIO_VERSION

kubectl create namespace istio-ingress
kubectl label namespace istio-ingress istio-injection=enabled
helm install istio-ingress istio/gateway -n istio-ingress --wait

echo 'Ready to test!'
Loading

0 comments on commit 01418b8

Please sign in to comment.