Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Viagens 2.0 - Validação de viagens #237

Merged
merged 118 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
118 commits
Select commit Hold shift + click to select a range
bce9db5
testa modelo python
pixuimpou Sep 10, 2024
cab4c3b
cria divisao shape
pixuimpou Sep 10, 2024
c07a87c
cria tabelas segmento
pixuimpou Sep 12, 2024
37301fc
add source dados_mestres
pixuimpou Sep 13, 2024
b7b57c3
cria tabela segmento_shape
pixuimpou Sep 13, 2024
9eac0d9
correcoes prod
pixuimpou Sep 13, 2024
2582a22
correcao logica segmentacao
pixuimpou Sep 13, 2024
900a760
cria aux_shapes_geom_filtrada
pixuimpou Sep 13, 2024
10f1439
corrige tratamento tunel
pixuimpou Sep 17, 2024
40f1c37
Merge branch 'main' into staging/python-dbt
pixuimpou Sep 17, 2024
f907db9
corrige comparacao
pixuimpou Sep 17, 2024
2be2444
Merge branch 'main' into staging/segmento-shape
mergify[bot] Sep 17, 2024
d372593
Merge branch 'main' into staging/segmento-shape
mergify[bot] Sep 23, 2024
cbdfe1d
Merge branch 'main' into staging/segmento-shape
mergify[bot] Sep 24, 2024
dd3825a
viagens 2.0
pixuimpou Sep 25, 2024
072536e
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Sep 25, 2024
bbee92a
Merge branch 'main' into staging/segmento-shape
mergify[bot] Sep 25, 2024
620ed00
Merge branch 'main' into staging/segmento-shape
mergify[bot] Sep 25, 2024
4668c17
Merge branch 'main' into staging/segmento-shape
mergify[bot] Sep 25, 2024
b2effce
Merge branch 'main' into staging/segmento-shape
mergify[bot] Sep 27, 2024
c0094ff
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 3, 2024
710675d
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 4, 2024
a7667be
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 4, 2024
54c456b
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 6, 2024
64901b0
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 7, 2024
3b9a15b
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 8, 2024
4a01fae
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 9, 2024
fd0c0dc
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 9, 2024
ec6dc51
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 9, 2024
d60eed7
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 9, 2024
984daed
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 10, 2024
6c2b720
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 16, 2024
64a61d6
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 16, 2024
d8cff6f
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 16, 2024
f626975
corrige logica tunel
pixuimpou Oct 16, 2024
e63b30c
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 17, 2024
0325530
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 21, 2024
4e8d709
Merge branch 'main' into staging/segmento-shape
pixuimpou Oct 22, 2024
2bb123c
Update viagem_informada_monitoramento.sql
pixuimpou Oct 22, 2024
92f084d
remove database dev
pixuimpou Oct 23, 2024
7540b11
remove teste
pixuimpou Oct 23, 2024
2662746
cria arquivo viagem_planejada
pixuimpou Oct 23, 2024
8095022
calendario gtfs
pixuimpou Oct 23, 2024
d757061
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 23, 2024
d29670b
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 24, 2024
1c31ead
cria tabela calendario
pixuimpou Oct 24, 2024
46e3203
cria tabela viagem_planejada
pixuimpou Oct 24, 2024
684d283
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Oct 24, 2024
4aa9b68
Merge branch 'main' into staging/segmento-shape
mergify[bot] Oct 29, 2024
d616147
ajusta tipo_dia sabado
pixuimpou Oct 31, 2024
6415273
adiciona dados da os
pixuimpou Oct 31, 2024
0b4ff2e
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Oct 31, 2024
2bc063a
altera variavel filtro / altera fonte para viagem_informada
pixuimpou Nov 4, 2024
588f12e
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 4, 2024
4670f12
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 5, 2024
bb9ed38
corrige viagens duplicadas
pixuimpou Nov 6, 2024
5e93a8b
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Nov 6, 2024
8f5135a
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 6, 2024
0009c10
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 6, 2024
e4ddad6
adiciona coluna modo / descomenta refs
pixuimpou Nov 7, 2024
f66e7ff
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 7, 2024
2d0cbb2
altera filtro do gps
pixuimpou Nov 7, 2024
7733cf7
descomenta refs
pixuimpou Nov 7, 2024
24063b1
cria schema viagem_validacao
pixuimpou Nov 7, 2024
167db61
add colunas de controle
pixuimpou Nov 7, 2024
1286a11
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Nov 7, 2024
5b4f913
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 7, 2024
4b8aa10
cria selectors viagens 2
pixuimpou Nov 8, 2024
30719b5
add segmento_shape e shape_geom
pixuimpou Nov 8, 2024
58c5029
add tipo_dia
pixuimpou Nov 8, 2024
ae85259
add modo
pixuimpou Nov 8, 2024
45b2b6b
add indicador de serviço nao planejado
pixuimpou Nov 8, 2024
6da7e1b
adiciona indicador de viagem nao planejada
pixuimpou Nov 8, 2024
a6a3201
altera data inicial
pixuimpou Nov 8, 2024
570a02e
altera ordem colunas
pixuimpou Nov 8, 2024
776b428
cria pipeline validacao viagens
pixuimpou Nov 8, 2024
c882c1d
cria pipeline diario de planejamento
pixuimpou Nov 8, 2024
7cd7f04
adiciona integração com tabelas antigas
pixuimpou Nov 8, 2024
4275b7f
add novos crons
pixuimpou Nov 8, 2024
82537ec
importa flows planejamento
pixuimpou Nov 8, 2024
96d180f
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Nov 8, 2024
19423b2
altera target para hmg
pixuimpou Nov 8, 2024
bc021a6
ajusta lock
pixuimpou Nov 8, 2024
1d47abd
corrige wait
pixuimpou Nov 8, 2024
4bdab15
add change log
pixuimpou Nov 8, 2024
dc43be8
remove viagem planejada
pixuimpou Nov 8, 2024
0f482e6
remove execução do calendario junto com gtfs
pixuimpou Nov 8, 2024
3202631
add conversao de tz
pixuimpou Nov 8, 2024
c7b9929
remove conversao extra
pixuimpou Nov 8, 2024
8ee702b
converte tz
pixuimpou Nov 8, 2024
f1d47e4
teste
pixuimpou Nov 8, 2024
cdfcfe1
descomenta wait
pixuimpou Nov 8, 2024
4420c28
add variaveis
pixuimpou Nov 8, 2024
d9fc0c5
altera target
pixuimpou Nov 8, 2024
c6c909f
adiciona docs
pixuimpou Nov 8, 2024
0aa6cbd
corrige filtro incremental
pixuimpou Nov 11, 2024
4f3f145
altera logica viagem nao planejada
pixuimpou Nov 11, 2024
26e8cf1
altera logica coluna modo
pixuimpou Nov 11, 2024
83b5673
add doc
pixuimpou Nov 11, 2024
f7ae526
add docs feed_version
pixuimpou Nov 11, 2024
43b455f
add parametro_validacao
pixuimpou Nov 11, 2024
331817b
altera project
pixuimpou Nov 11, 2024
b7b5769
Merge branch 'main' into staging/segmento-shape
pixuimpou Nov 11, 2024
ad8109a
corrige lock
pixuimpou Nov 11, 2024
8f16b11
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Nov 11, 2024
216879c
att lock
pixuimpou Nov 11, 2024
75b9dec
add 1 middle_index
pixuimpou Nov 11, 2024
d4dcfc6
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 11, 2024
716b019
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 12, 2024
7816efe
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 12, 2024
363df3c
add indicadores
pixuimpou Nov 12, 2024
cad6335
add buffer e comprimento python
pixuimpou Nov 12, 2024
59bbbb9
Merge branch 'staging/segmento-shape' of https://github.com/prefeitur…
pixuimpou Nov 12, 2024
0ade9f3
verifica alterações planejamento
pixuimpou Nov 12, 2024
b444c7c
add vars
pixuimpou Nov 12, 2024
1614b9e
altera round
pixuimpou Nov 13, 2024
1d12aa6
altera nomes variaveis
pixuimpou Nov 13, 2024
9f2e02a
Merge branch 'main' into staging/segmento-shape
mergify[bot] Nov 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
from pipelines.migration.veiculo.flows import * # noqa
from pipelines.serpro.flows import * # noqa
from pipelines.treatment.monitoramento.flows import * # noqa
from pipelines.treatment.planejamento.flows import * # noqa
1 change: 1 addition & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
+ " "
+ constants.PLANEJAMENTO_MATERIALIZACAO_DATASET_ID.value,
_vars=dbt_vars,
exclude="calendario",
).set_upstream(task=wait_captura)

wait_materialize_true = update_last_captured_os(
Expand Down
3 changes: 3 additions & 0 deletions pipelines/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
from pipelines.constants import constants
from pipelines.constants import constants as emd_constants

cron_every_day_hour_1 = "0 1 * * *"
cron_every_day_hour_6 = "0 6 * * *"
cron_every_day_hour_7 = "0 7 * * *"
cron_every_day_hour_7_minute_10 = "10 7 * * *"
cron_every_hour_minute_6 = "6 * * * *"


def generate_interval_schedule(
Expand Down
6 changes: 6 additions & 0 deletions pipelines/treatment/monitoramento/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - monitoramento

## [1.1.0] - 2024-11-08

### Adicionado

- Cria flow de tratamento de validação de viagens informadas (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/237)

## [1.0.0] - 2024-10-21

### Adicionado
Expand Down
9 changes: 8 additions & 1 deletion pipelines/treatment/monitoramento/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime
from enum import Enum

from pipelines.schedules import cron_every_day_hour_7_minute_10
from pipelines.schedules import cron_every_day_hour_6, cron_every_day_hour_7_minute_10
from pipelines.treatment.templates.utils import DBTSelector


Expand All @@ -20,3 +20,10 @@ class constants(Enum): # pylint: disable=c0103
schedule_cron=cron_every_day_hour_7_minute_10,
initial_datetime=datetime(2024, 10, 16, 0, 0, 0),
)

VIAGEM_VALIDACAO_SELECTOR = DBTSelector(
name="viagem_validacao",
schedule_cron=cron_every_day_hour_6,
initial_datetime=datetime(2024, 10, 12, 0, 0, 0),
incremental_delay_hours=48,
)
33 changes: 33 additions & 0 deletions pipelines/treatment/monitoramento/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
constants as rioonibus_source_constants,
)
from pipelines.constants import constants as smtr_constants
from pipelines.migration.br_rj_riodejaneiro_onibus_gps_zirix.constants import (
constants as gps_zirix_constants,
)
from pipelines.schedules import cron_every_hour_minute_6
from pipelines.treatment.monitoramento.constants import constants
from pipelines.treatment.planejamento.constants import (
constants as planejamento_constants,
)
from pipelines.treatment.templates.flows import create_default_materialization_flow

VIAGEM_INFORMADA_MATERIALIZACAO = create_default_materialization_flow(
Expand All @@ -18,3 +25,29 @@
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[rioonibus_source_constants.VIAGEM_INFORMADA_SOURCE.value],
)

VIAGEM_VALIDACAO_MATERIALIZACAO = create_default_materialization_flow(
flow_name="viagem_validacao - materializacao",
selector=constants.VIAGEM_VALIDACAO_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
constants.VIAGEM_INFORMADA_SELECTOR.value,
planejamento_constants.PLANEJAMENTO_DIARIO_SELECTOR.value,
{
"redis_key": f"{smtr_constants.GPS_SPPO_DATASET_ID.value}\
.{smtr_constants.GPS_SPPO_TREATED_TABLE_ID.value}",
"dict_key": "last_run_timestamp",
"datetime_format": "%Y-%m-%dT%H:%M:%S",
"delay_hours": smtr_constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
"schedule_cron": cron_every_hour_minute_6,
},
{
"redis_key": f"{gps_zirix_constants.GPS_SPPO_ZIRIX_RAW_DATASET_ID.value}\
.{smtr_constants.GPS_SPPO_TREATED_TABLE_ID.value}",
"dict_key": "last_run_timestamp",
"datetime_format": "%Y-%m-%dT%H:%M:%S",
"delay_hours": smtr_constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
"schedule_cron": cron_every_hour_minute_6,
},
],
)
7 changes: 7 additions & 0 deletions pipelines/treatment/planejamento/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog - planejamento

## [1.0.0] - 2024-11-08

### Adicionado

- Cria flow de tratamento diário de dados de planejamento (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/237)
Empty file.
22 changes: 22 additions & 0 deletions pipelines/treatment/planejamento/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para materialização dos dados de planejamento
"""

from datetime import datetime
from enum import Enum

from pipelines.schedules import cron_every_day_hour_1
from pipelines.treatment.templates.utils import DBTSelector


class constants(Enum): # pylint: disable=c0103
"""
Valores constantes para materialização dos dados de planejamento
"""

PLANEJAMENTO_DIARIO_SELECTOR = DBTSelector(
name="planejamento_diario",
schedule_cron=cron_every_day_hour_1,
initial_datetime=datetime(2024, 9, 1, 0, 0, 0),
)
14 changes: 14 additions & 0 deletions pipelines/treatment/planejamento/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
"""
Flows de tratamento dos dados de planejamento
"""

from pipelines.constants import constants as smtr_constants
from pipelines.treatment.planejamento.constants import constants
from pipelines.treatment.templates.flows import create_default_materialization_flow

PLANEJAMENTO_DIARIO_MATERIALIZACAO = create_default_materialization_flow(
flow_name="planejamento_diario - materializacao",
selector=constants.PLANEJAMENTO_DIARIO_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
)
7 changes: 7 additions & 0 deletions pipelines/treatment/templates/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
# Changelog - treatment

## [1.0.2] - 2024-11-08

### Alterado

- Adiciona lógica para verificar fontes de dados no padrão antigo (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/237)

## [1.0.1] - 2024-10-29

### Adicionado

- Adiciona as tasks `check_dbt_test_run`, `run_dbt_tests` e `dbt_data_quality_checks` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)
- Adiciona a função `parse_dbt_test_output` no `utils.py` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/288)


## [1.0.0] - 2024-10-21

### Adicionado
Expand Down
23 changes: 20 additions & 3 deletions pipelines/treatment/templates/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import requests
from prefect import task
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client
from pytz import timezone

from pipelines.constants import constants
Expand All @@ -23,7 +24,7 @@
from pipelines.utils.gcp.bigquery import SourceTable
from pipelines.utils.prefect import flow_is_running_local, rename_current_flow_run
from pipelines.utils.secret import get_secret
from pipelines.utils.utils import convert_timezone
from pipelines.utils.utils import convert_timezone, cron_get_last_date

# from pipelines.utils.utils import get_last_materialization_redis_key

Expand Down Expand Up @@ -123,7 +124,7 @@ def wait_data_sources(
env: str,
datetime_start: datetime,
datetime_end: datetime,
data_sources: list[Union[SourceTable, DBTSelector]],
data_sources: list[Union[SourceTable, DBTSelector, dict]],
skip: bool,
):
"""
Expand All @@ -133,7 +134,7 @@ def wait_data_sources(
env (str): prod ou dev
datetime_start (datetime): Datetime inicial da materialização
datetime_end (datetime): Datetime final da materialização
data_sources (list[Union[SourceTable, DBTSelector]]): Fontes de dados para esperar
data_sources (list[Union[SourceTable, DBTSelector, dict]]): Fontes de dados para esperar
skip (bool): se a verificação deve ser pulada ou não
"""
if skip:
Expand All @@ -154,6 +155,22 @@ def wait_data_sources(
elif isinstance(ds, DBTSelector):
name = f"{ds.name}"
complete = ds.is_up_to_date(env=env, timestamp=datetime_end)
elif isinstance(ds, dict):
# source dicionário utilizado para compatibilização com flows antigos
name = ds["redis_key"]
redis_client = get_redis_client()
last_materialization = datetime.strptime(
redis_client.get(name)[ds["dict_key"]],
ds["datetime_format"],
)
last_schedule = cron_get_last_date(
cron_expr=ds["schedule_cron"],
timestamp=datetime_end,
)
complete = convert_timezone(
timestamp=last_materialization
) >= last_schedule - timedelta(hours=ds.get("delay_hours", 0))

else:
raise NotImplementedError(f"Espera por fontes do tipo {type(ds)} não implementada")

Expand Down
2 changes: 1 addition & 1 deletion pipelines/treatment/templates/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_last_materialized_datetime(self, env: str) -> datetime:
)
)

return last_datetime
return convert_timezone(timestamp=last_datetime)

def get_datetime_end(self, timestamp: datetime) -> datetime:
"""
Expand Down
Loading
Loading