Skip to content

Commit

Permalink
[Exporter][Breaking] Move databricks_workspace_file to a separate s…
Browse files Browse the repository at this point in the history
…ervice

Move `databricks_workspace_file` to a separate service, `wsfiles`, so we can list and export
them separately from notebooks. If you used `notebooks` in the `-listing` or `-services`
options, you now need to append `wsfiles` to those options.
  • Loading branch information
alexott committed Oct 17, 2024
1 parent e24c780 commit 2a1d70f
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 47 deletions.
3 changes: 2 additions & 1 deletion docs/guides/experimental-exporter.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Services are just logical groups of resources used for filtering and organizatio
* `mlflow-webhooks` - **listing** [databricks_mlflow_webhook](../resources/mlflow_webhook.md).
* `model-serving` - **listing** [databricks_model_serving](../resources/model_serving.md).
* `mounts` - **listing** works only in combination with `-mounts` command-line option.
* `notebooks` - **listing** [databricks_notebook](../resources/notebook.md) and [databricks_workspace_file](../resources/workspace_file.md).
* `notebooks` - **listing** [databricks_notebook](../resources/notebook.md).
* `policies` - **listing** [databricks_cluster_policy](../resources/cluster_policy.md).
* `pools` - **listing** [instance pools](../resources/instance_pool.md).
* `repos` - **listing** [databricks_repo](../resources/repo.md)
Expand Down Expand Up @@ -148,6 +148,7 @@ Services are just logical groups of resources used for filtering and organizatio
* `users` - [databricks_user](../resources/user.md) and [databricks_service_principal](../resources/service_principal.md) are written to their own file, simply because of how many of them there usually are. If you use SCIM provisioning, migrating workspaces is the only use case for importing the `users` service.
* `vector-search` - **listing** exports [databricks_vector_search_endpoint](../resources/vector_search_endpoint.md) and [databricks_vector_search_index](../resources/vector_search_index.md)
* `workspace` - **listing** [databricks_workspace_conf](../resources/workspace_conf.md) and [databricks_global_init_script](../resources/global_init_script.md)
* `wsfiles` - **listing** [databricks_workspace_file](../resources/workspace_file.md).

## Secrets

Expand Down
5 changes: 3 additions & 2 deletions exporter/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ func (ic *importContext) allServicesAndListing() (string, string) {
listing[ir.Service] = struct{}{}
}
}
// We need this to specify default listings of UC objects...
for _, ir := range []string{"uc-schemas", "uc-models", "uc-tables", "uc-volumes"} {
// We need this to specify default listings of UC & Workspace objects...
for _, ir := range []string{"uc-schemas", "uc-models", "uc-tables", "uc-volumes",
"notebooks", "directories", "wsfiles"} {
listing[ir] = struct{}{}
}
return strings.Join(maps.Keys(services), ","), strings.Join(maps.Keys(listing), ",")
Expand Down
20 changes: 18 additions & 2 deletions exporter/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,30 @@ func (ic *importContext) Run() error {
ic.startImportChannels()

// Start listing of objects
listWorkspaceObjectsAlreadyRunning := false
for rnLoop, irLoop := range ic.Importables {
resourceName := rnLoop
ir := irLoop
// TODO: extend this to other services? Like, Git Folders
if !ic.accountLevel && (ir.Service == "notebooks" || ir.Service == "wsfiles" || (ir.Service == "directories" && !ic.incremental)) {
if _, exists := ic.listing[ir.Service]; exists && !listWorkspaceObjectsAlreadyRunning {
ic.waitGroup.Add(1)
log.Printf("[DEBUG] Starting listing of workspace objects")
go func() {
if err := listWorkspaceObjects(ic); err != nil {
log.Printf("[ERROR] listing of workspace objects failed %s", err)
}
log.Print("[DEBUG] Finished listing of workspace objects")
ic.waitGroup.Done()
}()
listWorkspaceObjectsAlreadyRunning = true
}
continue
}
if ir.List == nil {
continue
}
_, exists := ic.listing[ir.Service]
if !exists {
if _, exists := ic.listing[ir.Service]; !exists {
log.Printf("[DEBUG] %s (%s service) is not part of listing", resourceName, ir.Service)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,7 +2180,7 @@ func TestImportingDLTPipelines(t *testing.T) {
ic := newImportContext(client)
ic.Directory = tmpDir
ic.enableListing("dlt")
ic.enableServices("dlt,access,notebooks,users,repos,secrets")
ic.enableServices("dlt,access,notebooks,users,repos,secrets,wsfiles")

err := ic.Run()
assert.NoError(t, err)
Expand Down
27 changes: 1 addition & 26 deletions exporter/importables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,6 @@ var resourcesMap map[string]importable = map[string]importable{
WorkspaceLevel: true,
Service: "notebooks",
Name: workspaceObjectResouceName,
List: listNotebooksAndWorkspaceFiles,
Import: func(ic *importContext, r *resource) error {
ic.emitUserOrServicePrincipalForPath(r.ID, "/Users")
notebooksAPI := workspace.NewNotebooksAPI(ic.Context, ic.Client)
Expand Down Expand Up @@ -1623,10 +1622,8 @@ var resourcesMap map[string]importable = map[string]importable{
},
"databricks_workspace_file": {
WorkspaceLevel: true,
Service: "notebooks",
Service: "wsfiles",
Name: workspaceObjectResouceName,
// We don't need list function for workspace files because it will be handled by the notebooks listing
// List: createListWorkspaceObjectsFunc(workspace.File, "databricks_workspace_file", "workspace_file"),
Import: func(ic *importContext, r *resource) error {
ic.emitUserOrServicePrincipalForPath(r.ID, "/Users")
notebooksAPI := workspace.NewNotebooksAPI(ic.Context, ic.Client)
Expand Down Expand Up @@ -2175,28 +2172,6 @@ var resourcesMap map[string]importable = map[string]importable{
}
return fmt.Errorf("can't find directory with object_id: %s", r.Value)
},
// TODO: think if we really need this, we need directories only for permissions,
// and only when they are different from parents & notebooks
List: func(ic *importContext) error {
if ic.incremental {
return nil
}
directoryList := ic.getAllDirectories()
for offset, directory := range directoryList {
if strings.HasPrefix(directory.Path, "/Repos") {
continue
}
if res := ignoreIdeFolderRegex.FindStringSubmatch(directory.Path); res != nil {
continue
}
ic.maybeEmitWorkspaceObject("databricks_directory", directory.Path, &directory)

if offset%50 == 0 {
log.Printf("[INFO] Scanned %d of %d directories", offset+1, len(directoryList))
}
}
return nil
},
Import: func(ic *importContext, r *resource) error {
ic.emitUserOrServicePrincipalForPath(r.ID, "/Users")
// Existing permissions API doesn't allow to set permissions for
Expand Down
16 changes: 10 additions & 6 deletions exporter/importables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,8 @@ func TestNotebookGeneration(t *testing.T) {
},
}, "notebooks", false, func(ic *importContext) {
ic.notebooksFormat = "SOURCE"
err := resourcesMap["databricks_notebook"].List(ic)
ic.enableListing("notebooks")
err := listWorkspaceObjects(ic)
assert.NoError(t, err)
ic.waitGroup.Wait()
ic.closeImportChannels()
Expand Down Expand Up @@ -1127,7 +1128,8 @@ func TestNotebookGenerationJupyter(t *testing.T) {
},
}, "notebooks", false, func(ic *importContext) {
ic.notebooksFormat = "JUPYTER"
err := resourcesMap["databricks_notebook"].List(ic)
ic.enableListing("notebooks")
err := listWorkspaceObjects(ic)
assert.NoError(t, err)
ic.waitGroup.Wait()
ic.closeImportChannels()
Expand Down Expand Up @@ -1184,7 +1186,8 @@ func TestNotebookGenerationBadCharacters(t *testing.T) {
}, "notebooks,directories", true, func(ic *importContext) {
ic.notebooksFormat = "SOURCE"
ic.enableServices("notebooks")
err := resourcesMap["databricks_notebook"].List(ic)
ic.enableListing("notebooks")
err := listWorkspaceObjects(ic)
assert.NoError(t, err)
ic.waitGroup.Wait()
ic.closeImportChannels()
Expand Down Expand Up @@ -1231,7 +1234,8 @@ func TestDirectoryGeneration(t *testing.T) {
},
},
}, "directories", false, func(ic *importContext) {
err := resourcesMap["databricks_directory"].List(ic)
ic.enableListing("directories")
err := listWorkspaceObjects(ic)
assert.NoError(t, err)

ic.waitGroup.Wait()
Expand Down Expand Up @@ -1517,7 +1521,7 @@ func TestEmitSqlParent(t *testing.T) {

func TestEmitFilesFromSlice(t *testing.T) {
ic := importContextForTest()
ic.enableServices("storage,notebooks")
ic.enableServices("storage,notebooks,wsfiles")
ic.emitFilesFromSlice([]string{
"dbfs:/FileStore/test.txt",
"/Workspace/Shared/test.txt",
Expand All @@ -1530,7 +1534,7 @@ func TestEmitFilesFromSlice(t *testing.T) {

func TestEmitFilesFromMap(t *testing.T) {
ic := importContextForTest()
ic.enableServices("storage,notebooks")
ic.enableServices("storage,notebooks,wsfiles")
ic.emitFilesFromMap(map[string]string{
"k1": "dbfs:/FileStore/test.txt",
"k2": "/Workspace/Shared/test.txt",
Expand Down
2 changes: 0 additions & 2 deletions exporter/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,6 @@ func TestDirectoryIncrementalMode(t *testing.T) {
ic := importContextForTest()
ic.incremental = true

// test direct listing
assert.Nil(t, resourcesMap["databricks_directory"].List(ic))
// test emit during workspace listing
assert.True(t, ic.shouldSkipWorkspaceObject(workspace.ObjectStatus{ObjectType: workspace.Directory}, 111111))
}
25 changes: 18 additions & 7 deletions exporter/util_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (ic *importContext) shouldSkipWorkspaceObject(object workspace.ObjectStatus
}
if !(object.ObjectType == workspace.Notebook || object.ObjectType == workspace.File) ||
strings.HasPrefix(object.Path, "/Repos") {
// log.Printf("[DEBUG] Skipping unsupported entry %v", object)
log.Printf("[DEBUG] Skipping unsupported entry %v", object)
return true
}
if res := ignoreIdeFolderRegex.FindStringSubmatch(object.Path); res != nil {
Expand Down Expand Up @@ -236,7 +236,7 @@ func emitWorkpaceObject(ic *importContext, object workspace.ObjectStatus) {
}
}

func listNotebooksAndWorkspaceFiles(ic *importContext) error {
func listWorkspaceObjects(ic *importContext) error {
objectsChannel := make(chan workspace.ObjectStatus, defaultChannelSize)
numRoutines := 2 // TODO: make configurable? together with the channel size?
var processedObjects atomic.Uint64
Expand All @@ -257,10 +257,13 @@ func listNotebooksAndWorkspaceFiles(ic *importContext) error {
}
// There are two use cases - this function will handle listing, or it will receive listing
updatedSinceMs := ic.getUpdatedSinceMs()
isNotebooksListingEnabled := ic.isServiceInListing("notebooks")
isDirectoryListingEnabled := ic.isServiceInListing("directories")
isWsFilesListingEnabled := ic.isServiceInListing("wsfiles")
allObjects := ic.getAllWorkspaceObjects(func(objects []workspace.ObjectStatus) {
for _, object := range objects {
if object.ObjectType == workspace.Directory {
if !ic.incremental && object.Path != "/" && ic.isServiceInListing("directories") {
if !ic.incremental && object.Path != "/" && isDirectoryListingEnabled {
objectsChannel <- object
}
} else {
Expand All @@ -269,8 +272,14 @@ func listNotebooksAndWorkspaceFiles(ic *importContext) error {
}
object := object
switch object.ObjectType {
case workspace.Notebook, workspace.File:
objectsChannel <- object
case workspace.Notebook:
if isNotebooksListingEnabled {
objectsChannel <- object
}
case workspace.File:
if isWsFilesListingEnabled {
objectsChannel <- object
}
default:
log.Printf("[WARN] unknown type %s for path %s", object.ObjectType, object.Path)
}
Expand All @@ -285,9 +294,11 @@ func listNotebooksAndWorkspaceFiles(ic *importContext) error {
if ic.shouldSkipWorkspaceObject(object, updatedSinceMs) {
continue
}
if object.ObjectType == workspace.Directory && !ic.incremental && ic.isServiceInListing("directories") && object.Path != "/" {
if !ic.incremental && isDirectoryListingEnabled && object.ObjectType == workspace.Directory && object.Path != "/" {
emitWorkpaceObject(ic, object)
} else if isNotebooksListingEnabled && object.ObjectType == workspace.Notebook {
emitWorkpaceObject(ic, object)
} else if (object.ObjectType == workspace.Notebook || object.ObjectType == workspace.File) && ic.isServiceInListing("notebooks") {
} else if isWsFilesListingEnabled && object.ObjectType == workspace.File {
emitWorkpaceObject(ic, object)
}
}
Expand Down

0 comments on commit 2a1d70f

Please sign in to comment.