Skip to content

Commit

Permalink
Merge pull request #361 from aruiz14/remove-event-handlers
Browse files Browse the repository at this point in the history
fix(relatedresource): remove event handlers when context is closed
  • Loading branch information
MbolotSuse authored Mar 7, 2024
2 parents 004384a + 06dfe87 commit 762039f
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 1 deletion.
23 changes: 22 additions & 1 deletion pkg/relatedresource/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"

"github.com/rancher/wrangler/v2/pkg/generic"
Expand Down Expand Up @@ -69,7 +70,7 @@ func watch(ctx context.Context, name string, enq Enqueuer, resolve Resolver, con
return nil
}

controller.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
addResourceEventHandler(ctx, controller.Informer(), cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
ro, ok := obj.(runtime.Object)
if !ok {
Expand Down Expand Up @@ -101,3 +102,23 @@ type wrapper struct {
func (w *wrapper) Enqueue(namespace, name string) {
w.ClusterScopedEnqueuer.Enqueue(name)
}

// informerRegisterer is a subset of the cache.SharedIndexInformer, so it's easier to replace in tests
type informerRegisterer interface {
AddEventHandler(funcs cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error)
RemoveEventHandler(cache.ResourceEventHandlerRegistration) error
}

func addResourceEventHandler(ctx context.Context, informer informerRegisterer, handler cache.ResourceEventHandler) {
handlerReg, err := informer.AddEventHandler(handler)
if err != nil {
logrus.WithError(err).Error("failed to add ResourceEventHandler")
return
}
go func() {
<-ctx.Done()
if err := informer.RemoveEventHandler(handlerReg); err != nil {
logrus.WithError(err).Warn("failed to remove ResourceEventHandler")
}
}()
}
107 changes: 107 additions & 0 deletions pkg/relatedresource/changeset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package relatedresource

import (
"context"
"fmt"
"testing"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)

func Test_addResourceEventHandler(t *testing.T) {
const expectedCalls = 5

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var counter int
handler := &cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { counter++ },
}

informer := &fakeInformer{}
addResourceEventHandler(ctx, informer, handler)

for i := 0; i < expectedCalls; i++ {
informer.add(nil)
}

if want, got := expectedCalls, counter; want != got {
t.Errorf("unexpected number of executions of handler func, want: %d, got: %d", want, got)
}

// Close enqueuer context and wait for unregistering goroutine
cancel()
informer.waitUntilDeleted(handler, 1*time.Second)

// New informer calls should not trigger our handler
informer.add(nil)
if got, want := counter, expectedCalls; got != want {
t.Errorf("resource event handler is not correctly removed")
}
}

type handlerRegistration struct{}

func (h handlerRegistration) HasSynced() bool { return true }

// fakeInformer implements a subset of cache.SharedIndexInformer, only those methods used by addResourceEventHandler
type fakeInformer struct {
handlers []cache.ResourceEventHandler
reg []cache.ResourceEventHandlerRegistration
deleteChan []chan struct{}
}

func (informer *fakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
handlerReg := handlerRegistration{}
informer.handlers = append(informer.handlers, handler)
informer.reg = append(informer.reg, handlerReg)
informer.deleteChan = append(informer.deleteChan, make(chan struct{}))
return handlerReg, nil
}

func (informer *fakeInformer) RemoveEventHandler(handlerReg cache.ResourceEventHandlerRegistration) error {
x := slicesIndex(informer.reg, handlerReg)
if x < 0 {
return fmt.Errorf("handler not found")
}

close(informer.deleteChan[x])
informer.reg = deleteIndex(informer.reg, x)
informer.handlers = deleteIndex(informer.handlers, x)
informer.deleteChan = deleteIndex(informer.deleteChan, x)
return nil
}

func (informer *fakeInformer) add(obj runtime.Object) {
for _, handler := range informer.handlers {
handler.OnAdd(obj, false)
}
}

func (informer *fakeInformer) waitUntilDeleted(handler cache.ResourceEventHandler, timeout time.Duration) {
x := slicesIndex(informer.handlers, handler)
if x < 0 {
return
}

select {
case <-informer.deleteChan[x]:
case <-time.After(timeout):
}
}

func slicesIndex[S ~[]E, E comparable](s S, v E) int {
for i := range s {
if v == s[i] {
return i
}
}
return -1
}

func deleteIndex[S ~[]E, E any](s S, i int) S {
return append(s[:i], s[i+1:]...)
}

0 comments on commit 762039f

Please sign in to comment.