Skip to content

Commit

Permalink
Fix join on multiple destinations
Browse files Browse the repository at this point in the history
  • Loading branch information
frodrigo committed Apr 8, 2024
1 parent 4cfbd40 commit 1327a97
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
13 changes: 7 additions & 6 deletions datasources/connectors/join.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ def setup(kiba)
'metadata' => JSON.parse(File.read("./#{@path}/#{source_config['metadata']}.metadata.json"))[source]
})
)
}

kiba.transform(JoinTransformer, JoinTransformer::Settings.from_hash({
'destination_id' => destination_id,
'key' => key,
'full_join' => true,
}))
kiba.transform(JoinTransformer, JoinTransformer::Settings.from_hash({
'source_ids' => [join.keys],
'destination_id' => destination_id,
'key' => key,
'full_join' => true,
}))
}
}
end
end
3 changes: 3 additions & 0 deletions datasources/transforms/join.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class JoinTransformer < Transformer
extend T::Sig

class Settings < Transformer::TransformerSettings
const :source_ids, T.nilable(T::Array[String])
const :destination_id, String
const :key, String
const :full_join, T::Boolean, default: false
Expand Down Expand Up @@ -51,6 +52,8 @@ def process_tags(current_tags, update_tags, current_source, update_source)
end

def process_data(row)
return row if @settings.destination_id != row[:destination_id]

key = JsonPath.on(row[:properties][:tags], @path)
if key.present?
if @rows.key?(key)
Expand Down

0 comments on commit 1327a97

Please sign in to comment.