Bmenasha/cai fix issue 900 #905

Merged
41 changes: 24 additions & 17 deletions tools/asset-inventory/asset_inventory/bigquery_schema.py
@@ -144,19 +144,22 @@ def _merge_fields(destination_field, source_field):
Returns:
A `google.cloud.bigquery.SchemaField` dict.
"""
field = copy.deepcopy(destination_field)

dd = destination_field.get('description', None)
sd = source_field.get('description', None)
dft = destination_field.get('field_type', None)
sft = source_field.get('field_type', None)
# use the field with more information.
if ((not dd and sd) or (sd and dd and len(dd) < len(sd))):
field['description'] = sd
field['field_type'] = sft
destination_field['description'] = sd
destination_field['field_type'] = sft

# use the less specific type and join fields.
elif ((dft != 'RECORD' and dft != 'STRING') and sft == 'STRING'):
field['field_type'] = sft
# but don't overwrite the timestamp field as per
# https://github.com/GoogleCloudPlatform/professional-services/issues/900
elif (source_field.get('name', None) != 'timestamp' and
(dft != 'RECORD' and dft != 'STRING') and sft == 'STRING'):
destination_field['field_type'] = sft

# https://github.com/GoogleCloudPlatform/professional-services/issues/614
# Use the schema with the additonalProperties overrides. See
@@ -174,23 +177,23 @@ def merge_additional_properties_fields(apf, fields):
sf = source_field.get('fields', [])
df = destination_field.get('fields', [])
if is_additonal_properties(sf) and not is_additonal_properties(df):
field['mode'] = 'REPEATED'
destination_field['mode'] = 'REPEATED'
sf = copy.deepcopy(sf)
merge_additional_properties_fields(sf, df)
field['fields'] = sf
destination_field['fields'] = sf
elif is_additonal_properties(df) and not is_additonal_properties(sf):
field['mode'] = 'REPEATED'
destination_field['mode'] = 'REPEATED'
merge_additional_properties_fields(df, sf)
field['fields'] = df
destination_field['fields'] = df
elif is_additonal_properties(df) and is_additonal_properties(sf):
field['mode'] = 'REPEATED'
destination_field['mode'] = 'REPEATED'
merge_additional_properties_fields(df, sf)
field['fields'] = df
destination_field['fields'] = df
else:
mf = _merge_schema(df, sf)
if mf:
field['fields'] = mf
return field
destination_field['fields'] = mf
return destination_field
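For readers skimming the hunk above: the merge now writes into destination_field directly (no deep copy) and never downgrades a field named 'timestamp' to STRING. A reduced sketch of just that type-preference rule, not part of the diff; the helper name and the example fields are made up:

def merge_field_type_sketch(destination_field, source_field):
    # Prefer the less specific STRING type when two discovered schemas disagree,
    # except for the 'timestamp' field (issue #900), whose type is left alone.
    dft = destination_field.get('field_type')
    sft = source_field.get('field_type')
    if (source_field.get('name') != 'timestamp'
            and dft not in ('RECORD', 'STRING')
            and sft == 'STRING'):
        destination_field['field_type'] = sft
    return destination_field

print(merge_field_type_sketch({'name': 'count', 'field_type': 'NUMERIC'},
                              {'name': 'count', 'field_type': 'STRING'}))
# -> {'name': 'count', 'field_type': 'STRING'}
print(merge_field_type_sketch({'name': 'timestamp', 'field_type': 'TIMESTAMP'},
                              {'name': 'timestamp', 'field_type': 'STRING'}))
# -> {'name': 'timestamp', 'field_type': 'TIMESTAMP'}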


def _merge_schema(destination_schema, source_schema):
@@ -401,11 +404,15 @@ def sanitize_property_value(property_value, depth=0, num_properties=0):
if isinstance(property_value, list):
for i in range(len(property_value)):
if isinstance(property_value[i], (dict, list)):
sanitize_property_value(property_value[i], depth, num_properties)
sanitize_property_value(property_value[i],
depth, num_properties)
else:
# if the list element has a primitive type, we need to re-affect the sanitized value
property_value[i] = sanitize_property_value(property_value[i], depth, num_properties)

# if the list element has a primitive type, we need to
# re-assign the sanitized value
property_value[i] = sanitize_property_value(
property_value[i],
depth, num_properties)

# and each nested json object.
if isinstance(property_value, dict):
remove_duplicates(property_value)
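A note on the re-assignment above: sanitize_property_value can fix dicts and lists in place, but primitives are immutable in Python, so the sanitized value has to be written back into the list by index. A small illustration with a hypothetical sanitize rule (string truncation), not the module's real logic:

def sanitize_sketch(value):
    if isinstance(value, list):
        for i, item in enumerate(value):
            if isinstance(item, (dict, list)):
                sanitize_sketch(item)             # containers are mutated in place
            else:
                value[i] = sanitize_sketch(item)  # primitives need re-assignment
        return value
    if isinstance(value, str) and len(value) > 10:
        return value[:10]                         # sanitized copy; original untouched
    return value

data = ['a-very-long-string-value', {'k': 'short'}]
sanitize_sketch(data)
print(data)  # ['a-very-lon', {'k': 'short'}]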
100 changes: 64 additions & 36 deletions tools/asset-inventory/asset_inventory/import_pipeline.py
@@ -81,14 +81,15 @@ class AssignGroupByKey(beam.DoFn):
def __init__(self, group_by, num_shards):
if isinstance(group_by, string_types):
group_by = StaticValueProvider(str, group_by)

if isinstance(num_shards, str):
num_shards = StaticValueProvider(str, num_shards)

self.num_shards = num_shards
self.group_by = group_by
self.shard_map = None

def apply_shard(self, key):
# initialize shard_map from num_shards
if self.shard_map is None:
self.shard_map = {
k: int(v) for (k, v) in
@@ -169,24 +170,31 @@ def __init__(self, group_by):
group_by = StaticValueProvider(str, group_by)
self.group_by = group_by

def create_resource_copy(self, element):
# Write a consistently structured json row to the 'resource' table:
# returns a copy of the element, without the data field, that is
# grouped by the 'resource' key.
resource_data = element.get('resource', {}).pop('data', None)
resource_copy = copy.deepcopy(element)
# add it back after the copy operation.
if resource_data is not None:
element['resource']['data'] = resource_data
# override the group by field so it's written to the `resource` table
resource_copy['_group_by'] = 'resource'
return resource_copy

def process(self, element):
# add the resource.json_data property if we have resource.data
if ('resource' in element and
'data' in element['resource']):
resource = element['resource']
# add json_data property.
resource['json_data'] = json.dumps(resource['data'])
resource_element = copy.deepcopy(element)
resource_element['resource'].pop('data')
resource_element['_group_by'] = 'resource'
yield resource_element
if self.group_by.get() != 'NONE':
yield element
else:
resource_element = copy.deepcopy(element)
resource_element['_group_by'] = 'resource'
yield resource_element
if self.group_by.get() != 'NONE':
yield element
element['resource']['json_data'] = json.dumps(
element['resource']['data'])
# yield the resource copy.
yield self.create_resource_copy(element)
# and the element if we are grouping by asset_type or
# asset_type_version.
if self.group_by.get() != 'NONE':
yield element
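The refactor above pops the potentially large resource.data field before deep-copying, so the copy routed to the 'resource' table is cheap, then restores the data on the original element. A standalone sketch of that behavior with a made-up element:

import copy
import json

def create_resource_copy_sketch(element):
    # pop the (potentially large) data field before the deep copy
    resource_data = element.get('resource', {}).pop('data', None)
    resource_copy = copy.deepcopy(element)
    if resource_data is not None:
        element['resource']['data'] = resource_data   # put it back on the original
    resource_copy['_group_by'] = 'resource'           # route to the 'resource' table
    return resource_copy

element = {'name': '//compute.googleapis.com/instance/i-1',
           'resource': {'data': {'machineType': 'n1-standard-1'}}}
element['resource']['json_data'] = json.dumps(element['resource']['data'])
copy_row = create_resource_copy_sketch(element)
print('data' in copy_row['resource'], copy_row['_group_by'])  # False resource
print('data' in element['resource'])                          # True, original intact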


class AddLoadTime(beam.DoFn):
@@ -213,10 +221,11 @@ def process(self, element, schemas):
elements = element[1]
schema = schemas[key_name]
for elem in elements:
resource_data = elem.get('resource', {}).get('data', {})
if resource_data:
bigquery_schema.enforce_schema_data_types(elem, schema)
yield (key_name, elem)
if elem.get('resource', {}).get('data', {}):
yield (element[0], bigquery_schema.enforce_schema_data_types(
elem, schema))
else:
yield (element[0], elem)
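Two things change in this hunk: the yield keeps the sharded key element[0] (instead of the unsharded key_name), so the sharding survives the next GroupByKey, and elements without resource.data pass through untouched. A rough sketch of that contract, with stand-ins for enforce_schema_data_types and remove_shard (both are assumptions here, not the real helpers):

def enforce_schema_sketch(element, schemas, enforce, remove_shard):
    # element is (sharded_key, iterable_of_elements); keep the sharded key so the
    # following GroupByKey preserves the sharding.
    key_name = remove_shard(element[0])   # table-level key for the schema lookup
    schema = schemas[key_name]
    for elem in element[1]:
        if elem.get('resource', {}).get('data', {}):
            yield (element[0], enforce(elem, schema))   # coerce values to schema types
        else:
            yield (element[0], elem)                    # nothing to coerce, pass through

out = list(enforce_schema_sketch(
    ('asset.Instance.3', [{'resource': {'data': {'id': '1'}}}, {'name': 'no-data'}]),
    schemas={'asset.Instance': {}},
    enforce=lambda elem, schema: elem,                  # stand-in for the real coercion
    remove_shard=lambda key: key.rsplit('.', 1)[0]))
print(out)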


class CombinePolicyResource(beam.DoFn):
@@ -284,14 +293,28 @@ def process(self, element):
file_handle.write(json.dumps(asset_line).encode())
file_handle.write(b'\n')
if created_file_path:
yield (element[0], created_file_path)
# key is the bigquery table name so each table is deleted and created
# independently.
# value is the sharded key and gcs filepath.
yield (AssignGroupByKey.remove_shard(element[0]),
(element[0], created_file_path))

def finish_bundle(self):
for _, file_handle in self.open_files.items():
logging.info('finish bundle')
file_handle.close()


class AssignShardedKeyForLoad(beam.DoFn):
"""Element is a tuple keyed by table, value is iterable of sharded key and
gcs file path. The transform unbundles the iterable and returns tuples of
sharded key and gcs file path to be loaded in parallel into bigquery.
"""
def process(self, element):
for (sharded_key, created_file_path) in element[1]:
yield (sharded_key, created_file_path)
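Combined with the WriteToGCS change above, the keys now flow like this: WriteToGCS emits (table_name, (sharded_key, gcs_path)), DeleteDataSetTables runs once per table, and AssignShardedKeyForLoad then re-keys by the sharded key so the staged files load in parallel. A sketch with made-up values:

grouped_by_table = (
    'google.compute.Instance',                     # table name (unsharded key)
    [('google.compute.Instance.0', 'gs://bucket/stage/instance-0.json'),
     ('google.compute.Instance.1', 'gs://bucket/stage/instance-1.json')])

def assign_sharded_key_for_load_sketch(element):
    # unbundle the grouped (sharded_key, gcs_path) pairs back into tuples
    for sharded_key, created_file_path in element[1]:
        yield (sharded_key, created_file_path)

print(list(assign_sharded_key_for_load_sketch(grouped_by_table)))
# [('google.compute.Instance.0', 'gs://bucket/stage/instance-0.json'),
#  ('google.compute.Instance.1', 'gs://bucket/stage/instance-1.json')]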


class BigQueryDoFn(beam.DoFn):
"""Superclass for a DoFn that requires BigQuery dataset information."""

@@ -341,8 +364,8 @@ def start_bundle(self):
class DeleteDataSetTables(BigQueryDoFn):
"""Delete tables when truncating and not appending.

If we are not keeping old data around, it safer to delete all tables in the
dataset before loading so that no old asset types remain.
If we are not keeping old data around, it's safer to delete tables in the
dataset before loading so that no old records remain.
"""

def __init__(self, dataset, add_load_date_suffix, load_time,
@@ -356,18 +379,21 @@ def __init__(self, dataset, add_load_date_suffix, load_time,
self.write_disposition = write_disposition

def process(self, element):
# If we are appending to the table, there is no need to delete first.
if self.write_disposition.get() == 'WRITE_APPEND':
yield element
else:
key_name = AssignGroupByKey.remove_shard(element[0])
table_name = self.asset_type_to_table_name(key_name)
table_ref = self.get_dataset_ref().table(
table_name)
try:
self.bigquery_client.delete_table(table_ref)
except NotFound:
pass
yield element
return

# Delete the BigQuery table prior to loading into it.
key_name = element[0]
table_name = self.asset_type_to_table_name(key_name)
table_ref = self.get_dataset_ref().table(
table_name)
try:
self.bigquery_client.delete_table(table_ref)
except NotFound:
pass
yield element
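Since WriteToGCS already strips the shard, element[0] here is the plain table key, and WRITE_APPEND short-circuits before any delete. A compressed sketch of that control flow; the NotFound import path and the client/dataset handles are assumptions about the surrounding module and google-cloud-bigquery, not part of the diff:

from google.api_core.exceptions import NotFound

def delete_tables_sketch(element, write_disposition, bigquery_client, dataset_ref,
                         asset_type_to_table_name):
    if write_disposition == 'WRITE_APPEND':
        yield element          # appending: keep existing rows, nothing to delete
        return
    # element[0] is already the plain table key (WriteToGCS removed the shard).
    table_name = asset_type_to_table_name(element[0])
    try:
        bigquery_client.delete_table(dataset_ref.table(table_name))
    except NotFound:
        pass                   # first load; the table does not exist yet
    yield element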


class LoadToBigQuery(BigQueryDoFn):
@@ -500,7 +526,7 @@ def run(argv=None):
# Joining all iam_policy objects with resources of the same name.
merged_iam = (
sanitized | 'assign_name_key' >> beam.ParDo(
AssignGroupByKey('NAME', options.num_shards))
AssignGroupByKey('NAME', ''))
| 'group_by_name' >> beam.GroupByKey()
| 'combine_policy' >> beam.ParDo(CombinePolicyResource()))

@@ -517,16 +543,18 @@ def run(argv=None):
# pylint: disable=expression-not-assigned
(keyed_assets
| 'add_load_time' >> beam.ParDo(AddLoadTime(options.load_time))
| 'group_by_key_before_enforce' >> beam.GroupByKey()
| 'group_by_sharded_key_for_enforce' >> beam.GroupByKey()
| 'enforce_schema' >> beam.ParDo(EnforceSchemaDataTypes(), pvalue_schemas)
| 'group_by_key_before_write' >> beam.GroupByKey()
| 'group_by_sharded_key_for_write' >> beam.GroupByKey()
| 'write_to_gcs' >> beam.ParDo(
WriteToGCS(options.stage, options.load_time))
| 'group_written_objects_by_key' >> beam.GroupByKey()
| 'delete_tables' >> beam.ParDo(
DeleteDataSetTables(options.dataset, options.add_load_date_suffix,
options.load_time,
options.write_disposition))
| 'assign_sharded_key_for_load' >> beam.ParDo(AssignShardedKeyForLoad())
| 'group_by_sharded_key_for_load' >> beam.GroupByKey()
| 'load_to_bigquery' >> beam.ParDo(
LoadToBigQuery(options.dataset, options.add_load_date_suffix,
options.load_time),
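For orientation, a toy pipeline mirroring only the keying pattern of the reshuffled tail above (assumes apache_beam is installed; the lambdas are stand-ins for the DoFns in this diff, and the bucket paths are made up):

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create([('table_a.0', 'row1'), ('table_a.1', 'row2')])
     | 'group_by_sharded_key_for_write' >> beam.GroupByKey()
     | 'write_to_gcs' >> beam.Map(
         lambda kv: (kv[0].rsplit('.', 1)[0],            # table name
                     (kv[0], 'gs://bucket/%s.json' % kv[0])))
     | 'group_written_objects_by_key' >> beam.GroupByKey()
     | 'delete_tables' >> beam.Map(lambda kv: kv)        # one delete per table
     | 'assign_sharded_key_for_load' >> beam.FlatMap(lambda kv: list(kv[1]))
     | 'group_by_sharded_key_for_load' >> beam.GroupByKey()
     | 'load_to_bigquery' >> beam.Map(print))            # one load per shard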