Skip to content

Commit

Permalink
Add watchers to update budgets in reaction to events
Browse files Browse the repository at this point in the history
Watch Pods and PVC for ApplicationDisruptionBudgets.
Implemented using EnqueueRequestsFromMapFunc. By providing
a fan-out function, for each event we can determine which ADB
is impacted and notify it.

Co-authored-by: Jonathan Amiez <j.amiez@criteo.com>
  • Loading branch information
geobeau and josqu4red committed Oct 26, 2023
1 parent 6832ad8 commit 89356d4
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 37 deletions.
7 changes: 6 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
linters:
enable:
- revive
- revive
linters-settings:
revive:
rules:
- name: dot-imports
disabled: true
24 changes: 24 additions & 0 deletions api/v1alpha1/applicationdisruptionbudget_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -88,6 +92,26 @@ type ApplicationDisruptionBudget struct {
Status DisruptionBudgetStatus `json:"status,omitempty"`
}

// SelectorMatchesObject return true if the object is matched by one of the selectors
func (adb *ApplicationDisruptionBudget) SelectorMatchesObject(object client.Object) bool {
objectLabelSet := labels.Set(object.GetLabels())

switch object.(type) {
case *corev1.Pod:
selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.PodSelector)
return selector.Matches(objectLabelSet)
case *corev1.PersistentVolumeClaim:
selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.PVCSelector)
return selector.Matches(objectLabelSet)
case *NodeDisruption:
// It is faster to trigger a reconcile for each ADB instead of checking if the
// Node Disruption is impacting the current ADB
return true
default:
return false
}
}

//+kubebuilder:object:root=true

// ApplicationDisruptionBudgetList contains a list of ApplicationDisruptionBudget
Expand Down
48 changes: 48 additions & 0 deletions internal/controller/applicationdisruptionbudget_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ import (
"reflect"

"k8s.io/apimachinery/pkg/api/errors"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1"
"github.com/criteo/node-disruption-controller/pkg/resolver"
Expand Down Expand Up @@ -56,6 +64,7 @@ type ApplicationDisruptionBudgetReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
adb := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{}
err := r.Client.Get(ctx, req.NamespacedName, adb)

Expand All @@ -67,6 +76,8 @@ func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, r
return ctrl.Result{}, err
}

logger.Info("Start reconcile of ADB", "version", adb.ResourceVersion)

resolver := ApplicationDisruptionBudgetResolver{
ApplicationDisruptionBudget: adb.DeepCopy(),
Client: r.Client,
Expand All @@ -79,16 +90,53 @@ func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, r
}

if !reflect.DeepEqual(resolver.ApplicationDisruptionBudget.Status, adb.Status) {
logger.Info("Updating with", "version", adb.ResourceVersion)
err = resolver.UpdateStatus(ctx)
}

return ctrl.Result{}, err
}

// MapFuncBuilder returns a MapFunc that is used to dispatch reconcile requests to
// budgets when an event is triggered by one of their matching object
func (r *ApplicationDisruptionBudgetReconciler) MapFuncBuilder() handler.MapFunc {
// Look for all ADBs in the namespace, then see if they match the object
return func(ctx context.Context, object client.Object) (requests []reconcile.Request) {
adbs := nodedisruptionv1alpha1.ApplicationDisruptionBudgetList{}
err := r.Client.List(ctx, &adbs, &client.ListOptions{Namespace: object.GetNamespace()})
if err != nil {
// We cannot return an error so at least it should be logged
logger := log.FromContext(context.Background())
logger.Error(err, "Could not list ADBs in watch function")
return requests
}

for _, adb := range adbs.Items {
if adb.SelectorMatchesObject(object) {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: adb.Name,
Namespace: adb.Namespace,
},
})
}
}
return requests
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *ApplicationDisruptionBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&nodedisruptionv1alpha1.ApplicationDisruptionBudget{}).
Watches(
&corev1.Pod{},
handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Watches(
&corev1.PersistentVolumeClaim{},
handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Complete(r)
}

Expand Down
181 changes: 181 additions & 0 deletions internal/controller/applicationdisruptionbudget_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +kubebuilder:docs-gen:collapse=Apache License

package controller

import (
"context"
"time"

nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// +kubebuilder:docs-gen:collapse=Imports
var _ = Describe("ApplicationDisruptionBudget controller", func() {

// Define utility constants for object names and testing timeouts/durations and intervals.
const (
ADBname = "test-adb"
ADBNamespace = "default"

timeout = time.Second * 10
duration = time.Second * 10
interval = time.Millisecond * 250
)

var (
podLabels = map[string]string{
"testselectpod": "testadb",
}

_, cancelFn = context.WithCancel(context.Background())
)

Context("In a cluster with several nodes", func() {
ctx := context.Background()
It("Create pods and PVC/PV", func() {
By("Adding Pods")
pod1 := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "podadb1",
Namespace: ADBNamespace,
Labels: podLabels,
},
Spec: corev1.PodSpec{
NodeName: "node1",
Containers: []corev1.Container{
{
Name: "testcontainer",
Image: "ubuntu",
},
},
},
Status: corev1.PodStatus{},
}
Expect(k8sClient.Create(ctx, pod1)).Should(Succeed())
})

Context("With reconciler with default config", Ordered, func() {
BeforeAll(func() {
cancelFn = startReconcilerWithConfig(NodeDisruptionReconcilerConfig{
RejectEmptyNodeDisruption: false,
RetryInterval: time.Second * 1,
})
})

AfterAll(func() {
cancelFn()
})

AfterEach(func() {
clearAllNodeDisruptionRessources()
})

When("there are no budgets in the cluster", func() {
It("grants the node disruption", func() {
By("creating a budget that accepts one disruption")
ndb := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{
TypeMeta: metav1.TypeMeta{
APIVersion: "nodedisruption.criteo.com/v1alpha1",
Kind: "ApplicationDisruptionBudget",
},
ObjectMeta: metav1.ObjectMeta{
Name: ADBname,
Namespace: ADBNamespace,
},
Spec: nodedisruptionv1alpha1.ApplicationDisruptionBudgetSpec{
PodSelector: metav1.LabelSelector{MatchLabels: podLabels},
PVCSelector: metav1.LabelSelector{MatchLabels: podLabels},
MaxDisruptions: 1,
},
}
Expect(k8sClient.Create(ctx, ndb)).Should(Succeed())

By("checking the ApplicationDisruptionBudget in synchronized")
ADBLookupKey := types.NamespacedName{Name: ADBname, Namespace: ADBNamespace}
createdADB := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{}
Eventually(func() []string {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1"}))

By("Adding Pods")
pod2 := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "podadb2",
Namespace: ADBNamespace,
Labels: podLabels,
},
Spec: corev1.PodSpec{
NodeName: "node2",
Containers: []corev1.Container{
{
Name: "testcontainer",
Image: "ubuntu",
},
},
},
Status: corev1.PodStatus{},
}
Expect(k8sClient.Create(ctx, pod2)).Should(Succeed())

By("checking the ApplicationDisruptionBudget updated the status")
Eventually(func() []string {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1", "node2"}))

By("Adding PVCs")
ressources := make(corev1.ResourceList, 1)
ressources[corev1.ResourceStorage] = *resource.NewQuantity(100, ressources.Memory().Format)
pvc3 := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc3",
Namespace: ADBNamespace,
Labels: podLabels,
},
Spec: corev1.PersistentVolumeClaimSpec{
VolumeName: "node3-pv-local",
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{Requests: ressources},
},
Status: corev1.PersistentVolumeClaimStatus{},
}
Expect(k8sClient.Create(ctx, pvc3)).Should(Succeed())

By("checking the ApplicationDisruptionBudget updated the status")
Eventually(func() []string {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1", "node2", "node3"}))
})
})

})

})
})
39 changes: 3 additions & 36 deletions internal/controller/nodedisruption_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,46 +117,14 @@ var _ = Describe("NodeDisruption controller", func() {
)

var (
nodeLabels1 = map[string]string{
"testselect": "test1",
}
nodeLabels2 = map[string]string{
"testselect": "test2",
}
podLabels = map[string]string{
"testselectpod": "test1",
}
NDLookupKey = types.NamespacedName{Name: NDName, Namespace: NDNamespace}

_, cancelFn = context.WithCancel(context.Background())
)

Context("In a cluster with several nodes", func() {
Context("In a cluster with pods", func() {
ctx := context.Background()
It("Create the nodes and pods", func() {
By("Adding Nodes")
node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: nodeLabels1,
},
}
Expect(k8sClient.Create(ctx, node1)).Should(Succeed())
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
Labels: nodeLabels1,
},
}
Expect(k8sClient.Create(ctx, node2)).Should(Succeed())
node3 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node3",
Labels: nodeLabels2,
},
}
Expect(k8sClient.Create(ctx, node3)).Should(Succeed())

It("Create pods", func() {
By("Adding Pods")
pod1 := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
Expand Down Expand Up @@ -281,8 +249,7 @@ var _ = Describe("NodeDisruption controller", func() {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).ShouldNot(BeEmpty())
Expect(createdADB.Status.WatchedNodes).Should(Equal([]string{"node1"}))
}, timeout, interval).Should(Equal([]string{"node1"}))

By("creating a new NodeDisruption")
disruption := &nodedisruptionv1alpha1.NodeDisruption{
Expand Down
Loading

0 comments on commit 89356d4

Please sign in to comment.