Skip to content

Commit

Permalink
Add watchers for NodeDisruptionBudget
Browse files Browse the repository at this point in the history
Like #42 but for NDB
Fixes #29
  • Loading branch information
geobeau committed Nov 2, 2023
1 parent e22dad0 commit 2cfb540
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 0 deletions.
20 changes: 20 additions & 0 deletions api/v1alpha1/nodedisruptionbudget_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -53,6 +56,23 @@ type NodeDisruptionBudget struct {
Status DisruptionBudgetStatus `json:"status,omitempty"`
}

// SelectorMatchesObject return true if the object is matched by one of the selectors
func (adb *NodeDisruptionBudget) SelectorMatchesObject(object client.Object) bool {
objectLabelSet := labels.Set(object.GetLabels())

switch object.(type) {
case *corev1.Node:
selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.NodeSelector)
return selector.Matches(objectLabelSet)
case *NodeDisruption:
// It is faster to trigger a reconcile for each ADB instead of checking if the
// Node Disruption is impacting the current ADB
return true
default:
return false
}
}

//+kubebuilder:object:root=true

// NodeDisruptionBudgetList contains a list of NodeDisruptionBudget
Expand Down
39 changes: 39 additions & 0 deletions internal/controller/nodedisruptionbudget_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@ import (
"math"
"reflect"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1"
"github.com/criteo/node-disruption-controller/pkg/resolver"
Expand Down Expand Up @@ -79,10 +86,42 @@ func (r *NodeDisruptionBudgetReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{}, err
}

// MapFuncBuilder returns a MapFunc that is used to dispatch reconcile requests to
// budgets when an event is triggered by one of their matching object
func (r *NodeDisruptionBudgetReconciler) MapFuncBuilder() handler.MapFunc {
// Look for all ADBs in the namespace, then see if they match the object
return func(ctx context.Context, object client.Object) (requests []reconcile.Request) {
ndbs := nodedisruptionv1alpha1.NodeDisruptionBudgetList{}
err := r.Client.List(ctx, &ndbs, &client.ListOptions{Namespace: object.GetNamespace()})
if err != nil {
// We cannot return an error so at least it should be logged
logger := log.FromContext(context.Background())
logger.Error(err, "Could not list NDBs in watch function")
return requests
}

for _, ndb := range ndbs.Items {
if ndb.SelectorMatchesObject(object) {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: ndb.Name,
Namespace: ndb.Namespace,
},
})
}
}
return requests
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *NodeDisruptionBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&nodedisruptionv1alpha1.NodeDisruptionBudget{}).
Watches(
&corev1.Node{},
handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Complete(r)
}

Expand Down
130 changes: 130 additions & 0 deletions internal/controller/nodedisruptionbudget_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +kubebuilder:docs-gen:collapse=Apache License

package controller

import (
"context"
"time"

nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// +kubebuilder:docs-gen:collapse=Imports
var _ = Describe("NodeDisruptionBudget controller", func() {

// Define utility constants for object names and testing timeouts/durations and intervals.
const (
NDBname = "test-ndb"
NDBNamespace = "test-ndb"

timeout = time.Second * 10
duration = time.Second * 10
interval = time.Millisecond * 250
)

var (
_, cancelFn = context.WithCancel(context.Background())
)

Context("In a cluster with several nodes", func() {
ctx := context.Background()

Context("With reconciler with default config", Ordered, func() {
BeforeAll(func() {
cancelFn = startReconcilerWithConfig(NodeDisruptionReconcilerConfig{
RejectEmptyNodeDisruption: false,
RetryInterval: time.Second * 1,
})
})

AfterAll(func() {
cancelFn()
})

AfterEach(func() {
clearAllNodeDisruptionRessources()
})

When("there are no budgets in the cluster", func() {
It("it updates the NDB", func() {
By("creating a budget that accepts one disruption")
ndb := &nodedisruptionv1alpha1.NodeDisruptionBudget{
TypeMeta: metav1.TypeMeta{
APIVersion: "nodedisruption.criteo.com/v1alpha1",
Kind: "NodeDisruptionBudget",
},
ObjectMeta: metav1.ObjectMeta{
Name: NDBname,
Namespace: NDBNamespace,
},
Spec: nodedisruptionv1alpha1.NodeDisruptionBudgetSpec{
NodeSelector: metav1.LabelSelector{MatchLabels: nodeLabels1},
MaxDisruptedNodes: 1,
},
}
Expect(k8sClient.Create(ctx, ndb)).Should(Succeed())

By("checking the NodeDisruptionBudget in synchronized")
NDBLookupKey := types.NamespacedName{Name: NDBname, Namespace: NDBNamespace}
createdNDB := &nodedisruptionv1alpha1.NodeDisruptionBudget{}
Eventually(func() []string {
err := k8sClient.Get(ctx, NDBLookupKey, createdNDB)
Expect(err).Should(Succeed())
return createdNDB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1", "node2"}))

By("Adding Node")
labels := map[string]string{
"testselect": "test1",
"kubernetes.io/hostname": "node4",
}
node4 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node4",
Labels: labels,
},
}
Expect(k8sClient.Create(ctx, node4)).Should(Succeed())

By("checking the NodeDisruptionBudget updated the status")
Eventually(func() []string {
err := k8sClient.Get(ctx, NDBLookupKey, createdNDB)
Expect(err).Should(Succeed())
return createdNDB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1", "node2", "node4"}))

By("Removing Node")
Expect(k8sClient.Delete(ctx, node4)).Should(Succeed())

By("checking the NodeDisruptionBudget updated the status")
Eventually(func() []string {
err := k8sClient.Get(ctx, NDBLookupKey, createdNDB)
Expect(err).Should(Succeed())
return createdNDB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1", "node2"}))
})
})

})

})
})

0 comments on commit 2cfb540

Please sign in to comment.