Skip to content

Commit

Permalink
Merge branch 'main' into staging/transacao-2-0
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Jan 16, 2025
2 parents cbe3bd1 + 1069dba commit 00f38e2
Show file tree
Hide file tree
Showing 31 changed files with 589 additions and 260 deletions.
4 changes: 2 additions & 2 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ class constants(Enum): # pylint: disable=c0103
WHERE
id > {last_id} AND id <= {max_id}
""",
"page_size": 1000,
"max_pages": 70,
"page_size": 35000,
"max_pages": 2,
},
"primary_key": ["id"],
"interval_minutes": 5,
Expand Down
2 changes: 1 addition & 1 deletion pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows for br_rj_riodejaneiro_bilhetagem
DBT: 2025-01-06
DBT: 2025-01-13
"""

from copy import deepcopy
Expand Down
5 changes: 5 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog - gtfs

## [1.2.1] - 2025-01-13

### Corrigido
- Removido teste de Ordem de Serviço regular (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/386)

## [1.2.0] - 2025-01-03

### Adicionado
Expand Down
2 changes: 1 addition & 1 deletion pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows for gtfs
DBT 2024-12-20
DBT 2025-01-13
"""

from prefect import Parameter, case, task
Expand Down
19 changes: 8 additions & 11 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,15 @@ def processa_ordem_servico(
None
"""

if (
len([sheet for sheet in sheetnames if "ANEXO I:" in sheet]) != 1
and regular_sheet_index is None
):
raise Exception(
"More than 1 regular sheet in the file. Please specify the regular sheet index."
)
# if (
# len([sheet for sheet in sheetnames if "ANEXO I " in sheet]) != 1
# and regular_sheet_index is None
# ):
# raise Exception(
# "More than 1 regular sheet in the file. Please specify the regular sheet index."
# )

if regular_sheet_index is None:
regular_sheet_index = next(
(i for i, name in enumerate(sheetnames) if "ANEXO I" in name), None
)
regular_sheet_index = next((i for i, name in enumerate(sheetnames) if "ANEXO I" in name), None)

quadro_geral = pd.DataFrame()

Expand Down
7 changes: 7 additions & 0 deletions pipelines/migration/veiculo/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog - veiculo

## [1.1.0] - 2025-01-16

### Alterado
- Alterações no tratamento do arquivo de infrações (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/126):
- Remove coluna `placa` das primary keys
- Remove filtro de modo

## [1.0.1] - 2024-05-28

### Adicionado
Expand Down
4 changes: 2 additions & 2 deletions pipelines/migration/veiculo/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class constants(Enum): # pylint: disable=c0103
Constant values for rj_smtr veiculo
"""

SPPO_LICENCIAMENTO_TABLE_ID = "sppo_licenciamento_stu"
SPPO_LICENCIAMENTO_TABLE_ID = "licenciamento_stu"

SPPO_LICENCIAMENTO_MAPPING_KEYS = {
"placa": "placa",
Expand Down Expand Up @@ -46,7 +46,7 @@ class constants(Enum): # pylint: disable=c0103
"names": SPPO_LICENCIAMENTO_MAPPING_KEYS.keys(), # pylint: disable=e1101
}

SPPO_INFRACAO_TABLE_ID = "sppo_infracao"
SPPO_INFRACAO_TABLE_ID = "infracao"

SPPO_INFRACAO_MAPPING_KEYS = {
"permissao": "permissao",
Expand Down
14 changes: 7 additions & 7 deletions pipelines/migration/veiculo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from prefect import task

from pipelines.constants import constants as smtr_constants
from pipelines.migration.utils import connect_ftp, data_info_str, filter_data
from pipelines.migration.utils import connect_ftp, data_info_str
from pipelines.migration.veiculo.constants import constants
from pipelines.utils.utils import log # ,get_vault_secret

Expand Down Expand Up @@ -200,13 +200,13 @@ def pre_treatment_sppo_infracao(status: dict, timestamp: datetime):
log("Updating valor type to float...", level="info")
data["valor"] = data["valor"].str.replace(",", ".").astype(float)

filters = ["modo != 'ONIBUS'"]
log(f"Filtering '{filters}'...", level="info")
data = filter_data(data, filters)
# filters = ["modo != 'ONIBUS'"]
# log(f"Filtering '{filters}'...", level="info")
# data = filter_data(data, filters)

log("Filtering null primary keys...", level="info")
primary_key = ["placa", "id_auto_infracao"]
data.dropna(subset=primary_key, inplace=True)
# log("Filtering null primary keys...", level="info")
primary_key = ["id_auto_infracao"]
# data.dropna(subset=primary_key, inplace=True)

# Check primary keys
# pk_columns = ["placa", "id_auto_infracao"]
Expand Down
5 changes: 5 additions & 0 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,17 @@ vars:
DATA_SUBSIDIO_V8_INICIO: "2024-07-20"
# Feature Apuração por faixa horária
DATA_SUBSIDIO_V9_INICIO: "2024-08-16"
# Feature desconsideração de km não vistoriado e não licenciado
DATA_SUBSIDIO_V9A_INICIO: "2024-09-01"
# Feature Apuração por novas faixas horárias (RESOLUÇÃO SMTR 3777/2024)
DATA_SUBSIDIO_V10_INICIO: "2024-11-01"
# Feature Apuração por novas faixas horárias (RESOLUÇÃO SMTR 3777/2024) - Feed (GTFS)
DATA_SUBSIDIO_V11_INICIO: "2024-11-06"
# Parâmetro 110 km/h + alterações de regras do modelo `viagem_transacao.sql`
DATA_SUBSIDIO_V12_INICIO: "2024-11-16"
# Inclusão de colunas de tecnologia em sppo_veiculo_dia
DATA_SUBSIDIO_V13_INICIO: "2025-01-01"


# Recursos #
recurso_staging: "rj-smtr-staging.projeto_subsidio_sppo_staging.recurso"
Expand Down
2 changes: 1 addition & 1 deletion queries/macros/get_license_date.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ SELECT
WHEN DATE("{{ var('run_date') }}") >= "2024-03-16" AND DATE("{{ var('run_date') }}") < "2024-04-01" THEN DATE("2024-04-09")
ELSE (
SELECT MIN(DATE(data))
FROM {{ ref("sppo_licenciamento_stu_staging") }}
FROM {{ ref("licenciamento_stu_staging") }}
WHERE DATE(data) >= DATE_ADD(DATE("{{ var('run_date') }}"), INTERVAL 5 DAY)
-- Admite apenas versões do STU igual ou após 2024-04-09 a partir de abril/24 devido à falha de atualização na fonte da dados (SIURB)
AND (DATE("{{ var('run_date') }}") < "2024-04-01" OR DATE(data) >= '2024-04-09')
Expand Down
11 changes: 10 additions & 1 deletion queries/models/monitoramento/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog - monitoramento


## [1.2.2] - 2025-01-08

### Adicionado

- Cria modelos `sumario_servico_dia_pagamento_historico.sql` e `sumario_servico_dia_tipo_viagem_historico.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/258)

- Adicionado o label `dashboard` aos modelos `sumario_servico_dia_pagamento_historico.sql` e `sumario_servico_dia_tipo_viagem_historico.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/258)

## [1.2.1] - 2025-01-03

### Adicionado
Expand Down Expand Up @@ -31,4 +40,4 @@
## [1.0.0] - 2024-10-21

### Adicionado
- Cria modelos para tratamento de viagens informadas: `staging_viagem_informada_rioonibus.sql` e `viagem_informada_monitoramento.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/276)
- Cria modelos para tratamento de viagens informadas: `staging_viagem_informada_rioonibus.sql` e `viagem_informada_monitoramento.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/276)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{{
config(
materialized="incremental",
partition_by={"field": "data", "data_type": "date", "granularity": "day"},
incremental_strategy="insert_overwrite",
alias="sumario_servico_dia_historico",
labels={'dashboard':'yes'}
)
}}
select *
from {{ ref("monitoramento_servico_dia") }}
where
data < date("{{ var("DATA_SUBSIDIO_V9_INICIO") }}")
union all
select *
from {{ ref("monitoramento_servico_dia_v2") }}
where
data >= date("{{ var("DATA_SUBSIDIO_V9_INICIO") }}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{
config(
materialized="incremental",
partition_by={"field": "data", "data_type": "date", "granularity": "day"},
incremental_strategy="insert_overwrite",
alias="sumario_servico_dia_tipo_viagem_historico",
labels={'dashboard':'yes'}
)
}}

select *
from {{ ref("monitoramento_servico_dia_tipo_viagem") }}
where
data < date("{{ var("DATA_SUBSIDIO_V9_INICIO") }}")
union all
select *
from {{ ref("monitoramento_servico_dia_tipo_viagem_v2") }}
where
data >= date("{{ var("DATA_SUBSIDIO_V9_INICIO") }}")
and tipo_viagem != "Sem viagem apurada"
22 changes: 22 additions & 0 deletions queries/models/monitoramento/staging/monitoramento_servico_dia.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{{ config(materialized="ephemeral") }}

select
data,
tipo_dia,
consorcio,
servico,
vista,
viagens,
km_apurada,
km_planejada,
perc_km_planejada,
valor_subsidio_pago,
valor_penalidade
from {{ ref("sumario_servico_dia_historico") }}
-- `rj-smtr.dashboard_subsidio_sppo.sumario_servico_dia_historico`
where
data < DATE("{{ var("DATA_SUBSIDIO_V9_INICIO") }}") --noqa
{% if is_incremental() %}
AND data BETWEEN DATE("{{ var("start_date") }}")
AND DATE_ADD(DATE("{{ var("end_date") }}"), INTERVAL 1 DAY)
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{ config(materialized="ephemeral") }}

select
data,
tipo_dia,
consorcio,
servico,
tipo_viagem,
indicador_ar_condicionado,
viagens,
km_apurada
from {{ ref("sumario_servico_tipo_viagem_dia") }}
-- `rj-smtr.dashboard_subsidio_sppo.sumario_servico_tipo_viagem_dia`
where
data < date("{{ var("DATA_SUBSIDIO_V9_INICIO") }}")
{% if is_incremental() %}
and data between date("{{ var("start_date") }}") and date_add(
date("{{ var("end_date") }}"), interval 1 day
)
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{{ config(materialized="ephemeral") }}

select
data,
tipo_dia,
consorcio,
servico,
tipo_viagem,
indicador_ar_condicionado,
sum(viagens_faixa) as viagens,
sum(km_apurada_faixa) as km_apurada
from {{ ref("subsidio_faixa_servico_dia_tipo_viagem") }}
-- `rj-smtr.financeiro.subsidio_faixa_servico_dia_tipo_viagem`
where
data >= date("{{ var("DATA_SUBSIDIO_V9_INICIO") }}")
and tipo_viagem != "Sem viagem apurada"
{% if is_incremental() %}
and data between date("{{ var("start_date") }}") and date_add(
date("{{ var("end_date") }}"), interval 1 day
)
{% endif %}
group by data, tipo_dia, consorcio, servico, tipo_viagem, indicador_ar_condicionado
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
{{ config(materialized="ephemeral") }}

with
valores_subsidio AS (
SELECT
data,
sdp.tipo_dia,
sdp.consorcio,
servico,
sdp.viagens_dia,
SUM(
CASE
WHEN data >= DATE("{{ var('DATA_SUBSIDIO_V9A_INICIO') }}")
AND tipo_viagem NOT IN ("Não licenciado", "Não vistoriado")
THEN km_apurada_faixa
WHEN data < DATE("{{ var('DATA_SUBSIDIO_V9A_INICIO') }}")
THEN km_apurada_faixa
ELSE 0
END
) AS km_apurada,
km_planejada_dia,
valor_a_pagar,
valor_penalidade
FROM
{{ ref("subsidio_sumario_servico_dia_pagamento") }} as sdp
-- rj-smtr.financeiro.subsidio_sumario_servico_dia_pagamento
left join {{ ref("subsidio_faixa_servico_dia_tipo_viagem") }} as sdtv
using (data, servico)
WHERE
data BETWEEN DATE("{{ var('start_date') }}")
AND DATE("{{ var('end_date') }}")
group by data,
tipo_dia,
consorcio,
servico,
viagens_dia,
km_planejada_dia,
valor_a_pagar,
valor_penalidade
),
planejada as (
select distinct data, consorcio, servico, vista
from {{ ref("viagem_planejada") }}
-- `rj-smtr.projeto_subsidio_sppo.viagem_planejada`
where
data >= date("{{ var('DATA_SUBSIDIO_V9_INICIO') }}")
and (id_tipo_trajeto = 0 or id_tipo_trajeto is null)
and format_time("%T", time(faixa_horaria_inicio)) != "00:00:00"
),
pagamento as (
select
data,
tipo_dia,
consorcio,
servico,
vista,
viagens_dia as viagens,
km_apurada,
km_planejada_dia as km_planejada,
valor_a_pagar as valor_subsidio_pago,
valor_penalidade
from valores_subsidio as sdp
left join planejada as p using (data, servico, consorcio)
where
data >= date("{{ var('DATA_SUBSIDIO_V9_INICIO') }}")
{% if is_incremental() %}
and data between date("{{ var('start_date') }}") and date_add(
date("{{ var('end_date') }}"), interval 1 day
)
{% endif %}
)
select
data,
tipo_dia,
consorcio,
servico,
vista,
viagens,
km_apurada,
km_planejada,
ROUND(100 * km_apurada / km_planejada, 2) as perc_km_planejada,
valor_subsidio_pago,
valor_penalidade
from pagamento
6 changes: 6 additions & 0 deletions queries/models/projeto_subsidio_sppo/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - projeto_subsidio_sppo

## [9.1.5] - 2025-01-08

### Adicionado

- Adicionado label `dashboard` ao modelo `viagem_completa.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/258)

## [9.1.4] - 2025-01-06

### Adicionado
Expand Down
3 changes: 2 additions & 1 deletion queries/models/projeto_subsidio_sppo/viagem_completa.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ config(
"granularity":"day"
},
unique_key=['id_viagem'],
incremental_strategy='insert_overwrite'
incremental_strategy='insert_overwrite',
labels = {'dashboard': 'yes'}
)
}}

Expand Down
Loading

0 comments on commit 00f38e2

Please sign in to comment.