Skip to content

Commit

Permalink
Merge branch 'main' into staging/apuracao-faixa-horaria-subsidio
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jul 18, 2024
2 parents 4699fcc + 0358a86 commit 6c42461
Show file tree
Hide file tree
Showing 52 changed files with 1,295 additions and 160 deletions.
2 changes: 1 addition & 1 deletion pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class constants(Enum): # pylint: disable=c0103
id > {last_id} AND id <= {max_id}
""",
"page_size": 1000,
"max_pages": 100,
"max_pages": 70,
},
"primary_key": ["id"],
"interval_minutes": 5,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog - br_rj_riodejaneiro_bilhetagem

## [1.2.0] - 2024-07-17

### Alterado

- Ativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)
- Adequa parâmetro exclude do flow `bilhetagem_validacao_jae` para as novas queries de validação (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)

## [1.1.0] - 2024-06-13

### Adicionado
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,7 @@ class constants(Enum): # pylint: disable=c0103
BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS = {
"dataset_id": "validacao_dados_jae",
"upstream": True,
"exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao \
+ordem_pagamento_dia +integracao +servicos",
"exclude": "gtfs +transacao +integracao",
"dbt_vars": {
"run_date": {},
"version": {},
Expand Down
15 changes: 10 additions & 5 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
"""
Flows for br_rj_riodejaneiro_bilhetagem
DBT: 2024-07-18
"""

from copy import deepcopy
Expand Down Expand Up @@ -34,6 +36,7 @@
from pipelines.schedules import (
every_5_minutes,
every_day_hour_five,
every_day_hour_seven,
every_hour,
every_minute,
)
Expand Down Expand Up @@ -138,12 +141,14 @@
| smtr_constants.BILHETAGEM_TRACKING_CAPTURE_PARAMS.value,
)

bilhetagem_tracking_captura.state_handlers.append(
[handler_inject_bd_credentials, handler_initialize_sentry, handler_skip_if_running]
)
bilhetagem_tracking_captura.state_handlers = [
handler_inject_bd_credentials,
handler_initialize_sentry,
handler_skip_if_running,
]


# bilhetagem_tracking_captura.schedule = every_5_minutes
bilhetagem_tracking_captura.schedule = every_5_minutes

# BILHETAGEM RESSARCIMENTO - SUBFLOW PARA RODAR DIARIAMENTE #

Expand Down Expand Up @@ -346,7 +351,7 @@
handler_skip_if_running,
]

# bilhetagem_validacao_jae.schedule = every_day_hour_seven
bilhetagem_validacao_jae.schedule = every_day_hour_seven


# RECAPTURA #
Expand Down
6 changes: 6 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - gtfs

## [1.0.3] - 2024-07-04

### Corrigido

- Corrigido o formato da data salva no redis de d/m/y para y-m-d (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/91)

## [1.0.2] - 2024-06-21

### Adicionado
Expand Down
2 changes: 2 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
"""
Flows for gtfs
DBT: 2024-07-15
"""

from prefect import Parameter, case, task
Expand Down
23 changes: 17 additions & 6 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime

import openpyxl as xl
import pandas as pd
from google.oauth2 import service_account
from googleapiclient.discovery import build
from prefect import task
Expand Down Expand Up @@ -47,6 +48,15 @@ def get_last_capture_os(dataset_id: str, mode: str = "prod") -> dict:
if last_captured_os is not None:
last_captured_os = last_captured_os["last_captured_os"]

# verifica se last_captured_os tem formato dia/mes/ano_index
if last_captured_os is not None:
if "/" in last_captured_os:
index = last_captured_os.split("_")[1]
data = datetime.strptime(last_captured_os.split("_")[0], "%d/%m/%Y").strftime(
"%Y-%m-%d"
)
last_captured_os = data + "_" + index

log(f"Last captured os: {last_captured_os}")

return last_captured_os
Expand Down Expand Up @@ -99,15 +109,20 @@ def get_os_info(last_captured_os: str) -> dict:

df = filter_valid_rows(df)

# converte "Início da Vigência da OS" de dd/mm/aaaa para aaaa-mm-dd
df["Início da Vigência da OS"] = pd.to_datetime(
df["Início da Vigência da OS"], format="%d/%m/%Y"
).dt.strftime("%Y-%m-%d")

df["data_index"] = df["Início da Vigência da OS"].astype(str) + "_" + df["index"].astype(str)

# Ordena por despacho
# Ordena por data e index
df = df.sort_values(by=["data_index"], ascending=True)
if last_captured_os is None:
last_captured_os = df["data_index"].max()
df = df.loc[(df["data_index"] == last_captured_os)]
else:
# Filtra linhas onde 'Despacho' é maior que o último capturado
# Filtra linhas onde 'data_index' é maior que o último capturado
df = df.loc[(df["data_index"] > last_captured_os)]

log(f"Os info: {df.head()}")
Expand All @@ -116,10 +131,6 @@ def get_os_info(last_captured_os: str) -> dict:
data = df.to_dict(orient="records")[0] # Converte o DataFrame para um dicionário
flag_new_os = True # Se houver mais de uma OS, é uma nova OS

# converte "Início da Vigência da OS" de dd/mm/aaaa para aaaa-mm-dd
data["Início da Vigência da OS"] = datetime.strptime(
data["Início da Vigência da OS"], "%d/%m/%Y"
).strftime("%Y-%m-%d")
log(f"OS selecionada: {data}")
return flag_new_os, data, data["data_index"], data["Início da Vigência da OS"]

Expand Down
2 changes: 2 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_viagem_zirix/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
"""
Flows for br_rj_riodejaneiro_viagem_zirix
DBT: 2024-07-17
"""
from copy import deepcopy

Expand Down
6 changes: 6 additions & 0 deletions pipelines/migration/projeto_subsidio_sppo/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog - projeto_subsidio_sppo

## [1.0.1] - 2024-07-17

### Alterado

- Alterado o schedule do flow `SMTR: Subsídio SPPO Apuração - Tratamento` de 07:00 para 07:05 (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/106)

## [1.0.0] - 2024-06-28

### Alterado
Expand Down
4 changes: 2 additions & 2 deletions pipelines/migration/projeto_subsidio_sppo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
run_dbt_model,
)
from pipelines.migration.veiculo.flows import sppo_veiculo_dia
from pipelines.schedules import every_day_hour_five, every_day_hour_seven
from pipelines.schedules import every_day_hour_five, every_day_hour_seven_minute_five

# EMD Imports #

Expand Down Expand Up @@ -268,4 +268,4 @@
image=smtr_constants.DOCKER_IMAGE.value, labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value]
)
subsidio_sppo_apuracao.state_handlers = [handler_initialize_sentry, handler_inject_bd_credentials]
subsidio_sppo_apuracao.schedule = every_day_hour_seven
subsidio_sppo_apuracao.schedule = every_day_hour_seven_minute_five
12 changes: 12 additions & 0 deletions pipelines/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ def generate_interval_schedule(
]
)

# Daily schedule built from a single IntervalClock: the interval is one day and
# the anchor start_date is 2022-11-30 at 07:05 in the timezone named by
# constants.TIMEZONE. Runs are tagged with the RJ_SMTR agent label.
every_day_hour_seven_minute_five = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2022, 11, 30, 7, 5, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
),
]
)

every_dayofmonth_one_and_sixteen = Schedule(
clocks=[
CronClock(
Expand Down
18 changes: 12 additions & 6 deletions queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ models:
br_rj_riodejaneiro_stu:
+materialized: view
+schema: br_rj_riodejaneiro_stu
cadastro_staging:
+schema: cadastro_staging
cadastro:
+schema: cadastro
staging:
+schema: cadastro_staging
br_rj_riodejaneiro_recursos:
+materialized: incremental
+schema: br_rj_riodejaneiro_recursos
Expand All @@ -252,13 +252,13 @@ models:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: gtfs
indicadores_continuados_egp_staging:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: indicadores_continuados_egp_staging
indicadores_continuados_egp:
+materialized: view
+schema: indicadores_continuados_egp
staging:
+materialized: incremental
+incremental_strategy: insert_overwrite
+schema: indicadores_continuados_egp_staging
projeto_subsidio_sppo_encontro_contas:
+materialized: table
+schema: projeto_subsidio_sppo_encontro_contas
Expand All @@ -280,3 +280,9 @@ models:
staging:
+materialized: view
+schema: br_rj_riodejaneiro_viagem_zirix_staging
validacao_dados_jae:
+materialized: incremental
+schema: validacao_dados_jae
staging:
+materialized: incremental
+schema: validacao_dados_jae_staging
13 changes: 13 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Changelog - bilhetagem

## [2.1.3] - 2024-07-18

### Adicionado
- Adiciona transações de Van no modelo `passageiros_hora.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/110)

### Alterado
- Define o `tipo_gratuidade` de transações do tipo `Gratuidade` cujo cliente não foi encontrado na tabela `br_rj_riodejaneiro_bilhetagem_staging.aux_gratuidade` como `Não Identificado` no modelo `transacao.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/110)

## [2.1.2] - 2024-07-05

### Adicionado
- Adiciona coluna `versao_app` nos modelos `gps_validador_aux.sql`, `gps_validador.sql` e `gps_validador_van.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/96)

## [2.1.1] - 2024-06-19

### Corrigido
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SELECT
sentido,
estado_equipamento,
temperatura,
versao_app,
'{{ var("version") }}' as versao
FROM
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ SELECT
sentido,
estado_equipamento,
temperatura,
versao_app,
'{{ var("version") }}' as versao
FROM
(
Expand Down
4 changes: 4 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ models:
description: "Validador aberto ou fechado no momento da transmissão"
- name: temperatura
description: "Temperatura do local, medida pelo sensor do validador (ºC)"
- name: versao_app
description: "Versão do Software do validador"
- name: versao
description: "Código de controle de versão do dado (SHA Github)"
- name: gps_validador_van
Expand Down Expand Up @@ -234,6 +236,8 @@ models:
description: "Validador aberto ou fechado no momento da transmissão"
- name: temperatura
description: "Temperatura do local, medida pelo sensor do validador (ºC)"
- name: versao_app
description: "Versão do Software do validador"
- name: versao
description: "Código de controle de versão do dado (SHA Github)"
- name: passageiros_hora
Expand Down
5 changes: 4 additions & 1 deletion queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ SELECT
WHEN t.tipo_transacao IN ("Débito", "Botoeira") THEN "Integral"
ELSE t.tipo_transacao
END AS tipo_transacao_smtr,
t.tipo_gratuidade,
CASE
WHEN t.tipo_transacao = "Gratuidade" AND t.tipo_gratuidade IS NULL THEN "Não Identificado"
ELSE t.tipo_gratuidade
END AS tipo_gratuidade,
t.id_tipo_integracao,
t.id_integracao,
t.latitude,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ SELECT
g.longitude_equipamento AS longitude,
INITCAP(g.sentido_linha) AS sentido,
g.estado_equipamento,
g.temperatura
g.temperatura,
g.versao_app
FROM
{{ ref("staging_gps_validador") }} g
LEFT JOIN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,11 @@ WHERE
{% endif %}
AND id_servico_jae NOT IN ("140", "142")
AND id_operadora != "2"
AND (modo = "BRT" OR (modo = "VLT" AND data >= DATE("2024-02-24")) OR (modo = "Ônibus" AND data >= DATE("2024-04-19")))
AND (
modo = "BRT"
OR (modo = "VLT" AND data >= DATE("2024-02-24"))
OR (modo = "Ônibus" AND data >= DATE("2024-04-19"))
OR (modo = "Van" AND consorcio = "STPC" AND data >= DATE("2024-07-01"))
OR (modo = "Van" AND consorcio = "STPL" AND data >= DATE("2024-07-15"))
)
AND tipo_transacao IS NOT NULL
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ SELECT
SAFE_CAST(JSON_VALUE(content, '$.qtd_venda_botao') AS FLOAT64) AS qtd_venda_botao,
SAFE_CAST(JSON_VALUE(content, '$.sentido_linha') AS STRING) AS sentido_linha,
SAFE_CAST(JSON_VALUE(content, '$.tarifa_linha') AS FLOAT64) AS tarifa_linha,
SAFE_CAST(JSON_VALUE(content, '$.versao_app') AS FLOAT64) AS versao_app,
SAFE_CAST(JSON_VALUE(content, '$.versao_app') AS STRING) AS versao_app,
SAFE_CAST(JSON_VALUE(content, '$.temperatura') AS FLOAT64) AS temperatura
FROM
{{ source('br_rj_riodejaneiro_bilhetagem_staging', 'gps_validador') }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
{{ incremental_filter }}
{% endset %}

{% set partitions = run_query(partitions_query)[0].values() %}
{% set partitions = run_query(partitions_query).columns[0].values() %}
{% endif %}
{% endif %}

Expand Down
14 changes: 14 additions & 0 deletions queries/models/cadastro/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog - cadastro

## [1.2.0] - 2024-07-17

### Adicionado

- Cria modelos auxiliares para a tabela de servicos:
  - `aux_routes_vigencia_gtfs.sql`
  - `aux_stops_vigencia_gtfs.sql`
  - `aux_servicos_gtfs.sql`

### Alterado

- Altera estrutura do modelo `servicos.sql` para adicionar datas de inicio e fim de vigência (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)
- Altera filtro no modelo `operadoras.sql`, deixando de filtrar operadores do modo `Fretamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98)

## [1.1.1] - 2024-04-25

### Alterado
Expand Down
2 changes: 1 addition & 1 deletion queries/models/cadastro/operadoras.sql
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,4 @@ SELECT
FROM
cadastro
WHERE
modo NOT IN ("Escolar", "Táxi", "TEC", "Fretamento")
modo NOT IN ("Escolar", "Táxi", "TEC")
Loading

0 comments on commit 6c42461

Please sign in to comment.