diff --git a/test/e2e/framework/resources/resources.go b/test/e2e/framework/resources/resources.go index e178a8d2..fc1c5996 100644 --- a/test/e2e/framework/resources/resources.go +++ b/test/e2e/framework/resources/resources.go @@ -30,6 +30,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/utils/pointer" + + "github.com/kubewharf/kubeadmiral/pkg/controllers/common" ) var DefaultPodImage = "ealen/echo-server:latest" @@ -47,6 +49,10 @@ func GetSimpleDeployment(baseName string) *appsv1.Deployment { name := fmt.Sprintf("%s-%s", baseName, rand.String(12)) return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: common.DeploymentKind, + }, ObjectMeta: metav1.ObjectMeta{ Name: name, }, @@ -76,6 +82,18 @@ func GetSimpleDeployment(baseName string) *appsv1.Deployment { } } +func IsDeploymentProgressing(deployment *appsv1.Deployment) bool { + for _, c := range deployment.Status.Conditions { + if c.Type != appsv1.DeploymentProgressing { + continue + } + + return c.Status == corev1.ConditionTrue + } + + return false +} + func GetSimpleDaemonset() *appsv1.DaemonSet { return nil } @@ -88,6 +106,10 @@ func GetSimpleJob(baseName string) *batchv1.Job { name := fmt.Sprintf("%s-%s", baseName, rand.String(12)) return &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: batchv1.SchemeGroupVersion.String(), + Kind: common.JobKind, + }, ObjectMeta: metav1.ObjectMeta{ Name: name, }, @@ -121,6 +143,10 @@ func GetSimpleCronJob(baseName string) *batchv1b1.CronJob { name := fmt.Sprintf("%s-%s", baseName, rand.String(12)) return &batchv1b1.CronJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: batchv1b1.SchemeGroupVersion.String(), + Kind: common.CronJobKind, + }, ObjectMeta: metav1.ObjectMeta{ Name: name, }, diff --git a/test/e2e/framework/util/util.go b/test/e2e/framework/util/util.go index f4b30366..400c5498 100644 --- a/test/e2e/framework/util/util.go +++ b/test/e2e/framework/util/util.go @@ -83,6 +83,8 @@ func AssertForItems[T any]( ginkgo.Fail(buf.String()) } +// PollUntilForItems polls condFn for each item in items until condFn +// returns true or an error, or until ctx is cancelled. func PollUntilForItems[T any]( ctx context.Context, items []T, diff --git a/test/e2e/resourcepropagation/cronjobs.go b/test/e2e/resourcepropagation/cronjobs.go index 58d5afe0..b6f52750 100644 --- a/test/e2e/resourcepropagation/cronjobs.go +++ b/test/e2e/resourcepropagation/cronjobs.go @@ -17,158 +17,38 @@ limitations under the License.
package resourcepropagation import ( - "context" - "time" - "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" batchv1b1 "k8s.io/api/batch/v1beta1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" - fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" "github.com/kubewharf/kubeadmiral/test/e2e/framework" - "github.com/kubewharf/kubeadmiral/test/e2e/framework/policies" "github.com/kubewharf/kubeadmiral/test/e2e/framework/resources" - "github.com/kubewharf/kubeadmiral/test/e2e/framework/util" ) -var _ = ginkgo.Describe("CronJob Propagation", resourcePropagationTestLabel, func() { +var _ = ginkgo.Describe("CronJob Propagation", func() { f := framework.NewFramework("cronjob-propagation", framework.FrameworkOptions{CreateNamespace: true}) - var cronJob *batchv1b1.CronJob - var clusters []*fedcorev1a1.FederatedCluster - - ginkgo.BeforeEach(func(ctx ginkgo.SpecContext) { - cronJob = resources.GetSimpleCronJob(f.Name()) - clusterList, err := f.HostFedClient().CoreV1alpha1().FederatedClusters().List(ctx, metav1.ListOptions{}) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - - clusters = make([]*fedcorev1a1.FederatedCluster, len(clusterList.Items)) - for i := range clusterList.Items { - clusters[i] = &clusterList.Items[i] - } - clusters = util.FilterOutE2ETestObjects(clusters) - - policy := policies.PropagationPolicyForClustersWithPlacements(f.Name(), clusters) - policies.SetPropagationPolicy(cronJob, policy) - - _, err = f.HostFedClient().CoreV1alpha1().PropagationPolicies(f.TestNamespace().Name).Create( - ctx, - policy, - metav1.CreateOptions{}, - ) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - }) - - assertCronJobsPropagated := func(ctx context.Context) { - ctxWithTimeout, cancel := context.WithTimeout(ctx, resourcePropagationTimeout) - defer cancel() - - failedClusters, err := util.PollUntilForItems( - ctxWithTimeout, - clusters, - func(c *fedcorev1a1.FederatedCluster) (bool, error) { - _, err := f.ClusterKubeClient(ctx, c).BatchV1beta1().CronJobs(f.TestNamespace().Name).Get( - ctx, - cronJob.Name, - metav1.GetOptions{}, - ) - if err != nil && apierrors.IsNotFound(err) { - return false, nil - } - return true, err + resourcePropagationTest( + f, + &resourcePropagationTestConfig[*batchv1b1.CronJob]{ + gvr: batchv1b1.SchemeGroupVersion.WithResource("cronjobs"), + objectFactory: resources.GetSimpleCronJob, + clientGetter: func(client kubernetes.Interface, namespace string) resourceClient[*batchv1b1.CronJob] { + return client.BatchV1beta1().CronJobs(namespace) }, - defaultPollingInterval, - ) - - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - gomega.Expect(failedClusters).
- To(gomega.BeEmpty(), "Timed out waiting for cronjob to propagate to clusters %v", util.NameList(failedClusters)) - } - - assertCronJobsRunOnce := func(ctx context.Context) { - ctxWithTimeout, cancel := context.WithTimeout(ctx, resourceReadyTimeout) - defer cancel() - - failedClusters, err := util.PollUntilForItems( - ctxWithTimeout, - clusters, - func(c *fedcorev1a1.FederatedCluster) (bool, error) { - clusterJob, err := f.ClusterKubeClient(ctx, c).BatchV1beta1().CronJobs(f.TestNamespace().Name).Get( - ctx, - cronJob.Name, - metav1.GetOptions{}, - ) - if err != nil { - return true, err - } - return resources.IsCronJobScheduledOnce(clusterJob), nil + isPropagatedResourceWorking: func( + _ kubernetes.Interface, + _ dynamic.Interface, + cronjob *batchv1b1.CronJob, + ) (bool, error) { + return resources.IsCronJobScheduledOnce(cronjob), nil }, - defaultPollingInterval, - ) - - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - gomega.Expect(failedClusters). - To(gomega.BeEmpty(), "Timed out waiting for cronjob %s to be run once in clusters %v", cronJob.Name, failedClusters) - } - - assertCronJobsDeleted := func(ctx context.Context) { - ctxWithTimeout, cancel := context.WithTimeout(ctx, resourceDeleteTimeout) - defer cancel() - - failedClusters, err := util.PollUntilForItems( - ctxWithTimeout, - clusters, - func(c *fedcorev1a1.FederatedCluster) (bool, error) { - _, err := f.ClusterKubeClient(ctx, c).BatchV1beta1().CronJobs(f.TestNamespace().Name).Get( - ctx, - cronJob.Name, - metav1.GetOptions{}, - ) - if err != nil && !apierrors.IsNotFound(err) { - return true, err - } - return apierrors.IsNotFound(err), nil + statusCollection: &resourceStatusCollectionTestConfig{ + gvr: fedtypesv1a1.SchemeGroupVersion.WithResource("federatedcronjobstatuses"), + path: "status", }, - defaultPollingInterval, - ) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - gomega.Expect(failedClusters). 
- To(gomega.BeEmpty(), "Timed out waiting for job to be deleted in clusters %v", failedClusters) - - gomega.Eventually(func(g gomega.Gomega, ctx context.Context) { - _, err := f.HostKubeClient().BatchV1beta1().CronJobs(f.TestNamespace().Name).Get(ctx, cronJob.Name, metav1.GetOptions{}) - gomega.Expect(err).To(gomega.Or(gomega.BeNil(), gomega.Satisfy(apierrors.IsNotFound))) - g.Expect(err).To(gomega.Satisfy(apierrors.IsNotFound)) - }).WithContext(ctxWithTimeout).Should(gomega.Succeed(), "Timed out waiting for source object deletion") - } - - ginkgo.It("should succeed", func(ctx ginkgo.SpecContext) { - var err error - - ginkgo.By("Creating cronjob") - cronJob, err = f.HostKubeClient().BatchV1beta1().CronJobs(f.TestNamespace().Name).Create(ctx, cronJob, metav1.CreateOptions{}) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - ginkgo.GinkgoLogr.Info("created cronJob for propagation", "cronjob-name", cronJob.Name, "cronjob-namespace", f.TestNamespace().Name) - - start := time.Now() - - ginkgo.By("Waiting for cronjob propagation") - assertCronJobsPropagated(ctx) - ginkgo.GinkgoLogr.Info("all cronjobs propagated", "duration", time.Since(start)) - - ginkgo.By("Waiting for cronjob to be scheduled once") - assertCronJobsRunOnce(ctx) - ginkgo.GinkgoLogr.Info("all cronjobs run once", "duration", time.Since(start)) - - err = f.HostKubeClient().BatchV1beta1().CronJobs(f.TestNamespace().Name).Delete(ctx, cronJob.Name, metav1.DeleteOptions{}) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - ginkgo.GinkgoLogr.Info("deleted source cronjob object") - - start = time.Now() - ginkgo.By("Waiting for cronjob deletion") - assertCronJobsDeleted(ctx) - ginkgo.GinkgoLogr.Info("all cronjobs deleted", "duration", time.Since(start)) - }) + }, + ) }) diff --git a/test/e2e/resourcepropagation/deployments.go b/test/e2e/resourcepropagation/deployments.go new file mode 100644 index 00000000..8b253586 --- /dev/null +++ b/test/e2e/resourcepropagation/deployments.go @@ -0,0 +1,54 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resourcepropagation + +import ( + "github.com/onsi/ginkgo/v2" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + + fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" + "github.com/kubewharf/kubeadmiral/test/e2e/framework" + "github.com/kubewharf/kubeadmiral/test/e2e/framework/resources" +) + +var _ = ginkgo.Describe("Deployment Propagation", func() { + f := framework.NewFramework("deployment-propagation", framework.FrameworkOptions{CreateNamespace: true}) + + resourcePropagationTest( + f, + &resourcePropagationTestConfig[*appsv1.Deployment]{ + gvr: appsv1.SchemeGroupVersion.WithResource("deployments"), + objectFactory: resources.GetSimpleDeployment, + clientGetter: func(client kubernetes.Interface, namespace string) resourceClient[*appsv1.Deployment] { + return client.AppsV1().Deployments(namespace) + }, + isPropagatedResourceWorking: func( + _ kubernetes.Interface, + _ dynamic.Interface, + deployment *appsv1.Deployment, + ) (bool, error) { + return resources.IsDeploymentProgressing(deployment), nil + }, + statusCollection: &resourceStatusCollectionTestConfig{ + gvr: fedtypesv1a1.SchemeGroupVersion.WithResource("federateddeploymentstatuses"), + path: "status", + }, + }, + ) +}) diff --git a/test/e2e/resourcepropagation/framework.go b/test/e2e/resourcepropagation/framework.go index 78a584f2..c0ffa562 100644 --- a/test/e2e/resourcepropagation/framework.go +++ b/test/e2e/resourcepropagation/framework.go @@ -17,17 +17,298 @@ limitations under the License. package resourcepropagation import ( + "context" + "encoding/json" + "fmt" + "strings" "time" "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + controllerutil "github.com/kubewharf/kubeadmiral/pkg/controllers/util" + "github.com/kubewharf/kubeadmiral/test/e2e/framework" + "github.com/kubewharf/kubeadmiral/test/e2e/framework/policies" + "github.com/kubewharf/kubeadmiral/test/e2e/framework/util" ) var ( resourcePropagationTestLabel = ginkgo.Label("resource-propagation") +) + +const ( + resourceUpdateTestAnnotationKey = "kubeadmiral.io/e2e-update-test" + resourceUpdateTestAnnotationValue = "1" defaultPollingInterval = 10 * time.Millisecond resourcePropagationTimeout = 30 * time.Second + resourceStatusTimeout = 30 * time.Second resourceReadyTimeout = 2 * time.Minute resourceDeleteTimeout = 30 * time.Second ) + +type k8sObject interface { + metav1.Object + pkgruntime.Object +} + +type resourceClient[T k8sObject] interface { + Create(ctx context.Context, object T, opts metav1.CreateOptions) (T, error) + Update(ctx context.Context, object T, opts metav1.UpdateOptions) (T, error) + Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error + Get(ctx context.Context, name string, opts metav1.GetOptions) (T, error) + Patch( + ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string, + ) (result T, err error) +} + +type resourceStatusCollectionTestConfig struct { + // GVR of the federatedstatus. 
+ gvr schema.GroupVersionResource + // Path to a field in the resource whose value should be collected by status collection. + path string +} + +type resourcePropagationTestConfig[T k8sObject] struct { + gvr schema.GroupVersionResource + statusCollection *resourceStatusCollectionTestConfig + // Returns an object template named after the given base name. + objectFactory func(name string) T + // Given a Kubernetes client interface and a namespace, returns a client that can be used to perform operations on T against that apiserver (host or member cluster). + clientGetter func(client kubernetes.Interface, namespace string) resourceClient[T] + // Checks whether the given resource is working properly in the given cluster. + isPropagatedResourceWorking func(kubernetes.Interface, dynamic.Interface, T) (bool, error) +} + +func resourcePropagationTest[T k8sObject]( + f framework.Framework, + config *resourcePropagationTestConfig[T], +) { + ginkgo.It("Should succeed", resourcePropagationTestLabel, func(ctx ginkgo.SpecContext) { + var err error + object := config.objectFactory(f.Name()) + hostClient := config.clientGetter(f.HostKubeClient(), f.TestNamespace().Name) + + var clusters []*fedcorev1a1.FederatedCluster + ginkgo.By("Getting clusters", func() { + clusterList, err := f.HostFedClient().CoreV1alpha1().FederatedClusters().List(ctx, metav1.ListOptions{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + + clusters = make([]*fedcorev1a1.FederatedCluster, len(clusterList.Items)) + for i := range clusterList.Items { + clusters[i] = &clusterList.Items[i] + } + clusters = util.FilterOutE2ETestObjects(clusters) + }) + + ginkgo.By("Creating PropagationPolicy", func() { + policy := policies.PropagationPolicyForClustersWithPlacements(f.Name(), clusters) + + _, err = f.HostFedClient().CoreV1alpha1().PropagationPolicies(f.TestNamespace().Name).Create( + ctx, + policy, + metav1.CreateOptions{}, + ) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + + policies.SetPropagationPolicy(object, policy) + }) + + ginkgo.By("Creating resource", func() { + object, err = hostClient.Create(ctx, object, metav1.CreateOptions{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + }) + + ginkgo.By("Waiting for resource propagation", func() { + ctx, cancel := context.WithTimeout(ctx, resourcePropagationTimeout) + defer cancel() + + failedClusters, err := util.PollUntilForItems( + ctx, + clusters, + func(c *fedcorev1a1.FederatedCluster) (bool, error) { + _, err := config.clientGetter( + f.ClusterKubeClient(ctx, c), + object.GetNamespace(), + ).Get(ctx, object.GetName(), metav1.GetOptions{}) + if err != nil && apierrors.IsNotFound(err) { + return false, nil + } + return true, err + }, + defaultPollingInterval, + ) + + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + gomega.Expect(failedClusters).
+ To(gomega.BeEmpty(), "Timed out waiting for resource to propagate to clusters %v", util.NameList(failedClusters)) + }) + + ginkgo.By("Updating the source object", func() { + patch := []map[string]interface{}{ + { + "op": "add", + // escape the / in annotation key + "path": "/metadata/annotations/" + strings.Replace(resourceUpdateTestAnnotationKey, "/", "~1", 1), + "value": resourceUpdateTestAnnotationValue, + }, + } + patchBytes, err := json.Marshal(patch) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + + object, err = hostClient.Patch(ctx, object.GetName(), types.JSONPatchType, patchBytes, metav1.PatchOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + }) + + ginkgo.By("Waiting for update to be propagated", func() { + ctx, cancel := context.WithTimeout(ctx, resourcePropagationTimeout) + defer cancel() + + failedClusters, err := util.PollUntilForItems( + ctx, + clusters, + func(c *fedcorev1a1.FederatedCluster) (bool, error) { + clusterObj, err := config.clientGetter( + f.ClusterKubeClient(ctx, c), + object.GetNamespace(), + ).Get(ctx, object.GetName(), metav1.GetOptions{}) + if err != nil { + return false, err + } + updatePropagated := clusterObj.GetAnnotations()[resourceUpdateTestAnnotationKey] == resourceUpdateTestAnnotationValue + return updatePropagated, nil + }, + defaultPollingInterval, + ) + + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + gomega.Expect(failedClusters). + To(gomega.BeEmpty(), "Timed out waiting for resource update to propagate to clusters %v", util.NameList(failedClusters)) + }) + + ginkgo.By("Waiting for propagated resource to work correctly", func() { + ctx, cancel := context.WithTimeout(ctx, resourceReadyTimeout) + defer cancel() + + failedClusters, err := util.PollUntilForItems( + ctx, + clusters, + func(c *fedcorev1a1.FederatedCluster) (bool, error) { + clusterObj, err := config.clientGetter( + f.ClusterKubeClient(ctx, c), + object.GetNamespace(), + ).Get(ctx, object.GetName(), metav1.GetOptions{}) + if err != nil { + return true, err + } + return config.isPropagatedResourceWorking( + f.ClusterKubeClient(ctx, c), + f.ClusterDynamicClient(ctx, c), + clusterObj, + ) + }, + defaultPollingInterval, + ) + + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + gomega.Expect(failedClusters). + To(gomega.BeEmpty(), "Timed out waiting for resource to be working in clusters %v", failedClusters) + }) + + if config.statusCollection != nil { + pathSegments := strings.Split(config.statusCollection.path, ".") + ginkgo.By("Waiting for status collection", func() { + gomega.Eventually(ctx, func(g gomega.Gomega) { + actualFieldByCluster := make(map[string]any, len(clusters)) + for _, cluster := range clusters { + clusterObject, err := config.clientGetter( + f.ClusterKubeClient(ctx, cluster), object.GetNamespace(), + ).Get(ctx, object.GetName(), metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + + uns, err := pkgruntime.DefaultUnstructuredConverter.ToUnstructured(clusterObject) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + actualField, exists, err := unstructured.NestedFieldNoCopy(uns, pathSegments...) 
+ gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + gomega.Expect(exists).To( + gomega.BeTrue(), + fmt.Sprintf("Cluster object does not contain specified field %q", config.statusCollection.path), + ) + actualFieldByCluster[cluster.Name] = actualField + } + + fedStatusUns, err := f.HostDynamicClient().Resource(config.statusCollection.gvr).Namespace(object.GetNamespace()).Get( + ctx, object.GetName(), metav1.GetOptions{}) + if err != nil && apierrors.IsNotFound(err) { + // status might not have been created yet, use local g to fail only this attempt + g.Expect(err).NotTo(gomega.HaveOccurred(), "Federated status object has not been created") + } + gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + + fedStatus := controllerutil.FederatedResource{} + err = pkgruntime.DefaultUnstructuredConverter.FromUnstructured(fedStatusUns.Object, &fedStatus) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + + g.Expect(fedStatus.ClusterStatus).To(gomega.HaveLen(len(actualFieldByCluster)), "Collected status has wrong number of clusters") + for _, clusterStatus := range fedStatus.ClusterStatus { + actualField, exists := actualFieldByCluster[clusterStatus.ClusterName] + g.Expect(exists).To(gomega.BeTrue(), fmt.Sprintf("collected from unexpected cluster %s", clusterStatus.ClusterName)) + + collectedField, exists, err := unstructured.NestedFieldNoCopy(clusterStatus.CollectedFields, pathSegments...) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), framework.MessageUnexpectedError) + g.Expect(exists).To( + gomega.BeTrue(), + fmt.Sprintf("collected fields does not contain %q for cluster %s", config.statusCollection.path, clusterStatus.ClusterName), + ) + g.Expect(collectedField).To(gomega.Equal(actualField), "collected and actual fields differ") + } + }).WithTimeout(resourceStatusTimeout).WithPolling(defaultPollingInterval).Should(gomega.Succeed()) + }) + } + + ginkgo.By("Deleting source object", func() { + err = hostClient.Delete(ctx, object.GetName(), metav1.DeleteOptions{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + }) + + ginkgo.By("Waiting for resource deletion", func() { + ctx, cancel := context.WithTimeout(ctx, resourceDeleteTimeout) + defer cancel() + + failedClusters, err := util.PollUntilForItems( + ctx, + clusters, + func(c *fedcorev1a1.FederatedCluster) (bool, error) { + _, err := config.clientGetter( + f.ClusterKubeClient(ctx, c), + object.GetNamespace(), + ).Get(ctx, object.GetName(), metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return true, err + } + return apierrors.IsNotFound(err), nil + }, + defaultPollingInterval, + ) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) + gomega.Expect(failedClusters). 
+ To(gomega.BeEmpty(), "Timed out waiting for resource to be deleted in clusters %v", failedClusters) + + gomega.Eventually(func(g gomega.Gomega, ctx context.Context) { + _, err := hostClient.Get(ctx, object.GetName(), metav1.GetOptions{}) + gomega.Expect(err).To(gomega.Or(gomega.BeNil(), gomega.Satisfy(apierrors.IsNotFound))) + g.Expect(err).To(gomega.Satisfy(apierrors.IsNotFound)) + }).WithContext(ctx).Should(gomega.Succeed(), "Timed out waiting for source object deletion") + }) + }) +} diff --git a/test/e2e/resourcepropagation/jobs.go b/test/e2e/resourcepropagation/jobs.go index c27d9a8f..c6ac322a 100644 --- a/test/e2e/resourcepropagation/jobs.go +++ b/test/e2e/resourcepropagation/jobs.go @@ -17,201 +17,81 @@ limitations under the License. package resourcepropagation import ( - "context" - "time" - "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" batchv1 "k8s.io/api/batch/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" - fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" "github.com/kubewharf/kubeadmiral/test/e2e/framework" - "github.com/kubewharf/kubeadmiral/test/e2e/framework/policies" "github.com/kubewharf/kubeadmiral/test/e2e/framework/resources" - "github.com/kubewharf/kubeadmiral/test/e2e/framework/util" ) -var _ = ginkgo.Describe("Job Propagation", resourcePropagationTestLabel, func() { - f := framework.NewFramework("job-propagation", framework.FrameworkOptions{CreateNamespace: true}) - - var job *batchv1.Job - var clusters []*fedcorev1a1.FederatedCluster - - ginkgo.BeforeEach(func(ctx ginkgo.SpecContext) { - job = resources.GetSimpleJob(f.Name()) - clusterList, err := f.HostFedClient().CoreV1alpha1().FederatedClusters().List(ctx, metav1.ListOptions{}) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - - clusters = make([]*fedcorev1a1.FederatedCluster, len(clusterList.Items)) - for i := range clusterList.Items { - clusters[i] = &clusterList.Items[i] - } - clusters = util.FilterOutE2ETestObjects(clusters) +func getJobWithoutManualSelector(baseName string) *batchv1.Job { + return resources.GetSimpleJob(baseName) +} - policy := policies.PropagationPolicyForClustersWithPlacements(f.Name(), clusters) - policies.SetPropagationPolicy(job, policy) +func getJobWithManualSelector(baseName string) *batchv1.Job { + job := resources.GetSimpleJob(baseName) - _, err = f.HostFedClient().CoreV1alpha1().PropagationPolicies(f.TestNamespace().Name).Create( - ctx, - policy, - metav1.CreateOptions{}, - ) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - }) - - assertJobPropagated := func(ctx context.Context) { - ctxWithTimeout, cancel := context.WithTimeout(ctx, resourcePropagationTimeout) - defer cancel() - - failedClusters, err := util.PollUntilForItems( - ctxWithTimeout, - clusters, - func(c *fedcorev1a1.FederatedCluster) (bool, error) { - _, err := f.ClusterKubeClient(ctx, c).BatchV1().Jobs(f.TestNamespace().Name).Get( - ctx, - job.Name, - metav1.GetOptions{}, - ) - if err != nil && apierrors.IsNotFound(err) { - return false, nil - } - return true, err - }, - defaultPollingInterval, - ) - - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - gomega.Expect(failedClusters). 
- To(gomega.BeEmpty(), "Timed out waiting for job to propagate to clusters %v", util.NameList(failedClusters)) + jobLabels := map[string]string{ + "kubeadmiral-e2e": baseName, } - assertJobsCompleted := func(ctx context.Context) { - ctxWithTimeout, cancel := context.WithTimeout(ctx, resourceReadyTimeout) - defer cancel() - - failedClusters, err := util.PollUntilForItems( - ctxWithTimeout, - clusters, - func(c *fedcorev1a1.FederatedCluster) (bool, error) { - clusterJob, err := f.ClusterKubeClient(ctx, c).BatchV1().Jobs(f.TestNamespace().Name).Get( - ctx, - job.Name, - metav1.GetOptions{}, - ) - if err != nil { - return true, err - } - return resources.IsJobComplete(clusterJob), nil - }, - defaultPollingInterval, - ) - - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - gomega.Expect(failedClusters). - To(gomega.BeEmpty(), "Timed out waiting for job to be completed in clusters %v", failedClusters) + job.Spec.ManualSelector = pointer.Bool(true) + job.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: jobLabels, } - - assertJobsDeleted := func(ctx context.Context) { - ctxWithTimeout, cancel := context.WithTimeout(ctx, resourceDeleteTimeout) - defer cancel() - - failedClusters, err := util.PollUntilForItems( - ctxWithTimeout, - clusters, - func(c *fedcorev1a1.FederatedCluster) (bool, error) { - _, err := f.ClusterKubeClient(ctx, c).BatchV1().Jobs(f.TestNamespace().Name).Get( - ctx, - job.Name, - metav1.GetOptions{}, - ) - if err != nil && !apierrors.IsNotFound(err) { - return true, err - } - return apierrors.IsNotFound(err), nil - }, - defaultPollingInterval, - ) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - gomega.Expect(failedClusters). - To(gomega.BeEmpty(), "Timed out waiting for job to be deleted in clusters %v", failedClusters) - - gomega.Eventually(func(g gomega.Gomega, ctx context.Context) { - _, err := f.HostKubeClient().BatchV1().Jobs(f.TestNamespace().Name).Get(ctx, job.Name, metav1.GetOptions{}) - gomega.Expect(err).To(gomega.Or(gomega.BeNil(), gomega.Satisfy(apierrors.IsNotFound))) - g.Expect(err).To(gomega.Satisfy(apierrors.IsNotFound)) - }).WithContext(ctxWithTimeout).Should(gomega.Succeed(), "Timed out waiting for source object deletion") + job.Spec.Template.Labels = jobLabels + + return job +} + +var _ = ginkgo.Context("Job Propagation", func() { + testCases := []struct { + name string + jobFactory func(string) *batchv1.Job + }{ + { + name: "Without manual selector", + jobFactory: getJobWithoutManualSelector, + }, + { + name: "With manual selector", + jobFactory: getJobWithManualSelector, + }, } - ginkgo.Context("Without manual selector", func() { - ginkgo.It("should succeed", func(ctx ginkgo.SpecContext) { - var err error - - ginkgo.By("Creating job") - job, err = f.HostKubeClient().BatchV1().Jobs(f.TestNamespace().Name).Create(ctx, job, metav1.CreateOptions{}) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - ginkgo.GinkgoLogr.Info("created job for propagation", "job-name", job.Name, "job-namespace", f.TestNamespace().Name) - - start := time.Now() - - ginkgo.By("Waiting for job propagation") - assertJobPropagated(ctx) - ginkgo.GinkgoLogr.Info("all jobs propagated", "duration", time.Since(start)) - - ginkgo.By("Waiting for job completion") - assertJobsCompleted(ctx) - ginkgo.GinkgoLogr.Info("all jobs completed", "duration", time.Since(start)) - - err = f.HostKubeClient().BatchV1().Jobs(f.TestNamespace().Name).Delete(ctx, job.Name, metav1.DeleteOptions{}) - 
gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - ginkgo.GinkgoLogr.Info("deleted source job object") - - start = time.Now() - ginkgo.By("Waiting for job deletion") - assertJobsDeleted(ctx) - ginkgo.GinkgoLogr.Info("all jobs deleted", "duration", time.Since(start)) - }) - }) - - ginkgo.Context("With manual selectors", func() { - ginkgo.It("should succeed", func(ctx ginkgo.SpecContext) { - jobLabels := map[string]string{ - "kubeadmiral-e2e": f.Name(), - } - - job.Spec.ManualSelector = pointer.Bool(true) - job.Spec.Selector = &metav1.LabelSelector{ - MatchLabels: jobLabels, - } - job.Spec.Template.Labels = jobLabels - - var err error - - ginkgo.By("Creating job") - job, err = f.HostKubeClient().BatchV1().Jobs(f.TestNamespace().Name).Create(ctx, job, metav1.CreateOptions{}) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - ginkgo.GinkgoLogr.Info("created job for propagation", "job-name", job.Name, "job-namespace", f.TestNamespace().Name) - - start := time.Now() - - ginkgo.By("Waiting for job propagation") - assertJobPropagated(ctx) - ginkgo.GinkgoLogr.Info("all jobs propagated", "duration", time.Since(start)) - - ginkgo.By("Waiting for job completion") - assertJobsCompleted(ctx) - ginkgo.GinkgoLogr.Info("all jobs completed", "duration", time.Since(start)) - - err = f.HostKubeClient().BatchV1().Jobs(f.TestNamespace().Name).Delete(ctx, job.Name, metav1.DeleteOptions{}) - gomega.Expect(err).ToNot(gomega.HaveOccurred(), framework.MessageUnexpectedError) - ginkgo.GinkgoLogr.Info("deleted source job object") - - start = time.Now() - ginkgo.By("Waiting for job deletion") - assertJobsDeleted(ctx) - ginkgo.GinkgoLogr.Info("all jobs deleted", "duration", time.Since(start)) + f := framework.NewFramework( + "job-propagation", + framework.FrameworkOptions{CreateNamespace: true}, + ) + + for _, testCase := range testCases { + ginkgo.Context(testCase.name, func() { + resourcePropagationTest( + f, + &resourcePropagationTestConfig[*batchv1.Job]{ + gvr: batchv1.SchemeGroupVersion.WithResource("jobs"), + objectFactory: testCase.jobFactory, + clientGetter: func(client kubernetes.Interface, namespace string) resourceClient[*batchv1.Job] { + return client.BatchV1().Jobs(namespace) + }, + isPropagatedResourceWorking: func( + _ kubernetes.Interface, + _ dynamic.Interface, + job *batchv1.Job, + ) (bool, error) { + return resources.IsJobComplete(job), nil + }, + statusCollection: &resourceStatusCollectionTestConfig{ + gvr: fedtypesv1a1.SchemeGroupVersion.WithResource("federatedjobstatuses"), + path: "status", + }, + }, + ) }) - }) + } })