From b3f0905c47c1ea6bb34e33f86fb631b3b7e7665d Mon Sep 17 00:00:00 2001
From: Ben Menasha
Date: Mon, 10 Oct 2022 07:40:37 -0400
Subject: [PATCH] Don't override the `timestamp` field's type.

- Fix #900 by not overriding the `timestamp` field's type from
  `TIMESTAMP` to `STRING`, as it is used as the table partition column.
- Performance improvements:
  - Don't deep copy the destination field within `_merge_fields`. It isn't
    necessary, as the destination is always a new object, and Cloud
    Profiler showed the copy to be resource intensive.
  - Don't deep copy the 'data' field within ProduceResourceJson; remove it
    prior to the deep copy and add it back to the element afterwards.
- Fix a bug where enforce_schema had no effect because it yielded the
  original elements; it needs to yield a new (key, element) pair.
- Fix a bug when sharding asset types where we could delete a table while
  it was still loading, because not all shards were processed before
  deleting and loading occurred. To fix this, we now group by table name
  after WriteToGCS rather than by shard key, then assign the sharded key
  back prior to loading.
---
 .../asset_inventory/bigquery_schema.py  |  41 ++++---
 .../asset_inventory/import_pipeline.py  | 100 +++++++++++-------
 2 files changed, 88 insertions(+), 53 deletions(-)

diff --git a/tools/asset-inventory/asset_inventory/bigquery_schema.py b/tools/asset-inventory/asset_inventory/bigquery_schema.py
index c4d1f24bb6..6cdd029379 100644
--- a/tools/asset-inventory/asset_inventory/bigquery_schema.py
+++ b/tools/asset-inventory/asset_inventory/bigquery_schema.py
@@ -144,7 +144,6 @@ def _merge_fields(destination_field, source_field):
     Returns:
         A `google.cloud.bigquery.SchemaField` dict.
     """
-    field = copy.deepcopy(destination_field)
     dd = destination_field.get('description', None)
     sd = source_field.get('description', None)

@@ -152,11 +151,15 @@ def _merge_fields(destination_field, source_field):
     sft = source_field.get('field_type', None)
     # use the field with more information.
     if ((not dd and sd) or (sd and dd and len(dd) < len(sd))):
-        field['description'] = sd
-        field['field_type'] = sft
+        destination_field['description'] = sd
+        destination_field['field_type'] = sft
+
     # use the less specific type. and join fields
-    elif ((dft != 'RECORD' and dft != 'STRING') and sft == 'STRING'):
-        field['field_type'] = sft
+    # but don't overwrite the timestamp field as per
+    # https://github.com/GoogleCloudPlatform/professional-services/issues/900
+    elif (source_field.get('name', None) != 'timestamp' and
+          (dft != 'RECORD' and dft != 'STRING') and sft == 'STRING'):
+        destination_field['field_type'] = sft
     # https://github.com/GoogleCloudPlatform/professional-services/issues/614
     # Use the schema with the additonalProperties overrides. See

@@ -174,23 +177,23 @@ def merge_additional_properties_fields(apf, fields):
         sf = source_field.get('fields', [])
         df = destination_field.get('fields', [])
         if is_additonal_properties(sf) and not is_additonal_properties(df):
-            field['mode'] = 'REPEATED'
+            destination_field['mode'] = 'REPEATED'
             sf = copy.deepcopy(sf)
             merge_additional_properties_fields(sf, df)
-            field['fields'] = sf
+            destination_field['fields'] = sf
         elif is_additonal_properties(df) and not is_additonal_properties(sf):
-            field['mode'] = 'REPEATED'
+            destination_field['mode'] = 'REPEATED'
             merge_additional_properties_fields(df, sf)
-            field['fields'] = df
+            destination_field['fields'] = df
         elif is_additonal_properties(df) and is_additonal_properties(sf):
-            field['mode'] = 'REPEATED'
+            destination_field['mode'] = 'REPEATED'
             merge_additional_properties_fields(df, sf)
-            field['fields'] = df
+            destination_field['fields'] = df
         else:
             mf = _merge_schema(df, sf)
             if mf:
-                field['fields'] = mf
-    return field
+                destination_field['fields'] = mf
+    return destination_field


 def _merge_schema(destination_schema, source_schema):
@@ -401,11 +404,15 @@ def sanitize_property_value(property_value, depth=0, num_properties=0):
     if isinstance(property_value, list):
         for i in range(len(property_value)):
             if isinstance(property_value[i], (dict, list)):
-                sanitize_property_value(property_value[i], depth, num_properties)
+                sanitize_property_value(property_value[i],
+                                        depth, num_properties)
             else:
-                # if the list element has a primitive type, we need to re-affect the sanitized value
-                property_value[i] = sanitize_property_value(property_value[i], depth, num_properties)
-
+                # if the list element has a primitive type, we need to
+                # re-affect the sanitized value
+                property_value[i] = sanitize_property_value(
+                    property_value[i],
+                    depth, num_properties)
+
     # and each nested json object.
     if isinstance(property_value, dict):
         remove_duplicates(property_value)
diff --git a/tools/asset-inventory/asset_inventory/import_pipeline.py b/tools/asset-inventory/asset_inventory/import_pipeline.py
index 93e4619c3f..6e8a8f99b8 100644
--- a/tools/asset-inventory/asset_inventory/import_pipeline.py
+++ b/tools/asset-inventory/asset_inventory/import_pipeline.py
@@ -81,14 +81,15 @@ class AssignGroupByKey(beam.DoFn):
     def __init__(self, group_by, num_shards):
         if isinstance(group_by, string_types):
             group_by = StaticValueProvider(str, group_by)
+
         if isinstance(num_shards, str):
             num_shards = StaticValueProvider(str, num_shards)
-
         self.num_shards = num_shards
         self.group_by = group_by
         self.shard_map = None

     def apply_shard(self, key):
+        # initialize shard_map from num_shards
         if self.shard_map is None:
             self.shard_map = {
                 k: int(v) for (k, v) in
@@ -169,24 +170,31 @@ def __init__(self, group_by):
             group_by = StaticValueProvider(str, group_by)
         self.group_by = group_by

+    def create_resource_copy(self, element):
+        # Write out a consistently structured json row called 'resource' to
+        # the 'resource' table. Returns a copy of the element, without the
+        # data field, that is grouped by the 'resource' key.
+        resource_data = element.get('resource', {}).pop('data', None)
+        resource_copy = copy.deepcopy(element)
+        # add the data back after the copy operation.
+        if resource_data is not None:
+            element['resource']['data'] = resource_data
+        # override the group by field so it's written to the `resource` table
+        resource_copy['_group_by'] = 'resource'
+        return resource_copy
+
     def process(self, element):
+        # add the resource.json_data property if we have resource.data
         if ('resource' in element and
                 'data' in element['resource']):
-            resource = element['resource']
-            # add json_data property.
-            resource['json_data'] = json.dumps(resource['data'])
-            resource_element = copy.deepcopy(element)
-            resource_element['resource'].pop('data')
-            resource_element['_group_by'] = 'resource'
-            yield resource_element
-            if self.group_by.get() != 'NONE':
-                yield element
-        else:
-            resource_element = copy.deepcopy(element)
-            resource_element['_group_by'] = 'resource'
-            yield resource_element
-            if self.group_by.get() != 'NONE':
-                yield element
+            element['resource']['json_data'] = json.dumps(
+                element['resource']['data'])
+        # yield the resource copy.
+        yield self.create_resource_copy(element)
+        # and the element if we are grouping by asset_type or
+        # asset_type_version.
+        if self.group_by.get() != 'NONE':
+            yield element


 class AddLoadTime(beam.DoFn):
@@ -213,10 +221,11 @@ def process(self, element, schemas):
         elements = element[1]
         schema = schemas[key_name]
         for elem in elements:
-            resource_data = elem.get('resource', {}).get('data', {})
-            if resource_data:
-                bigquery_schema.enforce_schema_data_types(elem, schema)
-            yield (key_name, elem)
+            if elem.get('resource', {}).get('data', {}):
+                yield (element[0], bigquery_schema.enforce_schema_data_types(
+                    elem, schema))
+            else:
+                yield (element[0], elem)


 class CombinePolicyResource(beam.DoFn):
@@ -284,7 +293,11 @@ def process(self, element):
             file_handle.write(json.dumps(asset_line).encode())
             file_handle.write(b'\n')
         if created_file_path:
-            yield (element[0], created_file_path)
+            # The key is the BigQuery table name so that each table is
+            # deleted and created independently; the value is the sharded
+            # key and the gcs file path.
+            yield (AssignGroupByKey.remove_shard(element[0]),
+                   (element[0], created_file_path))

     def finish_bundle(self):
         for _, file_handle in self.open_files.items():
@@ -292,6 +305,16 @@ def finish_bundle(self):
             file_handle.close()


+class AssignShardedKeyForLoad(beam.DoFn):
+    """Element is a tuple keyed by table name, value is an iterable of
+    (sharded key, gcs file path) pairs. The transform unbundles the iterable
+    and yields (sharded key, gcs path) tuples for parallel BigQuery loads.
+    """
+    def process(self, element):
+        for (sharded_key, created_file_path) in element[1]:
+            yield (sharded_key, created_file_path)
+
+
 class BigQueryDoFn(beam.DoFn):
     """Superclass for a DoFn that requires BigQuery dataset information."""
@@ -341,8 +364,8 @@ def start_bundle(self):
 class DeleteDataSetTables(BigQueryDoFn):
     """Delete tables when truncating and not appending.

-    If we are not keeping old data around, it safer to delete all tables in the
-    dataset before loading so that no old asset types remain.
+    If we are not keeping old data around, it is safer to delete tables in
+    the dataset before loading so that no old records remain.
     """

     def __init__(self, dataset, add_load_date_suffix, load_time,
@@ -356,18 +379,21 @@ def __init__(self, dataset, add_load_date_suffix, load_time,
         self.write_disposition = write_disposition

     def process(self, element):
+        # If we are appending to the table, there is no need to delete first.
         if self.write_disposition.get() == 'WRITE_APPEND':
             yield element
-        else:
-            key_name = AssignGroupByKey.remove_shard(element[0])
-            table_name = self.asset_type_to_table_name(key_name)
-            table_ref = self.get_dataset_ref().table(
-                table_name)
-            try:
-                self.bigquery_client.delete_table(table_ref)
-            except NotFound:
-                pass
-            yield element
+            return
+
+        # Delete the BigQuery table prior to loading into it.
+        key_name = element[0]
+        table_name = self.asset_type_to_table_name(key_name)
+        table_ref = self.get_dataset_ref().table(
+            table_name)
+        try:
+            self.bigquery_client.delete_table(table_ref)
+        except NotFound:
+            pass
+        yield element


 class LoadToBigQuery(BigQueryDoFn):
@@ -500,7 +526,7 @@ def run(argv=None):
     # Joining all iam_policy objects with resources of the same name.
     merged_iam = (
         sanitized | 'assign_name_key' >> beam.ParDo(
-            AssignGroupByKey('NAME', options.num_shards))
+            AssignGroupByKey('NAME', ''))
         | 'group_by_name' >> beam.GroupByKey()
         | 'combine_policy' >> beam.ParDo(CombinePolicyResource()))

@@ -517,9 +543,9 @@ def run(argv=None):
     # pylint: disable=expression-not-assigned
     (keyed_assets
      | 'add_load_time' >> beam.ParDo(AddLoadTime(options.load_time))
-     | 'group_by_key_before_enforce' >> beam.GroupByKey()
+     | 'group_by_sharded_key_for_enforce' >> beam.GroupByKey()
      | 'enforce_schema' >> beam.ParDo(EnforceSchemaDataTypes(), pvalue_schemas)
-     | 'group_by_key_before_write' >> beam.GroupByKey()
+     | 'group_by_sharded_key_for_write' >> beam.GroupByKey()
      | 'write_to_gcs' >> beam.ParDo(
          WriteToGCS(options.stage, options.load_time))
      | 'group_written_objects_by_key' >> beam.GroupByKey()
@@ -527,6 +553,8 @@ def run(argv=None):
          DeleteDataSetTables(options.dataset, options.add_load_date_suffix,
                              options.load_time,
                              options.write_disposition))
+     | 'assign_sharded_key_for_load' >> beam.ParDo(AssignShardedKeyForLoad())
+     | 'group_by_sharded_key_for_load' >> beam.GroupByKey()
      | 'load_to_bigquery' >> beam.ParDo(
          LoadToBigQuery(options.dataset, options.add_load_date_suffix,
                         options.load_time),
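
Editor's note: below is a minimal, standalone Python sketch of the type-merge
guard this patch adds to `_merge_fields`; it is not the module's actual code.
It assumes schema fields are plain dicts with 'name' and 'field_type' keys, as
in the hunks above, and that the relax-to-STRING rule must skip the
`timestamp` partition column (#900). The function name `merge_field_type` is
hypothetical.

    def merge_field_type(destination_field, source_field):
        """Return the field_type the merged schema field should keep (sketch)."""
        dft = destination_field.get('field_type')
        sft = source_field.get('field_type')
        # Normally relax a more specific type to the less specific STRING,
        # but never for the `timestamp` field, which the table is partitioned on.
        if (source_field.get('name') != 'timestamp'
                and dft not in ('RECORD', 'STRING') and sft == 'STRING'):
            return sft
        return dft

    # A numeric field observed as STRING elsewhere relaxes to STRING ...
    assert merge_field_type({'name': 'count', 'field_type': 'NUMERIC'},
                            {'name': 'count', 'field_type': 'STRING'}) == 'STRING'
    # ... while the partition column keeps its TIMESTAMP type.
    assert merge_field_type({'name': 'timestamp', 'field_type': 'TIMESTAMP'},
                            {'name': 'timestamp', 'field_type': 'STRING'}) == 'TIMESTAMP'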
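
And a small, Beam-free sketch of the re-keying around the load stage, to make
the delete/load ordering concrete. The helper names here are illustrative, not
the pipeline's API; what it takes from the patch is only that WriteToGCS now
emits (table name, (sharded key, gcs path)), so a table is deleted once, after
all of its shards are written, and the sharded keys are restored afterwards
for parallel loads. The key format with a trailing '.<shard>' suffix is an
assumption.

    from collections import defaultdict

    def remove_shard(sharded_key):
        # Simplified stand-in for AssignGroupByKey.remove_shard, assuming
        # keys carry a trailing '.<shard>' suffix.
        return sharded_key.rsplit('.', 1)[0]

    def group_written_files_by_table(written_files):
        # written_files: iterable of (sharded_key, gcs_path) from the write step.
        by_table = defaultdict(list)
        for sharded_key, gcs_path in written_files:
            by_table[remove_shard(sharded_key)].append((sharded_key, gcs_path))
        return by_table

    shards = [('compute.Instance.0', 'gs://bucket/stage/a.json'),
              ('compute.Instance.1', 'gs://bucket/stage/b.json')]
    grouped = group_written_files_by_table(shards)
    # One key per table, so the delete step runs exactly once per table ...
    assert list(grouped) == ['compute.Instance']
    # ... and the sharded keys then fan back out for parallel loads.
    assert grouped['compute.Instance'] == shards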