
Commit

Remove test changes
pixuimpou committed Oct 19, 2023
1 parent a33a4b8 commit e47ff2d
Showing 2 changed files with 18 additions and 18 deletions.
28 changes: 14 additions & 14 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -176,20 +176,20 @@
)

# Materialização
-# run_materializacao = create_flow_run(
-# flow_name=bilhetagem_materializacao.name,
-# # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
-# project_name="staging",
-# labels=LABELS,
-# upstream_tasks=[wait_captura],
-# )
-
-# wait_materializacao = wait_for_flow_run(
-# run_materializacao,
-# stream_states=True,
-# stream_logs=True,
-# raise_final_state=True,
-# )
+run_materializacao = create_flow_run(
+    flow_name=bilhetagem_materializacao.name,
+    # project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
+    project_name="staging",
+    labels=LABELS,
+    upstream_tasks=[wait_captura],
+)
+
+wait_materializacao = wait_for_flow_run(
+    run_materializacao,
+    stream_states=True,
+    stream_logs=True,
+    raise_final_state=True,
+)

bilhetagem_transacao_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_tratamento.run_config = KubernetesRun(
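
The block re-enabled above follows Prefect 1.x's flow-of-flows pattern: create_flow_run triggers the bilhetagem_materializacao flow (here in the "staging" project) and wait_for_flow_run blocks until that child run reaches a final state, streaming its states and logs and raising if it ends in failure. A minimal sketch of the same pattern, with an illustrative flow name, project and label rather than the repository's own values:

# Sketch of the flow-of-flows pattern (Prefect 1.x); names and labels are illustrative.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-orchestrator") as parent_flow:
    # Trigger a registered child flow by name inside a Prefect project.
    child_run_id = create_flow_run(
        flow_name="child-materialization",
        project_name="staging",
        labels=["example-label"],
    )
    # Wait for the child run to finish, streaming its state changes and
    # logs; raise_final_state makes the parent fail if the child fails.
    wait_for_flow_run(
        child_run_id,
        stream_states=True,
        stream_logs=True,
        raise_final_state=True,
    )
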
8 changes: 4 additions & 4 deletions pipelines/rj_smtr/tasks.py
@@ -309,7 +309,7 @@ def create_local_partition_path(
either to save raw or staging files.
"""
data_folder = os.getenv("DATA_FOLDER", "data")
file_path = f"{os.getcwd()}/{data_folder}/{{mode}}/{dataset_id}_dev/{table_id}"
file_path = f"{os.getcwd()}/{data_folder}/{{mode}}/{dataset_id}/{table_id}"
file_path += f"/{partitions}/{filename}.{{filetype}}"
log(f"Creating file path: {file_path}")
return file_path
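
With the "_dev" suffix dropped, the local path is built from the working directory, the DATA_FOLDER environment variable (defaulting to "data"), the dataset and table IDs, and the partition and file name, while {mode} and {filetype} are left as literal placeholders (the doubled braces escape them in the f-string, presumably for a later .format call). A quick illustration with made-up IDs, partition and file name:

# Illustration only: dataset/table IDs, partition and file name are made up.
import os

data_folder = os.getenv("DATA_FOLDER", "data")
dataset_id, table_id = "example_dataset", "example_table"
partitions, filename = "data=2023-10-19/hora=10", "2023-10-19-10-00-00"
file_path = f"{os.getcwd()}/{data_folder}/{{mode}}/{dataset_id}/{table_id}"
file_path += f"/{partitions}/{filename}.{{filetype}}"
print(file_path)
# e.g. /app/data/{mode}/example_dataset/example_table/data=2023-10-19/hora=10/2023-10-19-10-00-00.{filetype}
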
@@ -783,7 +783,7 @@ def upload_raw_data_to_gcs(
st_obj = Storage(table_id=table_id, dataset_id=dataset_id)
log(
f"""Uploading raw file to bucket {st_obj.bucket_name} at
-{st_obj.bucket_name}/{dataset_id}_dev/{table_id}"""
+{st_obj.bucket_name}/{dataset_id}/{table_id}"""
)
st_obj.upload(
path=raw_filepath,
@@ -827,7 +827,7 @@ def upload_staging_data_to_gcs(
try:
# Creates and publish table if it does not exist, append to it otherwise
create_or_append_table(
dataset_id=f"{dataset_id}_dev",
dataset_id=dataset_id,
table_id=table_id,
path=staging_filepath,
partitions=partitions,
@@ -837,7 +837,7 @@
log(f"[CATCHED] Task failed with error: \n{error}", level="error")

upload_run_logs_to_bq(
dataset_id=f"{dataset_id}_dev",
dataset_id=dataset_id,
parent_table_id=table_id,
error=error,
timestamp=timestamp,
