Skip to content

Commit

Permalink
[Exporter] Improve exporting of databricks_pipeline resources
Browse files Browse the repository at this point in the history
Changes include:

- Use `List` + iterator instead of waiting for full list - improves performance in big
  workspaces with a lot of DLT pipelines
- Better handling of pipelines deployed via DABs - fix error that lead to emitting of
  notebooks even for DLT pipelines deployed with DABs.
- Emit `databricks_schema` for pipelines with direct publishing mode enabled.
  • Loading branch information
alexott committed Oct 23, 2024
1 parent fa3c3de commit d0b29ad
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
8 changes: 4 additions & 4 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ var meAdminFixture = qa.HTTPFixture{
var emptyPipelines = qa.HTTPFixture{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{},
}

Expand Down Expand Up @@ -2021,7 +2021,7 @@ func TestImportingDLTPipelines(t *testing.T) {
emptyIpAccessLIst,
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down Expand Up @@ -2236,7 +2236,7 @@ func TestImportingDLTPipelinesMatchingOnly(t *testing.T) {
userReadFixture,
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down Expand Up @@ -2601,7 +2601,7 @@ func TestIncrementalDLTAndMLflowWebhooks(t *testing.T) {
},
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down
51 changes: 33 additions & 18 deletions exporter/importables.go
Original file line number Diff line number Diff line change
Expand Up @@ -2002,23 +2002,22 @@ var resourcesMap map[string]importable = map[string]importable{
return name + "_" + d.Id()
},
List: func(ic *importContext) error {
w, err := ic.Client.WorkspaceClient()
if err != nil {
return err
}
pipelinesList, err := w.Pipelines.ListPipelinesAll(ic.Context, pipelines.ListPipelinesRequest{
MaxResults: 50,
it := ic.workspaceClient.Pipelines.ListPipelines(ic.Context, pipelines.ListPipelinesRequest{
MaxResults: 100,
})
if err != nil {
return err
}
for i, q := range pipelinesList {
i := 0
for it.HasNext(ic.Context) {
q, err := it.Next(ic.Context)
if err != nil {
return err
}
i++
if !ic.MatchesName(q.Name) {
continue
}
var modifiedAt int64
if ic.incremental {
pipeline, err := w.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{
pipeline, err := ic.workspaceClient.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{
PipelineId: q.PipelineId,
})
if err != nil {
Expand All @@ -2030,21 +2029,37 @@ var resourcesMap map[string]importable = map[string]importable{
Resource: "databricks_pipeline",
ID: q.PipelineId,
}, modifiedAt, fmt.Sprintf("DLT Pipeline '%s'", q.Name))
log.Printf("[INFO] Imported %d of %d DLT Pipelines", i+1, len(pipelinesList))
if i%100 == 0 {
log.Printf("[INFO] Imported %d DLT Pipelines", i)
}
}
log.Printf("[INFO] Listed %d DLT pipelines", i)
return nil
},
Import: func(ic *importContext, r *resource) error {
var pipeline tfpipelines.Pipeline
s := ic.Resources["databricks_pipeline"].Schema
common.DataToStructPointer(r.Data, s, &pipeline)
if pipeline.Catalog != "" && pipeline.Target != "" {
ic.Emit(&resource{
Resource: "databricks_schema",
ID: pipeline.Catalog + "." + pipeline.Target,
})
if pipeline.Deployment != nil && pipeline.Deployment.Kind == "BUNDLE" {
log.Printf("[INFO] Skipping processing of DLT Pipeline with ID %s (%s) as deployed with DABs",
r.ID, pipeline.Name)
return nil
}
if pipeline.Catalog != "" {
var schemaName string
if pipeline.Target != "" {
schemaName = pipeline.Target
} else if pipeline.Schema != "" {
schemaName = pipeline.Schema
}
if schemaName != "" {
ic.Emit(&resource{
Resource: "databricks_schema",
ID: pipeline.Catalog + "." + pipeline.Target,
})
}
}
if pipeline.Deployment == nil || pipeline.Deployment.Kind == "BUNDLE" {
if pipeline.Deployment == nil || pipeline.Deployment.Kind != "BUNDLE" {
for _, lib := range pipeline.Libraries {
if lib.Notebook != nil {
ic.emitNotebookOrRepo(lib.Notebook.Path)
Expand Down
2 changes: 1 addition & 1 deletion exporter/importables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ func TestIncrementalListDLT(t *testing.T) {
qa.HTTPFixturesApply(t, []qa.HTTPFixture{
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Resource: "/api/2.0/pipelines?max_results=100",
Response: pipelines.ListPipelinesResponse{
Statuses: []pipelines.PipelineStateInfo{
{
Expand Down

0 comments on commit d0b29ad

Please sign in to comment.