Skip to content

Commit

Permalink
Cria captura da tabela fiscalizacao e adiciona novas views na materia…
Browse files Browse the repository at this point in the history
…lização da bilhetagem.transacao (#645)

* adiciona captura tabela fiscalizacao

* adiciona recaptura da fiscalização

* adiciona novas views na materialização transação

* ajusta parametros materialização ordem_pagamento

* cria constante de exclude

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
pixuimpou and mergify[bot] authored Mar 27, 2024
1 parent 99300bc commit 1c9b5b3
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 8 deletions.
35 changes: 35 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@

bilhetagem_transacao_captura.schedule = every_minute

# BILHETAGEM FISCALIZAÇÃO - CAPTURA A CADA 5 MINUTOS #

bilhetagem_fiscalizacao_captura = deepcopy(default_capture_flow)
bilhetagem_fiscalizacao_captura.name = "SMTR: Bilhetagem Fiscalização - Captura"
bilhetagem_fiscalizacao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_fiscalizacao_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_fiscalizacao_captura = set_default_parameters(
flow=bilhetagem_fiscalizacao_captura,
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value
| constants.BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS.value,
)

bilhetagem_fiscalizacao_captura.schedule = every_5_minutes

# BILHETAGEM INTEGRAÇÃO - CAPTURA A CADA MINUTO #

bilhetagem_integracao_captura = deepcopy(default_capture_flow)
Expand Down Expand Up @@ -255,6 +273,7 @@
| {"recapture": True},
)


# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO #

with Flow(
Expand Down Expand Up @@ -294,6 +313,22 @@
raise_final_state=True,
)

# Recaptura Fiscalização

run_recaptura_fiscalizacao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS.value,
)

wait_recaptura_fiscalizacao_true = wait_for_flow_run(
run_recaptura_fiscalizacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

# Captura Auxiliar

runs_captura = create_flow_run.map(
Expand Down
41 changes: 33 additions & 8 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,16 @@ class constants(Enum): # pylint: disable=c0103
"engine": "postgresql",
"host": "10.5.12.107",
},
"fiscalizacao_db": {
"engine": "postgresql",
"host": "10.5.115.29",
},
},
"source_type": "db",
}

BILHETAGEM_PRIVATE_BUCKET = "rj-smtr-jae-private"

BILHETAGEM_TRANSACAO_CAPTURE_PARAMS = {
"table_id": "transacao",
"partition_date_only": False,
Expand All @@ -770,6 +776,26 @@ class constants(Enum): # pylint: disable=c0103
"interval_minutes": 1,
}

BILHETAGEM_FISCALIZACAO_CAPTURE_PARAMS = {
"table_id": "fiscalizacao",
"partition_date_only": False,
"extract_params": {
"database": "fiscalizacao_db",
"query": """
SELECT
*
FROM
fiscalizacao
WHERE
dt_inclusao >= '{start}'
dt_inclusao < '{end}'
""",
},
"primary_key": ["id"],
"interval_minutes": 5,
"save_bucket_name": BILHETAGEM_PRIVATE_BUCKET,
}

BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS = {
"table_id": "integracao_transacao",
"partition_date_only": False,
Expand Down Expand Up @@ -888,8 +914,6 @@ class constants(Enum): # pylint: disable=c0103

BILHETAGEM_TRATAMENTO_INTERVAL = 60

BILHETAGEM_PRIVATE_BUCKET = "rj-smtr-jae-private"

BILHETAGEM_CAPTURE_PARAMS = [
{
"table_id": "linha",
Expand Down Expand Up @@ -1117,6 +1141,8 @@ class constants(Enum): # pylint: disable=c0103
},
]

BILHETAGEM_EXCLUDE = "+operadoras +consorcios"

BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": "integracao",
Expand All @@ -1128,12 +1154,11 @@ class constants(Enum): # pylint: disable=c0103
},
"version": {},
},
"exclude": "+operadoras +consorcios",
"exclude": BILHETAGEM_EXCLUDE,
}

BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": "passageiros_hora",
"dataset_id": "dashboard_bilhetagem_jae",
"upstream": True,
"dbt_vars": {
"date_range": {
Expand All @@ -1142,14 +1167,14 @@ class constants(Enum): # pylint: disable=c0103
},
"version": {},
},
"exclude": "integracao matriz_integracao",
"exclude": "integracao matriz_integracao stops_gtfs2 routes_gtfs2 feed_info_gtfs2",
}

BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"table_id": "ordem_pagamento",
"upstream": True,
"exclude": f"+{BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS['table_id']}",
"exclude": BILHETAGEM_EXCLUDE,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "data_ordem",
Expand All @@ -1163,7 +1188,7 @@ class constants(Enum): # pylint: disable=c0103
"dataset_id": BILHETAGEM_DATASET_ID,
"upstream": True,
"downstream": True,
"exclude": "+operadoras +consorcios",
"exclude": BILHETAGEM_EXCLUDE,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "datetime_captura",
Expand Down

0 comments on commit 1c9b5b3

Please sign in to comment.