diff --git a/cmd/main.go b/cmd/main.go index a9727ad..64822cd 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -113,7 +113,12 @@ func main() { glog.Infof("launching watcher for ConfigMaps") err := configWatcher.Watch(ctx, sigChan) if err != nil { - glog.Fatalf("error watching for new ConfigMaps (terminating): %s", err.Error()) + switch err { + case watcher.WatchChannelClosedError: + glog.Errorf("watcher got error, try to restart watcher: %s", err.Error()) + default: + glog.Fatalf("error watching for new ConfigMaps (terminating): %s", err.Error()) + } } } }() diff --git a/internal/pkg/config/watcher/watcher.go b/internal/pkg/config/watcher/watcher.go index 844399b..a3c39e0 100644 --- a/internal/pkg/config/watcher/watcher.go +++ b/internal/pkg/config/watcher/watcher.go @@ -4,6 +4,7 @@ package watcher import ( "context" + "errors" "fmt" "io/ioutil" "strings" @@ -11,6 +12,7 @@ import ( "github.com/golang/glog" "github.com/tumblr/k8s-sidecar-injector/internal/pkg/config" "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" @@ -24,6 +26,9 @@ const ( serviceAccountNamespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" ) +// WatchChannelClosedError: should restart watcher +var WatchChannelClosedError = errors.New("watcher channel has closed") + // K8sConfigMapWatcher is a struct that connects to the API and collects, parses, and emits sidecar configurations type K8sConfigMapWatcher struct { Config @@ -98,14 +103,22 @@ func (c *K8sConfigMapWatcher) Watch(ctx context.Context, notifyMe chan<- interfa watcher, err := c.client.ConfigMaps(c.Namespace).Watch(metav1.ListOptions{ LabelSelector: mapStringStringToLabelSelector(c.ConfigMapLabels), }) + defer watcher.Stop() if err != nil { return fmt.Errorf("unable to create watcher (possible serviceaccount RBAC/ACL failure?): %s", err.Error()) } - ch := watcher.ResultChan() - var e watch.Event for { select { - case e = <-ch: + case e, ok := <-watcher.ResultChan(): + // channel may closed caused by HTTP timeout, should restart watcher + // detail at https://github.com/kubernetes/client-go/issues/334 + if !ok { + glog.Errorf("channel has closed, should restart watcher") + return WatchChannelClosedError + } + if e.Type == watch.Error { + return apierrs.FromObject(e.Object) + } glog.V(3).Infof("event: %s %s", e.Type, e.Object.GetObjectKind()) switch e.Type { case watch.Added: @@ -123,7 +136,6 @@ func (c *K8sConfigMapWatcher) Watch(ctx context.Context, notifyMe chan<- interfa case <-ctx.Done(): glog.V(2).Infof("stopping configmap watcher, context indicated we are done") // clean up, we cancelled the context, so stop the watch - watcher.Stop() return nil } } diff --git a/internal/pkg/config/watcher/watcher_test.go b/internal/pkg/config/watcher/watcher_test.go index 5ecfd59..dbf8fd8 100644 --- a/internal/pkg/config/watcher/watcher_test.go +++ b/internal/pkg/config/watcher/watcher_test.go @@ -1,6 +1,9 @@ package watcher import ( + "context" + "k8s.io/apimachinery/pkg/watch" + testcore "k8s.io/client-go/testing" "testing" _ "github.com/tumblr/k8s-sidecar-injector/internal/pkg/testing" @@ -30,3 +33,23 @@ func TestGet(t *testing.T) { t.Fatalf("expected 0 messages, but got %d", len(messages)) } } + +func TestWatcherChannelClose(t *testing.T) { + client := fake.NewSimpleClientset() + watcher := watch.NewEmptyWatch() + client.PrependWatchReactor("configmaps", testcore.DefaultWatchReactor(watcher, nil)) + + w := K8sConfigMapWatcher{ + Config: testConfig, + client: client.CoreV1(), + } + + sigChan := make(chan interface{}, 10) + // background context never canceled, no deadline + ctx := context.Background() + + err := w.Watch(ctx, sigChan) + if err != nil && err != WatchChannelClosedError { + t.Errorf("expect catch WatchChannelClosedError, but got %s", err) + } +}