Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cria captura da tabela fiscalizacao e adiciona novas views na materialização da bilhetagem.transacao #645

Merged
merged 9 commits into from
Mar 27, 2024
35 changes: 35 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@

bilhetagem_transacao_captura.schedule = every_minute

# BILHETAGEM FISCALIZAÇÃO - CAPTURA A CADA 5 MINUTOS #

bilhetagem_fiscalizacao_captura = deepcopy(default_capture_flow)
bilhetagem_fiscalizacao_captura.name = "SMTR: Bilhetagem Fiscalização - Captura"
bilhetagem_fiscalizacao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_fiscalizacao_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_fiscalizacao_captura = set_default_parameters(
flow=bilhetagem_fiscalizacao_captura,
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value
| constants.BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS.value,
)

bilhetagem_fiscalizacao_captura.schedule = every_5_minutes

# BILHETAGEM INTEGRAÇÃO - CAPTURA A CADA MINUTO #

bilhetagem_integracao_captura = deepcopy(default_capture_flow)
Expand Down Expand Up @@ -255,6 +273,7 @@
| {"recapture": True},
)


# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO #

with Flow(
Expand Down Expand Up @@ -294,6 +313,22 @@
raise_final_state=True,
)

# Recaptura Fiscalização

run_recaptura_fiscalizacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS.value,
)

wait_recaptura_fiscalizacao_true = wait_for_flow_run(
run_recaptura_fiscalizacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

# Captura Auxiliar

runs_captura = create_flow_run.map(
Expand Down
41 changes: 33 additions & 8 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,16 @@ class constants(Enum): # pylint: disable=c0103
"engine": "postgresql",
"host": "10.5.12.107",
},
"fiscalizacao_db": {
"engine": "postgresql",
"host": "10.5.115.29",
},
},
"source_type": "db",
}

BILHETAGEM_PRIVATE_BUCKET = "rj-smtr-jae-private"

BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = {
"table_id": "transacao",
"partition_date_only": False,
Expand All @@ -770,6 +776,26 @@ class constants(Enum): # pylint: disable=c0103
"interval_minutes": 1,
}

BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS = {
"table_id": "fiscalizacao",
"partition_date_only": False,
"extract_params": {
"database": "fiscalizacao_db",
"query": """
SELECT
*
FROM
fiscalizacao
WHERE
dt_inclusao >= '{start}'
dt_inclusao < '{end}'
""",
},
"primary_key": ["id"],
"interval_minutes": 5,
"save_bucket_name": BILHETAGEM_PRIVATE_BUCKET,
}

BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS = {
"table_id": "integracao_transacao",
"partition_date_only": False,
Expand Down Expand Up @@ -888,8 +914,6 @@ class constants(Enum): # pylint: disable=c0103

BILHETAGEM_TRATAMENTO_INTERVAL = 60

BILHETAGEM_PRIVATE_BUCKET = "rj-smtr-jae-private"

BILHETAGEM_CAPTURE_PARAMS = [
{
"table_id": "linha",
Expand Down Expand Up @@ -1117,6 +1141,8 @@ class constants(Enum): # pylint: disable=c0103
},
]

BILHETAGEM_EXCLUDE = "+operadoras +consorcios"

BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": "integracao",
Expand All @@ -1128,12 +1154,11 @@ class constants(Enum): # pylint: disable=c0103
},
"version": {},
},
"exclude": "+operadoras +consorcios",
"exclude": BILHETAGEM_EXCLUDE,
}

BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": "passageiros_hora",
"dataset_id": "dashboard_bilhetagem_jae",
"upstream": True,
"dbt_vars": {
"date_range": {
Expand All @@ -1142,14 +1167,14 @@ class constants(Enum): # pylint: disable=c0103
},
"version": {},
},
"exclude": "integracao matriz_integracao",
"exclude": "integracao matriz_integracao stops_gtfs2 routes_gtfs2 feed_info_gtfs2",
}

BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": "ordem_pagamento",
"upstream": True,
"exclude": f"+{BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS['table_id']}",
"exclude": BILHETAGEM_EXCLUDE,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "data_ordem",
Expand All @@ -1163,7 +1188,7 @@ class constants(Enum): # pylint: disable=c0103
"dataset_id": BILHETAGEM_DATASET_ID,
"upstream": True,
"downstream": True,
"exclude": "+operadoras +consorcios",
"exclude": BILHETAGEM_EXCLUDE,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "datetime_captura",
Expand Down
Loading