From 511af4c1d2c91a293bdf02bd0ba7ff0cbb4e2dbd Mon Sep 17 00:00:00 2001
From: Johnu George
Date: Wed, 27 Mar 2019 21:20:19 +0530
Subject: [PATCH] Implement ActiveDeadlineSeconds and BackoffLimit feature

---
 Gopkg.lock                                      |   5 +-
 pkg/apis/pytorch/v1beta2/types.go               |  10 +
 .../pytorch/v1beta2/zz_generated.deepcopy.go    |  10 +
 pkg/common/util/v1beta2/testutil/job.go         |  34 +++
 pkg/common/util/v1beta2/testutil/pod.go         |   7 +-
 pkg/controller.v1beta2/pytorch/controller.go    | 124 +++++++-
 .../pytorch/controller_test.go                  |   4 +-
 pkg/controller.v1beta2/pytorch/job.go           |  45 +++
 pkg/controller.v1beta2/pytorch/job_test.go      | 273 +++++++++++++++++-
 pkg/controller.v1beta2/pytorch/status.go        |  17 +-
 .../tf-operator/pkg/util/k8sutil/k8sutil.go     |  32 ++
 11 files changed, 538 insertions(+), 23 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index c554818ec..3e59dacc7 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -211,7 +211,7 @@
 [[projects]]
   branch = "master"
-  digest = "1:2da9794ff5835c3249a15deb04d8340cb51363637925db89fd4b67bb25dcd5c7"
+  digest = "1:a7968637d05f7cac8985ad2df5818fa5cc3162e4fc65b7ee6e834baa2085e79f"
   name = "github.com/kubeflow/tf-operator"
   packages = [
     "pkg/apis/common/v1beta1",
@@ -225,7 +225,7 @@
     "pkg/version",
   ]
   pruneopts = "NUT"
-  revision = "4689a23554f2b8da2c5db8e397e414782156a945"
+  revision = "aa322c7b557f742743038a81de01c4629a7ebe9b"

 [[projects]]
   digest = "1:680c0fba95a0cff934e350b1ad6774d8229378a3e37d9902e07e2861e82a5908"
@@ -886,6 +886,7 @@
     "github.com/kubeflow/tf-operator/pkg/util/signals",
     "github.com/kubeflow/tf-operator/pkg/util/train",
     "github.com/kubeflow/tf-operator/pkg/version",
+    "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned",
     "github.com/onrik/logrus/filename",
     "github.com/sirupsen/logrus",
     "k8s.io/api/core/v1",
diff --git a/pkg/apis/pytorch/v1beta2/types.go b/pkg/apis/pytorch/v1beta2/types.go
index dd568293b..237da3bf4 100644
--- a/pkg/apis/pytorch/v1beta2/types.go
+++ b/pkg/apis/pytorch/v1beta2/types.go
@@ -42,6 +42,16 @@ type PyTorchJob struct {

 // PyTorchJobSpec is a desired state description of the PyTorchJob.
 type PyTorchJobSpec struct {
+    // Specifies the duration in seconds relative to the startTime that the job may be active
+    // before the system tries to terminate it; the value must be a positive integer.
+    // This applies only to pods with restartPolicy == OnFailure or Always.
+    // +optional
+    ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`
+
+    // Optional number of retries before marking this job failed.
+    // +optional
+    BackoffLimit *int32 `json:"backoffLimit,omitempty"`
+
     // CleanPodPolicy defines the policy to kill pods after PyTorchJob is
     // succeeded.
     // Default to Running.
diff --git a/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go b/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go
index 688fdaf05..e24ddebec 100644
--- a/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go
+++ b/pkg/apis/pytorch/v1beta2/zz_generated.deepcopy.go
@@ -87,6 +87,16 @@ func (in *PyTorchJobList) DeepCopyObject() runtime.Object {

 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *PyTorchJobSpec) DeepCopyInto(out *PyTorchJobSpec) {
     *out = *in
+    if in.ActiveDeadlineSeconds != nil {
+        in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds
+        *out = new(int64)
+        **out = **in
+    }
+    if in.BackoffLimit != nil {
+        in, out := &in.BackoffLimit, &out.BackoffLimit
+        *out = new(int32)
+        **out = **in
+    }
     if in.CleanPodPolicy != nil {
         in, out := &in.CleanPodPolicy, &out.CleanPodPolicy
         *out = new(commonv1beta2.CleanPodPolicy)
diff --git a/pkg/common/util/v1beta2/testutil/job.go b/pkg/common/util/v1beta2/testutil/job.go
index 8e46d6094..8fadb0a09 100644
--- a/pkg/common/util/v1beta2/testutil/job.go
+++ b/pkg/common/util/v1beta2/testutil/job.go
@@ -17,6 +17,7 @@ package testutil
 import (
     "time"

+    "github.com/golang/protobuf/proto"
     "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -50,9 +51,42 @@ func NewPyTorchJobWithCleanupJobDelay(master, worker int, ttl *int32) *v1beta2.P
     return job
 }

+func NewPyTorchJobWithActiveDeadlineSeconds(master, worker int, ads *int64) *v1beta2.PyTorchJob {
+    if master == 1 {
+        job := NewPyTorchJobWithMaster(worker)
+        job.Spec.ActiveDeadlineSeconds = ads
+        policy := common.CleanPodPolicyAll
+        job.Spec.CleanPodPolicy = &policy
+        return job
+    }
+    job := NewPyTorchJob(worker)
+    job.Spec.ActiveDeadlineSeconds = ads
+    policy := common.CleanPodPolicyAll
+    job.Spec.CleanPodPolicy = &policy
+    return job
+}
+
+func NewPyTorchJobWithBackoffLimit(master, worker int, backoffLimit *int32) *v1beta2.PyTorchJob {
+    if master == 1 {
+        job := NewPyTorchJobWithMaster(worker)
+        job.Spec.BackoffLimit = backoffLimit
+        job.Spec.PyTorchReplicaSpecs["Worker"].RestartPolicy = "OnFailure"
+        policy := common.CleanPodPolicyAll
+        job.Spec.CleanPodPolicy = &policy
+        return job
+    }
+    job := NewPyTorchJob(worker)
+    job.Spec.BackoffLimit = backoffLimit
+    job.Spec.PyTorchReplicaSpecs["Worker"].RestartPolicy = "OnFailure"
+    policy := common.CleanPodPolicyAll
+    job.Spec.CleanPodPolicy = &policy
+    return job
+}
+
 func NewPyTorchJobWithMaster(worker int) *v1beta2.PyTorchJob {
     job := NewPyTorchJob(worker)
     job.Spec.PyTorchReplicaSpecs[v1beta2.PyTorchReplicaTypeMaster] = &common.ReplicaSpec{
+        Replicas: proto.Int32(1),
         Template: NewPyTorchReplicaSpecTemplate(),
     }
     return job
diff --git a/pkg/common/util/v1beta2/testutil/pod.go b/pkg/common/util/v1beta2/testutil/pod.go
index a86944eb2..b1f29db33 100644
--- a/pkg/common/util/v1beta2/testutil/pod.go
+++ b/pkg/common/util/v1beta2/testutil/pod.go
@@ -64,7 +64,7 @@ func NewPodList(count int32, status v1.PodPhase, job *v1beta2.PyTorchJob, typ st
     return pods
 }

-func SetPodsStatuses(podIndexer cache.Indexer, job *v1beta2.PyTorchJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, t *testing.T) {
+func SetPodsStatuses(podIndexer cache.Indexer, job *v1beta2.PyTorchJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, restartCounts []int32, t *testing.T) {
     var index int32
     for _, pod := range NewPodList(pendingPods, v1.PodPending, job, typ, index, t) {
         if err := podIndexer.Add(pod); err != nil {
@@ -72,7 +72,10 @@ func SetPodsStatuses(podIndexer cache.Indexer, job *v1beta2.PyTorchJob, typ stri
         }
     }
     index += pendingPods
-    for _, pod := range NewPodList(activePods, v1.PodRunning, job, typ, index, t) {
+    for i, pod := range NewPodList(activePods, v1.PodRunning, job, typ, index, t) {
+        if restartCounts != nil {
+            pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: restartCounts[i]}}
+        }
         if err := podIndexer.Add(pod); err != nil {
             t.Errorf("%s: unexpected error when adding pod %v", job.Name, err)
         }
diff --git a/pkg/controller.v1beta2/pytorch/controller.go b/pkg/controller.v1beta2/pytorch/controller.go
index 3febed768..a7a9f5c3c 100644
--- a/pkg/controller.v1beta2/pytorch/controller.go
+++ b/pkg/controller.v1beta2/pytorch/controller.go
@@ -17,6 +17,7 @@ package pytorch

 import (
     "fmt"
+    "strings"
     "time"

     kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
@@ -38,8 +39,10 @@ import (
     jobinformers "github.com/kubeflow/pytorch-operator/pkg/client/informers/externalversions"
     jobinformersv1beta2 "github.com/kubeflow/pytorch-operator/pkg/client/informers/externalversions/pytorch/v1beta2"
     joblisters "github.com/kubeflow/pytorch-operator/pkg/client/listers/pytorch/v1beta2"
+    common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2"
     "github.com/kubeflow/tf-operator/pkg/common/jobcontroller"
     pylogger "github.com/kubeflow/tf-operator/pkg/logger"
+    "github.com/kubeflow/tf-operator/pkg/util/k8sutil"
 )

 const (
@@ -326,18 +329,15 @@ func (pc *PyTorchController) syncPyTorchJob(key string) (bool, error) {
     return true, err
 }

-func getTotalReplicas(obj metav1.Object) int32 {
-    job := obj.(*v1beta2.PyTorchJob)
-    jobReplicas := int32(0)
-    for _, r := range job.Spec.PyTorchReplicaSpecs {
-        jobReplicas += *r.Replicas
-    }
-    return jobReplicas
-}
-
 // reconcilePyTorchJobs checks and updates replicas for each given PyTorchReplicaSpec.
 // It will requeue the job in case of an error while creating/deleting pods/services.
 func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error {
+    jobKey, err := KeyFunc(job)
+    if err != nil {
+        utilruntime.HandleError(fmt.Errorf("couldn't get key for pytorch job object %#v: %v", job, err))
+        return err
+    }
+
     logger := pylogger.LoggerForJob(job)
     logger.Infof("Reconcile PyTorchJobs %s", job.Name)

@@ -355,8 +355,46 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error
         return err
     }

+    // Retrieve the number of previous retries.
+    previousRetry := pc.WorkQueue.NumRequeues(jobKey)
+
+    activePods := k8sutil.FilterActivePods(pods)
+    active := int32(len(activePods))
+    failed := int32(k8sutil.FilterPods(pods, v1.PodFailed))
+    totalReplicas := getTotalReplicas(job)
+    prevReplicasFailedNum := getTotalFailedReplicas(job)
+
+    var failureMessage string
+    jobExceedsLimit := false
+    exceedsBackoffLimit := false
+    pastBackoffLimit := false
+
+    if job.Spec.BackoffLimit != nil {
+        jobHasNewFailure := failed > prevReplicasFailedNum
+        // New failures happen when the status does not yet reflect them and active
+        // differs from the total replicas; otherwise the previous controller loop
+        // failed to update the status, so even if we pick up a failure it is not a new one.
+        exceedsBackoffLimit = jobHasNewFailure && (active != totalReplicas) &&
+            (int32(previousRetry)+1 > *job.Spec.BackoffLimit)
+
+        pastBackoffLimit, err = pc.pastBackoffLimit(job, pods)
+        if err != nil {
+            return err
+        }
+    }
+
+    if exceedsBackoffLimit || pastBackoffLimit {
+        // Check whether the number of pod restarts exceeds the backoff limit (for restartPolicy OnFailure/Always)
+        // OR the number of failed pods increased since the last sync.
+        jobExceedsLimit = true
+        failureMessage = fmt.Sprintf("PyTorchJob %s has failed because it has reached the specified backoff limit", job.Name)
+    } else if pc.pastActiveDeadline(job) {
+        failureMessage = fmt.Sprintf("PyTorchJob %s has failed because it was active longer than specified deadline", job.Name)
+        jobExceedsLimit = true
+    }
+
     // If the PyTorchJob is terminated, delete all pods and services.
-    if isSucceeded(job.Status) || isFailed(job.Status) {
+    if isSucceeded(job.Status) || isFailed(job.Status) || jobExceedsLimit {
         if err := pc.deletePodsAndServices(job, pods); err != nil {
             return err
         }
@@ -375,7 +413,18 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error
         }
     }

-
+    if jobExceedsLimit {
+        pc.Recorder.Event(job, v1.EventTypeNormal, pytorchJobFailedReason, failureMessage)
+        if job.Status.CompletionTime == nil {
+            now := metav1.Now()
+            job.Status.CompletionTime = &now
+        }
+        err := updatePyTorchJobConditions(job, common.JobFailed, pytorchJobFailedReason, failureMessage)
+        if err != nil {
+            logger.Infof("Append pytorchjob condition error: %v", err)
+            return err
+        }
+    }
     // At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
     // If any replicas are still Active, set their status to succeeded.
     if isSucceeded(job.Status) {
@@ -434,6 +483,59 @@ func (pc *PyTorchController) satisfiedExpectations(job *v1beta2.PyTorchJob) bool
     return satisfied
 }

+// pastBackoffLimit checks whether the sum of container restart counts exceeds BackoffLimit.
+// This method applies only to pods with restartPolicy == OnFailure or Always.
+func (pc *PyTorchController) pastBackoffLimit(job *v1beta2.PyTorchJob, pods []*v1.Pod) (bool, error) {
+    if job.Spec.BackoffLimit == nil {
+        return false, nil
+    }
+    logger := pylogger.LoggerForJob(job)
+    result := int32(0)
+    for rtype, spec := range job.Spec.PyTorchReplicaSpecs {
+        if spec.RestartPolicy != common.RestartPolicyOnFailure && spec.RestartPolicy != common.RestartPolicyAlways {
+            logger.Warnf("The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit.", rtype, job.Name)
+            continue
+        }
+        // Convert PyTorchReplicaType to a lowercase string.
+        rt := strings.ToLower(string(rtype))
+        pods, err := pc.FilterPodsForReplicaType(pods, rt)
+        if err != nil {
+            return false, err
+        }
+        for i := range pods {
+            po := pods[i]
+            if po.Status.Phase != v1.PodRunning {
+                continue
+            }
+            for j := range po.Status.InitContainerStatuses {
+                stat := po.Status.InitContainerStatuses[j]
+                result += stat.RestartCount
+            }
+            for j := range po.Status.ContainerStatuses {
+                stat := po.Status.ContainerStatuses[j]
+                result += stat.RestartCount
+            }
+        }
+    }
+
+    if *job.Spec.BackoffLimit == 0 {
+        return result > 0, nil
+    }
+    return result >= *job.Spec.BackoffLimit, nil
+}
+
+// pastActiveDeadline checks whether the job has the ActiveDeadlineSeconds field set and, if so, whether it is exceeded.
+func (pc *PyTorchController) pastActiveDeadline(job *v1beta2.PyTorchJob) bool {
+    if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
+        return false
+    }
+    now := metav1.Now()
+    start := job.Status.StartTime.Time
+    duration := now.Time.Sub(start)
+    allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
+    return duration >= allowedDuration
+}
+
 func (pc *PyTorchController) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
     return pc.getPyTorchJobFromName(namespace, name)
 }
diff --git a/pkg/controller.v1beta2/pytorch/controller_test.go b/pkg/controller.v1beta2/pytorch/controller_test.go
index 92b25c36b..b0bfb8510 100644
--- a/pkg/controller.v1beta2/pytorch/controller_test.go
+++ b/pkg/controller.v1beta2/pytorch/controller_test.go
@@ -272,8 +272,8 @@ func TestNormalPath(t *testing.T) {
         }

         podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-        testutil.SetPodsStatuses(podIndexer, job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t)
-        testutil.SetPodsStatuses(podIndexer, job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, t)
+        testutil.SetPodsStatuses(podIndexer, job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t)
+        testutil.SetPodsStatuses(podIndexer, job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, nil, t)

         serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer()
         testutil.SetServices(serviceIndexer, job, testutil.LabelWorker, tc.activeWorkerServices, t)
diff --git a/pkg/controller.v1beta2/pytorch/job.go b/pkg/controller.v1beta2/pytorch/job.go
index bdd42dc1d..9f07e3017 100644
--- a/pkg/controller.v1beta2/pytorch/job.go
+++ b/pkg/controller.v1beta2/pytorch/job.go
@@ -105,8 +105,37 @@ func (pc *PyTorchController) updatePyTorchJob(old, cur interface{}) {
     if err != nil {
         return
     }
+    curPyTorchJob, err := jobFromUnstructured(cur)
+    if err != nil {
+        return
+    }
+
+    // KeyFunc never returns an error here.
+    key, err := KeyFunc(curPyTorchJob)
+    if err != nil {
+        return
+    }
+
     log.Infof("Updating pytorchjob: %s", oldPyTorchJob.Name)
     pc.enqueuePyTorchJob(cur)
+
+    // Check whether a new resync needs to be scheduled for ActiveDeadlineSeconds.
+    if curPyTorchJob.Status.StartTime != nil {
+        curPyTorchJobADS := curPyTorchJob.Spec.ActiveDeadlineSeconds
+        if curPyTorchJobADS == nil {
+            return
+        }
+        oldPyTorchJobADS := oldPyTorchJob.Spec.ActiveDeadlineSeconds
+        if oldPyTorchJobADS == nil || *oldPyTorchJobADS != *curPyTorchJobADS {
+            now := metav1.Now()
+            start := curPyTorchJob.Status.StartTime.Time
+            passed := now.Time.Sub(start)
+            total := time.Duration(*curPyTorchJobADS) * time.Second
+            // AddAfter will handle total < passed.
+            pc.WorkQueue.AddAfter(key, total-passed)
+            log.Infof("job ActiveDeadlineSeconds updated, will resync after %v", total-passed)
+        }
+    }
 }

 func (pc *PyTorchController) deletePodsAndServices(job *v1beta2.PyTorchJob, pods []*v1.Pod) error {
@@ -160,3 +189,19 @@ func (pc *PyTorchController) cleanupPyTorchJob(job *v1beta2.PyTorchJob) error {
 func (pc *PyTorchController) deletePyTorchJob(job *v1beta2.PyTorchJob) error {
     return pc.jobClientSet.KubeflowV1beta2().PyTorchJobs(job.Namespace).Delete(job.Name, &metav1.DeleteOptions{})
 }
+
+func getTotalReplicas(job *v1beta2.PyTorchJob) int32 {
+    jobReplicas := int32(0)
+    for _, r := range job.Spec.PyTorchReplicaSpecs {
+        jobReplicas += *r.Replicas
+    }
+    return jobReplicas
+}
+
+func getTotalFailedReplicas(job *v1beta2.PyTorchJob) int32 {
+    totalFailedReplicas := int32(0)
+    for rtype := range job.Status.ReplicaStatuses {
+        totalFailedReplicas += job.Status.ReplicaStatuses[rtype].Failed
+    }
+    return totalFailedReplicas
+}
diff --git a/pkg/controller.v1beta2/pytorch/job_test.go b/pkg/controller.v1beta2/pytorch/job_test.go
index 19a75ebf8..436380efa 100644
--- a/pkg/controller.v1beta2/pytorch/job_test.go
+++ b/pkg/controller.v1beta2/pytorch/job_test.go
@@ -20,6 +20,7 @@ import (
     kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"

     "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     kubeclientset "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/record"
@@ -306,8 +307,8 @@ func TestDeletePodsAndServices(t *testing.T) {
         }

         podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t)
-        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, t)
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t)
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, nil, t)

         serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer()
         testutil.SetServices(serviceIndexer, tc.job, testutil.LabelWorker, tc.activeWorkerServices, t)
@@ -476,8 +477,8 @@ func TestCleanupPyTorchJob(t *testing.T) {
         }

         podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t)
-        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, t)
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t)
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, nil, t)

         serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer()
         testutil.SetServices(serviceIndexer, tc.job, testutil.LabelWorker, tc.activeWorkerServices, t)
@@ -502,3 +503,267 @@ func TestCleanupPyTorchJob(t *testing.T) {
         }
     }
 }
+
+func TestActiveDeadlineSeconds(t *testing.T) {
+    type testCase struct {
+        description string
+        job         *v1beta2.PyTorchJob
+
+        pendingWorkerPods   int32
+        activeWorkerPods    int32
+        succeededWorkerPods int32
+        failedWorkerPods    int32
+
+        pendingMasterPods   int32
+        activeMasterPods    int32
+        succeededMasterPods int32
+        failedMasterPods    int32
+
+        activeWorkerServices int32
+        activeMasterServices int32
+
+        expectedPodDeletions int
+    }
+
+    ads2 := int64(2)
+    adsTest2 := &ads2
+    testCases := []testCase{
+        testCase{
+            description: "1 master and 4 workers running, ActiveDeadlineSeconds unset",
+            job:         testutil.NewPyTorchJobWithActiveDeadlineSeconds(1, 4, nil),
+
+            pendingWorkerPods:   0,
+            activeWorkerPods:    4,
+            succeededWorkerPods: 0,
+            failedWorkerPods:    0,
+
+            pendingMasterPods:   0,
+            activeMasterPods:    1,
+            succeededMasterPods: 0,
+            failedMasterPods:    0,
+
+            activeWorkerServices: 4,
+            activeMasterServices: 1,
+
+            expectedPodDeletions: 0,
+        },
+        testCase{
+            description: "1 master and 4 workers running, ActiveDeadlineSeconds is 2",
+            job:         testutil.NewPyTorchJobWithActiveDeadlineSeconds(1, 4, adsTest2),
+
+            pendingWorkerPods:   0,
+            activeWorkerPods:    4,
+            succeededWorkerPods: 0,
+            failedWorkerPods:    0,
+
+            pendingMasterPods:   0,
+            activeMasterPods:    1,
+            succeededMasterPods: 0,
+            failedMasterPods:    0,
+
+            activeWorkerServices: 4,
+            activeMasterServices: 1,
+
+            expectedPodDeletions: 5,
+        },
+    }
+    for _, tc := range testCases {
+        // Prepare the clientset and controller for the test.
+        kubeClientSet := kubeclientset.NewForConfigOrDie(&rest.Config{
+            Host: "",
+            ContentConfig: rest.ContentConfig{
+                GroupVersion: &v1.SchemeGroupVersion,
+            },
+        },
+        )
+
+        // Prepare the kube-batch clientset and controller for the test.
+        kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{
+            Host: "",
+            ContentConfig: rest.ContentConfig{
+                GroupVersion: &v1.SchemeGroupVersion,
+            },
+        },
+        )
+
+        config := &rest.Config{
+            Host: "",
+            ContentConfig: rest.ContentConfig{
+                GroupVersion: &v1beta2.SchemeGroupVersion,
+            },
+        }
+        jobClientSet := jobclientset.NewForConfigOrDie(config)
+        ctr, kubeInformerFactory, _ := newPyTorchController(config, kubeClientSet, kubeBatchClientSet, jobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
+        fakePodControl := &controller.FakePodControl{}
+        ctr.PodControl = fakePodControl
+        fakeServiceControl := &control.FakeServiceControl{}
+        ctr.ServiceControl = fakeServiceControl
+        ctr.Recorder = &record.FakeRecorder{}
+        ctr.jobInformerSynced = testutil.AlwaysReady
+        ctr.PodInformerSynced = testutil.AlwaysReady
+        ctr.ServiceInformerSynced = testutil.AlwaysReady
+        jobIndexer := ctr.jobInformer.GetIndexer()
+        ctr.updateStatusHandler = func(job *v1beta2.PyTorchJob) error {
+            return nil
+        }
+
+        unstructured, err := testutil.ConvertPyTorchJobToUnstructured(tc.job)
+        if err != nil {
+            t.Errorf("Failed to convert the PyTorchJob to Unstructured: %v", err)
+        }
+
+        if err := jobIndexer.Add(unstructured); err != nil {
+            t.Errorf("Failed to add job to jobIndexer: %v", err)
+        }
+
+        podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t)
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, nil, t)
+
+        serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer()
+        testutil.SetServices(serviceIndexer, tc.job, testutil.LabelWorker, tc.activeWorkerServices, t)
+        testutil.SetServices(serviceIndexer, tc.job, testutil.LabelMaster, tc.activeMasterServices, t)
+
+        foo, _ := ctr.getPyTorchJobFromName("default", "test-pytorchjob")
+        now := metav1.Now()
+        foo.Status.StartTime = &now
+
+        ads := tc.job.Spec.ActiveDeadlineSeconds
+        if ads != nil {
+            dur := time.Second * time.Duration(*ads)
+            time.Sleep(dur)
+        }
+        err = ctr.reconcilePyTorchJobs(foo)
+        if err != nil {
+            t.Errorf("%s: unexpected error when syncing jobs %v", tc.description, err)
+        }
+
+        if len(fakePodControl.DeletePodName) != tc.expectedPodDeletions {
+            t.Errorf("%s: unexpected number of pod deletes. Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakePodControl.DeletePodName))
+        }
+        if len(fakeServiceControl.DeleteServiceName) != tc.expectedPodDeletions {
+            t.Errorf("%s: unexpected number of service deletes. Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakeServiceControl.DeleteServiceName))
+        }
+    }
+}
+
+func TestBackoffForOnFailure(t *testing.T) {
+    type testCase struct {
+        description string
+        job         *v1beta2.PyTorchJob
+
+        pendingWorkerPods   int32
+        activeWorkerPods    int32
+        succeededWorkerPods int32
+        failedWorkerPods    int32
+
+        restartCounts []int32
+
+        pendingMasterPods   int32
+        activeMasterPods    int32
+        succeededMasterPods int32
+        failedMasterPods    int32
+
+        activeWorkerServices int32
+        activeMasterServices int32
+
+        expectedPodDeletions int
+    }
+
+    backoffLimit4 := int32(4)
+    backoffLimitTest4 := &backoffLimit4
+    testCases := []testCase{
+        testCase{
+            description: "1 master and 4 workers running, each with 1 restartCount, backoffLimit 4",
+            job:         testutil.NewPyTorchJobWithBackoffLimit(1, 4, backoffLimitTest4),
+
+            pendingWorkerPods:   0,
+            activeWorkerPods:    4,
+            succeededWorkerPods: 0,
+            failedWorkerPods:    0,
+
+            restartCounts: []int32{1, 1, 1, 1},
+
+            pendingMasterPods:   0,
+            activeMasterPods:    1,
+            succeededMasterPods: 0,
+            failedMasterPods:    0,
+
+            activeWorkerServices: 4,
+            activeMasterServices: 1,
+
+            expectedPodDeletions: 5,
+        },
+    }
+    for _, tc := range testCases {
+        // Prepare the clientset and controller for the test.
+        kubeClientSet := kubeclientset.NewForConfigOrDie(&rest.Config{
+            Host: "",
+            ContentConfig: rest.ContentConfig{
+                GroupVersion: &v1.SchemeGroupVersion,
+            },
+        },
+        )
+
+        // Prepare the kube-batch clientset and controller for the test.
+        kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{
+            Host: "",
+            ContentConfig: rest.ContentConfig{
+                GroupVersion: &v1.SchemeGroupVersion,
+            },
+        },
+        )
+
+        config := &rest.Config{
+            Host: "",
+            ContentConfig: rest.ContentConfig{
+                GroupVersion: &v1beta2.SchemeGroupVersion,
+            },
+        }
+        jobClientSet := jobclientset.NewForConfigOrDie(config)
+        ctr, kubeInformerFactory, _ := newPyTorchController(config, kubeClientSet, kubeBatchClientSet, jobClientSet, controller.NoResyncPeriodFunc, options.ServerOption{})
+        fakePodControl := &controller.FakePodControl{}
+        ctr.PodControl = fakePodControl
+        fakeServiceControl := &control.FakeServiceControl{}
+        ctr.ServiceControl = fakeServiceControl
+        ctr.Recorder = &record.FakeRecorder{}
+        ctr.jobInformerSynced = testutil.AlwaysReady
+        ctr.PodInformerSynced = testutil.AlwaysReady
+        ctr.ServiceInformerSynced = testutil.AlwaysReady
+        jobIndexer := ctr.jobInformer.GetIndexer()
+        ctr.updateStatusHandler = func(job *v1beta2.PyTorchJob) error {
+            return nil
+        }
+
+        unstructured, err := testutil.ConvertPyTorchJobToUnstructured(tc.job)
+        if err != nil {
+            t.Errorf("Failed to convert the PyTorchJob to Unstructured: %v", err)
+        }
+
+        if err := jobIndexer.Add(unstructured); err != nil {
+            t.Errorf("Failed to add job to jobIndexer: %v", err)
+        }
+
+        podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, tc.restartCounts, t)
+        testutil.SetPodsStatuses(podIndexer, tc.job, testutil.LabelMaster, tc.pendingMasterPods, tc.activeMasterPods, tc.succeededMasterPods, tc.failedMasterPods, tc.restartCounts, t)
+
+        serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer()
+        testutil.SetServices(serviceIndexer, tc.job, testutil.LabelWorker, tc.activeWorkerServices, t)
+        testutil.SetServices(serviceIndexer, tc.job, testutil.LabelMaster, tc.activeMasterServices, t)
+
+        forget, err := ctr.syncPyTorchJob(testutil.GetKey(tc.job, t))
+        if err != nil {
+            t.Errorf("%s: unexpected error when syncing jobs %v", tc.description, err)
+        }
+        if !forget {
+            t.Errorf("%s: unexpected forget value. Expected true, saw %v\n", tc.description, forget)
+        }
+        if len(fakePodControl.DeletePodName) != tc.expectedPodDeletions {
+            t.Errorf("%s: unexpected number of pod deletes. Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakePodControl.DeletePodName))
+        }
+        if len(fakeServiceControl.DeleteServiceName) != tc.expectedPodDeletions {
+            t.Errorf("%s: unexpected number of service deletes. Expected %d, saw %d\n", tc.description, tc.expectedPodDeletions, len(fakeServiceControl.DeleteServiceName))
+        }
+    }
+}
diff --git a/pkg/controller.v1beta2/pytorch/status.go b/pkg/controller.v1beta2/pytorch/status.go
index e77a56549..f1fa4d6e5 100644
--- a/pkg/controller.v1beta2/pytorch/status.go
+++ b/pkg/controller.v1beta2/pytorch/status.go
@@ -18,9 +18,11 @@ package pytorch
 import (
     "errors"
     "fmt"
+    "time"

     "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    utilruntime "k8s.io/apimachinery/pkg/util/runtime"

     v1beta2 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta2"
     common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2"
@@ -32,9 +34,9 @@ const (
     pytorchJobCreatedReason = "PyTorchJobCreated"
     // pytorchJobSucceededReason is added in a job when it is succeeded.
     pytorchJobSucceededReason = "PyTorchJobSucceeded"
-    // pytorchJobSucceededReason is added in a job when it is running.
+    // pytorchJobRunningReason is added in a job when it is running.
     pytorchJobRunningReason = "PyTorchJobRunning"
-    // pytorchJobSucceededReason is added in a job when it is failed.
+    // pytorchJobFailedReason is added in a job when it is failed.
     pytorchJobFailedReason = "PyTorchJobFailed"
     // pytorchJobRestarting is added in a job when it is restarting.
     pytorchJobRestartingReason = "PyTorchJobRestarting"
@@ -42,6 +44,12 @@ const (

 // updateStatus updates the status of the job.
 func (pc *PyTorchController) updateStatusSingle(job *v1beta2.PyTorchJob, rtype v1beta2.PyTorchReplicaType, replicas int, restart bool) error {
+    jobKey, err := KeyFunc(job)
+    if err != nil {
+        utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
+        return err
+    }
+
     // Expect to have `replicas - succeeded` pods alive.
     commonType := common.ReplicaType(rtype)
     expected := replicas - int(job.Status.ReplicaStatuses[commonType].Succeeded)
@@ -54,6 +62,11 @@ func (pc *PyTorchController) updateStatusSingle(job *v1beta2.PyTorchJob, rtype v
     if running == replicas && job.Status.StartTime == nil {
         now := metav1.Now()
         job.Status.StartTime = &now
+        // Enqueue a sync to check whether the job is past ActiveDeadlineSeconds.
+        if job.Spec.ActiveDeadlineSeconds != nil {
+            pylogger.LoggerForJob(job).Infof("Job with ActiveDeadlineSeconds will sync after %d seconds", *job.Spec.ActiveDeadlineSeconds)
+            pc.WorkQueue.AddAfter(jobKey, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
+        }
     }

     if ContainMasterSpec(job) {
diff --git a/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/k8sutil.go b/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/k8sutil.go
index c44681c26..d44a3fdfa 100644
--- a/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/k8sutil.go
+++ b/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/k8sutil.go
@@ -19,6 +19,7 @@ import (
     "os"

     log "github.com/sirupsen/logrus"
+    "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
@@ -89,3 +90,34 @@ func CascadeDeleteOptions(gracePeriodSeconds int64) *metav1.DeleteOptions {
         }(),
     }
 }
+
+// FilterActivePods returns pods that have not terminated.
+func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
+    var result []*v1.Pod
+    for _, p := range pods {
+        if IsPodActive(p) {
+            result = append(result, p)
+        } else {
+            log.Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
+                p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
+        }
+    }
+    return result
+}
+
+func IsPodActive(p *v1.Pod) bool {
+    return v1.PodSucceeded != p.Status.Phase &&
+        v1.PodFailed != p.Status.Phase &&
+        p.DeletionTimestamp == nil
+}
+
+// FilterPods returns the number of pods in the given phase.
+func FilterPods(pods []*v1.Pod, phase v1.PodPhase) int {
+    result := 0
+    for i := range pods {
+        if phase == pods[i].Status.Phase {
+            result++
+        }
+    }
+    return result
+}
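
Reviewer note (not part of the patch): the sketch below shows how a client program might set the two new fields on a PyTorchJob, assuming the v1beta2 PyTorchJobSpec added above and the tf-operator common v1beta2 ReplicaSpec used throughout this change. The PyTorchReplicaTypeWorker constant, the "pytorch" container name, the image, and the replica counts are illustrative assumptions, not values taken from this patch.

package main

import (
	"fmt"

	pytorchv1beta2 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta2"
	common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func int32Ptr(v int32) *int32 { return &v }
func int64Ptr(v int64) *int64 { return &v }

func main() {
	cleanPodPolicy := common.CleanPodPolicyAll

	// Pod template shared by master and worker in this sketch; image and
	// container name are placeholders ("pytorch" is assumed to be the
	// container name the operator expects).
	template := corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name:  "pytorch",
				Image: "example.com/pytorch-dist-mnist:latest",
			}},
		},
	}

	job := &pytorchv1beta2.PyTorchJob{
		ObjectMeta: metav1.ObjectMeta{Name: "pytorch-dist-example", Namespace: "default"},
		Spec: pytorchv1beta2.PyTorchJobSpec{
			// Mark the job failed if it is still active 10 minutes after StartTime.
			ActiveDeadlineSeconds: int64Ptr(600),
			// Mark the job failed once the summed container restart counts reach 4
			// (counted only for replicas with restartPolicy OnFailure or Always).
			BackoffLimit:   int32Ptr(4),
			CleanPodPolicy: &cleanPodPolicy,
			PyTorchReplicaSpecs: map[pytorchv1beta2.PyTorchReplicaType]*common.ReplicaSpec{
				pytorchv1beta2.PyTorchReplicaTypeMaster: {
					Replicas:      int32Ptr(1),
					RestartPolicy: common.RestartPolicyOnFailure,
					Template:      template,
				},
				pytorchv1beta2.PyTorchReplicaTypeWorker: {
					Replicas:      int32Ptr(4),
					RestartPolicy: common.RestartPolicyOnFailure,
					Template:      template,
				},
			},
		},
	}

	fmt.Printf("deadline=%ds backoffLimit=%d\n", *job.Spec.ActiveDeadlineSeconds, *job.Spec.BackoffLimit)
}

With values like these, the controller logic in this patch records a JobFailed condition and deletes the job's pods and services once the total restart count of OnFailure/Always replicas reaches the backoff limit, or once the job has been active longer than the deadline.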