Skip to content

Commit

Permalink
Deploying to gh-pages from @ 5f6589a 🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Jan 26, 2024
1 parent 2a9b164 commit 37119ac
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 92 deletions.
140 changes: 68 additions & 72 deletions rj_smtr/br_rj_riodejaneiro_bilhetagem/flows.html
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
# BILHETAGEM INTEGRAÇÃO - CAPTURA A CADA MINUTO #

bilhetagem_integracao_captura = deepcopy(default_capture_flow)
bilhetagem_integracao_captura.name = &#34;SMTR: Bilhetagem Integração - Captura&#34;
bilhetagem_integracao_captura.name = &#34;SMTR: Bilhetagem Integração - Captura (subflow)&#34;
bilhetagem_integracao_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
bilhetagem_integracao_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
Expand All @@ -108,7 +108,6 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
| constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
)

bilhetagem_integracao_captura.schedule = every_minute

# BILHETAGEM GPS - CAPTURA A CADA 5 MINUTOS #

Expand Down Expand Up @@ -324,23 +323,6 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
raise_final_state=True,
)

# Recaptura Integração

run_recaptura_integracao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
upstream_tasks=[wait_recaptura_transacao_true],
)

wait_recaptura_integracao_true = wait_for_flow_run(
run_recaptura_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

# Captura Auxiliar

runs_captura = create_flow_run.map(
Expand All @@ -350,7 +332,7 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
labels=unmapped(LABELS),
)

runs_captura.set_upstream(wait_recaptura_integracao_true)
runs_captura.set_upstream(wait_recaptura_transacao_true)

wait_captura_true = wait_for_flow_run.map(
runs_captura,
Expand Down Expand Up @@ -382,9 +364,8 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
wait_captura_false,
wait_recaptura_auxiliar_false,
wait_recaptura_transacao_false,
wait_recaptura_integracao_false,
) = task(
lambda: [None, None, None, None], name=&#34;assign_none_to_capture_runs&#34;, nout=4
lambda: [None, None, None], name=&#34;assign_none_to_capture_runs&#34;, nout=3
)()

wait_captura = merge(wait_captura_false, wait_captura_true)
Expand All @@ -394,9 +375,6 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
wait_recaptura_transacao = merge(
wait_recaptura_transacao_false, wait_recaptura_transacao_true
)
wait_recaptura_integracao = merge(
wait_recaptura_integracao_false, wait_recaptura_integracao_true
)

with case(materialize, True):
materialize_timestamp = get_current_timestamp(
Expand All @@ -411,7 +389,6 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
wait_captura,
wait_recaptura_auxiliar,
wait_recaptura_transacao,
wait_recaptura_integracao,
],
parameters={
&#34;timestamp&#34;: materialize_timestamp,
Expand All @@ -425,31 +402,12 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
raise_final_state=True,
)

run_materializacao_integracao = create_flow_run(
flow_name=bilhetagem_materializacao_integracao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
wait_materializacao_transacao,
],
parameters={
&#34;timestamp&#34;: materialize_timestamp,
},
)

wait_materializacao_integracao = wait_for_flow_run(
run_materializacao_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

run_materializacao_gps_validador = create_flow_run(
flow_name=bilhetagem_materializacao_gps_validador.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
wait_materializacao_integracao,
wait_materializacao_transacao,
],
parameters={
&#34;timestamp&#34;: materialize_timestamp,
Expand Down Expand Up @@ -507,6 +465,20 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
raise_final_state=unmapped(True),
)

runs_captura_integracao = create_flow_run(
flow_name=unmapped(bilhetagem_integracao_captura.name),
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
labels=unmapped(LABELS),
upstream_tasks=[wait_captura],
)

wait_captura_integracao = wait_for_flow_run(
runs_captura_integracao,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)

# Recaptura #

runs_recaptura = create_flow_run.map(
Expand All @@ -525,23 +497,48 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
raise_final_state=unmapped(True),
)

# Recaptura Integração

run_recaptura_integracao = create_flow_run(
flow_name=bilhetagem_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
parameters=constants.BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS.value,
upstream_tasks=[wait_recaptura_true, wait_captura_integracao],
)

wait_recaptura_integracao_true = wait_for_flow_run(
run_recaptura_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

with case(capture, False):
wait_recaptura_false = task(lambda: None, name=&#34;assign_none_to_recapture&#34;)()
wait_recaptura_false, wait_recaptura_integracao_false = task(
lambda: [None, None], name=&#34;assign_none_to_recapture&#34;, nout=2
)()

wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false)
wait_recaptura_integracao = merge(
wait_recaptura_integracao_true, wait_recaptura_integracao_false
)

# Materialização #

with case(materialize, True):
materialize_timestamp = get_current_timestamp(
timestamp=timestamp,
return_str=True,
)

run_materializacao = create_flow_run(
flow_name=bilhetagem_materializacao_ordem_pagamento.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[wait_recaptura],
upstream_tasks=[wait_recaptura, wait_recaptura_integracao],
parameters={
&#34;timestamp&#34;: get_current_timestamp(
timestamp=timestamp, return_str=True
),
&#34;timestamp&#34;: materialize_timestamp,
},
)

Expand All @@ -552,8 +549,27 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
raise_final_state=True,
)

run_materializacao_integracao = create_flow_run(
flow_name=bilhetagem_materializacao_integracao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[
wait_materializacao,
],
parameters={
&#34;timestamp&#34;: materialize_timestamp,
},
)

wait_materializacao_integracao = wait_for_flow_run(
run_materializacao_integracao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

bilhetagem_ordem_pagamento_captura_tratamento.set_reference_tasks(
[wait_materializacao, wait_recaptura]
[wait_materializacao_integracao, wait_recaptura]
)

bilhetagem_ordem_pagamento_captura_tratamento.storage = GCS(
Expand All @@ -573,21 +589,6 @@ <h1 class="title">Module <code>pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.f
<section>
</section>
<section>
<h2 class="section-title" id="header-functions">Functions</h2>
<dl>
<dt id="pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows.wait_recaptura_false"><code class="name flex">
<span>def <span class="ident">wait_recaptura_false</span></span>(<span>)</span>
</code></dt>
<dd>
<div class="desc"></div>
<details class="source">
<summary>
<span>Expand source code</span>
</summary>
<pre><code class="python">wait_recaptura_false = task(lambda: None, name=&#34;assign_none_to_recapture&#34;)()</code></pre>
</details>
</dd>
</dl>
</section>
<section>
</section>
Expand Down Expand Up @@ -649,11 +650,6 @@ <h1>Index</h1>
<li><code><a title="pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem" href="index.html">pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem</a></code></li>
</ul>
</li>
<li><h3><a href="#header-functions">Functions</a></h3>
<ul class="">
<li><code><a title="pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows.wait_recaptura_false" href="#pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows.wait_recaptura_false">wait_recaptura_false</a></code></li>
</ul>
</li>
</ul>
</nav>
</main>
Expand Down
40 changes: 20 additions & 20 deletions rj_smtr/constants.html
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ <h1 class="title">Module <code>pipelines.rj_smtr.constants</code></h1>
&#34;&#34;&#34;,
},
&#34;primary_key&#34;: [&#34;id&#34;],
&#34;interval_minutes&#34;: 1,
&#34;interval_minutes&#34;: 1440,
}

BILHETAGEM_TRACKING_CAPTURE_PARAMS = {
Expand Down Expand Up @@ -484,32 +484,32 @@ <h1 class="title">Module <code>pipelines.rj_smtr.constants</code></h1>
},
]

BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
&#34;dataset_id&#34;: BILHETAGEM_DATASET_ID,
&#34;table_id&#34;: BILHETAGEM_TRANSACAO_CAPTURE_PARAMS[&#34;table_id&#34;],
&#34;table_id&#34;: &#34;integracao&#34;,
&#34;upstream&#34;: True,
&#34;dbt_vars&#34;: {
&#34;date_range&#34;: {
&#34;table_run_datetime_column_name&#34;: &#34;datetime_transacao&#34;,
&#34;delay_hours&#34;: 1,
&#34;table_run_datetime_column_name&#34;: &#34;datetime_captura&#34;,
&#34;delay_hours&#34;: 0,
},
&#34;version&#34;: {},
},
&#34;exclude&#34;: &#34;+operadoras +consorcios&#34;,
}

BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
&#34;dataset_id&#34;: BILHETAGEM_DATASET_ID,
&#34;table_id&#34;: BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS[&#34;table_id&#34;],
&#34;table_id&#34;: &#34;passageiros_hora&#34;,
&#34;upstream&#34;: True,
&#34;dbt_vars&#34;: {
&#34;date_range&#34;: {
&#34;table_run_datetime_column_name&#34;: &#34;datetime_captura&#34;,
&#34;table_run_datetime_column_name&#34;: &#34;datetime_transacao&#34;,
&#34;delay_hours&#34;: 1,
&#34;table_alias&#34;: &#34;integracao&#34;,
},
&#34;version&#34;: {},
},
&#34;exclude&#34;: &#34;+diretorio_operadoras +diretorio_consorcios&#34;,
&#34;exclude&#34;: &#34;integracao matriz_integracao&#34;,
}

BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
Expand Down Expand Up @@ -1048,7 +1048,7 @@ <h2 class="section-title" id="header-classes">Classes</h2>
&#34;&#34;&#34;,
},
&#34;primary_key&#34;: [&#34;id&#34;],
&#34;interval_minutes&#34;: 1,
&#34;interval_minutes&#34;: 1440,
}

BILHETAGEM_TRACKING_CAPTURE_PARAMS = {
Expand Down Expand Up @@ -1276,32 +1276,32 @@ <h2 class="section-title" id="header-classes">Classes</h2>
},
]

BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
&#34;dataset_id&#34;: BILHETAGEM_DATASET_ID,
&#34;table_id&#34;: BILHETAGEM_TRANSACAO_CAPTURE_PARAMS[&#34;table_id&#34;],
&#34;table_id&#34;: &#34;integracao&#34;,
&#34;upstream&#34;: True,
&#34;dbt_vars&#34;: {
&#34;date_range&#34;: {
&#34;table_run_datetime_column_name&#34;: &#34;datetime_transacao&#34;,
&#34;delay_hours&#34;: 1,
&#34;table_run_datetime_column_name&#34;: &#34;datetime_captura&#34;,
&#34;delay_hours&#34;: 0,
},
&#34;version&#34;: {},
},
&#34;exclude&#34;: &#34;+operadoras +consorcios&#34;,
}

BILHETAGEM_MATERIALIZACAO_INTEGRACAO_PARAMS = {
BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
&#34;dataset_id&#34;: BILHETAGEM_DATASET_ID,
&#34;table_id&#34;: BILHETAGEM_INTEGRACAO_CAPTURE_PARAMS[&#34;table_id&#34;],
&#34;table_id&#34;: &#34;passageiros_hora&#34;,
&#34;upstream&#34;: True,
&#34;dbt_vars&#34;: {
&#34;date_range&#34;: {
&#34;table_run_datetime_column_name&#34;: &#34;datetime_captura&#34;,
&#34;table_run_datetime_column_name&#34;: &#34;datetime_transacao&#34;,
&#34;delay_hours&#34;: 1,
&#34;table_alias&#34;: &#34;integracao&#34;,
},
&#34;version&#34;: {},
},
&#34;exclude&#34;: &#34;+diretorio_operadoras +diretorio_consorcios&#34;,
&#34;exclude&#34;: &#34;integracao matriz_integracao&#34;,
}

BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS = {
Expand Down

0 comments on commit 37119ac

Please sign in to comment.