Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Exporter] Improving reliability of Emit function #4163

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions exporter/codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,22 +904,27 @@ func (ic *importContext) handleResourceWrite(generatedFile string, ch dataWriteC
return
}

//
newResources := make(map[string]struct{}, 100)
log.Printf("[DEBUG] started processing new writes for %s", generatedFile)
for f := range ch {
if f != nil {
log.Printf("[DEBUG] started writing resource body for %s", f.BlockName)
_, err = tf.WriteString(f.ResourceBody)
if err == nil {
newResources[f.BlockName] = struct{}{}
if f.ImportCommand != "" {
ic.waitGroup.Add(1)
importChan <- f.ImportCommand
// check if we have the same blockname already written. To avoid duplicates
_, exists := newResources[f.BlockName]
if !exists {
log.Printf("[DEBUG] started writing resource body for %s", f.BlockName)
_, err = tf.WriteString(f.ResourceBody)
if err == nil {
newResources[f.BlockName] = struct{}{}
if f.ImportCommand != "" {
ic.waitGroup.Add(1)
importChan <- f.ImportCommand
}
log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName)
} else {
log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err)
}
log.Printf("[DEBUG] finished writing resource body for %s", f.BlockName)
} else {
log.Printf("[ERROR] Error when writing to %s: %v", generatedFile, err)
log.Printf("[WARN] Found duplicate resource: '%s'", f.BlockName)
}
} else {
log.Print("[WARN] got nil as resourceWriteData!")
Expand Down
41 changes: 28 additions & 13 deletions exporter/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
"databricks_sql_dashboard": 3,
"databricks_sql_widget": 4,
"databricks_sql_visualization": 4,
"databricks_query": 4,
"databricks_query": 6,
"databricks_alert": 2,
"databricks_permissions": 11,
}
Expand Down Expand Up @@ -615,17 +615,20 @@
return ic.State.Has(r)
}

func (ic *importContext) setImportingState(s string, state bool) {
ic.importingMutex.Lock()
defer ic.importingMutex.Unlock()
ic.importing[s] = state
}

func (ic *importContext) Add(r *resource) {
if ic.HasInState(r, true) { // resource must exist and already marked as added
return
}
ic.setImportingState(r.String(), true) // mark resource as added
rString := r.String()
ic.importingMutex.Lock()
_, ok := ic.importing[rString]
if ok {
ic.importingMutex.Unlock()
log.Printf("[DEBUG] %s already being added", rString)
return
}
ic.importing[rString] = true // mark resource as added
ic.importingMutex.Unlock()
state := r.Data.State()
if state == nil {
log.Printf("[ERROR] state is nil for %s", r)
Expand All @@ -648,7 +651,6 @@
Instances: []instanceApproximation{inst},
Resource: r,
})
// in single-threaded scenario scope is toposorted
ic.Scope.Append(r)
}

Expand Down Expand Up @@ -727,14 +729,25 @@
log.Printf("[DEBUG] %s already imported", r)
return
}
rString := r.String()
if ic.testEmits != nil {
log.Printf("[INFO] %s is emitted in test mode", r)
ic.testEmitsMutex.Lock()
ic.testEmits[r.String()] = true
ic.testEmits[rString] = true
ic.testEmitsMutex.Unlock()
return
}
ic.setImportingState(r.String(), false) // we're starting to add a new resource
// we need to check that we're not importing the same resource twice - this may happen under high concurrency
// for specific resources, for example, directories when they aren't part of the listing
ic.importingMutex.Lock()
res, ok := ic.importing[rString]
if ok {
ic.importingMutex.Unlock()
log.Printf("[DEBUG] %s already being imported: %v", rString, res)
Dismissed Show dismissed Hide dismissed
return
}
ic.importing[rString] = false // // we're starting to add a new resource
ic.importingMutex.Unlock()
_, ok = ic.Resources[r.Resource]
if !ok {
log.Printf("[ERROR] %s is not available in provider", r)
Expand All @@ -745,8 +758,10 @@
log.Printf("[DEBUG] %s (%s service) is not part of the account level export", r.Resource, ir.Service)
return
}
// TODO: add similar condition for checking workspace-level objects only. After new ACLs import is merged

if !ic.accountLevel && !ir.WorkspaceLevel {
log.Printf("[DEBUG] %s (%s service) is not part of the workspace level export", r.Resource, ir.Service)
return
}
// from here, it should be done by the goroutine... send resource into the channel
ch, exists := ic.channels[r.Resource]
if exists {
Expand Down
Loading