Skip to content

Commit

Permalink
Deploying to gh-pages from @ ad675a2 🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Nov 9, 2023
1 parent 0d30722 commit 602b15c
Show file tree
Hide file tree
Showing 4 changed files with 487 additions and 40 deletions.
215 changes: 195 additions & 20 deletions rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f

from pipelines.rj_smtr.constants import constants

from pipelines.rj_smtr.schedules import every_hour, every_minute
from pipelines.rj_smtr.schedules import every_hour, every_minute, every_day_hour_five

# Flows #

Expand All @@ -87,7 +87,7 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f

bilhetagem_transacao_captura.schedule = every_minute

# BILHETAGEM GPS
# BILHETAGEM GPS - CAPTURA A CADA MINUTO #

bilhetagem_tracking_captura = deepcopy(default_capture_flow)
bilhetagem_tracking_captura.name = &#34;SMTR: Bilhetagem GPS Validador - Captura&#34;
Expand All @@ -105,6 +105,23 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f

bilhetagem_tracking_captura.schedule = every_minute

# BILHETAGEM RESSARCIMENTO - SUBFLOW PARA RODAR DIARIAMENTE #

bilhetagem_ressarcimento_captura = deepcopy(default_capture_flow)
bilhetagem_ressarcimento_captura.name = (
&#34;SMTR: Bilhetagem Ressarcimento - Captura (subflow)&#34;
)
bilhetagem_ressarcimento_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_ressarcimento_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_ressarcimento_captura = set_default_parameters(
flow=bilhetagem_ressarcimento_captura,
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value,
)

# BILHETAGEM AUXILIAR - SUBFLOW PARA RODAR ANTES DE CADA MATERIALIZAÇÃO #

bilhetagem_auxiliar_captura = deepcopy(default_capture_flow)
Expand All @@ -120,16 +137,20 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
default_parameters=constants.BILHETAGEM_GENERAL_CAPTURE_DEFAULT_PARAMS.value,
)

# MATERIALIZAÇÃO - SUBFLOW DE MATERIALIZAÇÃO
bilhetagem_materializacao = deepcopy(default_materialization_flow)
bilhetagem_materializacao.name = &#34;SMTR: Bilhetagem Transação - Materialização (subflow)&#34;
bilhetagem_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_materializacao.run_config = KubernetesRun(
# MATERIALIZAÇÃO #

# Transação
bilhetagem_materializacao_transacao = deepcopy(default_materialization_flow)
bilhetagem_materializacao_transacao.name = (
&#34;SMTR: Bilhetagem Transação - Materialização (subflow)&#34;
)
bilhetagem_materializacao_transacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_materializacao_transacao.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_materializacao_parameters = {
bilhetagem_materializacao_transacao_parameters = {
&#34;source_table_ids&#34;: [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value[&#34;table_id&#34;]
]
Expand All @@ -138,15 +159,52 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value[&#34;interval_minutes&#34;]
]
+ [d[&#34;interval_minutes&#34;] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value],
&#34;dataset_id&#34;: constants.BILHETAGEM_DATASET_ID.value,
} | constants.BILHETAGEM_MATERIALIZACAO_PARAMS.value
} | constants.BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS.value

bilhetagem_materializacao_transacao = set_default_parameters(
flow=bilhetagem_materializacao_transacao,
default_parameters=bilhetagem_materializacao_transacao_parameters,
)

bilhetagem_materializacao = set_default_parameters(
flow=bilhetagem_materializacao,
default_parameters=bilhetagem_materializacao_parameters,
# Ordem Pagamento

bilhetagem_materializacao_ordem_pagamento = deepcopy(default_materialization_flow)
bilhetagem_materializacao_ordem_pagamento.name = (
&#34;SMTR: Bilhetagem Ordem Pagamento - Materialização (subflow)&#34;
)
bilhetagem_materializacao_ordem_pagamento.storage = GCS(
emd_constants.GCS_FLOWS_BUCKET.value
)
bilhetagem_materializacao_ordem_pagamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

bilhetagem_materializacao_ordem_pagamento_parameters = {
&#34;source_table_ids&#34;: [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value[&#34;table_id&#34;]
]
+ [d[&#34;table_id&#34;] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value]
+ [
d[&#34;table_id&#34;] for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value
],
&#34;capture_intervals_minutes&#34;: [
constants.BILHETAGEM_TRANSACAO_CAPTURE_PARAMS.value[&#34;interval_minutes&#34;]
]
+ [d[&#34;interval_minutes&#34;] for d in constants.BILHETAGEM_CAPTURE_PARAMS.value]
+ [
d[&#34;interval_minutes&#34;]
for d in constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value
],
} | constants.BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS.value

bilhetagem_materializacao_ordem_pagamento = set_default_parameters(
flow=bilhetagem_materializacao_ordem_pagamento,
default_parameters=bilhetagem_materializacao_ordem_pagamento_parameters,
)

# RECAPTURA

# RECAPTURA #

bilhetagem_recaptura = deepcopy(default_capture_flow)
bilhetagem_recaptura.name = &#34;SMTR: Bilhetagem - Recaptura (subflow)&#34;
Expand All @@ -157,10 +215,11 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
| {&#34;recapture&#34;: True},
)

# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO
# TRATAMENTO - RODA DE HORA EM HORA, RECAPTURAS + CAPTURA AUXILIAR + MATERIALIZAÇÃO #

with Flow(
&#34;SMTR: Bilhetagem Transação - Tratamento&#34;,
code_owners=[&#34;caio&#34;, &#34;fernanda&#34;, &#34;boris&#34;, &#34;rodrigo&#34;],
code_owners=[&#34;caio&#34;, &#34;fernanda&#34;, &#34;boris&#34;, &#34;rodrigo&#34;, &#34;rafaelpinheiro&#34;],
) as bilhetagem_transacao_tratamento:
# Configuração #

Expand Down Expand Up @@ -249,7 +308,7 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
with case(materialize, True):
# Materialização
run_materializacao = create_flow_run(
flow_name=bilhetagem_materializacao.name,
flow_name=bilhetagem_materializacao_transacao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
Expand All @@ -258,7 +317,9 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
wait_recaptura_transacao,
],
parameters={
&#34;timestamp&#34;: get_current_timestamp(timestamp=timestamp, return_str=True)
&#34;timestamp&#34;: get_current_timestamp(
timestamp=timestamp, return_str=True
),
},
)

Expand All @@ -279,7 +340,7 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f

with Flow(
&#34;SMTR: Bilhetagem GPS Validador - Tratamento&#34;,
code_owners=[&#34;caio&#34;, &#34;fernanda&#34;, &#34;boris&#34;, &#34;rodrigo&#34;],
code_owners=[&#34;caio&#34;, &#34;fernanda&#34;, &#34;boris&#34;, &#34;rodrigo&#34;, &#34;rafaelpinheiro&#34;],
) as bilhetagem_gps_tratamento:
timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
Expand Down Expand Up @@ -314,14 +375,123 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
bilhetagem_gps_tratamento.schedule = every_hour</code></pre>
bilhetagem_gps_tratamento.schedule = every_hour

with Flow(
&#34;SMTR: Bilhetagem Ordem Pagamento - Captura/Tratamento&#34;,
code_owners=[&#34;caio&#34;, &#34;fernanda&#34;, &#34;boris&#34;, &#34;rodrigo&#34;, &#34;rafaelpinheiro&#34;],
) as bilhetagem_ordem_pagamento_captura_tratamento:
capture = Parameter(&#34;capture&#34;, default=True)
materialize = Parameter(&#34;materialize&#34;, default=True)

timestamp = get_rounded_timestamp(
interval_minutes=constants.BILHETAGEM_TRATAMENTO_INTERVAL.value
)

rename_flow_run = rename_current_flow_run_now_time(
prefix=bilhetagem_ordem_pagamento_captura_tratamento.name + &#34; &#34;,
now_time=timestamp,
)

LABELS = get_current_flow_labels()

# Captura #
with case(capture, True):
runs_captura = create_flow_run.map(
flow_name=unmapped(bilhetagem_ressarcimento_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

wait_captura = wait_for_flow_run.map(
runs_captura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Recaptura #

runs_recaptura = create_flow_run.map(
flow_name=unmapped(bilhetagem_recaptura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=constants.BILHETAGEM_ORDEM_PAGAMENTO_CAPTURE_PARAMS.value,
labels=unmapped(LABELS),
)

runs_recaptura.set_upstream(wait_captura)

wait_recaptura_true = wait_for_flow_run.map(
runs_recaptura,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

with case(capture, False):
wait_recaptura_false = task(lambda: None, name=&#34;assign_none_to_recapture&#34;)()

wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false)

# Materialização #

with case(materialize, True):
run_materializacao = create_flow_run(
flow_name=bilhetagem_materializacao_ordem_pagamento.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[wait_recaptura],
parameters={
&#34;timestamp&#34;: get_current_timestamp(
timestamp=timestamp, return_str=True
),
},
)

wait_materializacao = wait_for_flow_run(
run_materializacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

bilhetagem_ordem_pagamento_captura_tratamento.set_reference_tasks(
[wait_materializacao, wait_recaptura]
)

bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS(
emd_constants.GCS_FLOWS_BUCKET.value
)
bilhetagem_ordem_pagamento_captura_tratamento.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)


bilhetagem_ordem_pagamento_captura_tratamento.schedule = every_day_hour_five</code></pre>
</details>
</section>
<section>
</section>
<section>
</section>
<section>
<h2 class="section-title" id="header-functions">Functions</h2>
<dl>
<dt id="pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows.wait_recaptura_false"><code class="name flex">
<span>def <span class="ident">wait_recaptura_false</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">wait_recaptura_false = task(lambda: None, name=&#34;assign_none_to_recapture&#34;)()</code></pre>
</details>
</dd>
</dl>
</section>
<section>
</section>
Expand Down Expand Up @@ -383,6 +553,11 @@ <h1>Index</h1>
<li><code><a title="pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem" href="index.html">pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem</a></code></li>
</ul>
</li>
<li><h3><a href="#header-functions">Functions</a></h3>
<ul class="">
<li><code><a title="pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows.wait_recaptura_false" href="#pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows.wait_recaptura_false">wait_recaptura_false</a></code></li>
</ul>
</li>
</ul>
</nav>
</main>
Expand Down
Loading

0 comments on commit 602b15c

Please sign in to comment.