Commit 13d83e1

Merge pull request #17 from vshn/feat/copy-job-custom-sa-rb

Add support to allow sync jobs to use an existing cluster role

simu authored Oct 20, 2021
2 parents c0f1abf + 1ac0fde commit 13d83e1

Showing 11 changed files with 500 additions and 18 deletions.
16 changes: 16 additions & 0 deletions README.md
@@ -19,6 +19,22 @@ This means increasing the storage size of a Statefulset involves quite a lot of
The Statefulset Resize Controller is a general solution to this problem and does not require any additional support from the storage backend.
When recreating a Statefulset using `--cascade=orphan`, the controller will notice the change, scale down the Statefulset, recreate the PVCs, and migrate the data.

### Controller command line arguments

* `--sync-image`: A container image containing rsync, used to move data.
Default `instrumentisto/rsync-ssh`.
* `--sync-cluster-role`: A ClusterRole to use for the sync jobs.
If this is not specified, the sync jobs run with the default service account in the StatefulSet's namespace.
For example, this can be used to allow the sync job to run as root on a cluster with PSPs enabled, by providing the name of a ClusterRole which allows use of a privileged PSP (see the sketch after this list).
Default `""`.
* `--metrics-bind-address`: The address the metric endpoint binds to.
Default `:8080`.
* `--health-probe-bind-address`: The address the probe endpoint binds to.
Default `:8081`.
* `--leader-elect`: Enable leader election for controller manager.
Enabling this will ensure there is only one active controller manager.
Default `false`.
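
A minimal sketch of a ClusterRole that could be passed via `--sync-cluster-role` on a PSP-enabled cluster. The ClusterRole name `sync-job-privileged` and the PSP name `privileged` are illustrative assumptions, not resources shipped with this controller:

```yaml
# Hypothetical example; both names below are placeholders.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: sync-job-privileged
rules:
- apiGroups:
  - policy
  resources:
  - podsecuritypolicies
  resourceNames:
  - privileged # name of an existing privileged PSP on the cluster (assumption)
  verbs:
  - use
```

The controller would then be started with `--sync-cluster-role=sync-job-privileged`, and would bind each sync job's ServiceAccount to this ClusterRole in the StatefulSet's namespace.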

### Example

Get access to a Kubernetes cluster that has support for automatic PV provisioning.
24 changes: 24 additions & 0 deletions config/rbac/role.yaml
@@ -6,6 +6,18 @@ metadata:
creationTimestamp: null
name: controller-manager
rules:
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- apps
resources:
@@ -64,3 +76,15 @@ rules:
- patch
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
- rolebindings
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
3 changes: 3 additions & 0 deletions controllers/controller.go
@@ -19,6 +19,7 @@ type StatefulSetReconciler struct {
Recorder record.EventRecorder

SyncContainerImage string
SyncClusterRole string
RequeueAfter time.Duration
}

@@ -28,6 +29,8 @@ type StatefulSetReconciler struct {
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=rolebindings,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete

// Reconcile is the main work loop, reacting to changes in statefulsets and initiating resizing of StatefulSets.
func (r *StatefulSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
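
The Reconcile body is collapsed in this diff view. As orientation only, here is a hedged sketch of the flow the README describes; the function shape and step comments are assumptions, not this repository's actual implementation:

```go
// Hypothetical sketch of the resize flow, not the controller's real code.
package sketch

import (
	"context"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func reconcileSketch(ctx context.Context, c client.Client, req ctrl.Request, requeueAfter time.Duration) (ctrl.Result, error) {
	sts := &appsv1.StatefulSet{}
	if err := c.Get(ctx, req.NamespacedName, sts); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// 1. Detect PVCs whose size no longer matches the StatefulSet's
	//    volumeClaimTemplates (e.g. after the StatefulSet was recreated
	//    following a `--cascade=orphan` deletion).
	// 2. Scale the StatefulSet to zero replicas so the volumes are released.
	// 3. For each PVC: back it up, recreate it at the new size, and restore
	//    the data via a sync job, optionally bound to the ClusterRole given
	//    with --sync-cluster-role.
	// 4. Scale the StatefulSet back up, requeueing until done.
	return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
```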
109 changes: 100 additions & 9 deletions controllers/controller_test.go
@@ -17,6 +17,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
@@ -27,7 +28,7 @@ import (
func TestController(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, stop := startTestReconciler(t, ctx)
c, stop := startTestReconciler(t, ctx, "")
defer stop()

t.Run("e2e", func(t *testing.T) { // This allows the subtest to run in parallel
@@ -101,11 +102,16 @@ func TestController(t *testing.T) {
t.Run("Scale down", func(t *testing.T) {
eventuallyScaledDown(t, ctx, c, sts)
})

consistently(t, func() bool {
return rbacNotExists(t, ctx, c, ns, "somesa", "someclusterrole")
}, duration, interval, "RBAC doesn't exist")

t.Run("Back up", func(t *testing.T) {
eventuallyBackedUp(t, ctx, c, pvc, true)
eventuallyBackedUp(t, ctx, c, pvc, true, "")
})
t.Run("Restored", func(t *testing.T) {
eventuallyRestored(t, ctx, c, pvc, "2G")
eventuallyRestored(t, ctx, c, pvc, "2G", "")
})
t.Run("Scale up", func(t *testing.T) {
eventuallyScaledUp(t, ctx, c, sts, 1)
@@ -134,7 +140,7 @@ func TestController(t *testing.T) {
eventuallyScaledDown(t, ctx, c, sts)
})
t.Run("Back up failed", func(t *testing.T) {
eventuallyBackedUp(t, ctx, c, pvc, false)
eventuallyBackedUp(t, ctx, c, pvc, false, "")
})
t.Run("Scale up", func(t *testing.T) {
eventuallyScaledUp(t, ctx, c, sts, 1)
@@ -147,6 +153,57 @@
})
}

func TestControllerWithClusterRole(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
crname := "myclusterrole"
c, stop := startTestReconciler(t, ctx, crname)
defer stop()

t.Run("e2e", func(t *testing.T) { // This allows the subtest to run in parallel
t.Run("Resize StatfulSet", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
ns := "e2e"
require := require.New(t)
require.NoError(c.Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
},
}))
sts := newTestStatefulSet(ns, "test", 1, "2G")
pvc := newSource(ns, "data-test-0", "1G",
func(pvc *corev1.PersistentVolumeClaim) *corev1.PersistentVolumeClaim {
pvc.Labels = sts.Spec.Selector.MatchLabels
return pvc
})
require.NoError(c.Create(ctx, pvc))
require.NoError(c.Create(ctx, sts))

rbacobjname := RbacObjNamePrefix + "test"

t.Run("Scale down", func(t *testing.T) {
eventuallyScaledDown(t, ctx, c, sts)
})
t.Run("RBAC created", func(t *testing.T) {
eventuallyRbacCreated(t, ctx, c, ns, rbacobjname, crname)
})
t.Run("Back up", func(t *testing.T) {
eventuallyBackedUp(t, ctx, c, pvc, true, rbacobjname)
})
t.Run("Restored", func(t *testing.T) {
eventuallyRestored(t, ctx, c, pvc, "2G", rbacobjname)
})
t.Run("RBAC removed", func(t *testing.T) {
eventuallyRbacRemoved(t, ctx, c, ns, rbacobjname, crname)
})
t.Run("Scale up", func(t *testing.T) {
eventuallyScaledUp(t, ctx, c, sts, 1)
})
})
})
}

func eventuallyScaledDown(t *testing.T, ctx context.Context, c client.Client, sts *appsv1.StatefulSet) bool {
ok := assert.Eventually(t, func() bool {
found := &appsv1.StatefulSet{}
@@ -179,7 +236,7 @@ func eventuallyScaledUp(t *testing.T, ctx context.Context, c client.Client, sts
require.NoError(t, c.Status().Update(ctx, sts)) // manually do what k8s would do
return ok
}
func eventuallyBackedUp(t *testing.T, ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim, successful bool) bool {
func eventuallyBackedUp(t *testing.T, ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim, successful bool, saname string) bool {
// Check if backup is created
bname := strings.ToLower(fmt.Sprintf("%s-backup-1g", pvc.Name))
bu := newBackup(pvc.Namespace, bname, "1G")
@@ -194,7 +251,7 @@ func eventuallyBackedUp(t *testing.T, ctx context.Context, c client.Client, pvc
job := newTestJob(pvc.Namespace,
client.ObjectKey{Namespace: pvc.Namespace, Name: pvc.Name},
client.ObjectKey{Namespace: bu.Namespace, Name: bu.Name},
"test", &jobState)
"test", saname, &jobState)
jobStatus := job.Status
ok := assert.Eventually(t, func() bool {
return jobExists(ctx, c, job)
@@ -205,7 +262,7 @@ func eventuallyBackedUp(t *testing.T, ctx context.Context, c client.Client, pvc
return ok
}

func eventuallyRestored(t *testing.T, ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim, size string) bool {
func eventuallyRestored(t *testing.T, ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim, size string, saname string) bool {
require.Eventually(t, func() bool {
return pvcNotExists(ctx, c, pvc)
}, duration, interval, "pvc removed")
@@ -227,7 +284,7 @@ func eventuallyRestored(t *testing.T, ctx context.Context, c client.Client, pvc
job := newTestJob(pvc.Namespace,
client.ObjectKey{Namespace: pvc.Namespace, Name: bname},
client.ObjectKey{Namespace: pvc.Namespace, Name: pvc.Name},
"test", &jobSucceeded)
"test", saname, &jobSucceeded)
jobStatus := job.Status
ok := assert.Eventually(t, func() bool {
return jobExists(ctx, c, job)
@@ -238,8 +295,40 @@ func eventuallyRestored(t *testing.T, ctx context.Context, c client.Client, pvc
return ok
}

func eventuallyRbacCreated(t *testing.T, ctx context.Context, c client.Client, namespace, objname, crname string) bool {
sa := newTestSA(namespace, objname)
require.Eventually(t, func() bool {
return saExists(ctx, c, sa)
}, duration, interval, "sa created")
require.NoError(t, c.Get(ctx, client.ObjectKeyFromObject(sa), sa))
rb := newTestRB(namespace, objname, crname)
require.Eventually(t, func() bool {
return rbExists(ctx, c, rb)
}, duration, interval, "rb created")
require.NoError(t, c.Get(ctx, client.ObjectKeyFromObject(rb), rb))
return true
}

func eventuallyRbacRemoved(t *testing.T, ctx context.Context, c client.Client, namespace, objname, crname string) bool {
sa := newTestSA(namespace, objname)
require.Eventually(t, func() bool {
return saNotExists(ctx, c, sa)
}, duration, interval, "sa removed")
rb := newTestRB(namespace, objname, crname)
require.Eventually(t, func() bool {
return rbNotExists(ctx, c, rb)
}, duration, interval, "rb removed")
return true
}

func rbacNotExists(t *testing.T, ctx context.Context, c client.Client, namespace, objname, crname string) bool {
sa := newTestSA(namespace, objname)
rb := newTestRB(namespace, objname, crname)
return saNotExists(ctx, c, sa) && rbNotExists(ctx, c, rb)
}
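
The three helpers above encode the expected controller behavior when `--sync-cluster-role` is set: a managed ServiceAccount and a RoleBinding to the given ClusterRole are created in the StatefulSet's namespace before the sync jobs run, and both are removed afterwards. A minimal sketch of the provisioning half, assuming this test package's imports and its `ManagedLabel` constant; `ensureSyncRBAC` is an illustrative name, not a function in this repository:

```go
// Hypothetical sketch of the provisioning step the tests above verify.
func ensureSyncRBAC(ctx context.Context, c client.Client, namespace, name, clusterRole string) error {
	labels := map[string]string{ManagedLabel: "true"}
	// ServiceAccount the sync job will run as.
	sa := &corev1.ServiceAccount{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, Labels: labels},
	}
	if err := c.Create(ctx, sa); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	// RoleBinding granting the configured ClusterRole to that ServiceAccount.
	rb := &rbacv1.RoleBinding{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace, Labels: labels},
		RoleRef: rbacv1.RoleRef{
			APIGroup: rbacv1.GroupName,
			Kind:     "ClusterRole",
			Name:     clusterRole,
		},
		Subjects: []rbacv1.Subject{
			{Kind: rbacv1.ServiceAccountKind, Name: name, Namespace: namespace},
		},
	}
	if err := c.Create(ctx, rb); err != nil && !apierrors.IsAlreadyExists(err) {
		return err
	}
	return nil
}
```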

// startTestReconciler sets up a separate test env and starts the controller
func startTestReconciler(t *testing.T, ctx context.Context) (client.Client, func() error) {
func startTestReconciler(t *testing.T, ctx context.Context, crname string) (client.Client, func() error) {
req := require.New(t)

testEnv := &envtest.Environment{}
@@ -250,6 +339,7 @@ func startTestReconciler(t *testing.T, ctx context.Context) (client.Client, func
req.NoError(appsv1.AddToScheme(s))
req.NoError(corev1.AddToScheme(s))
req.NoError(batchv1.AddToScheme(s))
req.NoError(rbacv1.AddToScheme(s))

mgr, err := ctrl.NewManager(conf, ctrl.Options{
Scheme: s,
@@ -260,6 +350,7 @@ func startTestReconciler(t *testing.T, ctx context.Context) (client.Client, func
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("statefulset-resize-controller"),
SyncContainerImage: "test",
SyncClusterRole: crname,
RequeueAfter: time.Second,
}).SetupWithManager(mgr))
go func() {
79 changes: 76 additions & 3 deletions controllers/controller_util_test.go
@@ -11,6 +11,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -80,7 +81,7 @@ func newBackup(namespace, name, size string, fs ...func(*corev1.PersistentVolume
return pvc
}

func newTestJob(namespace string, src, dst client.ObjectKey, image string, state *batchv1.JobConditionType, fs ...func(*batchv1.Job) *batchv1.Job) *batchv1.Job {
func newTestJob(namespace string, src, dst client.ObjectKey, image string, saname string, state *batchv1.JobConditionType, fs ...func(*batchv1.Job) *batchv1.Job) *batchv1.Job {
name := fmt.Sprintf("sync-%s-to-%s", src.Name, dst.Name)
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
@@ -114,7 +115,8 @@ func newTestJob(namespace string, src, dst client.ObjectKey, image string, state
ImagePullPolicy: "Always",
},
},
RestartPolicy: corev1.RestartPolicyOnFailure,
RestartPolicy: corev1.RestartPolicyOnFailure,
ServiceAccountName: saname,
Volumes: []corev1.Volume{
{
Name: "src",
@@ -206,6 +208,42 @@ func newTestStatefulSet(namespace, name string, replicas int32, size string) *ap
}
}

func newTestSA(namespace, objname string) *corev1.ServiceAccount {
return &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: objname,
Namespace: namespace,
Labels: map[string]string{
ManagedLabel: "true",
},
},
}
}

func newTestRB(namespace, objname, crname string) *rbacv1.RoleBinding {
return &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: objname,
Namespace: namespace,
Labels: map[string]string{
ManagedLabel: "true",
},
},
RoleRef: rbacv1.RoleRef{
APIGroup: rbacv1.GroupName,
Kind: "ClusterRole",
Name: crname,
},
Subjects: []rbacv1.Subject{
{
Kind: rbacv1.ServiceAccountKind,
Name: objname,
Namespace: namespace,
},
},
}
}

var jobSucceeded = batchv1.JobComplete
var jobFailed = batchv1.JobFailed

@@ -234,7 +272,8 @@ func jobExists(ctx context.Context, c client.Client, other *batchv1.Job) bool {
}
return assert.ObjectsAreEqual(job.Spec.Template.Spec.Containers, other.Spec.Template.Spec.Containers) &&
assert.ObjectsAreEqual(job.Spec.Template.Spec.Volumes, other.Spec.Template.Spec.Volumes) &&
assert.ObjectsAreEqual(job.Labels, other.Labels)
assert.ObjectsAreEqual(job.Labels, other.Labels) &&
job.Spec.Template.Spec.ServiceAccountName == other.Spec.Template.Spec.ServiceAccountName
}

func stsExists(ctx context.Context, c client.Client, other *appsv1.StatefulSet) bool {
Expand All @@ -246,6 +285,40 @@ func stsExists(ctx context.Context, c client.Client, other *appsv1.StatefulSet)
return assert.ObjectsAreEqual(sts.Spec, other.Spec) && assert.ObjectsAreEqual(sts.Labels, other.Labels)
}

func saExists(ctx context.Context, c client.Client, other *corev1.ServiceAccount) bool {
sa := &corev1.ServiceAccount{}
key := client.ObjectKeyFromObject(other)
if err := c.Get(ctx, key, sa); err != nil {
return false
}
return sa.Name == other.Name && sa.Namespace == other.Namespace &&
assert.ObjectsAreEqual(sa.Labels, other.Labels)
}

func saNotExists(ctx context.Context, c client.Client, other *corev1.ServiceAccount) bool {
sa := &corev1.ServiceAccount{}
key := client.ObjectKeyFromObject(other)
err := c.Get(ctx, key, sa)
return apierrors.IsNotFound(err) || (err == nil && sa.DeletionTimestamp != nil)
}

func rbExists(ctx context.Context, c client.Client, other *rbacv1.RoleBinding) bool {
rb := &rbacv1.RoleBinding{}
key := client.ObjectKeyFromObject(other)
if err := c.Get(ctx, key, rb); err != nil {
return false
}
return rb.Name == other.Name && rb.Namespace == other.Namespace &&
assert.ObjectsAreEqual(rb.Labels, other.Labels)
}

func rbNotExists(ctx context.Context, c client.Client, other *rbacv1.RoleBinding) bool {
rb := &rbacv1.RoleBinding{}
key := client.ObjectKeyFromObject(other)
err := c.Get(ctx, key, rb)
return apierrors.IsNotFound(err) || (err == nil && rb.DeletionTimestamp != nil)
}

// Only succeeds if the condition holds for the entire `waitFor` duration.
// The condition is checked every `tick`.
func consistently(t assert.TestingT, condition func() bool, waitFor time.Duration, tick time.Duration, msgAndArgs ...interface{}) bool {
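
The helper's body is collapsed in this view. A sketch of one way to implement this contract with testify's `assert` package, assuming this file's imports; not necessarily the repository's actual implementation:

```go
// Hypothetical sketch: the inverse of assert.Eventually. Fails as soon
// as the condition turns false; succeeds if it stays true for waitFor.
func consistentlySketch(t assert.TestingT, condition func() bool, waitFor time.Duration, tick time.Duration, msgAndArgs ...interface{}) bool {
	deadline := time.Now().Add(waitFor)
	for time.Now().Before(deadline) {
		if !condition() {
			return assert.Fail(t, "Condition no longer satisfied", msgAndArgs...)
		}
		time.Sleep(tick)
	}
	return true
}
```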
