Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watchers to update budgets in reaction to events #42

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
linters:
enable:
- revive
- revive
linters-settings:
revive:
rules:
- name: dot-imports
disabled: true
24 changes: 24 additions & 0 deletions api/v1alpha1/applicationdisruptionbudget_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -88,6 +92,26 @@ type ApplicationDisruptionBudget struct {
Status DisruptionBudgetStatus `json:"status,omitempty"`
}

// SelectorMatchesObject return true if the object is matched by one of the selectors
func (adb *ApplicationDisruptionBudget) SelectorMatchesObject(object client.Object) bool {
objectLabelSet := labels.Set(object.GetLabels())

switch object.(type) {
case *corev1.Pod:
selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.PodSelector)
return selector.Matches(objectLabelSet)
case *corev1.PersistentVolumeClaim:
selector, _ := metav1.LabelSelectorAsSelector(&adb.Spec.PVCSelector)
return selector.Matches(objectLabelSet)
case *NodeDisruption:
// It is faster to trigger a reconcile for each ADB instead of checking if the
// Node Disruption is impacting the current ADB
return true
default:
return false
}
}

//+kubebuilder:object:root=true

// ApplicationDisruptionBudgetList contains a list of ApplicationDisruptionBudget
Expand Down
48 changes: 48 additions & 0 deletions internal/controller/applicationdisruptionbudget_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,17 @@ import (
"reflect"

"k8s.io/apimachinery/pkg/api/errors"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1"
"github.com/criteo/node-disruption-controller/pkg/resolver"
Expand Down Expand Up @@ -56,6 +64,7 @@ type ApplicationDisruptionBudgetReconciler struct {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.15.0/pkg/reconcile
func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
adb := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{}
err := r.Client.Get(ctx, req.NamespacedName, adb)

Expand All @@ -67,6 +76,8 @@ func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, r
return ctrl.Result{}, err
}

logger.Info("Start reconcile of ADB", "version", adb.ResourceVersion)

resolver := ApplicationDisruptionBudgetResolver{
ApplicationDisruptionBudget: adb.DeepCopy(),
Client: r.Client,
Expand All @@ -79,16 +90,53 @@ func (r *ApplicationDisruptionBudgetReconciler) Reconcile(ctx context.Context, r
}

if !reflect.DeepEqual(resolver.ApplicationDisruptionBudget.Status, adb.Status) {
logger.Info("Updating with", "version", adb.ResourceVersion)
err = resolver.UpdateStatus(ctx)
}

return ctrl.Result{}, err
}

// MapFuncBuilder returns a MapFunc that is used to dispatch reconcile requests to
// budgets when an event is triggered by one of their matching object
func (r *ApplicationDisruptionBudgetReconciler) MapFuncBuilder() handler.MapFunc {
// Look for all ADBs in the namespace, then see if they match the object
return func(ctx context.Context, object client.Object) (requests []reconcile.Request) {
adbs := nodedisruptionv1alpha1.ApplicationDisruptionBudgetList{}
err := r.Client.List(ctx, &adbs, &client.ListOptions{Namespace: object.GetNamespace()})
if err != nil {
// We cannot return an error so at least it should be logged
logger := log.FromContext(context.Background())
logger.Error(err, "Could not list ADBs in watch function")
return requests
}

for _, adb := range adbs.Items {
if adb.SelectorMatchesObject(object) {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: adb.Name,
Namespace: adb.Namespace,
},
})
}
}
return requests
}
}

// SetupWithManager sets up the controller with the Manager.
func (r *ApplicationDisruptionBudgetReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&nodedisruptionv1alpha1.ApplicationDisruptionBudget{}).
Watches(
&corev1.Pod{},
handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Watches(
&corev1.PersistentVolumeClaim{},
handler.EnqueueRequestsFromMapFunc(r.MapFuncBuilder()),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
Complete(r)
}

Expand Down
190 changes: 190 additions & 0 deletions internal/controller/applicationdisruptionbudget_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +kubebuilder:docs-gen:collapse=Apache License

package controller

import (
"context"
"time"

nodedisruptionv1alpha1 "github.com/criteo/node-disruption-controller/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// +kubebuilder:docs-gen:collapse=Imports
var _ = Describe("ApplicationDisruptionBudget controller", func() {

// Define utility constants for object names and testing timeouts/durations and intervals.
const (
ADBname = "test-adb"
ADBNamespace = "test-adb"

timeout = time.Second * 10
duration = time.Second * 10
interval = time.Millisecond * 250
)

var (
podLabels = map[string]string{
"testselectpod": "testadb",
}

_, cancelFn = context.WithCancel(context.Background())
)

Context("In a cluster with several nodes", func() {
ctx := context.Background()
It("Create namespace and pods", func() {
By("Creating the namespace")
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ADBNamespace,
Namespace: ADBNamespace,
},
}
Expect(k8sClient.Create(ctx, ns)).Should(Succeed())

By("Adding Pods")
pod1 := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "podadb1",
Namespace: ADBNamespace,
Labels: podLabels,
},
Spec: corev1.PodSpec{
NodeName: "node1",
Containers: []corev1.Container{
{
Name: "testcontainer",
Image: "ubuntu",
},
},
},
Status: corev1.PodStatus{},
}
Expect(k8sClient.Create(ctx, pod1)).Should(Succeed())
})

Context("With reconciler with default config", Ordered, func() {
BeforeAll(func() {
cancelFn = startReconcilerWithConfig(NodeDisruptionReconcilerConfig{
RejectEmptyNodeDisruption: false,
RetryInterval: time.Second * 1,
})
})

AfterAll(func() {
cancelFn()
})

AfterEach(func() {
clearAllNodeDisruptionRessources()
})

When("there are no budgets in the cluster", func() {
It("grants the node disruption", func() {
By("creating a budget that accepts one disruption")
ndb := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{
TypeMeta: metav1.TypeMeta{
APIVersion: "nodedisruption.criteo.com/v1alpha1",
Kind: "ApplicationDisruptionBudget",
},
ObjectMeta: metav1.ObjectMeta{
Name: ADBname,
Namespace: ADBNamespace,
},
Spec: nodedisruptionv1alpha1.ApplicationDisruptionBudgetSpec{
PodSelector: metav1.LabelSelector{MatchLabels: podLabels},
PVCSelector: metav1.LabelSelector{MatchLabels: podLabels},
MaxDisruptions: 1,
},
}
Expect(k8sClient.Create(ctx, ndb)).Should(Succeed())

By("checking the ApplicationDisruptionBudget in synchronized")
ADBLookupKey := types.NamespacedName{Name: ADBname, Namespace: ADBNamespace}
createdADB := &nodedisruptionv1alpha1.ApplicationDisruptionBudget{}
Eventually(func() []string {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1"}))

By("Adding Pods")
pod2 := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "podadb2",
Namespace: ADBNamespace,
Labels: podLabels,
},
Spec: corev1.PodSpec{
NodeName: "node2",
Containers: []corev1.Container{
{
Name: "testcontainer",
Image: "ubuntu",
},
},
},
Status: corev1.PodStatus{},
}
Expect(k8sClient.Create(ctx, pod2)).Should(Succeed())

By("checking the ApplicationDisruptionBudget updated the status")
Eventually(func() []string {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1", "node2"}))

By("Adding PVCs")
ressources := make(corev1.ResourceList, 1)
ressources[corev1.ResourceStorage] = *resource.NewQuantity(100, ressources.Memory().Format)
pvc3 := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc3",
Namespace: ADBNamespace,
Labels: podLabels,
},
Spec: corev1.PersistentVolumeClaimSpec{
VolumeName: "node3-pv-local",
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{Requests: ressources},
},
Status: corev1.PersistentVolumeClaimStatus{},
}
Expect(k8sClient.Create(ctx, pvc3)).Should(Succeed())

By("checking the ApplicationDisruptionBudget updated the status")
Eventually(func() []string {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).Should(Equal([]string{"node1", "node2", "node3"}))
})
})

})

})
})
39 changes: 3 additions & 36 deletions internal/controller/nodedisruption_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,46 +117,14 @@ var _ = Describe("NodeDisruption controller", func() {
)

var (
nodeLabels1 = map[string]string{
"testselect": "test1",
}
nodeLabels2 = map[string]string{
"testselect": "test2",
}
podLabels = map[string]string{
"testselectpod": "test1",
}
NDLookupKey = types.NamespacedName{Name: NDName, Namespace: NDNamespace}

_, cancelFn = context.WithCancel(context.Background())
)

Context("In a cluster with several nodes", func() {
Context("In a cluster with pods", func() {
ctx := context.Background()
It("Create the nodes and pods", func() {
By("Adding Nodes")
node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: nodeLabels1,
},
}
Expect(k8sClient.Create(ctx, node1)).Should(Succeed())
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
Labels: nodeLabels1,
},
}
Expect(k8sClient.Create(ctx, node2)).Should(Succeed())
node3 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node3",
Labels: nodeLabels2,
},
}
Expect(k8sClient.Create(ctx, node3)).Should(Succeed())

It("Create pods", func() {
By("Adding Pods")
pod1 := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
Expand Down Expand Up @@ -281,8 +249,7 @@ var _ = Describe("NodeDisruption controller", func() {
err := k8sClient.Get(ctx, ADBLookupKey, createdADB)
Expect(err).Should(Succeed())
return createdADB.Status.WatchedNodes
}, timeout, interval).ShouldNot(BeEmpty())
Expect(createdADB.Status.WatchedNodes).Should(Equal([]string{"node1"}))
}, timeout, interval).Should(Equal([]string{"node1"}))

By("creating a new NodeDisruption")
disruption := &nodedisruptionv1alpha1.NodeDisruption{
Expand Down
Loading
Loading