Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
Sync PodGroup fix (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnugeorge authored and k8s-ci-robot committed Jun 6, 2019
1 parent 999c6ca commit 396fb2f
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 85 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ required = [

[[constraint]]
name = "github.com/kubeflow/tf-operator"
version = "v0.5.1"
version = "v0.5.3"

[[constraint]]
name = "github.com/sirupsen/logrus"
Expand Down
32 changes: 13 additions & 19 deletions pkg/controller.v1/pytorch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,6 @@ func (pc *PyTorchController) syncPyTorchJob(key string) (bool, error) {
job := sharedJob.DeepCopy()
jobNeedsSync := pc.satisfiedExpectations(job)

if pc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(job)
_, err := pc.SyncPodGroup(job, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", job.Name, err)
}
}

// Set default for the new job.
scheme.Scheme.Default(job)

Expand Down Expand Up @@ -419,13 +411,8 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error {
}

if pc.Config.EnableGangScheduling {
pc.Recorder.Event(job, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup")
if err := pc.DeletePodGroup(job); err != nil {
pc.Recorder.Eventf(job, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
pc.Recorder.Eventf(job, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", job.Name)

}
}

Expand All @@ -437,11 +424,15 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error {
job.Status.ReplicaStatuses[rtype].Active = 0
}
}
// no need to update the job if the status hasn't changed since last time.
if !reflect.DeepEqual(*oldStatus, job.Status) {
return pc.updateStatusHandler(job)
return pc.updateStatusHandler(job)
}

if pc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(job)
_, err := pc.SyncPodGroup(job, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", job.Name, err)
}
return nil
}

// Save the current state of the replicas
Expand All @@ -463,8 +454,11 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *pyv1.PyTorchJob) error {
}
}

// TODO(CPH): Add check here, no need to update the job if the status hasn't changed since last time.
return pc.updateStatusHandler(job)
// No need to update the job if the status hasn't changed since last time.
if !reflect.DeepEqual(*oldStatus, job.Status) {
return pc.updateStatusHandler(job)
}
return nil
}

// satisfiedExpectations returns true if the required adds/dels for the given job have been observed.
Expand Down
32 changes: 13 additions & 19 deletions pkg/controller.v1beta2/pytorch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,6 @@ func (pc *PyTorchController) syncPyTorchJob(key string) (bool, error) {
job := sharedJob.DeepCopy()
jobNeedsSync := pc.satisfiedExpectations(job)

if pc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(job)
_, err := pc.SyncPodGroup(job, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", job.Name, err)
}
}

// Set default for the new job.
scheme.Scheme.Default(job)

Expand Down Expand Up @@ -419,13 +411,8 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error
}

if pc.Config.EnableGangScheduling {
pc.Recorder.Event(job, v1.EventTypeNormal, "JobTerminated", "Job is terminated, deleting PodGroup")
if err := pc.DeletePodGroup(job); err != nil {
pc.Recorder.Eventf(job, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
return err
} else {
pc.Recorder.Eventf(job, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", job.Name)

}
}

Expand All @@ -437,11 +424,15 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error
job.Status.ReplicaStatuses[rtype].Active = 0
}
}
// no need to update the job if the status hasn't changed since last time.
if !reflect.DeepEqual(*oldStatus, job.Status) {
return pc.updateStatusHandler(job)
return pc.updateStatusHandler(job)
}

if pc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(job)
_, err := pc.SyncPodGroup(job, minAvailableReplicas)
if err != nil {
logger.Warnf("Sync PodGroup %v: %v", job.Name, err)
}
return nil
}

// Save the current state of the replicas
Expand All @@ -463,8 +454,11 @@ func (pc *PyTorchController) reconcilePyTorchJobs(job *v1beta2.PyTorchJob) error
}
}

// TODO(CPH): Add check here, no need to update the job if the status hasn't changed since last time.
return pc.updateStatusHandler(job)
// No need to update the job if the status hasn't changed since last time.
if !reflect.DeepEqual(*oldStatus, job.Status) {
return pc.updateStatusHandler(job)
}
return nil
}

// satisfiedExpectations returns true if the required adds/dels for the given job have been observed.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 27 additions & 22 deletions vendor/github.com/kubeflow/tf-operator/pkg/apis/common/v1/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 396fb2f

Please sign in to comment.