Merge branch 'main' into staging/add-riocard-jae
mergify[bot] authored Aug 5, 2024
2 parents 4d525e9 + b0090f7 commit 1c05fe3
Showing 44 changed files with 520 additions and 131 deletions.
11 changes: 10 additions & 1 deletion .flake8
@@ -1,2 +1,11 @@
[flake8]
max-line-length = 100
max-line-length = 100
extend-ignore =
E231,
E221,
E702,
E202,
E271,
E272,
E225,
E222
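
For reference, the codes added to `extend-ignore` are pycodestyle whitespace checks: E221/E222 (multiple spaces before/after an operator), E225 (missing whitespace around an operator), E231 (missing whitespace after a comma or colon), E202 (whitespace before a closing bracket), E271/E272 (multiple spaces after/before a keyword), and E702 (statement ends with a semicolon). A small, purely illustrative snippet of the column-aligned style these ignores tolerate (identifiers invented for the example):

```python
# Aligned assignments: extra spaces around "=" would normally raise E221/E222.
DATASET_ID     = "example_dataset"
CAPTURE_DELAY  = 5
TABLE_COLUMNS  = ["fare_id","route_id"]  # E231: no space after the comma

# E702: two statements on one line separated by a semicolon.
total = 0; count = 0
```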
56 changes: 55 additions & 1 deletion README.md
@@ -1 +1,55 @@
# Pipelines rj-smtr
# Pipelines rj-smtr


## Setup

### Step 1 - Environment preparation

- At the project root, create a virtual environment to isolate the dependencies:
  - `python3.10 -m venv .pipelines`

- Activate the virtual environment:
  - `. .pipelines/bin/activate`

- Install the project dependencies:
  - `poetry install --all-extras`
  - `pip install -e .`

- Create a `.env` file at the project root containing the following variables:
  - ```
    INFISICAL_ADDRESS = ''
    INFISICAL_TOKEN = ''
    ```
  - Ask the devops team for the values to be used
- Add the environment variables to your terminal session:
  - `set -a && source .env && set +a`
### Testing flows locally

- Add a `test.py` file at the project root:
  - In this file, import the flow you want to test
  - Import the `run_local` function:
    - `from pipelines.utils.prefect import run_local`
  - Its signature is `run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None)`, which lets you vary the parameters passed to the flow during a local run
- Call `run_local(<flow_to_test>)` and run the file (see the sketch after this list):
  - `python test.py`
- A handy tip for testing and debugging is to append `| tee logs.txt` when running your test. This writes the run's logs to a file, which is easier to inspect than the terminal output alone.
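
As a minimal sketch of such a `test.py`, assuming the GTFS capture flow from this commit as the flow under test (any other flow can be imported the same way, and the parameter value here is purely illustrative):

```python
# test.py - run a flow locally for debugging
from pipelines.migration.br_rj_riodejaneiro_gtfs.flows import gtfs_captura_nova
from pipelines.utils.prefect import run_local

# Parameters are optional; pass a dict to override the flow's defaults.
run_local(gtfs_captura_nova, parameters={"data_versao_gtfs": "2024-08-02"})
```

Run it with `python test.py | tee logs.txt` to keep the logs of that execution for later inspection.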
### Step 2 - Deploy to staging and PR

- Always work on `staging/<name>` branches
- Push and open your Pull Request.
- Each commit on this branch triggers the GitHub routines that:
  - Check formatting
  - Run the deploy
  - Register flows in staging (the test environment)
- You can follow the status of these routines on your PR page
- Registered flows appear on the Prefect server and can be run from there
8 changes: 4 additions & 4 deletions pipelines/capture/templates/tasks.py
@@ -195,9 +195,9 @@ def transform_raw_to_nested_structure(
if print_inputs:
log(
f"""
Received inputs:
- timestamp:\n{timestamp}
- data:\n{data.head()}"""
Received inputs:
- timestamp:\n{timestamp}
- data:\n{data.head()}"""
)

if data.empty:
@@ -220,7 +220,7 @@ def transform_raw_to_nested_structure(
log(f"timestamp column = {timestamp}", level="info")

log(
f"Finished nested structure! Data:\n{data_info_str(data)}",
f"Finished nested structure! Data: \n{data_info_str(data)}",
level="info",
)

2 changes: 1 addition & 1 deletion pipelines/constants.py
@@ -236,7 +236,7 @@ class constants(Enum): # pylint: disable=c0103
"stops": ["stop_id"],
"trips": ["trip_id"],
"fare_attributes": ["fare_id"],
"fare_rules": [],
"fare_rules": ["fare_id", "route_id"],
"stop_times": ["trip_id", "stop_sequence"],
}

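For context on the `fare_rules` change above: in GTFS, `fare_rules.txt` has no single identifier column, so a row is most naturally identified by the combination of `fare_id` and `route_id`. A rough, hypothetical sketch of why a composite primary key matters when deduplicating captured rows (not the pipeline's actual code; names are illustrative):

```python
import pandas as pd

# Two rows can share the same fare_id and still be distinct rules for different routes.
fare_rules = pd.DataFrame(
    {
        "fare_id": ["F1", "F1", "F1"],
        "route_id": ["R10", "R20", "R10"],
    }
)

# Deduplicating on fare_id alone collapses legitimate rows...
by_fare_only = fare_rules.drop_duplicates(subset=["fare_id"])               # 1 row left

# ...while the composite key keeps one row per (fare_id, route_id) pair.
by_composite = fare_rules.drop_duplicates(subset=["fare_id", "route_id"])   # 2 rows left
print(len(by_fare_only), len(by_composite))
```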
6 changes: 4 additions & 2 deletions pipelines/janitor/tasks.py
@@ -2,7 +2,6 @@
# from typing import Dict, List
import traceback
from datetime import datetime
from typing import Dict, List

import requests
from prefect import task
@@ -11,6 +10,8 @@
from pipelines.utils.secret import get_secret
from pipelines.utils.utils import log

# from typing import Dict, List


@task
def query_active_flow_names(prefix="%SMTR%", prefect_client=None, prefect_project="production"):
@@ -54,7 +55,8 @@ def query_not_active_flows(flows, prefect_client=None, prefect_project="producti
flow_name, last_version = flows
now = datetime.now().isoformat()
query = """
query($flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int, $project_name:String){
query(\
$flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int, $project_name:String){
flow(
where:{
name: {_eq:$flow_name},
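The change above splits the GraphQL `query(...)` declaration across two source lines to satisfy the 100-character limit. Inside a (non-raw) triple-quoted string, a trailing backslash is a line continuation, so the string's content still carries the declaration on a single line. A minimal standalone sketch of the idea (the query body is illustrative only):

```python
long_line = """
query(\
$flow_name: String, $last_version: Int){
  flow { id }
}
"""

# The backslash-newline pair is dropped from the string, so the query line stays intact:
assert "query($flow_name: String, $last_version: Int){" in long_line
```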
21 changes: 21 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
@@ -1,11 +1,32 @@
# Changelog - gtfs

## Added

## [1.0.6] - 2024-08-02

- Adds a filter on the control spreadsheet tab names in the `get_raw_drive_files` task (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/128/files)

- Adds a step to remove dots before the meters-to-km conversion in the OS processing (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/129)

## Fixed

## [1.0.5] - 2024-07-23

- Fixed the parsing of data_versao_gtfs (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/118)

## [1.0.4] - 2024-07-17

### Added

- Adds parameters for manually capturing the GTFS in the `SMTR: GTFS - Captura/Tratamento` flow (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/82/)

## [1.0.3] - 2024-07-04

## Fixed

- Fixed the format of the date saved in Redis from d/m/y to y-m-d (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/91)


## [1.0.2] - 2024-06-21

### Added
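The 1.0.6 entry above mentions removing dots before converting meters to kilometers in the OS processing. In Brazilian-formatted spreadsheets a value such as `"1.500"` typically means 1500 meters, so the dot must be stripped before the numeric conversion. A rough, hypothetical sketch of that kind of cleanup (column names invented; not the pipeline's actual code):

```python
import pandas as pd

# Hypothetical OS column with distances in meters, using "." as a thousands separator.
os_df = pd.DataFrame({"extensao_metros": ["1.500", "12.000", "800"]})

# Remove the dots first, otherwise "1.500" would be read as 1.5 meters.
extensao_m = os_df["extensao_metros"].str.replace(".", "", regex=False).astype(float)

# Then convert meters to kilometers.
os_df["extensao_km"] = extensao_m / 1000
print(os_df)
```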
164 changes: 100 additions & 64 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -2,12 +2,13 @@
"""
Flows for gtfs
DBT: 2024-07-15
DBT: 2024-07-23
"""

from prefect import Parameter, case, task
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.control_flow import merge
from prefect.utilities.edges import unmapped
from prefeitura_rio.pipelines_utils.custom import Flow

@@ -98,94 +99,129 @@
# )

with Flow("SMTR: GTFS - Captura/Tratamento") as gtfs_captura_nova:
capture = Parameter("capture", default=True)
materialize = Parameter("materialize", default=True)
materialize_only = Parameter("materialize_only", default=False)
regular_sheet_index = Parameter("regular_sheet_index", default=None)
data_versao_gtfs_param = Parameter("data_versao_gtfs", default=None)

mode = get_current_flow_mode()
last_captured_os = get_last_capture_os(mode=mode, dataset_id=constants.GTFS_DATASET_ID.value)

timestamp = get_scheduled_timestamp()

flag_new_os, os_control, data_index, data_versao_gtfs = get_os_info(
last_captured_os=last_captured_os
)

with case(flag_new_os, True):
rename_current_flow_run_now_time(
prefix=gtfs_captura_nova.name + ' ["' + data_versao_gtfs + '"] ', now_time=timestamp
)

data_versao_gtfs = get_current_timestamp(data_versao_gtfs)

partition = create_date_hour_partition(
timestamp=data_versao_gtfs, partition_date_name="data_versao", partition_date_only=True
)

filename = parse_timestamp_to_string(data_versao_gtfs)

table_ids = task(lambda: list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()))()

local_filepaths = create_local_partition_path.map(
dataset_id=unmapped(constants.GTFS_DATASET_ID.value),
table_id=table_ids,
partitions=unmapped(partition),
filename=unmapped(filename),
data_versao_gtfs_task = None
last_captured_os_none = None
with case(data_versao_gtfs_param, None):
last_captured_os_redis = get_last_capture_os(
mode=mode, dataset_id=constants.GTFS_DATASET_ID.value
)

raw_filepaths, primary_keys = get_raw_drive_files(
os_control=os_control, local_filepath=local_filepaths
)
last_captured_os = merge(last_captured_os_none, last_captured_os_redis)
with case(materialize_only, False):
timestamp = get_scheduled_timestamp()

transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map(
raw_filepath=raw_filepaths,
filepath=local_filepaths,
primary_key=primary_keys,
timestamp=unmapped(data_versao_gtfs),
error=unmapped(None),
flag_new_os, os_control, data_index, data_versao_gtfs_task = get_os_info(
last_captured_os=last_captured_os, data_versao_gtfs=data_versao_gtfs_param
)

errors, treated_filepaths = unpack_mapped_results_nout2(
mapped_results=transform_raw_to_nested_structure_results
)
with case(flag_new_os, True):
rename_current_flow_run_now_time(
prefix=gtfs_captura_nova.name + ' ["' + data_versao_gtfs_task + '"] ',
now_time=timestamp,
)

data_versao_gtfs_task = get_current_timestamp(data_versao_gtfs_task)

partition = create_date_hour_partition(
timestamp=data_versao_gtfs_task,
partition_date_name="data_versao",
partition_date_only=True,
)

filename = parse_timestamp_to_string(data_versao_gtfs_task)

table_ids = task(lambda: list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()))()

local_filepaths = create_local_partition_path.map(
dataset_id=unmapped(constants.GTFS_DATASET_ID.value),
table_id=table_ids,
partitions=unmapped(partition),
filename=unmapped(filename),
)

raw_filepaths, primary_keys = get_raw_drive_files(
os_control=os_control,
local_filepath=local_filepaths,
regular_sheet_index=regular_sheet_index,
)

transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map(
raw_filepath=raw_filepaths,
filepath=local_filepaths,
primary_key=primary_keys,
timestamp=unmapped(data_versao_gtfs_task),
error=unmapped(None),
)

errors, treated_filepaths = unpack_mapped_results_nout2(
mapped_results=transform_raw_to_nested_structure_results
)

errors = upload_raw_data_to_gcs.map(
dataset_id=unmapped(constants.GTFS_DATASET_ID.value),
table_id=table_ids,
raw_filepath=raw_filepaths,
partitions=unmapped(partition),
error=unmapped(None),
)

wait_captura_true = upload_staging_data_to_gcs.map(
dataset_id=unmapped(constants.GTFS_DATASET_ID.value),
table_id=table_ids,
staging_filepath=treated_filepaths,
partitions=unmapped(partition),
timestamp=unmapped(data_versao_gtfs_task),
error=errors,
)
with case(materialize_only, True):
wait_captura_false = task()

data_versao_gtfs_merge = merge(data_versao_gtfs_task, data_versao_gtfs_param)
wait_captura = merge(wait_captura_true, wait_captura_false)

verifica_materialize = task(lambda data_versao: data_versao is not None)(
data_versao=data_versao_gtfs_merge
)

errors = upload_raw_data_to_gcs.map(
dataset_id=unmapped(constants.GTFS_DATASET_ID.value),
table_id=table_ids,
raw_filepath=raw_filepaths,
partitions=unmapped(partition),
error=unmapped(None),
with case(verifica_materialize, True):
data_versao_gtfs_is_str = task(lambda data_versao: isinstance(data_versao, str))(
data_versao_gtfs_merge
)
with case(data_versao_gtfs_is_str, False):
string_data_versao_gtfs = parse_timestamp_to_string(
timestamp=data_versao_gtfs_merge, pattern="%Y-%m-%d"
)

wait_upload_staging_data_to_gcs = upload_staging_data_to_gcs.map(
dataset_id=unmapped(constants.GTFS_DATASET_ID.value),
table_id=table_ids,
staging_filepath=treated_filepaths,
partitions=unmapped(partition),
timestamp=unmapped(data_versao_gtfs),
error=errors,
)
data_versao_gtfs = merge(string_data_versao_gtfs, data_versao_gtfs_merge)

string_data_versao_gtfs = parse_timestamp_to_string(
timestamp=data_versao_gtfs, pattern="%Y-%m-%d"
)
version = fetch_dataset_sha(dataset_id=constants.GTFS_MATERIALIZACAO_DATASET_ID.value)
dbt_vars = get_join_dict([{"data_versao_gtfs": string_data_versao_gtfs}], version)[0]
dbt_vars = get_join_dict([{"data_versao_gtfs": data_versao_gtfs}], version)[0]

wait_run_dbt_model = run_dbt_model(
dataset_id=constants.GTFS_MATERIALIZACAO_DATASET_ID.value,
_vars=dbt_vars,
).set_upstream(task=wait_upload_staging_data_to_gcs)
).set_upstream(task=wait_captura)

update_last_captured_os(
wait_materialize_true = update_last_captured_os(
dataset_id=constants.GTFS_DATASET_ID.value,
data_index=data_index,
mode=mode,
).set_upstream(task=wait_run_dbt_model)

with case(verifica_materialize, False):
wait_materialize_false = task()

wait_materialize = merge(wait_materialize_true, wait_materialize_false)

with case(flag_new_os, False):
rename_current_flow_run_now_time(
prefix=gtfs_captura_nova.name + " [SKIPPED] ", now_time=timestamp
)
).set_upstream(task=wait_materialize)


gtfs_captura_nova.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
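The restructured flow above leans on Prefect 1.x conditional branching: `case` blocks gate tasks on a condition, and `merge` (imported from `prefect.tasks.control_flow`) forwards the result of whichever branch actually ran, giving downstream tasks a single upstream value; this is how `wait_captura`, `data_versao_gtfs_merge`, and `wait_materialize` are built. A minimal standalone sketch of the idiom, unrelated to the real GTFS tasks:

```python
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge


@task
def capture_data():
    return "fresh data"


@task
def load_cached_data():
    return "cached data"


@task
def materialize(data):
    print(f"materializing: {data}")


with Flow("case-merge-sketch") as flow:
    do_capture = Parameter("do_capture", default=True)

    with case(do_capture, True):
        captured = capture_data()

    with case(do_capture, False):
        cached = load_cached_data()

    # Only one branch runs; merge picks the result of the branch that did.
    data = merge(captured, cached)
    materialize(data)

# flow.run(parameters={"do_capture": False})  # or test locally via run_local(flow)
```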
