diff --git a/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html b/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html index 5c268f3dd..71b7a42d3 100644 --- a/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html +++ b/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html @@ -194,6 +194,7 @@
pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
)
bilhetagem_materializacao_transacao_parameters = {
+ "source_dataset_ids": [constants.BILHETAGEM_DATASET_ID.value],
"source_table_ids": [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"]
],
@@ -221,13 +222,16 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
+
+ordem_pagamento_sources_table_ids = [
+ constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"]
+] + [d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value]
+
bilhetagem_materializacao_ordem_pagamento_parameters = {
- "source_table_ids": [
- constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"]
- ]
- + [
- d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value
+ "source_dataset_ids": [
+ constants.BILHETAGEM_DATASET_ID.value for _ in ordem_pagamento_sources_table_ids
],
+ "source_table_ids": ordem_pagamento_sources_table_ids,
"capture_intervals_minutes": [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"]
]
@@ -255,6 +259,7 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
)
bilhetagem_materializacao_integracao_parameters = {
+ "source_dataset_ids": [constants.BILHETAGEM_DATASET_ID.value],
"source_table_ids": [
constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value["table_id"]
],
diff --git a/rj_smtr/flows.html b/rj_smtr/flows.html
index 349aca7eb..b8b1733a6 100644
--- a/rj_smtr/flows.html
+++ b/rj_smtr/flows.html
@@ -231,6 +231,7 @@ Module pipelines.rj_smtr.flows
timestamp = Parameter("timestamp", default=None)
# Parametros Verificação de Recapturas
+ source_dataset_ids = Parameter("source_dataset_ids", default=[])
source_table_ids = Parameter("source_table_ids", default=[])
capture_intervals_minutes = Parameter("capture_intervals_minutes", default=[])
@@ -255,7 +256,7 @@ Module pipelines.rj_smtr.flows
)
query_logs_output = query_logs.map(
- dataset_id=unmapped(dataset_id),
+ dataset_id=source_dataset_ids,
table_id=source_table_ids,
interval_minutes=capture_intervals_minutes,
datetime_filter=query_logs_timestamps,