diff --git a/rj_cor/comando/eventos/constants.html b/rj_cor/comando/eventos/constants.html index dfc1e6a3c..0a542f33c 100644 --- a/rj_cor/comando/eventos/constants.html +++ b/rj_cor/comando/eventos/constants.html @@ -43,10 +43,11 @@
pipelines.rj_cor.comando.eventos.constants
PATH_BASE_ENDERECOS = "/tmp/base_enderecos.csv"
DATASET_ID = "adm_cor_comando"
- TABLE_ID_EVENTOS = "ocorrencias"
- TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis"
- TABLE_ID_POPS = "procedimento_operacional_padrao"
- TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis"
+ TABLE_ID_EVENTOS = "ocorrencias_nova_api"
+ REDIS_NAME = "cor_api_last_days"
+ # TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis"
+ # TABLE_ID_POPS = "procedimento_operacional_padrao"
+ # TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis"
RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS = {
"redis_data_key": "data_alagamento_recente_comando",
"redis_update_key": "data_update_alagamento_recente_comando",
@@ -61,22 +62,44 @@ pipelines.rj_cor.comando.eventos.constants
END AS tipo,
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
CAST(latitude AS FLOAT64)) AS geometry
- FROM `rj-cor.adm_cor_comando_staging.ocorrencias`
+ -- FROM `rj-cor.adm_cor_comando_staging.ocorrencias`
+ FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api`
WHERE id_pop IN ("5", "6", "31", "32", "33")
- AND data_particao >= DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 1 day), day)
- AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 1 day)
- AND data_fim IS NULL
+ AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS STRING)
+ AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE)
+ -- AND data_fim IS NULL
+ AND status = "Aberto"
),
+
+ intersected_areas AS (
+ SELECT
+ h3_grid.id AS id_h3,
+ bairros.nome AS bairro,
+ h3_grid.geometry,
+ -- bairros.geometry,
+ ST_AREA(ST_INTERSECTION(bairros.geometry, h3_grid.geometry)) AS intersection_area,
+ ROW_NUMBER() OVER (PARTITION BY h3_grid.id ORDER BY ST_AREA(ST_INTERSECTION(bairros.geometry, h3_grid.geometry)) DESC) AS row_num
+ FROM
+ `rj-cor.dados_mestres.h3_grid_res8` h3_grid
+ LEFT JOIN
+ `rj-cor.dados_mestres.bairro` AS bairros
+ ON
+ ST_INTERSECTS(bairros.geometry, h3_grid.geometry)
+ WHERE NOT ST_CONTAINS(ST_GEOGFROMTEXT('POLYGON((-43.35167114973923 -23.03719187431942, -43.21742224531541 -23.11411703410819, -43.05787930227852 -23.08560586153892, -43.13797293161925 -22.9854505090441, -43.24908435505957 -23.01309491285712, -43.29357259322761 -23.02302500142027, -43.35372293867113 -23.02286949608791, -43.35167114973923 -23.03719187431942))'), h3_grid.geometry)
+ AND NOT ST_CONTAINS(ST_GEOGFROMTEXT('POLYGON((-43.17255470033881 -22.80357287766821, -43.16164114820394 -22.8246787848653, -43.1435175784006 -22.83820699694974, -43.08831858222521 -22.79901386772875, -43.09812065965735 -22.76990583135868, -43.11917632397501 -22.77502040608505, -43.12252626904735 -22.74275730775724, -43.13510053525226 -22.7443347361711, -43.1568784870256 -22.79110122556994, -43.17255470033881 -22.80357287766821))'), h3_grid.geometry)
+ AND h3_grid.id not in ("88a8a06a31fffff", "88a8a069b5fffff", "88a8a3d357fffff", "88a8a3d355fffff", "88a8a068adfffff", "88a8a06991fffff", "88a8a06999fffff")
+
+ ),
+
final_table AS (
SELECT
- h3_grid.id AS id_h3,
- nome AS bairro,
+ id_h3,
+ bairro,
COALESCE(MAX(tipo), 0) AS tipo
- FROM `rj-cor.dados_mestres.h3_grid_res8` h3_grid
- LEFT JOIN `rj-cor.dados_mestres.bairro`
- ON ST_CONTAINS(`rj-cor.dados_mestres.bairro`.geometry, ST_CENTROID(h3_grid.geometry))
+ FROM intersected_areas
LEFT JOIN alagamentos
- ON ST_CONTAINS(h3_grid.geometry, alagamentos.geometry)
+ ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry)
+ WHERE intersected_areas.row_num = 1
GROUP BY id_h3, bairro
)
@@ -99,7 +122,7 @@ pipelines.rj_cor.comando.eventos.constants
FROM final_table
""",
"query_update": """
- SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update
+ SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update
""",
}
RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS = {
@@ -116,25 +139,45 @@ pipelines.rj_cor.comando.eventos.constants
END AS tipo,
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
CAST(latitude AS FLOAT64)) AS geometry
- FROM `rj-cor.adm_cor_comando_staging.ocorrencias`
+ -- FROM `rj-cor.adm_cor_comando_staging.ocorrencias`
+ FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api`
WHERE id_pop IN ("5", "6", "31", "32", "33")
- AND data_particao >= DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 120 MINUTE), day)
+ AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 120 MINUTE), day) AS STRING)
AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 120 MINUTE)
- OR data_fim IS NULL)
- -- AND data_particao >= DATE_TRUNC(TIMESTAMP_SUB(CAST("2022-04-01 04:39:15" as datetime), INTERVAL 120 MINUTE), day)
- -- AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CAST("2022-04-01 04:39:15" as datetime), INTERVAL 120 MINUTE)
- -- OR data_fim IS NULL)
+ -- OR data_fim IS NULL
+ )
+ AND status = "Aberto"
),
+
+ intersected_areas AS (
+ SELECT
+ h3_grid.id AS id_h3,
+ bairros.nome AS bairro,
+ h3_grid.geometry,
+ -- bairros.geometry,
+ ST_AREA(ST_INTERSECTION(bairros.geometry, h3_grid.geometry)) AS intersection_area,
+ ROW_NUMBER() OVER (PARTITION BY h3_grid.id ORDER BY ST_AREA(ST_INTERSECTION(bairros.geometry, h3_grid.geometry)) DESC) AS row_num
+ FROM
+ `rj-cor.dados_mestres.h3_grid_res8` h3_grid
+ LEFT JOIN
+ `rj-cor.dados_mestres.bairro` AS bairros
+ ON
+ ST_INTERSECTS(bairros.geometry, h3_grid.geometry)
+ WHERE NOT ST_CONTAINS(ST_GEOGFROMTEXT('POLYGON((-43.35167114973923 -23.03719187431942, -43.21742224531541 -23.11411703410819, -43.05787930227852 -23.08560586153892, -43.13797293161925 -22.9854505090441, -43.24908435505957 -23.01309491285712, -43.29357259322761 -23.02302500142027, -43.35372293867113 -23.02286949608791, -43.35167114973923 -23.03719187431942))'), h3_grid.geometry)
+ AND NOT ST_CONTAINS(ST_GEOGFROMTEXT('POLYGON((-43.17255470033881 -22.80357287766821, -43.16164114820394 -22.8246787848653, -43.1435175784006 -22.83820699694974, -43.08831858222521 -22.79901386772875, -43.09812065965735 -22.76990583135868, -43.11917632397501 -22.77502040608505, -43.12252626904735 -22.74275730775724, -43.13510053525226 -22.7443347361711, -43.1568784870256 -22.79110122556994, -43.17255470033881 -22.80357287766821))'), h3_grid.geometry)
+ AND h3_grid.id not in ("88a8a06a31fffff", "88a8a069b5fffff", "88a8a3d357fffff", "88a8a3d355fffff", "88a8a068adfffff", "88a8a06991fffff", "88a8a06999fffff")
+
+ ),
+
final_table AS (
SELECT
- h3_grid.id AS id_h3,
- nome AS bairro,
+ id_h3,
+ bairro,
COALESCE(MAX(tipo), 0) AS tipo
- FROM `rj-cor.dados_mestres.h3_grid_res8` h3_grid
- LEFT JOIN `rj-cor.dados_mestres.bairro`
- ON ST_CONTAINS(`rj-cor.dados_mestres.bairro`.geometry, ST_CENTROID(h3_grid.geometry))
+ FROM intersected_areas
LEFT JOIN alagamentos
- ON ST_CONTAINS(h3_grid.geometry, alagamentos.geometry)
+ ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry)
+ WHERE intersected_areas.row_num = 1
GROUP BY id_h3, bairro
)
@@ -162,7 +205,7 @@ pipelines.rj_cor.comando.eventos.constants
FROM final_table
""",
"query_update": """
- SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update
+ SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update
""",
}
@@ -193,10 +236,11 @@ var TABLE_ID_ATIVIDADES_EVENTOS
var TABLE_ID_ATIVIDADES_POPS
var REDIS_NAME
var TABLE_ID_POPS
PATH_BASE_ENDERECOS
RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS
RAIN_DASHBOARD_LAST_2H_FLOW_SCHEDULE_PARAMETERS
TABLE_ID_ATIVIDADES_EVENTOS
TABLE_ID_ATIVIDADES_POPS
REDIS_NAME
TABLE_ID_EVENTOS
TABLE_ID_POPS
pipelines.rj_cor.comando.eventos.flows
Module pipelines.rj_cor.comando.eventos.flows
+# with Flow(
+# "COR: Comando - POPs e Atividades dos POPs",
+# code_owners=[
+# "paty",
+# ],
+# ) as rj_cor_comando_pops_flow:
+# # Dump mode
+# dump_mode = Parameter("dump_mode", default="overwrite", required=False)
+
+# # Materialization parameters
+# materialize_after_dump = Parameter(
+# "materialize_after_dump", default=False, required=False
+# )
+# materialization_mode = Parameter(
+# "materialization_mode", default="prod", required=False
+# )
+# materialize_to_datario = Parameter(
+# "materialize_to_datario", default=False, required=False
+# )
+
+# # Dump to GCS after? Should only dump to GCS if materializing to datario
+# dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False)
+# maximum_bytes_processed = Parameter(
+# "maximum_bytes_processed",
+# required=False,
+# default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
+# )
+
+# dataset_id = Parameter(
+# "dataset_id", default=comando_constants.DATASET_ID.value, required=False
+# )
+# table_id_pops = Parameter(
+# "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False
+# )
+# table_id_atividades_pops = Parameter(
+# "table_id_atividades_pops",
+# default=comando_constants.TABLE_ID_ATIVIDADES_POPS.value,
+# required=False,
+# )
+
+# pops = get_pops()
+# redis_pops = get_on_redis(dataset_id, table_id_atividades_pops, mode="dev")
+# atividades_pops, update_pops_redis = get_atividades_pops(
+# pops=pops, redis_pops=redis_pops
+# )
+# has_update = not_none(update_pops_redis)
+
+# path_pops = save_no_partition(dataframe=pops)
+
+# task_upload_pops = create_table_and_upload_to_gcs(
+# data_path=path_pops,
+# dataset_id=dataset_id,
+# table_id=table_id_pops,
+# biglake_table=False,
+# dump_mode=dump_mode,
+# )
+
+# with case(has_update, True):
+# path_atividades_pops = save_no_partition(
+# dataframe=atividades_pops, append=False
+# )
+
+# task_upload_atividades_pops = create_table_and_upload_to_gcs(
+# data_path=path_atividades_pops,
+# dataset_id=dataset_id,
+# table_id=table_id_atividades_pops,
+# biglake_table=False,
+# dump_mode="overwrite",
+# )
+
+# save_on_redis(
+# dataset_id,
+# table_id_atividades_pops,
+# "dev",
+# update_pops_redis,
+# wait=task_upload_atividades_pops,
+# )
+
+# with case(materialize_after_dump, True):
+# # Trigger DBT flow run
+# current_flow_labels = get_current_flow_labels()
+
+# materialization_pops_flow = create_flow_run(
+# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+# project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+# parameters={
+# "dataset_id": dataset_id,
+# "table_id": table_id_pops,
+# "mode": materialization_mode,
+# "materialize_to_datario": materialize_to_datario,
+# },
+# labels=current_flow_labels,
+# run_name=f"Materialize {dataset_id}.{table_id_pops}",
+# )
+# materialization_pops_flow.set_upstream(task_upload_pops)
+
+# materialization_atividades_pops_flow = create_flow_run(
+# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+# project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+# parameters={
+# "dataset_id": dataset_id,
+# "table_id": table_id_atividades_pops,
+# "mode": materialization_mode,
+# "materialize_to_datario": materialize_to_datario,
+# },
+# labels=current_flow_labels,
+# run_name=f"Materialize {dataset_id}.{table_id_atividades_pops}",
+# )
+# materialization_atividades_pops_flow.set_upstream(task_upload_atividades_pops)
+
+# wait_for_pops_materialization = wait_for_flow_run(
+# materialization_pops_flow,
+# stream_states=True,
+# stream_logs=True,
+# raise_final_state=True,
+# )
+# wait_for_pops_materialization.max_retries = (
+# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+# )
+# wait_for_pops_materialization.retry_delay = timedelta(
+# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+# )
+
+# wait_for_atividades_pops_materialization = wait_for_flow_run(
+# materialization_atividades_pops_flow,
+# stream_states=True,
+# stream_logs=True,
+# raise_final_state=True,
+# )
+# wait_for_atividades_pops_materialization.max_retries = (
+# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+# )
+# wait_for_atividades_pops_materialization.retry_delay = timedelta(
+# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+# )
+
+# with case(dump_to_gcs, True):
+# # Trigger Dump to GCS flow run with project id as datario
+# dump_pops_to_gcs_flow = create_flow_run(
+# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
+# project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+# parameters={
+# "project_id": "datario",
+# "dataset_id": dataset_id,
+# "table_id": table_id_pops,
+# "maximum_bytes_processed": maximum_bytes_processed,
+# },
+# labels=[
+# "datario",
+# ],
+# run_name=f"Dump to GCS {dataset_id}.{table_id_pops}",
+# )
+# dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization)
+
+# dump_atividades_pops_to_gcs_flow = create_flow_run(
+# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value,
+# project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+# parameters={
+# "project_id": "datario",
+# "dataset_id": dataset_id,
+# "table_id": table_id_atividades_pops,
+# "maximum_bytes_processed": maximum_bytes_processed,
+# },
+# labels=[
+# "datario",
+# ],
+# run_name=f"Dump to GCS {dataset_id}.{table_id_atividades_pops}",
+# )
+# dump_atividades_pops_to_gcs_flow.set_upstream(
+# wait_for_atividades_pops_materialization
+# )
+
+# wait_for_dump_pops_to_gcs = wait_for_flow_run(
+# dump_pops_to_gcs_flow,
+# stream_states=True,
+# stream_logs=True,
+# raise_final_state=True,
+# )
+
+# wait_for_dump_atividades_pops_to_gcs = wait_for_flow_run(
+# dump_atividades_pops_to_gcs_flow,
+# stream_states=True,
+# stream_logs=True,
+# raise_final_state=True,
+# )
+
+# rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+# rj_cor_comando_pops_flow.run_config = KubernetesRun(
+# image=constants.DOCKER_IMAGE.value,
+# labels=[
+# constants.RJ_COR_AGENT_LABEL.value,
+# ],
+# )
+
+# rj_cor_comando_pops_flow.schedule = every_month
pipelines.rj_cor.comando.eventos.schedules
labels=[
constants.RJ_COR_AGENT_LABEL.value,
],
+ parameter_defaults={
+ "materialize_after_dump": True,
+ "materialization_mode": "prod",
+ "materialize_to_datario": True,
+ "dump_to_gcs": True,
+ "trigger_rain_dashboard_update": True,
+ "redis_mode": "prod",
+ },
),
]
)
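For context, these parameter_defaults are attached to the clock that drives the schedule. A minimal sketch of what the surrounding clock definition could look like in Prefect 1.x follows; the interval, start date and import paths are assumptions for illustration, not values taken from this diff.

from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

from pipelines.constants import constants  # assumed import path

every_five_minutes = Schedule(  # assumed name and interval
    clocks=[
        IntervalClock(
            interval=timedelta(minutes=5),
            start_date=datetime(2023, 1, 1),
            labels=[
                constants.RJ_COR_AGENT_LABEL.value,
            ],
            parameter_defaults={
                "materialize_after_dump": True,
                "materialization_mode": "prod",
                "materialize_to_datario": True,
                "dump_to_gcs": True,
                "trigger_rain_dashboard_update": True,
                "redis_mode": "prod",
            },
        ),
    ]
)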
diff --git a/rj_cor/comando/eventos/tasks.html b/rj_cor/comando/eventos/tasks.html
index af6910e32..263e1da58 100644
--- a/rj_cor/comando/eventos/tasks.html
+++ b/rj_cor/comando/eventos/tasks.html
@@ -34,101 +34,143 @@ pipelines.rj_cor.comando.eventos.tasks
pipelines.rj_cor.comando.eventos.tasks
Module pipelines.rj_cor.comando.eventos.tasks
-def download_eventos(date_interval, wait=None) ‑> Tuple[pandas.core.frame.DataFrame, str]
+
+def compare_actual_df_with_redis_df(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
-
-
Faz o request dos dados de eventos e das atividades do evento
+Compare the DataFrame saved on Redis with the current DataFrame and return
+the rows of the current DataFrame that are not yet on Redis, together with
+the updated Redis DataFrame.
+
+
+Expand source code
+
+def compare_actual_df_with_redis_df(
+ dfr: pd.DataFrame,
+ dfr_redis: pd.DataFrame,
+ columns: list,
+) -> Tuple[pd.DataFrame, pd.DataFrame]:
+ """
+ Compare the DataFrame saved on Redis with the current DataFrame and return
+ the rows of the current DataFrame that are not yet on Redis, together with
+ the updated Redis DataFrame.
+ """
+ for col in columns:
+ if col not in dfr_redis.columns:
+ dfr_redis[col] = None
+ dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes)
+ log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}")
+
+ dfr_diff = (
+ pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True)
+ .query('_merge == "left_only"')
+ .drop("_merge", axis=1)
+ )
+ log(
+ f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}"
+ )
+
+ updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]])
+
+ return dfr_diff, updated_dfr_redis
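As a quick illustration of the anti-join above, here is a minimal usage example; the sample values are made up:

import pandas as pd

# Current API data vs. what was stored on Redis on the previous run
dfr = pd.DataFrame({"id_evento": ["1", "2"], "status": ["Aberto", "Aberto"]})
dfr_redis = pd.DataFrame({"id_evento": ["1"], "status": ["Aberto"]})

new_rows, updated_redis = compare_actual_df_with_redis_df(
    dfr, dfr_redis, columns=["id_evento", "status"]
)
# new_rows keeps only the row with id_evento "2";
# updated_redis now holds the keys of both rows.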
+
+
+
+def download_data(first_date, last_date, wait=None) ‑> pandas.core.frame.DataFrame
+
+-
+
Download data from the API
Expand source code
@@ -466,334 +402,129 @@ Functions
max_retries=3,
retry_delay=timedelta(seconds=60),
)
-def download_eventos(date_interval, wait=None) -> Tuple[pd.DataFrame, str]:
+def download_data(first_date, last_date, wait=None) -> pd.DataFrame:
"""
- Faz o request dos dados de eventos e das atividades do evento
+ Download data from the API
"""
-
- auth_token = get_token()
+ # auth_token = get_token()
url_secret = get_vault_secret("comando")["data"]
url_eventos = url_secret["endpoint_eventos"]
- url_atividades_evento = url_secret["endpoint_atividades_evento"]
-
- # Request Eventos
- response = get_url(url=url_eventos, parameters=date_interval, token=auth_token)
+ ## url_atividades_evento = url_secret["endpoint_atividades_evento"]
- if "eventos" in response and len(response["eventos"]) > 0:
- eventos = pd.DataFrame(response["eventos"])
- elif ("eventos" in response) and (len(response["eventos"])) == 0:
- log("No events found on this date interval")
- skip = Skipped("No events found on this date interval")
- raise ENDRUN(state=skip)
- else:
- raise Exception(f"{response}")
-
- rename_columns = {
- "id": "id_evento",
- "titulo": "pop_titulo",
- "pop_id": "id_pop",
- "inicio": "data_inicio",
- "fim": "data_fim",
- }
-
- eventos.rename(rename_columns, inplace=True, axis=1)
- eventos["id_evento"] = eventos["id_evento"].astype("int")
- evento_id_list = eventos["id_evento"].unique()
-
- atividades_evento = []
- problema_ids_atividade = []
- problema_ids_request = []
-
- # Request AtividadesDoEvento
- for i in evento_id_list:
- # log(f">>>>>>> Requesting AtividadesDoEvento for id_evento: {i}")
- try:
- response = get_url(
- url=url_atividades_evento + f"?eventoId={i}", token=auth_token
- )
- if "atividades" in response.keys():
- response = response["atividades"]
- for elem in response:
- elem["id_evento"] = i
- atividades_evento.append(elem)
- else:
- problema_ids_atividade.append(i)
- except Exception as exc:
- log(
- f"Request AtividadesDoEvento for id_evento: {i}"
- + f"resulted in the following error: {exc}"
- )
- problema_ids_request.append(i)
- raise exc
- log(f"\n>>>>>>> problema_ids_request: {problema_ids_request}")
- log(f"\n>>>>>>> problema_ids_atividade: {problema_ids_atividade}")
-
- problema_ids_atividade = problema_ids_atividade + problema_ids_request
-
- atividades_evento = pd.DataFrame(atividades_evento)
- atividades_evento.rename(
- {
- "orgao": "sigla",
- "titulo": "pop_titulo",
- "evento_id": "id_evento",
- "pop_id": "id_pop",
- "inicio": "data_inicio",
- "fim": "data_fim",
- "chegada": "data_chegada",
- },
- inplace=True,
- axis=1,
- )
+ dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}")
- eventos_cols = [
- "id_pop",
- "id_evento",
- "bairro",
- "data_inicio",
- "data_fim",
- "prazo",
- "descricao",
- "gravidade",
- "latitude",
- "longitude",
- "status",
- "tipo",
- ]
- for col in eventos_cols:
- if col not in eventos.columns:
- eventos[col] = None
-
- atividades_evento_cols = [
- "id_evento",
- "sigla",
- "data_chegada",
- "data_inicio",
- "data_fim",
- "descricao",
- "status",
- ]
- for col in atividades_evento_cols:
- if col not in atividades_evento.columns:
- atividades_evento[col] = None
-
- eventos_categorical_cols = [
- "bairro",
- "prazo",
- "descricao",
- "gravidade",
- "status",
- "tipo",
- ]
- for i in eventos_categorical_cols:
- eventos[i] = eventos[i].str.capitalize()
-
- atividade_evento_categorical_cols = ["sigla", "descricao", "status"]
- for i in atividade_evento_categorical_cols:
- atividades_evento[i] = atividades_evento[i].str.capitalize()
-
- eventos_datas_cols = ["data_inicio", "data_fim"]
- atividades_eventos_datas_cols = ["data_chegada", "data_inicio", "data_fim"]
- eventos[eventos_datas_cols] = eventos[eventos_datas_cols].fillna(
- "1970-01-01 00:00:00"
- )
- atividades_evento[atividades_eventos_datas_cols] = atividades_evento[
- atividades_eventos_datas_cols
- ].fillna("1970-01-01 00:00:00")
-
- # Treat id_pop col
- eventos["id_pop"] = eventos["id_pop"].astype(float).astype(int)
-
- # Fixa colunas e ordem
- eventos = eventos[eventos_cols].drop_duplicates()
- atividades_evento = atividades_evento[atividades_evento_cols].drop_duplicates()
- return eventos, atividades_evento, problema_ids_atividade
+ return dfr
-
-def get_atividades_pops(pops: pandas.core.frame.DataFrame, redis_pops: list) ‑> pandas.core.frame.DataFrame
+
+def get_date_interval(first_date, last_date) ‑> Tuple[str, str]
-
-
Get the list of POP's activities from API
+If `first_date` and `last_date` are provided, format them to DD/MM/YYYY. Else,
+get data starting from the previous day.
+first_date: str YYYY-MM-DD
+last_date: str YYYY-MM-DD
Expand source code
-@task(nout=2)
-def get_atividades_pops(pops: pd.DataFrame, redis_pops: list) -> pd.DataFrame:
+@task
+def get_date_interval(first_date, last_date) -> Tuple[str, str]:
"""
- Get the list of POP's activities from API
+ If `first_date` and `last_date` are provided, format them to DD/MM/YYYY. Else,
+ get data starting from the previous day.
+ first_date: str YYYY-MM-DD
+ last_date: str YYYY-MM-DD
"""
- log(">>>>>>> Requesting POP's activities")
-
- auth_token = get_token()
-
- url_secret = get_vault_secret("comando")["data"]
- url = url_secret["endpoint_atividades_pop"]
-
- pop_ids = pops["id_pop"].unique()
-
- # remove pop_id 0
- pop_ids = [i for i in pop_ids if i not in [0, "0"]]
-
- atividades_pops = []
- for pop_id in pop_ids:
- log(f">>>>>>> Requesting POP's activities for pop_id: {pop_id}")
- response = get_url(url=url + f"?popId={pop_id}", token=auth_token)
-
- tentativa = 0
- while (
- "error" in response.keys() or response == {"response": None}
- ) and tentativa <= 5:
- log(
- f">>>>>>> Requesting POP's activities for pop_id: {pop_id} Time: {tentativa+1}"
- )
- time.sleep(60)
- response = get_url(url=url + f"?popId={pop_id}", token=auth_token)
- tentativa += 1
-
- if (
- "error" in response.keys() or response == {"response": None}
- ) and tentativa > 5:
- continue
-
- try:
- row_template = {
- "pop_titulo": response["pop"],
- "id_pop": pop_id,
- "sigla": "",
- "orgao": "",
- "acao": "",
- }
- except:
- log(f"Problem on response {response}")
-
- for activity in response["atividades"]:
- row = deepcopy(row_template)
- row["sigla"] = activity["sigla"]
- row["orgao"] = activity["orgao"]
- row["acao"] = activity["acao"]
- atividades_pops.append(row)
-
- dataframe = pd.DataFrame(atividades_pops)
- dataframe = dataframe.sort_values(["id_pop", "sigla", "acao"])
-
- for i in ["sigla", "orgao", "acao"]:
- dataframe[i] = dataframe[i].str.capitalize()
-
- dataframe["key"] = (
- dataframe["id_pop"].astype(str)
- + "_"
- + dataframe["sigla"]
- + "_"
- + dataframe["acao"]
- )
- update_pops_redis = dataframe["key"].unique()
-
- # Checar pop_ids não salvos no redis
- update_pops_redis = [i for i in update_pops_redis if i not in redis_pops]
-
- if len(update_pops_redis) < 1:
- update_pops_redis = None
- dataframe = pd.DataFrame()
+ if first_date and last_date:
+ first_date, last_date = format_date(first_date, last_date)
else:
- # mantém apenas esses pop_ids no dataframe
- dataframe = dataframe[dataframe["key"].isin(update_pops_redis)]
- dataframe = dataframe[["id_pop", "sigla", "orgao", "acao"]]
-
- return dataframe, update_pops_redis
+ last_date = pendulum.today(tz="America/Sao_Paulo").date()
+ first_date = last_date.subtract(days=1) # atenção mudar para 3
+ first_date, last_date = format_date(
+ first_date.strftime("%Y-%m-%d"), last_date.strftime("%Y-%m-%d")
+ )
+ return first_date, last_date
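Outside of a flow run, the underlying function can be exercised through the task's run method; for example, assuming today is 2023-04-20 in America/Sao_Paulo:

# get_date_interval.run(first_date=None, last_date=None)
# -> ("19/04/2023", "21/04/2023")   # yesterday through today, upper bound exclusive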
-
-def get_date_interval(date_interval_text: str, dataset_id: str, table_id: str, mode: str = 'prod') ‑> Tuple[dict, str]
+
+def get_redis_df(dataset_id: str, table_id: str, name: str, mode: str = 'prod') ‑> pandas.core.frame.DataFrame
-
-
If date_interval_text
is provided, parse it for the date interval. Else,
-get the date interval from Redis.
-Example of date_interval_text to use when selecting type string:
+Access Redis to get the DataFrame saved on the last run, so the current
+DataFrame can be compared against it.
Expand source code
-@task(nout=2)
-def get_date_interval(
- date_interval_text: str,
+@task
+def get_redis_df(
dataset_id: str,
table_id: str,
+ name: str,
mode: str = "prod",
-) -> Tuple[dict, str]:
- """
- If `date_interval_text` is provided, parse it for the date interval. Else,
- get the date interval from Redis.
- Example of date_interval_text to use when selecting type string:
- {"inicio": "2023-04-19 08:54:41.0", "fim": "2023-04-19 10:40:41.0"}
- """
- if date_interval_text:
- log(f">>>>>>>>>>> Date interval was provided: {date_interval_text}")
- date_interval = json.loads(date_interval_text)
- current_time = date_interval["fim"]
- log(f">>>>>>>>>>> Date interval: {date_interval}")
- return date_interval, current_time
-
- log(">>>>>>>>>>> Date interval was not provided")
- redis_client = get_redis_client()
-
- key_last_update = build_redis_key(dataset_id, table_id, "last_update", mode)
-
- current_time = pendulum.now("America/Sao_Paulo")
-
- last_update = redis_client.get(key_last_update)
- if last_update is None:
- log("Last update was not found in Redis, setting it to D-30")
- # Set to current_time - 30 days
- last_update = current_time.subtract(days=30).strftime("%Y-%m-%d %H:%M:%S.0")
- log(f">>>>>>>>>>> Last update: {last_update}")
-
- current_time_str = current_time.strftime("%Y-%m-%d %H:%M:%S.0")
-
- date_interval = {
- "inicio": last_update,
- "fim": current_time_str,
- }
-
- log(f">>>>>>>>>>> date_interval: {date_interval}")
-
- return date_interval, current_time_str
+) -> pd.DataFrame:
+ """
+ Access Redis to get the DataFrame saved on the last run, so the current
+ DataFrame can be compared against it.
+ """
+ redis_key = build_redis_key(dataset_id, table_id, name, mode)
+ log(f"Acessing redis_key: {redis_key}")
+
+ dfr_redis = get_redis_output(redis_key, is_df=True)
+ log(f"Redis output: {dfr_redis}")
+
+ # if len(dfr_redis) == 0:
+ # dfr_redis = pd.DataFrame()
+ # dict_redis = {k: None for k in columns}
+ # print(f"\nCreating Redis fake values for key: {redis_key}\n")
+ # print(dict_redis)
+ # dfr_redis = pd.DataFrame(
+ # dict_redis, index=[0]
+ # )
+ # else:
+ # dfr_redis = pd.DataFrame(
+ # dict_redis.items(), columns=columns
+ # )
+ # print(f"\nConverting redis dict to df: \n{dfr_redis.head()}")
+
+ return dfr_redis
-
-def get_pops() ‑> pandas.core.frame.DataFrame
+
+def get_redis_output(redis_key, is_df: bool = False)
-
-
Get the list of POPS from the API
+Get Redis output
+Example: {b'date': b'2023-02-27 07:29:04'}
Expand source code
-@task
-def get_pops() -> pd.DataFrame:
+def get_redis_output(redis_key, is_df: bool = False):
"""
- Get the list of POPS from the API
+ Get Redis output
+ Example: {b'date': b'2023-02-27 07:29:04'}
"""
- log(">>>>>>> Requesting POPS")
-
- auth_token = get_token()
-
- url_secret = get_vault_secret("comando")["data"]
- url = url_secret["endpoint_pops"]
+ redis_client = get_redis_client() # (host="127.0.0.1")
- response = get_url(url=url, token=auth_token)
+ if is_df:
+ json_data = redis_client.get(redis_key)
+ print(type(json_data))
+ print(json_data)
+ if json_data:
+ # If data is found, parse the JSON string back to a Python object (dictionary)
+ data_dict = json.loads(json_data)
+ # Convert the dictionary back to a DataFrame
+ return pd.DataFrame(data_dict)
- pops = pd.DataFrame(response["objeto"])
- pops["id"] = pops["id"].astype("int")
- pops = pops.rename({"id": "id_pop", "titulo": "pop_titulo"}, axis=1).sort_values(
- "id_pop"
- )
- pops["pop_titulo"] = pops[
- "pop_titulo"
- ].str.capitalize() # pylint: disable=unsubscriptable-object, E1137
+ return pd.DataFrame()
- return pops[["id_pop", "pop_titulo"]] # pylint: disable=unsubscriptable-object
+ output = redis_client.hgetall(redis_key)
+ if len(output) > 0:
+ output = treat_redis_output(output)
+ return output
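The write side of this round trip is not shown in this diff. Below is a hypothetical sketch of how a DataFrame could be stored so that the is_df branch above can restore it; the helper name is made up, and get_redis_client is the same helper already used above.

import pandas as pd


def save_df_on_redis_sketch(redis_key: str, dataframe: pd.DataFrame) -> None:
    """Hypothetical write counterpart of get_redis_output(redis_key, is_df=True)."""
    redis_client = get_redis_client()
    # Store the DataFrame as a JSON string so json.loads + pd.DataFrame can rebuild it
    redis_client.set(redis_key, dataframe.to_json())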
@@ -813,38 +544,37 @@ Functions
return something is not None
-
-def salvar_dados(dfr: pandas.core.frame.DataFrame, current_time: str, name: str) ‑> Union[str, pathlib.Path]
+
+def save_data(dataframe: pandas.core.frame.DataFrame) ‑> Union[str, pathlib.Path]
-
-
Salvar dados tratados em csv para conseguir subir pro GCP
Save data to a CSV file to be uploaded to GCP
Expand source code
@task
-def salvar_dados(dfr: pd.DataFrame, current_time: str, name: str) -> Union[str, Path]:
+def save_data(dataframe: pd.DataFrame) -> Union[str, Path]:
"""
- Salvar dados tratados em csv para conseguir subir pro GCP
+ Save data to a CSV file to be uploaded to GCP
"""
- dfr["ano_particao"] = pd.to_datetime(dfr["data_inicio"]).dt.year
- dfr["mes_particao"] = pd.to_datetime(dfr["data_inicio"]).dt.month
- dfr["data_particao"] = pd.to_datetime(dfr["data_inicio"]).dt.date
- dfr["ano_particao"] = dfr["ano_particao"].astype("int")
- dfr["mes_particao"] = dfr["mes_particao"].astype("int")
+ prepath = Path("/tmp/data/")
+ prepath.mkdir(parents=True, exist_ok=True)
- partitions_path = Path(os.getcwd(), "data", "comando", name)
- if not os.path.exists(partitions_path):
- os.makedirs(partitions_path)
+ partition_column = "data_inicio"
+ dataframe, partitions = parse_date_columns(dataframe, partition_column)
+ current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")
to_partitions(
- dfr,
- partition_columns=["ano_particao", "mes_particao", "data_particao"],
- savepath=partitions_path,
+ data=dataframe,
+ partition_columns=partitions,
+ savepath=prepath,
+ data_type="csv",
suffix=current_time,
)
- return partitions_path
+ log(f"[DEBUG] Files saved on {prepath}")
+ return prepath
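Assuming parse_date_columns yields the same ano/mes/data partition columns that the old code built by hand, the CSV files land in a Hive-style layout roughly like:

# /tmp/data/ano_particao=2023/mes_particao=4/data_particao=2023-04-19/<file>_<YYYYMMDDHHMM>.csv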
@@ -877,44 +607,109 @@ Functions
return path_to_directory
-
-def set_last_updated_on_redis(dataset_id: str, table_id: str, mode: str = 'prod', current_time: str = None, problem_ids_atividade: str = None) ‑> None
+
+def treat_data(dfr: pandas.core.frame.DataFrame, dfr_redis: pandas.core.frame.DataFrame, columns: list) ‑> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]
-
-
Set the last updated time on Redis.
Rename columns and normalize data.
Expand source code
-@task(trigger=all_successful)
-def set_last_updated_on_redis(
- dataset_id: str,
- table_id: str,
- mode: str = "prod",
- current_time: str = None,
- problem_ids_atividade: str = None,
-) -> None:
+@task(nout=2)
+def treat_data(
+ dfr: pd.DataFrame,
+ dfr_redis: pd.DataFrame,
+ columns: list,
+) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
- Set the last updated time on Redis.
+ Rename columns and normalize data.
"""
- redis_client = get_redis_client()
- if not current_time:
- current_time = pendulum.now("America/Sao_Paulo").strftime("%Y-%m-%d %H:%M:%S.0")
+ log("Start treating data")
+ dfr = dfr.rename(
+ columns={
+ "id": "id_evento",
+ "pop_id": "id_pop",
+ "inicio": "data_inicio",
+ "pop": "pop_titulo",
+ "titulo": "pop_especificacao",
+ }
+ )
- key_last_update = build_redis_key(dataset_id, table_id, "last_update", mode)
- redis_client.set(key_last_update, current_time)
+ dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str)
- key_problema_ids = build_redis_key(dataset_id, table_id, "problema_ids", mode)
+ log(f"Dataframe before comparing with last data saved on redis \n{dfr.head()}")
+ columns = ["id_evento", "data_inicio", "status"]
+ dfr, dfr_redis = compare_actual_df_with_redis_df(
+ dfr,
+ dfr_redis,
+ columns,
+ )
+ log(f"Dataframe after comparing with last data saved on redis {dfr.head()}")
- problem_ids_atividade_antigos = redis_client.get(key_problema_ids)
+ # If df is empty, log it (stopping the flow is currently disabled)
+ if dfr.shape[0] == 0:
+ skip_text = "No new data available on API"
+ print(skip_text)
+ # raise ENDRUN(state=Skipped(skip_text))
- if problem_ids_atividade_antigos is not None:
- problem_ids_atividade = problem_ids_atividade + list(
- problem_ids_atividade_antigos
- )
+ dfr["tipo"] = dfr["tipo"].replace(
+ {
+ "Primária": "Primario",
+ "Secundária": "Secundario",
+ }
+ )
+ dfr["descricao"] = dfr["descricao"].apply(unidecode)
+
+ mandatory_cols = [
+ "id_pop",
+ "id_evento",
+ "bairro",
+ "data_inicio",
+ "data_fim",
+ "prazo",
+ "descricao",
+ "gravidade",
+ "latitude",
+ "longitude",
+ "status",
+ "tipo",
+ ]
+ # Create cols if they don't exist on the new API
+ for col in mandatory_cols:
+ if col not in dfr.columns:
+ dfr[col] = None
+
+ categorical_cols = [
+ "bairro",
+ "prazo",
+ "descricao",
+ "gravidade",
+ "status",
+ "tipo",
+ "pop_titulo",
+ ]
+ for i in categorical_cols:
+ dfr[i] = dfr[i].str.capitalize()
+
+ # This treatment is temporary: the API is currently returning id_pop with the same value as id_evento
+ dfr = treat_wrong_id_pop(dfr)
+ log(f"This id_pop are missing {dfr[dfr.id_pop.isna()]} they were replaced by 99")
+ dfr["id_pop"] = dfr["id_pop"].fillna(99)
+
+ # Treat id_pop col
+ dfr["id_pop"] = dfr["id_pop"].astype(float).astype(int)
+
+ # Set the order to match the original table
+ dfr = dfr[mandatory_cols]
+
+ # Create a column with the row creation time so dbt can keep the latest record of each event
+ dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime(
+ "%Y-%m-%d %H:%M:%S"
+ )
- redis_client.set(key_problema_ids, list(set(problem_ids_atividade)))
+ return dfr.drop_duplicates(), dfr_redis
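Taken together, the new tasks suggest a dump flow wired roughly as below. This is a hedged sketch based on the task signatures in this diff and the comando_constants alias used in the commented-out flow above, not the actual flow definition from this PR; the mode value and the upload step are assumptions.

# Sketch only: calling each task's underlying function directly to show how the
# pieces fit together (inside the real flow these would be Prefect task calls)
first_date, last_date = get_date_interval.run(first_date=None, last_date=None)
dfr = download_data.run(first_date, last_date)
dfr_redis = get_redis_df.run(
    dataset_id=comando_constants.DATASET_ID.value,      # "adm_cor_comando"
    table_id=comando_constants.TABLE_ID_EVENTOS.value,  # "ocorrencias_nova_api"
    name=comando_constants.REDIS_NAME.value,            # "cor_api_last_days"
    mode="dev",  # assumed
)
dfr_treated, dfr_redis_updated = treat_data.run(
    dfr, dfr_redis, columns=["id_evento", "data_inicio", "status"]
)
path = save_data.run(dfr_treated)
# `path` would then feed create_table_and_upload_to_gcs, as in the commented-out
# flow above, and `dfr_redis_updated` would be written back to Redis for the next run.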
pipelines.rj_cor.comando.eventos.utils
+def format_date(first_date, last_date)
+
Format date to "dd/mm/yyyy" and add one day to last date because +the API has open interval at the end: [first_date, last_date).
def format_date(first_date, last_date):
+ """
+ Format date to "dd/mm/yyyy" and add one day to last date because
+ the API has open interval at the end: [first_date, last_date).
+ """
+ first_date = pendulum.from_format(first_date, "YYYY-MM-DD").strftime("%d/%m/%Y")
+ last_date = (
+ pendulum.from_format(last_date, "YYYY-MM-DD").add(days=1).strftime("%d/%m/%Y")
+ )
+ return first_date, last_date
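A worked example of the half-open interval handling (arbitrary dates):

# format_date("2023-04-19", "2023-04-20")
# -> ("19/04/2023", "21/04/2023")
# i.e. the request covers 2023-04-19 and 2023-04-20, since the API treats the
# upper bound as exclusive: [first_date, last_date).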
+
def get_token()
+def treat_wrong_id_pop(dfr)
+
Create id_pop based on the pop_titulo column
def treat_wrong_id_pop(dfr):
+ """
+ Create id_pop based on the pop_titulo column
+ """
+ pop = {
+ "Ajuste de banco": 0,
+ "Acidente/enguiço sem vítima": 1,
+ "Acidente com vítima(s)": 2,
+ "Acidente com vítima(s) fatal(is)": 3,
+ "Incêndio em veículo(s)": 4,
+ "Bolsão d'água em via": 5,
+ "Alagamentos e enchentes": 6,
+ "Manifestação em local público": 7,
+ "Incêndio em imóvel": 8,
+ "Sinais de trânsito com mau funcionamento": 9,
+ "Reintegração de posse": 10,
+ "Queda de árvore": 11,
+ "Queda de poste": 12,
+ "Acidente com queda de carga": 13,
+ "Incêndio no entorno de vias públicas": 14,
+ "Incêndio dentro de túneis": 15,
+ "Vazamento de água / esgoto": 16,
+ "Falta de luz / apagão": 17,
+ "Implosão": 18,
+ "Queda de estrutura de alvenaria": 19,
+ "Vazamento de gás": 20,
+ "Evento em local público ou particular": 21,
+ "Atropelamento": 22,
+ "Afundamento de pista / buraco na via": 23,
+ "Abalroamento": 24,
+ "Obra em local público": 25,
+ "Operação policial": 26,
+ "Bloco de rua": 27,
+ "Deslizamento": 28,
+ "Animal em local público": 29,
+ "Acionamento de sirenes": 30,
+ "Alagamento": 31,
+ "Enchente": 32,
+ "Lâmina d'água": 33,
+ "Acidente ambiental": 34,
+ "Bueiro": 35,
+ "Incidente com bueiro": 35,
+ "Resgate ou remoção de animais terrestres e aéreos": 36,
+ "Remoção de animais mortos na areia": 37,
+ "Resgate de animal marinho preso em rede / encalhado": 38,
+ "Incendio em vegetacao": 39,
+ "Queda de árvore sobre fiação": 40,
+ "Residuos na via": 41,
+ "Evento não programado": 99,
+ }
+ dfr["id_pop"] = dfr["pop_titulo"].map(pop)
+ return dfr
+