Skip to content

Commit

Permalink
Cria tabela com integrações da Jaé calculadas (#157)
Browse files Browse the repository at this point in the history
* cria query integracao

* correcoes

* filtra integracoes

* corrige id_integracao

* add integracao_nao_realizada

* registra flows

* add changelog

* corrige integracao_nao_realizada

* prepara para prod

* remove coluna não utilizada

* altera filtro

* ajustes sintaxe

* add docs

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
pixuimpou and mergify[bot] authored Aug 21, 2024
1 parent 491d7c6 commit c910e80
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
Flows for br_rj_riodejaneiro_bilhetagem
DBT: 2024-08-09
DBT: 2024-08-21
"""

from copy import deepcopy
Expand Down
5 changes: 5 additions & 0 deletions queries/models/validacao_dados_jae/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog - validacao_dados_jae

## [1.1.0] - 2024-08-21

### Adicionado
- Cria modelo `integracao_nao_realizada.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/157)

## [1.0.0] - 2024-07-17

### Adicionado
Expand Down
248 changes: 248 additions & 0 deletions queries/models/validacao_dados_jae/integracao_nao_realizada.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
{{
config(
materilized="incremental",
incremental_strategy="insert_overwrite",
partition_by={
"field": "data",
"data_type": "date",
"granularity": "day"
},
)
}}

{% set transacao_table = ref('transacao') %}
{% if execute %}
{% if is_incremental() %}

{% set partitions_query %}
SELECT
CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data
FROM
`{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
-- `rj-smtr.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
table_name = "{{ transacao_table.identifier }}"
AND partition_id != "__NULL__"
AND DATE(last_modified_time, "America/Sao_Paulo") BETWEEN DATE_SUB(DATE("{{var('run_date')}}"), INTERVAL 1 DAY) AND DATE("{{var('run_date')}}")
{% endset %}

{{ log("Running query: \n"~partitions_query, info=True) }}
{% set partitions = run_query(partitions_query) %}

{% set partition_list = partitions.columns[0].values() %}
{{ log("transacao partitions: \n"~partition_list, info=True) }}
{% endif %}
{% endif %}

{% set max_transactions = var('quantidade_integracoes_max') %} -- Número máximo de pernas em uma integração
{% set pivot_columns = ["datetime_transacao", "id_transacao", "modo", "servico_sentido"] %}
{% set transaction_date_filter %}
{% if partition_list|length > 0 %}
{% for p in partition_list %}
(
data BETWEEN DATE_SUB(DATE({{ p }}), INTERVAL 1 DAY) AND DATE_ADD(DATE({{ p }}), INTERVAL 1 DAY)
AND
(
DATE(datetime_transacao) = DATE({{ p }})
OR DATE(DATETIME_SUB(datetime_transacao, INTERVAL 180 MINUTE)) = DATE({{ p }})
OR DATE(DATETIME_ADD(datetime_transacao, INTERVAL 180 MINUTE)) = DATE({{ p }})
)
)
{% if not loop.last %}OR{% endif %}
{% endfor %}
{% else %}
data = "2000-01-01"
{% endif %}
{% endset %}

WITH matriz AS (
SELECT
string_agg(modo order by sequencia_integracao) AS sequencia_valida
FROM
{{ ref("matriz_integracao") }}
-- `rj-smtr.br_rj_riodejaneiro_bilhetagem.matriz_integracao`
group by id_matriz_integracao
),
transacao AS (
SELECT
id_cliente,
{% for column in pivot_columns %}
{% if column == "servico_sentido" %}
CONCAT(id_servico_jae, '_', sentido) AS servico_sentido,
{% else %}
{{ column }},
{% endif %}
{% endfor %}
FROM
{{ ref("transacao") }}
-- `rj-smtr.br_rj_riodejaneiro_bilhetagem.transacao`
WHERE
data < CURRENT_DATE("America/Sao_Paulo")
{% if is_incremental() %}
AND
{{ transaction_date_filter }}
{% endif %}
),
transacao_agrupada AS (
SELECT
id_cliente,
-- Cria o conjunto de colunas para a transação atual e as 4 próximas transações do cliente
{% for column in pivot_columns %}
{% for transaction_number in range(max_transactions) %}
{% if loop.first %}
{{ column }} AS {{ column }}_{{ transaction_number }},
{% else %}
LEAD({{ column }}, {{ loop.index0 }}) OVER (PARTITION BY id_cliente ORDER BY datetime_transacao) AS {{ column }}_{{ transaction_number }},
{% endif %}
{% endfor %}
{% endfor %}
FROM
transacao
),
integracao_possivel AS (
SELECT
*,
{% set modos = ["modo_0"] %}
{% set servicos = ["servico_sentido_0"] %}
{% for transaction_number in range(1, max_transactions) %}
{% do modos.append("modo_"~transaction_number) %}
(
DATETIME_DIFF(datetime_transacao_{{ transaction_number }}, datetime_transacao_0, MINUTE) <= 180
AND CONCAT({{ modos|join(", ',', ") }}) IN (SELECT sequencia_valida FROM matriz)
{% if loop.first %}
AND servico_sentido_{{ transaction_number }} != servico_sentido_0
{% else %}
AND servico_sentido_{{ transaction_number }} NOT IN ({{ servicos|join(", ',', ") }})
{% endif %}
) AS indicador_integracao_{{ transaction_number }},
{% do servicos.append("servico_sentido_"~transaction_number) %}

{% endfor %}
FROM
transacao_agrupada
WHERE
id_transacao_1 IS NOT NULL
),
transacao_filtrada AS (
SELECT
id_cliente,
{% for column in pivot_columns %}
{% for transaction_number in range(max_transactions) %}
{% if transaction_number < 2 %}
{{ column }}_{{ transaction_number }},
{% else %}
CASE
WHEN
indicador_integracao_{{ transaction_number }} THEN {{ column }}_{{ transaction_number }}
END AS {{ column }}_{{ transaction_number }},
{% endif %}
{% endfor %}
{% endfor %}
indicador_integracao_1,
indicador_integracao_2,
indicador_integracao_3,
indicador_integracao_4
FROM
integracao_possivel
WHERE
indicador_integracao_1
),
transacao_listada AS (
SELECT
*,
ARRAY_TO_STRING(
[
{% for i in range(1, max_transactions) %}
id_transacao_{{ i }} {% if not loop.last %},{% endif %}
{% endfor %}
],
", "
) AS transacoes
FROM
transacao_filtrada
),
{% for i in range(max_transactions - 1) %}
validacao_integracao_{{ max_transactions - i }}_pernas AS (
SELECT
(
id_transacao_{{ max_transactions - i - 1 }} IS NOT NULL
{% if not loop.first %}
AND id_transacao_{{ max_transactions - i }} IS NULL
{% endif %}
AND id_transacao_0 IN UNNEST(
SPLIT(
STRING_AGG(transacoes, ", ") OVER (PARTITION BY id_cliente ORDER BY datetime_transacao_0 ROWS BETWEEN 5 PRECEDING AND 1 PRECEDING),
', ')
)
) AS remover_{{ max_transactions - i }},
*
FROM
{% if loop.first %}
transacao_listada
{% else %}
validacao_integracao_{{ max_transactions - i + 1 }}_pernas
WHERE
NOT remover_{{ max_transactions - i + 1 }}
{% endif %}
),
{% endfor %}
integracoes_validas AS (
SELECT
DATE(datetime_transacao_0) AS data,
id_transacao_0 AS id_integracao,
*
FROM
validacao_integracao_2_pernas
WHERE
NOT remover_2
),
melted AS (
SELECT
data,
id_integracao,
sequencia_integracao,
datetime_transacao,
id_transacao,
modo,
SPLIT(servico_sentido, '_')[0] AS id_servico_jae,
SPLIT(servico_sentido, '_')[1] AS sentido
FROM
integracoes_validas,
UNNEST(
[
{% for transaction_number in range(max_transactions) %}
STRUCT(
{% for column in pivot_columns %}
{{ column }}_{{ transaction_number }} AS {{ column }},
{% endfor %}
{{ transaction_number }} AS sequencia_integracao
){%if not loop.last %},{% endif %}
{% endfor %}
]
)
),
integracao_nao_realizada AS (
SELECT DISTINCT
id_integracao
FROM
melted
WHERE
id_transacao NOT IN (
SELECT
id_transacao
FROM
{{ ref("integracao") }}
-- `rj-smtr.br_rj_riodejaneiro_bilhetagem.integracao`
{% if is_incremental() %}
WHERE
{{ transaction_date_filter }}
{% endif %}
)
)
SELECT
*,
'{{ var("version") }}' as versao
FROM
melted
WHERE
id_integracao IN (SELECT id_integracao FROM integracao_nao_realizada)
27 changes: 24 additions & 3 deletions queries/models/validacao_dados_jae/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ models:
- name: indicador_intervalo_transacao_suspeito
description: "Indica se existe algum intervalo entre transações menor que 5 minutos dentro da integração"
- name: versao
description: "Código de controle de versão do dado (SHA Github)"
description: "{{ doc('versao') }}"
- name: transacao_invalida
description: "Tabela para validação dos dados de transação da Jaé"
columns:
Expand Down Expand Up @@ -72,7 +72,7 @@ models:
- name: indicador_servico_fora_vigencia
description: "Indica se a transação foi feita em um serviço fora de vigência"
- name: versao
description: "Código de controle de versão do dado (SHA Github)"
description: "{{ doc('versao') }}"
- name: ordem_pagamento_invalida
description: "Tabela para validação dos dados da ordem de pagamento da Jaé"
columns:
Expand All @@ -91,4 +91,25 @@ models:
- name: indicador_agregacao_dia_invalida
description: "Indica se os valores e quantidades da tabela ordem_pagamento_consorcio_dia agregada por dia está diferente das informações da tabela ordem_pagamento_dia"
- name: versao
description: "Código de controle de versão do dado (SHA Github)"
description: "{{ doc('versao') }}"
- name: integracao_nao_realizada
description: "Tabela contendo integrações que deveriam ter ocorrido mas não estão listadas na base da Jaé"
columns:
- name: data
description: "Data da transação (partição)."
- name: id_integracao
description: "Identificador único da integração (é igual ao id_transacao da primeira transação da integração)."
- name: sequencia_integracao
description: "Sequência da transação dentro da integração"
- name: datetime_transacao
description: "Data e hora da transação em GMT-3"
- name: id_transacao
description: "Identificador único da transação"
- name: modo
description: "Tipo de transporte (Ônibus, Van, BRT)"
- name: id_servico_jae
description: "Identificador da linha no banco de dados da jaé (É possível cruzar os dados com a tabela rj-smtr.cadastro.servicos usando a coluna id_servico_jae)"
- name: sentido
description: "Sentido de operação do serviço (0 = ida, 1 = volta)"
- name: versao
description: "{{ doc('versao') }}"

0 comments on commit c910e80

Please sign in to comment.