diff --git a/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html b/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html index 6d65cacc2..2f37b971e 100644 --- a/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html +++ b/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html @@ -65,7 +65,7 @@
pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
from pipelines.rj_smtr.constants import constants
-from pipelines.rj_smtr.schedules import every_hour, every_minute
+from pipelines.rj_smtr.schedules import every_hour, every_minute, every_day_hour_five
# Flows #
@@ -87,7 +87,7 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
bilhetagem_transacao_captura.schedule = every_minute
-# BILHETAGEM GPS
+# BILHETAGEM GPS - CAPTURA A CADA MINUTO #
bilhetagem_tracking_captura = deepcopy(default_capture_flow)
bilhetagem_tracking_captura.name = "SMTR: Bilhetagem GPS Validador - Captura"
@@ -105,6 +105,23 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
bilhetagem_tracking_captura.schedule = every_minute
+# BILHETAGEM RESSARCIMENTO - SUBFLOW PARA RODAR DIARIAMENTE #
+
+bilhetagem_ressarcimento_captura = deepcopy(default_capture_flow)
+bilhetagem_ressarcimento_captura.name = (
+ "SMTR: Bilhetagem Ressarcimento - Captura (subflow)"
+)
+bilhetagem_ressarcimento_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+bilhetagem_ressarcimento_captura.run_config = KubernetesRun(
+ image=emd_constants.DOCKER_IMAGE.value,
+ labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+
+bilhetagem_ressarcimento_captura = set_default_parameters(
+ flow=bilhetagem_ressarcimento_captura,
+ default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value,
+)
+
# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #
bilhetagem_auxiliar_captura = deepcopy(default_capture_flow)
@@ -120,16 +137,20 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value,
)
-# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
-bilhetagem_materializacao = deepcopy(default_materialization_flow)
-bilhetagem_materializacao.name = "SMTR: Bilhetagem Transação - Materialização (subflow)"
-bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
-bilhetagem_materializacao.run_config = KubernetesRun(
+# MATERIALIZAÇÃO #
+
+# Transação
+bilhetagem_materializacao_transacao = deepcopy(default_materialization_flow)
+bilhetagem_materializacao_transacao.name = (
+ "SMTR: Bilhetagem Transação - Materialização (subflow)"
+)
+bilhetagem_materializacao_transacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+bilhetagem_materializacao_transacao.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
-bilhetagem_materializacao_parameters = {
+bilhetagem_materializacao_transacao_parameters = {
"source_table_ids": [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"]
]
@@ -138,15 +159,52 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"]
]
+ [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value],
- "dataset_id": constants.BILHETAGEM_DATASET_ID.value,
-} | constants.BILHETAGEM_MATERIALIZACAO_PARAMS.value
+} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value
+
+bilhetagem_materializacao_transacao = set_default_parameters(
+ flow=bilhetagem_materializacao_transacao,
+ default_parameters=bilhetagem_materializacao_transacao_parameters,
+)
-bilhetagem_materializacao = set_default_parameters(
- flow=bilhetagem_materializacao,
- default_parameters=bilhetagem_materializacao_parameters,
+# Ordem Pagamento
+
+bilhetagem_materializacao_ordem_pagamento = deepcopy(default_materialization_flow)
+bilhetagem_materializacao_ordem_pagamento.name = (
+ "SMTR: Bilhetagem Ordem Pagamento - Materialização (subflow)"
+)
+bilhetagem_materializacao_ordem_pagamento.storage = GCS(
+ emd_constants.GCS_FLOWS_BUCKET.value
+)
+bilhetagem_materializacao_ordem_pagamento.run_config = KubernetesRun(
+ image=emd_constants.DOCKER_IMAGE.value,
+ labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+
+bilhetagem_materializacao_ordem_pagamento_parameters = {
+ "source_table_ids": [
+ constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["table_id"]
+ ]
+ + [d["table_id"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value]
+ + [
+ d["table_id"] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value
+ ],
+ "capture_intervals_minutes": [
+ constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value["interval_minutes"]
+ ]
+ + [d["interval_minutes"] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value]
+ + [
+ d["interval_minutes"]
+ for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value
+ ],
+} | constants.BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS.value
+
+bilhetagem_materializacao_ordem_pagamento = set_default_parameters(
+ flow=bilhetagem_materializacao_ordem_pagamento,
+ default_parameters=bilhetagem_materializacao_ordem_pagamento_parameters,
)
-# RECAPTURA
+
+# RECAPTURA #
bilhetagem_recaptura = deepcopy(default_capture_flow)
bilhetagem_recaptura.name = "SMTR: Bilhetagem - Recaptura (subflow)"
@@ -157,10 +215,11 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
| {"recapture": True},
)
-# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO
+# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO #
+
with Flow(
"SMTR: Bilhetagem Transação - Tratamento",
- code_owners=["caio", "fernanda", "boris", "rodrigo"],
+ code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
) as bilhetagem_transacao_tratamento:
# Configuração #
@@ -249,7 +308,7 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
with case(materialize, True):
# Materialização
run_materializacao = create_flow_run(
- flow_name=bilhetagem_materializacao.name,
+ flow_name=bilhetagem_materializacao_transacao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
@@ -258,7 +317,9 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
wait_recaptura_transacao,
],
parameters={
- "timestamp": get_current_timestamp(timestamp=timestamp, return_str=True)
+ "timestamp": get_current_timestamp(
+ timestamp=timestamp, return_str=True
+ ),
},
)
@@ -279,7 +340,7 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
with Flow(
"SMTR: Bilhetagem GPS Validador - Tratamento",
- code_owners=["caio", "fernanda", "boris", "rodrigo"],
+ code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
) as bilhetagem_gps_tratamento:
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
@@ -314,7 +375,101 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
-bilhetagem_gps_tratamento.schedule = every_hour
+bilhetagem_gps_tratamento.schedule = every_hour
+
+with Flow(
+ "SMTR: Bilhetagem Ordem Pagamento - Captura/Tratamento",
+ code_owners=["caio", "fernanda", "boris", "rodrigo", "rafaelpinheiro"],
+) as bilhetagem_ordem_pagamento_captura_tratamento:
+ capture = Parameter("capture", default=True)
+ materialize = Parameter("materialize", default=True)
+
+ timestamp = get_rounded_timestamp(
+ interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
+ )
+
+ rename_flow_run = rename_current_flow_run_now_time(
+ prefix=bilhetagem_ordem_pagamento_captura_tratamento.name + " ",
+ now_time=timestamp,
+ )
+
+ LABELS = get_current_flow_labels()
+
+ # Captura #
+ with case(capture, True):
+ runs_captura = create_flow_run.map(
+ flow_name=unmapped(bilhetagem_ressarcimento_captura.name),
+ project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
+ parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value,
+ labels=unmapped(LABELS),
+ )
+
+ wait_captura = wait_for_flow_run.map(
+ runs_captura,
+ stream_states=unmapped(True),
+ stream_logs=unmapped(True),
+ raise_final_state=unmapped(True),
+ )
+
+ # Recaptura #
+
+ runs_recaptura = create_flow_run.map(
+ flow_name=unmapped(bilhetagem_recaptura.name),
+ project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
+ parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value,
+ labels=unmapped(LABELS),
+ )
+
+ runs_recaptura.set_upstream(wait_captura)
+
+ wait_recaptura_true = wait_for_flow_run.map(
+ runs_recaptura,
+ stream_states=unmapped(True),
+ stream_logs=unmapped(True),
+ raise_final_state=unmapped(True),
+ )
+
+ with case(capture, False):
+ wait_recaptura_false = task(lambda: None, name="assign_none_to_recapture")()
+
+ wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false)
+
+ # Materialização #
+
+ with case(materialize, True):
+ run_materializacao = create_flow_run(
+ flow_name=bilhetagem_materializacao_ordem_pagamento.name,
+ project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
+ labels=LABELS,
+ upstream_tasks=[wait_recaptura],
+ parameters={
+ "timestamp": get_current_timestamp(
+ timestamp=timestamp, return_str=True
+ ),
+ },
+ )
+
+ wait_materializacao = wait_for_flow_run(
+ run_materializacao,
+ stream_states=True,
+ stream_logs=True,
+ raise_final_state=True,
+ )
+
+ bilhetagem_ordem_pagamento_captura_tratamento.set_reference_tasks(
+ [wait_materializacao, wait_recaptura]
+ )
+
+bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS(
+ emd_constants.GCS_FLOWS_BUCKET.value
+)
+bilhetagem_ordem_pagamento_captura_tratamento.run_config = KubernetesRun(
+ image=emd_constants.DOCKER_IMAGE.value,
+ labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+
+
+bilhetagem_ordem_pagamento_captura_tratamento.schedule = every_day_hour_five
@@ -322,6 +477,21 @@ Module pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
+Functions
+
+
+def wait_recaptura_false()
+
+-
+
+
+
+Expand source code
+
+wait_recaptura_false = task(lambda: None, name="assign_none_to_recapture")()
+
+
+
@@ -383,6 +553,11 @@ Index
pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem
+Functions
+
+
diff --git a/rj_smtr/constants.html b/rj_smtr/constants.html
index 7fedf189c..9750238ba 100644
--- a/rj_smtr/constants.html
+++ b/rj_smtr/constants.html
@@ -213,6 +213,10 @@ Module pipelines.rj_smtr.constants
"engine": "postgresql",
"host": "10.5.15.25",
},
+ "ressarcimento_db": {
+ "engine": "postgresql",
+ "host": "10.5.15.127",
+ },
},
"source_type": "db",
}
@@ -255,6 +259,45 @@ Module pipelines.rj_smtr.constants
"interval_minutes": 1,
}
+ BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS = [
+ {
+ "table_id": "ordem_ressarcimento",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "ressarcimento_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ ordem_ressarcimento
+ WHERE
+ data_inclusao BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["id"],
+ "interval_minutes": 1440,
+ },
+ {
+ "table_id": "ordem_pagamento",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "ressarcimento_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ ordem_pagamento
+ WHERE
+ data_inclusao BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["id"],
+ "interval_minutes": 1440,
+ },
+ ]
+
BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"
BILHETAGEM_TRATAMENTO_INTERVAL = 60
@@ -335,9 +378,79 @@ Module pipelines.rj_smtr.constants
], # id column to nest data on
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
},
+ {
+ "table_id": "operadora_transporte",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ OPERADORA_TRANSPORTE
+ WHERE
+ DT_INCLUSAO BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["CD_OPERADORA_TRANSPORTE"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
+ {
+ "table_id": "pessoa_juridica",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ PESSOA_JURIDICA
+ """,
+ },
+ "primary_key": ["CD_CLIENTE"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
+ {
+ "table_id": "consorcio",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ CONSORCIO
+ WHERE
+ DT_INCLUSAO BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["CD_CONSORCIO"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
+ {
+ "table_id": "linha_consorcio",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ LINHA_CONSORCIO
+ WHERE
+ DT_INCLUSAO BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["CD_CONSORCIO", "CD_LINHA"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
]
- BILHETAGEM_MATERIALIZACAO_PARAMS = {
+ BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
+ "dataset_id": BILHETAGEM_DATASET_ID,
"table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"],
"upstream": True,
"dbt_vars": {
@@ -349,6 +462,20 @@ Module pipelines.rj_smtr.constants
},
}
+ BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
+ "dataset_id": BILHETAGEM_DATASET_ID,
+ "table_id": "ordem_pagamento_validacao",
+ "upstream": True,
+ "exclude": f"+{BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS['table_id']}",
+ "dbt_vars": {
+ "date_range": {
+ "table_run_datetime_column_name": "data",
+ "delay_hours": 0,
+ },
+ "version": {},
+ },
+ }
+
BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"secret_path": BILHETAGEM_SECRET_PATH,
@@ -628,6 +755,10 @@ Classes
"engine": "postgresql",
"host": "10.5.15.25",
},
+ "ressarcimento_db": {
+ "engine": "postgresql",
+ "host": "10.5.15.127",
+ },
},
"source_type": "db",
}
@@ -670,6 +801,45 @@ Classes
"interval_minutes": 1,
}
+ BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS = [
+ {
+ "table_id": "ordem_ressarcimento",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "ressarcimento_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ ordem_ressarcimento
+ WHERE
+ data_inclusao BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["id"],
+ "interval_minutes": 1440,
+ },
+ {
+ "table_id": "ordem_pagamento",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "ressarcimento_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ ordem_pagamento
+ WHERE
+ data_inclusao BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["id"],
+ "interval_minutes": 1440,
+ },
+ ]
+
BILHETAGEM_SECRET_PATH = "smtr_jae_access_data"
BILHETAGEM_TRATAMENTO_INTERVAL = 60
@@ -750,9 +920,79 @@ Classes
], # id column to nest data on
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
},
+ {
+ "table_id": "operadora_transporte",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ OPERADORA_TRANSPORTE
+ WHERE
+ DT_INCLUSAO BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["CD_OPERADORA_TRANSPORTE"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
+ {
+ "table_id": "pessoa_juridica",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ PESSOA_JURIDICA
+ """,
+ },
+ "primary_key": ["CD_CLIENTE"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
+ {
+ "table_id": "consorcio",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ CONSORCIO
+ WHERE
+ DT_INCLUSAO BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["CD_CONSORCIO"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
+ {
+ "table_id": "linha_consorcio",
+ "partition_date_only": True,
+ "extract_params": {
+ "database": "principal_db",
+ "query": """
+ SELECT
+ *
+ FROM
+ LINHA_CONSORCIO
+ WHERE
+ DT_INCLUSAO BETWEEN '{start}'
+ AND '{end}'
+ """,
+ },
+ "primary_key": ["CD_CONSORCIO", "CD_LINHA"], # id column to nest data on
+ "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
+ },
]
- BILHETAGEM_MATERIALIZACAO_PARAMS = {
+ BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
+ "dataset_id": BILHETAGEM_DATASET_ID,
"table_id": BILHETAGEM_TRANSACAO_CAPTURE_PARAMS["table_id"],
"upstream": True,
"dbt_vars": {
@@ -764,6 +1004,20 @@ Classes
},
}
+ BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
+ "dataset_id": BILHETAGEM_DATASET_ID,
+ "table_id": "ordem_pagamento_validacao",
+ "upstream": True,
+ "exclude": f"+{BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS['table_id']}",
+ "dbt_vars": {
+ "date_range": {
+ "table_run_datetime_column_name": "data",
+ "delay_hours": 0,
+ },
+ "version": {},
+ },
+ }
+
BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS = {
"dataset_id": BILHETAGEM_DATASET_ID,
"secret_path": BILHETAGEM_SECRET_PATH,
@@ -867,7 +1121,15 @@ Class variables
-var BILHETAGEM_MATERIALIZACAO_PARAMS
+var BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS
+
+
+
+var BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS
+
+
+
+var BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS
@@ -1206,7 +1468,9 @@ BILHETAGEM_DATASET_ID
BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS
BILHETAGEM_GENERAL_CAPTURE_PARAMS
-BILHETAGEM_MATERIALIZACAO_PARAMS
+BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS
+BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS
+BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS
BILHETAGEM_SECRET_PATH
BILHETAGEM_TRACKING_CAPTURE_PARAMS
BILHETAGEM_TRANSACAO_CAPTURE_PARAMS
diff --git a/rj_smtr/tasks.html b/rj_smtr/tasks.html
index 5792b735c..31f88498b 100644
--- a/rj_smtr/tasks.html
+++ b/rj_smtr/tasks.html
@@ -32,7 +32,7 @@ Module pipelines.rj_smtr.tasks
"""
Tasks for rj_smtr
"""
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, date
import json
import os
from pathlib import Path
@@ -461,7 +461,7 @@ Module pipelines.rj_smtr.tasks
# Extract data
#
###############
-@task(nout=3)
+@task(nout=3, max_retries=3, retry_delay=timedelta(seconds=5))
def query_logs(
dataset_id: str,
table_id: str,
@@ -557,6 +557,7 @@ Module pipelines.rj_smtr.tasks
"""
log(f"Run query to check logs:\n{query}")
results = bd.read_sql(query=query, billing_project_id=bq_project())
+
if len(results) > 0:
results = results.sort_values(["timestamp_captura"])
results["timestamp_captura"] = (
@@ -1116,6 +1117,9 @@ Module pipelines.rj_smtr.tasks
else:
last_run = datetime.strptime(last_run, timestr)
+ if isinstance(last_run, date):
+ last_run = datetime(last_run.year, last_run.month, last_run.day)
+
# set start to last run hour (H)
start_ts = last_run.replace(minute=0, second=0, microsecond=0).strftime(timestr)
@@ -2352,6 +2356,9 @@ Returns
else:
last_run = datetime.strptime(last_run, timestr)
+ if isinstance(last_run, date):
+ last_run = datetime(last_run.year, last_run.month, last_run.day)
+
# set start to last run hour (H)
start_ts = last_run.replace(minute=0, second=0, microsecond=0).strftime(timestr)
@@ -2755,7 +2762,7 @@ Returns
Expand source code
-@task(nout=3)
+@task(nout=3, max_retries=3, retry_delay=timedelta(seconds=5))
def query_logs(
dataset_id: str,
table_id: str,
@@ -2851,6 +2858,7 @@ Returns
"""
log(f"Run query to check logs:\n{query}")
results = bd.read_sql(query=query, billing_project_id=bq_project())
+
if len(results) > 0:
results = results.sort_values(["timestamp_captura"])
results["timestamp_captura"] = (
diff --git a/rj_smtr/utils.html b/rj_smtr/utils.html
index a6cc80fe0..276bbc93b 100644
--- a/rj_smtr/utils.html
+++ b/rj_smtr/utils.html
@@ -36,7 +36,7 @@ Module pipelines.rj_smtr.utils
from ftplib import FTP
from pathlib import Path
-from datetime import timedelta, datetime
+from datetime import timedelta, datetime, date
from typing import List, Union, Any
import traceback
import io
@@ -501,12 +501,12 @@ Module pipelines.rj_smtr.utils
Returns:
Any: Serialized object
"""
- if isinstance(obj, pd.Timestamp):
- if obj.tzinfo is None:
- obj = obj.tz_localize("UTC").tz_convert(
- emd_constants.DEFAULT_TIMEZONE.value
- )
-
+ if isinstance(obj, (pd.Timestamp, date)):
+ if isinstance(obj, pd.Timestamp):
+ if obj.tzinfo is None:
+ obj = obj.tz_localize("UTC").tz_convert(
+ emd_constants.DEFAULT_TIMEZONE.value
+ )
return obj.isoformat()
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
@@ -1119,12 +1119,12 @@ Returns
Returns:
Any: Serialized object
"""
- if isinstance(obj, pd.Timestamp):
- if obj.tzinfo is None:
- obj = obj.tz_localize("UTC").tz_convert(
- emd_constants.DEFAULT_TIMEZONE.value
- )
-
+ if isinstance(obj, (pd.Timestamp, date)):
+ if isinstance(obj, pd.Timestamp):
+ if obj.tzinfo is None:
+ obj = obj.tz_localize("UTC").tz_convert(
+ emd_constants.DEFAULT_TIMEZONE.value
+ )
return obj.isoformat()
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")