run flow only if it has new data
patriciacatandi committed Mar 26, 2024
1 parent 526529f commit d27c332
Showing 4 changed files with 141 additions and 40 deletions.
2 changes: 1 addition & 1 deletion pipelines/rj_cor/comando/eventos/constants.py
@@ -15,7 +15,7 @@ class constants(Enum):  # pylint: disable=c0103
     PATH_BASE_ENDERECOS = "/tmp/base_enderecos.csv"
     DATASET_ID = "adm_cor_comando"
     TABLE_ID_EVENTOS = "ocorrencias_nova_api"
-    REDIS_NAME = "cor_api_last_days"
+    REDIS_NAME = "last_update"
     TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis_nova_api"
     # TABLE_ID_POPS = "procedimento_operacional_padrao"
     # TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis"
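Note: with REDIS_NAME renamed, the key the flow reads and writes becomes prod.adm_cor_comando.ocorrencias_nova_api.last_update. A minimal sketch of how that key is assembled, with the body of the build_redis_key helper from pipelines/rj_cor/utils.py (shown at the end of this diff) inlined for illustration:

# Sketch: how the renamed REDIS_NAME shapes the Redis key.
# Helper body copied from pipelines/rj_cor/utils.py for illustration.
def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod"):
    """Helper for building a Redis key."""
    return mode + "." + dataset_id + "." + table_id + "." + name

key = build_redis_key("adm_cor_comando", "ocorrencias_nova_api", "last_update")
print(key)  # prod.adm_cor_comando.ocorrencias_nova_api.last_update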
35 changes: 15 additions & 20 deletions pipelines/rj_cor/comando/eventos/flows.py
@@ -22,7 +22,9 @@
     download_data_atividades,
     get_date_interval,
     get_redis_df,
+    get_redis_max_date,
     save_data,
+    save_redis_max_date,
     treat_data_ocorrencias,
     treat_data_atividades,
 )
@@ -84,35 +86,18 @@

     dfr = download_data_ocorrencias(first_date, last_date)

-    dfr_redis = get_redis_df(
+    redis_max_date = get_redis_max_date(
         dataset_id=dataset_id,
         table_id=table_id,
         name=redis_name,
         mode=redis_mode,
     )

-    dfr_treated, dfr_redis = treat_data_ocorrencias(
+    dfr_treated, redis_max_date = treat_data_ocorrencias(
         dfr,
-        dfr_redis=dfr_redis,
-        columns=["id_evento", "data_inicio", "status"],
+        redis_max_date,
     )
-
-    # dfr = compare_actual_df_with_redis_df(
-    #     dfr,
-    #     dfr_redis=dfr_redis,
-    #     columns=columns,
-
-    # )
-
-    # save_redis_df(
-    #     dfr_redis,
-    #     dataset_id,
-    #     table_id,
-    #     redis_name,
-    #     keep_n_days=1,
-    #     mode = mode,
-    # )

     path = save_data(dfr_treated)
     task_upload = create_table_and_upload_to_gcs(
         data_path=path,
@@ -123,6 +108,16 @@
         wait=path,
     )

+    save_redis_max_date(
+        dataset_id=dataset_id,
+        table_id=table_id,
+        name=redis_name,
+        mode=redis_mode,
+        redis_max_date=redis_max_date,
+    )
+
+    save_redis_max_date.set_upstream(task_upload)
+
     # Warning: this task won't execute if we provide a date interval
     # in the parameters. That way, when we perform backfills, we
     # won't mess with the Redis interval.
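Note: the wiring above relies on Prefect's deferred execution: save_redis_max_date is registered inside the Flow context, and set_upstream makes it run only after the GCS upload succeeds, so the watermark in Redis never gets ahead of the data actually persisted. A minimal, self-contained sketch of the same ordering pattern (hypothetical task names, Prefect 1.x API):

# Minimal sketch of the upstream-dependency pattern used above (Prefect 1.x).
# Task names here are illustrative, not the pipeline's real tasks.
from prefect import Flow, task

@task
def upload_to_storage() -> str:
    return "gs://bucket/path"  # pretend upload, returns a handle

@task
def save_watermark(max_date: str):
    print(f"persisting watermark {max_date}")

with Flow("ordering_sketch") as flow:
    uploaded = upload_to_storage()
    saved = save_watermark("2024-03-26 00:00:00")
    # Force the watermark write to wait for the upload, mirroring
    # save_redis_max_date.set_upstream(task_upload) in the flow above.
    saved.set_upstream(uploaded)

flow.run()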
85 changes: 66 additions & 19 deletions pipelines/rj_cor/comando/eventos/tasks.py
@@ -1,6 +1,11 @@
 # -*- coding: utf-8 -*-
 # pylint: disable=R0914,W0613,W0102,W0613,R0912,R0915,E1136,E1137,W0702
 # flake8: noqa: E722
+# TODO: add the new id_pops
+# TODO: raise an alert whenever a new id_pop appears
+# TODO: wipe the new API's history so it carries the new id_pop
+# TODO: create a dim table for the new id_pop
+# TODO: save on Redis the max of the data_inicio and data_fim columns; continue the flow only if either has new data
 """
 Tasks for comando
 """
@@ -21,18 +26,20 @@
 from prefect.engine.state import Skipped

 # from prefect.triggers import all_successful

 # url_eventos = "http://aplicativo.cocr.com.br/comando/ocorrencias_api_nova"
+from pipelines.rj_cor.utils import compare_actual_df_with_redis_df
 from pipelines.rj_cor.comando.eventos.utils import (
-    build_redis_key,
-    compare_actual_df_with_redis_df,
-    get_redis_output,  # TODO: update the one in utils.utils
+    # build_redis_key,
     format_date,
     treat_wrong_id_pop,
 )
 from pipelines.utils.utils import (
+    build_redis_key,
+    get_redis_output,
     get_vault_secret,
     log,
     parse_date_columns,
+    save_str_on_redis,
     to_partitions,
 )

@@ -49,7 +56,7 @@ def get_date_interval(first_date, last_date) -> Tuple[dict, str]:
         first_date, last_date = format_date(first_date, last_date)
     else:
         last_date = pendulum.today(tz="America/Sao_Paulo").date()
-        first_date = last_date.subtract(days=1)  # attention: change to 3
+        first_date = last_date.subtract(days=3)
         first_date, last_date = format_date(
             first_date.strftime("%Y-%m-%d"), last_date.strftime("%Y-%m-%d")
         )
@@ -70,7 +77,8 @@ def get_redis_df(
     redis_key = build_redis_key(dataset_id, table_id, name, mode)
     log(f"Accessing redis_key: {redis_key}")

-    dfr_redis = get_redis_output(redis_key, is_df=True)
+    dfr_redis = get_redis_output(redis_key)
+    # dfr_redis = get_redis_output(redis_key, is_df=True)
     log(f"Redis output: {dfr_redis}")

     # if len(dfr_redis) == 0:
@@ -90,6 +98,48 @@
     return dfr_redis

+
+@task
+def get_redis_max_date(
+    dataset_id: str,
+    table_id: str,
+    name: str = None,
+    mode: str = "prod",
+) -> str:
+    """
+    Access Redis to get the last saved date, to be compared with the current df.
+    """
+    redis_key = build_redis_key(dataset_id, table_id, name, mode)
+    log(f"Accessing redis_key: {redis_key}")
+
+    redis_max_date = get_redis_output(redis_key)
+
+    try:
+        redis_max_date = redis_max_date["max_date"]
+    except KeyError:
+        redis_max_date = "1990-01-01"
+        log("Creating a fake date because this key doesn't exist.")
+
+    log(f"Redis output: {redis_max_date}")
+    return redis_max_date
+
+
+@task
+def save_redis_max_date(
+    dataset_id: str,
+    table_id: str,
+    name: str = None,
+    mode: str = "prod",
+    redis_max_date: str = None,
+) -> None:
+    """
+    Access Redis to save the last date.
+    """
+    redis_key = build_redis_key(dataset_id, table_id, name, mode)
+    log(f"Accessing redis_key: {redis_key}")
+
+    save_str_on_redis(redis_key, "max_date", redis_max_date)
+
+
 @task(
     nout=1,
     max_retries=3,
@@ -104,6 +154,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame:
     url_secret = get_vault_secret("comando")["data"]
     url_eventos = url_secret["endpoint_eventos"]

+    log(f"\n\nDownloading data from {first_date} to {last_date} (not included)")
     dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}")

     return dfr
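Note: the log line added here makes the interval semantics explicit: data_f (last_date) is not included. A hedged sketch of the request pattern, with a made-up endpoint since the real one is read from Vault ("endpoint_eventos"):

# Sketch of the download call; the URL below is a placeholder,
# not the real endpoint.
import pandas as pd

url_eventos = "https://example.com/eventos"  # hypothetical endpoint
first_date, last_date = "2024-03-23", "2024-03-26"  # last_date is excluded
dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}")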
@@ -112,8 +163,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame:
 @task(nout=2)
 def treat_data_ocorrencias(
     dfr: pd.DataFrame,
-    dfr_redis: pd.DataFrame,
-    columns: list,
+    redis_max_date: str,
-) -> Tuple[pd.DataFrame, pd.DataFrame]:
+) -> Tuple[pd.DataFrame, str]:
     """
     Rename cols and normalize data.
@@ -133,21 +183,18 @@ def treat_data_ocorrencias(

     dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str)

-    log(f"Dataframe before comparing with last data saved on redis \n{dfr.head()}")
-    columns = ["id_evento", "data_inicio", "status"]
-    dfr, dfr_redis = compare_actual_df_with_redis_df(
-        dfr,
-        dfr_redis,
-        columns,
-    )
-    log(f"Dataframe after comparing with last data saved on redis {dfr.head()}")
+    max_date = dfr[["data_inicio", "data_fim"]].max().max()

-    # If df is empty stop flow
-    if dfr.shape[0] == 0:
+    log(f"Last API data was {max_date} and last Redis update was {redis_max_date}")
+
+    if max_date <= redis_max_date:
         skip_text = "No new data available on API"
         print(skip_text)
         raise ENDRUN(state=Skipped(skip_text))

+    # Get the new max_date to save on Redis
+    redis_max_date = max_date
+
     dfr["tipo"] = dfr["tipo"].replace(
         {
             "Primária": "Primario",
Expand Down Expand Up @@ -208,7 +255,7 @@ def treat_data_ocorrencias(
"%Y-%m-%d %H:%M:%S"
)

return dfr.drop_duplicates(), dfr_redis
return dfr.drop_duplicates(), redis_max_date


@task(
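Note: the skip logic in treat_data_ocorrencias compares timestamps as plain strings. That is safe because zero-padded "%Y-%m-%d %H:%M:%S" values sort lexicographically in chronological order, and it is also why "1990-01-01" works as an "always older" fallback. A standalone check of that assumption:

# ISO-style, zero-padded timestamps order lexicographically, so the
# string comparison max_date <= redis_max_date in treat_data_ocorrencias
# is equivalent to comparing the underlying datetimes.
import pandas as pd

dfr = pd.DataFrame(
    {
        "data_inicio": ["2024-03-25 10:00:00", "2024-03-26 08:30:00"],
        "data_fim": ["2024-03-25 11:00:00", None],
    }
)

max_date = dfr[["data_inicio", "data_fim"]].max().max()
print(max_date)  # 2024-03-26 08:30:00

assert max_date > "1990-01-01"            # fallback date never blocks the flow
assert max_date <= "2024-03-26 08:30:00"  # an equal watermark would skip the run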
59 changes: 59 additions & 0 deletions pipelines/rj_cor/utils.py
@@ -2,6 +2,13 @@
 """
 Utils for rj-cor
 """
+import json
+import pandas as pd
+from pipelines.utils.utils import (
+    get_redis_client,
+    log,
+    treat_redis_output,
+)


 def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod"):
@@ -10,3 +17,55 @@ def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod
     """
     key = mode + "." + dataset_id + "." + table_id + "." + name
     return key
+
+
+def get_redis_output(redis_key, is_df: bool = False):
+    """
+    Get Redis output. Use `get` to obtain a df from Redis, or `hgetall` if it
+    is a key-value pair.
+    """
+    redis_client = get_redis_client()  # (host="127.0.0.1")
+
+    if is_df:
+        json_data = redis_client.get(redis_key)
+        log(f"[DEBUG] json_data {json_data}")
+        if json_data:
+            # If data is found, parse the JSON string back to a Python object (dictionary)
+            data_dict = json.loads(json_data)
+            # Convert the dictionary back to a DataFrame
+            return pd.DataFrame(data_dict)
+
+        return pd.DataFrame()
+
+    output = redis_client.hgetall(redis_key)
+    if len(output) > 0:
+        output = treat_redis_output(output)
+    return output
+
+
+def compare_actual_df_with_redis_df(
+    dfr: pd.DataFrame,
+    dfr_redis: pd.DataFrame,
+    columns: list,
+) -> tuple:
+    """
+    Compare the df from Redis with the current df and return only the rows of
+    the current df that are not already saved on Redis.
+    """
+    for col in columns:
+        if col not in dfr_redis.columns:
+            dfr_redis[col] = None
+        dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes)
+    log(f"\nEnded type conversion from dfr to dfr_redis: \n{dfr_redis.dtypes}")
+
+    dfr_diff = (
+        pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True)
+        .query('_merge == "left_only"')
+        .drop("_merge", axis=1)
+    )
+    log(
+        f"\nDf resulting from the difference between dfr_redis and dfr: \n{dfr_diff.head()}"
+    )
+
+    updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]])
+
+    return dfr_diff, updated_dfr_redis
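Note: compare_actual_df_with_redis_df is a pandas anti-join: the left merge with indicator=True keeps only rows missing from the Redis snapshot. A self-contained sketch of the same idea on toy data (logging stripped):

# Standalone illustration of the anti-join used by compare_actual_df_with_redis_df.
import pandas as pd

columns = ["id_evento", "data_inicio", "status"]
dfr = pd.DataFrame(
    [("1", "2024-03-25", "aberto"), ("2", "2024-03-26", "fechado")],
    columns=columns,
)
dfr_redis = pd.DataFrame([("1", "2024-03-25", "aberto")], columns=columns)

dfr_diff = (
    pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True)
    .query('_merge == "left_only"')
    .drop("_merge", axis=1)
)
print(dfr_diff)  # only id_evento "2" survives: it is not in the Redis snapshot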
