Merge pull request #9169 from ministryofjustice/ELM-3031_Partitioned_DV_Job_v14

partitioned_parquet_output_data_validation - 2012 - 1
madhu-k-sr2 authored Dec 20, 2024
2 parents 039ef9d + b60862b commit 7043afb
Showing 1 changed file with 8 additions and 3 deletions.
@@ -5,6 +5,8 @@
 # from logging import getLogger
 # import pandas as pd
 
+from itertools import chain
+
 from glue_data_validation_lib import SparkSession
 from glue_data_validation_lib import S3Methods
 from glue_data_validation_lib import CustomPysparkMethods
@@ -257,8 +259,10 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
 LOGGER.warn(f"""WARNING ! >> skipped_struct_fields_list = {skipped_struct_fields_list}""")
 skipped_cols_condition_list = [f"(L.{col} != R.{col})"
                                for col in skip_columns_for_hashing]
-skipped_cols_alias = [f"L.{col} as rds_{col}, R.{col} as dms_{col}"
-                      for col in skip_columns_for_hashing]
+skipped_cols_alias = list(
+    chain.from_iterable((f'L.{col} as rds_{col}', f'R.{col} as dms_{col}')
+                        for col in skip_columns_for_hashing)
+)
 
 
 group_by_cols_list = ['year', 'month']
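The rewritten skipped_cols_alias comprehension changes the shape of the list: the old version produced one combined string per skipped column ("L.col as rds_col, R.col as dms_col"), while the new version yields two separate alias expressions per column and flattens them with itertools.chain.from_iterable (hence the new import). Since selectExpr parses each argument as a single SQL expression, the flattened form is what the later *skipped_cols_alias unpacking needs. A minimal standalone sketch, assuming a hypothetical skip_columns_for_hashing value:

from itertools import chain

skip_columns_for_hashing = ["doc_blob", "image_blob"]  # hypothetical example columns

# Old shape: one combined string per column.
old_alias = [f"L.{col} as rds_{col}, R.{col} as dms_{col}"
             for col in skip_columns_for_hashing]
# -> ['L.doc_blob as rds_doc_blob, R.doc_blob as dms_doc_blob', ...]

# New shape: two separate expressions per column, flattened into a single list.
new_alias = list(
    chain.from_iterable((f"L.{col} as rds_{col}", f"R.{col} as dms_{col}")
                        for col in skip_columns_for_hashing)
)
# -> ['L.doc_blob as rds_doc_blob', 'R.doc_blob as dms_doc_blob',
#     'L.image_blob as rds_image_blob', 'R.image_blob as dms_image_blob']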
@@ -401,7 +405,8 @@ def write_parquet_to_s3(df_dv_output: DataFrame, database, db_sch_tbl_name):
 f"L.{TABLE_PKEY_COLUMN} as {TABLE_PKEY_COLUMN}",
 *skipped_cols_alias,
 "L.RowHash as rds_row_hash",
-"R.RowHash as dms_output_row_hash"
+"R.RowHash as dms_output_row_hash",
+"L.year as year", "L.month as month"
 ).limit(10)
 else:
 unmatched_hashvalues_df_select = unmatched_hashvalues_df.selectExpr(
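The added expressions "L.year as year" and "L.month as month" carry the partition columns through the mismatch-sample projection, so the resulting dataframe still holds the columns a partitioned parquet write needs. A minimal sketch of the same pattern, using hypothetical column names and data rather than the repository's actual helpers:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("dv_sketch").getOrCreate()

# Hypothetical stand-ins for the RDS and DMS-output dataframes, already hashed per row.
left = spark.createDataFrame(
    [(1, "abc", 2012, 1)], ["pkey", "RowHash", "year", "month"]).alias("L")
right = spark.createDataFrame(
    [(1, "abd", 2012, 1)], ["pkey", "RowHash", "year", "month"]).alias("R")

# Rows whose hashes disagree between the two sides.
unmatched = left.join(right, left["pkey"] == right["pkey"]) \
                .where(left["RowHash"] != right["RowHash"])

sample = unmatched.selectExpr(
    "L.pkey as pkey",
    "L.RowHash as rds_row_hash",
    "R.RowHash as dms_output_row_hash",
    "L.year as year", "L.month as month"  # the two expressions added in this commit
).limit(10)

sample.show()
# With year/month retained, the sample can be written partitioned, e.g.:
# sample.write.partitionBy("year", "month").parquet("s3://example-bucket/dv-output/")  # hypothetical path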
