diff --git a/Gopkg.lock b/Gopkg.lock index c9ab102a2..c42131bd9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -218,7 +218,7 @@ version = "v1.0.1" [[projects]] - digest = "1:cccff701344cc105ecb0b024e07b68c4455b7c4b94c6a317a2a652fff3215270" + digest = "1:0edf4c6c87de84afe0cab647d88b6812bb0b9dcaa31a735914539753ae214095" name = "github.com/kubeflow/tf-operator" packages = [ "pkg/apis/common/v1", @@ -232,8 +232,8 @@ "pkg/version", ] pruneopts = "NUT" - revision = "63de5cbdf3f4b90c4d07b009da1610946d2b5816" - version = "v0.5.1" + revision = "d0b973be7135b7cbed3138d3d5ac70ae3a9bcd88" + version = "v0.5.3" [[projects]] digest = "1:680c0fba95a0cff934e350b1ad6774d8229378a3e37d9902e07e2861e82a5908" diff --git a/Gopkg.toml b/Gopkg.toml index a36229d6d..75de9aec7 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -19,7 +19,7 @@ required = [ [[constraint]] name = "github.com/kubeflow/tf-operator" - version = "v0.5.1" + version = "v0.5.3" [[constraint]] name = "github.com/sirupsen/logrus" diff --git a/pkg/controller.v1/pytorch/controller.go b/pkg/controller.v1/pytorch/controller.go index 8c3a2a7d5..8b1d76a5f 100644 --- a/pkg/controller.v1/pytorch/controller.go +++ b/pkg/controller.v1/pytorch/controller.go @@ -307,14 +307,6 @@ func (pc *PyTorchController) syncPyTorchJob(key string) (bool, error) { job := sharedJob.DeepCopy() jobNeedsSync := pc.satisfiedExpectations(job) - if pc.Config.EnableGangScheduling { - minAvailableReplicas := getTotalReplicas(job) - _, err := pc.SyncPodGroup(job, minAvailableReplicas) - if err != nil { - logger.Warnf("Sync PodGroup %v: %v", job.Name, err) - } - } - // Set default for the new job. scheme.Scheme.Default(job) @@ -419,13 +411,8 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error { } if pc.Config.EnableGangScheduling { - pc.Recorder.Event(job, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup") if err := pc.DeletePodGroup(job); err != nil { - pc.Recorder.Eventf(job, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err) return err - } else { - pc.Recorder.Eventf(job, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", job.Name) - } } @@ -437,11 +424,15 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error { job.Status.ReplicaStatuses[rtype].Active = 0 } } - // no need to update the job if the status hasn't changed since last time. - if !reflect.DeepEqual(*oldStatus, job.Status) { - return pc.updateStatusHandler(job) + return pc.updateStatusHandler(job) + } + + if pc.Config.EnableGangScheduling { + minAvailableReplicas := getTotalReplicas(job) + _, err := pc.SyncPodGroup(job, minAvailableReplicas) + if err != nil { + logger.Warnf("Sync PodGroup %v: %v", job.Name, err) } - return nil } // Save the current state of the replicas @@ -463,8 +454,11 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error { } } - // TODO(CPH): Add check here, no need to update the job if the status hasn't changed since last time. - return pc.updateStatusHandler(job) + // No need to update the job if the status hasn't changed since last time. + if !reflect.DeepEqual(*oldStatus, job.Status) { + return pc.updateStatusHandler(job) + } + return nil } // satisfiedExpectations returns true if the required adds/dels for the given job have been observed. diff --git a/pkg/controller.v1beta2/pytorch/controller.go b/pkg/controller.v1beta2/pytorch/controller.go index 9f421f77d..57fb93082 100644 --- a/pkg/controller.v1beta2/pytorch/controller.go +++ b/pkg/controller.v1beta2/pytorch/controller.go @@ -307,14 +307,6 @@ func (pc *PyTorchController) syncPyTorchJob(key string) (bool, error) { job := sharedJob.DeepCopy() jobNeedsSync := pc.satisfiedExpectations(job) - if pc.Config.EnableGangScheduling { - minAvailableReplicas := getTotalReplicas(job) - _, err := pc.SyncPodGroup(job, minAvailableReplicas) - if err != nil { - logger.Warnf("Sync PodGroup %v: %v", job.Name, err) - } - } - // Set default for the new job. scheme.Scheme.Default(job) @@ -419,13 +411,8 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error } if pc.Config.EnableGangScheduling { - pc.Recorder.Event(job, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup") if err := pc.DeletePodGroup(job); err != nil { - pc.Recorder.Eventf(job, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err) return err - } else { - pc.Recorder.Eventf(job, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", job.Name) - } } @@ -437,11 +424,15 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error job.Status.ReplicaStatuses[rtype].Active = 0 } } - // no need to update the job if the status hasn't changed since last time. - if !reflect.DeepEqual(*oldStatus, job.Status) { - return pc.updateStatusHandler(job) + return pc.updateStatusHandler(job) + } + + if pc.Config.EnableGangScheduling { + minAvailableReplicas := getTotalReplicas(job) + _, err := pc.SyncPodGroup(job, minAvailableReplicas) + if err != nil { + logger.Warnf("Sync PodGroup %v: %v", job.Name, err) } - return nil } // Save the current state of the replicas @@ -463,8 +454,11 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error } } - // TODO(CPH): Add check here, no need to update the job if the status hasn't changed since last time. - return pc.updateStatusHandler(job) + // No need to update the job if the status hasn't changed since last time. + if !reflect.DeepEqual(*oldStatus, job.Status) { + return pc.updateStatusHandler(job) + } + return nil } // satisfiedExpectations returns true if the required adds/dels for the given job have been observed. diff --git a/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/openapi_generated.go b/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/openapi_generated.go index 29ceb8086..a718227aa 100644 --- a/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/openapi_generated.go +++ b/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/openapi_generated.go @@ -44,7 +44,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA }, "status": { SchemaProps: spec.SchemaProps{ - Description: "Status of the condition, one of True, False, Unknown.", + Description: "Status of the condition, one of True, False, or Unknown.", Type: []string{"string"}, Format: "", }, @@ -58,7 +58,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA }, "message": { SchemaProps: spec.SchemaProps{ - Description: "A human readable message indicating details about the transition.", + Description: "A readable message indicating details about the transition.", Type: []string{"string"}, Format: "", }, @@ -85,11 +85,11 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/kubeflow/tf-operator/pkg/apis/common/v1.JobStatus": { Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "JobStatus represents the current observed state of the training Job.", + Description: "JobStatus represents the current observed state of the training job.", Properties: map[string]spec.Schema{ "conditions": { SchemaProps: spec.SchemaProps{ - Description: "Conditions is an array of current observed job conditions.", + Description: "An array of current observed job conditions.", Type: []string{"array"}, Items: &spec.SchemaOrArray{ Schema: &spec.Schema{ @@ -102,7 +102,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA }, "replicaStatuses": { SchemaProps: spec.SchemaProps{ - Description: "ReplicaStatuses is map of ReplicaType and ReplicaStatus, specifies the status of each replica.", + Description: "A map from ReplicaType (key) to ReplicaStatus (value), specifying the status of each replica.", Type: []string{"object"}, AdditionalProperties: &spec.SchemaOrBool{ Schema: &spec.Schema{ @@ -115,19 +115,19 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA }, "startTime": { SchemaProps: spec.SchemaProps{ - Description: "Represents time when the job was acknowledged by the job controller. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", + Description: "Represents the time when the job was acknowledged by the job controller. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), }, }, "completionTime": { SchemaProps: spec.SchemaProps{ - Description: "Represents time when the job was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", + Description: "Represents the time when the job was completed. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), }, }, "lastReconcileTime": { SchemaProps: spec.SchemaProps{ - Description: "Represents last time when the job was reconciled. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", + Description: "Represents the last time when the job was reconciled. It is not guaranteed to be set in happens-before order across separate operations. It is represented in RFC3339 form and is in UTC.", Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), }, }, @@ -141,24 +141,24 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/kubeflow/tf-operator/pkg/apis/common/v1.ReplicaSpec": { Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ - Description: "ReplicaSpec is a description of the replica", + Description: "ReplicaSpec is a description of the job replica.", Properties: map[string]spec.Schema{ "replicas": { SchemaProps: spec.SchemaProps{ - Description: "Replicas is the desired number of replicas of the given template. If unspecified, defaults to 1.", + Description: "The desired number of replicas of the given template. If unspecified, defaults to 1.", Type: []string{"integer"}, Format: "int32", }, }, "template": { SchemaProps: spec.SchemaProps{ - Description: "Template is the object that describes the pod that will be created for this replica. RestartPolicy in PodTemplateSpec will be overide by RestartPolicy in ReplicaSpec", + Description: "Describes the pod that will be created for this replica. Note that RestartPolicy in PodTemplateSpec will be overidden by RestartPolicy in ReplicaSpec.", Ref: ref("k8s.io/api/core/v1.PodTemplateSpec"), }, }, "restartPolicy": { SchemaProps: spec.SchemaProps{ - Description: "Restart policy for all replicas within the job. One of Always, OnFailure, Never and ExitCode. Default to Never.", + Description: "Restart policy for all replicas within the job. One of Always, OnFailure, Never, or ExitCode. Defaults to Never.", Type: []string{"string"}, Format: "", }, diff --git a/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/types.go b/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/types.go index 8563710d7..bfe2bfd65 100644 --- a/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/types.go +++ b/vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/types.go @@ -1,4 +1,4 @@ -// Copyright 2018 The Kubeflow Authors +// Copyright 2019 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,33 +19,32 @@ import ( ) // +k8s:deepcopy-gen=true -// JobStatus represents the current observed state of the training Job. +// JobStatus represents the current observed state of the training job. type JobStatus struct { - // Conditions is an array of current observed job conditions. + // An array of current observed job conditions. Conditions []JobCondition `json:"conditions"` - // ReplicaStatuses is map of ReplicaType and ReplicaStatus, - // specifies the status of each replica. + // A map from ReplicaType (key) to ReplicaStatus (value), specifying the status of each replica. ReplicaStatuses map[ReplicaType]*ReplicaStatus `json:"replicaStatuses"` - // Represents time when the job was acknowledged by the job controller. + // Represents the time when the job was acknowledged by the job controller. // It is not guaranteed to be set in happens-before order across separate operations. // It is represented in RFC3339 form and is in UTC. StartTime *metav1.Time `json:"startTime,omitempty"` - // Represents time when the job was completed. It is not guaranteed to + // Represents the time when the job was completed. It is not guaranteed to // be set in happens-before order across separate operations. // It is represented in RFC3339 form and is in UTC. CompletionTime *metav1.Time `json:"completionTime,omitempty"` - // Represents last time when the job was reconciled. It is not guaranteed to + // Represents the last time when the job was reconciled. It is not guaranteed to // be set in happens-before order across separate operations. // It is represented in RFC3339 form and is in UTC. LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty"` } -// ReplicaType represents the type of the replica. Each operator needs to define its -// own set of ReplicaTypes. +// ReplicaType represents the type of the job replica. Each operator (e.g. TensorFlow, PyTorch) +// needs to define its own set of ReplicaTypes. type ReplicaType string // ReplicaStatus represents the current observed state of the replica. @@ -61,20 +60,19 @@ type ReplicaStatus struct { } // +k8s:deepcopy-gen=true -// ReplicaSpec is a description of the replica +// ReplicaSpec is a description of the job replica. type ReplicaSpec struct { - // Replicas is the desired number of replicas of the given template. + // The desired number of replicas of the given template. // If unspecified, defaults to 1. Replicas *int32 `json:"replicas,omitempty"` - // Template is the object that describes the pod that - // will be created for this replica. RestartPolicy in PodTemplateSpec - // will be overide by RestartPolicy in ReplicaSpec + // Describes the pod that will be created for this replica. Note that + // RestartPolicy in PodTemplateSpec will be overidden by RestartPolicy in ReplicaSpec. Template v1.PodTemplateSpec `json:"template,omitempty"` // Restart policy for all replicas within the job. - // One of Always, OnFailure, Never and ExitCode. - // Default to Never. + // One of Always, OnFailure, Never, or ExitCode. + // Defaults to Never. RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"` } @@ -83,19 +81,25 @@ type ReplicaSpec struct { type JobCondition struct { // Type of job condition. Type JobConditionType `json:"type"` - // Status of the condition, one of True, False, Unknown. + + // Status of the condition, one of True, False, or Unknown. Status v1.ConditionStatus `json:"status"` + // The reason for the condition's last transition. Reason string `json:"reason,omitempty"` - // A human readable message indicating details about the transition. + + // A readable message indicating details about the transition. Message string `json:"message,omitempty"` + // The last time this condition was updated. LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + // Last time the condition transitioned from one status to another. LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"` } -// JobConditionType defines all kinds of types of JobStatus. +// JobConditionType defines all possible types of JobStatus. Can be one of: +// Created, Running, Restarting, Succeeded, or Failed. type JobConditionType string const ( @@ -126,7 +130,8 @@ const ( JobFailed JobConditionType = "Failed" ) -// CleanPodPolicy describes how to deal with pods when the job is finished. +// CleanPodPolicy describes how to deal with pods when the job is finished. Can be one +// of: All, Running, or None. type CleanPodPolicy string const ( @@ -137,7 +142,7 @@ const ( ) // RestartPolicy describes how the replicas should be restarted. -// Only one of the following restart policies may be specified. +// Can be one of: Always, OnFailure, Never, or ExitCode. // If none of the following policies is specified, the default one // is RestartPolicyAlways. type RestartPolicy string diff --git a/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go b/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go index d0a375e9f..e39d3c6b7 100644 --- a/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go +++ b/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/jobcontroller.go @@ -11,7 +11,9 @@ import ( "k8s.io/api/core/v1" "k8s.io/api/policy/v1beta1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" kubeinformers "k8s.io/client-go/informers" @@ -29,7 +31,6 @@ import ( // Common Interface to be implemented by all operators. type ControllerInterface interface { - // Returns the Controller name ControllerName() string @@ -266,21 +267,32 @@ func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) return jc.KubeClientSet.PolicyV1beta1().PodDisruptionBudgets(job.GetNamespace()).Create(createPdb) } -func (jc *JobController) DeletePodGroup(job metav1.Object) error { +func (jc *JobController) DeletePodGroup(object runtime.Object) error { kubeBatchClientInterface := jc.KubeBatchClientSet + accessor, err := meta.Accessor(object) + if err != nil { + return fmt.Errorf("object does not have ObjectMeta, %v", err) + } + //check whether podGroup exists or not - _, err := kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Get(job.GetName(), metav1.GetOptions{}) - if err != nil && k8serrors.IsNotFound(err) { - return nil + _, err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(accessor.GetNamespace()).Get(accessor.GetName(), metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err } - log.Infof("Deleting PodGroup %s", job.GetName()) + log.Infof("Deleting PodGroup %s", accessor.GetName()) //delete podGroup - err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Delete(job.GetName(), &metav1.DeleteOptions{}) + err = kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(accessor.GetNamespace()).Delete(accessor.GetName(), &metav1.DeleteOptions{}) if err != nil { + jc.Recorder.Eventf(object, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err) return fmt.Errorf("unable to delete PodGroup: %v", err) + } else { + jc.Recorder.Eventf(object, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", accessor.GetName()) } return nil } diff --git a/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/pod.go b/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/pod.go index 35b8cbadf..c1abcf7f2 100644 --- a/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/pod.go +++ b/vendor/github.com/kubeflow/tf-operator/pkg/common/jobcontroller/pod.go @@ -114,8 +114,6 @@ func (jc *JobController) UpdatePod(old, cur interface{}) { func (jc *JobController) DeletePod(obj interface{}) { pod, ok := obj.(*v1.Pod) - logger := jclogger.LoggerForPod(pod, jc.Controller.GetAPIGroupVersionKind().Kind) - // When a delete is dropped, the relist will notice a pod in the store not // in the list, leading to the insertion of a tombstone object which contains // the deleted key/value. Note that this value might be stale. If the pod @@ -133,6 +131,7 @@ func (jc *JobController) DeletePod(obj interface{}) { } } + logger := jclogger.LoggerForPod(pod, jc.Controller.GetAPIGroupVersionKind().Kind) controllerRef := metav1.GetControllerOf(pod) if controllerRef == nil { // No controller should care about orphans being deleted.