Skip to content

Commit

Permalink
Rename lakehouse monitor to quality monitor (#3584)
Browse files Browse the repository at this point in the history
Deprecate resource instead of breaking rename

fix

Apply suggestions from code review

Co-authored-by: vuong-nguyen <44292934+nkvuong@users.noreply.github.com>
  • Loading branch information
arpitjasa-db and nkvuong authored May 23, 2024
1 parent 2a0ea25 commit f2ecb5c
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 146 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
| [databricks_permissions](docs/resources/permissions.md)
| [databricks_pipeline](docs/resources/pipeline.md)
| [databricks_pipelines](docs/data-sources/pipelines.md) data
| [databricks_quality_monitor](docs/resources/quality_monitor.md)
| [databricks_repo](docs/resources/repo.md)
| [databricks_schema](docs/resources/schema.md)
| [databricks_schemas](docs/data-sources/schema.md) data
Expand Down
115 changes: 3 additions & 112 deletions catalog/resource_lakehouse_monitor.go
Original file line number Diff line number Diff line change
@@ -1,120 +1,11 @@
package catalog

import (
"context"
"fmt"
"time"

"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/terraform-provider-databricks/common"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

const lakehouseMonitorDefaultProvisionTimeout = 15 * time.Minute

// WaitForMonitor polls the quality monitor attached to monitorName until it
// reaches the ACTIVE state. It returns a non-retryable error if the monitor
// enters the ERROR or FAILED state or the lookup itself fails, and gives up
// after lakehouseMonitorDefaultProvisionTimeout.
func WaitForMonitor(w *databricks.WorkspaceClient, ctx context.Context, monitorName string) error {
	return retry.RetryContext(ctx, lakehouseMonitorDefaultProvisionTimeout, func() *retry.RetryError {
		endpoint, err := w.QualityMonitors.GetByTableName(ctx, monitorName)
		if err != nil {
			// Lookup failures (deleted monitor, permissions, etc.) are terminal.
			return retry.NonRetryableError(err)
		}

		switch endpoint.Status {
		case catalog.MonitorInfoStatusMonitorStatusActive:
			return nil
		case catalog.MonitorInfoStatusMonitorStatusError, catalog.MonitorInfoStatusMonitorStatusFailed:
			return retry.NonRetryableError(fmt.Errorf("monitor status returned %s for monitor: %s", endpoint.Status, monitorName))
		}
		// Any other status (e.g. pending/provisioning): keep polling.
		return retry.RetryableError(fmt.Errorf("monitor %s is still pending", monitorName))
	})
}

// ResourceLakehouseMonitor is the deprecated databricks_lakehouse_monitor
// resource. It delegates entirely to ResourceQualityMonitor — sharing one
// schema and one CRUD implementation — and only attaches a deprecation
// message, so existing configurations keep working while users migrate to
// databricks_quality_monitor. (The previous inline implementation was dead
// code duplicating ResourceQualityMonitor and has been removed.)
func ResourceLakehouseMonitor() common.Resource {
	r := ResourceQualityMonitor()
	r.DeprecationMessage = "Use `databricks_quality_monitor` instead."
	return r
}
4 changes: 2 additions & 2 deletions catalog/resource_online_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
const onlineTableDefaultProvisionTimeout = 45 * time.Minute

func waitForOnlineTableCreation(w *databricks.WorkspaceClient, ctx context.Context, onlineTableName string) error {
return retry.RetryContext(ctx, lakehouseMonitorDefaultProvisionTimeout, func() *retry.RetryError {
return retry.RetryContext(ctx, onlineTableDefaultProvisionTimeout, func() *retry.RetryError {
endpoint, err := w.OnlineTables.GetByName(ctx, onlineTableName)
if err != nil {
return retry.NonRetryableError(err)
Expand All @@ -40,7 +40,7 @@ func waitForOnlineTableCreation(w *databricks.WorkspaceClient, ctx context.Conte
}

func waitForOnlineTableDeletion(w *databricks.WorkspaceClient, ctx context.Context, onlineTableName string) error {
return retry.RetryContext(ctx, lakehouseMonitorDefaultProvisionTimeout, func() *retry.RetryError {
return retry.RetryContext(ctx, onlineTableDefaultProvisionTimeout, func() *retry.RetryError {
_, err := w.OnlineTables.GetByName(ctx, onlineTableName)
if err == nil {
return retry.RetryableError(fmt.Errorf("online table %s is still not deleted", onlineTableName))
Expand Down
120 changes: 120 additions & 0 deletions catalog/resource_quality_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package catalog

import (
"context"
"fmt"
"time"

"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/terraform-provider-databricks/common"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

const qualityMonitorDefaultProvisionTimeout = 15 * time.Minute

// WaitForMonitor polls the quality monitor attached to monitorName until it
// reaches the ACTIVE state. It returns a non-retryable error if the monitor
// enters the ERROR or FAILED state or the lookup itself fails, and gives up
// after qualityMonitorDefaultProvisionTimeout.
func WaitForMonitor(w *databricks.WorkspaceClient, ctx context.Context, monitorName string) error {
	return retry.RetryContext(ctx, qualityMonitorDefaultProvisionTimeout, func() *retry.RetryError {
		endpoint, err := w.QualityMonitors.GetByTableName(ctx, monitorName)
		if err != nil {
			// Lookup failures (deleted monitor, permissions, etc.) are terminal.
			return retry.NonRetryableError(err)
		}

		switch endpoint.Status {
		case catalog.MonitorInfoStatusMonitorStatusActive:
			return nil
		case catalog.MonitorInfoStatusMonitorStatusError, catalog.MonitorInfoStatusMonitorStatusFailed:
			return retry.NonRetryableError(fmt.Errorf("monitor status returned %s for monitor: %s", endpoint.Status, monitorName))
		}
		// Any other status (e.g. pending/provisioning): keep polling.
		return retry.RetryableError(fmt.Errorf("monitor %s is still pending", monitorName))
	})
}

// ResourceQualityMonitor defines the databricks_quality_monitor Terraform
// resource, managing a quality monitor attached to a Unity Catalog table via
// the QualityMonitors API. The resource ID is the monitored table's full name,
// and Create/Update block until the monitor becomes active (see
// WaitForMonitor).
func ResourceQualityMonitor() common.Resource {
	// Derive the Terraform schema from the SDK's MonitorInfo struct, then
	// tighten it: table/output/assets fields are required inputs, and
	// server-computed fields are read-only.
	monitorSchema := common.StructToSchema(
		catalog.MonitorInfo{},
		func(m map[string]*schema.Schema) map[string]*schema.Schema {
			common.CustomizeSchemaPath(m, "assets_dir").SetRequired()
			common.CustomizeSchemaPath(m, "output_schema_name").SetRequired()
			common.CustomizeSchemaPath(m, "table_name").SetRequired()
			// Optional create-time settings not present on MonitorInfo.
			common.CustomizeSchemaPath(m).AddNewField("skip_builtin_dashboard", &schema.Schema{
				Type:     schema.TypeBool,
				Optional: true,
			})
			common.CustomizeSchemaPath(m).AddNewField("warehouse_id", &schema.Schema{
				Type:     schema.TypeString,
				Optional: true,
			})
			// Attributes populated by the backend.
			common.CustomizeSchemaPath(m, "monitor_version").SetReadOnly()
			common.CustomizeSchemaPath(m, "drift_metrics_table_name").SetReadOnly()
			common.CustomizeSchemaPath(m, "profile_metrics_table_name").SetReadOnly()
			common.CustomizeSchemaPath(m, "status").SetReadOnly()
			common.CustomizeSchemaPath(m, "dashboard_id").SetReadOnly()
			return m
		},
	)

	return common.Resource{
		Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			w, err := c.WorkspaceClient()
			if err != nil {
				return err
			}

			var create catalog.CreateMonitor
			common.DataToStructPointer(d, monitorSchema, &create)
			create.TableName = d.Get("table_name").(string)

			endpoint, err := w.QualityMonitors.Create(ctx, create)
			if err != nil {
				return err
			}
			// Creation is asynchronous; wait for the monitor to become active
			// before recording it in state.
			err = WaitForMonitor(w, ctx, create.TableName)
			if err != nil {
				return err
			}
			d.SetId(endpoint.TableName)
			return nil
		},
		Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			w, err := c.WorkspaceClient()
			if err != nil {
				return err
			}
			endpoint, err := w.QualityMonitors.GetByTableName(ctx, d.Id())
			if err != nil {
				return err
			}
			return common.StructToData(endpoint, monitorSchema, d)
		},
		Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			w, err := c.WorkspaceClient()
			if err != nil {
				return err
			}
			var update catalog.UpdateMonitor
			common.DataToStructPointer(d, monitorSchema, &update)
			update.TableName = d.Get("table_name").(string)
			_, err = w.QualityMonitors.Update(ctx, update)
			if err != nil {
				return err
			}
			// Updates are also asynchronous; block until active again.
			return WaitForMonitor(w, ctx, update.TableName)
		},
		Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			w, err := c.WorkspaceClient()
			if err != nil {
				return err
			}
			return w.QualityMonitors.DeleteByTableName(ctx, d.Id())
		},
		Schema: monitorSchema,
		Timeouts: &schema.ResourceTimeout{
			Create: schema.DefaultTimeout(qualityMonitorDefaultProvisionTimeout),
		},
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/stretchr/testify/mock"
)

func TestLakehouseMonitorCornerCases(t *testing.T) {
qa.ResourceCornerCases(t, ResourceLakehouseMonitor())
// TestQualityMonitorCornerCases runs the shared resource corner-case suite
// against databricks_quality_monitor.
func TestQualityMonitorCornerCases(t *testing.T) {
qa.ResourceCornerCases(t, ResourceQualityMonitor())
}

func TestLakehouseMonitorCreateTimeseries(t *testing.T) {
func TestQualityMonitorCreateTimeseries(t *testing.T) {
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockQualityMonitorsAPI().EXPECT()
Expand All @@ -40,7 +40,7 @@ func TestLakehouseMonitorCreateTimeseries(t *testing.T) {
DriftMetricsTableName: "test_table_drift",
}, nil)
},
Resource: ResourceLakehouseMonitor(),
Resource: ResourceQualityMonitor(),
HCL: `
table_name = "test_table",
assets_dir = "sample.dir",
Expand All @@ -54,7 +54,7 @@ func TestLakehouseMonitorCreateTimeseries(t *testing.T) {
}.ApplyNoError(t)
}

func TestLakehouseMonitorCreateInference(t *testing.T) {
func TestQualityMonitorCreateInference(t *testing.T) {
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockQualityMonitorsAPI().EXPECT()
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestLakehouseMonitorCreateInference(t *testing.T) {
},
}, nil)
},
Resource: ResourceLakehouseMonitor(),
Resource: ResourceQualityMonitor(),
HCL: `
table_name = "test_table",
assets_dir = "sample.dir",
Expand All @@ -106,7 +106,7 @@ func TestLakehouseMonitorCreateInference(t *testing.T) {
}.ApplyNoError(t)
}

func TestLakehouseMonitorCreateSnapshot(t *testing.T) {
func TestQualityMonitorCreateSnapshot(t *testing.T) {
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockQualityMonitorsAPI().EXPECT()
Expand All @@ -129,7 +129,7 @@ func TestLakehouseMonitorCreateSnapshot(t *testing.T) {
Snapshot: &catalog.MonitorSnapshot{},
}, nil)
},
Resource: ResourceLakehouseMonitor(),
Resource: ResourceQualityMonitor(),
HCL: `
table_name = "test_table",
assets_dir = "sample.dir",
Expand All @@ -140,7 +140,7 @@ func TestLakehouseMonitorCreateSnapshot(t *testing.T) {
}.ApplyNoError(t)
}

func TestLakehouseMonitorGet(t *testing.T) {
func TestQualityMonitorGet(t *testing.T) {
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockQualityMonitorsAPI().EXPECT()
Expand All @@ -157,13 +157,13 @@ func TestLakehouseMonitorGet(t *testing.T) {
},
DriftMetricsTableName: "test_table_drift"}, nil)
},
Resource: ResourceLakehouseMonitor(),
Resource: ResourceQualityMonitor(),
Read: true,
ID: "test_table",
}.ApplyNoError(t)
}

func TestLakehouseMonitorUpdate(t *testing.T) {
func TestQualityMonitorUpdate(t *testing.T) {
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockQualityMonitorsAPI().EXPECT()
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestLakehouseMonitorUpdate(t *testing.T) {
DriftMetricsTableName: "test_table_drift",
}, nil)
},
Resource: ResourceLakehouseMonitor(),
Resource: ResourceQualityMonitor(),
Update: true,
ID: "test_table",
InstanceState: map[string]string{
Expand All @@ -225,13 +225,13 @@ func TestLakehouseMonitorUpdate(t *testing.T) {
}.ApplyNoError(t)
}

func TestLakehouseMonitorDelete(t *testing.T) {
func TestQualityMonitorDelete(t *testing.T) {
qa.ResourceFixture{
MockWorkspaceClientFunc: func(w *mocks.MockWorkspaceClient) {
e := w.GetMockQualityMonitorsAPI().EXPECT()
e.DeleteByTableName(mock.Anything, "test_table").Return(nil)
},
Resource: ResourceLakehouseMonitor(),
Resource: ResourceQualityMonitor(),
Delete: true,
ID: "test_table",
}.ApplyNoError(t)
Expand Down
2 changes: 2 additions & 0 deletions docs/resources/lakehouse_monitor.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ subcategory: "Unity Catalog"
---
# databricks_lakehouse_monitor Resource

NOTE: This resource has been deprecated and will be removed soon. Please use the [databricks_quality_monitor resource](./quality_monitor.md) instead.

This resource allows you to manage [Lakehouse Monitors](https://docs.databricks.com/en/lakehouse-monitoring/index.html) in Databricks.

A `databricks_lakehouse_monitor` is attached to a [databricks_sql_table](sql_table.md) and can be of type timeseries, snapshot or inference.
Expand Down
Loading

0 comments on commit f2ecb5c

Please sign in to comment.