diff --git a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go
index 1ca89e04d..34a40f7a6 100644
--- a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go
+++ b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go
@@ -8,6 +8,10 @@ import (
 	"strings"
 	"time"
 
+	"github.com/openshift/cluster-etcd-operator/pkg/etcdcli"
+	"github.com/openshift/cluster-etcd-operator/pkg/operator/ceohelpers"
+	"k8s.io/apimachinery/pkg/util/sets"
+
 	"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
 	"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
 
@@ -61,6 +65,7 @@ const (
 	nodeNameEnvVar     = "NODE_NAME"
 	nodeNameFieldPath  = "spec.nodeName"
 	masterNodeSelector = "node-role.kubernetes.io/master"
+	votingNodeSelector = "node-role.kubernetes.io/voting"
 	backupDSLabelKey   = "app"
 	backupDSLabelValue = "etcd-auto-backup"
 )
@@ -74,6 +79,7 @@ type PeriodicBackupController struct {
 	backupVarGetter       backuphelpers.BackupVar
 	featureGateAccessor   featuregates.FeatureGateAccess
 	kubeInformers         v1helpers.KubeInformersForNamespaces
+	etcdClient            etcdcli.EtcdClient
 }
 
 func NewPeriodicBackupController(
@@ -86,6 +92,7 @@ func NewPeriodicBackupController(
 	accessor featuregates.FeatureGateAccess,
 	backupVarGetter backuphelpers.BackupVar,
 	backupsInformer factory.Informer,
+	etcdClient etcdcli.EtcdClient,
 	kubeInformers v1helpers.KubeInformersForNamespaces) factory.Controller {
 
 	c := &PeriodicBackupController{
@@ -96,6 +103,7 @@ func NewPeriodicBackupController(
 		operatorImagePullSpec: operatorImagePullSpec,
 		backupVarGetter:       backupVarGetter,
 		featureGateAccessor:   accessor,
+		etcdClient:            etcdClient,
 		kubeInformers:         kubeInformers,
 	}
 
@@ -131,6 +139,10 @@ func (c *PeriodicBackupController) sync(ctx context.Context, syncContext factory
 		if item.Name == defaultBackupCRName {
 			defaultFound = true
 
+			err = ensureVotingNodesLabeled(ctx, c.kubeClient, c.etcdClient)
+			if err != nil {
+				return fmt.Errorf("PeriodicBackupController could not label voting master nodes: %w", err)
+			}
 			currentEtcdBackupDS, err := c.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(ctx, backupServerDaemonSet, v1.GetOptions{})
 			if err != nil && !apierrors.IsNotFound(err) {
 				return fmt.Errorf("PeriodicBackupController could not retrieve [defaultBackupDeployment]: %w", err)
@@ -179,6 +191,10 @@
 			}
 			klog.V(4).Infof("PeriodicBackupController deleted DaemonSet [%v] successfully", backupServerDaemonSet)
 		}
+		err = ensureVotingNodesUnLabeled(ctx, c.kubeClient)
+		if err != nil {
+			return fmt.Errorf("PeriodicBackupController could not unlabel voting master nodes: %w", err)
+		}
 	} else {
 		terminationReasons, err := checkBackupServerPodsStatus(c.podLister)
 		if err != nil {
@@ -390,8 +406,15 @@ func createBackupServerDaemonSet(cr backupv1alpha1.Backup, endpoints []string) *
 							Key:    masterNodeSelector,
 							Effect: corev1.TaintEffectNoSchedule,
 						},
+						{
+							Key:    votingNodeSelector,
+							Effect: corev1.TaintEffectNoSchedule,
+						},
+					},
+					NodeSelector: map[string]string{
+						masterNodeSelector: "",
+						votingNodeSelector: "",
 					},
-					NodeSelector: map[string]string{masterNodeSelector: ""},
 					Volumes: []corev1.Volume{
 						{Name: etcdDataDirVolName, VolumeSource: corev1.VolumeSource{
 							HostPath: &corev1.HostPathVolumeSource{
@@ -511,3 +534,68 @@ func etcdBackupServerDSDiffers(l, r appv1.DaemonSetSpec) bool {
 
 	return false
 }
+
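+// ensureVotingNodesLabeled adds the votingNodeSelector label to every master node whose
+// internal IP belongs to a voting etcd member, so the backup-server DaemonSet only schedules onto voting members.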
+func ensureVotingNodesLabeled(ctx context.Context, client kubernetes.Interface, etcdClient etcdcli.EtcdClient) error {
+	members, err := etcdClient.VotingMemberList(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to list voting members: %w", err)
+	}
+
+	votingMemberIPs := sets.NewString()
+	for _, m := range members {
+		memberIP, mErr := ceohelpers.MemberToNodeInternalIP(m)
+		if mErr != nil {
+			return mErr
+		}
+		votingMemberIPs.Insert(memberIP)
+	}
+
+	masterNodes, err := client.CoreV1().Nodes().List(ctx, v1.ListOptions{
+		LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to list master nodes: %w", err)
+	}
+
+	for _, node := range masterNodes.Items {
+		for _, addr := range node.Status.Addresses {
+			if addr.Type == corev1.NodeInternalIP {
+				if votingMemberIPs.Has(addr.Address) {
+					// the node hosts a voting member, ensure it carries the voting label
+					node.Labels[votingNodeSelector] = ""
+					_, err = client.CoreV1().Nodes().Update(ctx, &node, v1.UpdateOptions{})
+					if err != nil {
+						return fmt.Errorf("failed to update master node [%s] with label [%s]: %w", node.Name, votingNodeSelector, err)
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
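+// ensureVotingNodesUnLabeled removes the votingNodeSelector label from all master nodes.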
+func ensureVotingNodesUnLabeled(ctx context.Context, client kubernetes.Interface) error {
+	masterNodes, err := client.CoreV1().Nodes().List(ctx, v1.ListOptions{
+		LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to list master nodes: %w", err)
+	}
+
+	for _, node := range masterNodes.Items {
+		if _, ok := node.Labels[votingNodeSelector]; !ok {
+			continue
+		}
+		delete(node.Labels, votingNodeSelector)
+
+		_, err = client.CoreV1().Nodes().Update(ctx, &node, v1.UpdateOptions{})
+		if err != nil {
+			return fmt.Errorf("failed to remove label [%s] from master node [%s]: %w", votingNodeSelector, node.Name, err)
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go
index 646755591..e1527d084 100644
--- a/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go
+++ b/pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go
@@ -8,6 +8,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/openshift/cluster-etcd-operator/pkg/etcdcli"
+	"go.etcd.io/etcd/api/v3/etcdserverpb"
+
 	testing2 "k8s.io/utils/clock/testing"
 
 	"github.com/openshift/library-go/pkg/controller/factory"
@@ -93,6 +96,14 @@ func TestSyncLoopWithDefaultBackupCR(t *testing.T) {
 		&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
 		&operatorv1.StaticPodOperatorStatus{}, nil, nil)
 
+	defaultEtcdMembers := []*etcdserverpb.Member{
+		u.FakeEtcdMemberWithoutServer(0),
+		u.FakeEtcdMemberWithoutServer(1),
+		u.FakeEtcdMemberWithoutServer(2),
+	}
+	fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
+	require.NoError(t, err)
+
 	controller := PeriodicBackupController{
 		operatorClient: fakeOperatorClient,
 		podLister:      fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -101,6 +112,7 @@
 		operatorImagePullSpec: "pullspec-image",
 		backupVarGetter:       backuphelpers.NewDisabledBackupConfig(),
 		featureGateAccessor:   backupFeatureGateAccessor,
+		etcdClient:            fakeEtcdClient,
 		kubeInformers:         fakeKubeInformerForNamespace,
 	}
 
@@ -117,7 +129,7 @@
 	fakeKubeInformerForNamespace.Start(stopChan)
 
 	expDisabledBackupVar := " args:\n - --enabled=false"
-	err := controller.sync(context.TODO(), syncCtx)
+	err = controller.sync(context.TODO(), syncCtx)
 	require.NoError(t, err)
 	require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
 
@@ -181,6 +193,14 @@
 		&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
 		&operatorv1.StaticPodOperatorStatus{}, nil, nil)
 
+	defaultEtcdMembers := []*etcdserverpb.Member{
+		u.FakeEtcdMemberWithoutServer(0),
+		u.FakeEtcdMemberWithoutServer(1),
+		u.FakeEtcdMemberWithoutServer(2),
+	}
+	fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
+	require.NoError(t, err)
+
 	controller := PeriodicBackupController{
 		operatorClient: fakeOperatorClient,
 		podLister:      fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -189,6 +209,7 @@
 		operatorImagePullSpec: "pullspec-image",
 		backupVarGetter:       backuphelpers.NewDisabledBackupConfig(),
 		featureGateAccessor:   backupFeatureGateAccessor,
+		etcdClient:            fakeEtcdClient,
 		kubeInformers:         fakeKubeInformerForNamespace,
 	}
 
@@ -205,7 +226,7 @@
 	fakeKubeInformerForNamespace.Start(stopChan)
 
 	expDisabledBackupVar := " args:\n - --enabled=false"
-	err := controller.sync(context.TODO(), syncCtx)
+	err = controller.sync(context.TODO(), syncCtx)
 	require.NoError(t, err)
 	require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
 
@@ -269,6 +290,14 @@
 		&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
 		&operatorv1.StaticPodOperatorStatus{}, nil, nil)
 
+	defaultEtcdMembers := []*etcdserverpb.Member{
+		u.FakeEtcdMemberWithoutServer(0),
+		u.FakeEtcdMemberWithoutServer(1),
+		u.FakeEtcdMemberWithoutServer(2),
+	}
+	fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
+	require.NoError(t, err)
+
 	controller := PeriodicBackupController{
 		operatorClient: fakeOperatorClient,
 		podLister:      fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -277,6 +306,7 @@
 		operatorImagePullSpec: "pullspec-image",
 		backupVarGetter:       backuphelpers.NewDisabledBackupConfig(),
 		featureGateAccessor:   backupFeatureGateAccessor,
+		etcdClient:            fakeEtcdClient,
 		kubeInformers:         fakeKubeInformerForNamespace,
 	}
 
@@ -293,7 +323,7 @@
 	fakeKubeInformerForNamespace.Start(stopChan)
 
 	expDisabledBackupVar := " args:\n - --enabled=false"
-	err := controller.sync(context.TODO(), syncCtx)
+	err = controller.sync(context.TODO(), syncCtx)
 	require.NoError(t, err)
 	require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
 
@@ -358,6 +388,14 @@
 		&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
 		&operatorv1.StaticPodOperatorStatus{}, nil, nil)
 
+	defaultEtcdMembers := []*etcdserverpb.Member{
+		u.FakeEtcdMemberWithoutServer(0),
+		u.FakeEtcdMemberWithoutServer(1),
+		u.FakeEtcdMemberWithoutServer(2),
+	}
+	fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
+	require.NoError(t, err)
+
 	controller := PeriodicBackupController{
 		operatorClient: fakeOperatorClient,
 		podLister:      fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -366,6 +404,7 @@
 		operatorImagePullSpec: "pullspec-image",
 		backupVarGetter:       backuphelpers.NewDisabledBackupConfig(),
 		featureGateAccessor:   backupFeatureGateAccessor,
+		etcdClient:            fakeEtcdClient,
 		kubeInformers:         fakeKubeInformerForNamespace,
 	}
 
@@ -382,7 +421,7 @@
 	fakeKubeInformerForNamespace.Start(stopChan)
 
 	expDisabledBackupVar := " args:\n - --enabled=false"
-	err := controller.sync(context.TODO(), syncCtx)
+	err = controller.sync(context.TODO(), syncCtx)
 	require.NoError(t, err)
 	require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
 
@@ -473,6 +512,14 @@
 	backups.Items = append(backups.Items, backup)
 	operatorFake := fake.NewClientset([]runtime.Object{&backups}...)
 
+	defaultEtcdMembers := []*etcdserverpb.Member{
+		u.FakeEtcdMemberWithoutServer(0),
+		u.FakeEtcdMemberWithoutServer(1),
+		u.FakeEtcdMemberWithoutServer(2),
+	}
+	fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
+	require.NoError(t, err)
+
 	controller := PeriodicBackupController{
 		operatorClient: fakeOperatorClient,
 		podLister:      fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
@@ -481,6 +528,7 @@
 		operatorImagePullSpec: "pullspec-image",
 		backupVarGetter:       backuphelpers.NewDisabledBackupConfig(),
 		featureGateAccessor:   backupFeatureGateAccessor,
+		etcdClient:            fakeEtcdClient,
 		kubeInformers:         fakeKubeInformerForNamespace,
 	}
 
@@ -497,7 +545,7 @@
 	fakeKubeInformerForNamespace.Start(stopChan)
 
 	expDisabledBackupVar := " args:\n - --enabled=false"
-	err := controller.sync(context.TODO(), syncCtx)
+	err = controller.sync(context.TODO(), syncCtx)
 	require.NoError(t, err)
 	require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
 	requireOperatorStatus(t, fakeOperatorClient, false)
@@ -801,3 +849,102 @@ func extractEtcdBackupServerArgVal(t testing.TB, argName string, args []string)
 	t.Errorf("expected [etcd-backup-server] arg [%v], but found none", argName)
 	return ""
 }
+
+func TestEnsureVotingNodesLabeled(t *testing.T) {
+	// arrange
+	defaultEtcdMembers := []*etcdserverpb.Member{
+		u.FakeEtcdMemberWithoutServer(0),
+		u.FakeEtcdMemberWithoutServer(1),
+		u.FakeEtcdMemberWithoutServer(2),
+	}
+	fakeEtcdClient, err := etcdcli.NewFakeEtcdClient(defaultEtcdMembers, etcdcli.WithFakeClusterHealth(&etcdcli.FakeMemberHealth{Healthy: 3, Unhealthy: 0}))
+	require.NoError(t, err)
+
+	allClusterNodes := defaultClusterNodes()
+	var objects []runtime.Object
+	for _, n := range allClusterNodes {
+		objects = append(objects, n)
+	}
+	client := k8sfakeclient.NewClientset(objects...)
+
+	// act
+	err = ensureVotingNodesLabeled(context.TODO(), client, fakeEtcdClient)
+	require.NoError(t, err)
+
+	// assert
+	masterNodes, err := client.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{
+		LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
+	})
+	require.NoError(t, err)
+	for _, m := range masterNodes.Items {
+		require.Contains(t, m.Labels, votingNodeSelector)
+	}
+}
+
+func TestEnsureVotingNodesUnLabeled(t *testing.T) {
+	// arrange
+	allClusterNodes := defaultClusterNodes()
+	for _, n := range allClusterNodes {
+		if _, ok := n.Labels[masterNodeSelector]; ok {
+			n.Labels[votingNodeSelector] = ""
+		}
+	}
+
+	var objects []runtime.Object
+	for _, n := range allClusterNodes {
+		objects = append(objects, n)
+	}
+	client := k8sfakeclient.NewClientset(objects...)
+
+	// act
+	err := ensureVotingNodesUnLabeled(context.TODO(), client)
+	require.NoError(t, err)
+
+	// assert
+	masterNodes, err := client.CoreV1().Nodes().List(context.TODO(), v1.ListOptions{
+		LabelSelector: labels.Set{masterNodeSelector: ""}.String(),
+	})
+	require.NoError(t, err)
+	for _, m := range masterNodes.Items {
+		require.NotContains(t, m.Labels, votingNodeSelector)
+	}
+}
+
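+// defaultClusterNodes returns a six-node fixture: n-1 through n-3 are masters, the rest are workers.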
+func defaultClusterNodes() []*corev1.Node {
+	var nodes []*corev1.Node
+
+	for i := 1; i <= 6; i++ {
+		isMaster := false
+		if i <= 3 {
+			isMaster = true
+		}
+		nodes = append(nodes, createNode(i, isMaster))
+	}
+	return nodes
+}
+
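+// createNode builds a minimal node; masters get the master label and a deterministic internal IP that lines up with the fake etcd members used above.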
+func createNode(idx int, isMaster bool) *corev1.Node {
+	node := &corev1.Node{
+		ObjectMeta: v1.ObjectMeta{
+			Name: fmt.Sprintf("n-%d", idx),
+		},
+	}
+
+	if isMaster {
+		node.ObjectMeta.Labels = map[string]string{
+			masterNodeSelector: "",
+		}
+
+		if node.Status.Addresses == nil {
+			node.Status.Addresses = []corev1.NodeAddress{}
+		}
+		node.Status.Addresses = append(node.Status.Addresses, corev1.NodeAddress{
+			Type:    corev1.NodeInternalIP,
+			Address: fmt.Sprintf("10.0.0.%d", idx),
+		})
+	}
+
+	return node
+}
diff --git a/pkg/operator/starter.go b/pkg/operator/starter.go
index ac3c164aa..8da52ac53 100644
--- a/pkg/operator/starter.go
+++ b/pkg/operator/starter.go
@@ -493,6 +493,7 @@ func RunOperator(ctx context.Context, controllerContext *controllercmd.Controlle
 		featureGateAccessor,
 		backuphelpers.NewDisabledBackupConfig(),
 		configBackupInformer,
+		etcdClient,
 		kubeInformersForNamespaces)
 
 	backupController := backupcontroller.NewBackupController(