
Commit

wip
edwardfeng-db committed May 15, 2024
1 parent eb5b5db commit 952e027
Showing 5 changed files with 364 additions and 304 deletions.
2 changes: 1 addition & 1 deletion common/customizable_schema.go
@@ -194,7 +194,7 @@ func (s *CustomizableSchema) SetConflictsWith(value []string) *CustomizableSchema {
         panic("SetConflictsWith cannot take in an empty list")
     }
     if s.pathContainsMultipleItemsList() {
-        log.Printf("[DEBUG] ConflictsWith skipped for %v, path contains TypeList block with MaxItems not equal to 1", strings.Join(s.path, "."))
+        log.Printf("[DEBUG] ConflictsWith skipped for %v, path contains TypeList block with MaxItems not equal to 1", getPrefixedValue(s.context.path, value))
         return s
     }
     s.Schema.ConflictsWith = getPrefixedValue(s.context.path, value)
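The one-line change above makes the debug message report the fully prefixed conflict list rather than the schema's own path, matching what is actually assigned to ConflictsWith two lines below. For context, a minimal sketch of what a helper like getPrefixedValue plausibly does, assuming it simply joins the context path onto each relative field name (the real implementation lives in common/customizable_schema.go and may handle list indices differently):

// Hypothetical sketch only; assumes "strings" is imported.
func getPrefixedValue(path []string, values []string) []string {
    prefix := strings.Join(path, ".")
    out := make([]string, 0, len(values))
    for _, v := range values {
        if prefix == "" {
            out = append(out, v)
            continue
        }
        out = append(out, prefix+"."+v)
    }
    return out
}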
206 changes: 206 additions & 0 deletions jobs/jobs_api_go_sdk.go
@@ -0,0 +1,206 @@
package jobs

import (
    "context"
    "errors"
    "fmt"
    "sort"

    "github.com/databricks/databricks-sdk-go"
    "github.com/databricks/databricks-sdk-go/apierr"
    "github.com/databricks/databricks-sdk-go/service/jobs"
    "github.com/databricks/terraform-provider-databricks/clusters"
    "github.com/databricks/terraform-provider-databricks/common"
    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

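// adjustTasks normalizes ordering inside the settings: tasks are sorted by
// task key, each task's depends_on list by the key it references, and webhook
// notifications by ID, so reads return a stable order without spurious diffs.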
func (js *JobSettingsResource) adjustTasks() {
    js.sortTasksByKey()
    for _, task := range js.Tasks {
        sort.Slice(task.DependsOn, func(i, j int) bool {
            return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
        })
        sortWebhookNotifications(task.WebhookNotifications)
    }
}

func (js *JobSettingsResource) sortTasksByKey() {
    sort.Slice(js.Tasks, func(i, j int) bool {
        return js.Tasks[i].TaskKey < js.Tasks[j].TaskKey
    })
}

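// adjustTasks is the jobs.CreateJob counterpart of the method above and
// applies the same ordering rules.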
func adjustTasks(cj *jobs.CreateJob) {
    sortTasksByKey(cj)
    for _, task := range cj.Tasks {
        sort.Slice(task.DependsOn, func(i, j int) bool {
            return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
        })
        sortWebhookNotifications(task.WebhookNotifications)
    }
}

func sortTasksByKey(cj *jobs.CreateJob) {
    sort.Slice(cj.Tasks, func(i, j int) bool {
        return cj.Tasks[i].TaskKey < cj.Tasks[j].TaskKey
    })
}

func (js *JobSettingsResource) sortWebhooksByID() {
    sortWebhookNotifications(js.WebhookNotifications)
}

func sortWebhooksByID(cj *jobs.CreateJob) {
    sortWebhookNotifications(cj.WebhookNotifications)
}

func (js *JobSettingsResource) isMultiTask() bool {
    return js.Format == "MULTI_TASK" || len(js.Tasks) > 0
}

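// getJobLifecycleManagerGoSdk selects how run state is handled after a job is
// created or updated: always_running (re)starts a run, control_run_state only
// restarts continuous jobs on update, and anything else is a no-op.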
func getJobLifecycleManagerGoSdk(d *schema.ResourceData, m *common.DatabricksClient) jobLifecycleManager {
    if d.Get("always_running").(bool) {
        return alwaysRunningLifecycleManagerGoSdk{d: d, m: m}
    }
    if d.Get("control_run_state").(bool) {
        return controlRunStateLifecycleManagerGoSdk{d: d, m: m}
    }
    return noopLifecycleManager{}
}

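// alwaysRunningLifecycleManagerGoSdk starts a run after create and, on update,
// stops the active run before starting a fresh one.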
type alwaysRunningLifecycleManagerGoSdk struct {
    d *schema.ResourceData
    m *common.DatabricksClient
}

func (a alwaysRunningLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
    w, err := a.m.WorkspaceClient()
    if err != nil {
        return err
    }
    jobID, err := parseJobId(a.d.Id())
    if err != nil {
        return err
    }

    return Start(jobID, a.d.Timeout(schema.TimeoutCreate), w, ctx)
}

func (a alwaysRunningLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
    w, err := a.m.WorkspaceClient()
    if err != nil {
        return err
    }
    jobID, err := parseJobId(a.d.Id())
    if err != nil {
        return err
    }

    err = StopActiveRun(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)
    if err != nil {
        return err
    }
    return Start(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)
}

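// controlRunStateLifecycleManagerGoSdk restarts continuous jobs on update so
// that configuration changes take effect; it does nothing on create.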
type controlRunStateLifecycleManagerGoSdk struct {
    d *schema.ResourceData
    m *common.DatabricksClient
}

func (c controlRunStateLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
    return nil
}

func (c controlRunStateLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
    if c.d.Get("continuous") == nil {
        return nil
    }

    jobID, err := parseJobId(c.d.Id())
    if err != nil {
        return err
    }

    w, err := c.m.WorkspaceClient()
    if err != nil {
        return err
    }

    // Only use RunNow to stop the active run if the job is unpaused.
    pauseStatus := c.d.Get("continuous.0.pause_status").(string)
    if pauseStatus == "UNPAUSED" {
        // Previously, RunNow() was not supported for continuous jobs. Now, calling RunNow()
        // on a continuous job works, cancelling the active run if there is one, and resetting
        // the exponential backoff timer. So, we try to call RunNow() first, and if it fails,
        // we call StopActiveRun() instead.
        _, err := w.Jobs.RunNow(ctx, jobs.RunNow{
            JobId: jobID,
        })
        if err == nil {
            return nil
        }

        // RunNow() returns 404 when the feature is disabled.
        var apiErr *apierr.APIError
        if errors.As(err, &apiErr) && apiErr.StatusCode != 404 {
            return err
        }
    }

    return StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate), w, ctx)
}

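// prepareJobSettingsForUpdateGoSdk adjusts instance-pool-backed cluster specs
// (job-level, per-task, and shared job clusters) before the settings are sent
// to the Jobs API.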
func prepareJobSettingsForUpdateGoSdk(d *schema.ResourceData, js JobSettingsResource) {
    if js.NewCluster != nil {
        clusters.ModifyRequestOnInstancePool(js.NewCluster)
        clusters.FixInstancePoolChangeIfAny(d, &js.NewCluster)
    }
    // Iterate by index so that &js.Tasks[i].NewCluster points into the
    // settings themselves rather than into a loop-variable copy.
    for i := range js.Tasks {
        task := &js.Tasks[i]
        if task.NewCluster != nil {
            clusters.ModifyRequestOnInstancePool(task.NewCluster)
            clusters.FixInstancePoolChangeIfAny(d, &task.NewCluster)
        }
    }
    for i := range js.JobClusters {
        clusters.ModifyRequestOnInstancePool(&js.JobClusters[i].NewCluster)
        clusters.FixInstancePoolChangeIfAny(d, &js.JobClusters[i].NewCluster)
    }
}

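// Update overwrites the job's settings via Jobs.Reset, translating a
// missing-job error through wrapMissingJobError so callers see a proper
// not-found error for the given job ID.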
func Update(jobID int64, js JobSettingsResource, w *databricks.WorkspaceClient, ctx context.Context) error {
    err := w.Jobs.Reset(ctx, jobs.ResetJob{
        JobId:       jobID,
        NewSettings: js.JobSettings,
    })
    return wrapMissingJobError(err, fmt.Sprintf("%d", jobID))
}

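// Read fetches the job, normalizes task and webhook ordering in place, and
// backfills the write-only run_as block from the top-level run_as_user_name
// so that Terraform can diff it against the intended state.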
func Read(jobID int64, w *databricks.WorkspaceClient, ctx context.Context) (job *jobs.Job, err error) {
    job, err = w.Jobs.GetByJobId(ctx, jobID)
    err = wrapMissingJobError(err, fmt.Sprintf("%d", jobID))
    if err != nil {
        // Return before touching job: it is nil when the lookup failed.
        return
    }
    if job.Settings != nil {
        // Sorting mutates the shared slices, so the normalized order is
        // reflected in job.Settings itself.
        js := JobSettingsResource{JobSettings: *job.Settings}
        js.adjustTasks()
        js.sortWebhooksByID()
    }

    // Populate the `run_as` field. In the settings struct it can only be set on write and is not
    // returned on read. Therefore, we populate it from the top-level `run_as_user_name` field so
    // that Terraform can still diff it with the intended state.
    if job.Settings != nil && job.RunAsUserName != "" {
        if common.StringIsUUID(job.RunAsUserName) {
            job.Settings.RunAs = &jobs.JobRunAs{
                ServicePrincipalName: job.RunAsUserName,
            }
        } else {
            job.Settings.RunAs = &jobs.JobRunAs{
                UserName: job.RunAsUserName,
            }
        }
    }

    return
}
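Taken together, the helpers above would plausibly be wired into the resource roughly as follows. This is a hedged sketch, not code from this commit: updateJob and jobsGoSdkSchema are hypothetical names, and the real resource wiring lives elsewhere in the provider.

// Hypothetical wiring, for illustration only: how the helpers in this file
// might compose inside the resource's update callback.
func updateJob(ctx context.Context, d *schema.ResourceData, m *common.DatabricksClient) error {
    w, err := m.WorkspaceClient()
    if err != nil {
        return err
    }
    jobID, err := parseJobId(d.Id())
    if err != nil {
        return err
    }
    var js JobSettingsResource
    common.DataToStructPointer(d, jobsGoSdkSchema, &js) // jobsGoSdkSchema is an assumed schema map
    prepareJobSettingsForUpdateGoSdk(d, js)
    if err := Update(jobID, js, w, ctx); err != nil {
        return err
    }
    return getJobLifecycleManagerGoSdk(d, m).OnUpdate(ctx)
}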
