From b10efd6e2ac595623160fc3be1a193512524a63b Mon Sep 17 00:00:00 2001 From: Geoffrey Beausire Date: Wed, 25 Oct 2023 15:10:23 +0200 Subject: [PATCH] Add watchers to update budgets in reaction to events Watch Pods and PVC for ApplicationDisruptionBudgets. Implemented using EnqueueRequestsFromMapFunc. By providing a fan-out function, for each event we can determine which ADB is impacted and notify it. Co-authored-by: Jonathan Amiez --- .golangci.yaml | 7 +- .../applicationdisruptionbudget_types.go | 24 +++ .../applicationdisruptionbudget_controller.go | 48 +++++ ...icationdisruptionbudget_controller_test.go | 190 ++++++++++++++++++ .../nodedisruption_controller_test.go | 39 +--- internal/controller/suite_test.go | 82 ++++++++ 6 files changed, 353 insertions(+), 37 deletions(-) create mode 100644 internal/controller/applicationdisruptionbudget_controller_test.go diff --git a/.golangci.yaml b/.golangci.yaml index c82e4da..a5ac62d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -1,3 +1,8 @@ linters: enable: - - revive \ No newline at end of file + - revive +linters-settings: + revive: + rules: + - name: dot-imports + disabled: true \ No newline at end of file diff --git a/api/v1alpha1/applicationdisruptionbudget_types.go b/api/v1alpha1/applicationdisruptionbudget_types.go index d412376..4b6f199 100644 --- a/api/v1alpha1/applicationdisruptionbudget_types.go +++ b/api/v1alpha1/applicationdisruptionbudget_types.go @@ -18,6 +18,10 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! @@ -88,6 +92,26 @@ type ApplicationDisruptionBudget struct { Status DisruptionBudgetStatus `json:"status,omitempty"` } +// SelectorMatchesObject return true if the object is matched by one of the selectors +func (adb *ApplicationDisruptionBudget) SelectorMatchesObject(object client.Object) bool { + objectLabelSet := labels.Set(object.GetLabels()) + + switch object.(type) { + case *corev1.Pod: + selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.PodSelector) + return selector.Matches(objectLabelSet) + case *corev1.PersistentVolumeClaim: + selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.PVCSelector) + return selector.Matches(objectLabelSet) + case *NodeDisruption: + // It is faster to trigger a reconcile for each ADB instead of checking if the + // Node Disruption is impacting the current ADB + return true + default: + return false + } +} + //+kubebuilder:object:root=true // ApplicationDisruptionBudgetList contains a list of ApplicationDisruptionBudget diff --git a/internal/controller/applicationdisruptionbudget_controller.go b/internal/controller/applicationdisruptionbudget_controller.go index fc5080a..cbce1f1 100644 --- a/internal/controller/applicationdisruptionbudget_controller.go +++ b/internal/controller/applicationdisruptionbudget_controller.go @@ -26,9 +26,17 @@ import ( "reflect" "k8s.io/apimachinery/pkg/api/errors" + + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1" "github.com/criteo/node-disruption-controller/pkg/resolver" @@ -56,6 +64,7 @@ type ApplicationDisruptionBudgetReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) adb := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{} err := r.Client.Get(ctx, req.NamespacedName, adb) @@ -67,6 +76,8 @@ func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, r return ctrl.Result{}, err } + logger.Info("Start reconcile of ADB", "version", adb.ResourceVersion) + resolver := ApplicationDisruptionBudgetResolver{ ApplicationDisruptionBudget: adb.DeepCopy(), Client: r.Client, @@ -79,16 +90,53 @@ func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, r } if !reflect.DeepEqual(resolver.ApplicationDisruptionBudget.Status, adb.Status) { + logger.Info("Updating with", "version", adb.ResourceVersion) err = resolver.UpdateStatus(ctx) } return ctrl.Result{}, err } +// MapFuncBuilder returns a MapFunc that is used to dispatch reconcile requests to +// budgets when an event is triggered by one of their matching object +func (r *ApplicationDisruptionBudgetReconciler) MapFuncBuilder() handler.MapFunc { + // Look for all ADBs in the namespace, then see if they match the object + return func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + adbs := nodedisruptionv1alpha1.ApplicationDisruptionBudgetList{} + err := r.Client.List(ctx, &adbs, &client.ListOptions{Namespace: object.GetNamespace()}) + if err != nil { + // We cannot return an error so at least it should be logged + logger := log.FromContext(context.Background()) + logger.Error(err, "Could not list ADBs in watch function") + return requests + } + + for _, adb := range adbs.Items { + if adb.SelectorMatchesObject(object) { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: adb.Name, + Namespace: adb.Namespace, + }, + }) + } + } + return requests + } +} + // SetupWithManager sets up the controller with the Manager. func (r *ApplicationDisruptionBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&nodedisruptionv1alpha1.ApplicationDisruptionBudget{}). + Watches( + &corev1.Pod{}, + handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})). + Watches( + &corev1.PersistentVolumeClaim{}, + handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})). Complete(r) } diff --git a/internal/controller/applicationdisruptionbudget_controller_test.go b/internal/controller/applicationdisruptionbudget_controller_test.go new file mode 100644 index 0000000..2c7ca56 --- /dev/null +++ b/internal/controller/applicationdisruptionbudget_controller_test.go @@ -0,0 +1,190 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// +kubebuilder:docs-gen:collapse=Apache License + +package controller + +import ( + "context" + "time" + + nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// +kubebuilder:docs-gen:collapse=Imports +var _ = Describe("ApplicationDisruptionBudget controller", func() { + + // Define utility constants for object names and testing timeouts/durations and intervals. + const ( + ADBname = "test-adb" + ADBNamespace = "test-adb" + + timeout = time.Second * 10 + duration = time.Second * 10 + interval = time.Millisecond * 250 + ) + + var ( + podLabels = map[string]string{ + "testselectpod": "testadb", + } + + _, cancelFn = context.WithCancel(context.Background()) + ) + + Context("In a cluster with several nodes", func() { + ctx := context.Background() + It("Create namespace and pods", func() { + By("Creating the namespace") + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: ADBNamespace, + Namespace: ADBNamespace, + }, + } + Expect(k8sClient.Create(ctx, ns)).Should(Succeed()) + + By("Adding Pods") + pod1 := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "podadb1", + Namespace: ADBNamespace, + Labels: podLabels, + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Containers: []corev1.Container{ + { + Name: "testcontainer", + Image: "ubuntu", + }, + }, + }, + Status: corev1.PodStatus{}, + } + Expect(k8sClient.Create(ctx, pod1)).Should(Succeed()) + }) + + Context("With reconciler with default config", Ordered, func() { + BeforeAll(func() { + cancelFn = startReconcilerWithConfig(NodeDisruptionReconcilerConfig{ + RejectEmptyNodeDisruption: false, + RetryInterval: time.Second * 1, + }) + }) + + AfterAll(func() { + cancelFn() + }) + + AfterEach(func() { + clearAllNodeDisruptionRessources() + }) + + When("there are no budgets in the cluster", func() { + It("grants the node disruption", func() { + By("creating a budget that accepts one disruption") + ndb := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "nodedisruption.criteo.com/v1alpha1", + Kind: "ApplicationDisruptionBudget", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: ADBname, + Namespace: ADBNamespace, + }, + Spec: nodedisruptionv1alpha1.ApplicationDisruptionBudgetSpec{ + PodSelector: metav1.LabelSelector{MatchLabels: podLabels}, + PVCSelector: metav1.LabelSelector{MatchLabels: podLabels}, + MaxDisruptions: 1, + }, + } + Expect(k8sClient.Create(ctx, ndb)).Should(Succeed()) + + By("checking the ApplicationDisruptionBudget in synchronized") + ADBLookupKey := types.NamespacedName{Name: ADBname, Namespace: ADBNamespace} + createdADB := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{} + Eventually(func() []string { + err := k8sClient.Get(ctx, ADBLookupKey, createdADB) + Expect(err).Should(Succeed()) + return createdADB.Status.WatchedNodes + }, timeout, interval).Should(Equal([]string{"node1"})) + + By("Adding Pods") + pod2 := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "podadb2", + Namespace: ADBNamespace, + Labels: podLabels, + }, + Spec: corev1.PodSpec{ + NodeName: "node2", + Containers: []corev1.Container{ + { + Name: "testcontainer", + Image: "ubuntu", + }, + }, + }, + Status: corev1.PodStatus{}, + } + Expect(k8sClient.Create(ctx, pod2)).Should(Succeed()) + + By("checking the ApplicationDisruptionBudget updated the status") + Eventually(func() []string { + err := k8sClient.Get(ctx, ADBLookupKey, createdADB) + Expect(err).Should(Succeed()) + return createdADB.Status.WatchedNodes + }, timeout, interval).Should(Equal([]string{"node1", "node2"})) + + By("Adding PVCs") + ressources := make(corev1.ResourceList, 1) + ressources[corev1.ResourceStorage] = *resource.NewQuantity(100, ressources.Memory().Format) + pvc3 := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc3", + Namespace: ADBNamespace, + Labels: podLabels, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + VolumeName: "node3-pv-local", + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + Resources: corev1.ResourceRequirements{Requests: ressources}, + }, + Status: corev1.PersistentVolumeClaimStatus{}, + } + Expect(k8sClient.Create(ctx, pvc3)).Should(Succeed()) + + By("checking the ApplicationDisruptionBudget updated the status") + Eventually(func() []string { + err := k8sClient.Get(ctx, ADBLookupKey, createdADB) + Expect(err).Should(Succeed()) + return createdADB.Status.WatchedNodes + }, timeout, interval).Should(Equal([]string{"node1", "node2", "node3"})) + }) + }) + + }) + + }) +}) diff --git a/internal/controller/nodedisruption_controller_test.go b/internal/controller/nodedisruption_controller_test.go index bb56125..c41688a 100644 --- a/internal/controller/nodedisruption_controller_test.go +++ b/internal/controller/nodedisruption_controller_test.go @@ -117,46 +117,14 @@ var _ = Describe("NodeDisruption controller", func() { ) var ( - nodeLabels1 = map[string]string{ - "testselect": "test1", - } - nodeLabels2 = map[string]string{ - "testselect": "test2", - } - podLabels = map[string]string{ - "testselectpod": "test1", - } NDLookupKey = types.NamespacedName{Name: NDName, Namespace: NDNamespace} _, cancelFn = context.WithCancel(context.Background()) ) - Context("In a cluster with several nodes", func() { + Context("In a cluster with pods", func() { ctx := context.Background() - It("Create the nodes and pods", func() { - By("Adding Nodes") - node1 := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node1", - Labels: nodeLabels1, - }, - } - Expect(k8sClient.Create(ctx, node1)).Should(Succeed()) - node2 := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node2", - Labels: nodeLabels1, - }, - } - Expect(k8sClient.Create(ctx, node2)).Should(Succeed()) - node3 := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node3", - Labels: nodeLabels2, - }, - } - Expect(k8sClient.Create(ctx, node3)).Should(Succeed()) - + It("Create pods", func() { By("Adding Pods") pod1 := &corev1.Pod{ TypeMeta: metav1.TypeMeta{}, @@ -281,8 +249,7 @@ var _ = Describe("NodeDisruption controller", func() { err := k8sClient.Get(ctx, ADBLookupKey, createdADB) Expect(err).Should(Succeed()) return createdADB.Status.WatchedNodes - }, timeout, interval).ShouldNot(BeEmpty()) - Expect(createdADB.Status.WatchedNodes).Should(Equal([]string{"node1"})) + }, timeout, interval).Should(Equal([]string{"node1"})) By("creating a new NodeDisruption") disruption := &nodedisruptionv1alpha1.NodeDisruption{ diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 6d7e356..2f8ebe3 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -17,11 +17,16 @@ limitations under the License. package controller import ( + "context" + "fmt" "path/filepath" "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" @@ -48,6 +53,41 @@ func TestControllers(t *testing.T) { RunSpecs(t, "Controller Suite") } +func newPVforNode(nodeName string) (pv corev1.PersistentVolume) { + ressources := make(corev1.ResourceList, 1) + ressources[corev1.ResourceStorage] = *resource.NewQuantity(100, ressources.Memory().Format) + return corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pv-local", nodeName), + }, + Spec: corev1.PersistentVolumeSpec{ + Capacity: ressources, + AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}, + PersistentVolumeSource: corev1.PersistentVolumeSource{Local: &corev1.LocalVolumeSource{Path: "path/to/nothing"}}, + NodeAffinity: &corev1.VolumeNodeAffinity{Required: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{{ + Key: "kubernetes.io/hostname", + Operator: corev1.NodeSelectorOpIn, + Values: []string{nodeName}, + }}, + }, + }, + }}, + }, + } +} + +var ( + nodeLabels1 = map[string]string{ + "testselect": "test1", + } + podLabels = map[string]string{ + "testselectpod": "test1", + } +) + var _ = BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) @@ -71,6 +111,48 @@ var _ = BeforeSuite(func() { k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) + + ctx := context.Background() + labels := map[string]string{ + "testselect": "test1", + "kubernetes.io/hostname": "node1", + } + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: labels, + }, + } + Expect(k8sClient.Create(ctx, node1)).Should(Succeed()) + labels = map[string]string{ + "testselect": "test1", + "kubernetes.io/hostname": "node2", + } + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: labels, + }, + } + Expect(k8sClient.Create(ctx, node2)).Should(Succeed()) + labels = map[string]string{ + "testselect": "test2", + "kubernetes.io/hostname": "node3", + } + node3 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: labels, + }, + } + Expect(k8sClient.Create(ctx, node3)).Should(Succeed()) + + pv1 := newPVforNode("node1") + Expect(k8sClient.Create(ctx, &pv1)).Should(Succeed()) + pv2 := newPVforNode("node2") + Expect(k8sClient.Create(ctx, &pv2)).Should(Succeed()) + pv3 := newPVforNode("node3") + Expect(k8sClient.Create(ctx, &pv3)).Should(Succeed()) }) var _ = AfterSuite(func() {