Skip to content

Commit

Permalink
muda logica recaptura bilhetagem
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Oct 18, 2023
1 parent 6b6d0cb commit ec23cf6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 42 deletions.
38 changes: 11 additions & 27 deletions pipelines/rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped
from prefect import Parameter

from prefect import task
from pipelines.utils.utils import log

# EMD Imports #

Expand All @@ -33,7 +30,7 @@
default_materialization_flow,
)

from pipelines.rj_smtr.tasks import get_current_timestamp, merge_dict_with_dict_list
from pipelines.rj_smtr.tasks import get_current_timestamp

from pipelines.rj_smtr.constants import constants

Expand Down Expand Up @@ -102,14 +99,12 @@

# RECAPTURA

bilhetagem_transacao_recaptura = deepcopy(default_capture_flow)
bilhetagem_transacao_recaptura.name = "SMTR: Bilhetagem Transação - Recaptura (subflow)"
bilhetagem_transacao_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_transacao_recaptura = set_default_parameters(
flow=bilhetagem_transacao_recaptura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS
| constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value
| {"recapture": True},
bilhetagem_recaptura = deepcopy(default_capture_flow)
bilhetagem_recaptura.name = "SMTR: Bilhetagem - Recaptura (subflow)"
bilhetagem_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_recaptura = set_default_parameters(
flow=bilhetagem_recaptura,
default_parameters=GENERAL_CAPTURE_DEFAULT_PARAMS | {"recapture": True},
)

# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO
Expand All @@ -118,7 +113,6 @@
code_owners=["caio", "fernanda", "boris", "rodrigo"],
) as bilhetagem_transacao_tratamento:
# Configuração #
recapture_window_days = Parameter("recapture_window_days", default=1)

timestamp = get_current_timestamp()

Expand All @@ -132,11 +126,11 @@
# Recapturas

run_recaptura_trasacao = create_flow_run(
flow_name=bilhetagem_transacao_recaptura.name,
flow_name=bilhetagem_recaptura.name,
# project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
project_name="staging",
labels=LABELS,
parameters={"recapture_window_days": recapture_window_days},
parameters=constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value,
)

wait_recaptura_trasacao = wait_for_flow_run(
Expand All @@ -146,21 +140,11 @@
raise_final_state=True,
)

recaptura_auxiliar_params = merge_dict_with_dict_list(
dict_list=constants.BILHETAGEM_CAPTURE_PARAMS.value,
dict_to_merge={
"recapture": True,
"recapture_window_days": recapture_window_days,
},
)

task(lambda x: log(x))(x=recaptura_auxiliar_params)

runs_recaptura_auxiliar = create_flow_run.map(
flow_name=unmapped(bilhetagem_auxiliar_captura.name),
flow_name=unmapped(bilhetagem_recaptura.name),
# project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
project_name=unmapped("staging"),
parameters=recaptura_auxiliar_params,
parameters=constants.BILHETAGEM_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
upstream_tasks=[wait_recaptura_trasacao],
)
Expand Down
15 changes: 0 additions & 15 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,18 +1178,3 @@ def unpack_mapped_results_nout2(
"""
return [r[0] for r in mapped_results], [r[1] for r in mapped_results]


@task(checkpoint=False)
def merge_dict_with_dict_list(dict_list: list[dict], dict_to_merge: dict) -> list[dict]:
"""
Task to merge a dict with every dict inside a list
Args:
dict_list (list[dict]): A list of dictionaries to update
dict_to_merge (dict): The dict that will be merged in every dict inside the list
Returns:
list[dict]: The updated list
"""
return [inside_dict | dict_to_merge for inside_dict in dict_list]

0 comments on commit ec23cf6

Please sign in to comment.