diff --git a/pipelines/serpro/flows.py b/pipelines/serpro/flows.py index ca7c261a..d809120c 100644 --- a/pipelines/serpro/flows.py +++ b/pipelines/serpro/flows.py @@ -19,7 +19,7 @@ parse_timestamp_to_string, rename_current_flow_run_now_time, run_dbt_model, - transform_raw_to_nested_structure, + transform_raw_to_nested_structure_chunked, upload_raw_data_to_gcs, upload_staging_data_to_gcs, ) @@ -58,13 +58,14 @@ jdbc=jdbc, start_date=start_date, end_date=end_date, local_filepath=local_filepaths ) - errors, treated_filepaths = transform_raw_to_nested_structure( + errors, treated_filepaths = transform_raw_to_nested_structure_chunked( raw_filepath=raw_filepaths, filepath=local_filepaths, primary_key=constants.SERPRO_CAPTURE_PARAMS.value["primary_key"], timestamp=timestamp, reader_args=constants.SERPRO_CAPTURE_PARAMS.value["pre_treatment_reader_args"], error=None, + chunksize=50000, ) errors = upload_raw_data_to_gcs(