Skip to content

Commit

Permalink
Fix: properly update job on update
Browse files: browse the repository at this point in the history
  • Loading branch information
regadas committed Jan 15, 2025
1 parent 6a09135 commit 143356e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 31 deletions.
4 changes: 2 additions & 2 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (reconciler *ClusterReconciler) reconcileJob(ctx context.Context) (ctrl.Res
}

// Suspend or stop the job so that the update can proceed.
if recorded.Revision.IsUpdateTriggered() && isJobUpdate(observed.revisions, observed.cluster) {
if recorded.Revision.IsUpdateTriggered() && !isScaleUpdate(observed.revisions, observed.cluster) {
log.Info("Preparing job update")
var takeSavepoint = jobSpec.TakeSavepointOnUpdate == nil || *jobSpec.TakeSavepointOnUpdate
var shouldSuspend = takeSavepoint && util.IsBlank(jobSpec.FromSavepoint)
Expand Down Expand Up @@ -724,7 +724,7 @@ func (reconciler *ClusterReconciler) canSuspendJob(ctx context.Context, jobID st
switch s.State {
case v1beta1.SavepointStateSucceeded:
log.Info("Successfully savepoint completed, wait until the job stops")
return false
return true
case v1beta1.SavepointStateInProgress:
log.Info("Savepoint is in progress, wait until it is completed")
return false
Expand Down
2 changes: 2 additions & 0 deletions controllers/flinkcluster/flinkcluster_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ func (updater *ClusterStatusUpdater) deriveClusterStatus(
v1beta1.ClusterStatePartiallyStopped:
if shouldUpdateCluster(observed) {
status.State = v1beta1.ClusterStateUpdating
} else if jobStatus.IsActive() {
status.State = v1beta1.ClusterStateRunning
} else if runningComponents == 0 {
status.State = v1beta1.ClusterStateStopped
} else if runningComponents < totalComponents {
Expand Down
41 changes: 13 additions & 28 deletions controllers/flinkcluster/flinkcluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func getUpdateState(observed *ObservedClusterState) UpdateState {

jobStatus := clusterStatus.Components.Job
switch {
case isJobUpdate(observed.revisions, observed.cluster) &&
case !isScaleUpdate(observed.revisions, observed.cluster) &&
!jobStatus.UpdateReady(observed.cluster.Spec.Job, observed.observeTime):
return UpdateStatePreparing
case !isClusterUpdateToDate(observed):
Expand All @@ -484,42 +484,31 @@ func getUpdateState(observed *ObservedClusterState) UpdateState {
return UpdateStateFinished
}

func revisionDiff(a, b *appsv1.ControllerRevision) map[string]util.DiffValue {
func revisionDiff(revisions []*appsv1.ControllerRevision) map[string]util.DiffValue {
if len(revisions) < 2 {
return map[string]util.DiffValue{}
}

patchSpec := func(bytes []byte) map[string]any {
var raw map[string]any
json.Unmarshal(bytes, &raw)
return raw["spec"].(map[string]any)
}

history.SortControllerRevisions(revisions)
a, b := revisions[len(revisions)-2], revisions[len(revisions)-1]
aSpec := patchSpec(a.Data.Raw)
bSpec := patchSpec(b.Data.Raw)

return util.MapDiff(aSpec, bSpec)
}

func isJobUpdate(revisions []*appsv1.ControllerRevision, cluster *v1beta1.FlinkCluster) bool {
if wasJobCancelRequested(cluster.Status.Control) {
return false
}

if len(revisions) < 2 || (cluster != nil && cluster.Spec.Job == nil) {
return false
}

history.SortControllerRevisions(revisions)
diff := revisionDiff(revisions[len(revisions)-2], revisions[len(revisions)-1])
_, ok := diff["job"]
return ok
}

func isScaleUpdate(revisions []*appsv1.ControllerRevision, cluster *v1beta1.FlinkCluster) bool {
if len(revisions) < 2 || (cluster != nil && cluster.Spec.Job == nil) {
if cluster != nil && cluster.Spec.Job == nil {
return false
}

history.SortControllerRevisions(revisions)
diff := revisionDiff(revisions[len(revisions)-2], revisions[len(revisions)-1])

diff := revisionDiff(revisions)
tmDiff, ok := diff["taskManager"]
if len(diff) != 1 || !ok {
return false
Expand All @@ -532,16 +521,12 @@ func isScaleUpdate(revisions []*appsv1.ControllerRevision, cluster *v1beta1.Flin
}

func shouldUpdateJob(observed *ObservedClusterState) bool {
return observed.updateState == UpdateStateInProgress && isJobUpdate(observed.revisions, observed.cluster)
return observed.updateState == UpdateStateInProgress && !isScaleUpdate(observed.revisions, observed.cluster)
}

func shouldUpdateCluster(observed *ObservedClusterState) bool {
if isJobUpdate(observed.revisions, observed.cluster) {
var job = observed.cluster.Status.Components.Job
return !job.IsActive() && observed.updateState == UpdateStateInProgress
}

return observed.updateState == UpdateStateInProgress
var job = observed.cluster.Status.Components.Job
return !job.IsActive() && observed.updateState == UpdateStateInProgress
}

func shouldRecreateOnUpdate(observed *ObservedClusterState) bool {
Expand Down
2 changes: 1 addition & 1 deletion controllers/flinkcluster/flinkcluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestGetUpdateState(t *testing.T) {
jmService: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{RevisionNameLabel: "cluster-85dc8f749"}}},
}
var state = getUpdateState(&observed)
assert.Equal(t, state, UpdateStateInProgress)
assert.Equal(t, state, UpdateStatePreparing)

observed = ObservedClusterState{
cluster: &v1beta1.FlinkCluster{
Expand Down

0 comments on commit 143356e

Please sign in to comment.