Skip to content

Commit

Permalink
Merge branch 'master' into staging/smtr-pipeline-gtfs
Browse files Browse the repository at this point in the history
  • Loading branch information
lingsv committed Oct 19, 2023
2 parents 9c55f71 + 9c8a092 commit 5671302
Show file tree
Hide file tree
Showing 8 changed files with 625 additions and 302 deletions.
84 changes: 67 additions & 17 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped


# EMD Imports #

from pipelines.constants import constants as emd_constants
Expand All @@ -29,18 +30,18 @@
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import (
get_current_timestamp,
)

from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.schedules import (
bilhetagem_transacao_schedule,
# bilhetagem_materializacao_schedule,
)
from pipelines.rj_smtr.tasks import get_current_timestamp

from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_hour
from pipelines.rj_smtr.schedules import every_hour, every_minute


GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
}

# Flows #

Expand All @@ -53,7 +54,15 @@
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_transacao_captura.schedule = bilhetagem_transacao_schedule

bilhetagem_transacao_captura = set_default_parameters(
flow=bilhetagem_transacao_captura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
| constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

bilhetagem_transacao_captura.schedule = every_minute


# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

Expand All @@ -67,11 +76,7 @@

bilhetagem_auxiliar_captura = set_default_parameters(
flow=bilhetagem_auxiliar_captura,
default_parameters={
"dataset_id": constants.BILHETAGEM_DATASET_ID.value,
"secret_path": constants.BILHETAGEM_SECRET_PATH.value,
"source_type": constants.BILHETAGEM_GENERAL_CAPTURE_PARAMS.value["source_type"],
},
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS,
)

# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
Expand All @@ -92,11 +97,23 @@
default_parameters=bilhetagem_materializacao_parameters,
)

# TRATAMENTO - RODA DE HORA EM HORA, CAPTURA AUXILIAR + MATERIALIZAÇÃO
# RECAPTURA

bilhetagem_recaptura = deepcopy(default_capture_flow)
bilhetagem_recaptura.name = "SMTR: Bilhetagem - Recaptura (subflow)"
bilhetagem_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_recaptura = set_default_parameters(
flow=bilhetagem_recaptura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS | {"recapture": True},
)

# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO
with Flow(
"SMTR: Bilhetagem Transação - Tratamento",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as bilhetagem_transacao_tratamento:
# Configuração #

timestamp = get_current_timestamp()

rename_flow_run = rename_current_flow_run_now_time(
Expand All @@ -106,6 +123,38 @@

LABELS = get_current_flow_labels()

# Recapturas

run_recaptura_trasacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

wait_recaptura_trasacao = wait_for_flow_run(
run_recaptura_trasacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_recaptura_auxiliar.set_upstream(wait_recaptura_trasacao)

wait_recaptura_auxiliar = wait_for_flow_run.map(
runs_recaptura_auxiliar,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Captura
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
Expand All @@ -114,6 +163,8 @@
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_auxiliar)

wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
Expand Down Expand Up @@ -142,4 +193,3 @@
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_transacao_tratamento.schedule = every_hour
# bilhetagem_materializacao.schedule = bilhetagem_materializacao_schedule
47 changes: 0 additions & 47 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/schedules.py

This file was deleted.

40 changes: 13 additions & 27 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,24 +170,18 @@ class constants(Enum): # pylint: disable=c0103
"databases": {
"principal_db": {
"engine": "mysql",
"host": "principal-database-replica.internal",
"host": "10.5.114.121",
},
"tarifa_db": {
"engine": "postgres",
"host": "tarifa-database-replica.internal",
"engine": "postgresql",
"host": "10.5.113.254",
},
"transacao_db": {
"engine": "postgres",
"host": "transacao-database-replica.internal",
"engine": "postgresql",
"host": "10.5.115.1",
},
},
"vpn_url": "http://vpn-jae.mobilidade.rio/",
"source_type": "api-json",
}

BILHETAGEM_CAPTURE_RUN_INTERVAL = {
"transacao_run_interval": {"minutes": 1},
"principal_run_interval": {"hours": 1},
"source_type": "db",
}

BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = {
Expand All @@ -204,9 +198,9 @@ class constants(Enum): # pylint: disable=c0103
data_processamento BETWEEN '{start}'
AND '{end}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL["transacao_run_interval"],
},
"primary_key": ["id"], # id column to nest data on
"primary_key": ["id"],
"interval_minutes": 1,
}

BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"
Expand All @@ -225,11 +219,9 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": ["CD_LINHA"], # id column to nest data on
"interval_minutes": 60,
},
{
"table_id": "grupo",
Expand All @@ -244,11 +236,9 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": ["CD_GRUPO"], # id column to nest data on
"interval_minutes": 60,
},
{
"table_id": "grupo_linha",
Expand All @@ -263,11 +253,9 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": ["CD_GRUPO", "CD_LINHA"], # id column to nest data on
"primary_key": ["CD_GRUPO", "CD_LINHA"],
"interval_minutes": 60,
},
{
"table_id": "matriz_integracao",
Expand All @@ -282,14 +270,12 @@ class constants(Enum): # pylint: disable=c0103
WHERE
dt_inclusao >= '{start}'
""",
"run_interval": BILHETAGEM_CAPTURE_RUN_INTERVAL[
"principal_run_interval"
],
},
"primary_key": [
"cd_versao_matriz",
"cd_integracao",
], # id column to nest data on
"interval_minutes": 60,
},
]

Expand Down
Loading

0 comments on commit 5671302

Please sign in to comment.