From 6731fe29535e8ac191cdc954b1cc45959fef9401 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 13 Mar 2024 17:16:42 -0300 Subject: [PATCH 01/11] adding inea flows --- pipelines/rj_cor/__init__.py | 1 + .../meteorologia/precipitacao_inea/flows.py | 224 ++++++++++++++++++ .../precipitacao_inea/schedules.py | 34 +++ .../meteorologia/precipitacao_inea/tasks.py | 208 ++++++++++++++++ 4 files changed, 467 insertions(+) create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py create mode 100644 pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py diff --git a/pipelines/rj_cor/__init__.py b/pipelines/rj_cor/__init__.py index 34ffc0e76..711a915fe 100644 --- a/pipelines/rj_cor/__init__.py +++ b/pipelines/rj_cor/__init__.py @@ -7,6 +7,7 @@ from pipelines.rj_cor.meteorologia.meteorologia_redemet.flows import * from pipelines.rj_cor.meteorologia.precipitacao_alertario.flows import * from pipelines.rj_cor.meteorologia.precipitacao_cemaden.flows import * +from pipelines.rj_cor.meteorologia.precipitacao_inea.flows import * from pipelines.rj_cor.meteorologia.satelite.flows import * from pipelines.rj_cor.meteorologia.precipitacao_websirene.flows import * from pipelines.rj_cor.meteorologia.radar.precipitacao.flows import * diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py new file mode 100644 index 000000000..e09d779a5 --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103 +""" +Flows for precipitacao_inea. +""" +from datetime import timedelta + +from prefect import case, Parameter +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefect.tasks.prefect import create_flow_run + +from pipelines.constants import constants +from pipelines.utils.constants import constants as utils_constants +from pipelines.utils.custom import wait_for_flow_run_with_timeout +from pipelines.rj_cor.meteorologia.precipitacao_inea.tasks import ( + check_for_new_stations, + download_data, + treat_data, + save_data, +) +from pipelines.rj_cor.meteorologia.precipitacao_inea.schedules import ( + minute_schedule, +) +from pipelines.utils.decorators import Flow +from pipelines.utils.dump_db.constants import constants as dump_db_constants +from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants +from pipelines.utils.tasks import ( + create_table_and_upload_to_gcs, + get_current_flow_labels, +) + +wait_for_flow_run_with_2min_timeout = wait_for_flow_run_with_timeout( + timeout=timedelta(minutes=2) +) + +with Flow( + name="COR: Meteorologia - Precipitacao INEA", + code_owners=[ + "paty", + ], + # skip_if_running=True, +) as cor_meteorologia_precipitacao_inea: + DUMP_MODE = Parameter("dump_mode", default="append", required=True) + DATASET_ID_PLUVIOMETRIC = Parameter( + "dataset_id_pluviometric", default="clima_pluviometro", required=True + ) + TABLE_ID_PLUVIOMETRIC = Parameter( + "table_id_pluviometric", default="taxa_precipitacao_inea", required=True + ) + DATASET_ID_FLUVIOMETRIC = Parameter( + "dataset_id_fluviometric", default="clima_fluviometro", required=True + ) + TABLE_ID_FLUVIOMETRIC = Parameter( + "table_id_fluviometric", default="taxa_precipitacao_inea", required=True + ) + + # Materialization parameters + MATERIALIZE_AFTER_DUMP = Parameter( + "materialize_after_dump", default=True, required=False 
+ ) + MATERIALIZE_TO_DATARIO = Parameter( + "materialize_to_datario", default=True, required=False + ) + MATERIALIZATION_MODE = Parameter("mode", default="prod", required=False) + + # Dump to GCS after? Should only dump to GCS if materializing to datario + DUMP_TO_GCS = Parameter("dump_to_gcs", default=False, required=False) + + MAXIMUM_BYTES_PROCESSED = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + dataframe = download_data() + dfr_pluviometric, dfr_fluviometric = treat_data( + dataframe=dataframe, + dataset_id=DATASET_ID_PLUVIOMETRIC, + table_id=TABLE_ID_PLUVIOMETRIC, + mode=MATERIALIZATION_MODE, + ) + path_pluviometric = save_data(dataframe=dfr_pluviometric, folder_name="pluviometer") + path_fluviometric = save_data(dataframe=dfr_fluviometric, folder_name="fluviometer") + + # Create pluviometric table in BigQuery + UPLOAD_TABLE_PLUVIOMETRIC = create_table_and_upload_to_gcs( + data_path=path_pluviometric, + dataset_id=DATASET_ID_PLUVIOMETRIC, + table_id=TABLE_ID_PLUVIOMETRIC, + dump_mode=DUMP_MODE, + wait=path_pluviometric, + ) + + # Trigger pluviometric DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": DATASET_ID_PLUVIOMETRIC, + "table_id": TABLE_ID_PLUVIOMETRIC, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, + }, + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}", + ) + + materialization_flow.set_upstream(current_flow_labels) + + wait_for_materialization = wait_for_flow_run_with_2min_timeout( + flow_run_id=materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(DUMP_TO_GCS, True): + # Trigger Dump to GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID_PLUVIOMETRIC, + "table_id": TABLE_ID_PLUVIOMETRIC, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}", + ) + dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout( + flow_run_id=dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + # Create fluviometric table in BigQuery + UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs( + data_path=path_pluviometric, + dataset_id=DATASET_ID_FLUVIOMETRIC, + table_id=TABLE_ID_FLUVIOMETRIC, + dump_mode=DUMP_MODE, + wait=path_pluviometric, + ) + + # Trigger DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": 
DATASET_ID_FLUVIOMETRIC, + "table_id": TABLE_ID_FLUVIOMETRIC, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, + }, + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}", + ) + + materialization_flow.set_upstream(current_flow_labels) + + wait_for_materialization = wait_for_flow_run_with_2min_timeout( + flow_run_id=materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(DUMP_TO_GCS, True): + # Trigger Dump to GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID_FLUVIOMETRIC, + "table_id": TABLE_ID_FLUVIOMETRIC, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}", + ) + dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout( + flow_run_id=dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + check_for_new_stations(dataframe, wait=UPLOAD_TABLE_PLUVIOMETRIC) + +# para rodar na cloud +cor_meteorologia_precipitacao_inea.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +cor_meteorologia_precipitacao_inea.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_COR_AGENT_LABEL.value], +) +cor_meteorologia_precipitacao_inea.schedule = minute_schedule diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py new file mode 100644 index 000000000..f16b41aaf --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/schedules.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103 +""" +Schedules for precipitacao_inea +Rodar a cada 1 minuto +""" +from datetime import timedelta, datetime +from prefect.schedules import Schedule +from prefect.schedules.clocks import IntervalClock +from pipelines.constants import constants + +minute_schedule = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(minutes=5), + start_date=datetime(2023, 1, 1, 0, 1, 0), + labels=[ + constants.RJ_COR_AGENT_LABEL.value, + ], + parameter_defaults={ + # "trigger_rain_dashboard_update": True, + "materialize_after_dump": True, + "mode": "prod", + "materialize_to_datario": True, + "dump_to_gcs": False, + "dump_mode": "append", + "dataset_id_pluviometric": "clima_pluviometro", + "table_id_pluviometric": "taxa_precipitacao_inea", + "dataset_id_fluviometric": "clima_fluviometro", + "table_id_fluviometric": "lamina_agua_inea", + }, + ), + ] +) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py new file mode 100644 index 000000000..1eadb850a --- /dev/null +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +# pylint: disable=C0103 +""" +Tasks for precipitacao_inea +""" +from datetime import timedelta +from pathlib import Path +from typing import Union, Tuple + +import numpy as np 
+import pandas as pd +import pendulum +from prefect import task +from prefect.engine.signals import ENDRUN +from prefect.engine.state import Skipped, Failed +from pipelines.constants import constants +from pipelines.utils.utils import ( + log, + parse_date_columns, + save_updated_rows_on_redis, + to_partitions, +) + + +@task( + nout=2, + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def download_data() -> pd.DataFrame: + """ + Download data from API + """ + + estacoes = { + "1": "225543320", # Campo Grande + "2": "BE70E166", # Capela Mayrink + "3": "225543250", # Eletrobras + "4": "2243088", # Realengo + "5": "225443130", # Sao Cristovao + } + + dataframe = pd.DataFrame() + for key, value in estacoes.items(): + url = f"http://200.20.53.8/alertadecheias/{value}.xlsx" + dataframe_temp = pd.read_excel(url) + dataframe_temp["id_estacao"] = key + dataframe = pd.concat([dataframe, dataframe_temp]) + return dataframe + + +@task( + nout=2, + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def treat_data( + dataframe: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev" +) -> Tuple[pd.DataFrame, bool]: + """ + Rename cols and filter data using Redis + """ + + dataframe["data_medicao"] = ( + pd.to_datetime(dataframe.Data, format="%d/%m/%Y").dt.strftime("%Y-%m-%d") + + " " + + dataframe["Hora"] + ) + + rename_cols = { + "Chuva Último dado": "acumulado_chuva_15_min", + " Chuva Acumulada 1H": "acumulado_chuva_1_h", + " Chuva Acumulada 4H": "acumulado_chuva_4_h", + " Chuva Acumulada 24H": "acumulado_chuva_24_h", + " Chuva Acumulada 96H": "acumulado_chuva_96_h", + " Chuva Acumulada 30D": "acumulado_chuva_30_d", + " Último Nível": "altura_agua", + } + dataframe.rename(columns=rename_cols, inplace=True) + + # replace all "Dado Nulo" to nan + dataframe.replace({"Dado Nulo": np.nan}, inplace=True) + + # Eliminate where the id_estacao is the same keeping the smallest one + dataframe.sort_values( + ["id_estacao", "data_medicao"] + list(rename_cols.values()), inplace=True + ) + dataframe.drop_duplicates(subset=["id_estacao", "data_medicao"], keep="first") + + date_format = "%Y-%m-%d %H:%M:%S" + # dataframe["data_medicao"] = dataframe["data_medicao"].dt.strftime(date_format) + + log(f"Dataframe before comparing with last data saved on redis {dataframe.head()}") + log(f"Dataframe before comparing {dataframe[dataframe['id_estacao']=='1']}") + + dataframe = save_updated_rows_on_redis( + dataframe, + dataset_id, + table_id, + unique_id="id_estacao", + date_column="data_medicao", + date_format=date_format, + mode=mode, + ) + + log(f"Dataframe after comparing with last data saved on redis {dataframe.head()}") + log(f"Dataframe after comparing {dataframe[dataframe['id_estacao']=='1']}") + + # If df is empty stop flow + if dataframe.shape[0] == 0: + skip_text = "No new data available on API" + log(skip_text) + raise ENDRUN(state=Skipped(skip_text)) + + pluviometric_cols = [ + "id_estacao", + "data_medicao", + "acumulado_chuva_15_min", + "acumulado_chuva_1_h", + "acumulado_chuva_4_h", + "acumulado_chuva_24_h", + "acumulado_chuva_96_h", + "acumulado_chuva_30_d", + ] + fluviometric_cols = ["data_medicao", "altura_agua"] + + dfr_pluviometric = dataframe.loc[ + dataframe["altura_agua"] == "Estação pluviométrica", pluviometric_cols + ].copy() + dfr_fluviometric = dataframe.loc[ + dataframe["altura_agua"] != "Estação pluviométrica", fluviometric_cols + ].copy() + + # Replace all 
values bigger than 10000 on altura_agua to nan + dfr_fluviometric.loc[ + dfr_fluviometric["altura_agua"] > 10000, "altura_agua" + ] = np.nan + + dfr_fluviometric["id_reservatorio"] = 1 + dfr_fluviometric["tipo_reservatorio"] = "rio" + + fluviometric_cols_order = [ + "id_reservatorio", + "data_medicao", + "tipo_reservatorio", + "altura_agua", + ] + dfr_fluviometric = dfr_fluviometric[fluviometric_cols_order].copy() + + return dfr_pluviometric, dfr_fluviometric + + +@task +def save_data(dataframe: pd.DataFrame, folder_name: str = None) -> Union[str, Path]: + """ + Save data on a csv file to be uploaded to GCP + """ + + prepath = Path("/tmp/precipitacao") + if folder_name: + prepath = Path("/tmp/precipitacao") / folder_name + prepath.mkdir(parents=True, exist_ok=True) + + partition_column = "data_medicao" + dataframe, partitions = parse_date_columns(dataframe, partition_column) + current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") + + to_partitions( + data=dataframe, + partition_columns=partitions, + savepath=prepath, + data_type="csv", + suffix=current_time, + ) + log(f"[DEBUG] Files saved on {prepath}") + return prepath + + +@task +def check_for_new_stations( + dataframe: pd.DataFrame, + wait=None, # pylint: disable=unused-argument +) -> None: + """ + Check if the updated stations are the same as before. + If not, consider flow as failed and call attention to + add this new station on estacoes_cemaden. + I can't automatically update this new station, because + I couldn't find a url that gives me the lat and lon for + all the stations. + """ + + stations_before = [ + "1", + "2", + "3", + "4", + "5", + ] + new_stations = [ + i for i in dataframe.id_estacao.unique() if str(i) not in stations_before + ] + if len(new_stations) != 0: + message = f"New station identified. 
You need to update INEA\ + estacoes_inea adding station(s) {new_stations}: \ + {dataframe[dataframe.id_estacao.isin(new_stations)]} " + log(message) + raise ENDRUN(state=Failed(message)) From 07651db51ade89013a7d69e0e0c524123a494e51 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 13 Mar 2024 18:01:54 -0300 Subject: [PATCH 02/11] adding openpyxml to libraries --- poetry.lock | 29 ++++++++++++++++++++++++++++- pyproject.toml | 1 + 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 76ec6ab48..8d50fb59c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1612,6 +1612,18 @@ files = [ {file = "entrypoints-0.4.tar.gz", hash = "sha256:b706eddaa9218a19ebcd67b56818f05bb27589b1ca9e8d797b74affad4ccacd4"}, ] +[[package]] +name = "et-xmlfile" +version = "1.1.0" +description = "An implementation of lxml.xmlfile for the standard library" +category = "main" +optional = false +python-versions = ">=3.6" +files = [ + {file = "et_xmlfile-1.1.0-py3-none-any.whl", hash = "sha256:a2ba85d1d6a74ef63837eed693bcb89c3f752169b0e3e7ae5b16ca5e1b3deada"}, + {file = "et_xmlfile-1.1.0.tar.gz", hash = "sha256:8eb9e2bc2f8c97e37a2dc85a09ecdcdec9d8a396530a6d5a33b30b9a92da0c5c"}, +] + [[package]] name = "exceptiongroup" version = "1.1.2" @@ -4219,6 +4231,21 @@ numpy = [ {version = ">=1.21.4", markers = "python_version >= \"3.10\" and platform_system == \"Darwin\""}, ] +[[package]] +name = "openpyxl" +version = "3.1.2" +description = "A Python library to read/write Excel 2010 xlsx/xlsm files" +category = "main" +optional = false +python-versions = ">=3.6" +files = [ + {file = "openpyxl-3.1.2-py2.py3-none-any.whl", hash = "sha256:f91456ead12ab3c6c2e9491cf33ba6d08357d802192379bb482f1033ade496f5"}, + {file = "openpyxl-3.1.2.tar.gz", hash = "sha256:a6f5977418eff3b2d5500d54d9db50c8277a368436f4e4f8ddb1be3422870184"}, +] + +[package.dependencies] +et-xmlfile = "*" + [[package]] name = "orjson" version = "3.9.2" @@ -7582,4 +7609,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = ">=3.9,<3.11" -content-hash = "e858f1313971a28f2977c43b2bf0fbaf0490f8a44ad450df133a7a3e96f90666" +content-hash = "63042a0ef31df90a35c18fd5e65e78a8ca1f67c54289f0182b1957b19b465c9c" diff --git a/pyproject.toml b/pyproject.toml index 83db62307..0ba4b7f52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,6 +66,7 @@ pyodbc = "^5.0.1" h3 = "^3.7.6" dask = "^2023.11.0" cartopy = "^0.22.0" +openpyxl = "^3.1.2" [tool.poetry.dev-dependencies] pylint = "^2.12.2" From 27ac70928ea9692201d5875e6e9e2a1168fc9aef Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 14 Mar 2024 08:53:18 -0300 Subject: [PATCH 03/11] bugfix datehour --- pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py index 1eadb850a..eff8f2f07 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py @@ -65,6 +65,7 @@ def treat_data( pd.to_datetime(dataframe.Data, format="%d/%m/%Y").dt.strftime("%Y-%m-%d") + " " + dataframe["Hora"] + + ":00" ) rename_cols = { From 1f6ef8077e592000e5b526d6cadd83ed8cbef00b Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 14 Mar 2024 09:44:50 -0300 Subject: [PATCH 04/11] bugfix --- pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py | 6 +++--- pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py | 2 
+- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py index e09d779a5..bf42ae055 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py @@ -35,7 +35,7 @@ ) with Flow( - name="COR: Meteorologia - Precipitacao INEA", + name="COR: Meteorologia - Precipitacao e Fluviometria INEA", code_owners=[ "paty", ], @@ -150,11 +150,11 @@ # Create fluviometric table in BigQuery UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs( - data_path=path_pluviometric, + data_path=path_fluviometric, dataset_id=DATASET_ID_FLUVIOMETRIC, table_id=TABLE_ID_FLUVIOMETRIC, dump_mode=DUMP_MODE, - wait=path_pluviometric, + wait=path_fluviometric, ) # Trigger DBT flow run diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py index eff8f2f07..d6f04fd08 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py @@ -137,7 +137,7 @@ def treat_data( dfr_fluviometric["altura_agua"] > 10000, "altura_agua" ] = np.nan - dfr_fluviometric["id_reservatorio"] = 1 + dfr_fluviometric["id_reservatorio"] = dfr_fluviometric["id_estacao"] dfr_fluviometric["tipo_reservatorio"] = "rio" fluviometric_cols_order = [ From a3458e36e831cdfb48d822760324e031bd6585fa Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 14 Mar 2024 10:54:51 -0300 Subject: [PATCH 05/11] forcing save fluviometric after pluviometric to avoid prefect mistake saving pluviometric data on fluviometric table --- .../meteorologia/precipitacao_inea/flows.py | 213 ++++++++++-------- .../meteorologia/precipitacao_inea/tasks.py | 31 ++- 2 files changed, 141 insertions(+), 103 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py index bf42ae055..0fb1b3b65 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py @@ -15,6 +15,7 @@ from pipelines.utils.custom import wait_for_flow_run_with_timeout from pipelines.rj_cor.meteorologia.precipitacao_inea.tasks import ( check_for_new_stations, + check_new_data, download_data, treat_data, save_data, @@ -80,138 +81,150 @@ table_id=TABLE_ID_PLUVIOMETRIC, mode=MATERIALIZATION_MODE, ) - path_pluviometric = save_data(dataframe=dfr_pluviometric, folder_name="pluviometer") - path_fluviometric = save_data(dataframe=dfr_fluviometric, folder_name="fluviometer") - - # Create pluviometric table in BigQuery - UPLOAD_TABLE_PLUVIOMETRIC = create_table_and_upload_to_gcs( - data_path=path_pluviometric, - dataset_id=DATASET_ID_PLUVIOMETRIC, - table_id=TABLE_ID_PLUVIOMETRIC, - dump_mode=DUMP_MODE, - wait=path_pluviometric, + new_pluviometric_data, new_fluviometric_data = check_new_data( + dfr_pluviometric, dfr_fluviometric ) - # Trigger pluviometric DBT flow run - with case(MATERIALIZE_AFTER_DUMP, True): - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": DATASET_ID_PLUVIOMETRIC, - "table_id": TABLE_ID_PLUVIOMETRIC, - "mode": MATERIALIZATION_MODE, - "materialize_to_datario": MATERIALIZE_TO_DATARIO, - }, - labels=current_flow_labels, - run_name=f"Materialize 
{DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}", + with case(new_pluviometric_data, True): + path_pluviometric = save_data( + dataframe=dfr_pluviometric, folder_name="pluviometer" ) - materialization_flow.set_upstream(current_flow_labels) - - wait_for_materialization = wait_for_flow_run_with_2min_timeout( - flow_run_id=materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + # Create pluviometric table in BigQuery + UPLOAD_TABLE_PLUVIOMETRIC = create_table_and_upload_to_gcs( + data_path=path_pluviometric, + dataset_id=DATASET_ID_PLUVIOMETRIC, + table_id=TABLE_ID_PLUVIOMETRIC, + dump_mode=DUMP_MODE, + wait=path_pluviometric, ) - with case(DUMP_TO_GCS, True): - # Trigger Dump to GCS flow run with project id as datario - dump_to_gcs_flow = create_flow_run( - flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + # Trigger pluviometric DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, project_name=constants.PREFECT_DEFAULT_PROJECT.value, parameters={ - "project_id": "datario", "dataset_id": DATASET_ID_PLUVIOMETRIC, "table_id": TABLE_ID_PLUVIOMETRIC, - "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, }, - labels=[ - "datario", - ], - run_name=f"Dump to GCS {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}", + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}", ) - dump_to_gcs_flow.set_upstream(wait_for_materialization) - wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout( - flow_run_id=dump_to_gcs_flow, + materialization_flow.set_upstream(current_flow_labels) + + wait_for_materialization = wait_for_flow_run_with_2min_timeout( + flow_run_id=materialization_flow, stream_states=True, stream_logs=True, raise_final_state=True, ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) - # Create fluviometric table in BigQuery - UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs( - data_path=path_fluviometric, - dataset_id=DATASET_ID_FLUVIOMETRIC, - table_id=TABLE_ID_FLUVIOMETRIC, - dump_mode=DUMP_MODE, - wait=path_fluviometric, - ) - - # Trigger DBT flow run - with case(MATERIALIZE_AFTER_DUMP, True): - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": DATASET_ID_FLUVIOMETRIC, - "table_id": TABLE_ID_FLUVIOMETRIC, - "mode": MATERIALIZATION_MODE, - "materialize_to_datario": MATERIALIZE_TO_DATARIO, - }, - labels=current_flow_labels, - run_name=f"Materialize {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}", - ) - - materialization_flow.set_upstream(current_flow_labels) - - wait_for_materialization = wait_for_flow_run_with_2min_timeout( - flow_run_id=materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - 
wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + with case(DUMP_TO_GCS, True): + # Trigger Dump to GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID_PLUVIOMETRIC, + "table_id": TABLE_ID_PLUVIOMETRIC, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID_PLUVIOMETRIC}.{TABLE_ID_PLUVIOMETRIC}", + ) + dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout( + flow_run_id=dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + with case(new_pluviometric_data, True): + path_fluviometric = save_data( + dataframe=dfr_fluviometric, folder_name="fluviometer" ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + path_fluviometric.set_upstream(path_pluviometric) + + # Create fluviometric table in BigQuery + UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs( + data_path=path_fluviometric, + dataset_id=DATASET_ID_FLUVIOMETRIC, + table_id=TABLE_ID_FLUVIOMETRIC, + dump_mode=DUMP_MODE, + wait=path_fluviometric, ) - with case(DUMP_TO_GCS, True): - # Trigger Dump to GCS flow run with project id as datario - dump_to_gcs_flow = create_flow_run( - flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + # Trigger DBT flow run + with case(MATERIALIZE_AFTER_DUMP, True): + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, project_name=constants.PREFECT_DEFAULT_PROJECT.value, parameters={ - "project_id": "datario", "dataset_id": DATASET_ID_FLUVIOMETRIC, "table_id": TABLE_ID_FLUVIOMETRIC, - "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + "mode": MATERIALIZATION_MODE, + "materialize_to_datario": MATERIALIZE_TO_DATARIO, }, - labels=[ - "datario", - ], - run_name=f"Dump to GCS {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}", + labels=current_flow_labels, + run_name=f"Materialize {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}", ) - dump_to_gcs_flow.set_upstream(wait_for_materialization) - wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout( - flow_run_id=dump_to_gcs_flow, + materialization_flow.set_upstream(current_flow_labels) + + wait_for_materialization = wait_for_flow_run_with_2min_timeout( + flow_run_id=materialization_flow, stream_states=True, stream_logs=True, raise_final_state=True, ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(DUMP_TO_GCS, True): + # Trigger Dump to GCS flow run with project id as datario + dump_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": DATASET_ID_FLUVIOMETRIC, + "table_id": TABLE_ID_FLUVIOMETRIC, + "maximum_bytes_processed": MAXIMUM_BYTES_PROCESSED, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {DATASET_ID_FLUVIOMETRIC}.{TABLE_ID_FLUVIOMETRIC}", + ) + 
dump_to_gcs_flow.set_upstream(wait_for_materialization) + + wait_for_dump_to_gcs = wait_for_flow_run_with_2min_timeout( + flow_run_id=dump_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) check_for_new_stations(dataframe, wait=UPLOAD_TABLE_PLUVIOMETRIC) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py index d6f04fd08..6bfbcf08b 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py @@ -56,7 +56,7 @@ def download_data() -> pd.DataFrame: ) def treat_data( dataframe: pd.DataFrame, dataset_id: str, table_id: str, mode: str = "dev" -) -> Tuple[pd.DataFrame, bool]: +) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Rename cols and filter data using Redis """ @@ -132,7 +132,7 @@ def treat_data( dataframe["altura_agua"] != "Estação pluviométrica", fluviometric_cols ].copy() - # Replace all values bigger than 10000 on altura_agua to nan + # Replace all values bigger than 10000 on "altura_agua" to nan dfr_fluviometric.loc[ dfr_fluviometric["altura_agua"] > 10000, "altura_agua" ] = np.nan @@ -151,7 +151,32 @@ def treat_data( return dfr_pluviometric, dfr_fluviometric -@task +@task( + nout=2, + max_retries=constants.TASK_MAX_RETRIES.value, + retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value), +) +def check_new_data( + dfr_pluviometric: pd.DataFrame, + dfr_fluviometric: pd.DataFrame, +) -> Tuple[bool, bool]: + """ + Check if the dataframes are empty + """ + + new_pluviometric_data = True + new_fluviometric_data = True + + if dfr_pluviometric.shape[0] == 0: + log("No new pluviometric data available on API") + new_pluviometric_data = False + if dfr_fluviometric.shape[0] == 0: + log("No new fluviometric data available on API") + new_fluviometric_data = False + return new_pluviometric_data, new_fluviometric_data + + +@task(skip_on_upstream_skip=False) def save_data(dataframe: pd.DataFrame, folder_name: str = None) -> Union[str, Path]: """ Save data on a csv file to be uploaded to GCP From 36b6f09a8e74c5468e788f76d140e9596683007b Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 14 Mar 2024 11:38:13 -0300 Subject: [PATCH 06/11] bugfix --- pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py index 0fb1b3b65..c481c8406 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py @@ -155,11 +155,11 @@ raise_final_state=True, ) - with case(new_pluviometric_data, True): + with case(new_fluviometric_data, True): path_fluviometric = save_data( dataframe=dfr_fluviometric, folder_name="fluviometer" ) - path_fluviometric.set_upstream(path_pluviometric) + path_fluviometric.set_upstream(UPLOAD_TABLE_PLUVIOMETRIC) # Create fluviometric table in BigQuery UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs( From b84d86a74ceeae09299017c59242f6b2dcd59be7 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 14 Mar 2024 14:36:11 -0300 Subject: [PATCH 07/11] bugfix --- pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py index 6bfbcf08b..2ff11dc3a 100644 --- 
a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py @@ -113,6 +113,8 @@ def treat_data( log(skip_text) raise ENDRUN(state=Skipped(skip_text)) + dataframe["id_reservatorio"] = dataframe["id_estacao"] + pluviometric_cols = [ "id_estacao", "data_medicao", @@ -123,7 +125,7 @@ def treat_data( "acumulado_chuva_96_h", "acumulado_chuva_30_d", ] - fluviometric_cols = ["data_medicao", "altura_agua"] + fluviometric_cols = ["id_reservatorio", "data_medicao", "altura_agua"] dfr_pluviometric = dataframe.loc[ dataframe["altura_agua"] == "Estação pluviométrica", pluviometric_cols @@ -137,7 +139,6 @@ def treat_data( dfr_fluviometric["altura_agua"] > 10000, "altura_agua" ] = np.nan - dfr_fluviometric["id_reservatorio"] = dfr_fluviometric["id_estacao"] dfr_fluviometric["tipo_reservatorio"] = "rio" fluviometric_cols_order = [ From 29f90f2e5afc9fcd1871ed10dbaf69b85bec2c04 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 14 Mar 2024 15:56:33 -0300 Subject: [PATCH 08/11] adding a wait_task --- .../rj_cor/meteorologia/precipitacao_inea/flows.py | 5 ++++- .../rj_cor/meteorologia/precipitacao_inea/tasks.py | 14 +++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py index c481c8406..dde2f81ff 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py @@ -19,6 +19,7 @@ download_data, treat_data, save_data, + wait_task, ) from pipelines.rj_cor.meteorologia.precipitacao_inea.schedules import ( minute_schedule, @@ -156,10 +157,12 @@ ) with case(new_fluviometric_data, True): + status = wait_task() + status.set_upstream(UPLOAD_TABLE_PLUVIOMETRIC) path_fluviometric = save_data( dataframe=dfr_fluviometric, folder_name="fluviometer" ) - path_fluviometric.set_upstream(UPLOAD_TABLE_PLUVIOMETRIC) + path_fluviometric.set_upstream(status) # Create fluviometric table in BigQuery UPLOAD_TABLE_FLUVIOMETRIC = create_table_and_upload_to_gcs( diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py index 2ff11dc3a..15456b331 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py @@ -127,9 +127,7 @@ def treat_data( ] fluviometric_cols = ["id_reservatorio", "data_medicao", "altura_agua"] - dfr_pluviometric = dataframe.loc[ - dataframe["altura_agua"] == "Estação pluviométrica", pluviometric_cols - ].copy() + dfr_pluviometric = dataframe[pluviometric_cols].copy() dfr_fluviometric = dataframe.loc[ dataframe["altura_agua"] != "Estação pluviométrica", fluviometric_cols ].copy() @@ -178,6 +176,13 @@ def check_new_data( @task(skip_on_upstream_skip=False) +def wait_task(): + """Task create because prefect was messing up paths to be saved on each table""" + log("End waiting pluviometric task to end.") + return "continue" + + +@task def save_data(dataframe: pd.DataFrame, folder_name: str = None) -> Union[str, Path]: """ Save data on a csv file to be uploaded to GCP @@ -188,6 +193,9 @@ def save_data(dataframe: pd.DataFrame, folder_name: str = None) -> Union[str, Pa prepath = Path("/tmp/precipitacao") / folder_name prepath.mkdir(parents=True, exist_ok=True) + log(f"Start saving data on {prepath}") + log(f"Data to be saved {dataframe.head()}") + partition_column = "data_medicao" dataframe, partitions = 
parse_date_columns(dataframe, partition_column) current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") From bf52ccfa81fe196a338078d577cab1ab6194198b Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 14 Mar 2024 16:48:12 -0300 Subject: [PATCH 09/11] removing some columns from lamina_agua_inea --- .../rj_cor/meteorologia/precipitacao_inea/tasks.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py index 15456b331..acc66e445 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/tasks.py @@ -113,8 +113,6 @@ def treat_data( log(skip_text) raise ENDRUN(state=Skipped(skip_text)) - dataframe["id_reservatorio"] = dataframe["id_estacao"] - pluviometric_cols = [ "id_estacao", "data_medicao", @@ -125,7 +123,7 @@ def treat_data( "acumulado_chuva_96_h", "acumulado_chuva_30_d", ] - fluviometric_cols = ["id_reservatorio", "data_medicao", "altura_agua"] + fluviometric_cols = ["id_estacao", "data_medicao", "altura_agua"] dfr_pluviometric = dataframe[pluviometric_cols].copy() dfr_fluviometric = dataframe.loc[ @@ -137,12 +135,9 @@ def treat_data( dfr_fluviometric["altura_agua"] > 10000, "altura_agua" ] = np.nan - dfr_fluviometric["tipo_reservatorio"] = "rio" - fluviometric_cols_order = [ - "id_reservatorio", + "id_estacao", "data_medicao", - "tipo_reservatorio", "altura_agua", ] dfr_fluviometric = dfr_fluviometric[fluviometric_cols_order].copy() @@ -176,10 +171,9 @@ def check_new_data( @task(skip_on_upstream_skip=False) -def wait_task(): +def wait_task() -> None: """Task create because prefect was messing up paths to be saved on each table""" log("End waiting pluviometric task to end.") - return "continue" @task From b23b2bd7cccab298a2e3835feeef9cb328d693e5 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Mar 2024 10:55:23 -0300 Subject: [PATCH 10/11] testing wait_task before case fluviometric_data True --- pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py index dde2f81ff..ea2da42ff 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py @@ -156,9 +156,9 @@ raise_final_state=True, ) + status = wait_task() + status.set_upstream(UPLOAD_TABLE_PLUVIOMETRIC) with case(new_fluviometric_data, True): - status = wait_task() - status.set_upstream(UPLOAD_TABLE_PLUVIOMETRIC) path_fluviometric = save_data( dataframe=dfr_fluviometric, folder_name="fluviometer" ) From aa5e25e0e3cc8c74b0f9a6e280669a620e176225 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 15 Mar 2024 15:04:28 -0300 Subject: [PATCH 11/11] typo --- pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py index ea2da42ff..19307c5b4 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_inea/flows.py @@ -54,7 +54,7 @@ "dataset_id_fluviometric", default="clima_fluviometro", required=True ) TABLE_ID_FLUVIOMETRIC = Parameter( - "table_id_fluviometric", default="taxa_precipitacao_inea", required=True + 
"table_id_fluviometric", default="lamina_agua_inea", required=True ) # Materialization parameters