From 3bab21e72ee78f6711519762317e8218f9884afd Mon Sep 17 00:00:00 2001 From: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com> Date: Tue, 22 Oct 2024 20:29:17 +0530 Subject: [PATCH] Fix race condition when restarting continuous jobs (#1849) ## Changes We don't need to cancel existing runs when the job is continuous and unpaused. The `/jobs/run-now` command will cancel the existing run and trigger a new one automatically. Cancelling the job manually can cause a race condition where both the manual trigger from the CLI and the continuous trigger from the job configuration happens at the same time. This PR prevents that from happening. ## Tests Unit tests and manually --- bundle/run/job.go | 23 +++++++ bundle/run/job_test.go | 132 ++++++++++++++++++++++++++++++++++++ bundle/run/pipeline.go | 12 ++++ bundle/run/pipeline_test.go | 70 +++++++++++++++++++ bundle/run/runner.go | 4 ++ cmd/bundle/run.go | 14 ++-- 6 files changed, 247 insertions(+), 8 deletions(-) diff --git a/bundle/run/job.go b/bundle/run/job.go index 8003c7d291..340af961cf 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -317,6 +317,29 @@ func (r *jobRunner) Cancel(ctx context.Context) error { return errGroup.Wait() } +func (r *jobRunner) Restart(ctx context.Context, opts *Options) (output.RunOutput, error) { + // We don't need to cancel existing runs if the job is continuous and unpaused. + // the /jobs/run-now API will automatically cancel any existing runs before starting a new one. + // + // /jobs/run-now will not cancel existing runs if the job is continuous and paused. + // New job runs will be queued instead and will wait for existing runs to finish. + // In this case, we need to cancel the existing runs before starting a new one. + continuous := r.job.JobSettings.Continuous + if continuous != nil && continuous.PauseStatus == jobs.PauseStatusUnpaused { + return r.Run(ctx, opts) + } + + s := cmdio.Spinner(ctx) + s <- "Cancelling all active job runs" + err := r.Cancel(ctx) + close(s) + if err != nil { + return nil, err + } + + return r.Run(ctx, opts) +} + func (r *jobRunner) ParseArgs(args []string, opts *Options) error { return r.posArgsHandler().ParseArgs(args, opts) } diff --git a/bundle/run/job_test.go b/bundle/run/job_test.go index be189306b2..369c546aa6 100644 --- a/bundle/run/job_test.go +++ b/bundle/run/job_test.go @@ -1,6 +1,7 @@ package run import ( + "bytes" "context" "testing" "time" @@ -8,6 +9,8 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/flags" "github.com/databricks/databricks-sdk-go/experimental/mocks" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/stretchr/testify/mock" @@ -126,3 +129,132 @@ func TestJobRunnerCancelWithNoActiveRuns(t *testing.T) { err := runner.Cancel(context.Background()) require.NoError(t, err) } + +func TestJobRunnerRestart(t *testing.T) { + for _, jobSettings := range []*jobs.JobSettings{ + {}, + { + Continuous: &jobs.Continuous{ + PauseStatus: jobs.PauseStatusPaused, + }, + }, + } { + job := &resources.Job{ + ID: "123", + JobSettings: jobSettings, + } + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "test_job": job, + }, + }, + }, + } + + runner := jobRunner{key: "test", bundle: b, job: job} + + m := mocks.NewMockWorkspaceClient(t) + b.SetWorkpaceClient(m.WorkspaceClient) + ctx := context.Background() + ctx = cmdio.InContext(ctx, cmdio.NewIO(flags.OutputText, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, "", "")) + ctx = cmdio.NewContext(ctx, cmdio.NewLogger(flags.ModeAppend)) + + jobApi := m.GetMockJobsAPI() + jobApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{ + ActiveOnly: true, + JobId: 123, + }).Return([]jobs.BaseRun{ + {RunId: 1}, + {RunId: 2}, + }, nil) + + // Mock the runner cancelling existing job runs. + mockWait := &jobs.WaitGetRunJobTerminatedOrSkipped[struct{}]{ + Poll: func(time time.Duration, f func(j *jobs.Run)) (*jobs.Run, error) { + return nil, nil + }, + } + jobApi.EXPECT().CancelRun(mock.Anything, jobs.CancelRun{ + RunId: 1, + }).Return(mockWait, nil) + jobApi.EXPECT().CancelRun(mock.Anything, jobs.CancelRun{ + RunId: 2, + }).Return(mockWait, nil) + + // Mock the runner triggering a job run + mockWaitForRun := &jobs.WaitGetRunJobTerminatedOrSkipped[jobs.RunNowResponse]{ + Poll: func(d time.Duration, f func(*jobs.Run)) (*jobs.Run, error) { + return &jobs.Run{ + State: &jobs.RunState{ + ResultState: jobs.RunResultStateSuccess, + }, + }, nil + }, + } + jobApi.EXPECT().RunNow(mock.Anything, jobs.RunNow{ + JobId: 123, + }).Return(mockWaitForRun, nil) + + // Mock the runner getting the job output + jobApi.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{}).Return(&jobs.Run{}, nil) + + _, err := runner.Restart(ctx, &Options{}) + require.NoError(t, err) + } +} + +func TestJobRunnerRestartForContinuousUnpausedJobs(t *testing.T) { + job := &resources.Job{ + ID: "123", + JobSettings: &jobs.JobSettings{ + Continuous: &jobs.Continuous{ + PauseStatus: jobs.PauseStatusUnpaused, + }, + }, + } + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "test_job": job, + }, + }, + }, + } + + runner := jobRunner{key: "test", bundle: b, job: job} + + m := mocks.NewMockWorkspaceClient(t) + b.SetWorkpaceClient(m.WorkspaceClient) + ctx := context.Background() + ctx = cmdio.InContext(ctx, cmdio.NewIO(flags.OutputText, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, "", "...")) + ctx = cmdio.NewContext(ctx, cmdio.NewLogger(flags.ModeAppend)) + + jobApi := m.GetMockJobsAPI() + + // The runner should not try and cancel existing job runs for unpaused continuous jobs. + jobApi.AssertNotCalled(t, "ListRunsAll") + jobApi.AssertNotCalled(t, "CancelRun") + + // Mock the runner triggering a job run + mockWaitForRun := &jobs.WaitGetRunJobTerminatedOrSkipped[jobs.RunNowResponse]{ + Poll: func(d time.Duration, f func(*jobs.Run)) (*jobs.Run, error) { + return &jobs.Run{ + State: &jobs.RunState{ + ResultState: jobs.RunResultStateSuccess, + }, + }, nil + }, + } + jobApi.EXPECT().RunNow(mock.Anything, jobs.RunNow{ + JobId: 123, + }).Return(mockWaitForRun, nil) + + // Mock the runner getting the job output + jobApi.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{}).Return(&jobs.Run{}, nil) + + _, err := runner.Restart(ctx, &Options{}) + require.NoError(t, err) +} diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index d684f83886..ffe0128430 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -183,6 +183,18 @@ func (r *pipelineRunner) Cancel(ctx context.Context) error { return err } +func (r *pipelineRunner) Restart(ctx context.Context, opts *Options) (output.RunOutput, error) { + s := cmdio.Spinner(ctx) + s <- "Cancelling the active pipeline update" + err := r.Cancel(ctx) + close(s) + if err != nil { + return nil, err + } + + return r.Run(ctx, opts) +} + func (r *pipelineRunner) ParseArgs(args []string, opts *Options) error { if len(args) == 0 { return nil diff --git a/bundle/run/pipeline_test.go b/bundle/run/pipeline_test.go index 29b57ffdb2..e4608061c0 100644 --- a/bundle/run/pipeline_test.go +++ b/bundle/run/pipeline_test.go @@ -1,6 +1,7 @@ package run import ( + "bytes" "context" "testing" "time" @@ -8,8 +9,12 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/flags" + sdk_config "github.com/databricks/databricks-sdk-go/config" "github.com/databricks/databricks-sdk-go/experimental/mocks" "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -47,3 +52,68 @@ func TestPipelineRunnerCancel(t *testing.T) { err := runner.Cancel(context.Background()) require.NoError(t, err) } + +func TestPipelineRunnerRestart(t *testing.T) { + pipeline := &resources.Pipeline{ + ID: "123", + } + + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "test_pipeline": pipeline, + }, + }, + }, + } + + runner := pipelineRunner{key: "test", bundle: b, pipeline: pipeline} + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdk_config.Config{ + Host: "https://test.com", + } + b.SetWorkpaceClient(m.WorkspaceClient) + ctx := context.Background() + ctx = cmdio.InContext(ctx, cmdio.NewIO(flags.OutputText, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, "", "...")) + ctx = cmdio.NewContext(ctx, cmdio.NewLogger(flags.ModeAppend)) + + mockWait := &pipelines.WaitGetPipelineIdle[struct{}]{ + Poll: func(time.Duration, func(*pipelines.GetPipelineResponse)) (*pipelines.GetPipelineResponse, error) { + return nil, nil + }, + } + + pipelineApi := m.GetMockPipelinesAPI() + pipelineApi.EXPECT().Stop(mock.Anything, pipelines.StopRequest{ + PipelineId: "123", + }).Return(mockWait, nil) + + pipelineApi.EXPECT().GetByPipelineId(mock.Anything, "123").Return(&pipelines.GetPipelineResponse{}, nil) + + // Mock runner starting a new update + pipelineApi.EXPECT().StartUpdate(mock.Anything, pipelines.StartUpdate{ + PipelineId: "123", + }).Return(&pipelines.StartUpdateResponse{ + UpdateId: "456", + }, nil) + + // Mock runner polling for events + pipelineApi.EXPECT().ListPipelineEventsAll(mock.Anything, pipelines.ListPipelineEventsRequest{ + Filter: `update_id = '456'`, + MaxResults: 100, + PipelineId: "123", + }).Return([]pipelines.PipelineEvent{}, nil) + + // Mock runner polling for update status + pipelineApi.EXPECT().GetUpdateByPipelineIdAndUpdateId(mock.Anything, "123", "456"). + Return(&pipelines.GetUpdateResponse{ + Update: &pipelines.UpdateInfo{ + State: pipelines.UpdateInfoStateCompleted, + }, + }, nil) + + _, err := runner.Restart(ctx, &Options{}) + require.NoError(t, err) +} diff --git a/bundle/run/runner.go b/bundle/run/runner.go index 0f202ce7dd..1cdcc9d8b0 100644 --- a/bundle/run/runner.go +++ b/bundle/run/runner.go @@ -27,6 +27,10 @@ type Runner interface { // Run the underlying worklow. Run(ctx context.Context, opts *Options) (output.RunOutput, error) + // Restart the underlying workflow by cancelling any existing runs before + // starting a new one. + Restart(ctx context.Context, opts *Options) (output.RunOutput, error) + // Cancel the underlying workflow. Cancel(ctx context.Context) error diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index 9ef5eb8ff3..ed5bd2ef10 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -8,6 +8,7 @@ import ( "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/run" + "github.com/databricks/cli/bundle/run/output" "github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/root" "github.com/databricks/cli/libs/cmdio" @@ -100,19 +101,16 @@ task or a Python wheel task, the second example applies. } runOptions.NoWait = noWait + var output output.RunOutput if restart { - s := cmdio.Spinner(ctx) - s <- "Cancelling all runs" - err := runner.Cancel(ctx) - close(s) - if err != nil { - return err - } + output, err = runner.Restart(ctx, &runOptions) + } else { + output, err = runner.Run(ctx, &runOptions) } - output, err := runner.Run(ctx, &runOptions) if err != nil { return err } + if output != nil { switch root.OutputType(cmd) { case flags.OutputText: