Skip to content

Commit

Permalink
Add Lakehouse Monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
aravind-segu committed Mar 24, 2024
1 parent 2453cd4 commit 6d15b15
Show file tree
Hide file tree
Showing 22 changed files with 389 additions and 31 deletions.
3 changes: 3 additions & 0 deletions bundle/config/mutator/process_target_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
RegisteredModels: map[string]*resources.RegisteredModel{
"registeredmodel1": {CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{Name: "registeredmodel1"}},
},
LakehouseMonitor: map[string]*resources.LakehouseMonitor{
"lakehouseMonitor1": {CreateMonitor: &catalog.CreateMonitor{FullName: "lakehouseMonitor1"}},
},
},
},
// Use AWS implementation for testing.
Expand Down
4 changes: 2 additions & 2 deletions bundle/config/mutator/resolve_variable_references.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (m *resolveVariableReferences) Apply(ctx context.Context, b *bundle.Bundle)
// We can ignore the diagnostics return value because we know that the dynamic value
// has already been normalized when it was first loaded from the configuration file.
//
normalized, _ := convert.Normalize(b.Config, root, convert.IncludeMissingFields)
normalized, _ := convert.Normalize(b.Config, root, make(map[string]string), convert.IncludeMissingFields)
lookup := func(path dyn.Path) (dyn.Value, error) {
// Future opportunity: if we lookup this path in both the given root
// and the synthesized root, we know if it was explicitly set or implied to be empty.
Expand Down Expand Up @@ -84,7 +84,7 @@ func (m *resolveVariableReferences) Apply(ctx context.Context, b *bundle.Bundle)

// Normalize the result because variable resolution may have been applied to non-string fields.
// For example, a variable reference may have been resolved to an integer.
root, diags := convert.Normalize(b.Config, root)
root, diags := convert.Normalize(b.Config, root, make(map[string]string))
for _, diag := range diags {
// This occurs when a variable's resolved value is incompatible with the field's type.
// Log a warning until we have a better way to surface these diagnostics to the user.
Expand Down
14 changes: 14 additions & 0 deletions bundle/config/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Resources struct {
Experiments map[string]*resources.MlflowExperiment `json:"experiments,omitempty"`
ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"`
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
LakehouseMonitor map[string]*resources.LakehouseMonitor `json:"lakehouse_monitor,omitempty"`
}

type UniqueResourceIdTracker struct {
Expand Down Expand Up @@ -123,6 +124,19 @@ func (r *Resources) VerifyUniqueResourceIdentifiers() (*UniqueResourceIdTracker,
tracker.Type[k] = "registered_model"
tracker.ConfigPath[k] = r.RegisteredModels[k].ConfigFilePath
}
for k := range r.LakehouseMonitor {
if _, ok := tracker.Type[k]; ok {
return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
k,
tracker.Type[k],
tracker.ConfigPath[k],
"lakehouse_monitor",
r.LakehouseMonitor[k].ConfigFilePath,
)
}
tracker.Type[k] = "lakehouse_monitor"
tracker.ConfigPath[k] = r.LakehouseMonitor[k].ConfigFilePath
}
return tracker, nil
}

Expand Down
23 changes: 23 additions & 0 deletions bundle/config/resources/lakehouse_monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package resources

import (
"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/databricks-sdk-go/service/catalog"
)

// LakehouseMonitor is the bundle resource describing a Unity Catalog
// lakehouse monitor. It embeds the SDK request type so monitor settings can
// be written directly in bundle configuration.
type LakehouseMonitor struct {
	// Represents the Input Arguments for Terraform and will get
	// converted to a HCL representation for CRUD
	*catalog.CreateMonitor

	// This represents the id which is the full name of the monitor
	// (catalog_name.schema_name.table_name) that can be used
	// as a reference in other resources. This value is returned by terraform.
	ID string `json:"id,omitempty" bundle:"readonly"`

	// Path to config file where the resource is defined. All bundle resources
	// include this for interpolation purposes.
	paths.Paths

	// Deployment state of the resource relative to the remote workspace
	// (e.g. created/updated/deleted); maintained internally, not user-set.
	ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
}
2 changes: 1 addition & 1 deletion bundle/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func Load(path string) (*Root, error) {
}

// Normalize dynamic configuration tree according to configuration type.
v, diags := convert.Normalize(r, v)
v, diags := convert.Normalize(r, v, make(map[string]string))

// Keep track of diagnostics (warnings and errors in the schema).
// We delay acting on diagnostics until we have loaded all
Expand Down
18 changes: 18 additions & 0 deletions bundle/deploy/terraform/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ func BundleToTerraform(config *config.Root) *schema.Root {
}
}

for k, src := range config.Resources.LakehouseMonitor {
noResources = false
var dst schema.ResourceLakehouseMonitor
conv(src, &dst)
dst.TableName = src.FullName
tfroot.Resource.LakehouseMonitor[k] = &dst
}

// We explicitly set "resource" to nil to omit it from a JSON encoding.
// This is required because the terraform CLI requires >= 1 resources defined
// if the "resource" property is used in a .tf.json file.
Expand Down Expand Up @@ -359,6 +367,16 @@ func TerraformToBundle(state *tfjson.State, config *config.Root) error {
modifiedStatus := convRemoteToLocal(tmp, &cur)
cur.ModifiedStatus = modifiedStatus
config.Resources.RegisteredModels[resource.Name] = cur
case "databricks_lakehouse_monitor":
var tmp schema.ResourceLakehouseMonitor
conv(resource.AttributeValues, &tmp)
if config.Resources.LakehouseMonitor == nil {
config.Resources.LakehouseMonitor = make(map[string]*resources.LakehouseMonitor)
}
cur := config.Resources.LakehouseMonitor[resource.Name]
modifiedStatus := convRemoteToLocal(tmp, &cur)
cur.ModifiedStatus = modifiedStatus
config.Resources.LakehouseMonitor[resource.Name] = cur
case "databricks_permissions":
case "databricks_grants":
// Ignore; no need to pull these back into the configuration.
Expand Down
27 changes: 27 additions & 0 deletions bundle/deploy/terraform/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,12 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
Name: "test_registered_model",
AttributeValues: map[string]interface{}{"id": "1"},
},
{
Type: "databricks_lakehouse_monitor",
Mode: "managed",
Name: "test_lakehouse_monitor",
AttributeValues: map[string]interface{}{"id": "1"},
},
},
},
},
Expand All @@ -573,6 +579,8 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
assert.Equal(t, "1", config.Resources.RegisteredModels["test_registered_model"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.RegisteredModels["test_registered_model"].ModifiedStatus)

assert.Equal(t, "1", config.Resources.RegisteredModels["test_lakehouse_monitor"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.RegisteredModels["test_lakehouse_monitor"].ModifiedStatus)
AssertFullResourceCoverage(t, &config)
}

Expand Down Expand Up @@ -621,6 +629,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
},
},
},
LakehouseMonitor: map[string]*resources.LakehouseMonitor{
"test_lakehouse_monitor": {
CreateMonitor: &catalog.CreateMonitor{
FullName: "test_lakehouse_monitor",
},
},
},
},
}
var tfState = tfjson.State{
Expand Down Expand Up @@ -725,6 +740,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
},
},
},
LakehouseMonitor: map[string]*resources.LakehouseMonitor{
"test_lakehouse_monitor": {
CreateMonitor: &catalog.CreateMonitor{
FullName: "test_lakehouse_monitor",
},
},
"test_lakehouse_monitor_new": {
CreateMonitor: &catalog.CreateMonitor{
FullName: "test_lakehouse_monitor_new",
},
},
},
},
}
var tfState = tfjson.State{
Expand Down
2 changes: 2 additions & 0 deletions bundle/deploy/terraform/interpolate.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) error
path = dyn.NewPath(dyn.Key("databricks_model_serving")).Append(path[2:]...)
case dyn.Key("registered_models"):
path = dyn.NewPath(dyn.Key("databricks_registered_model")).Append(path[2:]...)
case dyn.Key("lakehouse_monitor"):
path = dyn.NewPath(dyn.Key("databricks_lakehouse_monitor")).Append(path[2:]...)
default:
// Trigger "key not found" for unknown resource types.
return dyn.GetByPath(root, path)
Expand Down
2 changes: 1 addition & 1 deletion bundle/deploy/terraform/tfdyn/convert_experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func convertExperimentResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the output value to the target schema.
vout, diags := convert.Normalize(schema.ResourceMlflowExperiment{}, vin)
vout, diags := convert.Normalize(schema.ResourceMlflowExperiment{}, vin, make(map[string]string))
for _, diag := range diags {
log.Debugf(ctx, "experiment normalization diagnostic: %s", diag.Summary)
}
Expand Down
4 changes: 2 additions & 2 deletions bundle/deploy/terraform/tfdyn/convert_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the input value to the underlying job schema.
// This removes superfluous keys and adapts the input to the expected schema.
vin, diags := convert.Normalize(jobs.JobSettings{}, vin)
vin, diags := convert.Normalize(jobs.JobSettings{}, vin, make(map[string]string))
for _, diag := range diags {
log.Debugf(ctx, "job normalization diagnostic: %s", diag.Summary)
}
Expand Down Expand Up @@ -54,7 +54,7 @@ func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
}

// Normalize the output value to the target schema.
vout, diags = convert.Normalize(schema.ResourceJob{}, vout)
vout, diags = convert.Normalize(schema.ResourceJob{}, vout, make(map[string]string))
for _, diag := range diags {
log.Debugf(ctx, "job normalization diagnostic: %s", diag.Summary)
}
Expand Down
40 changes: 40 additions & 0 deletions bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package tfdyn

import (
"context"

"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
)

// convertLakehouseMonitorResource normalizes a dynamic lakehouse monitor
// configuration value against the Terraform resource schema.
//
// The SDK request type names the monitor's table "full_name" while the
// Terraform resource expects "table_name"; the substitution map renames the
// field during normalization. Normalization diagnostics are non-fatal and
// only logged at debug level.
func convertLakehouseMonitorResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
	// Rename the SDK field "full_name" to the Terraform field "table_name".
	substitutions := map[string]string{
		"full_name": "table_name",
	}

	// Normalize the output value to the target schema.
	vout, diags := convert.Normalize(schema.ResourceLakehouseMonitor{}, vin, substitutions)
	for _, diag := range diags {
		log.Debugf(ctx, "lakehouse monitor normalization diagnostic: %s", diag.Summary)
	}

	return vout, nil
}

type lakehouseMonitorConverter struct{}

// Convert normalizes the given dynamic value against the lakehouse monitor
// Terraform schema and stores the result in out under key.
func (lakehouseMonitorConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
	converted, err := convertLakehouseMonitorResource(ctx, vin)
	if err != nil {
		return err
	}

	// Record the normalized resource in the Terraform output under its key.
	out.LakehouseMonitor[key] = converted.AsAny()
	return nil
}

// Register the converter under the resource key used in bundle configuration
// ("resources.lakehouse_monitor") so the tfdyn dispatcher can find it.
func init() {
	registerConverter("lakehouse_monitor", lakehouseMonitorConverter{})
}
47 changes: 47 additions & 0 deletions bundle/deploy/terraform/tfdyn/convert_lakehouse_monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package tfdyn

import (
"context"
"testing"

"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestConvertLakehouseMonitor verifies that a bundle lakehouse monitor
// resource converts to the expected Terraform attribute map, including the
// rename of the SDK field "full_name" to the Terraform field "table_name".
func TestConvertLakehouseMonitor(t *testing.T) {
	src := resources.LakehouseMonitor{
		CreateMonitor: &catalog.CreateMonitor{
			FullName:         "test_table_name",
			AssetsDir:        "assets_dir",
			OutputSchemaName: "output_schema_name",
			InferenceLog: &catalog.MonitorInferenceLogProfileType{
				ModelIdCol:    "model_id",
				PredictionCol: "test_prediction_col",
				ProblemType:   "PROBLEM_TYPE_CLASSIFICATION",
			},
		},
	}

	vin, err := convert.FromTyped(src, dyn.NilValue)
	require.NoError(t, err)

	ctx := context.Background()
	out := schema.NewResources()
	err = lakehouseMonitorConverter{}.Convert(ctx, "my_lakehouse_monitor", vin, out)
	require.NoError(t, err)

	// "full_name" must surface as "table_name" in the Terraform attributes.
	assert.Equal(t, map[string]any{
		"assets_dir":         "assets_dir",
		"output_schema_name": "output_schema_name",
		"table_name":         "test_table_name",
		"inference_log": map[string]any{
			"model_id_col":   "model_id",
			"prediction_col": "test_prediction_col",
			"problem_type":   "PROBLEM_TYPE_CLASSIFICATION",
		},
	}, out.LakehouseMonitor["my_lakehouse_monitor"])
}
2 changes: 1 addition & 1 deletion bundle/deploy/terraform/tfdyn/convert_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func convertModelResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the output value to the target schema.
vout, diags := convert.Normalize(schema.ResourceMlflowModel{}, vin)
vout, diags := convert.Normalize(schema.ResourceMlflowModel{}, vin, make(map[string]string))
for _, diag := range diags {
log.Debugf(ctx, "model normalization diagnostic: %s", diag.Summary)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func convertModelServingEndpointResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the output value to the target schema.
vout, diags := convert.Normalize(schema.ResourceModelServing{}, vin)
vout, diags := convert.Normalize(schema.ResourceModelServing{}, vin, make(map[string]string))
for _, diag := range diags {
log.Debugf(ctx, "model serving endpoint normalization diagnostic: %s", diag.Summary)
}
Expand Down
2 changes: 1 addition & 1 deletion bundle/deploy/terraform/tfdyn/convert_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func convertPipelineResource(ctx context.Context, vin dyn.Value) (dyn.Value, err
}

// Normalize the output value to the target schema.
vout, diags := convert.Normalize(schema.ResourcePipeline{}, vout)
vout, diags := convert.Normalize(schema.ResourcePipeline{}, vout, make(map[string]string))
for _, diag := range diags {
log.Debugf(ctx, "pipeline normalization diagnostic: %s", diag.Summary)
}
Expand Down
2 changes: 1 addition & 1 deletion bundle/deploy/terraform/tfdyn/convert_registered_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func convertRegisteredModelResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the output value to the target schema.
vout, diags := convert.Normalize(schema.ResourceRegisteredModel{}, vin)
vout, diags := convert.Normalize(schema.ResourceRegisteredModel{}, vin, make(map[string]string))
for _, diag := range diags {
log.Debugf(ctx, "registered model normalization diagnostic: %s", diag.Summary)
}
Expand Down
Loading

0 comments on commit 6d15b15

Please sign in to comment.