diff --git a/clusters/resource_cluster.go b/clusters/resource_cluster.go
index 4b3b50646d..28560dea06 100644
--- a/clusters/resource_cluster.go
+++ b/clusters/resource_cluster.go
@@ -24,7 +24,7 @@ var clusterSchemaVersion = 3
 const (
   numWorkerErr                     = "NumWorkers could be 0 only for SingleNode clusters. See https://docs.databricks.com/clusters/single-node.html for more details"
-  unsupportedExceptCreateOrEditErr = "unsupported type %T, must be either %scompute.CreateCluster or %scompute.EditCluster. Please report this issue to the GitHub repo"
+  unsupportedExceptCreateOrEditErr = "unsupported type %T, must be one of %scompute.CreateCluster, %scompute.ClusterSpec or %scompute.EditCluster. Please report this issue to the GitHub repo"
 )
 
 func ResourceCluster() common.Resource {
@@ -123,8 +123,15 @@ func Validate(cluster any) error {
     profile = c.SparkConf["spark.databricks.cluster.profile"]
     master = c.SparkConf["spark.master"]
     resourceClass = c.CustomTags["ResourceClass"]
+  case compute.ClusterSpec:
+    if c.NumWorkers > 0 || c.Autoscale != nil {
+      return nil
+    }
+    profile = c.SparkConf["spark.databricks.cluster.profile"]
+    master = c.SparkConf["spark.master"]
+    resourceClass = c.CustomTags["ResourceClass"]
   default:
-    return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "", "")
+    return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "", "", "")
   }
   if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
     return nil
@@ -136,6 +143,34 @@ func Validate(cluster any) error {
 // Long term, ModifyRequestOnInstancePool() in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
 func ModifyRequestOnInstancePool(cluster any) error {
   switch c := cluster.(type) {
+  case *compute.ClusterSpec:
+    // Instance pool id does not exist or is not set
+    if c.InstancePoolId == "" {
+      // Worker must use an instance pool if driver uses an instance pool,
+      // therefore empty the computed value for driver instance pool.
+      c.DriverInstancePoolId = ""
+      return nil
+    }
+    if c.AwsAttributes != nil {
+      // Reset AwsAttributes, keeping only the instance profile ARN
+      awsAttributes := compute.AwsAttributes{
+        InstanceProfileArn: c.AwsAttributes.InstanceProfileArn,
+      }
+      c.AwsAttributes = &awsAttributes
+    }
+    if c.AzureAttributes != nil {
+      c.AzureAttributes = &compute.AzureAttributes{}
+    }
+    if c.GcpAttributes != nil {
+      gcpAttributes := compute.GcpAttributes{
+        GoogleServiceAccount: c.GcpAttributes.GoogleServiceAccount,
+      }
+      c.GcpAttributes = &gcpAttributes
+    }
+    c.EnableElasticDisk = false
+    c.NodeTypeId = ""
+    c.DriverNodeTypeId = ""
+    return nil
   case *compute.CreateCluster:
     // Instance profile id does not exist or not set
     if c.InstancePoolId == "" {
@@ -193,20 +228,44 @@ func ModifyRequestOnInstancePool(cluster any) error {
     c.DriverNodeTypeId = ""
     return nil
   default:
-    return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*")
+    return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*", "*")
   }
 }
 
 // This method is a duplicate of FixInstancePoolChangeIfAny(d *schema.ResourceData) in clusters/clusters_api.go that uses Go SDK.
 // Long term, FixInstancePoolChangeIfAny(d *schema.ResourceData) in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
 // https://github.com/databricks/terraform-provider-databricks/issues/824
-func FixInstancePoolChangeIfAny(d *schema.ResourceData, cluster *compute.EditCluster) {
-  oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
-  oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
-  if oldInstancePool != newInstancePool &&
-    oldDriverPool == oldInstancePool &&
-    oldDriverPool == newDriverPool {
-    cluster.DriverInstancePoolId = cluster.InstancePoolId
+func FixInstancePoolChangeIfAny(d *schema.ResourceData, cluster any) error {
+  switch c := cluster.(type) {
+  case *compute.ClusterSpec:
+    oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
+    oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
+    if oldInstancePool != newInstancePool &&
+      oldDriverPool == oldInstancePool &&
+      oldDriverPool == newDriverPool {
+      c.DriverInstancePoolId = c.InstancePoolId
+    }
+    return nil
+  case *compute.CreateCluster:
+    oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
+    oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
+    if oldInstancePool != newInstancePool &&
+      oldDriverPool == oldInstancePool &&
+      oldDriverPool == newDriverPool {
+      c.DriverInstancePoolId = c.InstancePoolId
+    }
+    return nil
+  case *compute.EditCluster:
+    oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
+    oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
+    if oldInstancePool != newInstancePool &&
+      oldDriverPool == oldInstancePool &&
+      oldDriverPool == newDriverPool {
+      c.DriverInstancePoolId = c.InstancePoolId
+    }
+    return nil
+  default:
+    return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*", "*")
   }
 }
 
@@ -450,7 +509,10 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo
   if err = ModifyRequestOnInstancePool(&cluster); err != nil {
     return err
   }
-  FixInstancePoolChangeIfAny(d, &cluster)
+  err = FixInstancePoolChangeIfAny(d, &cluster)
+  if err != nil {
+    return err
+  }
 
   // We can only call the resize api if the cluster is in the running state
   // and only the cluster size (ie num_workers OR autoscale) is being changed
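Note: ModifyRequestOnInstancePool and FixInstancePoolChangeIfAny now take `any` and dispatch on the concrete SDK type, and FixInstancePoolChangeIfAny reports unsupported types through a new error return instead of being silently limited to EditCluster. A minimal sketch of the calling pattern, with a hypothetical ClusterSpec that is not taken from this diff:

    spec := &compute.ClusterSpec{
      InstancePoolId: "pool-123",  // hypothetical pool id
      NodeTypeId:     "m5d.large", // leftover computed value
    }
    if err := clusters.ModifyRequestOnInstancePool(spec); err != nil {
      return err // unsupported types now surface here
    }
    // spec.NodeTypeId == "" — node-type fields are cleared so the pool settings win.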
diff --git a/jobs/jobs_api_go_sdk.go b/jobs/jobs_api_go_sdk.go
new file mode 100644
index 0000000000..6fbb787109
--- /dev/null
+++ b/jobs/jobs_api_go_sdk.go
@@ -0,0 +1,206 @@
+package jobs
+
+import (
+  "context"
+  "errors"
+  "fmt"
+  "sort"
+
+  "github.com/databricks/databricks-sdk-go"
+  "github.com/databricks/databricks-sdk-go/apierr"
+  "github.com/databricks/databricks-sdk-go/service/jobs"
+  "github.com/databricks/terraform-provider-databricks/clusters"
+  "github.com/databricks/terraform-provider-databricks/common"
+  "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+)
+
+func (js *JobSettingsResource) adjustTasks() {
+  js.sortTasksByKey()
+  for _, task := range js.Tasks {
+    sort.Slice(task.DependsOn, func(i, j int) bool {
+      return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
+    })
+    sortWebhookNotifications(task.WebhookNotifications)
+  }
+}
+
+func (js *JobSettingsResource) sortTasksByKey() {
+  sort.Slice(js.Tasks, func(i, j int) bool {
+    return js.Tasks[i].TaskKey < js.Tasks[j].TaskKey
+  })
+}
+
+func adjustTasks(cj *jobs.CreateJob) {
+  sortTasksByKey(cj)
+  for _, task := range cj.Tasks {
+    sort.Slice(task.DependsOn, func(i, j int) bool {
+      return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
+    })
+    sortWebhookNotifications(task.WebhookNotifications)
+  }
+}
+
+func sortTasksByKey(cj *jobs.CreateJob) {
+  sort.Slice(cj.Tasks, func(i, j int) bool {
+    return cj.Tasks[i].TaskKey < cj.Tasks[j].TaskKey
+  })
+}
+
+func (js *JobSettingsResource) sortWebhooksByID() {
+  sortWebhookNotifications(js.WebhookNotifications)
+}
+
+func sortWebhooksByID(cj *jobs.CreateJob) {
+  sortWebhookNotifications(cj.WebhookNotifications)
+}
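These helpers sort tasks, their depends_on lists, and webhook notifications into a canonical order before the settings are compared with state, so a server-side reordering never shows up as a spurious Terraform diff. The idea in isolation (standalone toy example, not provider code):

    package main

    import (
      "fmt"
      "sort"
    )

    func main() {
      keys := []string{"b", "a", "c"}
      // Canonical order: semantically equal sets compare equal across refreshes.
      sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
      fmt.Println(keys) // [a b c]
    }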
+
+func (js *JobSettingsResource) isMultiTask() bool {
+  return js.Format == "MULTI_TASK" || len(js.Tasks) > 0
+}
+
+func getJobLifecycleManagerGoSdk(d *schema.ResourceData, m *common.DatabricksClient) jobLifecycleManager {
+  if d.Get("always_running").(bool) {
+    return alwaysRunningLifecycleManagerGoSdk{d: d, m: m}
+  }
+  if d.Get("control_run_state").(bool) {
+    return controlRunStateLifecycleManagerGoSdk{d: d, m: m}
+  }
+  return noopLifecycleManager{}
+}
+
+type alwaysRunningLifecycleManagerGoSdk struct {
+  d *schema.ResourceData
+  m *common.DatabricksClient
+}
+
+func (a alwaysRunningLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
+  w, err := a.m.WorkspaceClient()
+  if err != nil {
+    return err
+  }
+  jobID, err := parseJobId(a.d.Id())
+  if err != nil {
+    return err
+  }
+
+  return Start(jobID, a.d.Timeout(schema.TimeoutCreate), w, ctx)
+}
+
+func (a alwaysRunningLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
+  w, err := a.m.WorkspaceClient()
+  if err != nil {
+    return err
+  }
+  jobID, err := parseJobId(a.d.Id())
+  if err != nil {
+    return err
+  }
+
+  err = StopActiveRun(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)
+  if err != nil {
+    return err
+  }
+  return Start(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)
+}
+
+type controlRunStateLifecycleManagerGoSdk struct {
+  d *schema.ResourceData
+  m *common.DatabricksClient
+}
+
+func (c controlRunStateLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
+  return nil
+}
+
+func (c controlRunStateLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
+  if c.d.Get("continuous") == nil {
+    return nil
+  }
+
+  jobID, err := parseJobId(c.d.Id())
+  if err != nil {
+    return err
+  }
+
+  w, err := c.m.WorkspaceClient()
+  if err != nil {
+    return err
+  }
+
+  // Only use RunNow to stop the active run if the job is unpaused.
+  pauseStatus := c.d.Get("continuous.0.pause_status").(string)
+  if pauseStatus == "UNPAUSED" {
+    // Previously, RunNow() was not supported for continuous jobs. Now, calling RunNow()
+    // on a continuous job works, cancelling the active run if there is one, and resetting
+    // the exponential backoff timer. So, we try to call RunNow() first, and if it fails,
+    // we call StopActiveRun() instead.
+    _, err := w.Jobs.RunNow(ctx, jobs.RunNow{
+      JobId: jobID,
+    })
+
+    if err == nil {
+      return nil
+    }
+
+    // RunNow() returns 404 when the feature is disabled.
+    var apiErr *apierr.APIError
+    if errors.As(err, &apiErr) && apiErr.StatusCode != 404 {
+      return err
+    }
+  }
+
+  return StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate), w, ctx)
+}
+
+func prepareJobSettingsForUpdateGoSdk(d *schema.ResourceData, js JobSettingsResource) {
+  if js.NewCluster != nil {
+    // NewCluster is already a *compute.ClusterSpec; pass it through directly so the
+    // type switch in the clusters package matches it.
+    clusters.ModifyRequestOnInstancePool(js.NewCluster)
+    clusters.FixInstancePoolChangeIfAny(d, js.NewCluster)
+  }
+  for _, task := range js.Tasks {
+    if task.NewCluster != nil {
+      clusters.ModifyRequestOnInstancePool(task.NewCluster)
+      clusters.FixInstancePoolChangeIfAny(d, task.NewCluster)
+    }
+  }
+  for i := range js.JobClusters {
+    clusters.ModifyRequestOnInstancePool(&js.JobClusters[i].NewCluster)
+    clusters.FixInstancePoolChangeIfAny(d, &js.JobClusters[i].NewCluster)
+  }
+}
+
+func Update(jobID int64, js JobSettingsResource, w *databricks.WorkspaceClient, ctx context.Context) error {
+  err := w.Jobs.Reset(ctx, jobs.ResetJob{
+    JobId:       jobID,
+    NewSettings: js.JobSettings,
+  })
+  return wrapMissingJobError(err, fmt.Sprintf("%d", jobID))
+}
+
+func Read(jobID int64, w *databricks.WorkspaceClient, ctx context.Context) (job *jobs.Job, err error) {
+  job, err = w.Jobs.GetByJobId(ctx, jobID)
+  err = wrapMissingJobError(err, fmt.Sprintf("%d", jobID))
+  if err != nil {
+    return
+  }
+  if job.Settings != nil {
+    js := JobSettingsResource{JobSettings: *job.Settings}
+    js.adjustTasks()
+    js.sortWebhooksByID()
+  }
+
+  // Populate the `run_as` field. In the settings struct it can only be set on write and is not
+  // returned on read. Therefore, we populate it from the top-level `run_as_user_name` field so
+  // that Terraform can still diff it with the intended state.
+  if job.Settings != nil && job.RunAsUserName != "" {
+    if common.StringIsUUID(job.RunAsUserName) {
+      job.Settings.RunAs = &jobs.JobRunAs{
+        ServicePrincipalName: job.RunAsUserName,
+      }
+    } else {
+      job.Settings.RunAs = &jobs.JobRunAs{
+        UserName: job.RunAsUserName,
+      }
+    }
+  }
+
+  return
+}
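Taken together, Create, Read, Update, Start, and StopActiveRun are the package-level Go-SDK counterparts of the JobsAPI methods. A hedged sketch of how they compose outside the resource plumbing; the cluster id, notebook path, and timeout are placeholders:

    func exampleLifecycle(ctx context.Context, w *databricks.WorkspaceClient) error {
      jobID, err := Create(jobs.CreateJob{
        Name: "example",
        Tasks: []jobs.Task{{
          TaskKey:           "main",
          ExistingClusterId: "abc-123", // placeholder cluster id
          NotebookTask:      &jobs.NotebookTask{NotebookPath: "/example"},
        }},
      }, w, ctx)
      if err != nil {
        return err
      }
      if err := Start(jobID, 20*time.Minute, w, ctx); err != nil {
        return err
      }
      return StopActiveRun(jobID, 20*time.Minute, w, ctx)
    }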
diff --git a/jobs/resource_job.go b/jobs/resource_job.go
index 1e5e064dae..f40b2e75d7 100644
--- a/jobs/resource_job.go
+++ b/jobs/resource_job.go
@@ -10,6 +10,7 @@ import (
   "strings"
   "time"
 
+  "github.com/databricks/databricks-sdk-go"
   "github.com/databricks/databricks-sdk-go/service/compute"
   "github.com/databricks/databricks-sdk-go/service/jobs"
 
@@ -98,7 +99,7 @@ type SqlTask struct {
   Dashboard   *SqlDashboardTask `json:"dashboard,omitempty"`
   Alert       *SqlAlertTask     `json:"alert,omitempty"`
   File        *SqlFileTask      `json:"file,omitempty"`
-  WarehouseID string            `json:"warehouse_id,omitempty"`
+  WarehouseID string            `json:"warehouse_id"`
   Parameters  map[string]string `json:"parameters,omitempty"`
 }
 
@@ -129,7 +130,7 @@ type ForEachTask struct {
 }
 
 type ForEachNestedTask struct {
-  TaskKey     string                `json:"task_key,omitempty"`
+  TaskKey     string                `json:"task_key"`
   Description string                `json:"description,omitempty"`
   DependsOn   []jobs.TaskDependency `json:"depends_on,omitempty"`
   RunIf       string                `json:"run_if,omitempty"`
@@ -190,7 +191,7 @@ type CronSchedule struct {
 // BEGIN Jobs + Repo integration preview
 type GitSource struct {
   Url      string `json:"git_url" tf:"alias:url"`
-  Provider string `json:"git_provider,omitempty" tf:"alias:provider"`
+  Provider string `json:"git_provider" tf:"alias:provider"`
   Branch   string `json:"git_branch,omitempty" tf:"alias:branch"`
   Tag      string `json:"git_tag,omitempty" tf:"alias:tag"`
   Commit   string `json:"git_commit,omitempty" tf:"alias:commit"`
@@ -200,9 +201,9 @@ type GitSource struct {
 // End Jobs + Repo integration preview
 
 type JobHealthRule struct {
-  Metric    string `json:"metric,omitempty"`
-  Operation string `json:"op,omitempty"`
-  Value     int64  `json:"value,omitempty"`
+  Metric    string `json:"metric"`
+  Operation string `json:"op"`
+  Value     int64  `json:"value"`
 }
 
 type JobHealth struct {
@@ -210,7 +211,7 @@ type JobHealth struct {
 }
 
 type JobTaskSettings struct {
-  TaskKey     string                `json:"task_key,omitempty"`
+  TaskKey     string                `json:"task_key"`
   Description string                `json:"description,omitempty"`
   DependsOn   []jobs.TaskDependency `json:"depends_on,omitempty"`
   RunIf       string                `json:"run_if,omitempty"`
@@ -245,8 +246,8 @@ type JobTaskSettings struct {
 }
 
 type JobCluster struct {
-  JobClusterKey string            `json:"job_cluster_key,omitempty" tf:"group:cluster_type"`
-  NewCluster    *clusters.Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"`
+  JobClusterKey string            `json:"job_cluster_key" tf:"group:cluster_type"`
+  NewCluster    *clusters.Cluster `json:"new_cluster" tf:"group:cluster_type"`
 }
 
 type ContinuousConf struct {
@@ -445,6 +446,45 @@ type JobsAPI struct {
   context context.Context
 }
 
+// Aliases shared by JobCreateStruct and JobSettingsResource.
+var jobsAliases = map[string]map[string]string{
+  "jobs.JobSettingsResource": {
+    "tasks":        "task",
+    "parameters":   "parameter",
+    "job_clusters": "job_cluster",
+    "environments": "environment",
+  },
+  "jobs.JobCreateStruct": {
+    "tasks":        "task",
+    "parameters":   "parameter",
+    "job_clusters": "job_cluster",
+    "environments": "environment",
+  },
+  "jobs.GitSource": {
+    "git_url":      "url",
+    "git_provider": "provider",
+    "git_branch":   "branch",
+    "git_tag":      "tag",
+    "git_commit":   "commit",
+  },
+  "jobs.Task": {
+    "libraries": "library",
+  },
+}
+
+// JobCreateStruct wraps jobs.CreateJob because the create method needs the same
+// aliases and schema customizations as JobSettingsResource.
+type JobCreateStruct struct {
+  jobs.CreateJob
+}
+
+func (JobCreateStruct) Aliases() map[string]map[string]string {
+  return jobsAliases
+}
+
+func (JobCreateStruct) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema {
+  return s
+}
+
 type JobSettingsResource struct {
   jobs.JobSettings
 
@@ -468,25 +508,7 @@ type JobSettingsResource struct {
 }
 
 func (JobSettingsResource) Aliases() map[string]map[string]string {
-  aliases := map[string]map[string]string{
-    "jobs.JobSettingsResource": {
-      "tasks":        "task",
-      "parameters":   "parameter",
-      "job_clusters": "job_cluster",
-      "environments": "environment",
-    },
-    "jobs.GitSource": {
-      "git_url":      "url",
-      "git_provider": "provider",
-      "git_branch":   "branch",
-      "git_tag":      "tag",
-      "git_commit":   "commit",
-    },
-    "jobs.Task": {
-      "libraries": "library",
-    },
-  }
-  return aliases
+  return jobsAliases
 }
 
 func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema {
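jobsAliases is shared between JobSettingsResource and the new JobCreateStruct so that both decode the same singular HCL block names into the SDK's plural fields. Condensed from the create path later in this diff:

    var cj JobCreateStruct
    common.DataToStructPointer(d, jobsGoSdkSchema, &cj)
    // `task` blocks land in cj.Tasks and `job_cluster` blocks in cj.JobClusters,
    // because jobsAliases maps the SDK's plural JSON names to singular block names.
    jobId, err := Create(cj.CreateJob, w, ctx)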
@@ -591,13 +613,13 @@ func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common
   s.SchemaPath("task", "for_each_task", "task", "new_cluster").RemoveField("cluster_source")
 
   // ======= To keep consistency with the manually maintained schema, should be reverted once full migration is done. ======
-  s.SchemaPath("task", "task_key").SetOptional()
-  s.SchemaPath("task", "for_each_task", "task", "task_key").SetOptional()
+  // s.SchemaPath("task", "task_key").SetOptional()
+  // s.SchemaPath("task", "for_each_task", "task", "task_key").SetOptional()
 
   s.SchemaPath("trigger", "table_update", "table_names").SetRequired()
-  s.SchemaPath("task", "sql_task", "warehouse_id").SetOptional()
-  s.SchemaPath("task", "for_each_task", "task", "sql_task", "warehouse_id").SetOptional()
+  // s.SchemaPath("task", "sql_task", "warehouse_id").SetOptional()
+  // s.SchemaPath("task", "for_each_task", "task", "sql_task", "warehouse_id").SetOptional()
 
   s.SchemaPath("task", "python_wheel_task", "entry_point").SetOptional()
   s.SchemaPath("task", "for_each_task", "task", "python_wheel_task", "entry_point").SetOptional()
@@ -608,9 +630,9 @@ func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common
   s.SchemaPath("task", "sql_task", "alert", "subscriptions").SetRequired()
   s.SchemaPath("task", "for_each_task", "task", "sql_task", "alert", "subscriptions").SetRequired()
 
-  s.SchemaPath("health", "rules", "metric").SetOptional()
-  s.SchemaPath("health", "rules", "op").SetOptional()
-  s.SchemaPath("health", "rules", "value").SetOptional()
+  // s.SchemaPath("health", "rules", "metric").SetOptional()
+  // s.SchemaPath("health", "rules", "op").SetOptional()
+  // s.SchemaPath("health", "rules", "value").SetOptional()
 
   s.SchemaPath("task", "new_cluster", "cluster_id").SetOptional()
   s.SchemaPath("task", "for_each_task", "task", "new_cluster", "cluster_id").SetOptional()
@@ -619,22 +641,22 @@ func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common
   s.SchemaPath("task", "health", "rules").SetRequired()
   s.SchemaPath("task", "for_each_task", "task", "health", "rules").SetRequired()
 
-  s.SchemaPath("task", "health", "rules", "op").SetOptional()
-  s.SchemaPath("task", "for_each_task", "task", "health", "rules", "op").SetOptional()
+  // s.SchemaPath("task", "health", "rules", "op").SetOptional()
+  // s.SchemaPath("task", "for_each_task", "task", "health", "rules", "op").SetOptional()
 
-  s.SchemaPath("task", "health", "rules", "metric").SetOptional()
-  s.SchemaPath("task", "for_each_task", "task", "health", "rules", "metric").SetOptional()
+  // s.SchemaPath("task", "health", "rules", "metric").SetOptional()
+  // s.SchemaPath("task", "for_each_task", "task", "health", "rules", "metric").SetOptional()
 
-  s.SchemaPath("task", "health", "rules", "value").SetOptional()
-  s.SchemaPath("task", "for_each_task", "task", "health", "rules", "value").SetOptional()
+  // s.SchemaPath("task", "health", "rules", "value").SetOptional()
+  // s.SchemaPath("task", "for_each_task", "task", "health", "rules", "value").SetOptional()
 
-  s.SchemaPath("job_cluster", "job_cluster_key").SetOptional()
-  s.SchemaPath("job_cluster", "new_cluster").SetOptional()
+  // s.SchemaPath("job_cluster", "job_cluster_key").SetOptional()
+  // s.SchemaPath("job_cluster", "new_cluster").SetOptional()
 
   s.SchemaPath("job_cluster", "new_cluster", "cluster_id").SetOptional()
   s.SchemaPath("new_cluster", "cluster_id").SetOptional()
 
-  s.SchemaPath("git_source", "provider").SetOptional()
+  // s.SchemaPath("git_source", "provider").SetOptional()
 
   s.SchemaPath("library").Schema.Type = schema.TypeSet
   s.SchemaPath("task", "library").Schema.Type = schema.TypeSet
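The overrides commented out above pair with the struct-tag changes earlier in this diff: dropping omitempty from a json tag is what makes the generated schema treat the field as required, so the manual SetOptional calls are retired rather than reverted. The assumed convention, illustrated:

    type example struct {
      Required string `json:"required"`           // no omitempty -> required attribute
      Optional string `json:"optional,omitempty"` // omitempty    -> optional attribute
    }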
@@ -750,6 +772,21 @@ func (a JobsAPI) Start(jobID int64, timeout time.Duration) error {
   return a.waitForRunState(runID, "RUNNING", timeout)
 }
 
+func Start(jobID int64, timeout time.Duration, w *databricks.WorkspaceClient, ctx context.Context) error {
+  res, err := w.Jobs.RunNow(ctx, jobs.RunNow{
+    JobId: jobID,
+  })
+  if err != nil {
+    return err
+  }
+
+  _, err = res.GetWithTimeout(timeout)
+  if err != nil {
+    return err
+  }
+  return nil
+}
+
 func (a JobsAPI) StopActiveRun(jobID int64, timeout time.Duration) error {
   runs, err := a.RunsList(JobRunsListRequest{JobID: jobID, ActiveOnly: true})
   if err != nil {
@@ -769,6 +806,34 @@ func (a JobsAPI) StopActiveRun(jobID int64, timeout time.Duration) error {
   return nil
 }
 
+func StopActiveRun(jobID int64, timeout time.Duration, w *databricks.WorkspaceClient, ctx context.Context) error {
+  runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{
+    JobId:      jobID,
+    ActiveOnly: true,
+  })
+  if err != nil {
+    return err
+  }
+  if len(runs) > 1 {
+    return fmt.Errorf("`always_running` must be specified only with "+
+      "`max_concurrent_runs = 1`. There are %d active runs", len(runs))
+  }
+  if len(runs) == 1 {
+    activeRun := runs[0]
+    res, err := w.Jobs.CancelRun(ctx, jobs.CancelRun{
+      RunId: activeRun.RunId,
+    })
+    if err != nil {
+      return fmt.Errorf("cannot cancel run %d: %v", activeRun.RunId, err)
+    }
+    _, err = res.GetWithTimeout(timeout)
+    if err != nil {
+      return fmt.Errorf("cannot cancel run %d, error waiting for run to be terminated: %v", activeRun.RunId, err)
+    }
+  }
+  return nil
+}
+
 // Create creates a job on the workspace given the job settings
 func (a JobsAPI) Create(jobSettings JobSettings) (Job, error) {
   var job Job
@@ -788,6 +853,24 @@ func (a JobsAPI) Create(jobSettings JobSettings) (Job, error) {
   return job, err
 }
 
+func Create(createJob jobs.CreateJob, w *databricks.WorkspaceClient, ctx context.Context) (int64, error) {
+  adjustTasks(&createJob)
+  sortWebhooksByID(&createJob)
+  gitSource := createJob.GitSource
+  if gitSource != nil && gitSource.GitProvider == "" {
+    gitSource.GitProvider = jobs.GitProvider(repos.GetGitProviderFromUrl(gitSource.GitUrl))
+    if gitSource.GitProvider == "" {
+      return 0, fmt.Errorf("git source is not empty but Git Provider is not specified and cannot be guessed by url %+v", gitSource)
+    }
+    if gitSource.GitBranch == "" && gitSource.GitTag == "" && gitSource.GitCommit == "" {
+      return 0, fmt.Errorf("git source is not empty but none of branch, commit and tag is specified")
+    }
+  }
+  res, err := w.Jobs.Create(ctx, createJob)
+  if err != nil {
+    return 0, err
+  }
+  return res.JobId, nil
+}
+
 // Update updates a job given the id and a new set of job settings
 func (a JobsAPI) Update(id string, jobSettings JobSettings) error {
   jobID, err := parseJobId(id)
@@ -959,6 +1042,7 @@ func (a alwaysRunningLifecycleManager) OnCreate(ctx context.Context) error {
   }
   return NewJobsAPI(ctx, a.m).Start(jobID, a.d.Timeout(schema.TimeoutCreate))
 }
+
 func (a alwaysRunningLifecycleManager) OnUpdate(ctx context.Context) error {
   api := NewJobsAPI(ctx, a.m)
   jobID, err := parseJobId(a.d.Id())
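The new Create mirrors the legacy git-source checks: a missing provider is guessed from well-known hosts, and a git source must pin one of branch, tag, or commit. A sketch of inputs it accepts or rejects (the URLs are illustrative):

    func exampleGitSources() []jobs.GitSource {
      return []jobs.GitSource{
        // Accepted: provider guessed from the github.com host, branch pinned.
        {GitUrl: "https://github.com/org/repo", GitBranch: "main"},
        // Rejected: host not recognizable, so GitProvider must be set explicitly,
        // and one of branch/tag/commit is still required.
        {GitUrl: "https://custom.git.hosting.com/org/repo"},
      }
    }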
@@ -1088,40 +1172,95 @@ func ResourceJob() common.Resource {
       var js JobSettings
       common.DataToStructPointer(d, jobsGoSdkSchema, &js)
       if js.isMultiTask() {
-        ctx = context.WithValue(ctx, common.Api, common.API_2_1)
-      }
-      jobsAPI := NewJobsAPI(ctx, c)
-      job, err := jobsAPI.Create(js)
-      if err != nil {
-        return err
+        // Api 2.1
+        w, err := c.WorkspaceClient()
+        if err != nil {
+          return err
+        }
+        var cj JobCreateStruct
+        common.DataToStructPointer(d, jobsGoSdkSchema, &cj)
+        jobId, err := Create(cj.CreateJob, w, ctx)
+        if err != nil {
+          return err
+        }
+        d.SetId(fmt.Sprintf("%d", jobId))
+        return getJobLifecycleManagerGoSdk(d, c).OnCreate(ctx)
+      } else {
+        // Api 2.0
+        // TODO: Deprecate and remove this code path
+        jobsAPI := NewJobsAPI(ctx, c)
+        job, err := jobsAPI.Create(js)
+        if err != nil {
+          return err
+        }
+        d.SetId(job.ID())
+        return getJobLifecycleManager(d, c).OnCreate(ctx)
       }
-      d.SetId(job.ID())
-      return getJobLifecycleManager(d, c).OnCreate(ctx)
     },
     Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
-      ctx = getReadCtx(ctx, d)
-      job, err := NewJobsAPI(ctx, c).Read(d.Id())
-      if err != nil {
-        return err
-      }
-      d.Set("url", c.FormatURL("#job/", d.Id()))
-      return common.StructToData(*job.Settings, jobsGoSdkSchema, d)
-    },
-    Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
-      var js JobSettings
+      var js JobSettingsResource
       common.DataToStructPointer(d, jobsGoSdkSchema, &js)
       if js.isMultiTask() {
-        ctx = context.WithValue(ctx, common.Api, common.API_2_1)
+        // Api 2.1
+        w, err := c.WorkspaceClient()
+        if err != nil {
+          return err
+        }
+        jobID, err := parseJobId(d.Id())
+        if err != nil {
+          return err
+        }
+        job, err := Read(jobID, w, ctx)
+        if err != nil {
+          return err
+        }
+        d.Set("url", c.FormatURL("#job/", d.Id()))
+        return common.StructToData(*job.Settings, jobsGoSdkSchema, d)
+      } else {
+        // Api 2.0
+        // TODO: Deprecate and remove this code path
+        job, err := NewJobsAPI(ctx, c).Read(d.Id())
+        if err != nil {
+          return err
+        }
+        d.Set("url", c.FormatURL("#job/", d.Id()))
+        return common.StructToData(*job.Settings, jobsGoSdkSchema, d)
       }
-
-      prepareJobSettingsForUpdate(d, js)
-
-      jobsAPI := NewJobsAPI(ctx, c)
-      err := jobsAPI.Update(d.Id(), js)
-      if err != nil {
-        return err
+    },
+    Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
+      var jsr JobSettingsResource
+      common.DataToStructPointer(d, jobsGoSdkSchema, &jsr)
+      if jsr.isMultiTask() {
+        // Api 2.1
+        prepareJobSettingsForUpdateGoSdk(d, jsr)
+        jobID, err := parseJobId(d.Id())
+        if err != nil {
+          return err
+        }
+        w, err := c.WorkspaceClient()
+        if err != nil {
+          return err
+        }
+        err = Update(jobID, jsr, w, ctx)
+        if err != nil {
+          return err
+        }
+        return getJobLifecycleManagerGoSdk(d, c).OnUpdate(ctx)
+      } else {
+        // Api 2.0
+        // TODO: Deprecate and remove this code path
+        var js JobSettings
+        common.DataToStructPointer(d, jobsGoSdkSchema, &js)
+
+        prepareJobSettingsForUpdate(d, js)
+
+        jobsAPI := NewJobsAPI(ctx, c)
+        err := jobsAPI.Update(d.Id(), js)
+        if err != nil {
+          return err
+        }
+        return getJobLifecycleManager(d, c).OnUpdate(ctx)
       }
-      return getJobLifecycleManager(d, c).OnUpdate(ctx)
     },
     Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
       ctx = getReadCtx(ctx, d)
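Each CRUD callback now branches once, up front: multi-task jobs take the Go-SDK helpers against Jobs API 2.1, while single-task jobs stay on the legacy 2.0 client until that path is deleted. Condensed from the Update callback above, error handling elided:

    if jsr.isMultiTask() { // MULTI_TASK format, or at least one task block
      prepareJobSettingsForUpdateGoSdk(d, jsr)
      // parse the job id, obtain a WorkspaceClient, then Update(jobID, jsr, w, ctx)
    } else {
      // legacy API 2.0 path, marked TODO for removal
    }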
diff --git a/jobs/resource_job_test.go b/jobs/resource_job_test.go
index 746ec423e5..c5eec86b15 100644
--- a/jobs/resource_job_test.go
+++ b/jobs/resource_job_test.go
@@ -996,19 +996,19 @@ func TestResourceJobCreate_SqlSubscriptions(t *testing.T) {
       {
         Method:   "POST",
         Resource: "/api/2.1/jobs/create",
-        ExpectedRequest: JobSettings{
+        ExpectedRequest: jobs.CreateJob{
           Name:              "TF SQL task subscriptions",
           MaxConcurrentRuns: 1,
-          Tasks: []JobTaskSettings{
+          Tasks: []jobs.Task{
             {
               TaskKey: "a",
-              SqlTask: &SqlTask{
-                WarehouseID: "dca3a0ba199040eb",
-                Alert: &SqlAlertTask{
-                  AlertID: "3cf91a42-6217-4f3c-a6f0-345d489051b9",
-                  Subscriptions: []SqlSubscription{
+              SqlTask: &jobs.SqlTask{
+                WarehouseId: "dca3a0ba199040eb",
+                Alert: &jobs.SqlTaskAlert{
+                  AlertId: "3cf91a42-6217-4f3c-a6f0-345d489051b9",
+                  Subscriptions: []jobs.SqlTaskSubscription{
                     {UserName: "user@domain.com"},
-                    {DestinationID: "Test"},
+                    {DestinationId: "Test"},
                   },
                   PauseSubscriptions: true,
                 },
@@ -1016,15 +1016,17 @@
             },
             {
               TaskKey: "d",
-              SqlTask: &SqlTask{
-                WarehouseID: "dca3a0ba199040eb",
-                Dashboard: &SqlDashboardTask{
-                  DashboardID: "d81a7760-7fd2-443e-bf41-95a60c2f4c7c",
-                  Subscriptions: []SqlSubscription{
+              SqlTask: &jobs.SqlTask{
+                WarehouseId: "dca3a0ba199040eb",
+                Dashboard: &jobs.SqlTaskDashboard{
+                  DashboardId:        "d81a7760-7fd2-443e-bf41-95a60c2f4c7c",
+                  PauseSubscriptions: false,
+                  Subscriptions: []jobs.SqlTaskSubscription{
                     {UserName: "user@domain.com"},
-                    {DestinationID: "Test"},
+                    {DestinationId: "Test"},
                   },
-                  CustomSubject: "test",
+                  CustomSubject:   "test",
+                  ForceSendFields: []string{"PauseSubscriptions"},
                 },
               },
             },
@@ -1037,20 +1039,20 @@
       {
         Method:   "GET",
         Resource: "/api/2.1/jobs/get?job_id=789",
-        Response: Job{
-          JobID: 789,
-          Settings: &JobSettings{
+        Response: jobs.Job{
+          JobId: 789,
+          Settings: &jobs.JobSettings{
             Name: "TF SQL task subscriptions",
-            Tasks: []JobTaskSettings{
+            Tasks: []jobs.Task{
               {
                 TaskKey: "a",
-                SqlTask: &SqlTask{
-                  WarehouseID: "dca3a0ba199040eb",
-                  Alert: &SqlAlertTask{
-                    AlertID: "3cf91a42-6217-4f3c-a6f0-345d489051b9",
-                    Subscriptions: []SqlSubscription{
+                SqlTask: &jobs.SqlTask{
+                  WarehouseId: "dca3a0ba199040eb",
+                  Alert: &jobs.SqlTaskAlert{
+                    AlertId: "3cf91a42-6217-4f3c-a6f0-345d489051b9",
+                    Subscriptions: []jobs.SqlTaskSubscription{
                       {UserName: "user@domain.com"},
-                      {DestinationID: "Test"},
+                      {DestinationId: "Test"},
                     },
                     PauseSubscriptions: true,
                   },
@@ -1058,13 +1060,13 @@
               },
               {
                 TaskKey: "d",
-                SqlTask: &SqlTask{
-                  WarehouseID: "dca3a0ba199040eb",
-                  Dashboard: &SqlDashboardTask{
-                    DashboardID: "d81a7760-7fd2-443e-bf41-95a60c2f4c7c",
-                    Subscriptions: []SqlSubscription{
+                SqlTask: &jobs.SqlTask{
+                  WarehouseId: "dca3a0ba199040eb",
+                  Dashboard: &jobs.SqlTaskDashboard{
+                    DashboardId: "d81a7760-7fd2-443e-bf41-95a60c2f4c7c",
+                    Subscriptions: []jobs.SqlTaskSubscription{
                       {UserName: "user@domain.com"},
-                      {DestinationID: "Test"},
+                      {DestinationId: "Test"},
                     },
                     CustomSubject: "test",
                   },
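The expected request above relies on ForceSendFields, the Go SDK's escape hatch for zero values: PauseSubscriptions is false, which the JSON marshaller would otherwise drop, so the field is forced into the payload explicitly:

    dashboard := jobs.SqlTaskDashboard{
      DashboardId:        "d81a7760-7fd2-443e-bf41-95a60c2f4c7c",
      PauseSubscriptions: false,
      // Without this, pause_subscriptions=false would be omitted from the JSON body.
      ForceSendFields: []string{"PauseSubscriptions"},
    }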
@@ -1767,7 +1769,6 @@ func TestResourceJobCreateFromGitSource(t *testing.T) {
         Method:   "POST",
         Resource: "/api/2.1/jobs/create",
         ExpectedRequest: JobSettings{
-          ExistingClusterID: "abc",
           Tasks: []JobTaskSettings{
             {
               TaskKey: "b",
@@ -1814,7 +1815,7 @@
     },
     Create:   true,
     Resource: ResourceJob(),
-    HCL: `existing_cluster_id = "abc"
+    HCL: `
     max_concurrent_runs = 1
     name = "GitSourceJob"
 
@@ -1826,6 +1827,7 @@
         import_from_git_branch = "main"
         dirty_state = "NOT_SYNCED"
       }
+      provider = "gitHub"
     }
 
     task {
@@ -1915,7 +1917,7 @@
       }
     }
    `,
-  }.ExpectError(t, "git source is not empty but Git Provider is not specified and cannot be guessed by url &{Url:https://custom.git.hosting.com/databricks/terraform-provider-databricks Provider: Branch: Tag:0.4.8 Commit: JobSource:}")
+  }.ExpectError(t, "invalid config supplied. [git_source.#.provider] Missing required argument")
 }
 
 func TestResourceJobCreateSingleNode_Fail(t *testing.T) {
@@ -2147,6 +2149,7 @@
           MaxConcurrentRuns: 1,
           Tasks: []JobTaskSettings{
             {
+              TaskKey: "task1",
               NotebookTask: &NotebookTask{
                 NotebookPath: "/foo/bar",
               },
@@ -2156,9 +2159,11 @@
               RunIf: "ALL_SUCCESS",
             },
             {
+              TaskKey: "task2",
               ForEachTask: &ForEachTask{
                 Inputs: "abc",
                 Task: ForEachNestedTask{
+                  TaskKey:      "task3",
                   NotebookTask: &NotebookTask{NotebookPath: "/bar/foo"},
                   // The diff is suppressed here. Value is from
                   // the terraform state.
@@ -2180,13 +2185,16 @@
           Name: "My job",
           Tasks: []JobTaskSettings{
             {
+              TaskKey:      "task1",
               NotebookTask: &NotebookTask{NotebookPath: "/foo/bar"},
               RunIf:        "ALL_SUCCESS",
             },
             {
+              TaskKey: "task2",
               ForEachTask: &ForEachTask{
                 Inputs: "abc",
                 Task: ForEachNestedTask{
+                  TaskKey:      "task3",
                   NotebookTask: &NotebookTask{NotebookPath: "/bar/foo"},
                   RunIf:        "ALL_SUCCESS",
                 },
@@ -2207,14 +2215,17 @@
     HCL: `
     name = "My job"
     task {
+      task_key = "task1"
       notebook_task {
        notebook_path = "/foo/bar"
       }
     }
     task {
+      task_key = "task2"
       for_each_task {
         inputs = "abc"
         task {
+          task_key = "task3"
           notebook_task {
             notebook_path = "/bar/foo"
           }
@@ -2237,6 +2248,7 @@
           MaxConcurrentRuns: 1,
           Tasks: []JobTaskSettings{
             {
+              TaskKey: "task1",
               NotebookTask: &NotebookTask{
                 NotebookPath: "/foo/bar",
               },
@@ -2246,9 +2258,11 @@
              // RunIf is not set, as implied from the HCL config.
             },
             {
+              TaskKey: "task2",
               ForEachTask: &ForEachTask{
                 Inputs: "abc",
                 Task: ForEachNestedTask{
+                  TaskKey:      "task3",
                   NotebookTask: &NotebookTask{NotebookPath: "/bar/foo"},
                   // The diff is not suppressed. RunIf is
                   // not set, as implied from the HCL config.
@@ -2269,13 +2283,16 @@
           Name: "My job",
           Tasks: []JobTaskSettings{
             {
+              TaskKey:      "task1",
               NotebookTask: &NotebookTask{NotebookPath: "/foo/bar"},
               RunIf:        "AT_LEAST_ONE_FAILED",
             },
             {
+              TaskKey: "task2",
               ForEachTask: &ForEachTask{
                 Task: ForEachNestedTask{
-                  RunIf: "AT_LEAST_ONE_FAILED",
+                  TaskKey: "task3",
+                  RunIf:   "AT_LEAST_ONE_FAILED",
                 },
               },
             },
@@ -2294,14 +2311,17 @@
     HCL: `
     name = "My job"
     task {
+      task_key = "task1"
       notebook_task {
         notebook_path = "/foo/bar"
       }
     }
     task {
+      task_key = "task2"
       for_each_task {
         inputs = "abc"
         task {
+          task_key = "task3"
           notebook_task {
             notebook_path = "/bar/foo"
           }
@@ -2318,58 +2338,44 @@ func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) {
     {
       Method:   "POST",
       Resource: "/api/2.1/jobs/reset",
-      ExpectedRequest: UpdateJobRequest{
-        JobID: 789,
-        NewSettings: &JobSettings{
-          NewCluster: &clusters.Cluster{
-            InstancePoolID:       "instance-pool-worker",
-            DriverInstancePoolID: "instance-pool-driver",
-            SparkVersion:         "spark-1",
-            NumWorkers:           1,
-          },
-          Tasks: []JobTaskSettings{
+      ExpectedRequest: jobs.ResetJob{
+        JobId: 789,
+        NewSettings: jobs.JobSettings{
+          JobClusters: []jobs.JobCluster{
             {
-              NewCluster: &clusters.Cluster{
-                InstancePoolID:       "instance-pool-worker-task",
-                DriverInstancePoolID: "instance-pool-driver-task",
-                SparkVersion:         "spark-2",
-                NumWorkers:           2,
+              JobClusterKey: "job_cluster_1",
+              NewCluster: compute.ClusterSpec{
+                InstancePoolId:       "instance-pool-worker-job",
+                DriverInstancePoolId: "instance-pool-driver-job",
+                SparkVersion:         "spark-3",
+                NumWorkers:           3,
               },
             },
           },
-          JobClusters: []JobCluster{
+          Tasks: []jobs.Task{
             {
-              NewCluster: &clusters.Cluster{
-                InstancePoolID:       "instance-pool-worker-job",
-                DriverInstancePoolID: "instance-pool-driver-job",
-                SparkVersion:         "spark-3",
-                NumWorkers:           3,
+              TaskKey: "task1",
+              NewCluster: &compute.ClusterSpec{
+                InstancePoolId:       "instance-pool-worker-task",
+                DriverInstancePoolId: "instance-pool-driver-task",
+                SparkVersion:         "spark-2",
+                NumWorkers:           2,
               },
             },
           },
-          Name:                   "Featurizer New",
-          MaxRetries:             3,
-          MinRetryIntervalMillis: 5000,
-          RetryOnTimeout:         true,
-          MaxConcurrentRuns:      1,
+          Name:              "Featurizer New",
+          MaxConcurrentRuns: 1,
         },
       },
     },
     {
       Method:   "GET",
       Resource: "/api/2.1/jobs/get?job_id=789",
-      Response: Job{
-        JobID: 789,
-        Settings: &JobSettings{
-          NewCluster: &clusters.Cluster{
-            NodeTypeID:       "node-type-id",
-            DriverNodeTypeID: "driver-node-type-id",
-          },
-          Name:                   "Featurizer New",
-          MaxRetries:             3,
-          MinRetryIntervalMillis: 5000,
-          RetryOnTimeout:         true,
-          MaxConcurrentRuns:      1,
+      Response: jobs.Job{
+        JobId: 789,
+        Settings: &jobs.JobSettings{
+          Name:              "Featurizer New",
+          MaxConcurrentRuns: 1,
        },
       },
     },
@@ -2378,21 +2384,14 @@
     Update:   true,
     Resource: ResourceJob(),
     InstanceState: map[string]string{
-      "new_cluster.0.node_type_id":                      "node-type-id-worker",
-      "new_cluster.0.driver_node_type_id":               "node-type-id-driver",
       "task.0.new_cluster.0.node_type_id":               "node-type-id-worker-task",
       "task.0.new_cluster.0.driver_node_type_id":        "node-type-id-driver-task",
       "job_cluster.0.new_cluster.0.node_type_id":        "node-type-id-worker-job",
       "job_cluster.0.new_cluster.0.driver_node_type_id": "node-type-id-driver-job",
     },
     HCL: `
-    new_cluster = {
-      instance_pool_id = "instance-pool-worker"
-      driver_instance_pool_id = "instance-pool-driver"
-      spark_version = "spark-1"
-      num_workers = 1
-    }
     task = {
+      task_key = "task1"
       new_cluster = {
         instance_pool_id = "instance-pool-worker-task"
         driver_instance_pool_id = "instance-pool-driver-task"
@@ -2401,6 +2400,7 @@ func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) {
       }
     }
     job_cluster = {
+      job_cluster_key = "job_cluster_1"
       new_cluster = {
         instance_pool_id = "instance-pool-worker-job"
         driver_instance_pool_id = "instance-pool-driver-job"
@@ -2409,10 +2409,7 @@ func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) {
       }
     }
     max_concurrent_runs = 1
-    max_retries = 3
-    min_retry_interval_millis = 5000
-    name = "Featurizer New"
-    retry_on_timeout = true`,
+    name = "Featurizer New"`,
   }.Apply(t)
   assert.NoError(t, err)
   assert.Equal(t, "789", d.Id(), "Id should be the same as in reading")
@@ -2425,57 +2422,42 @@ func TestResourceJobUpdate_InstancePoolToNodeType(t *testing.T) {
     {
       Method:   "POST",
       Resource: "/api/2.1/jobs/reset",
-      ExpectedRequest: UpdateJobRequest{
-        JobID: 789,
-        NewSettings: &JobSettings{
-          NewCluster: &clusters.Cluster{
-            NodeTypeID:   "node-type-id-1",
-            SparkVersion: "spark-1",
-            NumWorkers:   1,
-          },
-          Tasks: []JobTaskSettings{
+      ExpectedRequest: jobs.UpdateJob{
+        JobId: 789,
+        NewSettings: &jobs.JobSettings{
+          Tasks: []jobs.Task{
             {
-              NewCluster: &clusters.Cluster{
-                NodeTypeID:   "node-type-id-2",
+              TaskKey: "task1",
+              NewCluster: &compute.ClusterSpec{
+                NodeTypeId:   "node-type-id-2",
                 SparkVersion: "spark-2",
                 NumWorkers:   2,
               },
             },
           },
-          JobClusters: []JobCluster{
+          JobClusters: []jobs.JobCluster{
             {
-              NewCluster: &clusters.Cluster{
-                NodeTypeID:   "node-type-id-3",
+              JobClusterKey: "job_cluster_1",
+              NewCluster: compute.ClusterSpec{
+                NodeTypeId:   "node-type-id-3",
                 SparkVersion: "spark-3",
                 NumWorkers:   3,
               },
             },
           },
-          Name:                   "Featurizer New",
-          MaxRetries:             3,
-          MinRetryIntervalMillis: 5000,
-          RetryOnTimeout:         true,
-          MaxConcurrentRuns:      1,
+          Name:              "Featurizer New",
+          MaxConcurrentRuns: 1,
         },
       },
     },
     {
       Method:   "GET",
       Resource: "/api/2.1/jobs/get?job_id=789",
-      Response: Job{
-        JobID: 789,
-        Settings: &JobSettings{
-          NewCluster: &clusters.Cluster{
-            NodeTypeID:           "node-type-id",
-            DriverNodeTypeID:     "driver-node-type-id",
-            InstancePoolID:       "instance-pool-id-worker",
-            DriverInstancePoolID: "instance-pool-id-driver",
-          },
-          Name:                   "Featurizer New",
-          MaxRetries:             3,
-          MinRetryIntervalMillis: 5000,
-          RetryOnTimeout:         true,
-          MaxConcurrentRuns:      1,
+      Response: jobs.Job{
+        JobId: 789,
+        Settings: &jobs.JobSettings{
+          Name:              "Featurizer New",
+          MaxConcurrentRuns: 1,
         },
       },
     },
@@ -2484,9 +2466,6 @@
     Update:   true,
     Resource: ResourceJob(),
     InstanceState: map[string]string{
-      "new_cluster.0.instance_pool_id":        "instance-pool-id-worker",
-      "new_cluster.0.driver_instance_pool_id": "instance-pool-id-driver",
-      "new_cluster.0.node_type_id":            "node-type-id-worker",
       "task.0.new_cluster.0.node_type_id":        "node-type-id-worker-task",
       "task.0.instance_pool_id":                  "instance-pool-id-worker",
       "task.0.driver_instance_pool_id":           "instance-pool-id-driver",
       "job_cluster.0.new_cluster.0.node_type_id": "node-type-id-worker-job",
       "job_cluster.0.driver_instance_pool_id":    "instance-pool-id-driver",
     },
     HCL: `
-    new_cluster = {
-      node_type_id = "node-type-id-1"
-      spark_version = "spark-1"
-      num_workers = 1
-    }
     task = {
+      task_key = "task1"
       new_cluster = {
         node_type_id = "node-type-id-2"
         spark_version = "spark-2"
@@ -2508,6 +2483,7 @@
       }
     }
     job_cluster = {
+      job_cluster_key = "job_cluster_1"
       new_cluster = {
         node_type_id = "node-type-id-3"
"spark-3" @@ -2515,10 +2491,7 @@ func TestResourceJobUpdate_InstancePoolToNodeType(t *testing.T) { } } max_concurrent_runs = 1 - max_retries = 3 - min_retry_interval_millis = 5000 - name = "Featurizer New" - retry_on_timeout = true`, + name = "Featurizer New"`, }.Apply(t) assert.NoError(t, err) assert.Equal(t, "789", d.Id(), "Id should be the same as in reading") @@ -2537,6 +2510,7 @@ func TestResourceJobUpdate_Tasks(t *testing.T) { Name: "Featurizer New", Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", SparkJarTask: &SparkJarTask{ MainClassName: "com.labs.BarMain", @@ -2557,6 +2531,7 @@ func TestResourceJobUpdate_Tasks(t *testing.T) { Settings: &JobSettings{ Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", SparkJarTask: &SparkJarTask{ MainClassName: "com.labs.BarMain", @@ -2573,6 +2548,7 @@ func TestResourceJobUpdate_Tasks(t *testing.T) { HCL: ` name = "Featurizer New" task { + task_key = "task1" existing_cluster_id = "abc" spark_jar_task { main_class_name = "com.labs.BarMain" diff --git a/jobs/resource_job_webhook_test.go b/jobs/resource_job_webhook_test.go index 145f2f5d81..da4b5dc426 100644 --- a/jobs/resource_job_webhook_test.go +++ b/jobs/resource_job_webhook_test.go @@ -19,6 +19,7 @@ func TestResourceJobUpdate_WebhookNotifications(t *testing.T) { Name: "Webhook test", Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", }, }, @@ -42,6 +43,7 @@ func TestResourceJobUpdate_WebhookNotifications(t *testing.T) { Name: "Webhook test", Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", }, }, @@ -72,6 +74,7 @@ func TestResourceJobUpdate_WebhookNotifications(t *testing.T) { HCL: ` name = "Webhook test" task { + task_key = "task1" existing_cluster_id = "abc" } webhook_notifications {