
Commit

update
edwardfeng-db committed May 16, 2024
1 parent 8b29d2d commit 228d7ff
Showing 5 changed files with 598 additions and 212 deletions.
84 changes: 73 additions & 11 deletions clusters/resource_cluster.go
@@ -24,7 +24,7 @@ var clusterSchemaVersion = 3

const (
numWorkerErr = "NumWorkers could be 0 only for SingleNode clusters. See https://docs.databricks.com/clusters/single-node.html for more details"
unsupportedExceptCreateOrEditErr = "unsupported type %T, must be either %scompute.CreateCluster or %scompute.EditCluster. Please report this issue to the GitHub repo"
unsupportedExceptCreateOrEditErr = "unsupported type %T, must be one of %scompute.CreateCluster, %scompute.ClusterSpec or %scompute.EditCluster. Please report this issue to the GitHub repo"
)

func ResourceCluster() common.Resource {
@@ -123,8 +123,15 @@ func Validate(cluster any) error {
profile = c.SparkConf["spark.databricks.cluster.profile"]
master = c.SparkConf["spark.master"]
resourceClass = c.CustomTags["ResourceClass"]
case compute.ClusterSpec:
if c.NumWorkers > 0 || c.Autoscale != nil {
return nil
}
profile = c.SparkConf["spark.databricks.cluster.profile"]
master = c.SparkConf["spark.master"]
resourceClass = c.CustomTags["ResourceClass"]
default:
return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "", "")
return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "", "", "")
}
if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
return nil
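
A minimal sketch, not part of this commit, of how the extended Validate now also accepts a compute.ClusterSpec; the configuration values are illustrative:

package main

import (
	"fmt"

	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/databricks/terraform-provider-databricks/clusters"
)

func main() {
	// A zero-worker spec passes validation only when it is configured as a single-node cluster.
	spec := compute.ClusterSpec{
		SparkConf: map[string]string{
			"spark.databricks.cluster.profile": "singleNode",
			"spark.master":                     "local[*]",
		},
		CustomTags: map[string]string{"ResourceClass": "SingleNode"},
	}
	fmt.Println(clusters.Validate(spec)) // <nil>: accepted as a single-node cluster
}
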
@@ -136,6 +143,34 @@ func Validate(cluster any) error {
// Long term, ModifyRequestOnInstancePool() in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
func ModifyRequestOnInstancePool(cluster any) error {
switch c := cluster.(type) {
case *compute.ClusterSpec:
// Instance profile id does not exist or not set
if c.InstancePoolId == "" {
// Worker must use an instance pool if driver uses an instance pool,
// therefore empty the computed value for driver instance pool.
c.DriverInstancePoolId = ""
return nil
}
if c.AwsAttributes != nil {
// Reset AwsAttributes
awsAttributes := compute.AwsAttributes{
InstanceProfileArn: c.AwsAttributes.InstanceProfileArn,
}
c.AwsAttributes = &awsAttributes
}
if c.AzureAttributes != nil {
c.AzureAttributes = &compute.AzureAttributes{}
}
if c.GcpAttributes != nil {
gcpAttributes := compute.GcpAttributes{
GoogleServiceAccount: c.GcpAttributes.GoogleServiceAccount,
}
c.GcpAttributes = &gcpAttributes
}
c.EnableElasticDisk = false
c.NodeTypeId = ""
c.DriverNodeTypeId = ""
return nil
case *compute.CreateCluster:
// Instance profile id does not exist or not set
if c.InstancePoolId == "" {
@@ -193,20 +228,44 @@ func ModifyRequestOnInstancePool(cluster any) error {
c.DriverNodeTypeId = ""
return nil
default:
return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*")
return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*", "*")
}
}
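
A minimal sketch, not part of this commit, of the effect of the new *compute.ClusterSpec branch, assuming the compute and clusters imports from the example above; the pool id, node types, zone and instance profile ARN are illustrative:

// normalizePooledSpec is a hypothetical helper showing what the branch clears for a
// spec that uses an instance pool.
func normalizePooledSpec() (*compute.ClusterSpec, error) {
	spec := &compute.ClusterSpec{
		InstancePoolId:    "pool-0123456789abcdef",
		NodeTypeId:        "i3.xlarge",
		DriverNodeTypeId:  "i3.xlarge",
		EnableElasticDisk: true,
		AwsAttributes: &compute.AwsAttributes{
			InstanceProfileArn: "arn:aws:iam::123456789012:instance-profile/example",
			ZoneId:             "us-west-2a",
		},
	}
	if err := clusters.ModifyRequestOnInstancePool(spec); err != nil {
		return nil, err // only returned for unsupported types
	}
	// NodeTypeId, DriverNodeTypeId and EnableElasticDisk are now cleared, and AwsAttributes
	// keeps only InstanceProfileArn: the instance pool determines those settings.
	return spec, nil
}
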

// This method is a duplicate of FixInstancePoolChangeIfAny(d *schema.ResourceData) in clusters/clusters_api.go that uses Go SDK.
// Long term, FixInstancePoolChangeIfAny(d *schema.ResourceData) in clusters_api.go will be removed once all the resources using clusters are migrated to Go SDK.
// https://github.com/databricks/terraform-provider-databricks/issues/824
func FixInstancePoolChangeIfAny(d *schema.ResourceData, cluster *compute.EditCluster) {
oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
if oldInstancePool != newInstancePool &&
oldDriverPool == oldInstancePool &&
oldDriverPool == newDriverPool {
cluster.DriverInstancePoolId = cluster.InstancePoolId
func FixInstancePoolChangeIfAny(d *schema.ResourceData, cluster any) error {
switch c := cluster.(type) {
case *compute.ClusterSpec:
oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
if oldInstancePool != newInstancePool &&
oldDriverPool == oldInstancePool &&
oldDriverPool == newDriverPool {
c.DriverInstancePoolId = c.InstancePoolId
}
return nil
case *compute.CreateCluster:
oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
if oldInstancePool != newInstancePool &&
oldDriverPool == oldInstancePool &&
oldDriverPool == newDriverPool {
c.DriverInstancePoolId = c.InstancePoolId
}
return nil
case *compute.EditCluster:
oldInstancePool, newInstancePool := d.GetChange("instance_pool_id")
oldDriverPool, newDriverPool := d.GetChange("driver_instance_pool_id")
if oldInstancePool != newInstancePool &&
oldDriverPool == oldInstancePool &&
oldDriverPool == newDriverPool {
c.DriverInstancePoolId = c.InstancePoolId
}
return nil
default:
return fmt.Errorf(unsupportedExceptCreateOrEditErr, cluster, "*", "*", "*")
}
}

@@ -450,7 +509,10 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo
if err = ModifyRequestOnInstancePool(&cluster); err != nil {
return err
}
FixInstancePoolChangeIfAny(d, &cluster)
err = FixInstancePoolChangeIfAny(d, &cluster)
if err != nil {
return err
}

// We can only call the resize api if the cluster is in the running state
// and only the cluster size (ie num_workers OR autoscale) is being changed
206 changes: 206 additions & 0 deletions jobs/jobs_api_go_sdk.go
@@ -0,0 +1,206 @@
package jobs

import (
"context"
"errors"
"fmt"
"sort"

"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/terraform-provider-databricks/clusters"
"github.com/databricks/terraform-provider-databricks/common"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func (js *JobSettingsResource) adjustTasks() {
js.sortTasksByKey()
for _, task := range js.Tasks {
sort.Slice(task.DependsOn, func(i, j int) bool {
return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
})
sortWebhookNotifications(task.WebhookNotifications)
}
}

func (js *JobSettingsResource) sortTasksByKey() {
sort.Slice(js.Tasks, func(i, j int) bool {
return js.Tasks[i].TaskKey < js.Tasks[j].TaskKey
})
}

func adjustTasks(cj *jobs.CreateJob) {
sortTasksByKey(cj)
for _, task := range cj.Tasks {
sort.Slice(task.DependsOn, func(i, j int) bool {
return task.DependsOn[i].TaskKey < task.DependsOn[j].TaskKey
})
sortWebhookNotifications(task.WebhookNotifications)
}
}

func sortTasksByKey(cj *jobs.CreateJob) {
sort.Slice(cj.Tasks, func(i, j int) bool {
return cj.Tasks[i].TaskKey < cj.Tasks[j].TaskKey
})
}

func (js *JobSettingsResource) sortWebhooksByID() {
sortWebhookNotifications(js.WebhookNotifications)
}

func sortWebhooksByID(cj *jobs.CreateJob) {
sortWebhookNotifications(cj.WebhookNotifications)
}

func (js *JobSettingsResource) isMultiTask() bool {
return js.Format == "MULTI_TASK" || len(js.Tasks) > 0
}

func getJobLifecycleManagerGoSdk(d *schema.ResourceData, m *common.DatabricksClient) jobLifecycleManager {
if d.Get("always_running").(bool) {
return alwaysRunningLifecycleManagerGoSdk{d: d, m: m}
}
if d.Get("control_run_state").(bool) {
return controlRunStateLifecycleManagerGoSdk{d: d, m: m}
}
return noopLifecycleManager{}
}
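
A hedged sketch, not part of this commit, of how the selected manager would be driven from the job resource's create path; the hook name afterJobCreate is an assumption:

// afterJobCreate shows the dispatch: always_running jobs are started right away,
// control_run_state jobs do nothing on create, and everything else is a no-op.
func afterJobCreate(ctx context.Context, d *schema.ResourceData, m *common.DatabricksClient) error {
	return getJobLifecycleManagerGoSdk(d, m).OnCreate(ctx)
}
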

type alwaysRunningLifecycleManagerGoSdk struct {
d *schema.ResourceData
m *common.DatabricksClient
}

func (a alwaysRunningLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
w, err := a.m.WorkspaceClient()
if err != nil {
return err
}
jobID, err := parseJobId(a.d.Id())
if err != nil {
return err
}

return Start(jobID, a.d.Timeout(schema.TimeoutCreate), w, ctx)
}

func (a alwaysRunningLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
w, err := a.m.WorkspaceClient()
if err != nil {
return err
}
jobID, err := parseJobId(a.d.Id())
if err != nil {
return err
}

err = StopActiveRun(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)

if err != nil {
return err
}
return Start(jobID, a.d.Timeout(schema.TimeoutUpdate), w, ctx)
}

type controlRunStateLifecycleManagerGoSdk struct {
d *schema.ResourceData
m *common.DatabricksClient
}

func (c controlRunStateLifecycleManagerGoSdk) OnCreate(ctx context.Context) error {
return nil
}

func (c controlRunStateLifecycleManagerGoSdk) OnUpdate(ctx context.Context) error {
if c.d.Get("continuous") == nil {
return nil
}

jobID, err := parseJobId(c.d.Id())
if err != nil {
return err
}

w, err := c.m.WorkspaceClient()
if err != nil {
return err
}

// Only use RunNow to stop the active run if the job is unpaused.
pauseStatus := c.d.Get("continuous.0.pause_status").(string)
if pauseStatus == "UNPAUSED" {
// Previously, RunNow() was not supported for continuous jobs. Now, calling RunNow()
// on a continuous job works, cancelling the active run if there is one, and resetting
// the exponential backoff timer. So, we try to call RunNow() first, and if it fails,
// we call StopActiveRun() instead.
_, err := w.Jobs.RunNow(ctx, jobs.RunNow{
JobId: jobID,
})

if err == nil {
return nil
}

// RunNow() returns 404 when the feature is disabled.
var apiErr *apierr.APIError
if errors.As(err, &apiErr) && apiErr.StatusCode != 404 {
return err
}
}

return StopActiveRun(jobID, c.d.Timeout(schema.TimeoutUpdate), w, ctx)
}
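
A small sketch, not part of this commit, isolating the status check used above, assuming the error came from w.Jobs.RunNow:

// runNowUnavailable reports whether RunNow failed with an HTTP 404, i.e. the feature is
// disabled for this job and the caller should fall back to StopActiveRun.
func runNowUnavailable(err error) bool {
	var apiErr *apierr.APIError
	return errors.As(err, &apiErr) && apiErr.StatusCode == 404
}
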

func prepareJobSettingsForUpdateGoSdk(d *schema.ResourceData, js JobSettingsResource) {
if js.NewCluster != nil {
clusters.ModifyRequestOnInstancePool(js.NewCluster)
clusters.FixInstancePoolChangeIfAny(d, &js.NewCluster)
}
for _, task := range js.Tasks {
if task.NewCluster != nil {
clusters.ModifyRequestOnInstancePool(task.NewCluster)
clusters.FixInstancePoolChangeIfAny(d, &task.NewCluster)
}
}
for i := range js.JobClusters {
clusters.ModifyRequestOnInstancePool(&js.JobClusters[i].NewCluster)
clusters.FixInstancePoolChangeIfAny(d, &js.JobClusters[i].NewCluster)
}
}
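
A minimal sketch, not part of this commit, of the expected call order during an update, assuming jobID was parsed from d.Id() by the caller; the helper name is an assumption:

// updateJobSettings normalizes instance-pool related fields first, then pushes the new
// settings through the Reset-based Update wrapper below.
func updateJobSettings(ctx context.Context, d *schema.ResourceData, w *databricks.WorkspaceClient, jobID int64, js JobSettingsResource) error {
	prepareJobSettingsForUpdateGoSdk(d, js)
	return Update(jobID, js, w, ctx)
}
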

func Update(jobID int64, js JobSettingsResource, w *databricks.WorkspaceClient, ctx context.Context) error {
err := w.Jobs.Reset(ctx, jobs.ResetJob{
JobId: jobID,
NewSettings: js.JobSettings,
})
return wrapMissingJobError(err, fmt.Sprintf("%d", jobID))
}

func Read(jobID int64, w *databricks.WorkspaceClient, ctx context.Context) (job *jobs.Job, err error) {
job, err = w.Jobs.GetByJobId(ctx, jobID)
err = wrapMissingJobError(err, fmt.Sprintf("%d", jobID))
if job.Settings != nil {
js := JobSettingsResource{JobSettings: *job.Settings}
js.adjustTasks()
js.sortWebhooksByID()
}

// Populate the `run_as` field. In the settings struct it can only be set on write and is not
// returned on read. Therefore, we populate it from the top-level `run_as_user_name` field so
// that Terraform can still diff it with the intended state.
if job.Settings != nil && job.RunAsUserName != "" {
if common.StringIsUUID(job.RunAsUserName) {
job.Settings.RunAs = &jobs.JobRunAs{
ServicePrincipalName: job.RunAsUserName,
}
} else {
job.Settings.RunAs = &jobs.JobRunAs{
UserName: job.RunAsUserName,
}
}
}

return
}
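
A minimal sketch, not part of this commit, of reading a job back through the wrapper above; the helper and its output are illustrative:

// describeJob fetches the job and reports how many tasks its settings carry after the
// task and webhook ordering applied in Read.
func describeJob(ctx context.Context, w *databricks.WorkspaceClient, jobID int64) error {
	job, err := Read(jobID, w, ctx)
	if err != nil {
		return err
	}
	if job.Settings != nil {
		fmt.Printf("job %d (%s) has %d tasks\n", jobID, job.Settings.Name, len(job.Settings.Tasks))
	}
	return nil
}
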
