From 6d15b155c391f47022c3838c0b6722ce2705caa2 Mon Sep 17 00:00:00 2001
From: aravind-segu
Date: Sun, 24 Mar 2024 11:47:13 -0700
Subject: [PATCH] Add Lakehouse Monitor

---
 .../mutator/process_target_mode_test.go       |  3 +
 .../mutator/resolve_variable_references.go    |  4 +-
 bundle/config/resources.go                    | 14 ++++
 .../config/resources/lakehouse_monitoring.go  | 23 ++++++
 bundle/config/root.go                         |  2 +-
 bundle/deploy/terraform/convert.go            | 18 +++++
 bundle/deploy/terraform/convert_test.go       | 27 +++++++
 bundle/deploy/terraform/interpolate.go        |  2 +
 .../terraform/tfdyn/convert_experiment.go     |  2 +-
 bundle/deploy/terraform/tfdyn/convert_job.go  |  4 +-
 .../tfdyn/convert_lakehouse_monitor.go        | 40 ++++++++++
 .../tfdyn/convert_lakehouse_monitor_test.go   | 47 ++++++++++++
 .../deploy/terraform/tfdyn/convert_model.go   |  2 +-
 .../tfdyn/convert_model_serving_endpoint.go   |  2 +-
 .../terraform/tfdyn/convert_pipeline.go       |  2 +-
 .../tfdyn/convert_registered_model.go         |  2 +-
 .../tf/schema/resource_lakehouse_monitor.go   | 73 +++++++++++++++++++
 bundle/schema/openapi.go                      | 19 +++++
 bundle/tests/lakehouse_monitor/databricks.yml | 31 ++++++++
 bundle/tests/lakehouse_monitor_test.go        | 48 ++++++++++++
 libs/dyn/convert/normalize.go                 | 43 ++++++-----
 libs/dyn/convert/struct_info.go               | 12 ++-
 22 files changed, 389 insertions(+), 31 deletions(-)
 create mode 100644 bundle/config/resources/lakehouse_monitoring.go
 create mode 100644 bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor.go
 create mode 100644 bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor_test.go
 create mode 100644 bundle/internal/tf/schema/resource_lakehouse_monitor.go
 create mode 100644 bundle/tests/lakehouse_monitor/databricks.yml
 create mode 100644 bundle/tests/lakehouse_monitor_test.go

diff --git a/bundle/config/mutator/process_target_mode_test.go b/bundle/config/mutator/process_target_mode_test.go
index a5f61284c7..2f9dac12ba 100644
--- a/bundle/config/mutator/process_target_mode_test.go
+++ b/bundle/config/mutator/process_target_mode_test.go
@@ -97,6 +97,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
 				RegisteredModels: map[string]*resources.RegisteredModel{
 					"registeredmodel1": {CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{Name: "registeredmodel1"}},
 				},
+				LakehouseMonitor: map[string]*resources.LakehouseMonitor{
+					"lakehouseMonitor1": {CreateMonitor: &catalog.CreateMonitor{FullName: "lakehouseMonitor1"}},
+				},
 			},
 		},
 		// Use AWS implementation for testing.
diff --git a/bundle/config/mutator/resolve_variable_references.go b/bundle/config/mutator/resolve_variable_references.go
index 1075e83e35..815b7ff5d4 100644
--- a/bundle/config/mutator/resolve_variable_references.go
+++ b/bundle/config/mutator/resolve_variable_references.go
@@ -50,7 +50,7 @@ func (m *resolveVariableReferences) Apply(ctx context.Context, b *bundle.Bundle)
 	// We can ignore the diagnostics return value because we know that the dynamic value
 	// has already been normalized when it was first loaded from the configuration file.
 	//
-	normalized, _ := convert.Normalize(b.Config, root, convert.IncludeMissingFields)
+	normalized, _ := convert.Normalize(b.Config, root, make(map[string]string), convert.IncludeMissingFields)
 	lookup := func(path dyn.Path) (dyn.Value, error) {
 		// Future opportunity: if we lookup this path in both the given root
 		// and the synthesized root, we know if it was explicitly set or implied to be empty.
@@ -84,7 +84,7 @@ func (m *resolveVariableReferences) Apply(ctx context.Context, b *bundle.Bundle)
 
 	// Normalize the result because variable resolution may have been applied to non-string fields.
 	// For example, a variable reference may have been resolved to an integer.
-	root, diags := convert.Normalize(b.Config, root)
+	root, diags := convert.Normalize(b.Config, root, make(map[string]string))
 	for _, diag := range diags {
 		// This occurs when a variable's resolved value is incompatible with the field's type.
 		// Log a warning until we have a better way to surface these diagnostics to the user.
diff --git a/bundle/config/resources.go b/bundle/config/resources.go
index 457360a0cb..f00d86939e 100644
--- a/bundle/config/resources.go
+++ b/bundle/config/resources.go
@@ -17,6 +17,7 @@ type Resources struct {
 	Experiments           map[string]*resources.MlflowExperiment     `json:"experiments,omitempty"`
 	ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"`
 	RegisteredModels      map[string]*resources.RegisteredModel      `json:"registered_models,omitempty"`
+	LakehouseMonitor      map[string]*resources.LakehouseMonitor     `json:"lakehouse_monitor,omitempty"`
 }
 
 type UniqueResourceIdTracker struct {
@@ -123,6 +124,19 @@ func (r *Resources) VerifyUniqueResourceIdentifiers() (*UniqueResourceIdTracker,
 		tracker.Type[k] = "registered_model"
 		tracker.ConfigPath[k] = r.RegisteredModels[k].ConfigFilePath
 	}
+	for k := range r.LakehouseMonitor {
+		if _, ok := tracker.Type[k]; ok {
+			return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
+				k,
+				tracker.Type[k],
+				tracker.ConfigPath[k],
+				"lakehouse_monitor",
+				r.LakehouseMonitor[k].ConfigFilePath,
+			)
+		}
+		tracker.Type[k] = "lakehouse_monitor"
+		tracker.ConfigPath[k] = r.LakehouseMonitor[k].ConfigFilePath
+	}
 	return tracker, nil
 }
 
diff --git a/bundle/config/resources/lakehouse_monitoring.go b/bundle/config/resources/lakehouse_monitoring.go
new file mode 100644
index 0000000000..035c27a30a
--- /dev/null
+++ b/bundle/config/resources/lakehouse_monitoring.go
@@ -0,0 +1,23 @@
+package resources
+
+import (
+	"github.com/databricks/cli/bundle/config/paths"
+	"github.com/databricks/databricks-sdk-go/service/catalog"
+)
+
+type LakehouseMonitor struct {
+	// Represents the input arguments for Terraform; this gets
+	// converted to an HCL representation for CRUD operations.
+	*catalog.CreateMonitor
+
+	// This represents the ID, which is the full name of the monitor
+	// (catalog_name.schema_name.table_name) and can be used as a
+	// reference in other resources. This value is returned by Terraform.
+	ID string `json:"id,omitempty" bundle:"readonly"`
+
+	// Path to the config file where the resource is defined. All bundle
+	// resources include this for interpolation purposes.
+	paths.Paths
+
+	ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
+}
diff --git a/bundle/config/root.go b/bundle/config/root.go
index 8e1ff65077..6865d98adf 100644
--- a/bundle/config/root.go
+++ b/bundle/config/root.go
@@ -97,7 +97,7 @@ func Load(path string) (*Root, error) {
 	}
 
 	// Normalize dynamic configuration tree according to configuration type.
-	v, diags := convert.Normalize(r, v)
+	v, diags := convert.Normalize(r, v, make(map[string]string))
 
 	// Keep track of diagnostics (warnings and errors in the schema).
 	// We delay acting on diagnostics until we have loaded all
diff --git a/bundle/deploy/terraform/convert.go b/bundle/deploy/terraform/convert.go
index f2fb77e18e..6cb01d5b4c 100644
--- a/bundle/deploy/terraform/convert.go
+++ b/bundle/deploy/terraform/convert.go
@@ -222,6 +222,14 @@ func BundleToTerraform(config *config.Root) *schema.Root {
 		}
 	}
 
+	for k, src := range config.Resources.LakehouseMonitor {
+		noResources = false
+		var dst schema.ResourceLakehouseMonitor
+		conv(src, &dst)
+		dst.TableName = src.FullName
+		tfroot.Resource.LakehouseMonitor[k] = &dst
+	}
+
 	// We explicitly set "resource" to nil to omit it from a JSON encoding.
 	// This is required because the terraform CLI requires >= 1 resources defined
 	// if the "resource" property is used in a .tf.json file.
@@ -359,6 +367,16 @@ func TerraformToBundle(state *tfjson.State, config *config.Root) error {
 			modifiedStatus := convRemoteToLocal(tmp, &cur)
 			cur.ModifiedStatus = modifiedStatus
 			config.Resources.RegisteredModels[resource.Name] = cur
+		case "databricks_lakehouse_monitor":
+			var tmp schema.ResourceLakehouseMonitor
+			conv(resource.AttributeValues, &tmp)
+			if config.Resources.LakehouseMonitor == nil {
+				config.Resources.LakehouseMonitor = make(map[string]*resources.LakehouseMonitor)
+			}
+			cur := config.Resources.LakehouseMonitor[resource.Name]
+			modifiedStatus := convRemoteToLocal(tmp, &cur)
+			cur.ModifiedStatus = modifiedStatus
+			config.Resources.LakehouseMonitor[resource.Name] = cur
 		case "databricks_permissions":
 		case "databricks_grants":
 			// Ignore; no need to pull these back into the configuration.
diff --git a/bundle/deploy/terraform/convert_test.go b/bundle/deploy/terraform/convert_test.go
index fa59e092df..d37cf95a83 100644
--- a/bundle/deploy/terraform/convert_test.go
+++ b/bundle/deploy/terraform/convert_test.go
@@ -548,6 +548,12 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
 					Name:            "test_registered_model",
 					AttributeValues: map[string]interface{}{"id": "1"},
 				},
+				{
+					Type:            "databricks_lakehouse_monitor",
+					Mode:            "managed",
+					Name:            "test_lakehouse_monitor",
+					AttributeValues: map[string]interface{}{"id": "1"},
+				},
 			},
 		},
 	}
@@ -573,6 +579,8 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
 	assert.Equal(t, "1", config.Resources.RegisteredModels["test_registered_model"].ID)
 	assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.RegisteredModels["test_registered_model"].ModifiedStatus)
 
+	assert.Equal(t, "1", config.Resources.LakehouseMonitor["test_lakehouse_monitor"].ID)
+	assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.LakehouseMonitor["test_lakehouse_monitor"].ModifiedStatus)
 	AssertFullResourceCoverage(t, &config)
 }
 
@@ -621,6 +629,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
 				},
 			},
 		},
+		LakehouseMonitor: map[string]*resources.LakehouseMonitor{
+			"test_lakehouse_monitor": {
+				CreateMonitor: &catalog.CreateMonitor{
+					FullName: "test_lakehouse_monitor",
+				},
+			},
+		},
 	},
 }
 var tfState = tfjson.State{
@@ -725,6 +740,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
 				},
 			},
 		},
+		LakehouseMonitor: map[string]*resources.LakehouseMonitor{
+			"test_lakehouse_monitor": {
+				CreateMonitor: &catalog.CreateMonitor{
+					FullName: "test_lakehouse_monitor",
+				},
+			},
+			"test_lakehouse_monitor_new": {
+				CreateMonitor: &catalog.CreateMonitor{
+					FullName: "test_lakehouse_monitor_new",
+				},
+			},
+		},
 	},
 }
 var tfState = tfjson.State{
diff --git a/bundle/deploy/terraform/interpolate.go b/bundle/deploy/terraform/interpolate.go
index 525a38fa88..c6010ebc57 100644
--- a/bundle/deploy/terraform/interpolate.go
+++ b/bundle/deploy/terraform/interpolate.go
@@ -53,6 +53,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) error
 			path = dyn.NewPath(dyn.Key("databricks_model_serving")).Append(path[2:]...)
 		case dyn.Key("registered_models"):
 			path = dyn.NewPath(dyn.Key("databricks_registered_model")).Append(path[2:]...)
+		case dyn.Key("lakehouse_monitor"):
+			path = dyn.NewPath(dyn.Key("databricks_lakehouse_monitor")).Append(path[2:]...)
 		default:
 			// Trigger "key not found" for unknown resource types.
 			return dyn.GetByPath(root, path)
diff --git a/bundle/deploy/terraform/tfdyn/convert_experiment.go b/bundle/deploy/terraform/tfdyn/convert_experiment.go
index 0c129181f2..641abb4e47 100644
--- a/bundle/deploy/terraform/tfdyn/convert_experiment.go
+++ b/bundle/deploy/terraform/tfdyn/convert_experiment.go
@@ -12,7 +12,7 @@ import (
 
 func convertExperimentResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
 	// Normalize the output value to the target schema.
-	vout, diags := convert.Normalize(schema.ResourceMlflowExperiment{}, vin)
+	vout, diags := convert.Normalize(schema.ResourceMlflowExperiment{}, vin, make(map[string]string))
 	for _, diag := range diags {
 		log.Debugf(ctx, "experiment normalization diagnostic: %s", diag.Summary)
 	}
diff --git a/bundle/deploy/terraform/tfdyn/convert_job.go b/bundle/deploy/terraform/tfdyn/convert_job.go
index 778af1adc9..0b5cdd1dd8 100644
--- a/bundle/deploy/terraform/tfdyn/convert_job.go
+++ b/bundle/deploy/terraform/tfdyn/convert_job.go
@@ -14,7 +14,7 @@ import (
 func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
 	// Normalize the input value to the underlying job schema.
 	// This removes superfluous keys and adapts the input to the expected schema.
-	vin, diags := convert.Normalize(jobs.JobSettings{}, vin)
+	vin, diags := convert.Normalize(jobs.JobSettings{}, vin, make(map[string]string))
 	for _, diag := range diags {
 		log.Debugf(ctx, "job normalization diagnostic: %s", diag.Summary)
 	}
@@ -54,7 +54,7 @@ func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
 	}
 
 	// Normalize the output value to the target schema.
-	vout, diags = convert.Normalize(schema.ResourceJob{}, vout)
+	vout, diags = convert.Normalize(schema.ResourceJob{}, vout, make(map[string]string))
 	for _, diag := range diags {
 		log.Debugf(ctx, "job normalization diagnostic: %s", diag.Summary)
 	}
diff --git a/bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor.go b/bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor.go
new file mode 100644
index 0000000000..7f48e80414
--- /dev/null
+++ b/bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor.go
@@ -0,0 +1,40 @@
+package tfdyn
+
+import (
+	"context"
+
+	"github.com/databricks/cli/bundle/internal/tf/schema"
+	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/cli/libs/dyn/convert"
+	"github.com/databricks/cli/libs/log"
+)
+
+func convertLakehouseMonitorResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
+	substitutions := map[string]string{"full_name": "table_name"}
+
+	// Normalize the output value to the target schema.
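+	// The SDK's CreateMonitor struct carries the monitored table as "full_name",
+	// while the Terraform schema expects "table_name"; the substitution above
+	// renames that key during normalization so the two representations line up.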
+	vout, diags := convert.Normalize(schema.ResourceLakehouseMonitor{}, vin, substitutions)
+	for _, diag := range diags {
+		log.Debugf(ctx, "lakehouse monitor normalization diagnostic: %s", diag.Summary)
+	}
+
+	return vout, nil
+}
+
+type lakehouseMonitorConverter struct{}
+
+func (lakehouseMonitorConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
+	vout, err := convertLakehouseMonitorResource(ctx, vin)
+	if err != nil {
+		return err
+	}
+
+	// Add the converted resource to the output.
+	out.LakehouseMonitor[key] = vout.AsAny()
+
+	return nil
+}
+
+func init() {
+	registerConverter("lakehouse_monitor", lakehouseMonitorConverter{})
+}
diff --git a/bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor_test.go b/bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor_test.go
new file mode 100644
index 0000000000..f676c30c0c
--- /dev/null
+++ b/bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor_test.go
@@ -0,0 +1,47 @@
+package tfdyn
+
+import (
+	"context"
+	"testing"
+
+	"github.com/databricks/cli/bundle/config/resources"
+	"github.com/databricks/cli/bundle/internal/tf/schema"
+	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/cli/libs/dyn/convert"
+	"github.com/databricks/databricks-sdk-go/service/catalog"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestConvertLakehouseMonitor(t *testing.T) {
+	var src = resources.LakehouseMonitor{
+		CreateMonitor: &catalog.CreateMonitor{
+			FullName:         "test_table_name",
+			AssetsDir:        "assets_dir",
+			OutputSchemaName: "output_schema_name",
+			InferenceLog: &catalog.MonitorInferenceLogProfileType{
+				ModelIdCol:    "model_id",
+				PredictionCol: "test_prediction_col",
+				ProblemType:   "PROBLEM_TYPE_CLASSIFICATION",
+			},
+		},
+	}
+	vin, err := convert.FromTyped(src, dyn.NilValue)
+	require.NoError(t, err)
+	ctx := context.Background()
+	out := schema.NewResources()
+	err = lakehouseMonitorConverter{}.Convert(ctx, "my_lakehouse_monitor", vin, out)
+
+	require.NoError(t, err)
+	assert.Equal(t, map[string]any{
+		"assets_dir":         "assets_dir",
+		"output_schema_name": "output_schema_name",
+		"table_name":         "test_table_name",
+		"inference_log": map[string]any{
+			"model_id_col":   "model_id",
+			"prediction_col": "test_prediction_col",
+			"problem_type":   "PROBLEM_TYPE_CLASSIFICATION",
+		},
+	}, out.LakehouseMonitor["my_lakehouse_monitor"])
+}
diff --git a/bundle/deploy/terraform/tfdyn/convert_model.go b/bundle/deploy/terraform/tfdyn/convert_model.go
index f5d7d489b6..d259902f34 100644
--- a/bundle/deploy/terraform/tfdyn/convert_model.go
+++ b/bundle/deploy/terraform/tfdyn/convert_model.go
@@ -12,7 +12,7 @@ import (
 
 func convertModelResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
 	// Normalize the output value to the target schema.
-	vout, diags := convert.Normalize(schema.ResourceMlflowModel{}, vin)
+	vout, diags := convert.Normalize(schema.ResourceMlflowModel{}, vin, make(map[string]string))
 	for _, diag := range diags {
 		log.Debugf(ctx, "model normalization diagnostic: %s", diag.Summary)
 	}
diff --git a/bundle/deploy/terraform/tfdyn/convert_model_serving_endpoint.go b/bundle/deploy/terraform/tfdyn/convert_model_serving_endpoint.go
index b67e4dcc34..fa63f28c74 100644
--- a/bundle/deploy/terraform/tfdyn/convert_model_serving_endpoint.go
+++ b/bundle/deploy/terraform/tfdyn/convert_model_serving_endpoint.go
@@ -12,7 +12,7 @@ import (
 
 func convertModelServingEndpointResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
 	// Normalize the output value to the target schema.
-	vout, diags := convert.Normalize(schema.ResourceModelServing{}, vin)
+	vout, diags := convert.Normalize(schema.ResourceModelServing{}, vin, make(map[string]string))
 	for _, diag := range diags {
 		log.Debugf(ctx, "model serving endpoint normalization diagnostic: %s", diag.Summary)
 	}
diff --git a/bundle/deploy/terraform/tfdyn/convert_pipeline.go b/bundle/deploy/terraform/tfdyn/convert_pipeline.go
index ea0c94d66c..721d9c780a 100644
--- a/bundle/deploy/terraform/tfdyn/convert_pipeline.go
+++ b/bundle/deploy/terraform/tfdyn/convert_pipeline.go
@@ -22,7 +22,7 @@ func convertPipelineResource(ctx context.Context, vin dyn.Value) (dyn.Value, err
 	}
 
 	// Normalize the output value to the target schema.
-	vout, diags := convert.Normalize(schema.ResourcePipeline{}, vout)
+	vout, diags := convert.Normalize(schema.ResourcePipeline{}, vout, make(map[string]string))
 	for _, diag := range diags {
 		log.Debugf(ctx, "pipeline normalization diagnostic: %s", diag.Summary)
 	}
diff --git a/bundle/deploy/terraform/tfdyn/convert_registered_model.go b/bundle/deploy/terraform/tfdyn/convert_registered_model.go
index 20aa596f2c..c0a79c9420 100644
--- a/bundle/deploy/terraform/tfdyn/convert_registered_model.go
+++ b/bundle/deploy/terraform/tfdyn/convert_registered_model.go
@@ -12,7 +12,7 @@ import (
 
 func convertRegisteredModelResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
 	// Normalize the output value to the target schema.
-	vout, diags := convert.Normalize(schema.ResourceRegisteredModel{}, vin)
+	vout, diags := convert.Normalize(schema.ResourceRegisteredModel{}, vin, make(map[string]string))
 	for _, diag := range diags {
 		log.Debugf(ctx, "registered model normalization diagnostic: %s", diag.Summary)
 	}
diff --git a/bundle/internal/tf/schema/resource_lakehouse_monitor.go b/bundle/internal/tf/schema/resource_lakehouse_monitor.go
new file mode 100644
index 0000000000..d96f7e2fe5
--- /dev/null
+++ b/bundle/internal/tf/schema/resource_lakehouse_monitor.go
@@ -0,0 +1,73 @@
+// Generated from Databricks Terraform provider schema. DO NOT EDIT.
+
+package schema
+
+
+type ResourceLakehouseMonitorCustomMetrics struct {
+	Definition     string   `json:"definition,omitempty"`
+	InputColumns   []string `json:"input_columns,omitempty"`
+	Name           string   `json:"name,omitempty"`
+	OutputDataType string   `json:"output_data_type,omitempty"`
+	Type           string   `json:"type,omitempty"`
+}
+
+type ResourceLakehouseMonitorDataClassificationConfig struct {
+	Enabled bool `json:"enabled,omitempty"`
+}
+
+type ResourceLakehouseMonitorInferenceLog struct {
+	Granularities      []string `json:"granularities,omitempty"`
+	LabelCol           string   `json:"label_col,omitempty"`
+	ModelIdCol         string   `json:"model_id_col,omitempty"`
+	PredictionCol      string   `json:"prediction_col,omitempty"`
+	PredictionProbaCol string   `json:"prediction_proba_col,omitempty"`
+	ProblemType        string   `json:"problem_type,omitempty"`
+	TimestampCol       string   `json:"timestamp_col,omitempty"`
+}
+
+type ResourceLakehouseMonitorNotificationsOnFailure struct {
+	EmailAddresses []string `json:"email_addresses,omitempty"`
+}
+
+type ResourceLakehouseMonitorNotifications struct {
+	OnFailure *ResourceLakehouseMonitorNotificationsOnFailure `json:"on_failure,omitempty"`
+}
+
+type ResourceLakehouseMonitorSchedule struct {
+	PauseStatus          string `json:"pause_status,omitempty"`
+	QuartzCronExpression string `json:"quartz_cron_expression,omitempty"`
+	TimezoneId           string `json:"timezone_id,omitempty"`
+}
+
+type ResourceLakehouseMonitorSnapshot struct {
+}
+
+type ResourceLakehouseMonitorTimeSeries struct {
+	Granularities []string `json:"granularities,omitempty"`
+	TimestampCol  string   `json:"timestamp_col,omitempty"`
+}
+
+type ResourceLakehouseMonitor struct {
+	AssetsDir                string                                             `json:"assets_dir"`
+	BaselineTableName        string                                             `json:"baseline_table_name,omitempty"`
+	DashboardId              string                                             `json:"dashboard_id,omitempty"`
+	DriftMetricsTableName    string                                             `json:"drift_metrics_table_name,omitempty"`
+	Id                       string                                             `json:"id,omitempty"`
+	LatestMonitorFailureMsg  string                                             `json:"latest_monitor_failure_msg,omitempty"`
+	MonitorVersion           string                                             `json:"monitor_version,omitempty"`
+	OutputSchemaName         string                                             `json:"output_schema_name"`
+	ProfileMetricsTableName  string                                             `json:"profile_metrics_table_name,omitempty"`
+	SkipBuiltinDashboard     bool                                               `json:"skip_builtin_dashboard,omitempty"`
+	SlicingExprs             []string                                           `json:"slicing_exprs,omitempty"`
+	Status                   string                                             `json:"status,omitempty"`
+	TableName                string                                             `json:"table_name"`
+	WarehouseId              string                                             `json:"warehouse_id,omitempty"`
+	CustomMetrics            []ResourceLakehouseMonitorCustomMetrics            `json:"custom_metrics,omitempty"`
+	DataClassificationConfig *ResourceLakehouseMonitorDataClassificationConfig  `json:"data_classification_config,omitempty"`
+	InferenceLog             *ResourceLakehouseMonitorInferenceLog              `json:"inference_log,omitempty"`
+	Notifications            []ResourceLakehouseMonitorNotifications            `json:"notifications,omitempty"`
+	Schedule                 *ResourceLakehouseMonitorSchedule                  `json:"schedule,omitempty"`
+	Snapshot                 *ResourceLakehouseMonitorSnapshot                  `json:"snapshot,omitempty"`
+	TimeSeries               *ResourceLakehouseMonitorTimeSeries                `json:"time_series,omitempty"`
+}
+
diff --git a/bundle/schema/openapi.go b/bundle/schema/openapi.go
index fe329e7ac4..5093e5b1ff 100644
--- a/bundle/schema/openapi.go
+++ b/bundle/schema/openapi.go
@@ -243,6 +243,19 @@ func (reader *OpenapiReader) registeredModelDocs() (*Docs, error) {
 	return registeredModelsAllDocs, nil
 }
 
+func (reader *OpenapiReader) lakehouseMonitorDocs() (*Docs, error) {
+	lakehouseMonitorSpecSchema, err := reader.readResolvedSchema(SchemaPathPrefix + "catalog.CreateMonitor")
+	if err != nil {
+		return nil, err
+	}
+	lakehouseMonitorDocs := schemaToDocs(lakehouseMonitorSpecSchema)
+	lakehouseMonitorAllDocs := &Docs{
+		Description:          "List of Lakehouse Monitors",
+		AdditionalProperties: lakehouseMonitorDocs,
+	}
+	return lakehouseMonitorAllDocs, nil
+}
+
 func (reader *OpenapiReader) ResourcesDocs() (*Docs, error) {
 	jobsDocs, err := reader.jobsDocs()
 	if err != nil {
@@ -269,6 +282,11 @@ func (reader *OpenapiReader) ResourcesDocs() (*Docs, error) {
 		return nil, err
 	}
 
+	lakehouseMonitorDocs, err := reader.lakehouseMonitorDocs()
+	if err != nil {
+		return nil, err
+	}
+
 	return &Docs{
 		Description: "Collection of Databricks resources to deploy.",
 		Properties: map[string]*Docs{
@@ -278,6 +296,7 @@ func (reader *OpenapiReader) ResourcesDocs() (*Docs, error) {
 			"models":                  modelsDocs,
 			"model_serving_endpoints": modelServingEndpointsDocs,
 			"registered_models":       registeredModelsDocs,
+			"lakehouse_monitor":       lakehouseMonitorDocs,
 		},
 	}, nil
 }
diff --git a/bundle/tests/lakehouse_monitor/databricks.yml b/bundle/tests/lakehouse_monitor/databricks.yml
new file mode 100644
index 0000000000..e5bf1080aa
--- /dev/null
+++ b/bundle/tests/lakehouse_monitor/databricks.yml
@@ -0,0 +1,31 @@
+resources:
+  lakehouse_monitor:
+    my_lakehouse_monitor:
+      full_name: "main.test.thing1"
+      assets_dir: "/Shared/provider-test/databricks_lakehouse_monitoring/main.test.thing1"
+      output_schema_name: "test"
+      inference_log:
+        granularities: ["1 day"]
+        timestamp_col: "timestamp"
+        prediction_col: "prediction"
+        model_id_col: "model_id"
+        problem_type: "PROBLEM_TYPE_REGRESSION"
+
+
+targets:
+  development:
+    mode: development
+    resources:
+      lakehouse_monitor:
+        my_lakehouse_monitor:
+          full_name: "main.test.dev"
+  staging:
+    resources:
+      lakehouse_monitor:
+        my_lakehouse_monitor:
+          full_name: "main.test.staging"
+  production:
+    resources:
+      lakehouse_monitor:
+        my_lakehouse_monitor:
+          full_name: "main.test.prod"
diff --git a/bundle/tests/lakehouse_monitor_test.go b/bundle/tests/lakehouse_monitor_test.go
new file mode 100644
index 0000000000..aadb2aa287
--- /dev/null
+++ b/bundle/tests/lakehouse_monitor_test.go
@@ -0,0 +1,48 @@
+package config_tests
+
+import (
+	"testing"
+
+	"github.com/databricks/cli/bundle/config"
+	"github.com/databricks/cli/bundle/config/resources"
+	"github.com/databricks/databricks-sdk-go/service/catalog"
+	"github.com/stretchr/testify/assert"
+)
+
+func assertExpectedLakehouseMonitor(t *testing.T, p *resources.LakehouseMonitor) {
+	assert.Equal(t, "test", p.OutputSchemaName)
+	assert.Equal(t, "/Shared/provider-test/databricks_lakehouse_monitoring/main.test.thing1", p.AssetsDir)
+	assert.Equal(t, "model_id", p.InferenceLog.ModelIdCol)
+	assert.Equal(t, "prediction", p.InferenceLog.PredictionCol)
+	assert.Equal(t, catalog.MonitorInferenceLogProfileTypeProblemType("PROBLEM_TYPE_REGRESSION"), p.InferenceLog.ProblemType)
+	assert.Equal(t, "timestamp", p.InferenceLog.TimestampCol)
+}
+
+func TestLakehouseMonitorDevelopment(t *testing.T) {
+	b := loadTarget(t, "./lakehouse_monitor", "development")
+	assert.Len(t, b.Config.Resources.LakehouseMonitor, 1)
+	assert.Equal(t, b.Config.Bundle.Mode, config.Development)
+
+	p := b.Config.Resources.LakehouseMonitor["my_lakehouse_monitor"]
+	assert.Equal(t, "main.test.dev", p.FullName)
+	assertExpectedLakehouseMonitor(t, p)
+}
+
+func TestLakehouseMonitorStaging(t *testing.T) {
+	b := loadTarget(t, "./lakehouse_monitor", "staging")
+	assert.Len(t, b.Config.Resources.LakehouseMonitor, 1)
+
+	p := b.Config.Resources.LakehouseMonitor["my_lakehouse_monitor"]
+	assert.Equal(t, "main.test.staging", p.FullName)
"main.test.staging", p.FullName) + assertExpectedLakehouseMonitor(t, p) +} + +func TestLakehouseMonitorProduction(t *testing.T) { + b := loadTarget(t, "./lakehouse_monitor", "production") + assert.Len(t, b.Config.Resources.LakehouseMonitor, 1) + + p := b.Config.Resources.LakehouseMonitor["my_lakehouse_monitor"] + assert.Equal(t, "main.test.prod", p.FullName) + assertExpectedLakehouseMonitor(t, p) +} diff --git a/libs/dyn/convert/normalize.go b/libs/dyn/convert/normalize.go index d6539be952..45d91eba3e 100644 --- a/libs/dyn/convert/normalize.go +++ b/libs/dyn/convert/normalize.go @@ -24,7 +24,7 @@ type normalizeOptions struct { includeMissingFields bool } -func Normalize(dst any, src dyn.Value, opts ...NormalizeOption) (dyn.Value, diag.Diagnostics) { +func Normalize(dst any, src dyn.Value, field_substitutions map[string]string, opts ...NormalizeOption) (dyn.Value, diag.Diagnostics) { var n normalizeOptions for _, opt := range opts { switch opt { @@ -33,21 +33,21 @@ func Normalize(dst any, src dyn.Value, opts ...NormalizeOption) (dyn.Value, diag } } - return n.normalizeType(reflect.TypeOf(dst), src, []reflect.Type{}) + return n.normalizeType(reflect.TypeOf(dst), src, []reflect.Type{}, field_substitutions) } -func (n normalizeOptions) normalizeType(typ reflect.Type, src dyn.Value, seen []reflect.Type) (dyn.Value, diag.Diagnostics) { +func (n normalizeOptions) normalizeType(typ reflect.Type, src dyn.Value, seen []reflect.Type, field_substitutions map[string]string) (dyn.Value, diag.Diagnostics) { for typ.Kind() == reflect.Pointer { typ = typ.Elem() } switch typ.Kind() { case reflect.Struct: - return n.normalizeStruct(typ, src, append(seen, typ)) + return n.normalizeStruct(typ, src, append(seen, typ), field_substitutions) case reflect.Map: - return n.normalizeMap(typ, src, append(seen, typ)) + return n.normalizeMap(typ, src, append(seen, typ), field_substitutions) case reflect.Slice: - return n.normalizeSlice(typ, src, append(seen, typ)) + return n.normalizeSlice(typ, src, append(seen, typ), field_substitutions) case reflect.String: return n.normalizeString(typ, src) case reflect.Bool: @@ -69,7 +69,7 @@ func typeMismatch(expected dyn.Kind, src dyn.Value) diag.Diagnostic { } } -func (n normalizeOptions) normalizeStruct(typ reflect.Type, src dyn.Value, seen []reflect.Type) (dyn.Value, diag.Diagnostics) { +func (n normalizeOptions) normalizeStruct(typ reflect.Type, src dyn.Value, seen []reflect.Type, field_substitutions map[string]string) (dyn.Value, diag.Diagnostics) { var diags diag.Diagnostics switch src.Kind() { @@ -77,6 +77,10 @@ func (n normalizeOptions) normalizeStruct(typ reflect.Type, src dyn.Value, seen out := make(map[string]dyn.Value) info := getStructInfo(typ) for k, v := range src.MustMap() { + value, exists := field_substitutions[k] + if exists { + k = value + } index, ok := info.Fields[k] if !ok { diags = diags.Append(diag.Diagnostic{ @@ -88,7 +92,7 @@ func (n normalizeOptions) normalizeStruct(typ reflect.Type, src dyn.Value, seen } // Normalize the value according to the field type. - v, err := n.normalizeType(typ.FieldByIndex(index).Type, v, seen) + v, err := n.normalizeType(typ.FieldByIndex(index).Type, v, seen, field_substitutions) if err != nil { diags = diags.Extend(err) // Skip the element if it cannot be normalized. 
@@ -96,7 +100,6 @@ func (n normalizeOptions) normalizeStruct(typ reflect.Type, src dyn.Value, seen
 				continue
 			}
 		}
-
 		out[k] = v
 	}
 
@@ -126,17 +129,17 @@ func (n normalizeOptions) normalizeStruct(typ reflect.Type, src dyn.Value, seen
 			var v dyn.Value
 			switch ftyp.Kind() {
 			case reflect.Struct, reflect.Map:
-				v, _ = n.normalizeType(ftyp, dyn.V(map[string]dyn.Value{}), seen)
+				v, _ = n.normalizeType(ftyp, dyn.V(map[string]dyn.Value{}), seen, fieldSubstitutions)
 			case reflect.Slice:
-				v, _ = n.normalizeType(ftyp, dyn.V([]dyn.Value{}), seen)
+				v, _ = n.normalizeType(ftyp, dyn.V([]dyn.Value{}), seen, fieldSubstitutions)
 			case reflect.String:
-				v, _ = n.normalizeType(ftyp, dyn.V(""), seen)
+				v, _ = n.normalizeType(ftyp, dyn.V(""), seen, fieldSubstitutions)
 			case reflect.Bool:
-				v, _ = n.normalizeType(ftyp, dyn.V(false), seen)
+				v, _ = n.normalizeType(ftyp, dyn.V(false), seen, fieldSubstitutions)
 			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
-				v, _ = n.normalizeType(ftyp, dyn.V(int64(0)), seen)
+				v, _ = n.normalizeType(ftyp, dyn.V(int64(0)), seen, fieldSubstitutions)
 			case reflect.Float32, reflect.Float64:
-				v, _ = n.normalizeType(ftyp, dyn.V(float64(0)), seen)
+				v, _ = n.normalizeType(ftyp, dyn.V(float64(0)), seen, fieldSubstitutions)
 			default:
 				// Skip fields for which we do not have a natural [dyn.Value] equivalent.
 				// For example, we don't handle reflect.Complex* and reflect.Uint* types.
@@ -155,7 +158,7 @@ func (n normalizeOptions) normalizeStruct(typ reflect.Type, src dyn.Value, seen
 	return dyn.InvalidValue, diags.Append(typeMismatch(dyn.KindMap, src))
 }
 
-func (n normalizeOptions) normalizeMap(typ reflect.Type, src dyn.Value, seen []reflect.Type) (dyn.Value, diag.Diagnostics) {
+func (n normalizeOptions) normalizeMap(typ reflect.Type, src dyn.Value, seen []reflect.Type, fieldSubstitutions map[string]string) (dyn.Value, diag.Diagnostics) {
 	var diags diag.Diagnostics
 
 	switch src.Kind() {
@@ -163,7 +166,7 @@ func (n normalizeOptions) normalizeMap(typ reflect.Type, src dyn.Value, seen []r
 		out := make(map[string]dyn.Value)
 		for k, v := range src.MustMap() {
 			// Normalize the value according to the map element type.
-			v, err := n.normalizeType(typ.Elem(), v, seen)
+			v, err := n.normalizeType(typ.Elem(), v, seen, fieldSubstitutions)
 			if err != nil {
 				diags = diags.Extend(err)
 				// Skip the element if it cannot be normalized.
@@ -183,7 +186,7 @@ func (n normalizeOptions) normalizeMap(typ reflect.Type, src dyn.Value, seen []r
 	return dyn.InvalidValue, diags.Append(typeMismatch(dyn.KindMap, src))
 }
 
-func (n normalizeOptions) normalizeSlice(typ reflect.Type, src dyn.Value, seen []reflect.Type) (dyn.Value, diag.Diagnostics) {
+func (n normalizeOptions) normalizeSlice(typ reflect.Type, src dyn.Value, seen []reflect.Type, fieldSubstitutions map[string]string) (dyn.Value, diag.Diagnostics) {
 	var diags diag.Diagnostics
 
 	switch src.Kind() {
@@ -191,7 +194,7 @@ func (n normalizeOptions) normalizeSlice(typ reflect.Type, src dyn.Value, seen [
 		out := make([]dyn.Value, 0, len(src.MustSequence()))
 		for _, v := range src.MustSequence() {
 			// Normalize the value according to the slice element type.
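+			// Note: fieldSubstitutions is threaded through recursively, so struct
+			// keys nested inside slices and maps are renamed as well.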
-			v, err := n.normalizeType(typ.Elem(), v, seen)
+			v, err := n.normalizeType(typ.Elem(), v, seen, fieldSubstitutions)
 			if err != nil {
 				diags = diags.Extend(err)
 				// Skip the element if it cannot be normalized.
diff --git a/libs/dyn/convert/struct_info.go b/libs/dyn/convert/struct_info.go
index dc3ed4da40..dbd101a0a2 100644
--- a/libs/dyn/convert/struct_info.go
+++ b/libs/dyn/convert/struct_info.go
@@ -2,6 +2,7 @@ package convert
 
 import (
 	"reflect"
+	"regexp"
 	"strings"
 	"sync"
 
@@ -43,6 +44,9 @@ func getStructInfo(typ reflect.Type) structInfo {
 
 // buildStructInfo populates a new [structInfo] for the given type.
 func buildStructInfo(typ reflect.Type) structInfo {
+	var matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
+	var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")
+
 	var out = structInfo{
 		Fields: make(map[string][]int),
 	}
@@ -84,9 +88,15 @@ func buildStructInfo(typ reflect.Type) structInfo {
 		}
 
 		name, _, _ := strings.Cut(sf.Tag.Get("json"), ",")
-		if name == "" || name == "-" {
+		if name == "" {
 			continue
 		}
+		if name == "-" {
+			// Convert the Go field name to snake case, e.g. "FullName" -> "full_name".
+			snake := matchFirstCap.ReplaceAllString(sf.Name, "${1}_${2}")
+			snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}")
+			name = strings.ToLower(snake)
+		}
 
 		// Top level fields always take precedence.
 		// Therefore, if it is already set, we ignore it.