Create capture flow for Jae tracking data (#532)
* remove unused partition task

* unify date and hour partition tasks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix conditional

* change capture flow

* change generic capture flow

* update default flow schema

* change default capture flow structure

* change generic capture flow

* adjust constant structure

* change bilhetagem to new capture flow structure

* fix get_storage_blob function

* fix get_storage_blob call

* organize constants order

* fix get_raw_from_sources function call

* change transform_raw_to_json to read_raw_data

* transform transform_raw_data_to_json to read_raw_data

* fix nout task parameter

* fix timedelta instantiation

* set upstream tasks

* declare raw_filepath

* update docstrings

* adjust get_raw_from_sources return

* fix errors

* change agent label to dev

* refactor source values

* update constants

* update agent

* update schedule params

* update interval

* fix get_datetime_range interval

* remove order by from queries

* fix get_raw_data_api

* change json read function

* update read_raw_data

* update save_raw_local_func

* log error

* change raw api extraction for json

* change read json function

* print log traceback

* skip pre treatment if empty df

* skip save staging if dataframe is empty / save raw

* remove skip upload if empty dataframe

* update docstring and returned values

* reorganize task order

* fix tuple

* change zip logic

* remove skip

* create gtfs zip constant

* add gtfs zip file name

* add csv to save raw / change filetype logic

* remove comments

* fix csv_args default value

* change docstring get raw api

* change raw data gcs docstring

* remove commented task

* change quadro primary key to list

* update GTFS constants

* change upload folder structure

* undo silencing of notification failures

* add partition date only to transacao

* remove test parameters (gtfs)

* Update pipelines/rj_smtr/constants.py

Co-authored-by: Fernanda Scovino <fscovinom@gmail.com>

* fix error chaining in the flow

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove header treatment

* change agent from dev to prd

* change agent from dev to prd

* adjust function return values

* Update documentation

* add return value to get_upload_storage_blob

* Update documentation

* Update string

* add recapture to the generic flow

* change labels to dev

* add recapture logic

* create database connection

* create database connection

* create function to map multiple return values

* remove unmapped from filepaths

* add log for debugging

* remove unmapped from partitions

* add unmapped to the recapture parameter

* add psycopg2

* add parameter comments

* add postgresql connection

* change bilhetagem to extract from the db

* standardize argument naming

* change schedule label to dev

* fix bilhetagem postgresql db constant

* change naming for recapture runs

* adjust connector

* change IP to DNS

* Serialize datetime objects / read sql with pandas

* change run name logic

* create bilhetagem recapture

* change host to IP / add interval_minutes

* add interval_minutes parameter

* remove commented line

* remove bilhetagem schedules file

* generalize query logs function

* adjust: remove custom schedule

* unmap interval_minutes

* change write folder for testing

* test removing timezone

* change timezone

* fix recapture logic

* add option to recapture more days

* adjust recapture_window_days default

* add recapture_window to the query_logs task

* merge previous_errors

* remove test log

* adjust recapture log

* add auxiliary recapture

* create recapture parameters for auxiliary tables

* comment out materialization

* test log

* change bilhetagem recapture logic

* unmapped upstream tasks

* change how upstream is set

* remove test changes

* change agent to prd

* fix project_name

* start removing query_logs_func

* fix project_name

* remove comments

* remove query_logs_func

* increase max_recaptures

* add tracking extraction

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* change agent to dev

* fix constant

* format database constant

* change flow name

* change auxiliary bilhetagem queries

* adjust auxiliary bilhetagem recapture logic

* remove timestamp parameter

* remove truncate hour

* change agent to prd

* change project name

* create interval constant

* create gps recapture

* fix docstring

* change recapture comment

* restore get_current_timestamp task

---------

Co-authored-by: fernandascovino <fscovinom@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: eng-rodrigocunha <engtransportes.rodrigocunha@gmail.com>
Co-authored-by: Carolina Gomes <gsv.lina@gmail.com>
Co-authored-by: Rodrigo Cunha <66736583+eng-rodrigocunha@users.noreply.github.com>
7 people authored Oct 23, 2023
1 parent 5de362d commit 880e951
Showing 4 changed files with 151 additions and 26 deletions.
82 changes: 67 additions & 15 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
@@ -30,7 +30,7 @@
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import get_current_timestamp
from pipelines.rj_smtr.tasks import get_rounded_timestamp

from pipelines.rj_smtr.constants import constants

@@ -63,6 +63,23 @@

bilhetagem_transacao_captura.schedule = every_minute

# BILHETAGEM GPS

bilhetagem_tracking_captura = deepcopy(default_capture_flow)
bilhetagem_tracking_captura.name = "SMTR: Bilhetagem GPS Validador - Captura"
bilhetagem_tracking_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_tracking_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_tracking_captura = set_default_parameters(
flow=bilhetagem_tracking_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
| constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

bilhetagem_tracking_captura.schedule = every_minute

# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

@@ -114,7 +131,9 @@
) as bilhetagem_transacao_tratamento:
# Configuração #

timestamp = get_current_timestamp()
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
@@ -123,7 +142,7 @@

LABELS = get_current_flow_labels()

# Recapturas
# Recaptura Transação

run_recaptura_trasacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
@@ -139,34 +158,35 @@
raise_final_state=True,
)

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
# Captura Auxiliar

runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao)

wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Captura
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
# Recaptura Auxiliar

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_auxiliar)
runs_recaptura_auxiliar.set_upstream(wait_captura)

wait_captura = wait_for_flow_run.map(
runs_captura,
wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
@@ -193,3 +213,35 @@
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_transacao_tratamento.schedule = every_hour


with Flow(
"SMTR: Bilhetagem GPS Validador - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as bilhetagem_gps_tratamento:
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_transacao_tratamento.name + " ",
now_time=timestamp,
)

LABELS = get_current_flow_labels()

# Recaptura GPS

run_recaptura_gps = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

wait_recaptura_gps = wait_for_flow_run(
run_recaptura_gps,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
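
Note on the parameter merge used for bilhetagem_tracking_captura above: the | operator on dicts (Python 3.9+) merges the general defaults with the tracking-specific constants, with right-hand keys overriding the defaults. A minimal sketch with stand-in dicts (the values are illustrative, not the real constants):

general_defaults = {"source_type": "db", "interval_minutes": 60}        # stand-in for GENERAL_CAPTURE_DEFAULT_PARAMS
tracking_params = {"table_id": "gps_validador", "interval_minutes": 1}  # stand-in for BILHETAGEM_TRACKING_CAPTURE_PARAMS

merged = general_defaults | tracking_params
print(merged)
# {'source_type': 'db', 'interval_minutes': 1, 'table_id': 'gps_validador'}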
49 changes: 41 additions & 8 deletions pipelines/rj_smtr/constants.py
@@ -180,6 +180,10 @@ class constants(Enum): # pylint: disable=c0103
"engine": "postgresql",
"host": "10.5.115.1",
},
"tracking_db": {
"engine": "postgresql",
"host": "10.5.15.25",
},
},
"source_type": "db",
}
@@ -203,8 +207,29 @@ class constants(Enum): # pylint: disable=c0103
"interval_minutes": 1,
}

BILHETAGEM_TRACKING_CAPTURE_PARAMS = {
"table_id": "gps_validador",
"partition_date_only": False,
"extract_params": {
"database": "tracking_db",
"query": """
SELECT
*
FROM
tracking_detalhe
WHERE
data_tracking BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["id"],
"interval_minutes": 1,
}

BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"

BILHETAGEM_TRATAMENTO_INTERVAL = 60

BILHETAGEM_CAPTURE_PARAMS = [
{
"table_id": "linha",
@@ -217,11 +242,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
LINHA
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_LINHA"], # id column to nest data on
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
{
"table_id": "grupo",
@@ -234,11 +261,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
GRUPO
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_GRUPO"], # id column to nest data on
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
{
"table_id": "grupo_linha",
@@ -251,11 +280,13 @@ class constants(Enum): # pylint: disable=c0103
FROM
GRUPO_LINHA
WHERE
DT_INCLUSAO >= '{start}'
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": ["CD_GRUPO", "CD_LINHA"],
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
{
"table_id": "matriz_integracao",
@@ -268,14 +299,16 @@ class constants(Enum): # pylint: disable=c0103
FROM
matriz_integracao
WHERE
dt_inclusao >= '{start}'
dt_inclusao BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": [
"cd_versao_matriz",
"cd_integracao",
], # id column to nest data on
"interval_minutes": 60,
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"truncate_hour": True,
},
]

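For context on the extract_params entries above: the {start} and {end} placeholders in each query are presumably rendered by the capture flow from the rounded run timestamp and interval_minutes. The rendering task itself is not part of this diff, so the snippet below is only a hedged sketch of that step:

from datetime import datetime, timedelta

# Hypothetical illustration of how the gps_validador query template is filled.
query_template = (
    "SELECT * FROM tracking_detalhe "
    "WHERE data_tracking BETWEEN '{start}' AND '{end}'"
)

end = datetime(2023, 10, 23, 10, 37)    # example rounded flow timestamp
start = end - timedelta(minutes=1)      # interval_minutes = 1 for gps_validador

print(query_template.format(
    start=start.strftime("%Y-%m-%d %H:%M:%S"),
    end=end.strftime("%Y-%m-%d %H:%M:%S"),
))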
7 changes: 5 additions & 2 deletions pipelines/rj_smtr/flows.py
@@ -26,7 +26,7 @@
from pipelines.rj_smtr.tasks import (
create_date_hour_partition,
create_local_partition_path,
get_current_timestamp,
get_rounded_timestamp,
parse_timestamp_to_string,
transform_raw_to_nested_structure,
create_dbt_run_vars,
@@ -70,16 +70,19 @@
checkpoint=False,
)

current_timestamp = get_rounded_timestamp(interval_minutes=interval_minutes)

with case(recapture, True):
_, recapture_timestamps, recapture_previous_errors = query_logs(
dataset_id=dataset_id,
table_id=table_id,
datetime_filter=current_timestamp,
interval_minutes=interval_minutes,
recapture_window_days=recapture_window_days,
)

with case(recapture, False):
capture_timestamp = [get_current_timestamp()]
capture_timestamp = [current_timestamp]
capture_previous_errors = task(
lambda: [None], checkpoint=False, name="assign_none_to_previous_errors"
)()
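The hunk above ends before the two branches are combined, so how the flow joins them is not visible here. In Prefect 1.x the usual idiom is case plus merge, which would pick the recaptured timestamps when recapture is True and the single rounded timestamp otherwise; the snippet below is a self-contained sketch of that idiom under this assumption, not the repository's actual code:

from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

@task
def recapture_timestamps():
    # stand-in for query_logs: timestamps whose captures previously failed
    return ["2023-10-23T10:00:00", "2023-10-23T11:00:00"]

@task
def current_timestamp_list():
    # stand-in for [current_timestamp]
    return ["2023-10-23T12:00:00"]

with Flow("case-merge sketch") as flow:
    recapture = Parameter("recapture", default=False)

    with case(recapture, True):
        recaptured = recapture_timestamps()

    with case(recapture, False):
        current = current_timestamp_list()

    # merge returns the result of whichever branch actually ran
    timestamps = merge(recaptured, current)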
39 changes: 38 additions & 1 deletion pipelines/rj_smtr/tasks.py
@@ -242,6 +242,44 @@ def create_dbt_run_vars(
###############


@task
def get_rounded_timestamp(
timestamp: Union[str, datetime, None] = None,
interval_minutes: Union[int, None] = None,
) -> datetime:
"""
Calculate rounded timestamp for flow run.
Args:
timestamp (Union[str, datetime, None]): timestamp to be used as reference
interval_minutes (Union[int, None], optional): interval in minutes between each recapture
Returns:
datetime: timestamp for flow run
"""
if isinstance(timestamp, str):
timestamp = datetime.fromisoformat(timestamp)

if not timestamp:
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))

timestamp = timestamp.replace(second=0, microsecond=0)

if interval_minutes:
if interval_minutes >= 60:
hours = interval_minutes / 60
interval_minutes = round(((hours) % 1) * 60)

if interval_minutes == 0:
rounded_minutes = interval_minutes
else:
rounded_minutes = (timestamp.minute // interval_minutes) * interval_minutes

timestamp = timestamp.replace(minute=rounded_minutes)

return timestamp


@task
def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
"""
@@ -260,7 +298,6 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datetime:
timestamp = datetime.now(tz=timezone(constants.TIMEZONE.value))
if truncate_minute:
return timestamp.replace(second=0, microsecond=0)
return timestamp


@task
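A quick standalone check of the rounding rule implemented by get_rounded_timestamp above (same arithmetic, as a plain function outside Prefect):

from datetime import datetime

def round_minutes(ts: datetime, interval_minutes: int) -> datetime:
    # mirrors get_rounded_timestamp: drop seconds, then floor the minute
    ts = ts.replace(second=0, microsecond=0)
    if interval_minutes >= 60:
        # e.g. 60 -> 0 (top of the hour), 90 -> 30
        interval_minutes = round(((interval_minutes / 60) % 1) * 60)
    rounded = 0 if interval_minutes == 0 else (ts.minute // interval_minutes) * interval_minutes
    return ts.replace(minute=rounded)

print(round_minutes(datetime(2023, 10, 23, 10, 37), 60))  # 2023-10-23 10:00:00
print(round_minutes(datetime(2023, 10, 23, 10, 37), 10))  # 2023-10-23 10:30:00
print(round_minutes(datetime(2023, 10, 23, 10, 37), 1))   # 2023-10-23 10:37:00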
