Skip to content

Commit

Permalink
Fix control run state failures for databricks_job resource (#3585)
Browse files Browse the repository at this point in the history
* work

* Tolerate 409 in RunNow() call for jobs

* fmt
  • Loading branch information
mgyucht authored May 16, 2024
1 parent 8b29d2d commit 9648502
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
15 changes: 8 additions & 7 deletions jobs/resource_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"

Expand Down Expand Up @@ -1000,17 +1001,17 @@ func (c controlRunStateLifecycleManager) OnUpdate(ctx context.Context) error {
// on a continuous job works, cancelling the active run if there is one, and resetting
// the exponential backoff timer. So, we try to call RunNow() first, and if it fails,
// we call StopActiveRun() instead.
//
// If there was no active run before the update, Jobs will start a run after the update.
// This RunNow() call can race with this automatic trigger, in which case, a 409 Conflict
// is returned. The provider can safely ignore this, as a new run will have started
// anyway.
_, err = api.RunNow(jobID)

if err == nil {
if err == nil || errors.Is(err, databricks.ErrNotFound) || errors.Is(err, databricks.ErrResourceConflict) {
return nil
}

// RunNow() returns 404 when the feature is disabled.
var apiErr *apierr.APIError
if errors.As(err, &apiErr) && apiErr.StatusCode != 404 {
return err
}
return err
}

return api.StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate))
Expand Down
62 changes: 60 additions & 2 deletions jobs/resource_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ func TestResourceJobCreate_Trigger_TableUpdateCreate(t *testing.T) {
}.Apply(t)
}

func TestResourceJobCreate_ControlRunState_ContinuousUpdateRunNow(t *testing.T) {
func TestResourceJobUpdate_ControlRunState_ContinuousUpdateRunNow(t *testing.T) {
qa.ResourceFixture{
Update: true,
ID: "789",
Expand Down Expand Up @@ -1432,7 +1432,65 @@ func TestResourceJobCreate_ControlRunState_ContinuousUpdateRunNow(t *testing.T)
max_concurrent_runs = 1
name = "Test"
`,
}.Apply(t)
}.ApplyNoError(t)
}

func TestResourceJobUpdate_ControlRunState_ContinuousUpdateRunNowFailsWith409(t *testing.T) {
qa.ResourceFixture{
Update: true,
ID: "789",
Resource: ResourceJob(),
Fixtures: []qa.HTTPFixture{
{
Method: "POST",
Resource: "/api/2.0/jobs/reset",
ExpectedRequest: UpdateJobRequest{
JobID: 789,
NewSettings: &JobSettings{
MaxConcurrentRuns: 1,
Name: "Test",
Continuous: &ContinuousConf{
PauseStatus: "UNPAUSED",
},
},
},
},
{
Method: "GET",
Resource: "/api/2.0/jobs/get?job_id=789",
Response: Job{
JobID: 789,
Settings: &JobSettings{
MaxConcurrentRuns: 1,
Name: "Test",
Continuous: &ContinuousConf{
PauseStatus: "UNPAUSED",
},
},
},
},
{
Method: "POST",
Resource: "/api/2.0/jobs/run-now",
ExpectedRequest: RunParameters{
JobID: 789,
},
Status: 409,
Response: apierr.APIErrorBody{
ErrorCode: "CONFLICT",
Message: "A concurrent request to run the continuous job is already in progress. Please wait for it to complete before issuing a new request.",
},
},
},
HCL: `
continuous {
pause_status = "UNPAUSED"
}
control_run_state = true
max_concurrent_runs = 1
name = "Test"
`,
}.ApplyNoError(t)
}

func TestResourceJobCreate_ControlRunState_ContinuousUpdateCancel(t *testing.T) {
Expand Down

0 comments on commit 9648502

Please sign in to comment.