diff --git a/api/v1alpha1/nodedisruptionbudget_types.go b/api/v1alpha1/nodedisruptionbudget_types.go index 40a5493..a22da6c 100644 --- a/api/v1alpha1/nodedisruptionbudget_types.go +++ b/api/v1alpha1/nodedisruptionbudget_types.go @@ -17,7 +17,10 @@ limitations under the License. package v1alpha1 import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! @@ -53,6 +56,23 @@ type NodeDisruptionBudget struct { Status DisruptionBudgetStatus `json:"status,omitempty"` } +// SelectorMatchesObject return true if the object is matched by one of the selectors +func (adb *NodeDisruptionBudget) SelectorMatchesObject(object client.Object) bool { + objectLabelSet := labels.Set(object.GetLabels()) + + switch object.(type) { + case *corev1.Node: + selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.NodeSelector) + return selector.Matches(objectLabelSet) + case *NodeDisruption: + // It is faster to trigger a reconcile for each ADB instead of checking if the + // Node Disruption is impacting the current ADB + return true + default: + return false + } +} + //+kubebuilder:object:root=true // NodeDisruptionBudgetList contains a list of NodeDisruptionBudget diff --git a/internal/controller/nodedisruptionbudget_controller.go b/internal/controller/nodedisruptionbudget_controller.go index 0698a0f..7358325 100644 --- a/internal/controller/nodedisruptionbudget_controller.go +++ b/internal/controller/nodedisruptionbudget_controller.go @@ -21,10 +21,17 @@ import ( "math" "reflect" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1" "github.com/criteo/node-disruption-controller/pkg/resolver" @@ -79,10 +86,42 @@ func (r *NodeDisruptionBudgetReconciler) Reconcile(ctx context.Context, req ctrl return ctrl.Result{}, err } +// MapFuncBuilder returns a MapFunc that is used to dispatch reconcile requests to +// budgets when an event is triggered by one of their matching object +func (r *NodeDisruptionBudgetReconciler) MapFuncBuilder() handler.MapFunc { + // Look for all ADBs in the namespace, then see if they match the object + return func(ctx context.Context, object client.Object) (requests []reconcile.Request) { + ndbs := nodedisruptionv1alpha1.NodeDisruptionBudgetList{} + err := r.Client.List(ctx, &ndbs, &client.ListOptions{Namespace: object.GetNamespace()}) + if err != nil { + // We cannot return an error so at least it should be logged + logger := log.FromContext(context.Background()) + logger.Error(err, "Could not list NDBs in watch function") + return requests + } + + for _, ndb := range ndbs.Items { + if ndb.SelectorMatchesObject(object) { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: ndb.Name, + Namespace: ndb.Namespace, + }, + }) + } + } + return requests + } +} + // SetupWithManager sets up the controller with the Manager. func (r *NodeDisruptionBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&nodedisruptionv1alpha1.NodeDisruptionBudget{}). + Watches( + &corev1.Node{}, + handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()), + builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})). Complete(r) } diff --git a/internal/controller/nodedisruptionbudget_controller_test.go b/internal/controller/nodedisruptionbudget_controller_test.go new file mode 100644 index 0000000..825a09d --- /dev/null +++ b/internal/controller/nodedisruptionbudget_controller_test.go @@ -0,0 +1,130 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// +kubebuilder:docs-gen:collapse=Apache License + +package controller + +import ( + "context" + "time" + + nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// +kubebuilder:docs-gen:collapse=Imports +var _ = Describe("NodeDisruptionBudget controller", func() { + + // Define utility constants for object names and testing timeouts/durations and intervals. + const ( + NDBname = "test-ndb" + NDBNamespace = "test-ndb" + + timeout = time.Second * 10 + duration = time.Second * 10 + interval = time.Millisecond * 250 + ) + + var ( + _, cancelFn = context.WithCancel(context.Background()) + ) + + Context("In a cluster with several nodes", func() { + ctx := context.Background() + + Context("With reconciler with default config", Ordered, func() { + BeforeAll(func() { + cancelFn = startReconcilerWithConfig(NodeDisruptionReconcilerConfig{ + RejectEmptyNodeDisruption: false, + RetryInterval: time.Second * 1, + }) + }) + + AfterAll(func() { + cancelFn() + }) + + AfterEach(func() { + clearAllNodeDisruptionRessources() + }) + + When("there are no budgets in the cluster", func() { + It("it updates the NDB", func() { + By("creating a budget that accepts one disruption") + ndb := &nodedisruptionv1alpha1.NodeDisruptionBudget{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "nodedisruption.criteo.com/v1alpha1", + Kind: "NodeDisruptionBudget", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: NDBname, + Namespace: NDBNamespace, + }, + Spec: nodedisruptionv1alpha1.NodeDisruptionBudgetSpec{ + NodeSelector: metav1.LabelSelector{MatchLabels: nodeLabels1}, + MaxDisruptedNodes: 1, + }, + } + Expect(k8sClient.Create(ctx, ndb)).Should(Succeed()) + + By("checking the NodeDisruptionBudget in synchronized") + NDBLookupKey := types.NamespacedName{Name: NDBname, Namespace: NDBNamespace} + createdNDB := &nodedisruptionv1alpha1.NodeDisruptionBudget{} + Eventually(func() []string { + err := k8sClient.Get(ctx, NDBLookupKey, createdNDB) + Expect(err).Should(Succeed()) + return createdNDB.Status.WatchedNodes + }, timeout, interval).Should(Equal([]string{"node1", "node2"})) + + By("Adding Node") + labels := map[string]string{ + "testselect": "test1", + "kubernetes.io/hostname": "node4", + } + node4 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node4", + Labels: labels, + }, + } + Expect(k8sClient.Create(ctx, node4)).Should(Succeed()) + + By("checking the NodeDisruptionBudget updated the status") + Eventually(func() []string { + err := k8sClient.Get(ctx, NDBLookupKey, createdNDB) + Expect(err).Should(Succeed()) + return createdNDB.Status.WatchedNodes + }, timeout, interval).Should(Equal([]string{"node1", "node2", "node4"})) + + By("Removing Node") + Expect(k8sClient.Delete(ctx, node4)).Should(Succeed()) + + By("checking the NodeDisruptionBudget updated the status") + Eventually(func() []string { + err := k8sClient.Get(ctx, NDBLookupKey, createdNDB) + Expect(err).Should(Succeed()) + return createdNDB.Status.WatchedNodes + }, timeout, interval).Should(Equal([]string{"node1", "node2"})) + }) + }) + + }) + + }) +})