diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 1b935907..5424d63d 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -784,7 +784,9 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta if err != nil { logger.Info(ctx, "Job monitoring failed with error: %v", err) s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobMonitoringFailed", err.Error()) - return statusUnchanged, err + s.flinkController.UpdateLatestJobID(ctx, app, "") + s.updateApplicationPhase(app, v1beta1.FlinkApplicationRollingBackJob) + return statusChanged, err } if jobStarted { return updateJobAndReturn(ctx, s, app, hash) diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 54c893a5..23eceff6 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -759,13 +759,16 @@ func TestSubmittingVertexFailsToStart(t *testing.T) { mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object k8sclient.Object) error { if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } else if statusUpdateCount == 3 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) } statusUpdateCount++ return nil @@ -925,13 +928,16 @@ func TestSubmittingVertexStartTimeout(t *testing.T) { mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object k8sclient.Object) error { if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) } else if statusUpdateCount == 2 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } else if statusUpdateCount == 3 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(ctx, application)) + assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) } statusUpdateCount++ return nil