From e47ff2d303e34e5618e50cc44eddc111c2724a3e Mon Sep 17 00:00:00 2001 From: Rafael Date: Thu, 19 Oct 2023 07:37:12 -0300 Subject: [PATCH] =?UTF-8?q?remover=20altera=C3=A7=C3=B5es=20de=20teste?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../br_rj_riodejaneiro_bilhetagem/flows.py | 28 +++++++++---------- pipelines/rj_smtr/tasks.py | 8 +++--- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py index 02b89a155..6cb5b47ae 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py @@ -176,20 +176,20 @@ ) # MaterializaĆ§Ć£o - # run_materializacao = create_flow_run( - # flow_name=bilhetagem_materializacao.name, - # # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, - # project_name="staging", - # labels=LABELS, - # upstream_tasks=[wait_captura], - # ) - - # wait_materializacao = wait_for_flow_run( - # run_materializacao, - # stream_states=True, - # stream_logs=True, - # raise_final_state=True, - # ) + run_materializacao = create_flow_run( + flow_name=bilhetagem_materializacao.name, + # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value, + project_name="staging", + labels=LABELS, + upstream_tasks=[wait_captura], + ) + + wait_materializacao = wait_for_flow_run( + run_materializacao, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) bilhetagem_transacao_tratamento.run_config = KubernetesRun( diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py index 5f5daf1a1..671d6171a 100644 --- a/pipelines/rj_smtr/tasks.py +++ b/pipelines/rj_smtr/tasks.py @@ -309,7 +309,7 @@ def create_local_partition_path( either to save raw or staging files. """ data_folder = os.getenv("DATA_FOLDER", "data") - file_path = f"{os.getcwd()}/{data_folder}/{{mode}}/{dataset_id}_dev/{table_id}" + file_path = f"{os.getcwd()}/{data_folder}/{{mode}}/{dataset_id}/{table_id}" file_path += f"/{partitions}/{filename}.{{filetype}}" log(f"Creating file path: {file_path}") return file_path @@ -783,7 +783,7 @@ def upload_raw_data_to_gcs( st_obj = Storage(table_id=table_id, dataset_id=dataset_id) log( f"""Uploading raw file to bucket {st_obj.bucket_name} at - {st_obj.bucket_name}/{dataset_id}_dev/{table_id}""" + {st_obj.bucket_name}/{dataset_id}/{table_id}""" ) st_obj.upload( path=raw_filepath, @@ -827,7 +827,7 @@ def upload_staging_data_to_gcs( try: # Creates and publish table if it does not exist, append to it otherwise create_or_append_table( - dataset_id=f"{dataset_id}_dev", + dataset_id=dataset_id, table_id=table_id, path=staging_filepath, partitions=partitions, @@ -837,7 +837,7 @@ def upload_staging_data_to_gcs( log(f"[CATCHED] Task failed with error: \n{error}", level="error") upload_run_logs_to_bq( - dataset_id=f"{dataset_id}_dev", + dataset_id=dataset_id, parent_table_id=table_id, error=error, timestamp=timestamp,