ETCD-681: Add etcd-backup-server container within separate daemonset
Elbehery committed Oct 10, 2024
1 parent 2846776 commit 99f31b8
Showing 2 changed files with 75 additions and 210 deletions.
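The change, in short: the PeriodicBackupController now keys the etcd-backup-server rollout off a Backup CR named "default" (config.openshift.io/v1alpha1). When that CR exists, sync pushes a backup-server-daemon-set DaemonSet into the target namespace; when it is absent, the DaemonSet is deleted and a NotFound error is tolerated. For orientation, below is a minimal sketch of the triggering object, mirroring how the tests further down construct it; the schedule and retention values are illustrative, and in this commit the DaemonSet's container args are still hard-coded rather than derived from this spec.

// Sketch only: the "default" Backup CR whose presence drives the new DaemonSet.
// Types come from github.com/openshift/api/config/v1alpha1 (backupv1alpha1) and
// k8s.io/apimachinery/pkg/apis/meta/v1 (metav1); field values are illustrative.
defaultBackup := backupv1alpha1.Backup{
	ObjectMeta: metav1.ObjectMeta{Name: "default"},
	Spec: backupv1alpha1.BackupSpec{
		EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{
			Schedule: "0 */5 * * *",
			TimeZone: "GMT",
			RetentionPolicy: backupv1alpha1.RetentionPolicy{
				RetentionType:   backupv1alpha1.RetentionTypeNumber,
				RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3},
			},
		},
	},
}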
107 changes: 75 additions & 32 deletions pkg/operator/periodicbackupcontroller/periodicbackupcontroller.go
@@ -3,10 +3,9 @@ package periodicbackupcontroller
import (
"context"
"fmt"
"strings"
"os"
"time"

"k8s.io/apimachinery/pkg/labels"
clientv1 "k8s.io/client-go/listers/core/v1"

backupv1alpha1 "github.com/openshift/api/config/v1alpha1"
@@ -21,21 +20,25 @@ import (
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/v1helpers"

appv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes"
batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)

const (
backupJobLabel = "backup-name"
defaultBackupCRName = "default"
etcdBackupServerContainerName = "etcd-backup-server"
backupServerDaemonSet = "backup-server-daemon-set"
)

type PeriodicBackupController struct {
@@ -101,7 +104,11 @@ func (c *PeriodicBackupController) sync(ctx context.Context, _ factory.SyncContext) error {
for _, item := range backups.Items {
if item.Name == defaultBackupCRName {
defaultFound = true
c.backupVarGetter.SetBackupSpec(&item.Spec.EtcdBackupSpec)
// c.backupVarGetter.SetBackupSpec(&item.Spec.EtcdBackupSpec)
_, err = c.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Update(ctx, deployBackupServerDaemonSet(), v1.UpdateOptions{})
if err != nil {
return fmt.Errorf("PeriodicBackupController could not update [defaultBackupDeployment]: %w", err)
}
continue
}

@@ -121,38 +128,13 @@ }
}
}

if defaultFound {
mirrorPods, err := c.podLister.List(labels.Set{"app": "etcd"}.AsSelector())
if !defaultFound {
err = c.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace).Delete(ctx, backupServerDaemonSet, v1.DeleteOptions{})
if err != nil {
return fmt.Errorf("PeriodicBackupController could not list etcd pods: %w", err)
}

var terminationReasons []string
for _, p := range mirrorPods {
for _, cStatus := range p.Status.ContainerStatuses {
if cStatus.Name == etcdBackupServerContainerName {
// TODO we can also try different cStatus.State.Terminated.ExitCode
terminationReasons = append(terminationReasons, fmt.Sprintf("container %s within pod %s has been terminated: %s", etcdBackupServerContainerName, p.Name, cStatus.State.Terminated.Message))
}
if !apierrors.IsNotFound(err) {
return fmt.Errorf("PeriodicBackupController could not delete [defaultBackupDeployment]: %w", err)
}
}

if len(terminationReasons) > 0 {
_, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
Type: "PeriodicBackupControllerDegraded",
Status: operatorv1.ConditionTrue,
Reason: "Error",
Message: fmt.Sprintf("found default backup errors: %s", strings.Join(terminationReasons, " ,")),
}))
if updateErr != nil {
klog.V(4).Infof("PeriodicBackupController error during default backup UpdateStatus: %v", err)
}

return nil
}
} else {
// disable etcd-backup-server
c.backupVarGetter.SetBackupSpec(nil)
}

_, _, updateErr := v1helpers.UpdateStatus(ctx, c.operatorClient, v1helpers.UpdateConditionFn(operatorv1.OperatorCondition{
@@ -318,3 +300,64 @@ func newCronJob() (*batchv1.CronJob, error) {

return obj.(*batchv1.CronJob), nil
}

func deployBackupServerDaemonSet() *appv1.DaemonSet {
deploy := appv1.DaemonSet{
ObjectMeta: v1.ObjectMeta{
Name: backupServerDaemonSet,
Namespace: operatorclient.TargetNamespace,
},
Spec: appv1.DaemonSetSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
NodeSelector: map[string]string{"node-role.kubernetes.io/master": ""},
Volumes: []corev1.Volume{
{Name: "data-dir", VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/lib/etcd",
Type: ptr.To(corev1.HostPathUnset)}}},

{Name: "config-dir", VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/etc/kubernetes"}}},

{Name: "etcd-auto-backup-dir", VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/lib/etcd-auto-backup"}}},

{Name: "cert-dir", VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/etc/kubernetes/static-pod-resources/etcd-certs"}}},
},
Containers: []corev1.Container{
{
Name: etcdBackupServerContainerName,
Image: os.Getenv("OPERATOR_IMAGE"),
Command: []string{"cluster-etcd-operator", "backup-server"},
Args: []string{
"--enabled=true",
"--timezone=GMT",
"--schedule=0 */5 * * *",
"--type=RetentionNumber",
"--maxNumberOfBackups=3",
},
VolumeMounts: []corev1.VolumeMount{
{Name: "data-dir", MountPath: "/var/lib/etcd"},
{Name: "config-dir", MountPath: "/etc/kubernetes"},
{Name: "etcd-auto-backup-dir", MountPath: "/var/lib/etcd-auto-backup"},
{Name: "cert-dir", MountPath: "/etc/kubernetes/static-pod-certs"},
},
TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
ImagePullPolicy: corev1.PullIfNotPresent,
SecurityContext: &corev1.SecurityContext{
Privileged: ptr.To(true),
},
},
},
},
},
},
}

return &deploy
}
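
Note that the sync path above calls Update on this DaemonSet unconditionally; against a cluster where the object has not been created yet, Update returns NotFound. A minimal create-or-update guard along the following lines would also cover the first rollout. This is a sketch only, reusing the package's existing imports (v1 is the meta/v1 alias, apierrors the api/errors alias), and ensureBackupServerDaemonSet is a hypothetical helper name, not code from this commit.

// Sketch (not in this commit): create the DaemonSet when missing, otherwise update it in place.
func (c *PeriodicBackupController) ensureBackupServerDaemonSet(ctx context.Context) error {
	desired := deployBackupServerDaemonSet()
	dsClient := c.kubeClient.AppsV1().DaemonSets(operatorclient.TargetNamespace)

	existing, err := dsClient.Get(ctx, backupServerDaemonSet, v1.GetOptions{})
	if apierrors.IsNotFound(err) {
		// First sync on this cluster: the DaemonSet does not exist yet, so create it.
		_, err = dsClient.Create(ctx, desired, v1.CreateOptions{})
		return err
	}
	if err != nil {
		return err
	}

	// Keep the server-populated metadata (resourceVersion and friends) and replace only the spec.
	existing.Spec = desired.Spec
	_, err = dsClient.Update(ctx, existing, v1.UpdateOptions{})
	return err
}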
178 changes: 0 additions & 178 deletions pkg/operator/periodicbackupcontroller/periodicbackupcontroller_test.go
@@ -14,8 +14,6 @@ import (
"github.com/openshift/library-go/pkg/operator/v1helpers"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
k8sfakeclient "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
@@ -60,162 +58,6 @@ func TestSyncLoopHappyPath(t *testing.T) {
requireOperatorStatus(t, fakeOperatorClient, false)
}

func TestSyncLoopWithDefaultBackupCR(t *testing.T) {
var backups backupv1alpha1.BackupList

backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"},
Spec: backupv1alpha1.BackupSpec{
EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{
Schedule: "20 4 * * *",
TimeZone: "UTC",
RetentionPolicy: backupv1alpha1.RetentionPolicy{
RetentionType: backupv1alpha1.RetentionTypeNumber,
RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 5}},
PVCName: "backup-happy-path-pvc"}}}

// no default CR
backups.Items = append(backups.Items, backup)
operatorFake := fake.NewClientset([]runtime.Object{&backups}...)
client := k8sfakeclient.NewClientset()
fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace)
fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient(
&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
&operatorv1.StaticPodOperatorStatus{}, nil, nil)

controller := PeriodicBackupController{
operatorClient: fakeOperatorClient,
podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
backupsClient: operatorFake.ConfigV1alpha1(),
kubeClient: client,
operatorImagePullSpec: "pullspec-image",
backupVarGetter: backuphelpers.NewDisabledBackupConfig(),
featureGateAccessor: backupFeatureGateAccessor,
kubeInformers: fakeKubeInformerForNamespace,
}

stopChan := make(chan struct{})
t.Cleanup(func() {
close(stopChan)
})
fakeKubeInformerForNamespace.Start(stopChan)

expDisabledBackupVar := " args:\n - --enabled=false"
err := controller.sync(context.TODO(), nil)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())

// create default CR
defaultBackup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName},
Spec: backupv1alpha1.BackupSpec{
EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{
Schedule: "0 */2 * * *",
TimeZone: "GMT",
RetentionPolicy: backupv1alpha1.RetentionPolicy{
RetentionType: backupv1alpha1.RetentionTypeNumber,
RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}}}}}

backups.Items = append(backups.Items, defaultBackup)
operatorFake = fake.NewClientset([]runtime.Object{&backups}...)
controller.backupsClient = operatorFake.ConfigV1alpha1()

expEnabledBackupVar := " args:\n - --enabled=true\n - --timezone=GMT\n - --schedule=0 */2 * * *\n - --type=RetentionNumber\n - --maxNumberOfBackups=3"
err = controller.sync(context.TODO(), nil)
require.NoError(t, err)
require.Equal(t, expEnabledBackupVar, controller.backupVarGetter.ArgString())

// removing defaultCR
backups.Items = backups.Items[:len(backups.Items)-1]
operatorFake = fake.NewClientset([]runtime.Object{&backups}...)
controller.backupsClient = operatorFake.ConfigV1alpha1()

err = controller.sync(context.TODO(), nil)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
}

func TestSyncLoopFailsDegradesOperatorWithDefaultBackupCR(t *testing.T) {
var backups backupv1alpha1.BackupList

backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"},
Spec: backupv1alpha1.BackupSpec{
EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{
Schedule: "20 4 * * *",
TimeZone: "UTC",
RetentionPolicy: backupv1alpha1.RetentionPolicy{
RetentionType: backupv1alpha1.RetentionTypeNumber,
RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 5}},
PVCName: "backup-happy-path-pvc"}}}

backupServerFailureMsg := fmt.Sprintf("error running etcd backup: %s", "error running backup")
client := k8sfakeclient.NewClientset([]runtime.Object{
etcdPodWithFailingBackupServerContainer("1", backupServerFailureMsg),
etcdPodWithFailingBackupServerContainer("2", backupServerFailureMsg),
etcdPodWithFailingBackupServerContainer("3", backupServerFailureMsg)}...)

fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace)

fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient(
&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
&operatorv1.StaticPodOperatorStatus{}, nil, nil)

// no default CR
backups.Items = append(backups.Items, backup)
operatorFake := fake.NewClientset([]runtime.Object{&backups}...)

controller := PeriodicBackupController{
operatorClient: fakeOperatorClient,
podLister: fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
backupsClient: operatorFake.ConfigV1alpha1(),
kubeClient: client,
operatorImagePullSpec: "pullspec-image",
backupVarGetter: backuphelpers.NewDisabledBackupConfig(),
featureGateAccessor: backupFeatureGateAccessor,
kubeInformers: fakeKubeInformerForNamespace,
}

stopChan := make(chan struct{})
t.Cleanup(func() {
close(stopChan)
})
fakeKubeInformerForNamespace.Start(stopChan)

expDisabledBackupVar := " args:\n - --enabled=false"
err := controller.sync(context.TODO(), nil)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
requireOperatorStatus(t, fakeOperatorClient, false)

// create default CR
defaultBackup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName},
Spec: backupv1alpha1.BackupSpec{
EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{
Schedule: "0 */2 * * *",
TimeZone: "GMT",
RetentionPolicy: backupv1alpha1.RetentionPolicy{
RetentionType: backupv1alpha1.RetentionTypeNumber,
RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}}}}}

backups.Items = append(backups.Items, defaultBackup)
operatorFake = fake.NewClientset([]runtime.Object{&backups}...)
controller.backupsClient = operatorFake.ConfigV1alpha1()

expEnabledBackupVar := " args:\n - --enabled=true\n - --timezone=GMT\n - --schedule=0 */2 * * *\n - --type=RetentionNumber\n - --maxNumberOfBackups=3"
err = controller.sync(context.TODO(), nil)
require.NoError(t, err)
require.Equal(t, expEnabledBackupVar, controller.backupVarGetter.ArgString())
requireOperatorStatus(t, fakeOperatorClient, true)

// removing defaultCR
backups.Items = backups.Items[:len(backups.Items)-1]
operatorFake = fake.NewClientset([]runtime.Object{&backups}...)
controller.backupsClient = operatorFake.ConfigV1alpha1()

err = controller.sync(context.TODO(), nil)
require.NoError(t, err)
require.Equal(t, expDisabledBackupVar, controller.backupVarGetter.ArgString())
requireOperatorStatus(t, fakeOperatorClient, false)
}

func TestSyncLoopExistingCronJob(t *testing.T) {
backup := backupv1alpha1.Backup{ObjectMeta: v1.ObjectMeta{Name: "test-backup"},
Spec: backupv1alpha1.BackupSpec{
@@ -402,23 +244,3 @@ func findFirstCreateAction(client *k8sfakeclient.Clientset) *k8stesting.CreateActionImpl {
}
return createAction
}

func etcdPodWithFailingBackupServerContainer(nodeName string, failureMsg string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: fmt.Sprintf("etcd-%v", nodeName),
Namespace: "openshift-etcd",
Labels: labels.Set{"app": "etcd"},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{{
Name: etcdBackupServerContainerName,
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
Message: failureMsg,
},
}},
},
},
}
}
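
With the container-status checks gone, the tests removed above no longer apply to the new flow. A replacement along the following lines could cover it instead: seed the fake kube client with the DaemonSet so the sync loop's Update call succeeds, then assert the object is still there. This is a sketch, assuming the scaffolding that remains in this file (backupFeatureGateAccessor, the controller wiring used by TestSyncLoopHappyPath, and the existing imports); it is not part of this commit.

// Sketch (not part of this commit): when the "default" Backup CR exists,
// sync should keep the backup-server DaemonSet in place.
func TestSyncLoopWithDefaultBackupDaemonSet(t *testing.T) {
	var backups backupv1alpha1.BackupList
	backups.Items = append(backups.Items, backupv1alpha1.Backup{
		ObjectMeta: v1.ObjectMeta{Name: defaultBackupCRName},
		Spec: backupv1alpha1.BackupSpec{
			EtcdBackupSpec: backupv1alpha1.EtcdBackupSpec{
				Schedule: "0 */2 * * *",
				TimeZone: "GMT",
				RetentionPolicy: backupv1alpha1.RetentionPolicy{
					RetentionType:   backupv1alpha1.RetentionTypeNumber,
					RetentionNumber: &backupv1alpha1.RetentionNumberConfig{MaxNumberOfBackups: 3}}}}})

	operatorFake := fake.NewClientset([]runtime.Object{&backups}...)
	// Pre-create the DaemonSet in the fake client so the controller's Update call succeeds.
	client := k8sfakeclient.NewClientset(deployBackupServerDaemonSet())
	fakeKubeInformerForNamespace := v1helpers.NewKubeInformersForNamespaces(client, operatorclient.TargetNamespace)
	fakeOperatorClient := v1helpers.NewFakeStaticPodOperatorClient(
		&operatorv1.StaticPodOperatorSpec{OperatorSpec: operatorv1.OperatorSpec{ManagementState: operatorv1.Managed}},
		&operatorv1.StaticPodOperatorStatus{}, nil, nil)

	controller := PeriodicBackupController{
		operatorClient:        fakeOperatorClient,
		podLister:             fakeKubeInformerForNamespace.InformersFor(operatorclient.TargetNamespace).Core().V1().Pods().Lister(),
		backupsClient:         operatorFake.ConfigV1alpha1(),
		kubeClient:            client,
		operatorImagePullSpec: "pullspec-image",
		backupVarGetter:       backuphelpers.NewDisabledBackupConfig(),
		featureGateAccessor:   backupFeatureGateAccessor,
		kubeInformers:         fakeKubeInformerForNamespace,
	}

	stopChan := make(chan struct{})
	t.Cleanup(func() { close(stopChan) })
	fakeKubeInformerForNamespace.Start(stopChan)

	require.NoError(t, controller.sync(context.TODO(), nil))

	// The default CR is present, so the DaemonSet must still exist after sync.
	_, err := client.AppsV1().DaemonSets(operatorclient.TargetNamespace).Get(context.TODO(), backupServerDaemonSet, v1.GetOptions{})
	require.NoError(t, err)
}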
