From a9293e9df6632db23c62c3f08d7f1031b636fcee Mon Sep 17 00:00:00 2001
From: Alex Ott
Date: Fri, 9 Aug 2024 18:52:48 +0200
Subject: [PATCH] [Internal] Refactor exporter: split huge files into smaller ones (#3870)

## Changes

This PR splits `context.go` and `util.go` into smaller files that group related functions together, e.g., utilities for workspace objects, utilities for SCIM objects (users/service principals/groups), and so on.

## Tests

- [x] `make test` run locally
- [ ] relevant change in `docs/` folder
- [ ] covered with integration tests in `internal/acceptance`
- [ ] relevant acceptance tests are passing
- [ ] using Go SDK

---
 exporter/codegen.go        | 1060 +++++++++++++++++++++++++++++++++++
 exporter/context.go        | 1070 ------------------------------------
 exporter/context_test.go   |    8 +-
 exporter/util.go           | 1006 +--------------------------------
 exporter/util_compute.go   |  228 ++++
 exporter/util_scim.go      |  392 +++++++
 exporter/util_workspace.go |  400 ++++
 7 files changed, 2098 insertions(+), 2066 deletions(-)
 create mode 100644 exporter/codegen.go
 create mode 100644 exporter/util_compute.go
 create mode 100644 exporter/util_scim.go
 create mode 100644 exporter/util_workspace.go

diff --git a/exporter/codegen.go b/exporter/codegen.go
new file mode 100644
index 0000000000..dee6f12fd4
--- /dev/null
+++ b/exporter/codegen.go
@@ -0,0 +1,1060 @@
+package exporter + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "os" + "reflect" + "regexp" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/databricks/terraform-provider-databricks/workspace" + "github.com/hashicorp/hcl/v2" + "github.com/hashicorp/hcl/v2/hclsyntax" + "github.com/hashicorp/hcl/v2/hclwrite" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/zclconf/go-cty/cty" +) + +// TODO: move to IC +var dependsRe = regexp.MustCompile(`(\.[\d]+)`) + +func (ic *importContext) generateVariableName(attrName, name string) string { + return fmt.Sprintf("%s_%s", attrName, name) +} + +func maybeAddQuoteCharacter(s string) string { + s = strings.ReplaceAll(s, "\\", "\\\\") + s = strings.ReplaceAll(s, "\"", "\\\"") + return s +} + +func genTraversalTokens(sr *resourceApproximation, pick string) hcl.Traversal { + if sr.Mode == "data" { + return hcl.Traversal{ + hcl.TraverseRoot{Name: "data"}, + hcl.TraverseAttr{Name: sr.Type}, + hcl.TraverseAttr{Name: sr.Name}, + hcl.TraverseAttr{Name: pick}, + } + } + return hcl.Traversal{ + hcl.TraverseRoot{Name: sr.Type}, + hcl.TraverseAttr{Name: sr.Name}, + hcl.TraverseAttr{Name: pick}, + } +} + +func (ic *importContext) isIgnoredResourceApproximation(ra *resourceApproximation) bool { + var ignored bool + if ra != nil && ra.Resource != nil { + ignoreFunc := ic.Importables[ra.Type].Ignore + if ignoreFunc != nil && ignoreFunc(ic, ra.Resource) { + log.Printf("[WARN] Found reference to the ignored resource %s: %s", ra.Type, ra.Name) + return true + } + } + return ignored +} + +func (ic *importContext) Find(value, attr string, ref reference, origResource *resource, origPath string) (string, hcl.Traversal, bool) { + log.Printf("[DEBUG] Starting searching for reference for resource %s, attr='%s', value='%s', ref=%v", + ref.Resource, attr, value, ref) + // optimize performance by avoiding doing regexp matching multiple times + matchValue := "" + switch ref.MatchType { + case MatchRegexp: + if ref.Regexp == nil { + log.Printf("[WARN] you must provide regular expression for 'regexp' match type") + return "", nil, false + } + res := 
ref.Regexp.FindStringSubmatch(value) + if len(res) < 2 { + log.Printf("[WARN] no match for regexp: %v in string %s", ref.Regexp, value) + return "", nil, false + } + matchValue = res[1] + case MatchCaseInsensitive: + matchValue = strings.ToLower(value) // performance optimization to avoid doing it in the loop + case MatchExact, MatchDefault: + matchValue = value + case MatchPrefix, MatchLongestPrefix: + if ref.MatchValueTransformFunc != nil { + matchValue = ref.MatchValueTransformFunc(value) + } else { + matchValue = value + } + } + // doing explicit lookup in the state. For case insensitive matches, first attempt to lookup for the value, + // and do iteration if it's not found + if (ref.MatchType == MatchExact || ref.MatchType == MatchDefault || ref.MatchType == MatchRegexp || + ref.MatchType == MatchCaseInsensitive) && !ref.SkipDirectLookup { + sr := ic.State.Get(ref.Resource, attr, matchValue) + if sr != nil && (ref.IsValidApproximation == nil || ref.IsValidApproximation(ic, origResource, sr, origPath)) && + !ic.isIgnoredResourceApproximation(sr) { + log.Printf("[DEBUG] Finished direct lookup for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", + ref.Resource, attr, value, ref, sr.Type, sr.Name) + return matchValue, genTraversalTokens(sr, attr), sr.Mode == "data" + } + if ref.MatchType != MatchCaseInsensitive { // for case-insensitive matching we'll try iteration + log.Printf("[DEBUG] Finished direct lookup for reference for resource %s, attr='%s', value='%s', ref=%v. Not found", + ref.Resource, attr, value, ref) + return "", nil, false + } + } else if ref.MatchType == MatchLongestPrefix && ref.ExtraLookupKey != "" { + extraKeyValue, exists := origResource.GetExtraData(ref.ExtraLookupKey) + if exists && extraKeyValue.(string) != "" { + sr := ic.State.Get(ref.Resource, attr, extraKeyValue.(string)) + if sr != nil && (ref.IsValidApproximation == nil || ref.IsValidApproximation(ic, origResource, sr, origPath)) && + !ic.isIgnoredResourceApproximation(sr) { + log.Printf("[DEBUG] Finished direct lookup by key %s for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", + ref.ExtraLookupKey, ref.Resource, attr, value, ref, sr.Type, sr.Name) + return extraKeyValue.(string), genTraversalTokens(sr, attr), sr.Mode == "data" + } + } + } + + maxPrefixLen := 0 + maxPrefixOrigValue := "" + var maxPrefixResource *resourceApproximation + srs := *ic.State.Resources(ref.Resource) + for _, sr := range srs { + for _, i := range sr.Instances { + v := i.Attributes[attr] + if v == nil { + log.Printf("[WARN] Can't find instance attribute '%v' in resource: '%v'", attr, ref.Resource) + continue + } + strValue := v.(string) + origValue := strValue + if ref.SearchValueTransformFunc != nil { + strValue = ref.SearchValueTransformFunc(strValue) + log.Printf("[TRACE] Resource %s. 
Transformed value from '%s' to '%s'", ref.Resource, origValue, strValue) + } + matched := false + switch ref.MatchType { + case MatchCaseInsensitive: + matched = (strings.ToLower(strValue) == matchValue) + case MatchPrefix: + matched = strings.HasPrefix(matchValue, strValue) + case MatchLongestPrefix: + if strings.HasPrefix(matchValue, strValue) && len(origValue) > maxPrefixLen && !ic.isIgnoredResourceApproximation(sr) { + maxPrefixLen = len(origValue) + maxPrefixOrigValue = origValue + maxPrefixResource = sr + } + case MatchExact, MatchDefault: + matched = (strValue == matchValue) + default: + log.Printf("[WARN] Unsupported match type: %s", ref.MatchType) + } + if !matched || (ref.IsValidApproximation != nil && !ref.IsValidApproximation(ic, origResource, sr, origPath)) || + ic.isIgnoredResourceApproximation(sr) { + continue + } + log.Printf("[DEBUG] Finished searching for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", + ref.Resource, attr, value, ref, sr.Type, sr.Name) + return origValue, genTraversalTokens(sr, attr), sr.Mode == "data" + } + } + if ref.MatchType == MatchLongestPrefix && maxPrefixResource != nil && + (ref.IsValidApproximation == nil || ref.IsValidApproximation(ic, origResource, maxPrefixResource, origPath)) && + !ic.isIgnoredResourceApproximation(maxPrefixResource) { + log.Printf("[DEBUG] Finished searching longest prefix for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", + ref.Resource, attr, value, ref, maxPrefixResource.Type, maxPrefixResource.Name) + return maxPrefixOrigValue, genTraversalTokens(maxPrefixResource, attr), maxPrefixResource.Mode == "data" + } + log.Printf("[DEBUG] Finished searching for reference for resource %s, pick=%s, ref=%v. Not found", ref.Resource, attr, ref) + return "", nil, false +} + +func (ic *importContext) getTraversalTokens(ref reference, value string, origResource *resource, origPath string) (hclwrite.Tokens, bool) { + matchType := ref.MatchTypeValue() + attr := ref.MatchAttribute() + attrValue, traversal, isData := ic.Find(value, attr, ref, origResource, origPath) + // at least one invocation of ic.Find will assign Nil to traversal if resource with value is not found + if traversal == nil { + return nil, isData + } + // capture if it's data? + switch matchType { + case MatchExact, MatchDefault, MatchCaseInsensitive: + return hclwrite.TokensForTraversal(traversal), isData + case MatchPrefix, MatchLongestPrefix: + rest := value[len(attrValue):] + tokens := hclwrite.Tokens{&hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'"', '$', '{'}}} + tokens = append(tokens, hclwrite.TokensForTraversal(traversal)...) + tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'}'}}) + tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(maybeAddQuoteCharacter(rest))}) + tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'"'}}) + return tokens, isData + case MatchRegexp: + indices := ref.Regexp.FindStringSubmatchIndex(value) + if len(indices) == 4 { + tokens := hclwrite.Tokens{&hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'"'}}} + tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(maybeAddQuoteCharacter(value[0:indices[2]]))}) + tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'$', '{'}}) + tokens = append(tokens, hclwrite.TokensForTraversal(traversal)...) 
+ tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'}'}}) + tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(maybeAddQuoteCharacter(value[indices[3]:]))}) + tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'"'}}) + return tokens, isData + } + log.Printf("[WARN] Can't match found data in '%s'. Indices: %v", value, indices) + default: + log.Printf("[WARN] Unsupported match type: %s", ref.MatchType) + } + return nil, false +} + +func (ic *importContext) reference(i importable, path []string, value string, ctyValue cty.Value, origResource *resource) hclwrite.Tokens { + pathString := strings.Join(path, ".") + match := dependsRe.ReplaceAllString(pathString, "") + // get reference candidate, but if it's a `data`, then look for another non-data reference if possible.. + var dataTokens hclwrite.Tokens + for _, d := range i.Depends { + if d.Path != match { + continue + } + if d.File { + relativeFile := fmt.Sprintf("${path.module}/%s", value) + return hclwrite.Tokens{ + &hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'"'}}, + &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(relativeFile)}, + &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'"'}}, + } + } + if d.Variable { + varName := ic.generateVariableName(path[0], value) + return ic.variable(varName, "") + } + + tokens, isData := ic.getTraversalTokens(d, value, origResource, pathString) + if tokens != nil { + if isData { + dataTokens = tokens + log.Printf("[DEBUG] Got reference to data for dependency %v", d) + } else { + return tokens + } + } + } + if len(dataTokens) > 0 { + return dataTokens + } + return hclwrite.TokensForValue(ctyValue) +} + +func (ic *importContext) variable(name, desc string) hclwrite.Tokens { + ic.variablesLock.Lock() + ic.variables[name] = desc + ic.variablesLock.Unlock() + return hclwrite.TokensForTraversal(hcl.Traversal{ + hcl.TraverseRoot{Name: "var"}, + hcl.TraverseAttr{Name: name}, + }) +} + +type fieldTuple struct { + Field string + Schema *schema.Schema +} + +func (ic *importContext) dataToHcl(i importable, path []string, + pr *schema.Resource, res *resource, body *hclwrite.Body) error { + d := res.Data + ss := []fieldTuple{} + for a, as := range pr.Schema { + ss = append(ss, fieldTuple{a, as}) + } + sort.Slice(ss, func(i, j int) bool { + // it just happens that reverse field order + // makes the most beautiful configs + return ss[i].Field > ss[j].Field + }) + var_cnt := 0 + for _, tuple := range ss { + a, as := tuple.Field, tuple.Schema + pathString := strings.Join(append(path, a), ".") + raw, nonZero := d.GetOk(pathString) + // log.Printf("[DEBUG] path=%s, raw='%v'", pathString, raw) + if i.ShouldOmitField == nil { // we don't have custom function, so skip computed & default fields + if defaultShouldOmitFieldFunc(ic, pathString, as, d) { + continue + } + } else if i.ShouldOmitField(ic, pathString, as, d) { + continue + } + mpath := dependsRe.ReplaceAllString(pathString, "") + for _, ref := range i.Depends { + if ref.Path == mpath && ref.Variable { + // sensitive fields are moved to variable depends, variable name is normalized + // TODO: handle a case when we have multiple blocks, so names won't be unique + raw = ic.regexFix(ic.ResourceName(res), simpleNameFixes) + if var_cnt > 0 { + raw = fmt.Sprintf("%s_%d", raw, var_cnt) + } + nonZero = true + var_cnt++ + } + } + shouldSkip := !nonZero + if as.Required { // for required fields we must produce a value, even empty... 
+ shouldSkip = false + } else if as.Default != nil && !reflect.DeepEqual(raw, as.Default) { + // In case when have zero value, but there is non-zero default, we also need to produce it + shouldSkip = false + } + if shouldSkip && (i.ShouldGenerateField == nil || !i.ShouldGenerateField(ic, pathString, as, d)) { + continue + } + switch as.Type { + case schema.TypeString: + value := raw.(string) + tokens := ic.reference(i, append(path, a), value, cty.StringVal(value), res) + body.SetAttributeRaw(a, tokens) + case schema.TypeBool: + body.SetAttributeValue(a, cty.BoolVal(raw.(bool))) + case schema.TypeInt: + var num int64 + switch iv := raw.(type) { + case int: + num = int64(iv) + case int32: + num = int64(iv) + case int64: + num = iv + } + body.SetAttributeRaw(a, ic.reference(i, append(path, a), + strconv.FormatInt(num, 10), cty.NumberIntVal(num), res)) + case schema.TypeFloat: + body.SetAttributeValue(a, cty.NumberFloatVal(raw.(float64))) + case schema.TypeMap: + // TODO: Resolve references in maps as well, and also support different types inside map... + ov := map[string]cty.Value{} + for key, iv := range raw.(map[string]any) { + v := cty.StringVal(fmt.Sprintf("%v", iv)) + ov[key] = v + } + body.SetAttributeValue(a, cty.ObjectVal(ov)) + case schema.TypeSet: + if rawSet, ok := raw.(*schema.Set); ok { + rawList := rawSet.List() + err := ic.readListFromData(i, append(path, a), res, rawList, body, as, func(i int) string { + return strconv.Itoa(rawSet.F(rawList[i])) + }) + if err != nil { + return err + } + } + case schema.TypeList: + if rawList, ok := raw.([]any); ok { + err := ic.readListFromData(i, append(path, a), res, rawList, body, as, strconv.Itoa) + if err != nil { + return err + } + } + default: + return fmt.Errorf("unsupported schema type: %v", path) + } + } + // Generate `depends_on` only for top-level resource because `dataToHcl` is called recursively + if len(path) == 0 && len(res.DependsOn) > 0 { + notIgnoredResources := []*resource{} + for _, dr := range res.DependsOn { + dr := dr + if dr.Data == nil { + tdr := ic.Scope.FindById(dr.Resource, dr.ID) + if tdr == nil { + log.Printf("[WARN] can't find resource %s in scope", dr) + continue + } + dr = tdr + } + if ic.Importables[dr.Resource].Ignore == nil || !ic.Importables[dr.Resource].Ignore(ic, dr) { + found := false + for _, v := range notIgnoredResources { + if v.ID == dr.ID && v.Resource == dr.Resource { + found = true + break + } + } + if !found { + notIgnoredResources = append(notIgnoredResources, dr) + } + } + } + if len(notIgnoredResources) > 0 { + toks := hclwrite.Tokens{} + toks = append(toks, &hclwrite.Token{ + Type: hclsyntax.TokenOBrack, + Bytes: []byte{'['}, + }) + for i, dr := range notIgnoredResources { + if i > 0 { + toks = append(toks, &hclwrite.Token{ + Type: hclsyntax.TokenComma, + Bytes: []byte{','}, + }) + } + toks = append(toks, hclwrite.TokensForTraversal(hcl.Traversal{ + hcl.TraverseRoot{Name: dr.Resource}, + hcl.TraverseAttr{Name: ic.ResourceName(dr)}, + })...) 
+ } + toks = append(toks, &hclwrite.Token{ + Type: hclsyntax.TokenCBrack, + Bytes: []byte{']'}, + }) + body.SetAttributeRaw("depends_on", toks) + } + } + return nil +} + +func (ic *importContext) readListFromData(i importable, path []string, res *resource, + rawList []any, body *hclwrite.Body, as *schema.Schema, offsetConverter func(i int) string) error { + if len(rawList) == 0 { + return nil + } + name := path[len(path)-1] + switch elem := as.Elem.(type) { + case *schema.Resource: + if as.MaxItems == 1 { + nestedPath := append(path, offsetConverter(0)) + confBlock := body.AppendNewBlock(name, []string{}) + return ic.dataToHcl(i, nestedPath, elem, res, confBlock.Body()) + } + for offset := range rawList { + confBlock := body.AppendNewBlock(name, []string{}) + nestedPath := append(path, offsetConverter(offset)) + err := ic.dataToHcl(i, nestedPath, elem, res, confBlock.Body()) + if err != nil { + return err + } + } + case *schema.Schema: + toks := hclwrite.Tokens{} + toks = append(toks, &hclwrite.Token{ + Type: hclsyntax.TokenOBrack, + Bytes: []byte{'['}, + }) + for _, raw := range rawList { + if len(toks) != 1 { + toks = append(toks, &hclwrite.Token{ + Type: hclsyntax.TokenComma, + Bytes: []byte{','}, + }) + } + switch x := raw.(type) { + case string: + value := raw.(string) + toks = append(toks, ic.reference(i, path, value, cty.StringVal(value), res)...) + case int: + // probably we don't even use integer lists?... + toks = append(toks, hclwrite.TokensForValue( + cty.NumberIntVal(int64(x)))...) + default: + return fmt.Errorf("unsupported primitive list: %#v", path) + } + } + toks = append(toks, &hclwrite.Token{ + Type: hclsyntax.TokenCBrack, + Bytes: []byte{']'}, + }) + body.SetAttributeRaw(name, toks) + } + return nil +} + +func (ic *importContext) generateTfvars() error { + // TODO: make it incremental as well... + if len(ic.tfvars) == 0 { + return nil + } + f := hclwrite.NewEmptyFile() + body := f.Body() + fileName := fmt.Sprintf("%s/terraform.tfvars", ic.Directory) + + vf, err := os.Create(fileName) + if err != nil { + return err + } + defer vf.Close() + + for k, v := range ic.tfvars { + body.SetAttributeValue(k, cty.StringVal(v)) + } + // nolint + vf.Write(f.Bytes()) + log.Printf("[INFO] Written %d tfvars", len(ic.tfvars)) + + ic.generateGitIgnore() + + return nil +} + +func (ic *importContext) generateVariables() error { + if len(ic.variables) == 0 { + return nil + } + f := hclwrite.NewEmptyFile() + body := f.Body() + fileName := fmt.Sprintf("%s/vars.tf", ic.Directory) + if ic.incremental { + content, err := os.ReadFile(fileName) + if err == nil { + ftmp, diags := hclwrite.ParseConfig(content, fileName, hcl.Pos{Line: 1, Column: 1}) + if diags.HasErrors() { + log.Printf("[ERROR] parsing of existing file failed: %s", diags) + } else { + tbody := ftmp.Body() + for _, block := range tbody.Blocks() { + typ := block.Type() + labels := block.Labels() + log.Printf("[DEBUG] blockBody: %v %v\n", typ, labels) + _, present := ic.variables[labels[0]] + if typ == "variable" && present { + log.Printf("[DEBUG] Ignoring variable '%s' that will be re-exported", labels[0]) + } else { + log.Printf("[DEBUG] Adding not exported object. 
type='%s', labels=%v", typ, labels) + body.AppendBlock(block) + } + } + } + } else { + log.Printf("[ERROR] opening file %s", fileName) + } + } + vf, err := os.Create(fileName) + if err != nil { + return err + } + defer vf.Close() + + for k, v := range ic.variables { + b := body.AppendNewBlock("variable", []string{k}).Body() + b.SetAttributeValue("description", cty.StringVal(v)) + } + // nolint + vf.Write(f.Bytes()) + log.Printf("[INFO] Written %d variables", len(ic.variables)) + return nil +} + +func (ic *importContext) generateGitIgnore() { + fileName := fmt.Sprintf("%s/.gitignore", ic.Directory) + vf, err := os.Create(fileName) + if err != nil { + log.Printf("[ERROR] can't create %s: %v", fileName, err) + return + } + defer vf.Close() + // nolint + vf.Write([]byte("terraform.tfvars\n")) +} + +func (ic *importContext) generateAndWriteResources(sh *os.File) { + resources := ic.Scope.Sorted() + scopeSize := ic.Scope.Len() + t1 := time.Now() + log.Printf("[INFO] Generating configuration for %d resources", scopeSize) + + // make configurable via environment variables + resourceHandlersNumber := getEnvAsInt("EXPORTER_RESOURCE_GENERATORS", 50) + resourcesChan := make(resourceChannel, defaultChannelSize) + + resourceWriters := make(map[string]dataWriteChannel, len(ic.Resources)) + for service := range ic.services { + resourceWriters[service] = make(dataWriteChannel, defaultChannelSize) + } + writersWaitGroup := &sync.WaitGroup{} + // write shell script for importing + shellImportChan := make(importWriteChannel, defaultChannelSize) + writersWaitGroup.Add(1) + go func() { + ic.writeShellImports(sh, shellImportChan) + writersWaitGroup.Done() + }() + // + nativeImportChan := make(importWriteChannel, defaultChannelSize) + writersWaitGroup.Add(1) + go func() { + ic.writeNativeImports(nativeImportChan) + writersWaitGroup.Done() + }() + // start resource handlers + for i := 0; i < resourceHandlersNumber; i++ { + i := i + go func() { + log.Printf("[DEBUG] Starting resource handler %d", i) + ic.processSingleResource(resourcesChan, resourceWriters, nativeImportChan) + }() + } + // start writers for specific services + for service, ch := range resourceWriters { + service := service + ch := ch + generatedFile := fmt.Sprintf("%s/%s.tf", ic.Directory, service) + log.Printf("[DEBUG] starting writer for service %s", service) + writersWaitGroup.Add(1) + go func() { + ic.handleResourceWrite(generatedFile, ch, shellImportChan) + writersWaitGroup.Done() + }() + } + // submit all extracted resources... 
+ for i, r := range resources { + ic.waitGroup.Add(1) + resourcesChan <- r + if i%500 == 0 { + log.Printf("[INFO] Submitted %d of %d resources", i+1, scopeSize) + } + } + ic.waitGroup.Wait() + // close all channels + close(shellImportChan) + close(nativeImportChan) + close(resourcesChan) + for service, ch := range resourceWriters { + log.Printf("Closing writer for service %s", service) + close(ch) + } + writersWaitGroup.Wait() + + log.Printf("[INFO] Finished generation of configuration for %d resources (took %v seconds)", + scopeSize, time.Since(t1).Seconds()) +} + +func (ic *importContext) processSingleResource(resourcesChan resourceChannel, + writerChannels map[string]dataWriteChannel, nativeImportChannel importWriteChannel) { + processed := 0 + generated := 0 + ignored := 0 + for r := range resourcesChan { + processed = processed + 1 + if r == nil { + log.Print("[WARN] Got nil resource...") + ic.waitGroup.Done() + continue + } + ir := ic.Importables[r.Resource] + if ir.Ignore != nil && ir.Ignore(ic, r) { + log.Printf("[WARN] Ignoring resource %s: %s", r.Resource, r.Name) + ignored = ignored + 1 + ic.waitGroup.Done() + continue + } + var err error + f := hclwrite.NewEmptyFile() + log.Printf("[TRACE] Generating %s: %s", r.Resource, r.Name) + body := f.Body() + if ir.Body != nil { + err = ir.Body(ic, body, r) + if err != nil { + log.Printf("[ERROR] error calling ir.Body for %v: %s", r, err.Error()) + } + } else { + resourceBlock := body.AppendNewBlock("resource", []string{r.Resource, r.Name}) + err = ic.dataToHcl(ir, []string{}, ic.Resources[r.Resource], r, resourceBlock.Body()) + if err != nil { + log.Printf("[ERROR] error generating body for %v: %s", r, err.Error()) + } + } + if err == nil && len(body.Blocks()) > 0 { + formatted := hclwrite.Format(f.Bytes()) + // fix some formatting in a hacky way instead of writing 100 lines of HCL AST writer code + formatted = []byte(ic.regexFix(string(formatted), ic.hclFixes)) + writeData := &resourceWriteData{ + ResourceBody: string(formatted), + BlockName: generateBlockFullName(body.Blocks()[0]), + } + if r.Mode != "data" && ic.Resources[r.Resource].Importer != nil { + writeData.ImportCommand = r.ImportCommand(ic) + if ic.nativeImportSupported { // generate import block for native import + imp := hclwrite.NewEmptyFile() + imoBlock := imp.Body().AppendNewBlock("import", []string{}) + imoBlock.Body().SetAttributeValue("id", cty.StringVal(r.ID)) + traversal := hcl.Traversal{ + hcl.TraverseRoot{Name: r.Resource}, + hcl.TraverseAttr{Name: r.Name}, + } + tokens := hclwrite.TokensForTraversal(traversal) + imoBlock.Body().SetAttributeRaw("to", tokens) + formattedImp := hclwrite.Format(imp.Bytes()) + //log.Printf("[DEBUG] Import block for %s: %s", r.ID, string(formattedImp)) + ic.waitGroup.Add(1) + nativeImportChannel <- string(formattedImp) + } + } + ch, exists := writerChannels[ir.Service] + if exists { + ic.waitGroup.Add(1) + ch <- writeData + } else { + log.Printf("[WARN] can't find a channel for service: %s, resource: %s", ir.Service, r.Resource) + } + log.Printf("[TRACE] Finished generating %s: %s", r.Resource, r.Name) + generated = generated + 1 + } else { + log.Printf("[WARN] error generating resource body: %v, or body blocks len is 0", err) + } + ic.waitGroup.Done() + } + log.Printf("[DEBUG] processed resources: %d, generated: %d, ignored: %d", processed, generated, ignored) +} + +func extractResourceIdFromImportBlock(block *hclwrite.Block) string { + if block.Type() != "import" { + log.Print("[WARN] it's not an import block!") + return "" + } + 
idAttr := block.Body().GetAttribute("to") + if idAttr == nil { + log.Printf("[WARN] Can't find `to` attribute in the import block") + return "" + } + idVal := string(idAttr.Expr().BuildTokens(nil).Bytes()) + return strings.TrimSpace(idVal) +} + +func extractResourceIdFromImportBlockString(importBlock string) string { + block, diags := hclwrite.ParseConfig([]byte(importBlock), "test.tf", hcl.Pos{Line: 1, Column: 1}) + if diags.HasErrors() { + log.Printf("[WARN] parsing of import block %s has failed: %s", importBlock, diags.Error()) + return "" + } + if len(block.Body().Blocks()) == 0 { + log.Printf("[WARN] import block %s has 0 blocks!", importBlock) + return "" + } + return extractResourceIdFromImportBlock(block.Body().Blocks()[0]) +} + +func (ic *importContext) writeNativeImports(importChan importWriteChannel) { + if !ic.nativeImportSupported { + log.Print("[DEBUG] Native import is not enabled, skipping...") + return + } + importsFileName := fmt.Sprintf("%s/import.tf", ic.Directory) + // TODO: in incremental mode read existing file with imports and append them for not processed & not deleted resources + var existingFile *hclwrite.File + if ic.incremental { + log.Printf("[DEBUG] Going to read existing file %s", importsFileName) + content, err := os.ReadFile(importsFileName) + if errors.Is(err, os.ErrNotExist) { + log.Printf("[WARN] File %s doesn't exist when using incremental export", importsFileName) + } else if err != nil { + log.Printf("[ERROR] error opening %s", importsFileName) + } else { + log.Printf("[DEBUG] Going to parse existing file %s", importsFileName) + var diags hcl.Diagnostics + existingFile, diags = hclwrite.ParseConfig(content, importsFileName, hcl.Pos{Line: 1, Column: 1}) + if diags.HasErrors() { + log.Printf("[ERROR] parsing of existing file %s failed: %s", importsFileName, diags.Error()) + } else { + log.Printf("[DEBUG] There are %d objects in existing file %s", + len(existingFile.Body().Blocks()), importsFileName) + } + } + } + if existingFile == nil { + existingFile = hclwrite.NewEmptyFile() + } + + // do actual writes + importsFile, err := os.Create(importsFileName) + if err != nil { + log.Printf("[ERROR] Can't create %s: %v", importsFileName, err) + return + } + defer importsFile.Close() + + newImports := make(map[string]struct{}, 100) + log.Printf("[DEBUG] started processing new writes for %s", importsFileName) + // write native imports + for importBlock := range importChan { + if importBlock != "" { + log.Printf("[TRACE] writing import command %s", importBlock) + importsFile.WriteString(importBlock) + id := extractResourceIdFromImportBlockString(importBlock) + if id != "" { + newImports[id] = struct{}{} + } + } else { + log.Print("[WARN] got empty import command...") + } + ic.waitGroup.Done() + } + // write the rest of import blocks + numResources := len(newImports) + log.Printf("[DEBUG] finished processing new writes for %s. 
Wrote %d resources", importsFileName, numResources) + // update existing file if incremental mode + if ic.incremental { + log.Printf("[DEBUG] Starting to merge existing resources for %s", importsFileName) + f := hclwrite.NewEmptyFile() + for _, block := range existingFile.Body().Blocks() { + blockName := extractResourceIdFromImportBlock(block) + if blockName == "" { + log.Printf("[WARN] can't extract resource ID from import block: %s", + string(block.BuildTokens(nil).Bytes())) + continue + } + _, exists := newImports[blockName] + _, deleted := ic.deletedResources[blockName] + if exists { + log.Printf("[DEBUG] resource %s already generated, skipping...", blockName) + } else if deleted { + log.Printf("[DEBUG] resource %s is deleted, skipping...", blockName) + } else { + log.Printf("[DEBUG] resource %s doesn't exist, adding...", blockName) + f.Body().AppendBlock(block) + numResources = numResources + 1 + } + } + _, err = importsFile.WriteString(string(f.Bytes())) + if err != nil { + log.Printf("[ERROR] error when writing existing resources for file %s: %v", importsFileName, err) + } + log.Printf("[DEBUG] Finished merging existing resources for %s", importsFileName) + } +} + +func (ic *importContext) writeShellImports(sh *os.File, importChan importWriteChannel) { + for importCommand := range importChan { + if importCommand != "" && sh != nil { + log.Printf("[DEBUG] writing import command %s", importCommand) + sh.WriteString(importCommand + "\n") + delete(ic.shImports, importCommand) + } else { + log.Print("[WARN] got empty import command... or file is nil") + } + ic.waitGroup.Done() + } + if sh != nil { + log.Printf("[DEBUG] Writing the rest of import commands. len=%d", len(ic.shImports)) + for k := range ic.shImports { + parts := strings.Split(k, " ") + if len(parts) > 3 { + resource := parts[2] + _, deleted := ic.deletedResources[resource] + if deleted { + log.Printf("[DEBUG] Resource %s is deleted. Skipping import command for it", resource) + continue + } + } + sh.WriteString(k + "\n") + } + } +} + +func generateResourceName(rtype, rname string) string { + return rtype + "." 
+ rname +} + +func generateBlockFullName(block *hclwrite.Block) string { + labels := block.Labels() + return generateResourceName(labels[0], strings.Join(labels[1:], "_")) +} + +type resourceWriteData struct { + BlockName string + ResourceBody string + ImportCommand string +} + +type dataWriteChannel chan *resourceWriteData +type importWriteChannel chan string + +func (ic *importContext) handleResourceWrite(generatedFile string, ch dataWriteChannel, importChan importWriteChannel) { + var existingFile *hclwrite.File + if ic.incremental { + log.Printf("[DEBUG] Going to read existing file %s", generatedFile) + content, err := os.ReadFile(generatedFile) + if errors.Is(err, os.ErrNotExist) { + log.Printf("[WARN] File %s doesn't exist when using incremental export", generatedFile) + } else if err != nil { + log.Printf("[ERROR] error opening %s", generatedFile) + } else { + log.Printf("[DEBUG] Going to parse existing file %s", generatedFile) + var diags hcl.Diagnostics + existingFile, diags = hclwrite.ParseConfig(content, generatedFile, hcl.Pos{Line: 1, Column: 1}) + if diags.HasErrors() { + log.Printf("[ERROR] parsing of existing file %s failed: %s", generatedFile, diags.Error()) + } else { + log.Printf("[DEBUG] There are %d objects in existing file %s", + len(existingFile.Body().Blocks()), generatedFile) + } + } + } + if existingFile == nil { + existingFile = hclwrite.NewEmptyFile() + } + + tf, err := os.Create(generatedFile) + if err != nil { + log.Printf("[ERROR] Can't create %s: %v", generatedFile, err) + return + } + + // + newResources := make(map[string]struct{}, 100) + log.Printf("[DEBUG] started processing new writes for %s", generatedFile) + for f := range ch { + if f != nil { + log.Printf("[DEBUG] started writing resource body for %s", f.BlockName) + _, err = tf.WriteString(f.ResourceBody) + if err == nil { + newResources[f.BlockName] = struct{}{} + if f.ImportCommand != "" { + ic.waitGroup.Add(1) + importChan <- f.ImportCommand + } + log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName) + } else { + log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err) + } + } else { + log.Print("[WARN] got nil as resourceWriteData!") + } + ic.waitGroup.Done() + } + numResources := len(newResources) + log.Printf("[DEBUG] finished processing new writes for %s. 
Wrote %d resources", generatedFile, numResources) + // update existing file if incremental mode + if ic.incremental { + log.Printf("[DEBUG] Starting to merge existing resources for %s", generatedFile) + f := hclwrite.NewEmptyFile() + for _, block := range existingFile.Body().Blocks() { + blockName := generateBlockFullName(block) + _, exists := newResources[blockName] + _, deleted := ic.deletedResources[blockName] + if exists { + log.Printf("[DEBUG] resource %s already generated, skipping...", blockName) + } else if deleted { + log.Printf("[DEBUG] resource %s is deleted, skipping...", blockName) + } else { + log.Printf("[DEBUG] resource %s doesn't exist, adding...", blockName) + f.Body().AppendBlock(block) + numResources = numResources + 1 + } + } + _, err = tf.WriteString(string(f.Bytes())) + if err != nil { + log.Printf("[ERROR] error when writing existing resources for file %s: %v", generatedFile, err) + } + log.Printf("[DEBUG] Finished merging existing resources for %s", generatedFile) + } + tf.Close() + if numResources == 0 { + log.Printf("[DEBUG] removing empty file %s - no resources for a given service", generatedFile) + os.Remove(generatedFile) + } +} + +func (ic *importContext) generateResourceIdForWorkspaceObject(obj workspace.ObjectStatus) (string, string) { + var rtype string + switch obj.ObjectType { + case workspace.Directory: + rtype = "databricks_directory" + case workspace.File: + rtype = "databricks_workspace_file" + case workspace.Notebook: + rtype = "databricks_notebook" + default: + log.Printf("[WARN] Unsupported WS object type: %s in obj %v", obj.ObjectType, obj) + return "", "" + } + rData := ic.Resources[rtype].Data( + &terraform.InstanceState{ + ID: obj.Path, + Attributes: map[string]string{}, + }) + rData.Set("object_id", obj.ObjectID) + rData.Set("path", obj.Path) + name := ic.ResourceName(&resource{ + ID: obj.Path, + Resource: rtype, + Data: rData, + }) + return generateResourceName(rtype, name), rtype +} + +func (ic *importContext) loadOldWorkspaceObjects(fileName string) { + ic.oldWorkspaceObjects = []workspace.ObjectStatus{} + // Read a list of resources from previous run + oldDataFile, err := os.ReadFile(fileName) + if err != nil { + log.Printf("[WARN] Can't open the file (%s) with previous list of workspace objects: %s", fileName, err.Error()) + return + } + err = json.Unmarshal(oldDataFile, &ic.oldWorkspaceObjects) + if err != nil { + log.Printf("[WARN] Can't desereialize previous list of workspace objects: %s", err.Error()) + return + } + log.Printf("[DEBUG] Read previous list of workspace objects. 
got %d objects", len(ic.oldWorkspaceObjects)) + for _, obj := range ic.oldWorkspaceObjects { + ic.oldWorkspaceObjectMapping[obj.ObjectID] = obj.Path + } +} + +func (ic *importContext) findDeletedResources() { + log.Print("[INFO] Starting detection of deleted workspace objects") + if !ic.incremental || len(ic.allWorkspaceObjects) == 0 { + return + } + if len(ic.oldWorkspaceObjects) == 0 { + log.Print("[INFO] Previous list of workspace objects is empty") + return + } + // generate IDs of current objects + currentObjs := map[string]struct{}{} + for _, obj := range ic.allWorkspaceObjects { + obj := obj + if !isSupportedWorkspaceObject(obj) { + continue + } + rid, _ := ic.generateResourceIdForWorkspaceObject(obj) + currentObjs[rid] = struct{}{} + } + // Loop through previous objects, and if it's missing from the current list, add it to deleted, including permission + for _, obj := range ic.oldWorkspaceObjects { + obj := obj + if !isSupportedWorkspaceObject(obj) { + continue + } + rid, rtype := ic.generateResourceIdForWorkspaceObject(obj) + _, exists := currentObjs[rid] + if exists { + log.Printf("[DEBUG] object %s still exists", rid) // change to TRACE? + continue + } + log.Printf("[DEBUG] object %s is deleted!", rid) + ic.deletedResources[rid] = struct{}{} + // convert into permissions. This is quite fragile right now, need to think how to handle it better + var permId string + switch rtype { + case "databricks_notebook": + permId = "databricks_permissions.notebook_" + rid[len(rtype)+1:] + case "databricks_directory": + permId = "databricks_permissions.directory_" + rid[len(rtype)+1:] + case "databricks_workspace_file": + permId = "databricks_permissions.ws_file_" + rid[len(rtype)+1:] + } + log.Printf("[DEBUG] deleted permissions object %s", permId) + if permId != "" { + ic.deletedResources[permId] = struct{}{} + } + } + log.Printf("[INFO] Finished detection of deleted workspace objects. Detected %d deleted objects.", + len(ic.deletedResources)) + log.Printf("[DEBUG] Deleted objects. %v", ic.deletedResources) // change to TRACE? 
+} diff --git a/exporter/context.go b/exporter/context.go index 441ed12078..122f43ad87 100644 --- a/exporter/context.go +++ b/exporter/context.go @@ -5,15 +5,12 @@ import ( "context" "crypto/md5" "encoding/json" - "errors" "fmt" "log" "os" "os/exec" - "reflect" "regexp" "sort" - "strconv" "strings" "sync" "time" @@ -29,12 +26,7 @@ import ( "github.com/databricks/terraform-provider-databricks/scim" "github.com/databricks/terraform-provider-databricks/workspace" - "github.com/hashicorp/hcl/v2" - "github.com/hashicorp/hcl/v2/hclsyntax" - "github.com/hashicorp/hcl/v2/hclwrite" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" - "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" - "github.com/zclconf/go-cty/cty" ) /** High level overview of importer design: @@ -539,114 +531,6 @@ func (ic *importContext) Run() error { return nil } -func isSupportedWsObject(obj workspace.ObjectStatus) bool { - switch obj.ObjectType { - case workspace.Directory, workspace.Notebook, workspace.File: - return true - } - return false -} - -func (ic *importContext) generateResourceIdForWsObject(obj workspace.ObjectStatus) (string, string) { - var rtype string - switch obj.ObjectType { - case workspace.Directory: - rtype = "databricks_directory" - case workspace.File: - rtype = "databricks_workspace_file" - case workspace.Notebook: - rtype = "databricks_notebook" - default: - log.Printf("[WARN] Unsupported WS object type: %s in obj %v", obj.ObjectType, obj) - return "", "" - } - rData := ic.Resources[rtype].Data( - &terraform.InstanceState{ - ID: obj.Path, - Attributes: map[string]string{}, - }) - rData.Set("object_id", obj.ObjectID) - rData.Set("path", obj.Path) - name := ic.ResourceName(&resource{ - ID: obj.Path, - Resource: rtype, - Data: rData, - }) - return generateResourceName(rtype, name), rtype -} - -func (ic *importContext) loadOldWorkspaceObjects(fileName string) { - ic.oldWorkspaceObjects = []workspace.ObjectStatus{} - // Read a list of resources from previous run - oldDataFile, err := os.ReadFile(fileName) - if err != nil { - log.Printf("[WARN] Can't open the file (%s) with previous list of workspace objects: %s", fileName, err.Error()) - return - } - err = json.Unmarshal(oldDataFile, &ic.oldWorkspaceObjects) - if err != nil { - log.Printf("[WARN] Can't desereialize previous list of workspace objects: %s", err.Error()) - return - } - log.Printf("[DEBUG] Read previous list of workspace objects. 
got %d objects", len(ic.oldWorkspaceObjects)) - for _, obj := range ic.oldWorkspaceObjects { - ic.oldWorkspaceObjectMapping[obj.ObjectID] = obj.Path - } -} - -func (ic *importContext) findDeletedResources() { - log.Print("[INFO] Starting detection of deleted workspace objects") - if !ic.incremental || len(ic.allWorkspaceObjects) == 0 { - return - } - if len(ic.oldWorkspaceObjects) == 0 { - log.Print("[INFO] Previous list of workspace objects is empty") - return - } - // generate IDs of current objects - currentObjs := map[string]struct{}{} - for _, obj := range ic.allWorkspaceObjects { - obj := obj - if !isSupportedWsObject(obj) { - continue - } - rid, _ := ic.generateResourceIdForWsObject(obj) - currentObjs[rid] = struct{}{} - } - // Loop through previous objects, and if it's missing from the current list, add it to deleted, including permission - for _, obj := range ic.oldWorkspaceObjects { - obj := obj - if !isSupportedWsObject(obj) { - continue - } - rid, rtype := ic.generateResourceIdForWsObject(obj) - _, exists := currentObjs[rid] - if exists { - log.Printf("[DEBUG] object %s still exists", rid) // change to TRACE? - continue - } - log.Printf("[DEBUG] object %s is deleted!", rid) - ic.deletedResources[rid] = struct{}{} - // convert into permissions. This is quite fragile right now, need to think how to handle it better - var permId string - switch rtype { - case "databricks_notebook": - permId = "databricks_permissions.notebook_" + rid[len(rtype)+1:] - case "databricks_directory": - permId = "databricks_permissions.directory_" + rid[len(rtype)+1:] - case "databricks_workspace_file": - permId = "databricks_permissions.ws_file_" + rid[len(rtype)+1:] - } - log.Printf("[DEBUG] deleted permissions object %s", permId) - if permId != "" { - ic.deletedResources[permId] = struct{}{} - } - } - log.Printf("[INFO] Finished detection of deleted workspace objects. Detected %d deleted objects.", - len(ic.deletedResources)) - log.Printf("[DEBUG] Deleted objects. %v", ic.deletedResources) // change to TRACE? -} - func (ic *importContext) resourceHandler(num int, resourceType string, ch resourceChannel) { log.Printf("[DEBUG] Starting goroutine %d for resource %s", num, resourceType) for r := range ch { @@ -693,638 +577,6 @@ func (ic *importContext) closeImportChannels() { close(ic.defaultChannel) } -func generateResourceName(rtype, rname string) string { - return rtype + "." 
+ rname -} - -func generateBlockFullName(block *hclwrite.Block) string { - labels := block.Labels() - return generateResourceName(labels[0], strings.Join(labels[1:], "_")) -} - -type resourceWriteData struct { - BlockName string - ResourceBody string - ImportCommand string -} - -type dataWriteChannel chan *resourceWriteData -type importWriteChannel chan string - -func (ic *importContext) handleResourceWrite(generatedFile string, ch dataWriteChannel, importChan importWriteChannel) { - var existingFile *hclwrite.File - if ic.incremental { - log.Printf("[DEBUG] Going to read existing file %s", generatedFile) - content, err := os.ReadFile(generatedFile) - if errors.Is(err, os.ErrNotExist) { - log.Printf("[WARN] File %s doesn't exist when using incremental export", generatedFile) - } else if err != nil { - log.Printf("[ERROR] error opening %s", generatedFile) - } else { - log.Printf("[DEBUG] Going to parse existing file %s", generatedFile) - var diags hcl.Diagnostics - existingFile, diags = hclwrite.ParseConfig(content, generatedFile, hcl.Pos{Line: 1, Column: 1}) - if diags.HasErrors() { - log.Printf("[ERROR] parsing of existing file %s failed: %s", generatedFile, diags.Error()) - } else { - log.Printf("[DEBUG] There are %d objects in existing file %s", - len(existingFile.Body().Blocks()), generatedFile) - } - } - } - if existingFile == nil { - existingFile = hclwrite.NewEmptyFile() - } - - tf, err := os.Create(generatedFile) - if err != nil { - log.Printf("[ERROR] Can't create %s: %v", generatedFile, err) - return - } - - // - newResources := make(map[string]struct{}, 100) - log.Printf("[DEBUG] started processing new writes for %s", generatedFile) - for f := range ch { - if f != nil { - log.Printf("[DEBUG] started writing resource body for %s", f.BlockName) - _, err = tf.WriteString(f.ResourceBody) - if err == nil { - newResources[f.BlockName] = struct{}{} - if f.ImportCommand != "" { - ic.waitGroup.Add(1) - importChan <- f.ImportCommand - } - log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName) - } else { - log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err) - } - } else { - log.Print("[WARN] got nil as resourceWriteData!") - } - ic.waitGroup.Done() - } - numResources := len(newResources) - log.Printf("[DEBUG] finished processing new writes for %s. 
Wrote %d resources", generatedFile, numResources) - // update existing file if incremental mode - if ic.incremental { - log.Printf("[DEBUG] Starting to merge existing resources for %s", generatedFile) - f := hclwrite.NewEmptyFile() - for _, block := range existingFile.Body().Blocks() { - blockName := generateBlockFullName(block) - _, exists := newResources[blockName] - _, deleted := ic.deletedResources[blockName] - if exists { - log.Printf("[DEBUG] resource %s already generated, skipping...", blockName) - } else if deleted { - log.Printf("[DEBUG] resource %s is deleted, skipping...", blockName) - } else { - log.Printf("[DEBUG] resource %s doesn't exist, adding...", blockName) - f.Body().AppendBlock(block) - numResources = numResources + 1 - } - } - _, err = tf.WriteString(string(f.Bytes())) - if err != nil { - log.Printf("[ERROR] error when writing existing resources for file %s: %v", generatedFile, err) - } - log.Printf("[DEBUG] Finished merging existing resources for %s", generatedFile) - } - tf.Close() - if numResources == 0 { - log.Printf("[DEBUG] removing empty file %s - no resources for a given service", generatedFile) - os.Remove(generatedFile) - } -} - -func (ic *importContext) writeShellImports(sh *os.File, importChan importWriteChannel) { - for importCommand := range importChan { - if importCommand != "" && sh != nil { - log.Printf("[DEBUG] writing import command %s", importCommand) - sh.WriteString(importCommand + "\n") - delete(ic.shImports, importCommand) - } else { - log.Print("[WARN] got empty import command... or file is nil") - } - ic.waitGroup.Done() - } - if sh != nil { - log.Printf("[DEBUG] Writing the rest of import commands. len=%d", len(ic.shImports)) - for k := range ic.shImports { - parts := strings.Split(k, " ") - if len(parts) > 3 { - resource := parts[2] - _, deleted := ic.deletedResources[resource] - if deleted { - log.Printf("[DEBUG] Resource %s is deleted. 
Skipping import command for it", resource) - continue - } - } - sh.WriteString(k + "\n") - } - } -} - -func extractResourceIdFromImportBlock(block *hclwrite.Block) string { - if block.Type() != "import" { - log.Print("[WARN] it's not an import block!") - return "" - } - idAttr := block.Body().GetAttribute("to") - if idAttr == nil { - log.Printf("[WARN] Can't find `to` attribute in the import block") - return "" - } - idVal := string(idAttr.Expr().BuildTokens(nil).Bytes()) - return strings.TrimSpace(idVal) -} - -func extractResourceIdFromImportBlockString(importBlock string) string { - block, diags := hclwrite.ParseConfig([]byte(importBlock), "test.tf", hcl.Pos{Line: 1, Column: 1}) - if diags.HasErrors() { - log.Printf("[WARN] parsing of import block %s has failed: %s", importBlock, diags.Error()) - return "" - } - if len(block.Body().Blocks()) == 0 { - log.Printf("[WARN] import block %s has 0 blocks!", importBlock) - return "" - } - return extractResourceIdFromImportBlock(block.Body().Blocks()[0]) -} - -func (ic *importContext) writeNativeImports(importChan importWriteChannel) { - if !ic.nativeImportSupported { - log.Print("[DEBUG] Native import is not enabled, skipping...") - return - } - importsFileName := fmt.Sprintf("%s/import.tf", ic.Directory) - // TODO: in incremental mode read existing file with imports and append them for not processed & not deleted resources - var existingFile *hclwrite.File - if ic.incremental { - log.Printf("[DEBUG] Going to read existing file %s", importsFileName) - content, err := os.ReadFile(importsFileName) - if errors.Is(err, os.ErrNotExist) { - log.Printf("[WARN] File %s doesn't exist when using incremental export", importsFileName) - } else if err != nil { - log.Printf("[ERROR] error opening %s", importsFileName) - } else { - log.Printf("[DEBUG] Going to parse existing file %s", importsFileName) - var diags hcl.Diagnostics - existingFile, diags = hclwrite.ParseConfig(content, importsFileName, hcl.Pos{Line: 1, Column: 1}) - if diags.HasErrors() { - log.Printf("[ERROR] parsing of existing file %s failed: %s", importsFileName, diags.Error()) - } else { - log.Printf("[DEBUG] There are %d objects in existing file %s", - len(existingFile.Body().Blocks()), importsFileName) - } - } - } - if existingFile == nil { - existingFile = hclwrite.NewEmptyFile() - } - - // do actual writes - importsFile, err := os.Create(importsFileName) - if err != nil { - log.Printf("[ERROR] Can't create %s: %v", importsFileName, err) - return - } - defer importsFile.Close() - - newImports := make(map[string]struct{}, 100) - log.Printf("[DEBUG] started processing new writes for %s", importsFileName) - // write native imports - for importBlock := range importChan { - if importBlock != "" { - log.Printf("[TRACE] writing import command %s", importBlock) - importsFile.WriteString(importBlock) - id := extractResourceIdFromImportBlockString(importBlock) - if id != "" { - newImports[id] = struct{}{} - } - } else { - log.Print("[WARN] got empty import command...") - } - ic.waitGroup.Done() - } - // write the rest of import blocks - numResources := len(newImports) - log.Printf("[DEBUG] finished processing new writes for %s. 
Wrote %d resources", importsFileName, numResources) - // update existing file if incremental mode - if ic.incremental { - log.Printf("[DEBUG] Starting to merge existing resources for %s", importsFileName) - f := hclwrite.NewEmptyFile() - for _, block := range existingFile.Body().Blocks() { - blockName := extractResourceIdFromImportBlock(block) - if blockName == "" { - log.Printf("[WARN] can't extract resource ID from import block: %s", - string(block.BuildTokens(nil).Bytes())) - continue - } - _, exists := newImports[blockName] - _, deleted := ic.deletedResources[blockName] - if exists { - log.Printf("[DEBUG] resource %s already generated, skipping...", blockName) - } else if deleted { - log.Printf("[DEBUG] resource %s is deleted, skipping...", blockName) - } else { - log.Printf("[DEBUG] resource %s doesn't exist, adding...", blockName) - f.Body().AppendBlock(block) - numResources = numResources + 1 - } - } - _, err = importsFile.WriteString(string(f.Bytes())) - if err != nil { - log.Printf("[ERROR] error when writing existing resources for file %s: %v", importsFileName, err) - } - log.Printf("[DEBUG] Finished merging existing resources for %s", importsFileName) - } - -} - -func (ic *importContext) processSingleResource(resourcesChan resourceChannel, - writerChannels map[string]dataWriteChannel, nativeImportChannel importWriteChannel) { - processed := 0 - generated := 0 - ignored := 0 - for r := range resourcesChan { - processed = processed + 1 - if r == nil { - log.Print("[WARN] Got nil resource...") - ic.waitGroup.Done() - continue - } - ir := ic.Importables[r.Resource] - if ir.Ignore != nil && ir.Ignore(ic, r) { - log.Printf("[WARN] Ignoring resource %s: %s", r.Resource, r.Name) - ignored = ignored + 1 - ic.waitGroup.Done() - continue - } - var err error - f := hclwrite.NewEmptyFile() - log.Printf("[TRACE] Generating %s: %s", r.Resource, r.Name) - body := f.Body() - if ir.Body != nil { - err = ir.Body(ic, body, r) - if err != nil { - log.Printf("[ERROR] error calling ir.Body for %v: %s", r, err.Error()) - } - } else { - resourceBlock := body.AppendNewBlock("resource", []string{r.Resource, r.Name}) - err = ic.dataToHcl(ir, []string{}, ic.Resources[r.Resource], r, resourceBlock.Body()) - if err != nil { - log.Printf("[ERROR] error generating body for %v: %s", r, err.Error()) - } - } - if err == nil && len(body.Blocks()) > 0 { - formatted := hclwrite.Format(f.Bytes()) - // fix some formatting in a hacky way instead of writing 100 lines of HCL AST writer code - formatted = []byte(ic.regexFix(string(formatted), ic.hclFixes)) - writeData := &resourceWriteData{ - ResourceBody: string(formatted), - BlockName: generateBlockFullName(body.Blocks()[0]), - } - if r.Mode != "data" && ic.Resources[r.Resource].Importer != nil { - writeData.ImportCommand = r.ImportCommand(ic) - if ic.nativeImportSupported { // generate import block for native import - imp := hclwrite.NewEmptyFile() - imoBlock := imp.Body().AppendNewBlock("import", []string{}) - imoBlock.Body().SetAttributeValue("id", cty.StringVal(r.ID)) - traversal := hcl.Traversal{ - hcl.TraverseRoot{Name: r.Resource}, - hcl.TraverseAttr{Name: r.Name}, - } - tokens := hclwrite.TokensForTraversal(traversal) - imoBlock.Body().SetAttributeRaw("to", tokens) - formattedImp := hclwrite.Format(imp.Bytes()) - //log.Printf("[DEBUG] Import block for %s: %s", r.ID, string(formattedImp)) - ic.waitGroup.Add(1) - nativeImportChannel <- string(formattedImp) - } - } - ch, exists := writerChannels[ir.Service] - if exists { - ic.waitGroup.Add(1) - ch <- writeData - } 
else { - log.Printf("[WARN] can't find a channel for service: %s, resource: %s", ir.Service, r.Resource) - } - log.Printf("[TRACE] Finished generating %s: %s", r.Resource, r.Name) - generated = generated + 1 - } else { - log.Printf("[WARN] error generating resource body: %v, or body blocks len is 0", err) - } - ic.waitGroup.Done() - } - log.Printf("[DEBUG] processed resources: %d, generated: %d, ignored: %d", processed, generated, ignored) -} - -func (ic *importContext) generateAndWriteResources(sh *os.File) { - resources := ic.Scope.Sorted() - scopeSize := ic.Scope.Len() - t1 := time.Now() - log.Printf("[INFO] Generating configuration for %d resources", scopeSize) - - // make configurable via environment variables - resourceHandlersNumber := getEnvAsInt("EXPORTER_RESOURCE_GENERATORS", 50) - resourcesChan := make(resourceChannel, defaultChannelSize) - - resourceWriters := make(map[string]dataWriteChannel, len(ic.Resources)) - for service := range ic.services { - resourceWriters[service] = make(dataWriteChannel, defaultChannelSize) - } - writersWaitGroup := &sync.WaitGroup{} - // write shell script for importing - shellImportChan := make(importWriteChannel, defaultChannelSize) - writersWaitGroup.Add(1) - go func() { - ic.writeShellImports(sh, shellImportChan) - writersWaitGroup.Done() - }() - // - nativeImportChan := make(importWriteChannel, defaultChannelSize) - writersWaitGroup.Add(1) - go func() { - ic.writeNativeImports(nativeImportChan) - writersWaitGroup.Done() - }() - // start resource handlers - for i := 0; i < resourceHandlersNumber; i++ { - i := i - go func() { - log.Printf("[DEBUG] Starting resource handler %d", i) - ic.processSingleResource(resourcesChan, resourceWriters, nativeImportChan) - }() - } - // start writers for specific services - for service, ch := range resourceWriters { - service := service - ch := ch - generatedFile := fmt.Sprintf("%s/%s.tf", ic.Directory, service) - log.Printf("[DEBUG] starting writer for service %s", service) - writersWaitGroup.Add(1) - go func() { - ic.handleResourceWrite(generatedFile, ch, shellImportChan) - writersWaitGroup.Done() - }() - } - // submit all extracted resources... - for i, r := range resources { - ic.waitGroup.Add(1) - resourcesChan <- r - if i%500 == 0 { - log.Printf("[INFO] Submitted %d of %d resources", i+1, scopeSize) - } - } - ic.waitGroup.Wait() - // close all channels - close(shellImportChan) - close(nativeImportChan) - close(resourcesChan) - for service, ch := range resourceWriters { - log.Printf("Closing writer for service %s", service) - close(ch) - } - writersWaitGroup.Wait() - - log.Printf("[INFO] Finished generation of configuration for %d resources (took %v seconds)", - scopeSize, time.Since(t1).Seconds()) -} - -func (ic *importContext) generateGitIgnore() { - fileName := fmt.Sprintf("%s/.gitignore", ic.Directory) - vf, err := os.Create(fileName) - if err != nil { - log.Printf("[ERROR] can't create %s: %v", fileName, err) - return - } - defer vf.Close() - // nolint - vf.Write([]byte("terraform.tfvars\n")) -} - -func (ic *importContext) generateTfvars() error { - // TODO: make it incremental as well... 
- if len(ic.tfvars) == 0 { - return nil - } - f := hclwrite.NewEmptyFile() - body := f.Body() - fileName := fmt.Sprintf("%s/terraform.tfvars", ic.Directory) - - vf, err := os.Create(fileName) - if err != nil { - return err - } - defer vf.Close() - - for k, v := range ic.tfvars { - body.SetAttributeValue(k, cty.StringVal(v)) - } - // nolint - vf.Write(f.Bytes()) - log.Printf("[INFO] Written %d tfvars", len(ic.tfvars)) - - ic.generateGitIgnore() - - return nil -} - -func (ic *importContext) generateVariables() error { - if len(ic.variables) == 0 { - return nil - } - f := hclwrite.NewEmptyFile() - body := f.Body() - fileName := fmt.Sprintf("%s/vars.tf", ic.Directory) - if ic.incremental { - content, err := os.ReadFile(fileName) - if err == nil { - ftmp, diags := hclwrite.ParseConfig(content, fileName, hcl.Pos{Line: 1, Column: 1}) - if diags.HasErrors() { - log.Printf("[ERROR] parsing of existing file failed: %s", diags) - } else { - tbody := ftmp.Body() - for _, block := range tbody.Blocks() { - typ := block.Type() - labels := block.Labels() - log.Printf("[DEBUG] blockBody: %v %v\n", typ, labels) - _, present := ic.variables[labels[0]] - if typ == "variable" && present { - log.Printf("[DEBUG] Ignoring variable '%s' that will be re-exported", labels[0]) - } else { - log.Printf("[DEBUG] Adding not exported object. type='%s', labels=%v", typ, labels) - body.AppendBlock(block) - } - } - } - } else { - log.Printf("[ERROR] opening file %s", fileName) - } - } - vf, err := os.Create(fileName) - if err != nil { - return err - } - defer vf.Close() - - for k, v := range ic.variables { - b := body.AppendNewBlock("variable", []string{k}).Body() - b.SetAttributeValue("description", cty.StringVal(v)) - } - // nolint - vf.Write(f.Bytes()) - log.Printf("[INFO] Written %d variables", len(ic.variables)) - return nil -} - -func (ic *importContext) MatchesName(n string) bool { - if ic.match == "" { - return true - } - return strings.Contains(strings.ToLower(n), strings.ToLower(ic.match)) -} - -func genTraversalTokens(sr *resourceApproximation, pick string) hcl.Traversal { - if sr.Mode == "data" { - return hcl.Traversal{ - hcl.TraverseRoot{Name: "data"}, - hcl.TraverseAttr{Name: sr.Type}, - hcl.TraverseAttr{Name: sr.Name}, - hcl.TraverseAttr{Name: pick}, - } - } - return hcl.Traversal{ - hcl.TraverseRoot{Name: sr.Type}, - hcl.TraverseAttr{Name: sr.Name}, - hcl.TraverseAttr{Name: pick}, - } -} - -func (ic *importContext) isIgnoredResourceApproximation(ra *resourceApproximation) bool { - var ignored bool - if ra != nil && ra.Resource != nil { - ignoreFunc := ic.Importables[ra.Type].Ignore - if ignoreFunc != nil && ignoreFunc(ic, ra.Resource) { - log.Printf("[WARN] Found reference to the ignored resource %s: %s", ra.Type, ra.Name) - return true - } - } - return ignored -} - -func (ic *importContext) Find(value, attr string, ref reference, origResource *resource, origPath string) (string, hcl.Traversal, bool) { - log.Printf("[DEBUG] Starting searching for reference for resource %s, attr='%s', value='%s', ref=%v", - ref.Resource, attr, value, ref) - // optimize performance by avoiding doing regexp matching multiple times - matchValue := "" - switch ref.MatchType { - case MatchRegexp: - if ref.Regexp == nil { - log.Printf("[WARN] you must provide regular expression for 'regexp' match type") - return "", nil, false - } - res := ref.Regexp.FindStringSubmatch(value) - if len(res) < 2 { - log.Printf("[WARN] no match for regexp: %v in string %s", ref.Regexp, value) - return "", nil, false - } - matchValue = res[1] - case 
MatchCaseInsensitive: - matchValue = strings.ToLower(value) // performance optimization to avoid doing it in the loop - case MatchExact, MatchDefault: - matchValue = value - case MatchPrefix, MatchLongestPrefix: - if ref.MatchValueTransformFunc != nil { - matchValue = ref.MatchValueTransformFunc(value) - } else { - matchValue = value - } - } - // doing explicit lookup in the state. For case insensitive matches, first attempt to lookup for the value, - // and do iteration if it's not found - if (ref.MatchType == MatchExact || ref.MatchType == MatchDefault || ref.MatchType == MatchRegexp || - ref.MatchType == MatchCaseInsensitive) && !ref.SkipDirectLookup { - sr := ic.State.Get(ref.Resource, attr, matchValue) - if sr != nil && (ref.IsValidApproximation == nil || ref.IsValidApproximation(ic, origResource, sr, origPath)) && - !ic.isIgnoredResourceApproximation(sr) { - log.Printf("[DEBUG] Finished direct lookup for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", - ref.Resource, attr, value, ref, sr.Type, sr.Name) - return matchValue, genTraversalTokens(sr, attr), sr.Mode == "data" - } - if ref.MatchType != MatchCaseInsensitive { // for case-insensitive matching we'll try iteration - log.Printf("[DEBUG] Finished direct lookup for reference for resource %s, attr='%s', value='%s', ref=%v. Not found", - ref.Resource, attr, value, ref) - return "", nil, false - } - } else if ref.MatchType == MatchLongestPrefix && ref.ExtraLookupKey != "" { - extraKeyValue, exists := origResource.GetExtraData(ref.ExtraLookupKey) - if exists && extraKeyValue.(string) != "" { - sr := ic.State.Get(ref.Resource, attr, extraKeyValue.(string)) - if sr != nil && (ref.IsValidApproximation == nil || ref.IsValidApproximation(ic, origResource, sr, origPath)) && - !ic.isIgnoredResourceApproximation(sr) { - log.Printf("[DEBUG] Finished direct lookup by key %s for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", - ref.ExtraLookupKey, ref.Resource, attr, value, ref, sr.Type, sr.Name) - return extraKeyValue.(string), genTraversalTokens(sr, attr), sr.Mode == "data" - } - } - } - - maxPrefixLen := 0 - maxPrefixOrigValue := "" - var maxPrefixResource *resourceApproximation - srs := *ic.State.Resources(ref.Resource) - for _, sr := range srs { - for _, i := range sr.Instances { - v := i.Attributes[attr] - if v == nil { - log.Printf("[WARN] Can't find instance attribute '%v' in resource: '%v'", attr, ref.Resource) - continue - } - strValue := v.(string) - origValue := strValue - if ref.SearchValueTransformFunc != nil { - strValue = ref.SearchValueTransformFunc(strValue) - log.Printf("[TRACE] Resource %s. 
Transformed value from '%s' to '%s'", ref.Resource, origValue, strValue) - } - matched := false - switch ref.MatchType { - case MatchCaseInsensitive: - matched = (strings.ToLower(strValue) == matchValue) - case MatchPrefix: - matched = strings.HasPrefix(matchValue, strValue) - case MatchLongestPrefix: - if strings.HasPrefix(matchValue, strValue) && len(origValue) > maxPrefixLen && !ic.isIgnoredResourceApproximation(sr) { - maxPrefixLen = len(origValue) - maxPrefixOrigValue = origValue - maxPrefixResource = sr - } - case MatchExact, MatchDefault: - matched = (strValue == matchValue) - default: - log.Printf("[WARN] Unsupported match type: %s", ref.MatchType) - } - if !matched || (ref.IsValidApproximation != nil && !ref.IsValidApproximation(ic, origResource, sr, origPath)) || - ic.isIgnoredResourceApproximation(sr) { - continue - } - log.Printf("[DEBUG] Finished searching for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", - ref.Resource, attr, value, ref, sr.Type, sr.Name) - return origValue, genTraversalTokens(sr, attr), sr.Mode == "data" - } - } - if ref.MatchType == MatchLongestPrefix && maxPrefixResource != nil && - (ref.IsValidApproximation == nil || ref.IsValidApproximation(ic, origResource, maxPrefixResource, origPath)) && - !ic.isIgnoredResourceApproximation(maxPrefixResource) { - log.Printf("[DEBUG] Finished searching longest prefix for reference for resource %s, attr='%s', value='%s', ref=%v. Found: type=%s name=%s", - ref.Resource, attr, value, ref, maxPrefixResource.Type, maxPrefixResource.Name) - return maxPrefixOrigValue, genTraversalTokens(maxPrefixResource, attr), maxPrefixResource.Mode == "data" - } - log.Printf("[DEBUG] Finished searching for reference for resource %s, pick=%s, ref=%v. Not found", ref.Resource, attr, ref) - return "", nil, false -} - // This function checks if resource exist in any state (already added or in process of addition) func (ic *importContext) Has(r *resource) bool { return ic.HasInState(r, false) @@ -1412,16 +664,6 @@ func (ic *importContext) ResourceName(r *resource) string { return name } -func (ic *importContext) isServiceEnabled(service string) bool { - _, exists := ic.services[service] - return exists -} - -func (ic *importContext) isServiceInListing(service string) bool { - _, exists := ic.listing[service] - return exists -} - func (ic *importContext) EmitIfUpdatedAfterMillis(r *resource, modifiedAt int64, message string) { updatedSinceMs := ic.getUpdatedSinceMs() if ic.incremental && modifiedAt < updatedSinceMs { @@ -1500,315 +742,3 @@ func (ic *importContext) Emit(r *resource) { ic.defaultChannel <- r } } - -func maybeAddQuoteCharacter(s string) string { - s = strings.ReplaceAll(s, "\\", "\\\\") - s = strings.ReplaceAll(s, "\"", "\\\"") - return s -} - -func (ic *importContext) getTraversalTokens(ref reference, value string, origResource *resource, origPath string) (hclwrite.Tokens, bool) { - matchType := ref.MatchTypeValue() - attr := ref.MatchAttribute() - attrValue, traversal, isData := ic.Find(value, attr, ref, origResource, origPath) - // at least one invocation of ic.Find will assign Nil to traversal if resource with value is not found - if traversal == nil { - return nil, isData - } - // capture if it's data? 
- switch matchType { - case MatchExact, MatchDefault, MatchCaseInsensitive: - return hclwrite.TokensForTraversal(traversal), isData - case MatchPrefix, MatchLongestPrefix: - rest := value[len(attrValue):] - tokens := hclwrite.Tokens{&hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'"', '$', '{'}}} - tokens = append(tokens, hclwrite.TokensForTraversal(traversal)...) - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'}'}}) - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(maybeAddQuoteCharacter(rest))}) - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'"'}}) - return tokens, isData - case MatchRegexp: - indices := ref.Regexp.FindStringSubmatchIndex(value) - if len(indices) == 4 { - tokens := hclwrite.Tokens{&hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'"'}}} - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(maybeAddQuoteCharacter(value[0:indices[2]]))}) - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'$', '{'}}) - tokens = append(tokens, hclwrite.TokensForTraversal(traversal)...) - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'}'}}) - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(maybeAddQuoteCharacter(value[indices[3]:]))}) - tokens = append(tokens, &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'"'}}) - return tokens, isData - } - log.Printf("[WARN] Can't match found data in '%s'. Indices: %v", value, indices) - default: - log.Printf("[WARN] Unsupported match type: %s", ref.MatchType) - } - return nil, false -} - -// TODO: move to IC -var dependsRe = regexp.MustCompile(`(\.[\d]+)`) - -func (ic *importContext) generateVariableName(attrName, name string) string { - return fmt.Sprintf("%s_%s", attrName, name) -} - -func (ic *importContext) reference(i importable, path []string, value string, ctyValue cty.Value, origResource *resource) hclwrite.Tokens { - pathString := strings.Join(path, ".") - match := dependsRe.ReplaceAllString(pathString, "") - // get reference candidate, but if it's a `data`, then look for another non-data reference if possible.. 
- var dataTokens hclwrite.Tokens - for _, d := range i.Depends { - if d.Path != match { - continue - } - if d.File { - relativeFile := fmt.Sprintf("${path.module}/%s", value) - return hclwrite.Tokens{ - &hclwrite.Token{Type: hclsyntax.TokenOQuote, Bytes: []byte{'"'}}, - &hclwrite.Token{Type: hclsyntax.TokenQuotedLit, Bytes: []byte(relativeFile)}, - &hclwrite.Token{Type: hclsyntax.TokenCQuote, Bytes: []byte{'"'}}, - } - } - if d.Variable { - varName := ic.generateVariableName(path[0], value) - return ic.variable(varName, "") - } - - tokens, isData := ic.getTraversalTokens(d, value, origResource, pathString) - if tokens != nil { - if isData { - dataTokens = tokens - log.Printf("[DEBUG] Got reference to data for dependency %v", d) - } else { - return tokens - } - } - } - if len(dataTokens) > 0 { - return dataTokens - } - return hclwrite.TokensForValue(ctyValue) -} - -func (ic *importContext) variable(name, desc string) hclwrite.Tokens { - ic.variablesLock.Lock() - ic.variables[name] = desc - ic.variablesLock.Unlock() - return hclwrite.TokensForTraversal(hcl.Traversal{ - hcl.TraverseRoot{Name: "var"}, - hcl.TraverseAttr{Name: name}, - }) -} - -type fieldTuple struct { - Field string - Schema *schema.Schema -} - -func (ic *importContext) dataToHcl(i importable, path []string, - pr *schema.Resource, res *resource, body *hclwrite.Body) error { - d := res.Data - ss := []fieldTuple{} - for a, as := range pr.Schema { - ss = append(ss, fieldTuple{a, as}) - } - sort.Slice(ss, func(i, j int) bool { - // it just happens that reverse field order - // makes the most beautiful configs - return ss[i].Field > ss[j].Field - }) - var_cnt := 0 - for _, tuple := range ss { - a, as := tuple.Field, tuple.Schema - pathString := strings.Join(append(path, a), ".") - raw, nonZero := d.GetOk(pathString) - // log.Printf("[DEBUG] path=%s, raw='%v'", pathString, raw) - if i.ShouldOmitField == nil { // we don't have custom function, so skip computed & default fields - if defaultShouldOmitFieldFunc(ic, pathString, as, d) { - continue - } - } else if i.ShouldOmitField(ic, pathString, as, d) { - continue - } - mpath := dependsRe.ReplaceAllString(pathString, "") - for _, ref := range i.Depends { - if ref.Path == mpath && ref.Variable { - // sensitive fields are moved to variable depends, variable name is normalized - // TODO: handle a case when we have multiple blocks, so names won't be unique - raw = ic.regexFix(ic.ResourceName(res), simpleNameFixes) - if var_cnt > 0 { - raw = fmt.Sprintf("%s_%d", raw, var_cnt) - } - nonZero = true - var_cnt++ - } - } - shouldSkip := !nonZero - if as.Required { // for required fields we must produce a value, even empty... 
- shouldSkip = false - } else if as.Default != nil && !reflect.DeepEqual(raw, as.Default) { - // In case when have zero value, but there is non-zero default, we also need to produce it - shouldSkip = false - } - if shouldSkip && (i.ShouldGenerateField == nil || !i.ShouldGenerateField(ic, pathString, as, d)) { - continue - } - switch as.Type { - case schema.TypeString: - value := raw.(string) - tokens := ic.reference(i, append(path, a), value, cty.StringVal(value), res) - body.SetAttributeRaw(a, tokens) - case schema.TypeBool: - body.SetAttributeValue(a, cty.BoolVal(raw.(bool))) - case schema.TypeInt: - var num int64 - switch iv := raw.(type) { - case int: - num = int64(iv) - case int32: - num = int64(iv) - case int64: - num = iv - } - body.SetAttributeRaw(a, ic.reference(i, append(path, a), - strconv.FormatInt(num, 10), cty.NumberIntVal(num), res)) - case schema.TypeFloat: - body.SetAttributeValue(a, cty.NumberFloatVal(raw.(float64))) - case schema.TypeMap: - // TODO: Resolve references in maps as well, and also support different types inside map... - ov := map[string]cty.Value{} - for key, iv := range raw.(map[string]any) { - v := cty.StringVal(fmt.Sprintf("%v", iv)) - ov[key] = v - } - body.SetAttributeValue(a, cty.ObjectVal(ov)) - case schema.TypeSet: - if rawSet, ok := raw.(*schema.Set); ok { - rawList := rawSet.List() - err := ic.readListFromData(i, append(path, a), res, rawList, body, as, func(i int) string { - return strconv.Itoa(rawSet.F(rawList[i])) - }) - if err != nil { - return err - } - } - case schema.TypeList: - if rawList, ok := raw.([]any); ok { - err := ic.readListFromData(i, append(path, a), res, rawList, body, as, strconv.Itoa) - if err != nil { - return err - } - } - default: - return fmt.Errorf("unsupported schema type: %v", path) - } - } - // Generate `depends_on` only for top-level resource because `dataToHcl` is called recursively - if len(path) == 0 && len(res.DependsOn) > 0 { - notIgnoredResources := []*resource{} - for _, dr := range res.DependsOn { - dr := dr - if dr.Data == nil { - tdr := ic.Scope.FindById(dr.Resource, dr.ID) - if tdr == nil { - log.Printf("[WARN] can't find resource %s in scope", dr) - continue - } - dr = tdr - } - if ic.Importables[dr.Resource].Ignore == nil || !ic.Importables[dr.Resource].Ignore(ic, dr) { - found := false - for _, v := range notIgnoredResources { - if v.ID == dr.ID && v.Resource == dr.Resource { - found = true - break - } - } - if !found { - notIgnoredResources = append(notIgnoredResources, dr) - } - } - } - if len(notIgnoredResources) > 0 { - toks := hclwrite.Tokens{} - toks = append(toks, &hclwrite.Token{ - Type: hclsyntax.TokenOBrack, - Bytes: []byte{'['}, - }) - for i, dr := range notIgnoredResources { - if i > 0 { - toks = append(toks, &hclwrite.Token{ - Type: hclsyntax.TokenComma, - Bytes: []byte{','}, - }) - } - toks = append(toks, hclwrite.TokensForTraversal(hcl.Traversal{ - hcl.TraverseRoot{Name: dr.Resource}, - hcl.TraverseAttr{Name: ic.ResourceName(dr)}, - })...) 
- } - toks = append(toks, &hclwrite.Token{ - Type: hclsyntax.TokenCBrack, - Bytes: []byte{']'}, - }) - body.SetAttributeRaw("depends_on", toks) - } - } - return nil -} - -func (ic *importContext) readListFromData(i importable, path []string, res *resource, - rawList []any, body *hclwrite.Body, as *schema.Schema, offsetConverter func(i int) string) error { - if len(rawList) == 0 { - return nil - } - name := path[len(path)-1] - switch elem := as.Elem.(type) { - case *schema.Resource: - if as.MaxItems == 1 { - nestedPath := append(path, offsetConverter(0)) - confBlock := body.AppendNewBlock(name, []string{}) - return ic.dataToHcl(i, nestedPath, elem, res, confBlock.Body()) - } - for offset := range rawList { - confBlock := body.AppendNewBlock(name, []string{}) - nestedPath := append(path, offsetConverter(offset)) - err := ic.dataToHcl(i, nestedPath, elem, res, confBlock.Body()) - if err != nil { - return err - } - } - case *schema.Schema: - toks := hclwrite.Tokens{} - toks = append(toks, &hclwrite.Token{ - Type: hclsyntax.TokenOBrack, - Bytes: []byte{'['}, - }) - for _, raw := range rawList { - if len(toks) != 1 { - toks = append(toks, &hclwrite.Token{ - Type: hclsyntax.TokenComma, - Bytes: []byte{','}, - }) - } - switch x := raw.(type) { - case string: - value := raw.(string) - toks = append(toks, ic.reference(i, path, value, cty.StringVal(value), res)...) - case int: - // probably we don't even use integer lists?... - toks = append(toks, hclwrite.TokensForValue( - cty.NumberIntVal(int64(x)))...) - default: - return fmt.Errorf("unsupported primitive list: %#v", path) - } - } - toks = append(toks, &hclwrite.Token{ - Type: hclsyntax.TokenCBrack, - Bytes: []byte{']'}, - }) - body.SetAttributeRaw(name, toks) - } - return nil -} diff --git a/exporter/context_test.go b/exporter/context_test.go index 204cb2c68c..d27e35925e 100644 --- a/exporter/context_test.go +++ b/exporter/context_test.go @@ -378,7 +378,7 @@ func TestGenerateResourceIdForWsObject(t *testing.T) { Importables: resourcesMap, Resources: p.ResourcesMap, } - rid, rtype := ic.generateResourceIdForWsObject(workspace.ObjectStatus{ + rid, rtype := ic.generateResourceIdForWorkspaceObject(workspace.ObjectStatus{ ObjectID: 123, Path: "Test", ObjectType: "Unknown", @@ -386,7 +386,7 @@ func TestGenerateResourceIdForWsObject(t *testing.T) { assert.Empty(t, rid) assert.Empty(t, rtype) - rid, rtype = ic.generateResourceIdForWsObject(workspace.ObjectStatus{ + rid, rtype = ic.generateResourceIdForWorkspaceObject(workspace.ObjectStatus{ ObjectID: 123, Path: "/Users/user@domain.com/TestDir", ObjectType: workspace.Directory, @@ -394,7 +394,7 @@ func TestGenerateResourceIdForWsObject(t *testing.T) { assert.Equal(t, "databricks_directory.users_user_domain_com_testdir_123", rid) assert.Equal(t, "databricks_directory", rtype) - rid, rtype = ic.generateResourceIdForWsObject(workspace.ObjectStatus{ + rid, rtype = ic.generateResourceIdForWorkspaceObject(workspace.ObjectStatus{ ObjectID: 123, Path: "/Users/user@domain.com/Test File", ObjectType: workspace.File, @@ -402,7 +402,7 @@ func TestGenerateResourceIdForWsObject(t *testing.T) { assert.Equal(t, "databricks_workspace_file.users_user_domain_com_test_file_123", rid) assert.Equal(t, "databricks_workspace_file", rtype) - rid, rtype = ic.generateResourceIdForWsObject(workspace.ObjectStatus{ + rid, rtype = ic.generateResourceIdForWorkspaceObject(workspace.ObjectStatus{ ObjectID: 123, Path: "/Users/user@domain.com/Test Notebook", ObjectType: workspace.Notebook, diff --git a/exporter/util.go b/exporter/util.go 
index 6bd91871ef..4562e14405 100644 --- a/exporter/util.go +++ b/exporter/util.go @@ -8,49 +8,37 @@ import ( "os" "path" "reflect" - "regexp" "strconv" "strings" - "sync" - "sync/atomic" "time" "github.com/databricks/terraform-provider-databricks/aws" "github.com/databricks/terraform-provider-databricks/clusters" "github.com/databricks/terraform-provider-databricks/common" - "github.com/databricks/terraform-provider-databricks/jobs" - "github.com/databricks/terraform-provider-databricks/scim" "github.com/databricks/terraform-provider-databricks/storage" - "github.com/databricks/terraform-provider-databricks/workspace" "github.com/databricks/databricks-sdk-go/service/catalog" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/iam" - sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs" - - "golang.org/x/exp/slices" "github.com/hashicorp/hcl/v2/hclwrite" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" ) -func (ic *importContext) emitInitScripts(initScripts []compute.InitScriptInfo) { - for _, is := range initScripts { - if is.Dbfs != nil { - ic.Emit(&resource{ - Resource: "databricks_dbfs_file", - ID: is.Dbfs.Destination, - }) - } - if is.Workspace != nil { - ic.emitWorkspaceFileOrRepo(is.Workspace.Destination) - } - if is.Volumes != nil { - // TODO: we should emit allow list for init scripts as well - ic.emitIfVolumeFile(is.Volumes.Destination) - } +func (ic *importContext) isServiceEnabled(service string) bool { + _, exists := ic.services[service] + return exists +} + +func (ic *importContext) isServiceInListing(service string) bool { + _, exists := ic.listing[service] + return exists +} + +func (ic *importContext) MatchesName(n string) bool { + if ic.match == "" { + return true } + return strings.Contains(strings.ToLower(n), strings.ToLower(ic.match)) } func (ic *importContext) emitFilesFromSlice(slice []string) { @@ -69,588 +57,12 @@ func (ic *importContext) emitFilesFromMap(m map[string]string) { } } -func (ic *importContext) importCluster(c *compute.ClusterSpec) { - if c == nil { - return - } - if c.AwsAttributes != nil && c.AwsAttributes.InstanceProfileArn != "" { - ic.Emit(&resource{ - Resource: "databricks_instance_profile", - ID: c.AwsAttributes.InstanceProfileArn, - }) - } - if c.InstancePoolId != "" { - // set enable_elastic_disk to false, and remove aws/gcp/azure_attributes - ic.Emit(&resource{ - Resource: "databricks_instance_pool", - ID: c.InstancePoolId, - }) - } - if c.DriverInstancePoolId != "" { - ic.Emit(&resource{ - Resource: "databricks_instance_pool", - ID: c.DriverInstancePoolId, - }) - } - if c.PolicyId != "" { - ic.Emit(&resource{ - Resource: "databricks_cluster_policy", - ID: c.PolicyId, - }) - } - ic.emitInitScripts(c.InitScripts) - ic.emitSecretsFromSecretsPathMap(c.SparkConf) - ic.emitSecretsFromSecretsPathMap(c.SparkEnvVars) - ic.emitUserOrServicePrincipal(c.SingleUserName) -} - -func (ic *importContext) emitSecretsFromSecretPathString(v string) { - if res := secretPathRegex.FindStringSubmatch(v); res != nil { - ic.Emit(&resource{ - Resource: "databricks_secret_scope", - ID: res[1], - }) - } -} - -func (ic *importContext) emitSecretsFromSecretsPathMap(m map[string]string) { - for _, v := range m { - ic.emitSecretsFromSecretPathString(v) - } -} - -func (ic *importContext) emitListOfUsers(users []string) { - for _, user := range users { - if user != "" { - ic.Emit(&resource{ - Resource: "databricks_user", - Attribute: 
"user_name", - Value: user, - }) - } - } -} - -func (ic *importContext) emitUserOrServicePrincipal(userOrSPName string) { - if userOrSPName == "" || !ic.isServiceEnabled("users") { - return - } - // Cache check here to avoid emitting - ic.emittedUsersMutex.RLock() - _, exists := ic.emittedUsers[userOrSPName] - ic.emittedUsersMutex.RUnlock() - if exists { - // log.Printf("[DEBUG] user or SP %s already emitted...", userOrSPName) - return - } - if common.StringIsUUID(userOrSPName) { - user, err := ic.findSpnByAppID(userOrSPName, false) - if err != nil { - log.Printf("[ERROR] Can't find SP with application ID %s", userOrSPName) - ic.addIgnoredResource(fmt.Sprintf("databricks_service_principal. application_id=%s", userOrSPName)) - } else { - ic.Emit(&resource{ - Resource: "databricks_service_principal", - ID: user.ID, - }) - } - } else { - user, err := ic.findUserByName(strings.ToLower(userOrSPName), false) - if err != nil { - log.Printf("[ERROR] Can't find user with name %s", userOrSPName) - ic.addIgnoredResource(fmt.Sprintf("databricks_user. user_name=%s", userOrSPName)) - } else { - ic.Emit(&resource{ - Resource: "databricks_user", - ID: user.ID, - }) - } - } - ic.emittedUsersMutex.Lock() - ic.emittedUsers[userOrSPName] = struct{}{} - ic.emittedUsersMutex.Unlock() -} - -func getUserOrSpNameAndDirectory(path, prefix string) (string, string) { - if !strings.HasPrefix(path, prefix) { - return "", "" - } - pathLen := len(path) - prefixLen := len(prefix) - searchStart := prefixLen + 1 - if pathLen <= searchStart { - return "", "" - } - pos := strings.Index(path[searchStart:pathLen], "/") - if pos == -1 { // we have only user directory... - return path[searchStart:pathLen], path - } - return path[searchStart : pos+searchStart], path[0 : pos+searchStart] -} - -func (ic *importContext) emitUserOrServicePrincipalForPath(path, prefix string) { - userOrSpName, _ := getUserOrSpNameAndDirectory(path, prefix) - if userOrSpName != "" { - ic.emitUserOrServicePrincipal(userOrSpName) - } -} - -func (ic *importContext) IsUserOrServicePrincipalDirectory(path, prefix string, strict bool) bool { - userOrSPName, userDir := getUserOrSpNameAndDirectory(path, prefix) - if userOrSPName == "" { - return false - } - // strict mode means that it should be exactly user dir, maybe with trailing `/` - if strict && !(len(path) == len(userDir) || (len(path) == len(userDir)+1 && path[len(path)-1] == '/')) { - return false - } - ic.userOrSpDirectoriesMutex.RLock() - result, exists := ic.userOrSpDirectories[userDir] - ic.userOrSpDirectoriesMutex.RUnlock() - if exists { - // log.Printf("[DEBUG] Directory %s already checked. Result=%v", userDir, result) - return result - } - var err error - if common.StringIsUUID(userOrSPName) { - _, err = ic.findSpnByAppID(userOrSPName, true) - if err != nil { - ic.addIgnoredResource(fmt.Sprintf("databricks_service_principal. application_id=%s", userOrSPName)) - } - } else { - _, err = ic.findUserByName(strings.ToLower(userOrSPName), true) - if err != nil { - ic.addIgnoredResource(fmt.Sprintf("databricks_user. user_name=%s", userOrSPName)) - } - } - ic.userOrSpDirectoriesMutex.Lock() - ic.userOrSpDirectories[userDir] = (err == nil) - ic.userOrSpDirectoriesMutex.Unlock() - return err == nil -} - -func (ic *importContext) emitRepoByPath(path string) { - // Path to Repos objects consits of following parts: /Repos, folder, repository, path inside Repo. - // Because it starts with `/`, it will produce empty string as first element in the slice. 
- // And we're stopping splitting to avoid producing too many not necessary parts, so we have 5 parts only. - parts := strings.SplitN(path, "/", 5) - if len(parts) >= 4 { - ic.Emit(&resource{ - Resource: "databricks_repo", - Attribute: "path", - Value: strings.Join(parts[:4], "/"), - }) - } else { - log.Printf("[WARN] Incorrect Repos path") - } -} - -func (ic *importContext) emitWorkspaceFileOrRepo(path string) { - if strings.HasPrefix(path, "/Repos") { - ic.emitRepoByPath(path) - } else { - // TODO: wrap this into ic.shouldEmit... - // TODO: strip /Workspace prefix if it's provided - ic.Emit(&resource{ - Resource: "databricks_workspace_file", - ID: path, - }) - } -} - -func (ic *importContext) emitNotebookOrRepo(path string) { - if strings.HasPrefix(path, "/Repos") { - ic.emitRepoByPath(path) - } else { - // TODO: strip /Workspace prefix if it's provided - ic.maybeEmitWorkspaceObject("databricks_notebook", path, nil) - } -} - -func (ic *importContext) getAllDirectories() []workspace.ObjectStatus { - if len(ic.allDirectories) == 0 { - objects := ic.getAllWorkspaceObjects(nil) - ic.wsObjectsMutex.Lock() - defer ic.wsObjectsMutex.Unlock() - if len(ic.allDirectories) == 0 { - for _, v := range objects { - if v.ObjectType == workspace.Directory { - ic.allDirectories = append(ic.allDirectories, v) - } - } - } - } - return ic.allDirectories -} - -// TODO: Ignore databricks_automl as well? -var directoriesToIgnore = []string{".ide", ".bundle", "__pycache__"} - -// TODO: add ignoring directories of deleted users? This could potentially decrease the number of processed objects... -func excludeAuxiliaryDirectories(v workspace.ObjectStatus) bool { - if v.ObjectType != workspace.Directory { - return true - } - // TODO: rewrite to use suffix check, etc., instead of split and slice contains? - parts := strings.Split(v.Path, "/") - result := len(parts) > 1 && slices.Contains[[]string, string](directoriesToIgnore, parts[len(parts)-1]) - if result { - log.Printf("[DEBUG] Ignoring directory %s", v.Path) - } - return !result -} - -func (ic *importContext) getAllWorkspaceObjects(visitor func([]workspace.ObjectStatus)) []workspace.ObjectStatus { - ic.wsObjectsMutex.Lock() - defer ic.wsObjectsMutex.Unlock() - if len(ic.allWorkspaceObjects) == 0 { - t1 := time.Now() - log.Print("[INFO] Starting to list all workspace objects") - notebooksAPI := workspace.NewNotebooksAPI(ic.Context, ic.Client) - ic.allWorkspaceObjects, _ = ListParallel(notebooksAPI, "/", excludeAuxiliaryDirectories, visitor) - log.Printf("[INFO] Finished listing of all workspace objects. %d objects in total. 
%v seconds", - len(ic.allWorkspaceObjects), time.Since(t1).Seconds()) - } - return ic.allWorkspaceObjects -} - -func (ic *importContext) emitGroups(u scim.User) { - for _, g := range u.Groups { - if g.Type != "direct" { - log.Printf("[DEBUG] Skipping non-direct group %s/%s for %s", g.Value, g.Display, u.DisplayName) - continue - } - ic.Emit(&resource{ - Resource: "databricks_group", - ID: g.Value, - }) - id := fmt.Sprintf("%s|%s", g.Value, u.ID) - ic.Emit(&resource{ - Resource: "databricks_group_member", - ID: id, - Name: fmt.Sprintf("%s_%s_%s_%s", g.Display, g.Value, u.DisplayName, u.ID), - Data: ic.makeGroupMemberData(id, g.Value, u.ID), - }) - } -} - -func (ic *importContext) emitRoles(objType string, id string, roles []scim.ComplexValue) { - log.Printf("[DEBUG] emitting roles for object type: %s, ID: %s, roles: %v", objType, id, roles) - for _, role := range roles { - if role.Type != "direct" { - continue - } - if !ic.accountLevel { - ic.Emit(&resource{ - Resource: "databricks_instance_profile", - ID: role.Value, - }) - } - ic.Emit(&resource{ - Resource: fmt.Sprintf("databricks_%s_role", objType), - ID: fmt.Sprintf("%s|%s", id, role.Value), - }) - } -} - -func (ic *importContext) emitLibraries(libs []compute.Library) { - for _, lib := range libs { - // Files on DBFS - ic.emitIfDbfsFile(lib.Whl) - ic.emitIfDbfsFile(lib.Jar) - ic.emitIfDbfsFile(lib.Egg) - // Files on WSFS - ic.emitIfWsfsFile(lib.Whl) - ic.emitIfWsfsFile(lib.Jar) - ic.emitIfWsfsFile(lib.Egg) - ic.emitIfWsfsFile(lib.Requirements) - // Files on UC Volumes - ic.emitIfVolumeFile(lib.Whl) - // TODO: we should emit UC allow list as well - ic.emitIfVolumeFile(lib.Jar) - ic.emitIfVolumeFile(lib.Requirements) - } -} - -func (ic *importContext) importLibraries(d *schema.ResourceData, s map[string]*schema.Schema) error { - var cll compute.InstallLibraries - common.DataToStructPointer(d, s, &cll) - ic.emitLibraries(cll.Libraries) - return nil -} - -func (ic *importContext) importClusterLibraries(d *schema.ResourceData, s map[string]*schema.Schema) error { - libraries := ic.workspaceClient.Libraries - cll, err := libraries.ClusterStatusByClusterId(ic.Context, d.Id()) - if err != nil { - return err - } - for _, lib := range cll.LibraryStatuses { - ic.emitIfDbfsFile(lib.Library.Egg) - ic.emitIfDbfsFile(lib.Library.Jar) - ic.emitIfDbfsFile(lib.Library.Whl) - // Files on UC Volumes - ic.emitIfVolumeFile(lib.Library.Whl) - ic.emitIfVolumeFile(lib.Library.Jar) - // Files on WSFS - ic.emitIfWsfsFile(lib.Library.Whl) - ic.emitIfWsfsFile(lib.Library.Jar) - } - return nil -} - -func (ic *importContext) cacheGroups() error { - ic.groupsMutex.Lock() - defer ic.groupsMutex.Unlock() - if ic.allGroups == nil { - log.Printf("[INFO] Caching groups in memory ...") - var groups *[]iam.Group - var err error - err = runWithRetries(func() error { - var grps []iam.Group - var err error - if ic.accountLevel { - grps, err = ic.accountClient.Groups.ListAll(ic.Context, iam.ListAccountGroupsRequest{ - Attributes: "id", - }) - } else { - grps, err = ic.workspaceClient.Groups.ListAll(ic.Context, iam.ListGroupsRequest{ - Attributes: "id", - }) - } - if err != nil { - return err - } - groups = &grps - return nil - }, "error fetching full list of groups") - if err != nil { - log.Printf("[ERROR] can't fetch list of groups. 
Error: %v", err) - return err - } - api := scim.NewGroupsAPI(ic.Context, ic.Client) - groupsCount := len(*groups) - ic.allGroups = make([]scim.Group, 0, groupsCount) - for i, g := range *groups { - err = runWithRetries(func() error { - group, err := api.Read(g.Id, "id,displayName,active,externalId,entitlements,groups,roles,members") - if err != nil { - return err - } - ic.allGroups = append(ic.allGroups, group) - return nil - }, "error reading group with ID "+g.Id) - if err != nil { - log.Printf("[ERROR] Error reading group with ID %s: %v", g.Id, err) - continue - } - if (i+1)%10 == 0 { - log.Printf("[DEBUG] Read %d out of %d groups", i+1, groupsCount) - } - } - log.Printf("[INFO] Cached %d groups", len(ic.allGroups)) - } - return nil -} - func (ic *importContext) addIgnoredResource(msg string) { ic.ignoredResourcesMutex.Lock() defer ic.ignoredResourcesMutex.Unlock() ic.ignoredResources[msg] = struct{}{} } -const ( - nonExistingUserOrSp = "__USER_OR_SPN_DOES_NOT_EXIST__" -) - -func (ic *importContext) getUsersMapping() { - ic.allUsersMutex.RLocker().Lock() - userMapping := ic.allUsersMapping - ic.allUsersMutex.RLocker().Unlock() - if userMapping == nil { - ic.allUsersMutex.Lock() - defer ic.allUsersMutex.Unlock() - if ic.allUsersMapping != nil { - return - } - ic.allUsersMapping = make(map[string]string) - err := runWithRetries(func() error { - var users []iam.User - var err error - if ic.accountLevel { - users, err = ic.accountClient.Users.ListAll(ic.Context, iam.ListAccountUsersRequest{ - Attributes: "id,userName", - }) - } else { - users, err = ic.workspaceClient.Users.ListAll(ic.Context, iam.ListUsersRequest{ - Attributes: "id,userName", - }) - } - if err != nil { - return err - } - for _, user := range users { - ic.allUsersMapping[user.UserName] = user.Id - } - log.Printf("[DEBUG] users are copied") - return nil - }, "error fetching full list of users") - if err != nil { - log.Fatalf("[ERROR] can't fetch list of users after few retries: error=%v", err) - } - } -} - -func (ic *importContext) findUserByName(name string, fastCheck bool) (u *scim.User, err error) { - log.Printf("[DEBUG] Looking for user %s", name) - ic.usersMutex.RLocker().Lock() - user, exists := ic.allUsers[name] - ic.usersMutex.RLocker().Unlock() - if exists { - if user.UserName == nonExistingUserOrSp { - log.Printf("[DEBUG] non-existing user %s is found in the cache", name) - err = fmt.Errorf("user %s is not found", name) - } else { - log.Printf("[DEBUG] existing user %s is found in the cache", name) - u = &user - } - return - } - ic.getUsersMapping() - ic.allUsersMutex.RLocker().Lock() - userId, exists := ic.allUsersMapping[name] - ic.allUsersMutex.RLocker().Unlock() - if !exists { - err = fmt.Errorf("there is no user '%s'", name) - u = &scim.User{UserName: nonExistingUserOrSp} - } else { - if fastCheck { - return &scim.User{UserName: name}, nil - } - a := scim.NewUsersAPI(ic.Context, ic.Client) - err = runWithRetries(func() error { - usr, err := a.Read(userId, "id,userName,displayName,active,externalId,entitlements,groups,roles") - if err != nil { - return err - } - u = &usr - return nil - }, fmt.Sprintf("error reading user with name '%s', user ID: %s", name, userId)) - if err != nil { - log.Printf("[WARN] error reading user with name '%s', user ID: %s", name, userId) - u = &scim.User{UserName: nonExistingUserOrSp} - } - } - ic.usersMutex.Lock() - defer ic.usersMutex.Unlock() - ic.allUsers[name] = *u - return -} - -func (ic *importContext) getSpsMapping() { - ic.spsMutex.Lock() - defer ic.spsMutex.Unlock() - if 
ic.allSpsMapping == nil { - ic.allSpsMapping = make(map[string]string) - err := runWithRetries(func() error { - var sps []iam.ServicePrincipal - var err error - if ic.accountLevel { - sps, err = ic.accountClient.ServicePrincipals.ListAll(ic.Context, iam.ListAccountServicePrincipalsRequest{ - Attributes: "id,userName", - }) - } else { - sps, err = ic.workspaceClient.ServicePrincipals.ListAll(ic.Context, iam.ListServicePrincipalsRequest{ - Attributes: "id,userName", - }) - } - if err != nil { - return err - } - for _, sp := range sps { - ic.allSpsMapping[sp.ApplicationId] = sp.Id - } - return nil - }, "error fetching full list of service principals") - if err != nil { - log.Fatalf("[ERROR] can't fetch list of service principals after few retries: error=%v", err) - } - } -} - -func (ic *importContext) getBuiltinPolicyFamilies() map[string]compute.PolicyFamily { - ic.builtInPoliciesMutex.Lock() - defer ic.builtInPoliciesMutex.Unlock() - if ic.builtInPolicies == nil { - if !ic.accountLevel { - log.Printf("[DEBUG] Going to initialize ic.builtInPolicies. Getting policy families...") - families, err := ic.workspaceClient.PolicyFamilies.ListAll(ic.Context, compute.ListPolicyFamiliesRequest{}) - log.Printf("[DEBUG] Going to initialize ic.builtInPolicies. Getting policy families...") - if err == nil { - ic.builtInPolicies = make(map[string]compute.PolicyFamily, len(families)) - for _, f := range families { - f2 := f - ic.builtInPolicies[f2.PolicyFamilyId] = f2 - } - } else { - log.Printf("[ERROR] Can't fetch cluster policy families: %v", err) - ic.builtInPolicies = map[string]compute.PolicyFamily{} - } - } else { - log.Print("[WARN] Can't list cluster policy families on account level") - ic.builtInPolicies = map[string]compute.PolicyFamily{} - } - } - return ic.builtInPolicies -} - -func (ic *importContext) findSpnByAppID(applicationID string, fastCheck bool) (u *scim.User, err error) { - log.Printf("[DEBUG] Looking for SP %s", applicationID) - ic.spsMutex.RLocker().Lock() - sp, exists := ic.allSps[applicationID] - ic.spsMutex.RLocker().Unlock() - if exists { - if sp.ApplicationID == nonExistingUserOrSp { - log.Printf("[DEBUG] non-existing SP %s is found in the cache", applicationID) - err = fmt.Errorf("service principal %s is not found", applicationID) - } else { - log.Printf("[DEBUG] existing SP %s is found in the cache", applicationID) - u = &sp - } - return - } - ic.getSpsMapping() - ic.spsMutex.RLocker().Lock() - spId, exists := ic.allSpsMapping[applicationID] - ic.spsMutex.RLocker().Unlock() - if !exists { - err = fmt.Errorf("there is no service principal '%s'", applicationID) - u = &scim.User{ApplicationID: nonExistingUserOrSp} - } else { - if fastCheck { - return &scim.User{ApplicationID: applicationID}, nil - } - a := scim.NewServicePrincipalsAPI(ic.Context, ic.Client) - err = runWithRetries(func() error { - usr, err := a.Read(spId, "userName,displayName,active,externalId,entitlements,groups,roles") - if err != nil { - return err - } - u = &usr - return nil - }, fmt.Sprintf("error reading service principal with AppID '%s', SP ID: %s", applicationID, spId)) - if err != nil { - log.Printf("[WARN] error reading service principal with AppID '%s', SP ID: %s", applicationID, spId) - u = &scim.User{ApplicationID: nonExistingUserOrSp} - } - } - ic.spsMutex.Lock() - defer ic.spsMutex.Unlock() - ic.allSps[applicationID] = *u - - return -} - func (ic *importContext) emitIfDbfsFile(path string) { if strings.HasPrefix(path, "dbfs:") { if strings.HasPrefix(path, "dbfs:/Volumes/") { @@ -865,28 +277,6 @@ 
func eitherString(a any, b any) string { return "" } -func (ic *importContext) importJobs(l []jobs.Job) { - i := 0 - for offset, job := range l { - if !ic.MatchesName(job.Settings.Name) { - log.Printf("[INFO] Job name %s doesn't match selection %s", job.Settings.Name, ic.match) - continue - } - if job.Settings.Deployment != nil && job.Settings.Deployment.Kind == "BUNDLE" && - job.Settings.EditMode == "UI_LOCKED" { - log.Printf("[INFO] Skipping job '%s' because it's deployed by DABs", job.Settings.Name) - continue - } - ic.Emit(&resource{ - Resource: "databricks_job", - ID: job.ID(), - }) - i++ - log.Printf("[INFO] Scanned %d of total %d jobs", offset+1, len(l)) - } - log.Printf("[INFO] %d of total %d jobs are going to be imported", i, len(l)) -} - func (ic *importContext) createFileIn(dir, name string) (*os.File, string, error) { fileName := ic.prefix + name localFileName := fmt.Sprintf("%s/%s/%s", ic.Directory, dir, fileName) @@ -925,47 +315,6 @@ func defaultShouldOmitFieldFunc(ic *importContext, pathString string, as *schema return false } -func makeShouldOmitFieldForCluster(regex *regexp.Regexp) func(ic *importContext, pathString string, as *schema.Schema, d *schema.ResourceData) bool { - return func(ic *importContext, pathString string, as *schema.Schema, d *schema.ResourceData) bool { - prefix := "" - if regex != nil { - if res := regex.FindStringSubmatch(pathString); res != nil { - prefix = res[0] - } else { - return false - } - } - raw := d.Get(pathString) - if raw != nil { - v := reflect.ValueOf(raw) - if as.Optional && v.IsZero() { - return true - } - } - workerInstPoolID := d.Get(prefix + "instance_pool_id").(string) - switch pathString { - case prefix + "node_type_id": - return workerInstPoolID != "" - case prefix + "driver_node_type_id": - driverInstPoolID := d.Get(prefix + "driver_instance_pool_id").(string) - nodeTypeID := d.Get(prefix + "node_type_id").(string) - return workerInstPoolID != "" || driverInstPoolID != "" || raw.(string) == nodeTypeID - case prefix + "driver_instance_pool_id": - return raw.(string) == workerInstPoolID - case prefix + "enable_elastic_disk", prefix + "aws_attributes", prefix + "azure_attributes", prefix + "gcp_attributes": - return workerInstPoolID != "" - case prefix + "enable_local_disk_encryption": - return false - case prefix + "spark_conf": - return fmt.Sprintf("%v", d.Get(prefix+"spark_conf")) == "map[spark.databricks.delta.preview.enabled:true]" - case prefix + "spark_env_vars": - return fmt.Sprintf("%v", d.Get(prefix+"spark_env_vars")) == "map[PYSPARK_PYTHON:/databricks/python3/bin/python3]" - } - - return defaultShouldOmitFieldFunc(ic, pathString, as, d) - } -} - func resourceOrDataBlockBody(ic *importContext, body *hclwrite.Body, r *resource) error { blockType := "resource" if r.Mode == "data" { @@ -980,73 +329,6 @@ func generateUniqueID(v string) string { return fmt.Sprintf("%x", sha1.Sum([]byte(v)))[:10] } -func shouldOmitMd5Field(ic *importContext, pathString string, as *schema.Schema, d *schema.ResourceData) bool { - if pathString == "md5" { // `md5` is kind of computed, but not declared as it... 
- return true - } - return defaultShouldOmitFieldFunc(ic, pathString, as, d) -} - -func workspaceObjectResouceName(ic *importContext, d *schema.ResourceData) string { - name := d.Get("path").(string) - if name == "" { - return d.Id() - } else { - name = nameNormalizationRegex.ReplaceAllString(name[1:], "_") + "_" + - strconv.FormatInt(int64(d.Get("object_id").(int)), 10) - } - return name -} - -func wsObjectGetModifiedAt(obs workspace.ObjectStatus) int64 { - if obs.ModifiedAtInteractive != nil && obs.ModifiedAtInteractive.TimeMillis != 0 { - return obs.ModifiedAtInteractive.TimeMillis - } - return obs.ModifiedAt -} - -func (ic *importContext) shouldEmitForPath(path string) bool { - if !ic.exportDeletedUsersAssets && strings.HasPrefix(path, "/Users/") { - return ic.IsUserOrServicePrincipalDirectory(path, "/Users", false) - } - return true -} - -func (ic *importContext) maybeEmitWorkspaceObject(resourceType, path string, obj *workspace.ObjectStatus) { - if ic.shouldEmitForPath(path) { - var data *schema.ResourceData - if obj != nil { - switch resourceType { - case "databricks_notebook": - data = workspace.ResourceNotebook().ToResource().TestResourceData() - case "databricks_workspace_file": - data = workspace.ResourceWorkspaceFile().ToResource().TestResourceData() - case "databricks_directory": - data = workspace.ResourceDirectory().ToResource().TestResourceData() - } - if data != nil { - scm := ic.Resources[resourceType].Schema - data.MarkNewResource() - data.SetId(path) - err := common.StructToData(obj, scm, data) - if err != nil { - log.Printf("[ERROR] can't convert %s object to data: %v. obj=%v", resourceType, err, obj) - data = nil - } - } - } - ic.Emit(&resource{ - Resource: resourceType, - ID: path, - Data: data, - Incremental: ic.incremental, - }) - } else { - log.Printf("[WARN] Not emitting a workspace object %s for deleted user. Path='%s'", resourceType, path) - ic.addIgnoredResource(fmt.Sprintf("%s. path=%s", resourceType, path)) - } -} - func (ic *importContext) enableServices(services string) { ic.services = map[string]struct{}{} for _, s := range strings.Split(services, ",") { @@ -1079,114 +361,6 @@ func (ic *importContext) emitSqlParentDirectory(parent string) { } } -func (ic *importContext) shouldSkipWorkspaceObject(object workspace.ObjectStatus, updatedSinceMs int64) bool { - if ic.incremental && object.ObjectType == workspace.Directory { - return true - } - if !(object.ObjectType == workspace.Notebook || object.ObjectType == workspace.File) || - strings.HasPrefix(object.Path, "/Repos") { - // log.Printf("[DEBUG] Skipping unsupported entry %v", object) - return true - } - if res := ignoreIdeFolderRegex.FindStringSubmatch(object.Path); res != nil { - return true - } - modifiedAt := wsObjectGetModifiedAt(object) - if ic.incremental && modifiedAt < updatedSinceMs { - p := ic.oldWorkspaceObjectMapping[object.ObjectID] - if p == "" || p == object.Path { - log.Printf("[DEBUG] skipping '%s' that was modified at %d (last active=%d)", - object.Path, modifiedAt, updatedSinceMs) - return true - } - log.Printf("[DEBUG] Different path for object %d. Old='%s', New='%s'", object.ObjectID, p, object.Path) - } - if !ic.MatchesName(object.Path) { - return true - } - return false -} - -func emitWorkpaceObject(ic *importContext, object workspace.ObjectStatus) { - // check the size of the default channel, and add delays if it has less than %20 capacity left. - // In this case we won't need to have increase the size of the default channel to extended capacity. 
- defChannelSize := len(ic.defaultChannel) - if float64(defChannelSize) > float64(ic.defaultHanlerChannelSize)*0.8 { - log.Printf("[DEBUG] waiting a bit before emitting a resource because default channel is 80%% full (%d): %v", - defChannelSize, object) - time.Sleep(1 * time.Second) - } - switch object.ObjectType { - case workspace.Notebook: - ic.maybeEmitWorkspaceObject("databricks_notebook", object.Path, &object) - case workspace.File: - ic.maybeEmitWorkspaceObject("databricks_workspace_file", object.Path, &object) - case workspace.Directory: - ic.maybeEmitWorkspaceObject("databricks_directory", object.Path, &object) - default: - log.Printf("[WARN] unknown type %s for path %s", object.ObjectType, object.Path) - } -} - -func listNotebooksAndWorkspaceFiles(ic *importContext) error { - objectsChannel := make(chan workspace.ObjectStatus, defaultChannelSize) - numRoutines := 2 // TODO: make configurable? together with the channel size? - var processedObjects atomic.Uint64 - for i := 0; i < numRoutines; i++ { - num := i - ic.waitGroup.Add(1) - go func() { - log.Printf("[DEBUG] Starting channel %d for workspace objects", num) - for object := range objectsChannel { - processedObjects.Add(1) - ic.waitGroup.Add(1) - emitWorkpaceObject(ic, object) - ic.waitGroup.Done() - } - log.Printf("[DEBUG] channel %d for workspace objects is finished", num) - ic.waitGroup.Done() - }() - } - // There are two use cases - this function will handle listing, or it will receive listing - updatedSinceMs := ic.getUpdatedSinceMs() - allObjects := ic.getAllWorkspaceObjects(func(objects []workspace.ObjectStatus) { - for _, object := range objects { - if object.ObjectType == workspace.Directory { - if !ic.incremental && object.Path != "/" && ic.isServiceInListing("directories") { - objectsChannel <- object - } - } else { - if ic.shouldSkipWorkspaceObject(object, updatedSinceMs) { - continue - } - object := object - switch object.ObjectType { - case workspace.Notebook, workspace.File: - objectsChannel <- object - default: - log.Printf("[WARN] unknown type %s for path %s", object.ObjectType, object.Path) - } - } - } - }) - close(objectsChannel) - log.Printf("[DEBUG] processedObjects=%d", processedObjects.Load()) - if processedObjects.Load() == 0 { // we didn't have side effect from listing as it was already happened - log.Printf("[DEBUG] ic.getAllWorkspaceObjects already was called before, so we need to explicitly submit all objects") - for _, object := range allObjects { - if ic.shouldSkipWorkspaceObject(object, updatedSinceMs) { - continue - } - if object.ObjectType == workspace.Directory && !ic.incremental && ic.isServiceInListing("directories") && object.Path != "/" { - emitWorkpaceObject(ic, object) - } else if (object.ObjectType == workspace.Notebook || object.ObjectType == workspace.File) && ic.isServiceInListing("notebooks") { - emitWorkpaceObject(ic, object) - } - } - } - return nil -} - func (ic *importContext) getLastActiveMs() int64 { if ic.lastActiveMs == 0 { ic.lastActiveMs = (time.Now().Unix() - ic.lastActiveDays*24*60*60) * 1000 @@ -1213,106 +387,6 @@ func getEnvAsInt(envName string, defaultValue int) int { return defaultValue } -// Parallel listing implementation -type syncAnswer struct { - MU sync.Mutex - data []workspace.ObjectStatus -} - -func (a *syncAnswer) append(objs []workspace.ObjectStatus) { - a.MU.Lock() - a.data = append(a.data, objs...) 
- a.MU.Unlock() -} - -type directoryInfo struct { - Path string - Attempts int -} - -// constants related to the parallel listing -const ( - envVarListParallelism = "EXPORTER_WS_LIST_PARALLELISM" - envVarDirectoryChannelSize = "EXPORTER_DIRECTORIES_CHANNEL_SIZE" - defaultWorkersPoolSize = 10 - defaultDirectoryChannelSize = 100000 -) - -func recursiveAddPathsParallel(a workspace.NotebooksAPI, directory directoryInfo, dirChannel chan directoryInfo, - answer *syncAnswer, wg *sync.WaitGroup, shouldIncludeDir func(workspace.ObjectStatus) bool, visitor func([]workspace.ObjectStatus)) { - defer wg.Done() - notebookInfoList, err := a.ListInternalImpl(directory.Path) - if err != nil { - log.Printf("[WARN] error listing '%s': %v", directory.Path, err) - if isRetryableError(err.Error(), directory.Attempts) { - wg.Add(1) - log.Printf("[INFO] attempt %d of retrying listing of '%s' after error: %v", - directory.Attempts+1, directory.Path, err) - time.Sleep(time.Duration(retryDelaySeconds) * time.Second) - dirChannel <- directoryInfo{Path: directory.Path, Attempts: directory.Attempts + 1} - } - } - - newList := make([]workspace.ObjectStatus, 0, len(notebookInfoList)) - directories := make([]workspace.ObjectStatus, 0, len(notebookInfoList)) - for _, v := range notebookInfoList { - if v.ObjectType == workspace.Directory { - if shouldIncludeDir(v) { - newList = append(newList, v) - directories = append(directories, v) - } - } else { - newList = append(newList, v) - } - } - answer.append(newList) - for _, v := range directories { - wg.Add(1) - log.Printf("[DEBUG] putting directory '%s' into channel. Channel size: %d", v.Path, len(dirChannel)) - dirChannel <- directoryInfo{Path: v.Path} - time.Sleep(3 * time.Millisecond) - } - if visitor != nil { - visitor(newList) - } -} - -func ListParallel(a workspace.NotebooksAPI, path string, shouldIncludeDir func(workspace.ObjectStatus) bool, - visitor func([]workspace.ObjectStatus)) ([]workspace.ObjectStatus, error) { - var answer syncAnswer - wg := &sync.WaitGroup{} - - if shouldIncludeDir == nil { - shouldIncludeDir = func(workspace.ObjectStatus) bool { return true } - } - - numWorkers := getEnvAsInt(envVarListParallelism, defaultWorkersPoolSize) - channelSize := getEnvAsInt(envVarDirectoryChannelSize, defaultDirectoryChannelSize) - dirChannel := make(chan directoryInfo, channelSize) - for i := 0; i < numWorkers; i++ { - t := i - go func() { - log.Printf("[DEBUG] starting go routine %d", t) - for directory := range dirChannel { - log.Printf("[DEBUG] processing directory %s", directory.Path) - recursiveAddPathsParallel(a, directory, dirChannel, &answer, wg, shouldIncludeDir, visitor) - } - }() - - } - log.Print("[DEBUG] pushing initial path to channel") - wg.Add(1) - recursiveAddPathsParallel(a, directoryInfo{Path: path}, dirChannel, &answer, wg, shouldIncludeDir, visitor) - log.Print("[DEBUG] starting to wait") - wg.Wait() - log.Print("[DEBUG] closing the directory channel") - close(dirChannel) - - answer.MU.Lock() - defer answer.MU.Unlock() - return answer.data, nil -} - var ( maxRetries = 5 retryDelaySeconds = 2 @@ -1365,24 +439,16 @@ func appendEndingSlashToDirName(dir string) string { } func isMatchingCatalogAndSchema(ic *importContext, res *resource, ra *resourceApproximation, origPath string) bool { - // log.Printf("[DEBUG] matchingCatalogAndSchema: resource: %s, origPath=%s", res.Resource, origPath) res_catalog_name := res.Data.Get("catalog_name").(string) res_schema_name := res.Data.Get("schema_name").(string) - // log.Printf("[DEBUG] 
matchingCatalogAndSchema: resource: %s, catalog='%s' schema='%s'", - // res.Resource, res_catalog_name, res_schema_name) ra_catalog_name, cat_found := ra.Get("catalog_name") ra_schema_name, schema_found := ra.Get("name") - // log.Printf("[DEBUG] matchingCatalogAndSchema: approximation: %s %s, catalog='%v' (found? %v) schema='%v' (found? %v)", - // ra.Type, ra.Name, ra_catalog_name, cat_found, ra_schema_name, schema_found) if !cat_found || !schema_found { log.Printf("[WARN] Can't find attributes in approximation: %s %s, catalog='%v' (found? %v) schema='%v' (found? %v). Resource: %s, catalog='%s', schema='%s'", ra.Type, ra.Name, ra_catalog_name, cat_found, ra_schema_name, schema_found, res.Resource, res_catalog_name, res_schema_name) return true } - result := ra_catalog_name.(string) == res_catalog_name && ra_schema_name.(string) == res_schema_name - // log.Printf("[DEBUG] matchingCatalogAndSchema: result: %v approximation: catalog='%v' schema='%v', res: catalog='%s' schema='%s'", - // result, ra_catalog_name, ra_schema_name, res_catalog_name, res_schema_name) return result } @@ -1403,10 +469,6 @@ func isMatchingCatalogAndSchemaInModelServing(ic *importContext, res *resource, func isMatchingShareRecipient(ic *importContext, res *resource, ra *resourceApproximation, origPath string) bool { shareName, ok := res.Data.GetOk("share") - // principal := res.Data.Get(origPath) - // log.Printf("[DEBUG] isMatchingShareRecipient: origPath='%s', ra.Type='%s', shareName='%v', ok? %v, principal='%v'", - // origPath, ra.Type, shareName, ok, principal) - return ok && shareName.(string) != "" } @@ -1414,10 +476,6 @@ func isMatchignShareObject(obj string) isValidAproximationFunc { return func(ic *importContext, res *resource, ra *resourceApproximation, origPath string) bool { objPath := strings.Replace(origPath, ".name", ".data_object_type", 1) objType, ok := res.Data.GetOk(objPath) - // name := res.Data.Get(origPath) - // log.Printf("[DEBUG] isMatchignShareObject: %s origPath='%s', ra.Type='%s', name='%v', objPath='%s' objType='%v' ok? %v", - // obj, origPath, ra.Type, name, objPath, objType, ok) - return ok && objType.(string) == obj } } @@ -1426,10 +484,6 @@ func isMatchingAllowListArtifact(ic *importContext, res *resource, ra *resourceA objPath := strings.Replace(origPath, ".artifact", ".match_type", 1) matchType, ok := res.Data.GetOk(objPath) artifactType := res.Data.Get("artifact_type").(string) - // artifact := res.Data.Get(origPath) - // log.Printf("[DEBUG] isMatchingAllowListArtifact: origPath='%s', ra.Type='%s', artifactType='%v' artifact='%v', objPath='%s' matchType='%v' ok? %v", - // origPath, ra.Type, artifactType, artifact, objPath, matchType, ok) - return ok && matchType.(string) == "PREFIX_MATCH" && (artifactType == "LIBRARY_JAR" || artifactType == "INIT_SCRIPT") } @@ -1483,20 +537,6 @@ func (ic *importContext) emitPermissionsIfNotIgnored(r *resource, id, name strin } } -func (ic *importContext) emitWorkspaceObjectParentDirectory(r *resource) { - if !ic.isServiceEnabled("directories") { - return - } - if idx := strings.LastIndex(r.ID, "/"); idx > 0 { // not found, or directly in the root... 
- directoryPath := r.ID[:idx] - ic.Emit(&resource{ - Resource: "databricks_directory", - ID: directoryPath, - }) - r.AddExtraData(ParentDirectoryExtraKey, directoryPath) - } -} - func dltIsMatchingCatalogAndSchema(ic *importContext, res *resource, ra *resourceApproximation, origPath string) bool { res_catalog_name := res.Data.Get("catalog").(string) if res_catalog_name == "" { @@ -1515,15 +555,6 @@ func dltIsMatchingCatalogAndSchema(ic *importContext, res *resource, ra *resourc return result } -func (ic *importContext) makeGroupMemberData(id, groupId, memberId string) *schema.ResourceData { - data := scim.ResourceGroupMember().ToResource().TestResourceData() - data.MarkNewResource() - data.SetId(id) - data.Set("group_id", groupId) - data.Set("member_id", memberId) - return data -} - func (ic *importContext) emitWorkspaceBindings(securableType, securableName string) { bindings, err := ic.workspaceClient.WorkspaceBindings.GetBindings(ic.Context, catalog.GetBindingsRequest{ SecurableName: securableName, @@ -1563,12 +594,3 @@ func isMatchingSecurableTypeAndName(ic *importContext, res *resource, ra *resour ra_name, _ := ra.Get("name") return ra.Type == ("databricks_"+res_securable_type) && ra_name.(string) == res_securable_name } - -func (ic *importContext) emitJobsDestinationNotifications(notifications []sdk_jobs.Webhook) { - for _, notification := range notifications { - ic.Emit(&resource{ - Resource: "databricks_notification_destination", - ID: notification.Id, - }) - } -} diff --git a/exporter/util_compute.go b/exporter/util_compute.go new file mode 100644 index 0000000000..7967b9ebc9 --- /dev/null +++ b/exporter/util_compute.go @@ -0,0 +1,228 @@ +package exporter + +import ( + "fmt" + "log" + "reflect" + "regexp" + + "github.com/databricks/terraform-provider-databricks/common" + "github.com/databricks/terraform-provider-databricks/jobs" + + "github.com/databricks/databricks-sdk-go/service/compute" + sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +func (ic *importContext) emitInitScripts(initScripts []compute.InitScriptInfo) { + for _, is := range initScripts { + if is.Dbfs != nil { + ic.Emit(&resource{ + Resource: "databricks_dbfs_file", + ID: is.Dbfs.Destination, + }) + } + if is.Workspace != nil { + ic.emitWorkspaceFileOrRepo(is.Workspace.Destination) + } + if is.Volumes != nil { + // TODO: we should emit allow list for init scripts as well + ic.emitIfVolumeFile(is.Volumes.Destination) + } + } +} + +func (ic *importContext) importCluster(c *compute.ClusterSpec) { + if c == nil { + return + } + if c.AwsAttributes != nil && c.AwsAttributes.InstanceProfileArn != "" { + ic.Emit(&resource{ + Resource: "databricks_instance_profile", + ID: c.AwsAttributes.InstanceProfileArn, + }) + } + if c.InstancePoolId != "" { + // set enable_elastic_disk to false, and remove aws/gcp/azure_attributes + ic.Emit(&resource{ + Resource: "databricks_instance_pool", + ID: c.InstancePoolId, + }) + } + if c.DriverInstancePoolId != "" { + ic.Emit(&resource{ + Resource: "databricks_instance_pool", + ID: c.DriverInstancePoolId, + }) + } + if c.PolicyId != "" { + ic.Emit(&resource{ + Resource: "databricks_cluster_policy", + ID: c.PolicyId, + }) + } + ic.emitInitScripts(c.InitScripts) + ic.emitSecretsFromSecretsPathMap(c.SparkConf) + ic.emitSecretsFromSecretsPathMap(c.SparkEnvVars) + ic.emitUserOrServicePrincipal(c.SingleUserName) +} + +func (ic *importContext) emitSecretsFromSecretPathString(v string) { + if res := 
secretPathRegex.FindStringSubmatch(v); res != nil { + ic.Emit(&resource{ + Resource: "databricks_secret_scope", + ID: res[1], + }) + } +} + +func (ic *importContext) emitSecretsFromSecretsPathMap(m map[string]string) { + for _, v := range m { + ic.emitSecretsFromSecretPathString(v) + } +} + +func (ic *importContext) emitLibraries(libs []compute.Library) { + for _, lib := range libs { + // Files on DBFS + ic.emitIfDbfsFile(lib.Whl) + ic.emitIfDbfsFile(lib.Jar) + ic.emitIfDbfsFile(lib.Egg) + // Files on WSFS + ic.emitIfWsfsFile(lib.Whl) + ic.emitIfWsfsFile(lib.Jar) + ic.emitIfWsfsFile(lib.Egg) + ic.emitIfWsfsFile(lib.Requirements) + // Files on UC Volumes + ic.emitIfVolumeFile(lib.Whl) + // TODO: we should emit UC allow list as well + ic.emitIfVolumeFile(lib.Jar) + ic.emitIfVolumeFile(lib.Requirements) + } +} + +func (ic *importContext) importLibraries(d *schema.ResourceData, s map[string]*schema.Schema) error { + var cll compute.InstallLibraries + common.DataToStructPointer(d, s, &cll) + ic.emitLibraries(cll.Libraries) + return nil +} + +func (ic *importContext) importClusterLibraries(d *schema.ResourceData, s map[string]*schema.Schema) error { + libraries := ic.workspaceClient.Libraries + cll, err := libraries.ClusterStatusByClusterId(ic.Context, d.Id()) + if err != nil { + return err + } + for _, lib := range cll.LibraryStatuses { + ic.emitIfDbfsFile(lib.Library.Egg) + ic.emitIfDbfsFile(lib.Library.Jar) + ic.emitIfDbfsFile(lib.Library.Whl) + // Files on UC Volumes + ic.emitIfVolumeFile(lib.Library.Whl) + ic.emitIfVolumeFile(lib.Library.Jar) + // Files on WSFS + ic.emitIfWsfsFile(lib.Library.Whl) + ic.emitIfWsfsFile(lib.Library.Jar) + } + return nil +} + +func (ic *importContext) getBuiltinPolicyFamilies() map[string]compute.PolicyFamily { + ic.builtInPoliciesMutex.Lock() + defer ic.builtInPoliciesMutex.Unlock() + if ic.builtInPolicies == nil { + if !ic.accountLevel { + log.Printf("[DEBUG] Going to initialize ic.builtInPolicies. Getting policy families...") + families, err := ic.workspaceClient.PolicyFamilies.ListAll(ic.Context, compute.ListPolicyFamiliesRequest{}) + log.Printf("[DEBUG] Going to initialize ic.builtInPolicies. 
Getting policy families...") + if err == nil { + ic.builtInPolicies = make(map[string]compute.PolicyFamily, len(families)) + for _, f := range families { + f2 := f + ic.builtInPolicies[f2.PolicyFamilyId] = f2 + } + } else { + log.Printf("[ERROR] Can't fetch cluster policy families: %v", err) + ic.builtInPolicies = map[string]compute.PolicyFamily{} + } + } else { + log.Print("[WARN] Can't list cluster policy families on account level") + ic.builtInPolicies = map[string]compute.PolicyFamily{} + } + } + return ic.builtInPolicies +} + +func (ic *importContext) importJobs(l []jobs.Job) { + i := 0 + for offset, job := range l { + if !ic.MatchesName(job.Settings.Name) { + log.Printf("[INFO] Job name %s doesn't match selection %s", job.Settings.Name, ic.match) + continue + } + if job.Settings.Deployment != nil && job.Settings.Deployment.Kind == "BUNDLE" && + job.Settings.EditMode == "UI_LOCKED" { + log.Printf("[INFO] Skipping job '%s' because it's deployed by DABs", job.Settings.Name) + continue + } + ic.Emit(&resource{ + Resource: "databricks_job", + ID: job.ID(), + }) + i++ + log.Printf("[INFO] Scanned %d of total %d jobs", offset+1, len(l)) + } + log.Printf("[INFO] %d of total %d jobs are going to be imported", i, len(l)) +} + +func makeShouldOmitFieldForCluster(regex *regexp.Regexp) func(ic *importContext, pathString string, as *schema.Schema, d *schema.ResourceData) bool { + return func(ic *importContext, pathString string, as *schema.Schema, d *schema.ResourceData) bool { + prefix := "" + if regex != nil { + if res := regex.FindStringSubmatch(pathString); res != nil { + prefix = res[0] + } else { + return false + } + } + raw := d.Get(pathString) + if raw != nil { + v := reflect.ValueOf(raw) + if as.Optional && v.IsZero() { + return true + } + } + workerInstPoolID := d.Get(prefix + "instance_pool_id").(string) + switch pathString { + case prefix + "node_type_id": + return workerInstPoolID != "" + case prefix + "driver_node_type_id": + driverInstPoolID := d.Get(prefix + "driver_instance_pool_id").(string) + nodeTypeID := d.Get(prefix + "node_type_id").(string) + return workerInstPoolID != "" || driverInstPoolID != "" || raw.(string) == nodeTypeID + case prefix + "driver_instance_pool_id": + return raw.(string) == workerInstPoolID + case prefix + "enable_elastic_disk", prefix + "aws_attributes", prefix + "azure_attributes", prefix + "gcp_attributes": + return workerInstPoolID != "" + case prefix + "enable_local_disk_encryption": + return false + case prefix + "spark_conf": + return fmt.Sprintf("%v", d.Get(prefix+"spark_conf")) == "map[spark.databricks.delta.preview.enabled:true]" + case prefix + "spark_env_vars": + return fmt.Sprintf("%v", d.Get(prefix+"spark_env_vars")) == "map[PYSPARK_PYTHON:/databricks/python3/bin/python3]" + } + + return defaultShouldOmitFieldFunc(ic, pathString, as, d) + } +} + +func (ic *importContext) emitJobsDestinationNotifications(notifications []sdk_jobs.Webhook) { + for _, notification := range notifications { + ic.Emit(&resource{ + Resource: "databricks_notification_destination", + ID: notification.Id, + }) + } +} diff --git a/exporter/util_scim.go b/exporter/util_scim.go new file mode 100644 index 0000000000..8887023400 --- /dev/null +++ b/exporter/util_scim.go @@ -0,0 +1,392 @@ +package exporter + +import ( + "fmt" + "log" + "strings" + + "github.com/databricks/terraform-provider-databricks/common" + "github.com/databricks/terraform-provider-databricks/scim" + + "github.com/databricks/databricks-sdk-go/service/iam" + + 
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +const ( + nonExistingUserOrSp = "__USER_OR_SPN_DOES_NOT_EXIST__" +) + +func (ic *importContext) emitListOfUsers(users []string) { + for _, user := range users { + if user != "" { + ic.Emit(&resource{ + Resource: "databricks_user", + Attribute: "user_name", + Value: user, + }) + } + } +} + +func (ic *importContext) emitUserOrServicePrincipal(userOrSPName string) { + if userOrSPName == "" || !ic.isServiceEnabled("users") { + return + } + // Cache check here to avoid emitting + ic.emittedUsersMutex.RLock() + _, exists := ic.emittedUsers[userOrSPName] + ic.emittedUsersMutex.RUnlock() + if exists { + // log.Printf("[DEBUG] user or SP %s already emitted...", userOrSPName) + return + } + if common.StringIsUUID(userOrSPName) { + user, err := ic.findSpnByAppID(userOrSPName, false) + if err != nil { + log.Printf("[ERROR] Can't find SP with application ID %s", userOrSPName) + ic.addIgnoredResource(fmt.Sprintf("databricks_service_principal. application_id=%s", userOrSPName)) + } else { + ic.Emit(&resource{ + Resource: "databricks_service_principal", + ID: user.ID, + }) + } + } else { + user, err := ic.findUserByName(strings.ToLower(userOrSPName), false) + if err != nil { + log.Printf("[ERROR] Can't find user with name %s", userOrSPName) + ic.addIgnoredResource(fmt.Sprintf("databricks_user. user_name=%s", userOrSPName)) + } else { + ic.Emit(&resource{ + Resource: "databricks_user", + ID: user.ID, + }) + } + } + ic.emittedUsersMutex.Lock() + ic.emittedUsers[userOrSPName] = struct{}{} + ic.emittedUsersMutex.Unlock() +} + +func getUserOrSpNameAndDirectory(path, prefix string) (string, string) { + if !strings.HasPrefix(path, prefix) { + return "", "" + } + pathLen := len(path) + prefixLen := len(prefix) + searchStart := prefixLen + 1 + if pathLen <= searchStart { + return "", "" + } + pos := strings.Index(path[searchStart:pathLen], "/") + if pos == -1 { // we have only user directory... + return path[searchStart:pathLen], path + } + return path[searchStart : pos+searchStart], path[0 : pos+searchStart] +} + +func (ic *importContext) emitUserOrServicePrincipalForPath(path, prefix string) { + userOrSpName, _ := getUserOrSpNameAndDirectory(path, prefix) + if userOrSpName != "" { + ic.emitUserOrServicePrincipal(userOrSpName) + } +} + +func (ic *importContext) IsUserOrServicePrincipalDirectory(path, prefix string, strict bool) bool { + userOrSPName, userDir := getUserOrSpNameAndDirectory(path, prefix) + if userOrSPName == "" { + return false + } + // strict mode means that it should be exactly user dir, maybe with trailing `/` + if strict && !(len(path) == len(userDir) || (len(path) == len(userDir)+1 && path[len(path)-1] == '/')) { + return false + } + ic.userOrSpDirectoriesMutex.RLock() + result, exists := ic.userOrSpDirectories[userDir] + ic.userOrSpDirectoriesMutex.RUnlock() + if exists { + // log.Printf("[DEBUG] Directory %s already checked. Result=%v", userDir, result) + return result + } + var err error + if common.StringIsUUID(userOrSPName) { + _, err = ic.findSpnByAppID(userOrSPName, true) + if err != nil { + ic.addIgnoredResource(fmt.Sprintf("databricks_service_principal. application_id=%s", userOrSPName)) + } + } else { + _, err = ic.findUserByName(strings.ToLower(userOrSPName), true) + if err != nil { + ic.addIgnoredResource(fmt.Sprintf("databricks_user. 
user_name=%s", userOrSPName)) + } + } + ic.userOrSpDirectoriesMutex.Lock() + ic.userOrSpDirectories[userDir] = (err == nil) + ic.userOrSpDirectoriesMutex.Unlock() + return err == nil +} + +func (ic *importContext) emitGroups(u scim.User) { + for _, g := range u.Groups { + if g.Type != "direct" { + log.Printf("[DEBUG] Skipping non-direct group %s/%s for %s", g.Value, g.Display, u.DisplayName) + continue + } + ic.Emit(&resource{ + Resource: "databricks_group", + ID: g.Value, + }) + id := fmt.Sprintf("%s|%s", g.Value, u.ID) + ic.Emit(&resource{ + Resource: "databricks_group_member", + ID: id, + Name: fmt.Sprintf("%s_%s_%s_%s", g.Display, g.Value, u.DisplayName, u.ID), + Data: ic.makeGroupMemberData(id, g.Value, u.ID), + }) + } +} + +func (ic *importContext) emitRoles(objType string, id string, roles []scim.ComplexValue) { + log.Printf("[DEBUG] emitting roles for object type: %s, ID: %s, roles: %v", objType, id, roles) + for _, role := range roles { + if role.Type != "direct" { + continue + } + if !ic.accountLevel { + ic.Emit(&resource{ + Resource: "databricks_instance_profile", + ID: role.Value, + }) + } + ic.Emit(&resource{ + Resource: fmt.Sprintf("databricks_%s_role", objType), + ID: fmt.Sprintf("%s|%s", id, role.Value), + }) + } +} + +func (ic *importContext) cacheGroups() error { + ic.groupsMutex.Lock() + defer ic.groupsMutex.Unlock() + if ic.allGroups == nil { + log.Printf("[INFO] Caching groups in memory ...") + var groups *[]iam.Group + var err error + err = runWithRetries(func() error { + var grps []iam.Group + var err error + if ic.accountLevel { + grps, err = ic.accountClient.Groups.ListAll(ic.Context, iam.ListAccountGroupsRequest{ + Attributes: "id", + }) + } else { + grps, err = ic.workspaceClient.Groups.ListAll(ic.Context, iam.ListGroupsRequest{ + Attributes: "id", + }) + } + if err != nil { + return err + } + groups = &grps + return nil + }, "error fetching full list of groups") + if err != nil { + log.Printf("[ERROR] can't fetch list of groups. 
Error: %v", err) + return err + } + api := scim.NewGroupsAPI(ic.Context, ic.Client) + groupsCount := len(*groups) + ic.allGroups = make([]scim.Group, 0, groupsCount) + for i, g := range *groups { + err = runWithRetries(func() error { + group, err := api.Read(g.Id, "id,displayName,active,externalId,entitlements,groups,roles,members") + if err != nil { + return err + } + ic.allGroups = append(ic.allGroups, group) + return nil + }, "error reading group with ID "+g.Id) + if err != nil { + log.Printf("[ERROR] Error reading group with ID %s: %v", g.Id, err) + continue + } + if (i+1)%10 == 0 { + log.Printf("[DEBUG] Read %d out of %d groups", i+1, groupsCount) + } + } + log.Printf("[INFO] Cached %d groups", len(ic.allGroups)) + } + return nil +} + +func (ic *importContext) getUsersMapping() { + ic.allUsersMutex.RLocker().Lock() + userMapping := ic.allUsersMapping + ic.allUsersMutex.RLocker().Unlock() + if userMapping == nil { + ic.allUsersMutex.Lock() + defer ic.allUsersMutex.Unlock() + if ic.allUsersMapping != nil { + return + } + ic.allUsersMapping = make(map[string]string) + err := runWithRetries(func() error { + var users []iam.User + var err error + if ic.accountLevel { + users, err = ic.accountClient.Users.ListAll(ic.Context, iam.ListAccountUsersRequest{ + Attributes: "id,userName", + }) + } else { + users, err = ic.workspaceClient.Users.ListAll(ic.Context, iam.ListUsersRequest{ + Attributes: "id,userName", + }) + } + if err != nil { + return err + } + for _, user := range users { + ic.allUsersMapping[user.UserName] = user.Id + } + log.Printf("[DEBUG] users are copied") + return nil + }, "error fetching full list of users") + if err != nil { + log.Fatalf("[ERROR] can't fetch list of users after few retries: error=%v", err) + } + } +} + +func (ic *importContext) findUserByName(name string, fastCheck bool) (u *scim.User, err error) { + log.Printf("[DEBUG] Looking for user %s", name) + ic.usersMutex.RLocker().Lock() + user, exists := ic.allUsers[name] + ic.usersMutex.RLocker().Unlock() + if exists { + if user.UserName == nonExistingUserOrSp { + log.Printf("[DEBUG] non-existing user %s is found in the cache", name) + err = fmt.Errorf("user %s is not found", name) + } else { + log.Printf("[DEBUG] existing user %s is found in the cache", name) + u = &user + } + return + } + ic.getUsersMapping() + ic.allUsersMutex.RLocker().Lock() + userId, exists := ic.allUsersMapping[name] + ic.allUsersMutex.RLocker().Unlock() + if !exists { + err = fmt.Errorf("there is no user '%s'", name) + u = &scim.User{UserName: nonExistingUserOrSp} + } else { + if fastCheck { + return &scim.User{UserName: name}, nil + } + a := scim.NewUsersAPI(ic.Context, ic.Client) + err = runWithRetries(func() error { + usr, err := a.Read(userId, "id,userName,displayName,active,externalId,entitlements,groups,roles") + if err != nil { + return err + } + u = &usr + return nil + }, fmt.Sprintf("error reading user with name '%s', user ID: %s", name, userId)) + if err != nil { + log.Printf("[WARN] error reading user with name '%s', user ID: %s", name, userId) + u = &scim.User{UserName: nonExistingUserOrSp} + } + } + ic.usersMutex.Lock() + defer ic.usersMutex.Unlock() + ic.allUsers[name] = *u + return +} + +func (ic *importContext) getSpsMapping() { + ic.spsMutex.Lock() + defer ic.spsMutex.Unlock() + if ic.allSpsMapping == nil { + ic.allSpsMapping = make(map[string]string) + err := runWithRetries(func() error { + var sps []iam.ServicePrincipal + var err error + if ic.accountLevel { + sps, err = 
ic.accountClient.ServicePrincipals.ListAll(ic.Context, iam.ListAccountServicePrincipalsRequest{ + Attributes: "id,userName", + }) + } else { + sps, err = ic.workspaceClient.ServicePrincipals.ListAll(ic.Context, iam.ListServicePrincipalsRequest{ + Attributes: "id,userName", + }) + } + if err != nil { + return err + } + for _, sp := range sps { + ic.allSpsMapping[sp.ApplicationId] = sp.Id + } + return nil + }, "error fetching full list of service principals") + if err != nil { + log.Fatalf("[ERROR] can't fetch list of service principals after few retries: error=%v", err) + } + } +} + +func (ic *importContext) findSpnByAppID(applicationID string, fastCheck bool) (u *scim.User, err error) { + log.Printf("[DEBUG] Looking for SP %s", applicationID) + ic.spsMutex.RLocker().Lock() + sp, exists := ic.allSps[applicationID] + ic.spsMutex.RLocker().Unlock() + if exists { + if sp.ApplicationID == nonExistingUserOrSp { + log.Printf("[DEBUG] non-existing SP %s is found in the cache", applicationID) + err = fmt.Errorf("service principal %s is not found", applicationID) + } else { + log.Printf("[DEBUG] existing SP %s is found in the cache", applicationID) + u = &sp + } + return + } + ic.getSpsMapping() + ic.spsMutex.RLocker().Lock() + spId, exists := ic.allSpsMapping[applicationID] + ic.spsMutex.RLocker().Unlock() + if !exists { + err = fmt.Errorf("there is no service principal '%s'", applicationID) + u = &scim.User{ApplicationID: nonExistingUserOrSp} + } else { + if fastCheck { + return &scim.User{ApplicationID: applicationID}, nil + } + a := scim.NewServicePrincipalsAPI(ic.Context, ic.Client) + err = runWithRetries(func() error { + usr, err := a.Read(spId, "userName,displayName,active,externalId,entitlements,groups,roles") + if err != nil { + return err + } + u = &usr + return nil + }, fmt.Sprintf("error reading service principal with AppID '%s', SP ID: %s", applicationID, spId)) + if err != nil { + log.Printf("[WARN] error reading service principal with AppID '%s', SP ID: %s", applicationID, spId) + u = &scim.User{ApplicationID: nonExistingUserOrSp} + } + } + ic.spsMutex.Lock() + defer ic.spsMutex.Unlock() + ic.allSps[applicationID] = *u + + return +} + +func (ic *importContext) makeGroupMemberData(id, groupId, memberId string) *schema.ResourceData { + data := scim.ResourceGroupMember().ToResource().TestResourceData() + data.MarkNewResource() + data.SetId(id) + data.Set("group_id", groupId) + data.Set("member_id", memberId) + return data +} diff --git a/exporter/util_workspace.go b/exporter/util_workspace.go new file mode 100644 index 0000000000..0361100592 --- /dev/null +++ b/exporter/util_workspace.go @@ -0,0 +1,400 @@ +package exporter + +import ( + "fmt" + "log" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/databricks/terraform-provider-databricks/common" + "github.com/databricks/terraform-provider-databricks/workspace" + + "golang.org/x/exp/slices" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +func isSupportedWorkspaceObject(obj workspace.ObjectStatus) bool { + switch obj.ObjectType { + case workspace.Directory, workspace.Notebook, workspace.File: + return true + } + return false +} + +func (ic *importContext) emitRepoByPath(path string) { + // Path to Repos objects consits of following parts: /Repos, folder, repository, path inside Repo. + // Because it starts with `/`, it will produce empty string as first element in the slice. + // And we're stopping splitting to avoid producing too many not necessary parts, so we have 5 parts only. 
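+	// Illustrative example (the path below is hypothetical, not taken from this change):
+	// for "/Repos/someone@example.com/my-repo/src/notebook.py", SplitN with a limit of 5
+	// returns ["", "Repos", "someone@example.com", "my-repo", "src/notebook.py"], so
+	// strings.Join(parts[:4], "/") reconstructs the repo root
+	// "/Repos/someone@example.com/my-repo" that is emitted as databricks_repo below.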
+ parts := strings.SplitN(path, "/", 5) + if len(parts) >= 4 { + ic.Emit(&resource{ + Resource: "databricks_repo", + Attribute: "path", + Value: strings.Join(parts[:4], "/"), + }) + } else { + log.Printf("[WARN] Incorrect Repos path") + } +} + +func (ic *importContext) emitWorkspaceFileOrRepo(path string) { + if strings.HasPrefix(path, "/Repos") { + ic.emitRepoByPath(path) + } else { + // TODO: wrap this into ic.shouldEmit... + // TODO: strip /Workspace prefix if it's provided + ic.Emit(&resource{ + Resource: "databricks_workspace_file", + ID: path, + }) + } +} + +func (ic *importContext) emitNotebookOrRepo(path string) { + if strings.HasPrefix(path, "/Repos") { + ic.emitRepoByPath(path) + } else { + // TODO: strip /Workspace prefix if it's provided + ic.maybeEmitWorkspaceObject("databricks_notebook", path, nil) + } +} + +func (ic *importContext) getAllDirectories() []workspace.ObjectStatus { + if len(ic.allDirectories) == 0 { + objects := ic.getAllWorkspaceObjects(nil) + ic.wsObjectsMutex.Lock() + defer ic.wsObjectsMutex.Unlock() + if len(ic.allDirectories) == 0 { + for _, v := range objects { + if v.ObjectType == workspace.Directory { + ic.allDirectories = append(ic.allDirectories, v) + } + } + } + } + return ic.allDirectories +} + +// TODO: Ignore databricks_automl as well? +var directoriesToIgnore = []string{".ide", ".bundle", "__pycache__"} + +// TODO: add ignoring directories of deleted users? This could potentially decrease the number of processed objects... +func excludeAuxiliaryDirectories(v workspace.ObjectStatus) bool { + if v.ObjectType != workspace.Directory { + return true + } + // TODO: rewrite to use suffix check, etc., instead of split and slice contains? + parts := strings.Split(v.Path, "/") + result := len(parts) > 1 && slices.Contains[[]string, string](directoriesToIgnore, parts[len(parts)-1]) + if result { + log.Printf("[DEBUG] Ignoring directory %s", v.Path) + } + return !result +} + +func (ic *importContext) getAllWorkspaceObjects(visitor func([]workspace.ObjectStatus)) []workspace.ObjectStatus { + ic.wsObjectsMutex.Lock() + defer ic.wsObjectsMutex.Unlock() + if len(ic.allWorkspaceObjects) == 0 { + t1 := time.Now() + log.Print("[INFO] Starting to list all workspace objects") + notebooksAPI := workspace.NewNotebooksAPI(ic.Context, ic.Client) + ic.allWorkspaceObjects, _ = ListParallel(notebooksAPI, "/", excludeAuxiliaryDirectories, visitor) + log.Printf("[INFO] Finished listing of all workspace objects. %d objects in total. %v seconds", + len(ic.allWorkspaceObjects), time.Since(t1).Seconds()) + } + return ic.allWorkspaceObjects +} + +func shouldOmitMd5Field(ic *importContext, pathString string, as *schema.Schema, d *schema.ResourceData) bool { + if pathString == "md5" { // `md5` is kind of computed, but not declared as it... 
+ return true + } + return defaultShouldOmitFieldFunc(ic, pathString, as, d) +} + +func workspaceObjectResouceName(ic *importContext, d *schema.ResourceData) string { + name := d.Get("path").(string) + if name == "" { + return d.Id() + } else { + name = nameNormalizationRegex.ReplaceAllString(name[1:], "_") + "_" + + strconv.FormatInt(int64(d.Get("object_id").(int)), 10) + } + return name +} + +func wsObjectGetModifiedAt(obs workspace.ObjectStatus) int64 { + if obs.ModifiedAtInteractive != nil && obs.ModifiedAtInteractive.TimeMillis != 0 { + return obs.ModifiedAtInteractive.TimeMillis + } + return obs.ModifiedAt +} + +func (ic *importContext) shouldEmitForPath(path string) bool { + if !ic.exportDeletedUsersAssets && strings.HasPrefix(path, "/Users/") { + return ic.IsUserOrServicePrincipalDirectory(path, "/Users", false) + } + return true +} + +func (ic *importContext) maybeEmitWorkspaceObject(resourceType, path string, obj *workspace.ObjectStatus) { + if ic.shouldEmitForPath(path) { + var data *schema.ResourceData + if obj != nil { + switch resourceType { + case "databricks_notebook": + data = workspace.ResourceNotebook().ToResource().TestResourceData() + case "databricks_workspace_file": + data = workspace.ResourceWorkspaceFile().ToResource().TestResourceData() + case "databricks_directory": + data = workspace.ResourceDirectory().ToResource().TestResourceData() + } + if data != nil { + scm := ic.Resources[resourceType].Schema + data.MarkNewResource() + data.SetId(path) + err := common.StructToData(obj, scm, data) + if err != nil { + log.Printf("[ERROR] can't convert %s object to data: %v. obj=%v", resourceType, err, obj) + data = nil + } + } + } + ic.Emit(&resource{ + Resource: resourceType, + ID: path, + Data: data, + Incremental: ic.incremental, + }) + } else { + log.Printf("[WARN] Not emitting a workspace object %s for deleted user. Path='%s'", resourceType, path) + ic.addIgnoredResource(fmt.Sprintf("%s. path=%s", resourceType, path)) + } +} + +func (ic *importContext) shouldSkipWorkspaceObject(object workspace.ObjectStatus, updatedSinceMs int64) bool { + if ic.incremental && object.ObjectType == workspace.Directory { + return true + } + if !(object.ObjectType == workspace.Notebook || object.ObjectType == workspace.File) || + strings.HasPrefix(object.Path, "/Repos") { + // log.Printf("[DEBUG] Skipping unsupported entry %v", object) + return true + } + if res := ignoreIdeFolderRegex.FindStringSubmatch(object.Path); res != nil { + return true + } + modifiedAt := wsObjectGetModifiedAt(object) + if ic.incremental && modifiedAt < updatedSinceMs { + p := ic.oldWorkspaceObjectMapping[object.ObjectID] + if p == "" || p == object.Path { + log.Printf("[DEBUG] skipping '%s' that was modified at %d (last active=%d)", + object.Path, modifiedAt, updatedSinceMs) + return true + } + log.Printf("[DEBUG] Different path for object %d. Old='%s', New='%s'", object.ObjectID, p, object.Path) + } + if !ic.MatchesName(object.Path) { + return true + } + return false +} + +func emitWorkpaceObject(ic *importContext, object workspace.ObjectStatus) { + // check the size of the default channel, and add delays if it has less than %20 capacity left. + // In this case we won't need to have increase the size of the default channel to extended capacity. 
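+	// Illustrative example (the channel size is a hypothetical value, not taken from
+	// this change): if the default handler channel holds 100,000 entries, the check
+	// below pauses emission for one second whenever more than 80,000 items (80% of
+	// capacity) are already queued, giving the default handler time to drain the backlog.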
+ defChannelSize := len(ic.defaultChannel) + if float64(defChannelSize) > float64(ic.defaultHanlerChannelSize)*0.8 { + log.Printf("[DEBUG] waiting a bit before emitting a resource because default channel is 80%% full (%d): %v", + defChannelSize, object) + time.Sleep(1 * time.Second) + } + switch object.ObjectType { + case workspace.Notebook: + ic.maybeEmitWorkspaceObject("databricks_notebook", object.Path, &object) + case workspace.File: + ic.maybeEmitWorkspaceObject("databricks_workspace_file", object.Path, &object) + case workspace.Directory: + ic.maybeEmitWorkspaceObject("databricks_directory", object.Path, &object) + default: + log.Printf("[WARN] unknown type %s for path %s", object.ObjectType, object.Path) + } +} + +func listNotebooksAndWorkspaceFiles(ic *importContext) error { + objectsChannel := make(chan workspace.ObjectStatus, defaultChannelSize) + numRoutines := 2 // TODO: make configurable? together with the channel size? + var processedObjects atomic.Uint64 + for i := 0; i < numRoutines; i++ { + num := i + ic.waitGroup.Add(1) + go func() { + log.Printf("[DEBUG] Starting channel %d for workspace objects", num) + for object := range objectsChannel { + processedObjects.Add(1) + ic.waitGroup.Add(1) + emitWorkpaceObject(ic, object) + ic.waitGroup.Done() + } + log.Printf("[DEBUG] channel %d for workspace objects is finished", num) + ic.waitGroup.Done() + }() + } + // There are two use cases - this function will handle listing, or it will receive listing + updatedSinceMs := ic.getUpdatedSinceMs() + allObjects := ic.getAllWorkspaceObjects(func(objects []workspace.ObjectStatus) { + for _, object := range objects { + if object.ObjectType == workspace.Directory { + if !ic.incremental && object.Path != "/" && ic.isServiceInListing("directories") { + objectsChannel <- object + } + } else { + if ic.shouldSkipWorkspaceObject(object, updatedSinceMs) { + continue + } + object := object + switch object.ObjectType { + case workspace.Notebook, workspace.File: + objectsChannel <- object + default: + log.Printf("[WARN] unknown type %s for path %s", object.ObjectType, object.Path) + } + } + } + }) + close(objectsChannel) + log.Printf("[DEBUG] processedObjects=%d", processedObjects.Load()) + if processedObjects.Load() == 0 { // we didn't have side effect from listing as it was already happened + log.Printf("[DEBUG] ic.getAllWorkspaceObjects already was called before, so we need to explicitly submit all objects") + for _, object := range allObjects { + if ic.shouldSkipWorkspaceObject(object, updatedSinceMs) { + continue + } + if object.ObjectType == workspace.Directory && !ic.incremental && ic.isServiceInListing("directories") && object.Path != "/" { + emitWorkpaceObject(ic, object) + } else if (object.ObjectType == workspace.Notebook || object.ObjectType == workspace.File) && ic.isServiceInListing("notebooks") { + emitWorkpaceObject(ic, object) + } + } + } + return nil +} + +// Parallel listing implementation +type syncAnswer struct { + MU sync.Mutex + data []workspace.ObjectStatus +} + +func (a *syncAnswer) append(objs []workspace.ObjectStatus) { + a.MU.Lock() + a.data = append(a.data, objs...) 
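+	// The directory-listing worker goroutines call append concurrently, so the shared
+	// result slice is only mutated while MU is held.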
+ a.MU.Unlock() +} + +type directoryInfo struct { + Path string + Attempts int +} + +// constants related to the parallel listing +const ( + envVarListParallelism = "EXPORTER_WS_LIST_PARALLELISM" + envVarDirectoryChannelSize = "EXPORTER_DIRECTORIES_CHANNEL_SIZE" + defaultWorkersPoolSize = 10 + defaultDirectoryChannelSize = 100000 +) + +func recursiveAddPathsParallel(a workspace.NotebooksAPI, directory directoryInfo, dirChannel chan directoryInfo, + answer *syncAnswer, wg *sync.WaitGroup, shouldIncludeDir func(workspace.ObjectStatus) bool, visitor func([]workspace.ObjectStatus)) { + defer wg.Done() + notebookInfoList, err := a.ListInternalImpl(directory.Path) + if err != nil { + log.Printf("[WARN] error listing '%s': %v", directory.Path, err) + if isRetryableError(err.Error(), directory.Attempts) { + wg.Add(1) + log.Printf("[INFO] attempt %d of retrying listing of '%s' after error: %v", + directory.Attempts+1, directory.Path, err) + time.Sleep(time.Duration(retryDelaySeconds) * time.Second) + dirChannel <- directoryInfo{Path: directory.Path, Attempts: directory.Attempts + 1} + } + } + + newList := make([]workspace.ObjectStatus, 0, len(notebookInfoList)) + directories := make([]workspace.ObjectStatus, 0, len(notebookInfoList)) + for _, v := range notebookInfoList { + if v.ObjectType == workspace.Directory { + if shouldIncludeDir(v) { + newList = append(newList, v) + directories = append(directories, v) + } + } else { + newList = append(newList, v) + } + } + answer.append(newList) + for _, v := range directories { + wg.Add(1) + log.Printf("[DEBUG] putting directory '%s' into channel. Channel size: %d", v.Path, len(dirChannel)) + dirChannel <- directoryInfo{Path: v.Path} + time.Sleep(3 * time.Millisecond) + } + if visitor != nil { + visitor(newList) + } +} + +func ListParallel(a workspace.NotebooksAPI, path string, shouldIncludeDir func(workspace.ObjectStatus) bool, + visitor func([]workspace.ObjectStatus)) ([]workspace.ObjectStatus, error) { + var answer syncAnswer + wg := &sync.WaitGroup{} + + if shouldIncludeDir == nil { + shouldIncludeDir = func(workspace.ObjectStatus) bool { return true } + } + + numWorkers := getEnvAsInt(envVarListParallelism, defaultWorkersPoolSize) + channelSize := getEnvAsInt(envVarDirectoryChannelSize, defaultDirectoryChannelSize) + dirChannel := make(chan directoryInfo, channelSize) + for i := 0; i < numWorkers; i++ { + t := i + go func() { + log.Printf("[DEBUG] starting go routine %d", t) + for directory := range dirChannel { + log.Printf("[DEBUG] processing directory %s", directory.Path) + recursiveAddPathsParallel(a, directory, dirChannel, &answer, wg, shouldIncludeDir, visitor) + } + }() + + } + log.Print("[DEBUG] pushing initial path to channel") + wg.Add(1) + recursiveAddPathsParallel(a, directoryInfo{Path: path}, dirChannel, &answer, wg, shouldIncludeDir, visitor) + log.Print("[DEBUG] starting to wait") + wg.Wait() + log.Print("[DEBUG] closing the directory channel") + close(dirChannel) + + answer.MU.Lock() + defer answer.MU.Unlock() + return answer.data, nil +} + +func (ic *importContext) emitWorkspaceObjectParentDirectory(r *resource) { + if !ic.isServiceEnabled("directories") { + return + } + if idx := strings.LastIndex(r.ID, "/"); idx > 0 { // not found, or directly in the root... + directoryPath := r.ID[:idx] + ic.Emit(&resource{ + Resource: "databricks_directory", + ID: directoryPath, + }) + r.AddExtraData(ParentDirectoryExtraKey, directoryPath) + } +}
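+
+// Usage sketch (illustrative only; ctx and client are assumed to be an initialized
+// context and Databricks client, as elsewhere in the exporter). ListParallel is normally
+// driven from getAllWorkspaceObjects above; its worker pool and directory queue can be
+// tuned via EXPORTER_WS_LIST_PARALLELISM (default 10) and
+// EXPORTER_DIRECTORIES_CHANNEL_SIZE (default 100000):
+//
+//	notebooksAPI := workspace.NewNotebooksAPI(ctx, client)
+//	objects, err := ListParallel(notebooksAPI, "/", nil, nil)
+//
+// Passing nil for shouldIncludeDir lists every directory, and a nil visitor simply
+// collects the results without incremental processing.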