Add GTFS capture and treatment pipeline #510

Merged on Oct 23, 2023
247 commits
e930ec8
add base structure for the gtfs pipeline
fernandascovino Sep 15, 2023
03698e7
[WIP] update tasks and flow files, fixed typos
lingsv Sep 18, 2023
bed179a
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Sep 18, 2023
a252fe4
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Sep 19, 2023
03a869f
[WIP] update flow 1
lingsv Sep 19, 2023
592cbd7
Merge branch 'staging/smtr-pipeline-gtfs' of github.com:prefeitura-ri…
lingsv Sep 19, 2023
f908714
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 19, 2023
7a85db9
create default materialization flow
pixuimpou Sep 20, 2023
e91b2af
create tasks for default materialization flow
pixuimpou Sep 20, 2023
0b00cbc
make generate_execute_schedules more generic
pixuimpou Sep 20, 2023
012a9b0
create bilhetagem materialization flow
pixuimpou Sep 20, 2023
b900910
adapt bilhetagem schedules for the new model
pixuimpou Sep 20, 2023
718d402
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2023
b903d4d
Merge branch 'master' into staging/flow-bilhetagem-materializacao
mergify[bot] Sep 20, 2023
8ad9e18
add run config and storage
pixuimpou Sep 20, 2023
475f19d
add run_config and storage
pixuimpou Sep 20, 2023
df8cdf9
Update utils.py
eng-rodrigocunha Sep 20, 2023
b067670
fix sub tasks
pixuimpou Sep 20, 2023
0cf5a2b
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Sep 20, 2023
e670280
fix fetch_dataset_sha run
pixuimpou Sep 20, 2023
10deba3
[WIP] update download_gtfs function
lingsv Sep 20, 2023
5ff6131
[WIP] update git ignore
lingsv Sep 20, 2023
969c578
add run_date variable to materialization flow
pixuimpou Sep 20, 2023
6ef7432
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2023
d37e16d
[WIP] update flow 1
lingsv Sep 21, 2023
bb3fdd2
remove discord notifications for testing
pixuimpou Sep 21, 2023
37b25c8
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Sep 21, 2023
1e852d6
add manual date_range / fix flow run name
pixuimpou Sep 21, 2023
655cbd9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 21, 2023
bf4de52
fix missing table_id logic
pixuimpou Sep 21, 2023
93cc738
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Sep 21, 2023
16ac45d
fix empty return
pixuimpou Sep 21, 2023
08d94ae
fix empty return
pixuimpou Sep 21, 2023
2d55d6e
add flag_date_range when var_params is blank
pixuimpou Sep 21, 2023
7f53e37
[WIP] Created Flow 2, update gtfs table params
lingsv Sep 21, 2023
e80f1df
change rename logic when has date variables
pixuimpou Sep 21, 2023
b6bb76e
change return values of create_dbt_run_vars
pixuimpou Sep 21, 2023
134fb87
create dict aux function
pixuimpou Sep 21, 2023
707724e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 21, 2023
b2afea6
remove *args from task
pixuimpou Sep 21, 2023
caa65c0
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Sep 21, 2023
f10c1b6
change coalesce task
pixuimpou Sep 21, 2023
2d65b3b
fix rename task
pixuimpou Sep 22, 2023
a797f3f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 22, 2023
3019666
fix task order
pixuimpou Sep 22, 2023
f0b3a9f
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Sep 22, 2023
3df5908
add docstrings
pixuimpou Sep 22, 2023
bf05209
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 22, 2023
cb3ded8
fix line too long
pixuimpou Sep 22, 2023
9de4be9
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Sep 22, 2023
3834214
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 22, 2023
9082bbe
pre-commit hook
pixuimpou Sep 22, 2023
c2bd0ed
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Sep 22, 2023
f3535e4
adjust tasks
pixuimpou Sep 22, 2023
3a784c5
[WIP] update flows
lingsv Sep 22, 2023
b6089fa
remove unused partition task
fernandascovino Sep 25, 2023
dc197cc
unify date and hour partition tasks
fernandascovino Sep 25, 2023
66e84a1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 25, 2023
7cb436b
fix conditional
fernandascovino Sep 25, 2023
8903027
Merge branch 'master' into hotfix-smtr-partition-date-only
mergify[bot] Sep 26, 2023
588fe7d
change capture flow
pixuimpou Sep 26, 2023
da92c19
Merge branch 'master' into hotfix-smtr-partition-date-only
mergify[bot] Sep 26, 2023
97746e1
change generic capture flow
pixuimpou Sep 26, 2023
7a11896
Merge branch 'master' into staging/flow-captura-generico
fernandascovino Sep 26, 2023
b62473c
Merge branch 'hotfix-smtr-partition-date-only' into staging/flow-capt…
fernandascovino Sep 26, 2023
6f12477
update default flow schema
fernandascovino Sep 26, 2023
0c3df1b
change default capture flow structure
pixuimpou Sep 27, 2023
f6ca7ab
change generic capture flow
pixuimpou Sep 27, 2023
fa17be2
adjust constant structure
pixuimpou Sep 27, 2023
bdc3881
change bilhetagem to new capture flow structure
pixuimpou Sep 27, 2023
fc61c47
fix get_storage_blob function
pixuimpou Sep 27, 2023
0fc26cb
fix get_storage_blob call
pixuimpou Sep 27, 2023
634df85
organize constants order
pixuimpou Sep 27, 2023
bda52aa
fix get_raw_from_sources function call
pixuimpou Sep 27, 2023
b2548d6
change transform_raw_to_json to read_raw_data
pixuimpou Sep 27, 2023
307863a
transform transform_raw_data_to_json to read_raw_data
pixuimpou Sep 27, 2023
7f2c1e3
fix nout task parameter
pixuimpou Sep 27, 2023
51977c1
fix timedelta instantiation
pixuimpou Sep 27, 2023
8ef0b5d
set upstream tasks
pixuimpou Sep 27, 2023
4f21f0a
declare raw_filepath
pixuimpou Sep 27, 2023
11b9735
update docstrings
eng-rodrigocunha Sep 27, 2023
f484b88
adjust get_raw_from_sources return
pixuimpou Sep 27, 2023
50626a9
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Sep 27, 2023
2df4318
fix errors
pixuimpou Sep 27, 2023
df6525a
change agent label to dev
pixuimpou Sep 27, 2023
2983b68
refactor source values
pixuimpou Sep 28, 2023
2c78b09
update constants
eng-rodrigocunha Sep 28, 2023
1f3c2fc
update agent
eng-rodrigocunha Sep 28, 2023
702e70d
update schedule params
eng-rodrigocunha Sep 28, 2023
b5712d2
update interval
eng-rodrigocunha Sep 28, 2023
e3df22c
fix get_datetime_range interval
eng-rodrigocunha Sep 28, 2023
6ed06da
remove order by from queries
eng-rodrigocunha Sep 28, 2023
822c59f
fix get_raw_data_api
eng-rodrigocunha Sep 28, 2023
f3ddf24
Merge branch 'master' into staging/flow-captura-generico
mergify[bot] Sep 28, 2023
14dd234
Merge branch 'master' into staging/flow-captura-generico
mergify[bot] Sep 28, 2023
c58ea96
change json read function
pixuimpou Sep 28, 2023
045a423
update read_raw_data
lingsv Sep 28, 2023
d2d188f
update save_raw_local_func
lingsv Sep 28, 2023
b7c4e2f
log error
pixuimpou Sep 28, 2023
2bedf89
change raw api extraction for json
pixuimpou Sep 28, 2023
20b48df
change read json function
pixuimpou Sep 28, 2023
42c6ac0
print log traceback
pixuimpou Sep 28, 2023
2527604
skip pre treatment if empty df
pixuimpou Sep 29, 2023
0f907b9
skip save staging if dataframe is empty / save raw
pixuimpou Sep 29, 2023
e9d9fa5
pulled with flow generico branch
lingsv Sep 29, 2023
ba1dad2
remove skip upload if empty dataframe
pixuimpou Sep 29, 2023
4c3d1cf
update docstring and returned values
pixuimpou Sep 29, 2023
39e8606
reorganize task order
pixuimpou Sep 29, 2023
465ee52
fix tuple
pixuimpou Sep 29, 2023
67a1056
change zip logic
pixuimpou Sep 29, 2023
3f5f34c
remove skip
pixuimpou Sep 29, 2023
7860a4b
create gtfs zip constant
pixuimpou Sep 29, 2023
2d7c9cb
add gtfs zip file name
pixuimpou Sep 29, 2023
bfa6273
add csv to save raw / change filetype logic
pixuimpou Sep 29, 2023
524cd07
remove comments
pixuimpou Sep 29, 2023
3477a2c
fix csv_args default value
pixuimpou Sep 29, 2023
16e61c8
change docstring get raw api
pixuimpou Sep 29, 2023
4bdaa4f
change raw data gcs docstring
pixuimpou Sep 29, 2023
e3b7c14
remove commented task
pixuimpou Sep 29, 2023
0935cbd
change quadro primary key to list
pixuimpou Sep 29, 2023
e5bad98
update GTFS constants
lingsv Sep 29, 2023
705ba6e
Merge branch 'staging/flow-captura-generico' of github.com:prefeitura…
lingsv Sep 29, 2023
3e08d92
update capture flow
lingsv Sep 29, 2023
d4230bb
change upload folder structure
pixuimpou Sep 29, 2023
759fb00
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Sep 29, 2023
c5c1028
Merge branch 'master' of https://github.com/prefeitura-rio/pipelines …
pixuimpou Sep 29, 2023
8074135
Merge branch 'master' into staging/flow-captura-generico
fernandascovino Sep 29, 2023
7c43d1d
undo silencing of notification failures
fernandascovino Sep 29, 2023
5c189e3
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Sep 29, 2023
350fbdf
Merge branch 'master' into staging/flow-captura-generico
fernandascovino Sep 29, 2023
f26b40a
update gtfs flow
lingsv Sep 29, 2023
685aae5
remove test parameters (gtfs)
fernandascovino Sep 29, 2023
cd5048e
Update pipelines/rj_smtr/constants.py
eng-rodrigocunha Sep 29, 2023
d17d161
fix error chaining in the flow
fernandascovino Sep 29, 2023
02b948a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 29, 2023
cb5ccc4
change materialization flow structure
pixuimpou Oct 2, 2023
fac7821
remove header treatment
pixuimpou Oct 2, 2023
e291e51
change agent from dev to prd
pixuimpou Oct 2, 2023
e57d457
change agent from dev to prd
pixuimpou Oct 2, 2023
3767a56
adjust function return values
pixuimpou Oct 2, 2023
6564ea6
Update documentation
eng-rodrigocunha Oct 2, 2023
9bafb20
Merge branch 'staging/flow-captura-generico' of github.com:prefeitura…
lingsv Oct 2, 2023
19bb0be
add return to get_upload_storage_blob
pixuimpou Oct 2, 2023
d5168c7
Merge branch 'staging/flow-captura-generico' of https://github.com/pr…
pixuimpou Oct 2, 2023
bc87f44
Update documentation
eng-rodrigocunha Oct 2, 2023
185d695
Update string
eng-rodrigocunha Oct 2, 2023
05f0497
Merge branch 'master' into staging/flow-bilhetagem-materializacao
pixuimpou Oct 2, 2023
a2f3274
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 2, 2023
a0e47f5
Merge branch 'master' into staging/flow-bilhetagem-materializacao
mergify[bot] Oct 2, 2023
0b3c117
add bilhetagem schedule
pixuimpou Oct 2, 2023
6f23c4b
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Oct 2, 2023
10b5283
add schedule to the materialization flow
pixuimpou Oct 2, 2023
02c856d
adjust datetime column name
pixuimpou Oct 2, 2023
9f1a632
Merge branch 'staging/flow-captura-generico' of github.com:prefeitura…
lingsv Oct 2, 2023
c741b55
branch update
lingsv Oct 2, 2023
e898f7f
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Oct 2, 2023
1da0c4a
adjust column name
pixuimpou Oct 2, 2023
dfdf0e4
change date column to datetime_transacao
pixuimpou Oct 2, 2023
ef114a9
adjust manual date_range variable
pixuimpou Oct 2, 2023
30d2f63
rename dbt variable parameter
pixuimpou Oct 2, 2023
ae2519e
added gtfs details to the flow
lingsv Oct 2, 2023
d130c87
Merge branch 'staging/smtr-pipeline-gtfs' of github.com:prefeitura-ri…
lingsv Oct 2, 2023
580eb94
update utils
lingsv Oct 2, 2023
262e577
update tasks and utils
lingsv Oct 2, 2023
6e6c5b2
update tasks
lingsv Oct 2, 2023
47717fc
Merge branch 'master' into staging/flow-bilhetagem-materializacao
mergify[bot] Oct 2, 2023
0ace182
refactor get_raw_data_gcs function
fernandascovino Oct 3, 2023
207df99
refactor generic flow parameters
fernandascovino Oct 3, 2023
841f717
Merge branch 'master' into staging/smtr-pipeline-gtfs
lingsv Oct 3, 2023
fb9d014
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 3, 2023
577caed
create materialization orchestration flow
pixuimpou Oct 3, 2023
5c73967
restore discord notification
pixuimpou Oct 3, 2023
e80166e
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Oct 3, 2023
f15053a
adjust wait_flow_run
pixuimpou Oct 3, 2023
8feccd1
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Oct 3, 2023
ff516e9
Merge branch 'master' into staging/flow-bilhetagem-materializacao
mergify[bot] Oct 3, 2023
e17c626
change query for testing
pixuimpou Oct 3, 2023
2e57929
Merge branch 'staging/flow-bilhetagem-materializacao' of https://gith…
pixuimpou Oct 3, 2023
79d78ff
update gtfs files
lingsv Oct 3, 2023
0b5c519
Merge branch 'staging/smtr-pipeline-gtfs' of github.com:prefeitura-ri…
lingsv Oct 3, 2023
16f7f9e
revert test query
pixuimpou Oct 3, 2023
447cff1
use copy on the date variables dictionary
pixuimpou Oct 3, 2023
cb63277
update gtfs files
lingsv Oct 4, 2023
a516ff0
Merge branches 'staging/flow-bilhetagem-materializacao' and 'staging/…
lingsv Oct 4, 2023
46a6250
change flow title
lingsv Oct 6, 2023
106ce54
Merge branch 'master' into staging/smtr-pipeline-gtfs
fernandascovino Oct 6, 2023
2012e87
updated constants from quadro to ordem_servico
lingsv Oct 9, 2023
13b9eb7
update docstring
lingsv Oct 9, 2023
133704b
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Oct 10, 2023
ce42239
added ordem_servico to the constants again
lingsv Oct 10, 2023
a63d149
added stop_times to the constants
lingsv Oct 11, 2023
2535644
started configuring the materialization flow
lingsv Oct 11, 2023
3332d31
update gtfs flow
lingsv Oct 19, 2023
cb6acfa
deleted schedule file from the gtfs folder
lingsv Oct 19, 2023
0442e44
changed constant format
lingsv Oct 19, 2023
8e98df1
constant test
lingsv Oct 19, 2023
6069e50
update flow
lingsv Oct 19, 2023
c1e42dd
flow test
lingsv Oct 19, 2023
6619a30
fixed final part of the flow
lingsv Oct 19, 2023
d274fb0
fixed codeowner
lingsv Oct 19, 2023
1f838dd
[WIP] fixed problems in the flow
lingsv Oct 19, 2023
baab517
parameter test
lingsv Oct 19, 2023
3048f56
update params
lingsv Oct 19, 2023
80adad6
update
lingsv Oct 19, 2023
9e16236
update flow
lingsv Oct 19, 2023
e4e0c8a
update flow
lingsv Oct 19, 2023
4a229fc
removed unnecessary parameter from the materialization flow
lingsv Oct 19, 2023
4e2214d
update
lingsv Oct 19, 2023
ed9dc65
swap quotes
lingsv Oct 19, 2023
9c55f71
parameter changes
lingsv Oct 19, 2023
5671302
Merge branch 'master' into staging/smtr-pipeline-gtfs
lingsv Oct 19, 2023
8aaa2ea
parameter change
lingsv Oct 19, 2023
9b75eb6
removed unmapped from the flow
lingsv Oct 19, 2023
cfa63c3
update
lingsv Oct 19, 2023
ae0f0e5
update flow
lingsv Oct 19, 2023
ec40cf5
added unmapped
lingsv Oct 19, 2023
5667fd8
wip
lingsv Oct 19, 2023
eecc00a
WIP TESTE
lingsv Oct 19, 2023
5f79a55
update
lingsv Oct 20, 2023
7764fc6
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Oct 20, 2023
950188f
passed capture run parameters with unmapped
lingsv Oct 20, 2023
67a49fa
update constants
eng-rodrigocunha Oct 20, 2023
7012428
update default_capture_flow params
eng-rodrigocunha Oct 20, 2023
65e8d98
update create_date_hour_partition params
eng-rodrigocunha Oct 20, 2023
23d5ffe
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Oct 20, 2023
53370ec
update gtfs_captura parameters
eng-rodrigocunha Oct 20, 2023
2e0a058
Merge branch 'staging/smtr-pipeline-gtfs' of https://github.com/prefe…
eng-rodrigocunha Oct 20, 2023
7db3e8c
revert .gitignore
eng-rodrigocunha Oct 20, 2023
dd3229a
Revert README.md
eng-rodrigocunha Oct 20, 2023
07c0143
revert rj_smtr flows
eng-rodrigocunha Oct 20, 2023
bab5a02
revert rj_smtr tasks
eng-rodrigocunha Oct 20, 2023
00ec870
change rj_smtr utility functions
eng-rodrigocunha Oct 20, 2023
7a6e58e
revert rj_smtr tasks
eng-rodrigocunha Oct 20, 2023
0cb1bf0
create parameters for capture and/or materialization
eng-rodrigocunha Oct 20, 2023
790f01c
Improve logs
eng-rodrigocunha Oct 20, 2023
71a3072
remove unused tasks
eng-rodrigocunha Oct 20, 2023
a384b6e
Create get_upstream_tasks task
eng-rodrigocunha Oct 20, 2023
3c41158
add task for wait_captura
eng-rodrigocunha Oct 20, 2023
97acf9d
update gtfs_captura_tratamento flow
eng-rodrigocunha Oct 20, 2023
9591330
Update gtfs_captura_tratamento flow
eng-rodrigocunha Oct 20, 2023
1f92c48
add data_versao_gtfs na task create_dbt_run_vars
eng-rodrigocunha Oct 20, 2023
1b08e2b
change gtfs materialization parameters
eng-rodrigocunha Oct 20, 2023
d4894f4
change agents of the gtfs flows
eng-rodrigocunha Oct 20, 2023
1adde7a
Merge branch 'master' into staging/smtr-pipeline-gtfs
mergify[bot] Oct 23, 2023
a8b19fc
Merge branch 'master' into staging/smtr-pipeline-gtfs
eng-rodrigocunha Oct 23, 2023
43d2bb4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 23, 2023
6ce0add
change project_name to the office constant.
lingsv Oct 23, 2023
1 change: 1 addition & 0 deletions pipelines/rj_smtr/__init__.py
@@ -17,3 +17,4 @@
 from pipelines.rj_smtr.veiculo.flows import *
 from pipelines.rj_smtr.example.flows import *
 from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows import *
+from pipelines.rj_smtr.br_rj_riodejaneiro_gtfs.flows import *
Empty file.
135 changes: 135 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+"""
+Flows for gtfs
+"""
+from copy import deepcopy
+
+# Imports #
+
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefect.utilities.edges import unmapped
+from prefect import Parameter, case, task
+from prefect.tasks.control_flow import merge
+
+
+# EMD Imports #
+
+from pipelines.constants import constants as emd_constants
+from pipelines.utils.utils import set_default_parameters
+from pipelines.utils.decorators import Flow
+from pipelines.utils.tasks import (
+    rename_current_flow_run_now_time,
+    get_current_flow_labels,
+)
+
+# SMTR Imports #
+from pipelines.rj_smtr.constants import constants
+from pipelines.rj_smtr.tasks import (
+    get_current_timestamp,
+)
+
+from pipelines.rj_smtr.flows import default_capture_flow, default_materialization_flow
+
+# SETUP dos Flows
+
+gtfs_captura = deepcopy(default_capture_flow)
+gtfs_captura.name = "SMTR - Captura dos dados do GTFS"
+gtfs_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+gtfs_captura.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+gtfs_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+gtfs_captura = set_default_parameters(
+    flow=gtfs_captura,
+    default_parameters=constants.GTFS_GENERAL_CAPTURE_PARAMS.value,
+)
+
+gtfs_materializacao = deepcopy(default_materialization_flow)
+gtfs_materializacao.name = "SMTR - Materialização dos dados do GTFS"
+gtfs_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+gtfs_materializacao.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
+gtfs_materializacao = set_default_parameters(
+    flow=gtfs_materializacao,
+    default_parameters=constants.GTFS_MATERIALIZACAO_PARAMS.value,
+)
+
+with Flow(
+    "SMTR: GTFS - Captura/Tratamento",
+    code_owners=["rodrigo", "carolinagomes"],
+) as gtfs_captura_tratamento:
+    # SETUP
+
+    data_versao_gtfs = Parameter("data_versao_gtfs", default=None)
+    capture = Parameter("capture", default=True)
+    materialize = Parameter("materialize", default=True)
+
+    timestamp = get_current_timestamp()
+
+    rename_flow_run = rename_current_flow_run_now_time(
+        prefix=gtfs_captura_tratamento.name + " ",
+        now_time=timestamp,
+    )
+
+    LABELS = get_current_flow_labels()
+
+    with case(capture, True):
+        gtfs_capture_parameters = [
+            {"timestamp": data_versao_gtfs, **d}
+            for d in constants.GTFS_TABLE_CAPTURE_PARAMS.value
+        ]
+
+        run_captura = create_flow_run.map(
+            flow_name=unmapped(gtfs_captura.name),
+            project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
+            parameters=gtfs_capture_parameters,
+            labels=unmapped(LABELS),
+        )
+
+        wait_captura_true = wait_for_flow_run.map(
+            run_captura,
+            stream_states=unmapped(True),
+            stream_logs=unmapped(True),
+            raise_final_state=unmapped(True),
+        )
+
+    with case(capture, False):
+        wait_captura_false = task(
+            lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
+        )()
+
+    wait_captura = merge(wait_captura_true, wait_captura_false)
+
+    with case(materialize, True):
+        gtfs_materializacao_parameters = {
+            "dbt_vars": {
+                "data_versao_gtfs": data_versao_gtfs,
+                "version": {},
+            },
+        }
+
+        run_materializacao = create_flow_run(
+            flow_name=gtfs_materializacao.name,
+            project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
+            parameters=gtfs_materializacao_parameters,
+            labels=LABELS,
+            upstream_tasks=[wait_captura],
+        )
+
+        wait_materializacao = wait_for_flow_run(
+            run_materializacao,
+            stream_states=True,
+            stream_logs=True,
+            raise_final_state=True,
+        )
+
+gtfs_captura_tratamento.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
+gtfs_captura_tratamento.run_config = KubernetesRun(
+    image=emd_constants.DOCKER_IMAGE.value,
+    labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
+)
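
The `gtfs_capture_parameters` list in the flow above fans out one capture run per GTFS table, each carrying the same version date. A minimal sketch of that expansion in plain Python, outside Prefect: `TABLE_PARAMS` is a shortened stand-in for `constants.GTFS_TABLE_CAPTURE_PARAMS.value`, and both `build_capture_parameters` and the date string are hypothetical names and values used only for illustration.

```python
# Shortened stand-in for constants.GTFS_TABLE_CAPTURE_PARAMS.value (assumption).
TABLE_PARAMS = [
    {"table_id": "agency", "primary_key": ["agency_id"]},
    {
        "table_id": "ordem_servico",
        "primary_key": ["servico"],
        "extract_params": {"filename": "ordem_servico"},
    },
]


def build_capture_parameters(data_versao_gtfs, table_params):
    """Return one parameter dict per table, all sharing the same timestamp."""
    # `**d` comes last, so a "timestamp" key inside `d` would win on collision.
    # A new dict is built for each table; the input dicts are left untouched.
    return [{"timestamp": data_versao_gtfs, **d} for d in table_params]


params = build_capture_parameters("2023-10-23", TABLE_PARAMS)
for p in params:
    print(p["table_id"], p["timestamp"])
```

In the flow itself `data_versao_gtfs` is a Prefect `Parameter` rather than a string, so the value is expected to be resolved when the flow runs, not when the list is built.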
75 changes: 75 additions & 0 deletions pipelines/rj_smtr/constants.py
@@ -323,3 +323,78 @@ class constants(Enum): # pylint: disable=c0103
             "version": {},
         },
     }
+
+    # GTFS
+    GTFS_DATASET_ID = "br_rj_riodejaneiro_gtfs"
+
+    GTFS_GENERAL_CAPTURE_PARAMS = {
+        "partition_date_only": True,
+        "source_type": "gcs",
+        "dataset_id": "br_rj_riodejaneiro_gtfs",
+        "extract_params": {"filename": "gtfs"},
+        "partition_date_name": "data_versao",
+    }
+
+    GTFS_TABLE_CAPTURE_PARAMS = [
+        {
+            "table_id": "agency",
+            "primary_key": ["agency_id"],
+        },
+        {
+            "table_id": "calendar_dates",
+            "primary_key": ["service_id", "date"],
+        },
+        {
+            "table_id": "calendar",
+            "primary_key": ["service_id"],
+        },
+        {
+            "table_id": "feed_info",
+            "primary_key": ["feed_publisher_name"],
+        },
+        {
+            "table_id": "frequencies",
+            "primary_key": ["trip_id", "start_time"],
+        },
+        {
+            "table_id": "routes",
+            "primary_key": ["route_id"],
+        },
+        {
+            "table_id": "shapes",
+            "primary_key": ["shape_id", "shape_pt_sequence"],
+        },
+        {
+            "table_id": "stops",
+            "primary_key": ["stop_id"],
+        },
+        {
+            "table_id": "stop_times",
+            "primary_key": ["trip_id", "stop_sequence"],
+        },
+        {
+            "table_id": "trips",
+            "primary_key": ["trip_id"],
+        },
+        {
+            "table_id": "fare_attributes",
+            "primary_key": ["fare_id"],
+        },
+        {
+            "table_id": "fare_rules",
+            "primary_key": [],
+        },
+        {
+            "table_id": "ordem_servico",
+            "primary_key": ["servico"],
+            "extract_params": {"filename": "ordem_servico"},
+        },
+    ]
+
+    GTFS_MATERIALIZACAO_PARAMS = {
+        "dataset_id": GTFS_DATASET_ID,
+        "dbt_vars": {
+            "data_versao_gtfs": "",
+            "version": {},
+        },
+    }
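
The constants above split capture settings into flow-wide defaults (`GTFS_GENERAL_CAPTURE_PARAMS`) and per-table entries (`GTFS_TABLE_CAPTURE_PARAMS`). A rough sketch of how the two combine for a single run, assuming that a run-level parameter replaces the flow default for the same top-level key, as ordinary Prefect parameter defaults do. `effective_parameters` is a hypothetical helper, not something defined in this PR.

```python
GENERAL = {
    "partition_date_only": True,
    "source_type": "gcs",
    "dataset_id": "br_rj_riodejaneiro_gtfs",
    "extract_params": {"filename": "gtfs"},
    "partition_date_name": "data_versao",
}


def effective_parameters(defaults, run_parameters):
    """Shallow merge: a top-level key in run_parameters replaces the default."""
    return {**defaults, **run_parameters}


agency = effective_parameters(
    GENERAL, {"table_id": "agency", "primary_key": ["agency_id"]}
)
ordem_servico = effective_parameters(
    GENERAL,
    {
        "table_id": "ordem_servico",
        "primary_key": ["servico"],
        "extract_params": {"filename": "ordem_servico"},
    },
)

print(agency["extract_params"]["filename"])         # gtfs
print(ordem_servico["extract_params"]["filename"])  # ordem_servico
```

Under that assumption, every table reads from the `gtfs` file except `ordem_servico`, which overrides `extract_params` with its own filename.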
10 changes: 8 additions & 2 deletions pipelines/rj_smtr/flows.py
@@ -60,6 +60,8 @@
     interval_minutes = Parameter("interval_minutes", default=None)
     recapture = Parameter("recapture", default=False)
     recapture_window_days = Parameter("recapture_window_days", default=1)
+    timestamp = Parameter("timestamp", default=None)
+    partition_date_name = Parameter("partition_date_name", default=None)

     # Parâmetros Pré-tratamento #
     primary_key = Parameter("primary_key", default=None)
@@ -70,7 +72,9 @@
         checkpoint=False,
     )

-    current_timestamp = get_rounded_timestamp(interval_minutes=interval_minutes)
+    current_timestamp = get_rounded_timestamp(
+        timestamp=timestamp, interval_minutes=interval_minutes
+    )

     with case(recapture, True):
         _, recapture_timestamps, recapture_previous_errors = query_logs(
@@ -96,7 +100,9 @@
     )

     partitions = create_date_hour_partition.map(
-        timestamps, partition_date_only=unmapped(partition_date_only)
+        timestamps,
+        partition_date_name=unmapped(partition_date_name),
+        partition_date_only=unmapped(partition_date_only),
     )

     filenames = parse_timestamp_to_string.map(timestamps)
35 changes: 30 additions & 5 deletions pipelines/rj_smtr/tasks.py
@@ -166,8 +166,8 @@ def create_dbt_run_vars(

     log(f"Creating DBT variables. Parameter received: {dbt_vars}")

-    if (not dbt_vars) or (not table_id):
-        log("dbt_vars or table_id are blank. Skiping task")
+    if not dbt_vars:
+        log("dbt_vars are blank. Skiping task...")
         return [None], None, False

     final_vars = []
@@ -187,6 +187,10 @@ def create_dbt_run_vars(
             }
         # Create date_range using Redis
         else:
+            if not table_id:
+                log("table_id are blank. Skiping task...")
+                return [None], None, False
+
             raw_table_id = raw_table_id or table_id

             date_var = get_materialization_date_range.run(
@@ -230,6 +234,16 @@ def create_dbt_run_vars(

         log(f"version created: {dataset_sha}")

+    if "data_versao_gtfs" in dbt_vars.keys():
+        log("Creating data_versao_gtfs variable")
+
+        temp_dict = {"data_versao_gtfs": dbt_vars["data_versao_gtfs"]}
+
+        if final_vars:
+            final_vars = get_join_dict.run(dict_list=final_vars, new_dict=temp_dict)
+        else:
+            final_vars.append(temp_dict)
+
     log(f"All variables was created, final value is: {final_vars}")

     return final_vars, date_var, flag_date_range
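
The new `data_versao_gtfs` branch either merges the version into every variable set already collected or starts the list with it. A self-contained sketch of that logic: `join_dicts` is a hypothetical stand-in for `get_join_dict`, whose implementation is not shown in this diff, and is assumed here to merge `new_dict` into each dict of the list.

```python
def join_dicts(dict_list, new_dict):
    """Hypothetical stand-in for get_join_dict: merge new_dict into each dict."""
    return [{**d, **new_dict} for d in dict_list]


def add_gtfs_version(final_vars, dbt_vars):
    """Attach data_versao_gtfs to the dbt variable sets, if it was requested."""
    if "data_versao_gtfs" not in dbt_vars:
        return final_vars
    temp_dict = {"data_versao_gtfs": dbt_vars["data_versao_gtfs"]}
    if final_vars:
        # Other variables already exist: add the version to each set.
        return join_dicts(final_vars, temp_dict)
    # Nothing collected yet: the version becomes the only variable set.
    return [temp_dict]


dbt_vars = {"data_versao_gtfs": "2023-10-23", "version": {}}
print(add_gtfs_version([], dbt_vars))
print(add_gtfs_version([{"version": "abc123"}], dbt_vars))
```

The sample values (`"2023-10-23"`, `"abc123"`) are made up for the example.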
@@ -302,19 +316,22 @@ def get_current_timestamp(timestamp=None, truncate_minute: bool = True) -> datet

 @task
 def create_date_hour_partition(
-    timestamp: datetime, partition_date_only: bool = False
+    timestamp: datetime,
+    partition_date_name: str = "data",
+    partition_date_only: bool = False,
 ) -> str:
     """
     Create a date (and hour) Hive partition structure from timestamp.

     Args:
         timestamp (datetime): timestamp to be used as reference
+        partition_date_name (str, optional): partition name. Defaults to "data".
         partition_date_only (bool, optional): whether to add hour partition or not

     Returns:
         str: partition string
     """
-    partition = f"data={timestamp.strftime('%Y-%m-%d')}"
+    partition = f"{partition_date_name}={timestamp.strftime('%Y-%m-%d')}"
     if not partition_date_only:
         partition += f"/hora={timestamp.strftime('%H')}"
     return partition
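
A standalone sketch of the partition string this task builds, without the Prefect `@task` wrapper. One detail worth checking: the capture flow declares `partition_date_name` with `default=None`, and a Python default argument is not applied when `None` is passed explicitly. The `or "data"` fallback below is an assumption added for illustration and is not part of the PR.

```python
from datetime import datetime


def date_hour_partition(timestamp, partition_date_name="data", partition_date_only=False):
    """Build a Hive-style partition path such as data=2023-10-23/hora=14."""
    # Assumption (not in the PR): treat an explicit None like the default name.
    name = partition_date_name or "data"
    partition = f"{name}={timestamp.strftime('%Y-%m-%d')}"
    if not partition_date_only:
        partition += f"/hora={timestamp.strftime('%H')}"
    return partition


ts = datetime(2023, 10, 23, 14, 30)
print(date_hour_partition(ts))                       # data=2023-10-23/hora=14
print(date_hour_partition(ts, "data_versao", True))  # data_versao=2023-10-23
print(date_hour_partition(ts, None))                 # data=2023-10-23/hora=14
```

Without the fallback, the last call would produce a partition named `None=2023-10-23/hora=14`.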
@@ -388,11 +405,16 @@ def save_treated_local(file_path: str, status: dict, mode: str = "staging") -> s
     Returns:
         str: Path to the saved file
     """
+
+    log(f"Saving treated data to: {file_path}, {status}")
+
     _file_path = file_path.format(mode=mode, filetype="csv")
+
     Path(_file_path).parent.mkdir(parents=True, exist_ok=True)
     if status["error"] is None:
         status["data"].to_csv(_file_path, index=False)
+        log(f"Treated data saved to: {_file_path}")

     return _file_path


@@ -545,7 +567,7 @@ def get_raw( # pylint: disable=R0912
         params (dict, optional): Params to be sent on request

     Returns:
-        dict: Conatining keys
+        dict: Containing keys
             * `data` (json): data result
             * `error` (str): catched error, if any. Otherwise, returns None
     """
@@ -636,6 +658,9 @@ def create_request_params(
             "query": extract_params["query"].format(**datetime_range),
         }

+    elif dataset_id == constants.GTFS_DATASET_ID.value:
+        request_params = extract_params["filename"]
+
     return request_params, request_url


2 changes: 2 additions & 0 deletions pipelines/rj_smtr/utils.py
@@ -581,11 +581,13 @@ def get_upload_storage_blob(
         Blob: blob object
     """
     bucket = bd.Storage(dataset_id="", table_id="")
+    log(f"Filename: {filename}, dataset_id: {dataset_id}")
     blob_list = list(
         bucket.client["storage_staging"]
         .bucket(bucket.bucket_name)
         .list_blobs(prefix=f"upload/{dataset_id}/{filename}.")
     )
+
     return blob_list[0]
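
`blob_list[0]` raises a bare `IndexError` when no uploaded file matches the prefix. A small sketch of the same prefix lookup with an explicit check, using plain strings in place of storage blobs; `first_matching_blob` and the sample paths are hypothetical and only illustrate the matching rule.

```python
def first_matching_blob(blob_names, dataset_id, filename):
    """Return the first name under upload/<dataset_id>/<filename>.<extension>."""
    # The trailing dot keeps "gtfs" from matching names such as "gtfs_old.zip".
    prefix = f"upload/{dataset_id}/{filename}."
    matches = [name for name in blob_names if name.startswith(prefix)]
    if not matches:
        raise FileNotFoundError(f"No blob found with prefix {prefix!r}")
    return matches[0]


names = [
    "upload/br_rj_riodejaneiro_gtfs/gtfs_old.zip",
    "upload/br_rj_riodejaneiro_gtfs/gtfs.zip",
]
print(first_matching_blob(names, "br_rj_riodejaneiro_gtfs", "gtfs"))
```

Raising a descriptive error here is a design choice for the sketch; the PR itself only adds a log line before the lookup.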

