Skip to content

Commit

Permalink
Cria tabelas de validação da Jaé com verificação no GTFS / Remodela t…
Browse files Browse the repository at this point in the history
…abela cadastro.servicos (#98)

* cria validação da jae com o gtfs

* validacao ordem pagamento staging

* adiciona operadoras de fretamento

* altera aux_servicos_gtfs para ephemeral

* altera filtro incremental da integracao_invalida

* altera filtro incremental da transacao_invalida

* corrige aux_transacao_ordem

* corrige validacao ordem_pagamento por servico

* cria modelo ordem_pagamento_dia_invalida

* cria validacao ordem_pagamento final

* altera schema validacao_dados_jae

* agenda pipeline de materializacao das validacoes

* corrige query viagem_informada

* add changelog

* adiciona prefixo data nas colunas de vigencia

* corrige nome das colunas de vigencia

* altera schema da tabela de servicos

* add data dbt

* corrige typo

* altera cast para safe_cast

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
pixuimpou and mergify[bot] authored Jul 17, 2024
1 parent e9184df commit 490badc
Show file tree
Hide file tree
Showing 24 changed files with 1,075 additions and 108 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.2.0] - 2024-07-17

### Alterado

- Ativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)
- Adequa parâmetro exclude do flow `bilhetagem_validacao_jae` para as novas queries de validação (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)

## [1.1.0] - 2024-06-13

### Adicionado
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,7 @@ class constants(Enum): # pylint: disable=c0103
BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS = {
"dataset_id": "validacao_dados_jae",
"upstream": True,
"exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao \
+ordem_pagamento_dia +integracao +servicos",
"exclude": "gtfs +transacao +integracao",
"dbt_vars": {
"run_date": {},
"version": {},
Expand Down
5 changes: 3 additions & 2 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows for br_rj_riodejaneiro_bilhetagem
DBT: 2024-07-05
DBT: 2024-07-17
"""

from copy import deepcopy
Expand Down Expand Up @@ -36,6 +36,7 @@
from pipelines.schedules import (
every_5_minutes,
every_day_hour_five,
every_day_hour_seven,
every_hour,
every_minute,
)
Expand Down Expand Up @@ -350,7 +351,7 @@
handler_skip_if_running,
]

# bilhetagem_validacao_jae.schedule = every_day_hour_seven
bilhetagem_validacao_jae.schedule = every_day_hour_seven


# RECAPTURA #
Expand Down
2 changes: 2 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_viagem_zirix/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
"""
Flows for br_rj_riodejaneiro_viagem_zirix
DBT: 2024-07-17
"""
from copy import deepcopy

Expand Down
10 changes: 8 additions & 2 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ models:
br_rj_riodejaneiro_stu:
+materialized: view
+schema: br_rj_riodejaneiro_stu
cadastro_staging:
+schema: cadastro_staging
cadastro:
+schema: cadastro
staging:
+schema: cadastro_staging
br_rj_riodejaneiro_recursos:
+materialized: incremental
+schema: br_rj_riodejaneiro_recursos
Expand Down Expand Up @@ -280,3 +280,9 @@ models:
staging:
+materialized: view
+schema: br_rj_riodejaneiro_viagem_zirix_staging
validacao_dados_jae:
+materialized: incremental
+schema: validacao_dados_jae
staging:
+materialized: incremental
+schema: validacao_dados_jae_staging
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
{{ incremental_filter }}
{% endset %}

{% set partitions = run_query(partitions_query)[0].values() %}
{% set partitions = run_query(partitions_query).columns[0].values() %}
{% endif %}
{% endif %}

Expand Down
14 changes: 14 additions & 0 deletions queries/models/cadastro/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog - cadastro

## [1.2.0] - 2024-07-17

### Adicionado

- Cria modelos auxiliares para a tabela de servicos:
- `aux_routes_vigencia_gtfs.sql`
- `aux_stops_vigencia_gtfs.sql`
- `aux_servicos_gtfs.sql`

### Alterado

- Altera estrutura do modelo `servicos.sql` para adicionar datas de inicio e fim de vigência (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)
- Altera filtro no modelo `operadoras.sql`, deixando de filtrar operadores do modo `Fretamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)

## [1.1.1] - 2024-04-25

### Alterado
Expand Down
2 changes: 1 addition & 1 deletion queries/models/cadastro/operadoras.sql
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,4 @@ SELECT
FROM
cadastro
WHERE
modo NOT IN ("Escolar", "Táxi", "TEC", "Fretamento")
modo NOT IN ("Escolar", "Táxi", "TEC")
8 changes: 7 additions & 1 deletion queries/models/cadastro/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,10 @@ models:
- name: tabela_origem_gtfs
description: "Tabela do GTFS de onde foram coletadas as informações (stops ou routes)"
- name: feed_start_date_gtfs
description: "Último feed_start_date do GTFS em que as informações do serviço estavam disponíveis"
description: "Último feed_start_date do GTFS em que as informações do serviço estavam disponíveis"
- name: data_inicio_vigencia
description: "Data de inicio da vigência do serviço segundo o GTFS ou o cadastro do serviço no sistema da Jaé (caso não exista no GTFS)"
- name: data_fim_vigencia
description: "Data de fim da vigência do serviço segundo o GTFS"
- name: versao
description: "Código de controle de versão do dado (SHA Github)"
5 changes: 3 additions & 2 deletions queries/models/cadastro/servicos.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ SELECT
g.latitude,
g.longitude,
g.tabela_origem_gtfs,
g.feed_start_date AS feed_start_date_gtfs,
COALESCE(g.inicio_vigencia, DATE(j.datetime_inclusao)) AS data_inicio_vigencia,
g.fim_vigencia AS data_fim_vigencia,
'{{ var("version") }}' as versao
FROM
{{ ref("staging_linha") }} j
FULL OUTER JOIN
{{ ref("servicos_gtfs_aux") }} g
{{ ref("aux_servicos_gtfs") }} g
ON
COALESCE(j.gtfs_route_id, j.gtfs_stop_id) = g.id_servico
70 changes: 70 additions & 0 deletions queries/models/cadastro/staging/aux_routes_vigencia_gtfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{{
config(
materialized="ephemeral",
)
}}

WITH routes_rn AS (
SELECT
route_id AS id_servico,
route_short_name AS servico,
route_long_name AS descricao_servico,
feed_start_date AS inicio_vigencia,
feed_end_date AS fim_vigencia,
LAG(feed_end_date) OVER (PARTITION BY route_id ORDER BY feed_start_date) AS feed_end_date_anterior,
ROW_NUMBER() OVER (PARTITION BY route_id ORDER BY feed_start_date DESC) AS rn
FROM
{{ ref("routes_gtfs") }}
),
routes_agrupada AS (
SELECT
id_servico,
inicio_vigencia,
servico,
descricao_servico,
IFNULL(fim_vigencia, CURRENT_DATE("America/Sao_Paulo")) as fim_vigencia,
SUM(
CASE
WHEN feed_end_date_anterior IS NULL OR feed_end_date_anterior <> DATE_SUB(inicio_vigencia, INTERVAL 1 DAY) THEN 1
ELSE 0
END
) OVER (PARTITION BY id_servico ORDER BY inicio_vigencia) AS group_id
FROM
routes_rn
),
vigencia AS (
SELECT
id_servico,
MIN(inicio_vigencia) AS inicio_vigencia,
MAX(fim_vigencia) AS fim_vigencia
FROM
routes_agrupada
GROUP BY
id_servico,
group_id
)
SELECT
id_servico,
r.servico,
r.descricao_servico,
NULL AS latitude,
NULL AS longitude,
v.inicio_vigencia,
CASE
WHEN v.fim_vigencia != CURRENT_DATE("America/Sao_Paulo") THEN v.fim_vigencia
END AS fim_vigencia,
'routes' AS tabela_origem_gtfs,
FROM
vigencia v
JOIN
(
SELECT
id_servico,
servico,
descricao_servico
FROM
routes_rn
WHERE
rn = 1
) r
USING(id_servico)
34 changes: 34 additions & 0 deletions queries/models/cadastro/staging/aux_servicos_gtfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{{
config(
materialized="ephemeral",
)
}}

SELECT
id_servico,
servico,
descricao_servico,
latitude,
longitude,
inicio_vigencia,
fim_vigencia,
tabela_origem_gtfs,
'{{ var("version") }}' as versao
FROM
{{ ref('aux_routes_vigencia_gtfs') }}

UNION ALL

SELECT
id_servico,
servico,
descricao_servico,
latitude,
longitude,
inicio_vigencia,
fim_vigencia,
tabela_origem_gtfs,
'{{ var("version") }}' as versao
FROM
{{ ref('aux_stops_vigencia_gtfs') }}

76 changes: 76 additions & 0 deletions queries/models/cadastro/staging/aux_stops_vigencia_gtfs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{{
config(
materialized="ephemeral",
)
}}

WITH stops_rn AS (
SELECT
stop_id AS id_servico,
stop_code AS servico,
stop_name AS descricao_servico,
stop_lat AS latitude,
stop_lon AS longitude,
feed_start_date AS inicio_vigencia,
feed_end_date AS fim_vigencia,
LAG(feed_end_date) OVER (PARTITION BY stop_id ORDER BY feed_start_date) AS feed_end_date_anterior,
ROW_NUMBER() OVER (PARTITION BY stop_id ORDER BY feed_start_date DESC) AS rn
FROM
{{ ref("stops_gtfs") }}
WHERE
location_type = '1'
),
stops_agrupada AS (
SELECT
id_servico,
inicio_vigencia,
servico,
descricao_servico,
IFNULL(fim_vigencia, CURRENT_DATE("America/Sao_Paulo")) AS fim_vigencia,
SUM(
CASE
WHEN feed_end_date_anterior IS NULL OR feed_end_date_anterior <> DATE_SUB(inicio_vigencia, INTERVAL 1 DAY) THEN 1
ELSE 0
END
) OVER (PARTITION BY id_servico ORDER BY inicio_vigencia) AS group_id
FROM
stops_rn
),
vigencia AS (
SELECT
id_servico,
MIN(inicio_vigencia) AS inicio_vigencia,
MAX(fim_vigencia) AS fim_vigencia
FROM
stops_agrupada
GROUP BY
id_servico,
group_id
)
SELECT
id_servico,
r.servico,
r.descricao_servico,
r.latitude,
r.longitude,
v.inicio_vigencia,
CASE
WHEN v.fim_vigencia != CURRENT_DATE("America/Sao_Paulo") THEN v.fim_vigencia
END AS fim_vigencia,
'stops' AS tabela_origem_gtfs,
FROM
vigencia v
JOIN
(
SELECT
id_servico,
servico,
descricao_servico,
latitude,
longitude
FROM
stops_rn
WHERE
rn = 1
) r
USING(id_servico)
Loading

0 comments on commit 490badc

Please sign in to comment.