Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cria tabelas de validação da Jaé com verificação no GTFS / Remodela tabela cadastro.servicos #98

Merged
merged 27 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
abd84eb
cria validação da jae com o gtfs
pixuimpou Jul 4, 2024
5fbb1c9
validacao ordem pagamento staging
pixuimpou Jul 9, 2024
07c2c62
Merge branch 'main' into staging/validacao-jae-gtfs
mergify[bot] Jul 9, 2024
b633b08
Merge branch 'main' into staging/validacao-jae-gtfs
mergify[bot] Jul 16, 2024
5bb25ba
adiciona operadoras de fretamento
pixuimpou Jul 17, 2024
0fa0abc
altera aux_servicos_gtfs para ephemeral
pixuimpou Jul 17, 2024
8340404
altera filtro incremental da integracao_invalida
pixuimpou Jul 17, 2024
90129cd
altera filtro incremental da transacao_invalida
pixuimpou Jul 17, 2024
e26988c
corrige aux_transacao_ordem
pixuimpou Jul 17, 2024
4ae7471
corrige validacao ordem_pagamento por servico
pixuimpou Jul 17, 2024
a9931f3
cria modelo ordem_pagamento_dia_invalida
pixuimpou Jul 17, 2024
a11c594
cria validacao ordem_pagamento final
pixuimpou Jul 17, 2024
d0f08cb
altera schema validacao_dados_jae
pixuimpou Jul 17, 2024
b94087b
agenda pipeline de materializacao das validacoes
pixuimpou Jul 17, 2024
3590a6d
corrige query viagem_informada
pixuimpou Jul 17, 2024
4e0e8d9
Merge branch 'staging/validacao-jae-gtfs' of https://github.com/prefe…
pixuimpou Jul 17, 2024
4da70b5
Merge branch 'main' into staging/validacao-jae-gtfs
mergify[bot] Jul 17, 2024
d6827a9
add changelog
pixuimpou Jul 17, 2024
7d2521a
adiciona prefixo data nas colunas de vigencia
pixuimpou Jul 17, 2024
9e7ff3d
corrige nome das colunas de vigencia
pixuimpou Jul 17, 2024
c4b2ac0
altera schema da tabela de servicos
pixuimpou Jul 17, 2024
3add40b
Merge branch 'staging/validacao-jae-gtfs' of https://github.com/prefe…
pixuimpou Jul 17, 2024
bb040e6
Merge branch 'main' into staging/validacao-jae-gtfs
mergify[bot] Jul 17, 2024
8bb98cd
add data dbt
pixuimpou Jul 17, 2024
abd30db
Merge branch 'staging/validacao-jae-gtfs' of https://github.com/prefe…
pixuimpou Jul 17, 2024
349ffe9
corrige typo
pixuimpou Jul 17, 2024
2546a98
altera cast para safe_cast
pixuimpou Jul 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.2.0] - 2024-07-17

### Alterado

- Ativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)
- Adequa parâmetro exclude do flow `bilhetagem_validacao_jae` para as novas queries de validação (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)

## [1.1.0] - 2024-06-13

### Adicionado
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,7 @@ class constants(Enum): # pylint: disable=c0103
BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS = {
"dataset_id": "validacao_dados_jae",
"upstream": True,
"exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao \
+ordem_pagamento_dia +integracao +servicos",
"exclude": "gtfs +transacao +integracao",
"dbt_vars": {
"run_date": {},
"version": {},
Expand Down
5 changes: 3 additions & 2 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows for br_rj_riodejaneiro_bilhetagem
DBT: 2024-07-05
DBT: 2024-07-17
"""

from copy import deepcopy
Expand Down Expand Up @@ -36,6 +36,7 @@
from pipelines.schedules import (
every_5_minutes,
every_day_hour_five,
every_day_hour_seven,
every_hour,
every_minute,
)
Expand Down Expand Up @@ -350,7 +351,7 @@
handler_skip_if_running,
]

# bilhetagem_validacao_jae.schedule = every_day_hour_seven
bilhetagem_validacao_jae.schedule = every_day_hour_seven


# RECAPTURA #
Expand Down
2 changes: 2 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_viagem_zirix/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
"""
Flows for br_rj_riodejaneiro_viagem_zirix
DBT: 2024-07-17
"""
from copy import deepcopy

Expand Down
10 changes: 8 additions & 2 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ models:
br_rj_riodejaneiro_stu:
+materialized: view
+schema: br_rj_riodejaneiro_stu
cadastro_staging:
+schema: cadastro_staging
cadastro:
+schema: cadastro
staging:
+schema: cadastro_staging
br_rj_riodejaneiro_recursos:
+materialized: incremental
+schema: br_rj_riodejaneiro_recursos
Expand Down Expand Up @@ -280,3 +280,9 @@ models:
staging:
+materialized: view
+schema: br_rj_riodejaneiro_viagem_zirix_staging
validacao_dados_jae:
+materialized: incremental
+schema: validacao_dados_jae
staging:
+materialized: incremental
+schema: validacao_dados_jae_staging
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
{{ incremental_filter }}
{% endset %}

{% set partitions = run_query(partitions_query)[0].values() %}
{% set partitions = run_query(partitions_query).columns[0].values() %}
{% endif %}
{% endif %}

Expand Down
14 changes: 14 additions & 0 deletions queries/models/cadastro/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog - cadastro

## [1.2.0] - 2024-07-17

### Adicionado

- Cria modelos auxiliares para a tabela de servicos:
- `aux_routes_vigencia_gtfs.sql`
- `aux_stops_vigencia_gtfs.sql`
- `aux_servicos_gtfs.sql`

### Alterado

- Altera estrutura do modelo `servicos.sql` para adicionar datas de inicio e fim de vigência (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)
- Altera filtro no modelo `operadoras.sql`, deixando de filtrar operadores do modo `Fretamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)

## [1.1.1] - 2024-04-25

### Alterado
Expand Down
2 changes: 1 addition & 1 deletion queries/models/cadastro/operadoras.sql
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,4 @@ SELECT
FROM
cadastro
WHERE
modo NOT IN ("Escolar", "Táxi", "TEC", "Fretamento")
modo NOT IN ("Escolar", "Táxi", "TEC")
8 changes: 7 additions & 1 deletion queries/models/cadastro/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,10 @@ models:
- name: tabela_origem_gtfs
description: "Tabela do GTFS de onde foram coletadas as informações (stops ou routes)"
- name: feed_start_date_gtfs
description: "Último feed_start_date do GTFS em que as informações do serviço estavam disponíveis"
description: "Último feed_start_date do GTFS em que as informações do serviço estavam disponíveis"
- name: data_inicio_vigencia
description: "Data de inicio da vigência do serviço segundo o GTFS ou o cadastro do serviço no sistema da Jaé (caso não exista no GTFS)"
- name: data_fim_vigencia
description: "Data de fim da vigência do serviço segundo o GTFS"
- name: versao
description: "Código de controle de versão do dado (SHA Github)"
5 changes: 3 additions & 2 deletions queries/models/cadastro/servicos.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ SELECT
g.latitude,
g.longitude,
g.tabela_origem_gtfs,
g.feed_start_date AS feed_start_date_gtfs,
COALESCE(g.inicio_vigencia, DATE(j.datetime_inclusao)) AS data_inicio_vigencia,
g.fim_vigencia AS data_fim_vigencia,
'{{ var("version") }}' as versao
FROM
{{ ref("staging_linha") }} j
FULL OUTER JOIN
{{ ref("servicos_gtfs_aux") }} g
{{ ref("aux_servicos_gtfs") }} g
ON
COALESCE(j.gtfs_route_id, j.gtfs_stop_id) = g.id_servico
70 changes: 70 additions & 0 deletions queries/models/cadastro/staging/aux_routes_vigencia_gtfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{{
config(
materialized="ephemeral",
)
}}

WITH routes_rn AS (
SELECT
route_id AS id_servico,
route_short_name AS servico,
route_long_name AS descricao_servico,
feed_start_date AS inicio_vigencia,
feed_end_date AS fim_vigencia,
LAG(feed_end_date) OVER (PARTITION BY route_id ORDER BY feed_start_date) AS feed_end_date_anterior,
ROW_NUMBER() OVER (PARTITION BY route_id ORDER BY feed_start_date DESC) AS rn
FROM
{{ ref("routes_gtfs") }}
),
routes_agrupada AS (
SELECT
id_servico,
inicio_vigencia,
servico,
descricao_servico,
IFNULL(fim_vigencia, CURRENT_DATE("America/Sao_Paulo")) as fim_vigencia,
SUM(
CASE
WHEN feed_end_date_anterior IS NULL OR feed_end_date_anterior <> DATE_SUB(inicio_vigencia, INTERVAL 1 DAY) THEN 1
ELSE 0
END
) OVER (PARTITION BY id_servico ORDER BY inicio_vigencia) AS group_id
FROM
routes_rn
),
vigencia AS (
SELECT
id_servico,
MIN(inicio_vigencia) AS inicio_vigencia,
MAX(fim_vigencia) AS fim_vigencia
FROM
routes_agrupada
GROUP BY
id_servico,
group_id
)
SELECT
id_servico,
r.servico,
r.descricao_servico,
NULL AS latitude,
NULL AS longitude,
v.inicio_vigencia,
CASE
WHEN v.fim_vigencia != CURRENT_DATE("America/Sao_Paulo") THEN v.fim_vigencia
END AS fim_vigencia,
'routes' AS tabela_origem_gtfs,
FROM
vigencia v
JOIN
(
SELECT
id_servico,
servico,
descricao_servico
FROM
routes_rn
WHERE
rn = 1
) r
USING(id_servico)
34 changes: 34 additions & 0 deletions queries/models/cadastro/staging/aux_servicos_gtfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{{
config(
materialized="ephemeral",
)
}}

SELECT
id_servico,
servico,
descricao_servico,
latitude,
longitude,
inicio_vigencia,
fim_vigencia,
tabela_origem_gtfs,
'{{ var("version") }}' as versao
FROM
{{ ref('aux_routes_vigencia_gtfs') }}

UNION ALL

SELECT
id_servico,
servico,
descricao_servico,
latitude,
longitude,
inicio_vigencia,
fim_vigencia,
tabela_origem_gtfs,
'{{ var("version") }}' as versao
FROM
{{ ref('aux_stops_vigencia_gtfs') }}

76 changes: 76 additions & 0 deletions queries/models/cadastro/staging/aux_stops_vigencia_gtfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{{
config(
materialized="ephemeral",
)
}}

WITH stops_rn AS (
SELECT
stop_id AS id_servico,
stop_code AS servico,
stop_name AS descricao_servico,
stop_lat AS latitude,
stop_lon AS longitude,
feed_start_date AS inicio_vigencia,
feed_end_date AS fim_vigencia,
LAG(feed_end_date) OVER (PARTITION BY stop_id ORDER BY feed_start_date) AS feed_end_date_anterior,
ROW_NUMBER() OVER (PARTITION BY stop_id ORDER BY feed_start_date DESC) AS rn
FROM
{{ ref("stops_gtfs") }}
WHERE
location_type = '1'
),
stops_agrupada AS (
SELECT
id_servico,
inicio_vigencia,
servico,
descricao_servico,
IFNULL(fim_vigencia, CURRENT_DATE("America/Sao_Paulo")) AS fim_vigencia,
SUM(
CASE
WHEN feed_end_date_anterior IS NULL OR feed_end_date_anterior <> DATE_SUB(inicio_vigencia, INTERVAL 1 DAY) THEN 1
ELSE 0
END
) OVER (PARTITION BY id_servico ORDER BY inicio_vigencia) AS group_id
FROM
stops_rn
),
vigencia AS (
SELECT
id_servico,
MIN(inicio_vigencia) AS inicio_vigencia,
MAX(fim_vigencia) AS fim_vigencia
FROM
stops_agrupada
GROUP BY
id_servico,
group_id
)
SELECT
id_servico,
r.servico,
r.descricao_servico,
r.latitude,
r.longitude,
v.inicio_vigencia,
CASE
WHEN v.fim_vigencia != CURRENT_DATE("America/Sao_Paulo") THEN v.fim_vigencia
END AS fim_vigencia,
'stops' AS tabela_origem_gtfs,
FROM
vigencia v
JOIN
(
SELECT
id_servico,
servico,
descricao_servico,
latitude,
longitude
FROM
stops_rn
WHERE
rn = 1
) r
USING(id_servico)
Loading
Loading