diff --git a/clusters/resource_cluster.go b/clusters/resource_cluster.go
index 14cbdc638e..ecb8ba7be8 100644
--- a/clusters/resource_cluster.go
+++ b/clusters/resource_cluster.go
@@ -24,7 +24,7 @@ var clusterSchemaVersion = 4
 const (
 	numWorkerErr = "NumWorkers could be 0 only for SingleNode clusters. See https://docs.databricks.com/clusters/single-node.html for more details"
-	unsupportedExceptCreateOrEditErr = "unsupported type %T, must be either %scompute.CreateCluster or %scompute.EditCluster. Please report this issue to the GitHub repo"
+	unsupportedExceptCreateOrEditErr = "unsupported type %T, must be one of %scompute.CreateCluster, %scompute.ClusterSpec or %scompute.EditCluster. Please report this issue to the GitHub repo"
 )

 func ResourceCluster() common.Resource {
@@ -127,8 +127,15 @@ func Validate(cluster any) error {
 		profile = c.SparkConf["spark.databricks.cluster.profile"]
 		master = c.SparkConf["spark.master"]
 		resourceClass = c.CustomTags["ResourceClass"]
+	case compute.ClusterSpec:
+		if c.NumWorkers > 0 || c.Autoscale != nil {
+			return nil
+		}
+		profile = c.SparkConf["spark.databricks.cluster.profile"]
+		master = c.SparkConf["spark.master"]
+		resourceClass = c.CustomTags["ResourceClass"]
 	default:
-		return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "", "")
+		return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "", "", "")
 	}
 	if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
 		return nil
@@ -140,6 +147,34 @@ func Validate(cluster any) error {
 // Long term, ModifyRequestOnInstancePool() in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
 func ModifyRequestOnInstancePool(cluster any) error {
 	switch c := cluster.(type) {
+	case *compute.ClusterSpec:
+		// Instance pool id is not set
+		if c.InstancePoolId == "" {
+			// Worker must use an instance pool if driver uses an instance pool,
+			// therefore empty the computed value for driver instance pool.
+			c.DriverInstancePoolId = ""
+			return nil
+		}
+		if c.AwsAttributes != nil {
+			// Keep only the instance profile ARN; the remaining AWS attributes are defined by the instance pool.
+			awsAttributes := compute.AwsAttributes{
+				InstanceProfileArn: c.AwsAttributes.InstanceProfileArn,
+			}
+			c.AwsAttributes = &awsAttributes
+		}
+		if c.AzureAttributes != nil {
+			c.AzureAttributes = &compute.AzureAttributes{}
+		}
+		if c.GcpAttributes != nil {
+			gcpAttributes := compute.GcpAttributes{
+				GoogleServiceAccount: c.GcpAttributes.GoogleServiceAccount,
+			}
+			c.GcpAttributes = &gcpAttributes
+		}
+		c.EnableElasticDisk = false
+		c.NodeTypeId = ""
+		c.DriverNodeTypeId = ""
+		return nil
 	case *compute.CreateCluster:
 		// Instance profile id does not exist or not set
 		if c.InstancePoolId == "" {
@@ -197,20 +232,35 @@ func ModifyRequestOnInstancePool(cluster any) error {
 		c.DriverNodeTypeId = ""
 		return nil
 	default:
-		return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*")
+		return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*", "*")
 	}
 }
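For illustration, a minimal sketch of how the new `*compute.ClusterSpec` branch of `ModifyRequestOnInstancePool` is expected to behave; the pool ID, node type, and ARN below are hypothetical:

```go
package main

import (
	"fmt"

	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/databricks/terraform-provider-databricks/clusters"
)

func main() {
	// A spec that targets an instance pool: the node type and most cloud
	// attributes are derived from the pool, so the helper clears them.
	spec := &compute.ClusterSpec{
		InstancePoolId: "pool-123",  // hypothetical pool ID
		NodeTypeId:     "i3.xlarge", // hypothetical node type
		AwsAttributes: &compute.AwsAttributes{
			InstanceProfileArn: "arn:aws:iam::123456789012:instance-profile/demo",
			FirstOnDemand:      1,
		},
	}
	if err := clusters.ModifyRequestOnInstancePool(spec); err != nil {
		panic(err)
	}
	// NodeTypeId is now empty and AwsAttributes retains only the instance profile ARN.
	fmt.Println(spec.NodeTypeId, spec.AwsAttributes.FirstOnDemand)
}
```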
 // This method is a duplicate of FixInstancePoolChangeIfAny(d *schema.ResourceData) in clusters/clusters_api.go that uses Go SDK.
 // Long term, FixInstancePoolChangeIfAny(d *schema.ResourceData) in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
 // https://github.com/databricks/terraform-provider-databricks/issues/824
-func FixInstancePoolChangeIfAny(d *schema.ResourceData, cluster *compute.EditCluster) {
-	oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
-	oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
-	if oldInstancePool != newInstancePool &&
-		oldDriverPool == oldInstancePool &&
-		oldDriverPool == newDriverPool {
-		cluster.DriverInstancePoolId = cluster.InstancePoolId
+func FixInstancePoolChangeIfAny(d *schema.ResourceData, cluster any) error {
+	switch c := cluster.(type) {
+	case *compute.ClusterSpec:
+		oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
+		oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
+		if oldInstancePool != newInstancePool &&
+			oldDriverPool == oldInstancePool &&
+			oldDriverPool == newDriverPool {
+			c.DriverInstancePoolId = c.InstancePoolId
+		}
+		return nil
+	case *compute.EditCluster:
+		oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
+		oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
+		if oldInstancePool != newInstancePool &&
+			oldDriverPool == oldInstancePool &&
+			oldDriverPool == newDriverPool {
+			c.DriverInstancePoolId = c.InstancePoolId
+		}
+		return nil
+	default:
+		return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*", "*")
 	}
 }

@@ -220,7 +270,6 @@ type ClusterSpec struct {
 }

 func (ClusterSpec) CustomizeSchemaResourceSpecific(s *common.CustomizableSchema) *common.CustomizableSchema {
-	s.SchemaPath("autotermination_minutes").SetDefault(60)
 	s.AddNewField("default_tags", &schema.Schema{
 		Type:     schema.TypeMap,
 		Computed: true,
@@ -244,6 +293,11 @@ func (ClusterSpec) CustomizeSchemaResourceSpecific(s *common.CustomizableSchema)
 		Type:     schema.TypeString,
 		Computed: true,
 	})
+	s.AddNewField("autotermination_minutes", &schema.Schema{
+		Type:     schema.TypeInt,
+		Optional: true,
+		Default:  60,
+	})
 	return s
 }

@@ -300,6 +354,8 @@ func (ClusterSpec) CustomizeSchema(s *common.CustomizableSchema) *common.Customi
 			Schema: common.StructToSchema(MountInfo{}, nil),
 		},
 	})
+	// Removed from the shared schema because it is not relevant for other resources; it is added back (with a default of 60) in CustomizeSchemaResourceSpecific.
+ s.RemoveField("autotermination_minutes") return s } @@ -453,7 +509,10 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo if err = ModifyRequestOnInstancePool(&cluster); err != nil { return err } - FixInstancePoolChangeIfAny(d, &cluster) + err = FixInstancePoolChangeIfAny(d, &cluster) + if err != nil { + return err + } // We can only call the resize api if the cluster is in the running state // and only the cluster size (ie num_workers OR autoscale) is being changed diff --git a/common/reflect_resource.go b/common/reflect_resource.go index c05837ab4e..d65c2872af 100644 --- a/common/reflect_resource.go +++ b/common/reflect_resource.go @@ -617,7 +617,7 @@ func iterFields(rv reflect.Value, path []string, s map[string]*schema.Schema, al return fmt.Errorf("inconsistency: %s has omitempty, but is not optional", fieldName) } defaultEmpty := reflect.ValueOf(fieldSchema.Default).Kind() == reflect.Invalid - if fieldSchema.Optional && defaultEmpty && !omitEmpty { + if !isGoSDK && fieldSchema.Optional && defaultEmpty && !omitEmpty { return fmt.Errorf("inconsistency: %s is optional, default is empty, but has no omitempty", fieldName) } err := cb(fieldSchema, append(path, fieldName), field) diff --git a/docs/resources/job.md b/docs/resources/job.md index 80a73940ae..84c2a61ee6 100644 --- a/docs/resources/job.md +++ b/docs/resources/job.md @@ -127,7 +127,7 @@ This block describes individual tasks: * `sql_task` * `library` - (Optional) (Set) An optional list of libraries to be installed on the cluster that will execute the job. * `depends_on` - (Optional) block specifying dependency(-ies) for a given task. -* `job_cluster_key` - (Optional) Identifier of the Job cluster specified in the `job_cluster` block. +* `job_cluster_key` - (Required) Identifier of the Job cluster specified in the `job_cluster` block. * `existing_cluster_id` - (Optional) Identifier of the [interactive cluster](cluster.md) to run job on. *Note: running tasks on interactive clusters may lead to increased costs!* * `new_cluster` - (Optional) Task will run on a dedicated cluster. See [databricks_cluster](cluster.md) documentation for specification. * `run_if` - (Optional) An optional value indicating the condition that determines whether the task should be run once its dependencies have been completed. One of `ALL_SUCCESS`, `AT_LEAST_ONE_SUCCESS`, `NONE_FAILED`, `ALL_DONE`, `AT_LEAST_ONE_FAILED` or `ALL_FAILED`. When omitted, defaults to `ALL_SUCCESS`. @@ -426,9 +426,9 @@ The following parameter is only available on task level. This block describes health conditions for a given job or an individual task. It consists of the following attributes: * `rules` - (List) list of rules that are represented as objects with the following attributes: - * `metric` - (Optional) string specifying the metric to check. The only supported metric is `RUN_DURATION_SECONDS` (check [Jobs REST API documentation](https://docs.databricks.com/api/workspace/jobs/create) for the latest information). - * `op` - (Optional) string specifying the operation used to evaluate the given metric. The only supported operation is `GREATER_THAN`. - * `value` - (Optional) integer value used to compare to the given metric. + * `metric` - (Required) string specifying the metric to check. The only supported metric is `RUN_DURATION_SECONDS` (check [Jobs REST API documentation](https://docs.databricks.com/api/workspace/jobs/create) for the latest information). + * `op` - (Required) string specifying the operation used to evaluate the given metric. 
The only supported operation is `GREATER_THAN`.
+  * `value` - (Required) integer value used to compare to the given metric.

 ### tags Configuration Map

@@ -468,8 +468,8 @@ This syntax uses Jobs API 2.0 to create a job with a single task. Only a subset

 The job cluster is specified using either of the below argument:

-* `new_cluster` - (Optional) Same set of parameters as for [databricks_cluster](cluster.md) resource.
-* `existing_cluster_id` - (Optional) If existing_cluster_id, the ID of an existing [cluster](cluster.md) that will be used for all runs of this job. When running jobs on an existing cluster, you may need to manually restart the cluster if it stops responding. We strongly suggest to use `new_cluster` for greater reliability.
+* `new_cluster` - (Required) Same set of parameters as for [databricks_cluster](cluster.md) resource.
+* `job_cluster_key` - (Required) A unique name for the job cluster. This field must be unique within the job. JobTaskSettings may refer to this field to determine which cluster to launch for the task execution.
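For reference, a minimal illustrative configuration that exercises the now-required `task_key`, `job_cluster_key`, and `health` rule attributes; the job name, notebook path, Spark version, and node type below are hypothetical:

```hcl
resource "databricks_job" "this" {
  name = "Example multi-task job"

  job_cluster {
    job_cluster_key = "shared"
    new_cluster {
      spark_version = "14.3.x-scala2.12" # hypothetical runtime version
      node_type_id  = "i3.xlarge"        # hypothetical node type
      num_workers   = 2
    }
  }

  task {
    task_key        = "ingest"
    job_cluster_key = "shared"

    notebook_task {
      notebook_path = "/Shared/ingest" # hypothetical path
    }

    health {
      rules {
        metric = "RUN_DURATION_SECONDS"
        op     = "GREATER_THAN"
        value  = 3600
      }
    }
  }
}
```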
 ```hcl
 data "databricks_current_user" "me" {}
diff --git a/jobs/jobs_api_go_sdk.go b/jobs/jobs_api_go_sdk.go
new file mode 100644
index 0000000000..7bf90b8f38
--- /dev/null
+++ b/jobs/jobs_api_go_sdk.go
@@ -0,0 +1,297 @@
+package jobs
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"slices"
+	"sort"
+	"time"
+
+	"github.com/databricks/databricks-sdk-go"
+	"github.com/databricks/databricks-sdk-go/apierr"
+	"github.com/databricks/databricks-sdk-go/service/compute"
+	"github.com/databricks/databricks-sdk-go/service/jobs"
+	"github.com/databricks/terraform-provider-databricks/clusters"
+	"github.com/databricks/terraform-provider-databricks/common"
+	"github.com/databricks/terraform-provider-databricks/repos"
+	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+)
+
+func (js *JobSettingsResource) adjustTasks() {
+	js.sortTasksByKey()
+	for _, task := range js.Tasks {
+		sort.Slice(task.DependsOn, func(i, j int) bool {
+			return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
+		})
+		sortWebhookNotifications(task.WebhookNotifications)
+	}
+}
+
+func (js *JobSettingsResource) sortTasksByKey() {
+	sort.Slice(js.Tasks, func(i, j int) bool {
+		return js.Tasks[i].TaskKey < js.Tasks[j].TaskKey
+	})
+}
+
+func adjustTasks(cj *jobs.CreateJob) {
+	sortTasksByKey(cj)
+	for _, task := range cj.Tasks {
+		sort.Slice(task.DependsOn, func(i, j int) bool {
+			return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
+		})
+		sortWebhookNotifications(task.WebhookNotifications)
+	}
+}
+
+func sortTasksByKey(cj *jobs.CreateJob) {
+	sort.Slice(cj.Tasks, func(i, j int) bool {
+		return cj.Tasks[i].TaskKey < cj.Tasks[j].TaskKey
+	})
+}
+
+func (js *JobSettingsResource) sortWebhooksByID() {
+	sortWebhookNotifications(js.WebhookNotifications)
+}
+
+func sortWebhooksByID(cj *jobs.CreateJob) {
+	sortWebhookNotifications(cj.WebhookNotifications)
+}
+
+func (js *JobSettingsResource) isMultiTask() bool {
+	return js.Format == "MULTI_TASK" || len(js.Tasks) > 0
+}
+
+func getJobLifecycleManagerGoSdk(d *schema.ResourceData, m *common.DatabricksClient) jobLifecycleManager {
+	if d.Get("always_running").(bool) {
+		return alwaysRunningLifecycleManagerGoSdk{d: d, m: m}
+	}
+	if d.Get("control_run_state").(bool) {
+		return controlRunStateLifecycleManagerGoSdk{d: d, m: m}
+	}
+	return noopLifecycleManager{}
+}
+
+type alwaysRunningLifecycleManagerGoSdk struct {
+	d *schema.ResourceData
+	m *common.DatabricksClient
+}
+
+func (a alwaysRunningLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
+	w, err := a.m.WorkspaceClient()
+	if err != nil {
+		return err
+	}
+	jobID, err := parseJobId(a.d.Id())
+	if err != nil {
+		return err
+	}
+
+	return Start(jobID, a.d.Timeout(schema.TimeoutCreate), w, ctx)
+}
+
+func (a alwaysRunningLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
+	w, err := a.m.WorkspaceClient()
+	if err != nil {
+		return err
+	}
+	jobID, err := parseJobId(a.d.Id())
+	if err != nil {
+		return err
+	}
+
+	err = StopActiveRun(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)
+	if err != nil {
+		return err
+	}
+	return Start(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)
+}
+
+type controlRunStateLifecycleManagerGoSdk struct {
+	d *schema.ResourceData
+	m *common.DatabricksClient
+}
+
+func (c controlRunStateLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
+	return nil
+}
+
+func (c controlRunStateLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
+	if c.d.Get("continuous") == nil {
+		return nil
+	}
+
+	jobID, err := parseJobId(c.d.Id())
+	if err != nil {
+		return err
+	}
+
+	w, err := c.m.WorkspaceClient()
+	if err != nil {
+		return err
+	}
+
+	// Only use RunNow to stop the active run if the job is unpaused.
+	pauseStatus := c.d.Get("continuous.0.pause_status").(string)
+	if pauseStatus == "UNPAUSED" {
+		// Previously, RunNow() was not supported for continuous jobs. Now, calling RunNow()
+		// on a continuous job works, cancelling the active run if there is one, and resetting
+		// the exponential backoff timer. So, we try to call RunNow() first, and if it fails,
+		// we call StopActiveRun() instead.
+		_, err := w.Jobs.RunNow(ctx, jobs.RunNow{
+			JobId: jobID,
+		})
+
+		if err == nil {
+			return nil
+		}
+
+		// RunNow() returns 404 when the feature is disabled.
+		var apiErr *apierr.APIError
+		if errors.As(err, &apiErr) && apiErr.StatusCode != 404 {
+			return err
+		}
+	}
+
+	return StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate), w, ctx)
+}
+
+// Remove unnecessary fields from ClusterSpec's ForceSendFields because the current Terraform plugin SDK
+// has a limitation that it will always set unspecified values to the zero value.
+func removeUnnecessaryFieldsFromForceSendFields(clusterSpec *compute.ClusterSpec) error {
+	if clusterSpec.AwsAttributes != nil {
+		newAwsAttributesForceSendFields := []string{}
+		// These fields should never be 0.
+ unnecessaryFieldNamesForAwsAttributes := []string{ + "SpotBidPricePercent", + "EbsVolumeCount", + "EbsVolumeIops", + "EbsVolumeSize", + "EbsVolumeThroughput", + } + for _, field := range clusterSpec.AwsAttributes.ForceSendFields { + if !slices.Contains(unnecessaryFieldNamesForAwsAttributes, field) { + newAwsAttributesForceSendFields = append(newAwsAttributesForceSendFields, field) + } + } + clusterSpec.AwsAttributes.ForceSendFields = newAwsAttributesForceSendFields + } + return nil +} + +func updateJobClusterSpec(clusterSpec *compute.ClusterSpec, d *schema.ResourceData) { + clusters.ModifyRequestOnInstancePool(clusterSpec) + clusters.FixInstancePoolChangeIfAny(d, clusterSpec) + removeUnnecessaryFieldsFromForceSendFields(clusterSpec) +} + +func prepareJobSettingsForUpdateGoSdk(d *schema.ResourceData, js *JobSettingsResource) { + if js.NewCluster != nil { + updateJobClusterSpec(js.NewCluster, d) + } + for _, task := range js.Tasks { + if task.NewCluster != nil { + updateJobClusterSpec(task.NewCluster, d) + } + } + for i := range js.JobClusters { + updateJobClusterSpec(&js.JobClusters[i].NewCluster, d) + } +} + +func Create(createJob jobs.CreateJob, w *databricks.WorkspaceClient, ctx context.Context) (int64, error) { + adjustTasks(&createJob) + sortWebhooksByID(&createJob) + var gitSource *jobs.GitSource = createJob.GitSource + if gitSource != nil && gitSource.GitProvider == "" { + var provider jobs.GitProvider = jobs.GitProvider(repos.GetGitProviderFromUrl(gitSource.GitUrl)) + gitSource.GitProvider = provider + if gitSource.GitProvider == "" { + return 0, fmt.Errorf("git source is not empty but Git Provider is not specified and cannot be guessed by url %+v", gitSource) + } + if gitSource.GitBranch == "" && gitSource.GitTag == "" && gitSource.GitCommit == "" { + return 0, fmt.Errorf("git source is not empty but none of branch, commit and tag is specified") + } + } + res, err := w.Jobs.Create(ctx, createJob) + return res.JobId, err +} + +func Update(jobID int64, js JobSettingsResource, w *databricks.WorkspaceClient, ctx context.Context) error { + err := w.Jobs.Reset(ctx, jobs.ResetJob{ + JobId: jobID, + NewSettings: js.JobSettings, + }) + return wrapMissingJobError(err, fmt.Sprintf("%d", jobID)) +} + +func Read(jobID int64, w *databricks.WorkspaceClient, ctx context.Context) (job *jobs.Job, err error) { + job, err = w.Jobs.GetByJobId(ctx, jobID) + err = wrapMissingJobError(err, fmt.Sprintf("%d", jobID)) + if job.Settings != nil { + js := JobSettingsResource{JobSettings: *job.Settings} + js.adjustTasks() + js.sortWebhooksByID() + } + + // Populate the `run_as` field. In the settings struct it can only be set on write and is not + // returned on read. Therefore, we populate it from the top-level `run_as_user_name` field so + // that Terraform can still diff it with the intended state. 
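+	// For example (illustrative): a run_as_user_name that looks like a UUID, such as a
+	// service principal application ID, is mapped to run_as.service_principal_name, while
+	// an email address like "user@example.com" is mapped to run_as.user_name.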
+ if job.Settings != nil && job.RunAsUserName != "" { + if common.StringIsUUID(job.RunAsUserName) { + job.Settings.RunAs = &jobs.JobRunAs{ + ServicePrincipalName: job.RunAsUserName, + } + } else { + job.Settings.RunAs = &jobs.JobRunAs{ + UserName: job.RunAsUserName, + } + } + } + + return +} + +func Start(jobID int64, timeout time.Duration, w *databricks.WorkspaceClient, ctx context.Context) error { + res, err := w.Jobs.RunNow(ctx, jobs.RunNow{ + JobId: jobID, + }) + if err != nil { + return err + } + + _, err = res.GetWithTimeout(timeout) + if err != nil { + return err + } + return nil +} + +func StopActiveRun(jobID int64, timeout time.Duration, w *databricks.WorkspaceClient, ctx context.Context) error { + runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{ + JobId: jobID, + ActiveOnly: true, + }) + if err != nil { + return err + } + if len(runs) > 1 { + return fmt.Errorf("`always_running` must be specified only with "+ + "`max_concurrent_runs = 1`. There are %d active runs", len(runs)) + } + if len(runs) == 1 { + activeRun := runs[0] + res, err := w.Jobs.CancelRun(ctx, jobs.CancelRun{ + RunId: activeRun.RunId, + }) + if err != nil { + return fmt.Errorf("cannot cancel run %d: %v", activeRun.RunId, err) + } + _, err = res.GetWithTimeout(timeout) + if err != nil { + return fmt.Errorf("cannot cancel run, error waiting for run to be terminated %d: %v", activeRun.RunId, err) + } + } + return nil +} diff --git a/jobs/resource_job.go b/jobs/resource_job.go index 2fff33d74b..d662d4a986 100644 --- a/jobs/resource_job.go +++ b/jobs/resource_job.go @@ -99,7 +99,7 @@ type SqlTask struct { Dashboard *SqlDashboardTask `json:"dashboard,omitempty"` Alert *SqlAlertTask `json:"alert,omitempty"` File *SqlFileTask `json:"file,omitempty"` - WarehouseID string `json:"warehouse_id,omitempty"` + WarehouseID string `json:"warehouse_id"` Parameters map[string]string `json:"parameters,omitempty"` } @@ -130,7 +130,7 @@ type ForEachTask struct { } type ForEachNestedTask struct { - TaskKey string `json:"task_key,omitempty"` + TaskKey string `json:"task_key"` Description string `json:"description,omitempty"` DependsOn []jobs.TaskDependency `json:"depends_on,omitempty"` RunIf string `json:"run_if,omitempty"` @@ -201,9 +201,9 @@ type GitSource struct { // End Jobs + Repo integration preview type JobHealthRule struct { - Metric string `json:"metric,omitempty"` - Operation string `json:"op,omitempty"` - Value int64 `json:"value,omitempty"` + Metric string `json:"metric"` + Operation string `json:"op"` + Value int64 `json:"value"` } type JobHealth struct { @@ -211,7 +211,7 @@ type JobHealth struct { } type JobTaskSettings struct { - TaskKey string `json:"task_key,omitempty"` + TaskKey string `json:"task_key"` Description string `json:"description,omitempty"` DependsOn []jobs.TaskDependency `json:"depends_on,omitempty"` RunIf string `json:"run_if,omitempty"` @@ -246,8 +246,8 @@ type JobTaskSettings struct { } type JobCluster struct { - JobClusterKey string `json:"job_cluster_key,omitempty" tf:"group:cluster_type"` - NewCluster *clusters.Cluster `json:"new_cluster,omitempty" tf:"group:cluster_type"` + JobClusterKey string `json:"job_cluster_key" tf:"group:cluster_type"` + NewCluster *clusters.Cluster `json:"new_cluster" tf:"group:cluster_type"` } type ContinuousConf struct { @@ -446,6 +446,45 @@ type JobsAPI struct { context context.Context } +// JobCreate + JobSettingResource related aliases. 
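+// For example (illustrative): the SDK field `tasks` is exposed in HCL as repeated `task`
+// blocks, and `git_source.git_url` is exposed as `git_source { url = ... }`.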
+var jobsAliases = map[string]map[string]string{ + "jobs.JobSettingsResource": { + "tasks": "task", + "parameters": "parameter", + "job_clusters": "job_cluster", + "environments": "environment", + }, + "jobs.JobCreateStruct": { + "tasks": "task", + "parameters": "parameter", + "job_clusters": "job_cluster", + "environments": "environment", + }, + "jobs.GitSource": { + "git_url": "url", + "git_provider": "provider", + "git_branch": "branch", + "git_tag": "tag", + "git_commit": "commit", + }, + "jobs.Task": { + "libraries": "library", + }, +} + +// Need a struct for JobCreate because there are aliases we need and it'll be needed in the create method. +type JobCreateStruct struct { + jobs.CreateJob +} + +func (JobCreateStruct) Aliases() map[string]map[string]string { + return jobsAliases +} + +func (JobCreateStruct) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema { + return s +} + type JobSettingsResource struct { jobs.JobSettings @@ -469,25 +508,7 @@ type JobSettingsResource struct { } func (JobSettingsResource) Aliases() map[string]map[string]string { - aliases := map[string]map[string]string{ - "jobs.JobSettingsResource": { - "tasks": "task", - "parameters": "parameter", - "job_clusters": "job_cluster", - "environments": "environment", - }, - "jobs.GitSource": { - "git_url": "url", - "git_provider": "provider", - "git_branch": "branch", - "git_tag": "tag", - "git_commit": "commit", - }, - "jobs.Task": { - "libraries": "library", - }, - } - return aliases + return jobsAliases } func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common.CustomizableSchema { @@ -591,14 +612,9 @@ func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common s.SchemaPath("task", "for_each_task", "task", "new_cluster", "cluster_id").Schema.Computed = false // ======= To keep consistency with the manually maintained schema, should be reverted once full migration is done. 
====== - s.SchemaPath("task", "task_key").SetOptional() - s.SchemaPath("task", "for_each_task", "task", "task_key").SetOptional() s.SchemaPath("trigger", "table_update", "table_names").SetRequired() - s.SchemaPath("task", "sql_task", "warehouse_id").SetOptional() - s.SchemaPath("task", "for_each_task", "task", "sql_task", "warehouse_id").SetOptional() - s.SchemaPath("task", "python_wheel_task", "entry_point").SetOptional() s.SchemaPath("task", "for_each_task", "task", "python_wheel_task", "entry_point").SetOptional() @@ -608,10 +624,6 @@ func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common s.SchemaPath("task", "sql_task", "alert", "subscriptions").SetRequired() s.SchemaPath("task", "for_each_task", "task", "sql_task", "alert", "subscriptions").SetRequired() - s.SchemaPath("health", "rules", "metric").SetOptional() - s.SchemaPath("health", "rules", "op").SetOptional() - s.SchemaPath("health", "rules", "value").SetOptional() - s.SchemaPath("task", "new_cluster", "cluster_id").SetOptional() s.SchemaPath("task", "for_each_task", "task", "new_cluster", "cluster_id").SetOptional() @@ -619,26 +631,15 @@ func (JobSettingsResource) CustomizeSchema(s *common.CustomizableSchema) *common s.SchemaPath("task", "health", "rules").SetRequired() s.SchemaPath("task", "for_each_task", "task", "health", "rules").SetRequired() - s.SchemaPath("task", "health", "rules", "op").SetOptional() - s.SchemaPath("task", "for_each_task", "task", "health", "rules", "op").SetOptional() - - s.SchemaPath("task", "health", "rules", "metric").SetOptional() - s.SchemaPath("task", "for_each_task", "task", "health", "rules", "metric").SetOptional() - - s.SchemaPath("task", "health", "rules", "value").SetOptional() - s.SchemaPath("task", "for_each_task", "task", "health", "rules", "value").SetOptional() - - s.SchemaPath("job_cluster", "job_cluster_key").SetOptional() - s.SchemaPath("job_cluster", "new_cluster").SetOptional() s.SchemaPath("job_cluster", "new_cluster", "cluster_id").SetOptional() - s.SchemaPath("new_cluster", "cluster_id").SetOptional() - s.SchemaPath("git_source", "provider").SetOptional() - s.SchemaPath("library").Schema.Type = schema.TypeSet s.SchemaPath("task", "library").Schema.Type = schema.TypeSet + // Technically this is required by the API, but marking it optional since we can infer it from the hostname. 
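+	// For example (illustrative): url = "https://github.com/org/repo" lets the provider be
+	// inferred as "gitHub", so most configurations can omit the provider attribute.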
+ s.SchemaPath("git_source", "provider").SetOptional() + return s } @@ -959,6 +960,7 @@ func (a alwaysRunningLifecycleManager) OnCreate(ctx context.Context) error { } return NewJobsAPI(ctx, a.m).Start(jobID, a.d.Timeout(schema.TimeoutCreate)) } + func (a alwaysRunningLifecycleManager) OnUpdate(ctx context.Context) error { api := NewJobsAPI(ctx, a.m) jobID, err := parseJobId(a.d.Id()) @@ -1039,7 +1041,7 @@ var jobsGoSdkSchema = common.StructToSchema(JobSettingsResource{}, nil) func ResourceJob() common.Resource { getReadCtx := func(ctx context.Context, d *schema.ResourceData) context.Context { - var js JobSettings + var js JobSettingsResource common.DataToStructPointer(d, jobsGoSdkSchema, &js) if js.isMultiTask() { return context.WithValue(ctx, common.Api, common.API_2_1) @@ -1054,7 +1056,7 @@ func ResourceJob() common.Resource { Update: schema.DefaultTimeout(clusters.DefaultProvisionTimeout), }, CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff) error { - var js JobSettings + var js JobSettingsResource common.DiffToStructPointer(d, jobsGoSdkSchema, &js) alwaysRunning := d.Get("always_running").(bool) if alwaysRunning && js.MaxConcurrentRuns > 1 { @@ -1073,12 +1075,12 @@ func ResourceJob() common.Resource { if task.NewCluster == nil { continue } - if err := task.NewCluster.Validate(); err != nil { + if err := clusters.Validate(*task.NewCluster); err != nil { return fmt.Errorf("task %s invalid: %w", task.TaskKey, err) } } if js.NewCluster != nil { - if err := js.NewCluster.Validate(); err != nil { + if err := clusters.Validate(*js.NewCluster); err != nil { return fmt.Errorf("invalid job cluster: %w", err) } } @@ -1088,40 +1090,99 @@ func ResourceJob() common.Resource { var js JobSettings common.DataToStructPointer(d, jobsGoSdkSchema, &js) if js.isMultiTask() { - ctx = context.WithValue(ctx, common.Api, common.API_2_1) - } - jobsAPI := NewJobsAPI(ctx, c) - job, err := jobsAPI.Create(js) - if err != nil { - return err + // Api 2.1 + w, err := c.WorkspaceClient() + if err != nil { + return err + } + var cj JobCreateStruct + common.DataToStructPointer(d, jobsGoSdkSchema, &cj) + jobId, err := Create(cj.CreateJob, w, ctx) + if err != nil { + return err + } + d.SetId(fmt.Sprintf("%d", jobId)) + return getJobLifecycleManagerGoSdk(d, c).OnCreate(ctx) + } else { + // Api 2.0 + // TODO: Deprecate and remove this code path + jobsAPI := NewJobsAPI(ctx, c) + job, err := jobsAPI.Create(js) + if err != nil { + return err + } + d.SetId(job.ID()) + return getJobLifecycleManager(d, c).OnCreate(ctx) } - d.SetId(job.ID()) - return getJobLifecycleManager(d, c).OnCreate(ctx) }, Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { - ctx = getReadCtx(ctx, d) - job, err := NewJobsAPI(ctx, c).Read(d.Id()) - if err != nil { - return err - } - d.Set("url", c.FormatURL("#job/", d.Id())) - return common.StructToData(*job.Settings, jobsGoSdkSchema, d) - }, - Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { - var js JobSettings + var js JobSettingsResource common.DataToStructPointer(d, jobsGoSdkSchema, &js) if js.isMultiTask() { - ctx = context.WithValue(ctx, common.Api, common.API_2_1) - } - - prepareJobSettingsForUpdate(d, js) + // Api 2.1 + w, err := c.WorkspaceClient() + if err != nil { + return err + } + jobID, err := parseJobId(d.Id()) + if err != nil { + return err + } + job, err := Read(jobID, w, ctx) + if err != nil { + return err + } + d.Set("url", c.FormatURL("#job/", d.Id())) - jobsAPI := NewJobsAPI(ctx, c) - err := 
jobsAPI.Update(d.Id(), js) - if err != nil { - return err + res := JobSettingsResource{ + JobSettings: *job.Settings, + } + return common.StructToData(res, jobsGoSdkSchema, d) + } else { + // Api 2.0 + // TODO: Deprecate and remove this code path + job, err := NewJobsAPI(ctx, c).Read(d.Id()) + if err != nil { + return err + } + d.Set("url", c.FormatURL("#job/", d.Id())) + return common.StructToData(*job.Settings, jobsGoSdkSchema, d) + } + }, + Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { + var jsr JobSettingsResource + common.DataToStructPointer(d, jobsGoSdkSchema, &jsr) + if jsr.isMultiTask() { + // Api 2.1 + prepareJobSettingsForUpdateGoSdk(d, &jsr) + jobID, err := parseJobId(d.Id()) + if err != nil { + return err + } + w, err := c.WorkspaceClient() + if err != nil { + return err + } + err = Update(jobID, jsr, w, ctx) + if err != nil { + return err + } + return getJobLifecycleManagerGoSdk(d, c).OnUpdate(ctx) + } else { + // Api 2.0 + // TODO: Deprecate and remove this code path + var js JobSettings + common.DataToStructPointer(d, jobsGoSdkSchema, &js) + + prepareJobSettingsForUpdate(d, js) + + jobsAPI := NewJobsAPI(ctx, c) + err := jobsAPI.Update(d.Id(), js) + if err != nil { + return err + } + return getJobLifecycleManager(d, c).OnUpdate(ctx) } - return getJobLifecycleManager(d, c).OnUpdate(ctx) }, Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error { ctx = getReadCtx(ctx, d) diff --git a/jobs/resource_job_test.go b/jobs/resource_job_test.go index f9608b4cdc..76881f0a8c 100644 --- a/jobs/resource_job_test.go +++ b/jobs/resource_job_test.go @@ -996,19 +996,19 @@ func TestResourceJobCreate_SqlSubscriptions(t *testing.T) { { Method: "POST", Resource: "/api/2.1/jobs/create", - ExpectedRequest: JobSettings{ + ExpectedRequest: jobs.CreateJob{ Name: "TF SQL task subscriptions", MaxConcurrentRuns: 1, - Tasks: []JobTaskSettings{ + Tasks: []jobs.Task{ { TaskKey: "a", - SqlTask: &SqlTask{ - WarehouseID: "dca3a0ba199040eb", - Alert: &SqlAlertTask{ - AlertID: "3cf91a42-6217-4f3c-a6f0-345d489051b9", - Subscriptions: []SqlSubscription{ + SqlTask: &jobs.SqlTask{ + WarehouseId: "dca3a0ba199040eb", + Alert: &jobs.SqlTaskAlert{ + AlertId: "3cf91a42-6217-4f3c-a6f0-345d489051b9", + Subscriptions: []jobs.SqlTaskSubscription{ {UserName: "user@domain.com"}, - {DestinationID: "Test"}, + {DestinationId: "Test"}, }, PauseSubscriptions: true, }, @@ -1016,15 +1016,17 @@ func TestResourceJobCreate_SqlSubscriptions(t *testing.T) { }, { TaskKey: "d", - SqlTask: &SqlTask{ - WarehouseID: "dca3a0ba199040eb", - Dashboard: &SqlDashboardTask{ - DashboardID: "d81a7760-7fd2-443e-bf41-95a60c2f4c7c", - Subscriptions: []SqlSubscription{ + SqlTask: &jobs.SqlTask{ + WarehouseId: "dca3a0ba199040eb", + Dashboard: &jobs.SqlTaskDashboard{ + DashboardId: "d81a7760-7fd2-443e-bf41-95a60c2f4c7c", + PauseSubscriptions: false, + Subscriptions: []jobs.SqlTaskSubscription{ {UserName: "user@domain.com"}, - {DestinationID: "Test"}, + {DestinationId: "Test"}, }, - CustomSubject: "test", + CustomSubject: "test", + ForceSendFields: []string{"PauseSubscriptions"}, }, }, }, @@ -1037,20 +1039,20 @@ func TestResourceJobCreate_SqlSubscriptions(t *testing.T) { { Method: "GET", Resource: "/api/2.1/jobs/get?job_id=789", - Response: Job{ - JobID: 789, - Settings: &JobSettings{ + Response: jobs.Job{ + JobId: 789, + Settings: &jobs.JobSettings{ Name: "TF SQL task subscriptions", - Tasks: []JobTaskSettings{ + Tasks: []jobs.Task{ { TaskKey: "a", - SqlTask: &SqlTask{ - 
WarehouseID: "dca3a0ba199040eb", - Alert: &SqlAlertTask{ - AlertID: "3cf91a42-6217-4f3c-a6f0-345d489051b9", - Subscriptions: []SqlSubscription{ + SqlTask: &jobs.SqlTask{ + WarehouseId: "dca3a0ba199040eb", + Alert: &jobs.SqlTaskAlert{ + AlertId: "3cf91a42-6217-4f3c-a6f0-345d489051b9", + Subscriptions: []jobs.SqlTaskSubscription{ {UserName: "user@domain.com"}, - {DestinationID: "Test"}, + {DestinationId: "Test"}, }, PauseSubscriptions: true, }, @@ -1058,13 +1060,13 @@ func TestResourceJobCreate_SqlSubscriptions(t *testing.T) { }, { TaskKey: "d", - SqlTask: &SqlTask{ - WarehouseID: "dca3a0ba199040eb", - Dashboard: &SqlDashboardTask{ - DashboardID: "d81a7760-7fd2-443e-bf41-95a60c2f4c7c", - Subscriptions: []SqlSubscription{ + SqlTask: &jobs.SqlTask{ + WarehouseId: "dca3a0ba199040eb", + Dashboard: &jobs.SqlTaskDashboard{ + DashboardId: "d81a7760-7fd2-443e-bf41-95a60c2f4c7c", + Subscriptions: []jobs.SqlTaskSubscription{ {UserName: "user@domain.com"}, - {DestinationID: "Test"}, + {DestinationId: "Test"}, }, CustomSubject: "test", }, @@ -1825,7 +1827,6 @@ func TestResourceJobCreateFromGitSource(t *testing.T) { Method: "POST", Resource: "/api/2.1/jobs/create", ExpectedRequest: JobSettings{ - ExistingClusterID: "abc", Tasks: []JobTaskSettings{ { TaskKey: "b", @@ -1872,7 +1873,7 @@ func TestResourceJobCreateFromGitSource(t *testing.T) { }, Create: true, Resource: ResourceJob(), - HCL: `existing_cluster_id = "abc" + HCL: ` max_concurrent_runs = 1 name = "GitSourceJob" @@ -1884,6 +1885,7 @@ func TestResourceJobCreateFromGitSource(t *testing.T) { import_from_git_branch = "main" dirty_state = "NOT_SYNCED" } + provider = "gitHub" } task { @@ -1973,7 +1975,7 @@ func TestResourceJobCreateFromGitSourceWithoutProviderFail(t *testing.T) { } } `, - }.ExpectError(t, "git source is not empty but Git Provider is not specified and cannot be guessed by url &{Url:https://custom.git.hosting.com/databricks/terraform-provider-databricks Provider: Branch: Tag:0.4.8 Commit: JobSource:}") + }.ExpectError(t, "git source is not empty but Git Provider is not specified and cannot be guessed by url &{GitBranch: GitCommit: GitProvider: GitSnapshot: GitTag:0.4.8 GitUrl:https://custom.git.hosting.com/databricks/terraform-provider-databricks JobSource: ForceSendFields:[]}") } func TestResourceJobCreateSingleNode_Fail(t *testing.T) { @@ -2205,6 +2207,7 @@ func TestResourceJobUpdate_RunIfSuppressesDiffIfAllSuccess(t *testing.T) { MaxConcurrentRuns: 1, Tasks: []JobTaskSettings{ { + TaskKey: "task1", NotebookTask: &NotebookTask{ NotebookPath: "/foo/bar", }, @@ -2214,9 +2217,11 @@ func TestResourceJobUpdate_RunIfSuppressesDiffIfAllSuccess(t *testing.T) { RunIf: "ALL_SUCCESS", }, { + TaskKey: "task2", ForEachTask: &ForEachTask{ Inputs: "abc", Task: ForEachNestedTask{ + TaskKey: "task3", NotebookTask: &NotebookTask{NotebookPath: "/bar/foo"}, // The diff is suppressed here. Value is from // the terraform state. 
@@ -2238,13 +2243,16 @@ func TestResourceJobUpdate_RunIfSuppressesDiffIfAllSuccess(t *testing.T) { Name: "My job", Tasks: []JobTaskSettings{ { + TaskKey: "task1", NotebookTask: &NotebookTask{NotebookPath: "/foo/bar"}, RunIf: "ALL_SUCCESS", }, { + TaskKey: "task2", ForEachTask: &ForEachTask{ Inputs: "abc", Task: ForEachNestedTask{ + TaskKey: "task3", NotebookTask: &NotebookTask{NotebookPath: "/bar/foo"}, RunIf: "ALL_SUCCESS", }, @@ -2265,14 +2273,17 @@ func TestResourceJobUpdate_RunIfSuppressesDiffIfAllSuccess(t *testing.T) { HCL: ` name = "My job" task { + task_key = "task1" notebook_task { notebook_path = "/foo/bar" } } task { + task_key = "task2" for_each_task { inputs = "abc" task { + task_key = "task3" notebook_task { notebook_path = "/bar/foo" } @@ -2295,6 +2306,7 @@ func TestResourceJobUpdate_RunIfDoesNotSuppressIfNotAllSuccess(t *testing.T) { MaxConcurrentRuns: 1, Tasks: []JobTaskSettings{ { + TaskKey: "task1", NotebookTask: &NotebookTask{ NotebookPath: "/foo/bar", }, @@ -2304,9 +2316,11 @@ func TestResourceJobUpdate_RunIfDoesNotSuppressIfNotAllSuccess(t *testing.T) { // RunIf is not set, as implied from the HCL config. }, { + TaskKey: "task2", ForEachTask: &ForEachTask{ Inputs: "abc", Task: ForEachNestedTask{ + TaskKey: "task3", NotebookTask: &NotebookTask{NotebookPath: "/bar/foo"}, // The diff is not suppressed. RunIf is // not set, as implied from the HCL config. @@ -2327,13 +2341,16 @@ func TestResourceJobUpdate_RunIfDoesNotSuppressIfNotAllSuccess(t *testing.T) { Name: "My job", Tasks: []JobTaskSettings{ { + TaskKey: "task1", NotebookTask: &NotebookTask{NotebookPath: "/foo/bar"}, RunIf: "AT_LEAST_ONE_FAILED", }, { + TaskKey: "task2", ForEachTask: &ForEachTask{ Task: ForEachNestedTask{ - RunIf: "AT_LEAST_ONE_FAILED", + TaskKey: "task3", + RunIf: "AT_LEAST_ONE_FAILED", }, }, }, @@ -2352,14 +2369,17 @@ func TestResourceJobUpdate_RunIfDoesNotSuppressIfNotAllSuccess(t *testing.T) { HCL: ` name = "My job" task { + task_key = "task1" notebook_task { notebook_path = "/foo/bar" } } task { + task_key = "task2" for_each_task { inputs = "abc" task { + task_key = "task3" notebook_task { notebook_path = "/bar/foo" } @@ -2376,58 +2396,44 @@ func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) { { Method: "POST", Resource: "/api/2.1/jobs/reset", - ExpectedRequest: UpdateJobRequest{ - JobID: 789, - NewSettings: &JobSettings{ - NewCluster: &clusters.Cluster{ - InstancePoolID: "instance-pool-worker", - DriverInstancePoolID: "instance-pool-driver", - SparkVersion: "spark-1", - NumWorkers: 1, - }, - Tasks: []JobTaskSettings{ + ExpectedRequest: jobs.ResetJob{ + JobId: 789, + NewSettings: jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ { - NewCluster: &clusters.Cluster{ - InstancePoolID: "instance-pool-worker-task", - DriverInstancePoolID: "instance-pool-driver-task", - SparkVersion: "spark-2", - NumWorkers: 2, + JobClusterKey: "job_cluster_1", + NewCluster: compute.ClusterSpec{ + InstancePoolId: "instance-pool-worker-job", + DriverInstancePoolId: "instance-pool-driver-job", + SparkVersion: "spark-3", + NumWorkers: 3, }, }, }, - JobClusters: []JobCluster{ + Tasks: []jobs.Task{ { - NewCluster: &clusters.Cluster{ - InstancePoolID: "instance-pool-worker-job", - DriverInstancePoolID: "instance-pool-driver-job", - SparkVersion: "spark-3", - NumWorkers: 3, + TaskKey: "task1", + NewCluster: &compute.ClusterSpec{ + InstancePoolId: "instance-pool-worker-task", + DriverInstancePoolId: "instance-pool-driver-task", + SparkVersion: "spark-2", + NumWorkers: 2, }, }, }, - Name: "Featurizer New", - 
MaxRetries: 3, - MinRetryIntervalMillis: 5000, - RetryOnTimeout: true, - MaxConcurrentRuns: 1, + Name: "Featurizer New", + MaxConcurrentRuns: 1, }, }, }, { Method: "GET", Resource: "/api/2.1/jobs/get?job_id=789", - Response: Job{ - JobID: 789, - Settings: &JobSettings{ - NewCluster: &clusters.Cluster{ - NodeTypeID: "node-type-id", - DriverNodeTypeID: "driver-node-type-id", - }, - Name: "Featurizer New", - MaxRetries: 3, - MinRetryIntervalMillis: 5000, - RetryOnTimeout: true, - MaxConcurrentRuns: 1, + Response: jobs.Job{ + JobId: 789, + Settings: &jobs.JobSettings{ + Name: "Featurizer New", + MaxConcurrentRuns: 1, }, }, }, @@ -2436,21 +2442,14 @@ func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) { Update: true, Resource: ResourceJob(), InstanceState: map[string]string{ - "new_cluster.0.node_type_id": "node-type-id-worker", - "new_cluster.0.driver_node_type_id": "node-type-id-driver", "task.0.new_cluster.0.node_type_id": "node-type-id-worker-task", "task.0.new_cluster.0.driver_node_type_id": "node-type-id-driver-task", "job_cluster.0.new_cluster.0.node_type_id": "node-type-id-worker-job", "job_cluster.0.new_cluster.0.driver_node_type_id": "node-type-id-driver-job", }, HCL: ` - new_cluster = { - instance_pool_id = "instance-pool-worker" - driver_instance_pool_id = "instance-pool-driver" - spark_version = "spark-1" - num_workers = 1 - } task = { + task_key = "task1" new_cluster = { instance_pool_id = "instance-pool-worker-task" driver_instance_pool_id = "instance-pool-driver-task" @@ -2459,6 +2458,7 @@ func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) { } } job_cluster = { + job_cluster_key = "job_cluster_1" new_cluster = { instance_pool_id = "instance-pool-worker-job" driver_instance_pool_id = "instance-pool-driver-job" @@ -2467,10 +2467,7 @@ func TestResourceJobUpdate_NodeTypeToInstancePool(t *testing.T) { } } max_concurrent_runs = 1 - max_retries = 3 - min_retry_interval_millis = 5000 - name = "Featurizer New" - retry_on_timeout = true`, + name = "Featurizer New"`, }.Apply(t) assert.NoError(t, err) assert.Equal(t, "789", d.Id(), "Id should be the same as in reading") @@ -2483,57 +2480,42 @@ func TestResourceJobUpdate_InstancePoolToNodeType(t *testing.T) { { Method: "POST", Resource: "/api/2.1/jobs/reset", - ExpectedRequest: UpdateJobRequest{ - JobID: 789, - NewSettings: &JobSettings{ - NewCluster: &clusters.Cluster{ - NodeTypeID: "node-type-id-1", - SparkVersion: "spark-1", - NumWorkers: 1, - }, - Tasks: []JobTaskSettings{ + ExpectedRequest: jobs.UpdateJob{ + JobId: 789, + NewSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ { - NewCluster: &clusters.Cluster{ - NodeTypeID: "node-type-id-2", + TaskKey: "task1", + NewCluster: &compute.ClusterSpec{ + NodeTypeId: "node-type-id-2", SparkVersion: "spark-2", NumWorkers: 2, }, }, }, - JobClusters: []JobCluster{ + JobClusters: []jobs.JobCluster{ { - NewCluster: &clusters.Cluster{ - NodeTypeID: "node-type-id-3", + JobClusterKey: "job_cluster_1", + NewCluster: compute.ClusterSpec{ + NodeTypeId: "node-type-id-3", SparkVersion: "spark-3", NumWorkers: 3, }, }, }, - Name: "Featurizer New", - MaxRetries: 3, - MinRetryIntervalMillis: 5000, - RetryOnTimeout: true, - MaxConcurrentRuns: 1, + Name: "Featurizer New", + MaxConcurrentRuns: 1, }, }, }, { Method: "GET", Resource: "/api/2.1/jobs/get?job_id=789", - Response: Job{ - JobID: 789, - Settings: &JobSettings{ - NewCluster: &clusters.Cluster{ - NodeTypeID: "node-type-id", - DriverNodeTypeID: "driver-node-type-id", - InstancePoolID: "instance-pool-id-worker", - 
DriverInstancePoolID: "instance-pool-id-driver", - }, - Name: "Featurizer New", - MaxRetries: 3, - MinRetryIntervalMillis: 5000, - RetryOnTimeout: true, - MaxConcurrentRuns: 1, + Response: jobs.Job{ + JobId: 789, + Settings: &jobs.JobSettings{ + Name: "Featurizer New", + MaxConcurrentRuns: 1, }, }, }, @@ -2542,9 +2524,6 @@ func TestResourceJobUpdate_InstancePoolToNodeType(t *testing.T) { Update: true, Resource: ResourceJob(), InstanceState: map[string]string{ - "new_cluster.0.instance_pool_id": "instance-pool-id-worker", - "new_cluster.0.driver_instance_pool_id": "instance-pool-id-driver", - "new_cluster.0.node_type_id": "node-type-id-worker", "task.0.new_cluster.0.node_type_id": "node-type-id-worker-task", "task.0.instance_pool_id": "instance-pool-id-worker", "task.0.driver_instance_pool_id": "instance-pool-id-driver", @@ -2553,12 +2532,8 @@ func TestResourceJobUpdate_InstancePoolToNodeType(t *testing.T) { "job_cluster.0.driver_instance_pool_id": "instance-pool-id-driver", }, HCL: ` - new_cluster = { - node_type_id = "node-type-id-1" - spark_version = "spark-1" - num_workers = 1 - } task = { + task_key = "task1" new_cluster = { node_type_id = "node-type-id-2" spark_version = "spark-2" @@ -2566,6 +2541,7 @@ func TestResourceJobUpdate_InstancePoolToNodeType(t *testing.T) { } } job_cluster = { + job_cluster_key = "job_cluster_1" new_cluster = { node_type_id = "node-type-id-3" spark_version = "spark-3" @@ -2573,10 +2549,7 @@ func TestResourceJobUpdate_InstancePoolToNodeType(t *testing.T) { } } max_concurrent_runs = 1 - max_retries = 3 - min_retry_interval_millis = 5000 - name = "Featurizer New" - retry_on_timeout = true`, + name = "Featurizer New"`, }.Apply(t) assert.NoError(t, err) assert.Equal(t, "789", d.Id(), "Id should be the same as in reading") @@ -2595,6 +2568,7 @@ func TestResourceJobUpdate_Tasks(t *testing.T) { Name: "Featurizer New", Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", SparkJarTask: &SparkJarTask{ MainClassName: "com.labs.BarMain", @@ -2615,6 +2589,7 @@ func TestResourceJobUpdate_Tasks(t *testing.T) { Settings: &JobSettings{ Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", SparkJarTask: &SparkJarTask{ MainClassName: "com.labs.BarMain", @@ -2631,6 +2606,7 @@ func TestResourceJobUpdate_Tasks(t *testing.T) { HCL: ` name = "Featurizer New" task { + task_key = "task1" existing_cluster_id = "abc" spark_jar_task { main_class_name = "com.labs.BarMain" diff --git a/jobs/resource_job_webhook_test.go b/jobs/resource_job_webhook_test.go index 145f2f5d81..da4b5dc426 100644 --- a/jobs/resource_job_webhook_test.go +++ b/jobs/resource_job_webhook_test.go @@ -19,6 +19,7 @@ func TestResourceJobUpdate_WebhookNotifications(t *testing.T) { Name: "Webhook test", Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", }, }, @@ -42,6 +43,7 @@ func TestResourceJobUpdate_WebhookNotifications(t *testing.T) { Name: "Webhook test", Tasks: []JobTaskSettings{ { + TaskKey: "task1", ExistingClusterID: "abc", }, }, @@ -72,6 +74,7 @@ func TestResourceJobUpdate_WebhookNotifications(t *testing.T) { HCL: ` name = "Webhook test" task { + task_key = "task1" existing_cluster_id = "abc" } webhook_notifications {