diff --git a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py
index edc7f3abf1f..d02ed22e913 100644
--- a/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py
+++ b/terraform/environments/electronic-monitoring-data/glue-job/dms_dv_on_rows_hashvalue_partitionby_yyyy_mm.py
@@ -5,6 +5,8 @@
 # from logging import getLogger
 # import pandas as pd
 
+from itertools import chain
+
 from glue_data_validation_lib import SparkSession
 from glue_data_validation_lib import S3Methods
 from glue_data_validation_lib import CustomPysparkMethods
@@ -257,8 +259,10 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
         LOGGER.warn(f"""WARNING ! >> skipped_struct_fields_list = {skipped_struct_fields_list}""")
 
     skipped_cols_condition_list = [f"(L.{col} != R.{col})" for col in skip_columns_for_hashing]
-    skipped_cols_alias = [f"L.{col} as rds_{col}, R.{col} as dms_{col}"
-                          for col in skip_columns_for_hashing]
+    skipped_cols_alias = list(
+        chain.from_iterable((f'L.{col} as rds_{col}', f'R.{col} as dms_{col}')
+                            for col in skip_columns_for_hashing)
+    )
 
     group_by_cols_list = ['year', 'month']
 
@@ -401,7 +405,8 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
                 f"L.{TABLE_PKEY_COLUMN} as {TABLE_PKEY_COLUMN}",
                 *skipped_cols_alias,
                 "L.RowHash as rds_row_hash",
-                "R.RowHash as dms_output_row_hash"
+                "R.RowHash as dms_output_row_hash",
+                "L.year as year", "L.month as month"
             ).limit(10)
         else:
            unmatched_hashvalues_df_select = unmatched_hashvalues_df.selectExpr(
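
For context, a minimal standalone sketch of why the alias pairs are flattened with `chain.from_iterable` (the `skip_columns_for_hashing` value here is hypothetical): `DataFrame.selectExpr` parses each argument as a single SQL expression, so each skipped column should contribute two separate alias strings rather than one comma-joined string, which selectExpr would typically reject.

```python
from itertools import chain

# Hypothetical skipped column, for illustration only.
skip_columns_for_hashing = ["blob_col"]

# Old form produced one comma-joined string per column; the new form
# flattens each (rds, dms) alias pair into separate selectExpr arguments.
skipped_cols_alias = list(
    chain.from_iterable(
        (f"L.{col} as rds_{col}", f"R.{col} as dms_{col}")
        for col in skip_columns_for_hashing
    )
)

print(skipped_cols_alias)
# ['L.blob_col as rds_blob_col', 'R.blob_col as dms_blob_col']
```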