From 490badc2314e6ce6463ad1a66ca0bd2ae77c9781 Mon Sep 17 00:00:00 2001 From: Rafael Carvalho Pinheiro <74972217+pixuimpou@users.noreply.github.com> Date: Wed, 17 Jul 2024 14:18:12 -0300 Subject: [PATCH] =?UTF-8?q?Cria=20tabelas=20de=20valida=C3=A7=C3=A3o=20da?= =?UTF-8?q?=20Ja=C3=A9=20com=20verifica=C3=A7=C3=A3o=20no=20GTFS=20/=20Rem?= =?UTF-8?q?odela=20tabela=20cadastro.servicos=20(#98)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cria validação da jae com o gtfs * validacao ordem pagamento staging * adiciona operadoras de fretamento * altera aux_servicos_gtfs para ephemeral * altera filtro incremental da integracao_invalida * altera filtro incremental da transacao_invalida * corrige aux_transacao_ordem * corrige validacao ordem_pagamento por servico * cria modelo ordem_pagamento_dia_invalida * cria validacao ordem_pagamento final * altera schema validacao_dados_jae * agenda pipeline de materializacao das validacoes * corrige query viagem_informada * add changelog * adiciona prefixo data nas colunas de vigencia * corrige nome das colunas de vigencia * altera schema da tabela de servicos * add data dbt * corrige typo * altera cast para safe_cast --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../CHANGELOG.md | 7 + .../constants.py | 3 +- .../br_rj_riodejaneiro_bilhetagem/flows.py | 5 +- .../br_rj_riodejaneiro_viagem_zirix/flows.py | 2 + queries/dbt_project.yml | 10 +- .../viagem_informada.sql | 2 +- queries/models/cadastro/CHANGELOG.md | 14 ++ queries/models/cadastro/operadoras.sql | 2 +- queries/models/cadastro/schema.yml | 8 +- queries/models/cadastro/servicos.sql | 5 +- .../staging/aux_routes_vigencia_gtfs.sql | 70 +++++++++ .../cadastro/staging/aux_servicos_gtfs.sql | 34 +++++ .../staging/aux_stops_vigencia_gtfs.sql | 76 ++++++++++ .../cadastro_staging/servicos_gtfs_aux.sql | 97 ------------- .../models/validacao_dados_jae/CHANGELOG.md | 14 ++ .../integracao_invalida.sql | 85 +++++++++++ .../ordem_pagamento_invalida.sql | 98 +++++++++++++ queries/models/validacao_dados_jae/schema.yml | 94 ++++++++++++ .../staging/aux_transacao_ordem.sql | 101 +++++++++++++ ...ordem_pagamento_consorcio_dia_invalida.sql | 71 +++++++++ ...amento_consorcio_operador_dia_invalida.sql | 76 ++++++++++ .../staging/ordem_pagamento_dia_invalida.sql | 62 ++++++++ ...agamento_servico_operador_dia_invalida.sql | 135 ++++++++++++++++++ .../transacao_invalida.sql | 112 +++++++++++++++ 24 files changed, 1075 insertions(+), 108 deletions(-) create mode 100644 queries/models/cadastro/staging/aux_routes_vigencia_gtfs.sql create mode 100644 queries/models/cadastro/staging/aux_servicos_gtfs.sql create mode 100644 queries/models/cadastro/staging/aux_stops_vigencia_gtfs.sql delete mode 100644 queries/models/cadastro_staging/servicos_gtfs_aux.sql create mode 100644 queries/models/validacao_dados_jae/CHANGELOG.md create mode 100644 queries/models/validacao_dados_jae/integracao_invalida.sql create mode 100644 queries/models/validacao_dados_jae/ordem_pagamento_invalida.sql create mode 100644 queries/models/validacao_dados_jae/schema.yml create mode 100644 queries/models/validacao_dados_jae/staging/aux_transacao_ordem.sql create mode 100644 queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_dia_invalida.sql create mode 100644 queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_operador_dia_invalida.sql create mode 100644 queries/models/validacao_dados_jae/staging/ordem_pagamento_dia_invalida.sql create mode 100644 queries/models/validacao_dados_jae/staging/ordem_pagamento_servico_operador_dia_invalida.sql create mode 100644 queries/models/validacao_dados_jae/transacao_invalida.sql diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md index ccb55366..50339ec9 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog - br_rj_riodejaneiro_bilhetagem +## [1.2.0] - 2024-07-17 + +### Alterado + +- Ativa schedule do flow `bilhetagem_validacao_jae` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98) +- Adequa parâmetro exclude do flow `bilhetagem_validacao_jae` para as novas queries de validação (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98) + ## [1.1.0] - 2024-06-13 ### Adicionado diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py index 5b14276d..34f67e72 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py @@ -521,8 +521,7 @@ class constants(Enum): # pylint: disable=c0103 BILHETAGEM_MATERIALIZACAO_VALIDACAO_JAE_PARAMS = { "dataset_id": "validacao_dados_jae", "upstream": True, - "exclude": "+gps_sppo +sppo_veiculo_dia +gps_validador +transacao \ -+ordem_pagamento_dia +integracao +servicos", + "exclude": "gtfs +transacao +integracao", "dbt_vars": { "run_date": {}, "version": {}, diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py index 02472ae2..6722eeda 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py @@ -2,7 +2,7 @@ """ Flows for br_rj_riodejaneiro_bilhetagem -DBT: 2024-07-05 +DBT: 2024-07-17 """ from copy import deepcopy @@ -36,6 +36,7 @@ from pipelines.schedules import ( every_5_minutes, every_day_hour_five, + every_day_hour_seven, every_hour, every_minute, ) @@ -350,7 +351,7 @@ handler_skip_if_running, ] -# bilhetagem_validacao_jae.schedule = every_day_hour_seven +bilhetagem_validacao_jae.schedule = every_day_hour_seven # RECAPTURA # diff --git a/pipelines/migration/br_rj_riodejaneiro_viagem_zirix/flows.py b/pipelines/migration/br_rj_riodejaneiro_viagem_zirix/flows.py index 7c9fe288..3a919b1b 100644 --- a/pipelines/migration/br_rj_riodejaneiro_viagem_zirix/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_viagem_zirix/flows.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- """ Flows for br_rj_riodejaneiro_viagem_zirix + +DBT: 2024-07-17 """ from copy import deepcopy diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index 2b583341..4ae666c1 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -238,10 +238,10 @@ models: br_rj_riodejaneiro_stu: +materialized: view +schema: br_rj_riodejaneiro_stu - cadastro_staging: - +schema: cadastro_staging cadastro: +schema: cadastro + staging: + +schema: cadastro_staging br_rj_riodejaneiro_recursos: +materialized: incremental +schema: br_rj_riodejaneiro_recursos @@ -280,3 +280,9 @@ models: staging: +materialized: view +schema: br_rj_riodejaneiro_viagem_zirix_staging + validacao_dados_jae: + +materialized: incremental + +schema: validacao_dados_jae + staging: + +materialized: incremental + +schema: validacao_dados_jae_staging \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_viagem_zirix/viagem_informada.sql b/queries/models/br_rj_riodejaneiro_viagem_zirix/viagem_informada.sql index 9742f93c..71b55c88 100644 --- a/queries/models/br_rj_riodejaneiro_viagem_zirix/viagem_informada.sql +++ b/queries/models/br_rj_riodejaneiro_viagem_zirix/viagem_informada.sql @@ -26,7 +26,7 @@ {{ incremental_filter }} {% endset %} - {% set partitions = run_query(partitions_query)[0].values() %} + {% set partitions = run_query(partitions_query).columns[0].values() %} {% endif %} {% endif %} diff --git a/queries/models/cadastro/CHANGELOG.md b/queries/models/cadastro/CHANGELOG.md index ab71b37c..49cc575a 100644 --- a/queries/models/cadastro/CHANGELOG.md +++ b/queries/models/cadastro/CHANGELOG.md @@ -1,5 +1,19 @@ # Changelog - cadastro +## [1.2.0] - 2024-07-17 + +### Adicionado + +- Cria modelos auxiliares para a tabela de servicos: + - `aux_routes_vigencia_gtfs.sql` + - `aux_stops_vigencia_gtfs.sql` + - `aux_servicos_gtfs.sql` + +### Alterado + +- Altera estrutura do modelo `servicos.sql` para adicionar datas de inicio e fim de vigência (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98) +- Altera filtro no modelo `operadoras.sql`, deixando de filtrar operadores do modo `Fretamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98) + ## [1.1.1] - 2024-04-25 ### Alterado diff --git a/queries/models/cadastro/operadoras.sql b/queries/models/cadastro/operadoras.sql index d39917c4..65dcdd79 100644 --- a/queries/models/cadastro/operadoras.sql +++ b/queries/models/cadastro/operadoras.sql @@ -148,4 +148,4 @@ SELECT FROM cadastro WHERE - modo NOT IN ("Escolar", "Táxi", "TEC", "Fretamento") \ No newline at end of file + modo NOT IN ("Escolar", "Táxi", "TEC") \ No newline at end of file diff --git a/queries/models/cadastro/schema.yml b/queries/models/cadastro/schema.yml index cf0fc437..78abbe79 100644 --- a/queries/models/cadastro/schema.yml +++ b/queries/models/cadastro/schema.yml @@ -124,4 +124,10 @@ models: - name: tabela_origem_gtfs description: "Tabela do GTFS de onde foram coletadas as informações (stops ou routes)" - name: feed_start_date_gtfs - description: "Último feed_start_date do GTFS em que as informações do serviço estavam disponíveis" \ No newline at end of file + description: "Último feed_start_date do GTFS em que as informações do serviço estavam disponíveis" + - name: data_inicio_vigencia + description: "Data de inicio da vigência do serviço segundo o GTFS ou o cadastro do serviço no sistema da Jaé (caso não exista no GTFS)" + - name: data_fim_vigencia + description: "Data de fim da vigência do serviço segundo o GTFS" + - name: versao + description: "Código de controle de versão do dado (SHA Github)" \ No newline at end of file diff --git a/queries/models/cadastro/servicos.sql b/queries/models/cadastro/servicos.sql index 65b7596a..667125fe 100644 --- a/queries/models/cadastro/servicos.sql +++ b/queries/models/cadastro/servicos.sql @@ -16,11 +16,12 @@ SELECT g.latitude, g.longitude, g.tabela_origem_gtfs, - g.feed_start_date AS feed_start_date_gtfs, + COALESCE(g.inicio_vigencia, DATE(j.datetime_inclusao)) AS data_inicio_vigencia, + g.fim_vigencia AS data_fim_vigencia, '{{ var("version") }}' as versao FROM {{ ref("staging_linha") }} j FULL OUTER JOIN - {{ ref("servicos_gtfs_aux") }} g + {{ ref("aux_servicos_gtfs") }} g ON COALESCE(j.gtfs_route_id, j.gtfs_stop_id) = g.id_servico \ No newline at end of file diff --git a/queries/models/cadastro/staging/aux_routes_vigencia_gtfs.sql b/queries/models/cadastro/staging/aux_routes_vigencia_gtfs.sql new file mode 100644 index 00000000..2398d56f --- /dev/null +++ b/queries/models/cadastro/staging/aux_routes_vigencia_gtfs.sql @@ -0,0 +1,70 @@ +{{ + config( + materialized="ephemeral", + ) +}} + +WITH routes_rn AS ( + SELECT + route_id AS id_servico, + route_short_name AS servico, + route_long_name AS descricao_servico, + feed_start_date AS inicio_vigencia, + feed_end_date AS fim_vigencia, + LAG(feed_end_date) OVER (PARTITION BY route_id ORDER BY feed_start_date) AS feed_end_date_anterior, + ROW_NUMBER() OVER (PARTITION BY route_id ORDER BY feed_start_date DESC) AS rn + FROM + {{ ref("routes_gtfs") }} +), +routes_agrupada AS ( + SELECT + id_servico, + inicio_vigencia, + servico, + descricao_servico, + IFNULL(fim_vigencia, CURRENT_DATE("America/Sao_Paulo")) as fim_vigencia, + SUM( + CASE + WHEN feed_end_date_anterior IS NULL OR feed_end_date_anterior <> DATE_SUB(inicio_vigencia, INTERVAL 1 DAY) THEN 1 + ELSE 0 + END + ) OVER (PARTITION BY id_servico ORDER BY inicio_vigencia) AS group_id + FROM + routes_rn +), +vigencia AS ( + SELECT + id_servico, + MIN(inicio_vigencia) AS inicio_vigencia, + MAX(fim_vigencia) AS fim_vigencia + FROM + routes_agrupada + GROUP BY + id_servico, + group_id +) +SELECT + id_servico, + r.servico, + r.descricao_servico, + NULL AS latitude, + NULL AS longitude, + v.inicio_vigencia, + CASE + WHEN v.fim_vigencia != CURRENT_DATE("America/Sao_Paulo") THEN v.fim_vigencia + END AS fim_vigencia, + 'routes' AS tabela_origem_gtfs, +FROM + vigencia v +JOIN +( + SELECT + id_servico, + servico, + descricao_servico + FROM + routes_rn + WHERE + rn = 1 +) r +USING(id_servico) diff --git a/queries/models/cadastro/staging/aux_servicos_gtfs.sql b/queries/models/cadastro/staging/aux_servicos_gtfs.sql new file mode 100644 index 00000000..3e562961 --- /dev/null +++ b/queries/models/cadastro/staging/aux_servicos_gtfs.sql @@ -0,0 +1,34 @@ +{{ + config( + materialized="ephemeral", + ) +}} + +SELECT + id_servico, + servico, + descricao_servico, + latitude, + longitude, + inicio_vigencia, + fim_vigencia, + tabela_origem_gtfs, + '{{ var("version") }}' as versao +FROM + {{ ref('aux_routes_vigencia_gtfs') }} + +UNION ALL + +SELECT + id_servico, + servico, + descricao_servico, + latitude, + longitude, + inicio_vigencia, + fim_vigencia, + tabela_origem_gtfs, + '{{ var("version") }}' as versao +FROM + {{ ref('aux_stops_vigencia_gtfs') }} + diff --git a/queries/models/cadastro/staging/aux_stops_vigencia_gtfs.sql b/queries/models/cadastro/staging/aux_stops_vigencia_gtfs.sql new file mode 100644 index 00000000..67ac97ae --- /dev/null +++ b/queries/models/cadastro/staging/aux_stops_vigencia_gtfs.sql @@ -0,0 +1,76 @@ +{{ + config( + materialized="ephemeral", + ) +}} + +WITH stops_rn AS ( + SELECT + stop_id AS id_servico, + stop_code AS servico, + stop_name AS descricao_servico, + stop_lat AS latitude, + stop_lon AS longitude, + feed_start_date AS inicio_vigencia, + feed_end_date AS fim_vigencia, + LAG(feed_end_date) OVER (PARTITION BY stop_id ORDER BY feed_start_date) AS feed_end_date_anterior, + ROW_NUMBER() OVER (PARTITION BY stop_id ORDER BY feed_start_date DESC) AS rn + FROM + {{ ref("stops_gtfs") }} + WHERE + location_type = '1' +), +stops_agrupada AS ( + SELECT + id_servico, + inicio_vigencia, + servico, + descricao_servico, + IFNULL(fim_vigencia, CURRENT_DATE("America/Sao_Paulo")) AS fim_vigencia, + SUM( + CASE + WHEN feed_end_date_anterior IS NULL OR feed_end_date_anterior <> DATE_SUB(inicio_vigencia, INTERVAL 1 DAY) THEN 1 + ELSE 0 + END + ) OVER (PARTITION BY id_servico ORDER BY inicio_vigencia) AS group_id + FROM + stops_rn +), +vigencia AS ( + SELECT + id_servico, + MIN(inicio_vigencia) AS inicio_vigencia, + MAX(fim_vigencia) AS fim_vigencia + FROM + stops_agrupada + GROUP BY + id_servico, + group_id +) +SELECT + id_servico, + r.servico, + r.descricao_servico, + r.latitude, + r.longitude, + v.inicio_vigencia, + CASE + WHEN v.fim_vigencia != CURRENT_DATE("America/Sao_Paulo") THEN v.fim_vigencia + END AS fim_vigencia, + 'stops' AS tabela_origem_gtfs, +FROM + vigencia v +JOIN +( + SELECT + id_servico, + servico, + descricao_servico, + latitude, + longitude + FROM + stops_rn + WHERE + rn = 1 +) r +USING(id_servico) diff --git a/queries/models/cadastro_staging/servicos_gtfs_aux.sql b/queries/models/cadastro_staging/servicos_gtfs_aux.sql deleted file mode 100644 index ba3bd015..00000000 --- a/queries/models/cadastro_staging/servicos_gtfs_aux.sql +++ /dev/null @@ -1,97 +0,0 @@ -{{ - config( - materialized="incremental", - unique_key="id_servico" - ) -}} - -{% set gtfs_partiton_table = ref('feed_info_gtfs') %} -{% if execute %} - {% if is_incremental() %} - {% set gtfs_partition_query %} - SELECT DISTINCT - MAX(feed_start_date) AS feed_start_date - FROM - {{ gtfs_partiton_table }} - {% endset %} - - {% set gtfs_last_partition = run_query(gtfs_partition_query).columns[0].values()[0] %} - {% endif %} -{% endif %} - - -WITH routes_gtfs AS ( - SELECT - * - FROM - ( - SELECT - route_id AS id_servico, - route_short_name AS servico, - route_long_name AS descricao_servico, - NULL AS latitude, - NULL AS longitude, - feed_start_date, - 'routes' AS tabela_origem_gtfs, - '{{ var("version") }}' as versao, - ROW_NUMBER() OVER (PARTITION BY route_id ORDER BY feed_start_date DESC) AS rn - FROM - {{ ref("routes_gtfs") }} - {% if is_incremental() %} - WHERE - feed_start_date = '{{ gtfs_last_partition }}' - {% endif %} - ) - WHERE rn = 1 -), -stops_gtfs AS ( - SELECT - * EXCEPT(rn) - FROM - ( - SELECT - stop_id AS id_servico, - stop_code AS servico, - stop_name AS descricao_servico, - stop_lat AS latitude, - stop_lon AS longitude, - feed_start_date, - 'stops' AS tabela_origem_gtfs, - '{{ var("version") }}' as versao, - ROW_NUMBER() OVER (PARTITION BY stop_id ORDER BY feed_start_date DESC) AS rn - FROM - {{ ref("stops_gtfs") }} - WHERE - location_type = '1' - {% if is_incremental() %} - AND feed_start_date = '{{ gtfs_last_partition }}' - {% endif %} - ) - WHERE rn = 1 -) -SELECT - id_servico, - servico, - descricao_servico, - latitude, - longitude, - feed_start_date, - tabela_origem_gtfs, - versao -FROM - routes_gtfs - -UNION ALL - -SELECT - id_servico, - servico, - descricao_servico, - latitude, - longitude, - feed_start_date, - tabela_origem_gtfs, - versao -FROM - stops_gtfs - diff --git a/queries/models/validacao_dados_jae/CHANGELOG.md b/queries/models/validacao_dados_jae/CHANGELOG.md new file mode 100644 index 00000000..b787bacd --- /dev/null +++ b/queries/models/validacao_dados_jae/CHANGELOG.md @@ -0,0 +1,14 @@ +# Changelog - validacao_dados_jae + +## [1.0.0] - 2024-07-17 + +### Adicionado + - Cria modelos para validação dos dados da Jaé (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/98): + - `aux_transacao_ordem.sql` + - `ordem_pagamento_servico_operador_dia_invalida.sql` + - `ordem_pagamento_consorcio_operador_dia_invalida.sql` + - `ordem_pagamento_consorcio_dia_invalida.sql` + - `ordem_pagamento_dia_invalida.sql` + - `integracao_invalida.sql` + - `transacao_invalida.sql` + - `ordem_pagamento_invalida.sql` \ No newline at end of file diff --git a/queries/models/validacao_dados_jae/integracao_invalida.sql b/queries/models/validacao_dados_jae/integracao_invalida.sql new file mode 100644 index 00000000..66afe05d --- /dev/null +++ b/queries/models/validacao_dados_jae/integracao_invalida.sql @@ -0,0 +1,85 @@ +{{ + config( + incremental_strategy="insert_overwrite", + partition_by={ + "field": "data", + "data_type": "date", + "granularity": "day" + }, + ) +}} + +{% set integracao_table = ref('integracao') %} +{% if execute %} + {% if is_incremental() %} + + {% set partitions_query %} + SELECT + CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data + FROM + `{{ integracao_table.database }}.{{ integracao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + WHERE + table_name = "{{ integracao_table.identifier }}" + AND partition_id != "__NULL__" + AND DATE(last_modified_time, "America/Sao_Paulo") = DATE_SUB(DATE("{{var('run_date')}}"), INTERVAL 1 DAY) + {% endset %} + + {{ log("Running query: \n"~partitions_query, info=True) }} + {% set partitions = run_query(partitions_query) %} + + {% set partition_list = partitions.columns[0].values() %} + {{ log("integracao partitions: \n"~partition_list, info=True) }} + {% endif %} +{% endif %} + +WITH sequencias_validas AS ( + SELECT + id_matriz_integracao, + STRING_AGG(modo, ', ' ORDER BY sequencia_integracao) AS modos + FROM + {{ ref("matriz_integracao") }} + GROUP BY + id_matriz_integracao +), +integracao_agg AS ( + SELECT + DATE(datetime_processamento_integracao) AS data, + id_integracao, + STRING_AGG(modo, ', ' ORDER BY sequencia_integracao) AS modos, + MIN(datetime_transacao) AS datetime_primeira_transacao, + MAX(datetime_transacao) AS datetime_ultima_transacao, + MIN(intervalo_integracao) AS menor_intervalo + FROM + {{ ref("integracao") }} + WHERE + {% if is_incremental() %} + {% if partition_list|length > 0 %} + data IN ({{ partition_list|join(', ') }}) + {% else %} + data = "2000-01-01" + {% endif %} + {% endif %} + GROUP BY + 1, + 2 +), +indicadores AS ( + SELECT + data, + id_integracao, + modos, + modos NOT IN (SELECT DISTINCT modos FROM sequencias_validas) AS indicador_fora_matriz, + TIMESTAMP_DIFF(datetime_ultima_transacao, datetime_primeira_transacao, MINUTE) > 180 AS indicador_tempo_integracao_invalido, + menor_intervalo < 5 AS indicador_intervalo_transacao_suspeito + FROM + integracao_agg +) +SELECT + *, + '{{ var("version") }}' as versao +FROM + indicadores +WHERE + indicador_fora_matriz = TRUE + OR indicador_tempo_integracao_invalido = TRUE + OR indicador_intervalo_transacao_suspeito = TRUE \ No newline at end of file diff --git a/queries/models/validacao_dados_jae/ordem_pagamento_invalida.sql b/queries/models/validacao_dados_jae/ordem_pagamento_invalida.sql new file mode 100644 index 00000000..3dd74747 --- /dev/null +++ b/queries/models/validacao_dados_jae/ordem_pagamento_invalida.sql @@ -0,0 +1,98 @@ +{{ + config( + incremental_strategy="insert_overwrite", + partition_by={ + "field": "data_ordem", + "data_type": "date", + "granularity": "day" + }, + ) +}} + +WITH servico_operador_dia_invalida AS ( + SELECT + data_ordem, + id_ordem_pagamento, + MAX(indicador_captura_invalida) AS indicador_captura_invalida, + MAX(indicador_servico_fora_vigencia) AS indicador_servico_fora_vigencia + FROM + {{ ref("ordem_pagamento_servico_operador_dia_invalida") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} + GROUP BY + 1, + 2 +), +consorcio_operador_dia_invalida AS ( + SELECT DISTINCT + data_ordem, + id_ordem_pagamento + FROM + {{ ref("ordem_pagamento_consorcio_operador_dia_invalida") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +consorcio_dia_invalida AS ( + SELECT DISTINCT + data_ordem, + id_ordem_pagamento + FROM + {{ ref("ordem_pagamento_consorcio_dia_invalida") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +dia_invalida AS ( + SELECT + data_ordem, + id_ordem_pagamento + FROM + {{ ref("ordem_pagamento_dia_invalida") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +indicadores AS ( + SELECT + data_ordem, + id_ordem_pagamento, + CASE + WHEN sod.id_ordem_pagamento IS NOT NULL THEN sod.indicador_captura_invalida + ELSE FALSE + END AS indicador_captura_invalida, + CASE + WHEN sod.id_ordem_pagamento IS NOT NULL THEN sod.indicador_servico_fora_vigencia + ELSE FALSE + END AS indicador_servico_fora_vigencia, + cod.id_ordem_pagamento IS NOT NULL AS indicador_agregacao_consorcio_operador_dia_invalida, + cd.id_ordem_pagamento IS NOT NULL AS indicador_agregacao_consorcio_dia_invalida, + d.id_ordem_pagamento IS NOT NULL AS indicador_agregacao_dia_invalida, + FROM + dia_invalida d + FULL OUTER JOIN + servico_operador_dia_invalida sod + USING(data_ordem, id_ordem_pagamento) + FULL OUTER JOIN + consorcio_operador_dia_invalida cod + USING(data_ordem, id_ordem_pagamento) + FULL OUTER JOIN + consorcio_dia_invalida cd + USING(data_ordem, id_ordem_pagamento) +) +SELECT + *, + '{{ var("version") }}' AS versao +FROM + indicadores +WHERE + indicador_captura_invalida + OR indicador_servico_fora_vigencia + OR indicador_agregacao_consorcio_operador_dia_invalida + OR indicador_agregacao_consorcio_dia_invalida + OR indicador_agregacao_dia_invalida diff --git a/queries/models/validacao_dados_jae/schema.yml b/queries/models/validacao_dados_jae/schema.yml new file mode 100644 index 00000000..ed8e82a5 --- /dev/null +++ b/queries/models/validacao_dados_jae/schema.yml @@ -0,0 +1,94 @@ +version: 2 + +models: + - name: integracao_invalida + description: "Tabela para validação dos dados de integração da Jaé" + columns: + - name: data + description: "Data de processamento da integração (partição)" + - name: id_integracao + description: "Identificador único da integração" + - name: modos + description: "Sequência modos das transações presentes na integração (separado por ', ')" + - name: indicador_fora_matriz + description: "Indica se a sequência de modos não aparece na tabela br_rj_riodejaneiro_bilhetagem.matriz_integracao" + - name: indicador_tempo_integracao_invalido + description: "Indica se o tempo entre a primeira e a última transação da integração é maior que 3 horas" + - name: indicador_intervalo_transacao_suspeito + description: "Indica se existe algum intervalo entre transações menor que 5 minutos dentro da integração" + - name: versao + description: "Código de controle de versão do dado (SHA Github)" + - name: transacao_invalida + description: "Tabela para validação dos dados de transação da Jaé" + columns: + - name: data + description: "Data da transação (partição)" + - name: hora + description: "Hora da transação" + - name: datetime_transacao + description: "Data e hora da transação" + - name: datetime_processamento + description: "Data e hora de processamento da transação" + - name: datetime_captura + description: "Data e hora de captura dos dados da transação" + - name: modo + description: "Tipo de transporte (Ônibus, Van, BRT)" + - name: id_consorcio + description: "Identificador do consórcio na tabela cadastro.consorcios" + - name: consorcio + description: "Nome do consórcio" + - name: id_operadora + description: "Identificador da operadora na tabela cadastro.operadoras" + - name: operadora + description: "Nome da operadora de transporte (mascarado se for pessoa física)" + - name: id_servico_jae + description: "Identificador da linha no banco de dados da jaé (É possível cruzar os dados com a tabela rj-smtr.cadastro.servicos usando a coluna id_servico_jae)" + - name: servico_jae + description: "Nome curto da linha operada pelo veículo com variação de serviço (ex: 010, 011SN, ...) ou código da estação de BRT na Jaé" + - name: descricao_servico_jae + description: "Nome longo da linha operada pelo veículo com variação de serviço (ex: 010, 011SN, ...) ou nome da estação de BRT na Jaé" + # - name: servico + # description: "Nome curto da linha operada pelo veículo com variação de serviço (ex: 010, 011SN, ...) ou código da estação no caso de BRT" + - name: id_transacao + description: "Identificador único da transação" + - name: longitude + description: "Longitude da transação (WGS84)" + - name: latitude + description: "Latitude da transação (WGS84)" + - name: longitude_servico + description: "Longitude do stop no GTFS (WGS84)" + - name: latitude_servico + description: "Latitude do stop no GTFS (WGS84)" + - name: indicador_geolocalizacao_zerada + description: "Indica se a transação está com geolocalização zerada ou nula" + - name: indicador_geolocalizacao_fora_rio + description: "Indica se a transação está com geolocalização fora do município (transações com geolocalização zeradas não se enquadram)" + - name: indicador_geolocalizacao_fora_stop + description: "Indica se a transação do modo BRT está com geolocalização a mais de 100m da estação (transações com geolocalização zeradas não se enquadram)" + - name: descricao_geolocalizacao_invalida + description: "Resumo do tipo de geolocalização inválida de acordo com as colunas indicadoras" + - name: indicador_servico_fora_gtfs + description: "Indica se o serviço não tem correspondência com as tabela 'trips' ou 'stops' do GTFS" + - name: indicador_servico_fora_vigencia + description: "Indica se a transação foi feita em um serviço fora de vigência" + - name: versao + description: "Código de controle de versão do dado (SHA Github)" + - name: ordem_pagamento_invalida + description: "Tabela para validação dos dados da ordem de pagamento da Jaé" + columns: + - name: data_ordem + description: "Data da ordem de pagamento (partição)" + - name: id_ordem_pagamento + description: "Identificador único da ordem de pagamento" + - name: indicador_captura_invalida + description: "Indica se o número e valor das transações capturadas é diferente das informações encontradas na ordem de pagamento" + - name: indicador_servico_fora_vigencia + description: "Indica se houve alguma transação feita em um serviço fora de vigência. Quando nulo informa que não foi possível encontrar a vigência do serviço" + - name: indicador_agregacao_consorcio_operador_dia_invalida + description: "Indica se os valores e quantidades da tabela ordem_pagamento_servico_operador_dia agregada por consórcio, operador e dia está diferente das informações da tabela ordem_pagamento_consorcio_operador_dia" + - name: indicador_agregacao_consorcio_dia_invalida + description: "Indica se os valores e quantidades da tabela ordem_pagamento_consorcio_operador_dia agregada por consórcio e dia está diferente das informações da tabela ordem_pagamento_consorcio_dia" + - name: indicador_agregacao_dia_invalida + description: "Indica se os valores e quantidades da tabela ordem_pagamento_consorcio_dia agregada por dia está diferente das informações da tabela ordem_pagamento_dia" + - name: versao + description: "Código de controle de versão do dado (SHA Github)" \ No newline at end of file diff --git a/queries/models/validacao_dados_jae/staging/aux_transacao_ordem.sql b/queries/models/validacao_dados_jae/staging/aux_transacao_ordem.sql new file mode 100644 index 00000000..c0479f48 --- /dev/null +++ b/queries/models/validacao_dados_jae/staging/aux_transacao_ordem.sql @@ -0,0 +1,101 @@ +{{ + config( + materialized="table", + ) +}} + +-- WITH servico_motorista AS ( +-- SELECT +-- * EXCEPT(rn) +-- FROM +-- ( +-- SELECT +-- id_servico, +-- dt_fechamento, +-- nr_logico_midia, +-- cd_linha, +-- cd_operadora, +-- ROW_NUMBER() OVER (PARTITION BY id_servico, nr_logico_midia ORDER BY timestamp_captura DESC) AS rn +-- FROM +-- {{ ref("staging_servico_motorista") }} +-- {% if is_incremental() %} +-- WHERE +-- DATE(data) BETWEEN DATE_SUB(DATE("{{var('date_range_start')}}"), INTERVAL 1 DAY) AND DATE("{{var('date_range_end')}}") +-- {% endif %} +-- ) +-- ), +WITH transacao AS ( + SELECT + t.id AS id_transacao, + t.timestamp_captura, + DATE(t.data_transacao) AS data_transacao, + DATE(t.data_processamento) AS data_processamento, + t.data_processamento AS datetime_processamento, + t.cd_linha AS id_servico_jae, + do.id_operadora, + t.valor_transacao, + t.tipo_transacao, + t.id_tipo_modal, + dc.id_consorcio, + -- sm.dt_fechamento AS datetime_fechamento_servico, + -- sm.cd_linha AS cd_linha_servico, + -- sm.cd_operadora AS cd_operadora_servico, + t.id_servico + FROM + {{ ref("staging_transacao") }} t + -- LEFT JOIN + -- servico_motorista sm + -- ON + -- sm.id_servico = t.id_servico + -- AND sm.nr_logico_midia = t.nr_logico_midia_operador + LEFT JOIN + {{ ref("operadoras") }} AS do + ON + t.cd_operadora = do.id_operadora_jae + LEFT JOIN + {{ ref("consorcios") }} AS dc + ON + t.cd_consorcio = dc.id_consorcio_jae + WHERE + {% if is_incremental() %} + DATE(t.data) BETWEEN DATE_SUB(DATE("{{var('run_date')}}"), INTERVAL 2 DAY) AND DATE("{{var('run_date')}}") + AND t.data_processamento BETWEEN DATE_SUB(DATE("{{var('run_date')}}"), INTERVAL 2 DAY) AND DATE("{{var('run_date')}}") + {% else %} + DATE(t.data) <= CURRENT_DATE("America/Sao_Paulo") + AND DATE(t.data_processamento) <= CURRENT_DATE("America/Sao_Paulo") + {% endif %} +), +transacao_deduplicada AS ( + SELECT + t.* EXCEPT(rn), + DATE_ADD(data_processamento, INTERVAL 1 DAY) AS data_ordem -- TODO: Regra da data por serviços fechados no modo Ônibus quando começar a operação + FROM + ( + SELECT + *, + ROW_NUMBER() OVER (PARTITION BY id_transacao ORDER BY timestamp_captura DESC) AS rn + FROM + transacao + ) t + WHERE + rn = 1 +) +SELECT + t.* +FROM + transacao_deduplicada t +LEFT JOIN + {{ ref("staging_linha_sem_ressarcimento") }} l +ON + t.id_servico_jae = l.id_linha +WHERE + -- Remove dados com data de ordem de pagamento maiores que a execução do modelo + {% if is_incremental() %} + t.data_ordem <= DATE("{{var('run_date')}}") + {% else %} + t.data_ordem <= CURRENT_DATE("America/Sao_Paulo") + {% endif %} + -- Remove linhas de teste que não entram no ressarcimento + AND l.id_linha IS NULL + -- Remove gratuidades e transferências da contagem de transações + AND tipo_transacao NOT IN ('5', '21', '40') \ No newline at end of file diff --git a/queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_dia_invalida.sql b/queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_dia_invalida.sql new file mode 100644 index 00000000..33aa0071 --- /dev/null +++ b/queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_dia_invalida.sql @@ -0,0 +1,71 @@ +-- depends_on: {{ ref("ordem_pagamento_consorcio_operador_dia_invalida") }} +{{ + config( + incremental_strategy="insert_overwrite", + partition_by={ + "field": "data_ordem", + "data_type": "date", + "granularity": "day" + }, + ) +}} + +WITH ordem_pagamento_consorcio_operador_dia AS ( + SELECT + data_ordem, + id_consorcio, + id_ordem_pagamento, + SUM(quantidade_total_transacao) AS quantidade_total_transacao, + SUM(valor_total_transacao_liquido_ordem) AS valor_total_transacao_liquido, + FROM + {{ ref("ordem_pagamento_consorcio_operador_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} + GROUP BY + 1, + 2, + 3 +), +ordem_pagamento_consorcio_dia AS ( + SELECT + data_ordem, + id_consorcio, + id_ordem_pagamento, + quantidade_total_transacao, + valor_total_transacao_liquido + FROM + {{ ref("ordem_pagamento_consorcio_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +indicadores AS ( + SELECT + cd.data_ordem, + cd.id_consorcio, + cd.id_ordem_pagamento, + cd.quantidade_total_transacao, + cod.quantidade_total_transacao AS quantidade_total_transacao_agregacao, + cd.valor_total_transacao_liquido, + cod.valor_total_transacao_liquido AS valor_total_transacao_liquido_agregacao, + ROUND(cd.valor_total_transacao_liquido, 2) != ROUND(cod.valor_total_transacao_liquido, 2) OR cd.quantidade_total_transacao != cod.quantidade_total_transacao AS indicador_agregacao_invalida + FROM + ordem_pagamento_consorcio_dia cd + LEFT JOIN + ordem_pagamento_consorcio_operador_dia cod + USING( + data_ordem, + id_consorcio, + id_ordem_pagamento + ) +) +SELECT + *, + '{{ var("version") }}' AS versao +FROM + indicadores +WHERE + indicador_agregacao_invalida = TRUE \ No newline at end of file diff --git a/queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_operador_dia_invalida.sql b/queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_operador_dia_invalida.sql new file mode 100644 index 00000000..0debba34 --- /dev/null +++ b/queries/models/validacao_dados_jae/staging/ordem_pagamento_consorcio_operador_dia_invalida.sql @@ -0,0 +1,76 @@ +-- depends_on: {{ ref("ordem_pagamento_servico_operador_dia_invalida") }} +{{ + config( + incremental_strategy="insert_overwrite", + partition_by={ + "field": "data_ordem", + "data_type": "date", + "granularity": "day" + }, + ) +}} + +WITH ordem_pagamento_servico_operador_dia AS ( + SELECT + data_ordem, + id_consorcio, + id_operadora, + id_ordem_pagamento, + SUM(quantidade_total_transacao) AS quantidade_total_transacao, + SUM(valor_total_transacao_liquido) AS valor_total_transacao_liquido, + FROM + {{ ref("ordem_pagamento_servico_operador_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} + GROUP BY + 1, + 2, + 3, + 4 +), +ordem_pagamento_consorcio_operador_dia AS ( + SELECT + data_ordem, + id_consorcio, + id_operadora, + id_ordem_pagamento, + quantidade_total_transacao, + valor_total_transacao_liquido_ordem AS valor_total_transacao_liquido + FROM + {{ ref("ordem_pagamento_consorcio_operador_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +indicadores AS ( + SELECT + cod.data_ordem, + cod.id_consorcio, + cod.id_operadora, + cod.id_ordem_pagamento, + cod.quantidade_total_transacao, + sod.quantidade_total_transacao AS quantidade_total_transacao_agregacao, + cod.valor_total_transacao_liquido, + sod.valor_total_transacao_liquido AS valor_total_transacao_liquido_agregacao, + ROUND(cod.valor_total_transacao_liquido, 2) != ROUND(sod.valor_total_transacao_liquido, 2) OR cod.quantidade_total_transacao != sod.quantidade_total_transacao AS indicador_agregacao_invalida + FROM + ordem_pagamento_consorcio_operador_dia cod + LEFT JOIN + ordem_pagamento_servico_operador_dia sod + USING( + data_ordem, + id_consorcio, + id_operadora, + id_ordem_pagamento + ) +) +SELECT + *, + '{{ var("version") }}' AS versao +FROM + indicadores +WHERE + indicador_agregacao_invalida = TRUE \ No newline at end of file diff --git a/queries/models/validacao_dados_jae/staging/ordem_pagamento_dia_invalida.sql b/queries/models/validacao_dados_jae/staging/ordem_pagamento_dia_invalida.sql new file mode 100644 index 00000000..4b9824da --- /dev/null +++ b/queries/models/validacao_dados_jae/staging/ordem_pagamento_dia_invalida.sql @@ -0,0 +1,62 @@ +{{ + config( + incremental_strategy="insert_overwrite", + partition_by={ + "field": "data_ordem", + "data_type": "date", + "granularity": "day" + }, + ) +}} + +WITH ordem_pagamento_consorcio_dia AS ( + SELECT + data_ordem, + id_ordem_pagamento, + SUM(quantidade_total_transacao) AS quantidade_total_transacao, + SUM(valor_total_transacao_liquido) AS valor_total_transacao_liquido + FROM + {{ ref("ordem_pagamento_consorcio_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} + GROUP BY + 1, + 2 +), +ordem_pagamento_dia AS ( + SELECT + data_ordem, + id_ordem_pagamento, + quantidade_total_transacao, + valor_total_transacao_liquido + FROM + {{ ref("ordem_pagamento_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +indicadores AS ( +SELECT + d.data_ordem, + d.id_ordem_pagamento, + d.quantidade_total_transacao, + cd.quantidade_total_transacao AS quantidade_total_transacao_agregacao, + d.valor_total_transacao_liquido, + cd.valor_total_transacao_liquido AS valor_total_transacao_liquido_agregacao, + ROUND(cd.valor_total_transacao_liquido, 2) != ROUND(d.valor_total_transacao_liquido, 2) OR cd.quantidade_total_transacao != d.quantidade_total_transacao AS indicador_agregacao_invalida +FROM + ordem_pagamento_dia d +LEFT JOIN + ordem_pagamento_consorcio_dia cd +USING(data_ordem, id_ordem_pagamento) +) +SELECT + *, + '{{ var("version") }}' AS versao +FROM + indicadores +WHERE + indicador_agregacao_invalida = TRUE \ No newline at end of file diff --git a/queries/models/validacao_dados_jae/staging/ordem_pagamento_servico_operador_dia_invalida.sql b/queries/models/validacao_dados_jae/staging/ordem_pagamento_servico_operador_dia_invalida.sql new file mode 100644 index 00000000..bd915d21 --- /dev/null +++ b/queries/models/validacao_dados_jae/staging/ordem_pagamento_servico_operador_dia_invalida.sql @@ -0,0 +1,135 @@ +{{ + config( + incremental_strategy="insert_overwrite", + partition_by={ + "field": "data_ordem", + "data_type": "date", + "granularity": "day" + }, + ) +}} + +{% set transacao_ordem = ref("aux_transacao_ordem") %} +{% if execute and is_incremental() %} + {% set transacao_validacao_partition_query %} + SELECT DISTINCT + CONCAT("'", DATE(data_transacao), "'") AS data_transacao + FROM + {{ transacao_ordem }} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endset %} + {% set transacao_validacao_partitions = run_query(transacao_validacao_partition_query).columns[0].values() %} +{% endif %} + +WITH transacao_invalida AS ( + SELECT + id_transacao, + indicador_servico_fora_vigencia + FROM + {{ ref("transacao_invalida") }} + WHERE + indicador_servico_fora_vigencia = TRUE + {% if is_incremental() %} + AND + {% if transacao_validacao_partitions|length > 0 %} + data IN ({{ transacao_validacao_partitions|join(', ') }}) + {% else %} + data = "2000-01-01" + {% endif %} + {% endif %} +), +transacao_agg AS ( + SELECT + t.data_ordem, + ANY_VALUE(t.id_consorcio) AS id_consorcio, + t.id_servico_jae, + t.id_operadora, + COUNT(*) AS quantidade_total_transacao_captura, + SUM(t.valor_transacao) AS valor_total_transacao_captura, + MAX(ti.indicador_servico_fora_vigencia) IS NOT NULL AS indicador_servico_fora_vigencia + FROM + {{ ref("aux_transacao_ordem") }} t + LEFT JOIN + transacao_invalida ti + USING(id_transacao) + GROUP BY + data_ordem, + id_servico_jae, + id_operadora +), +ordem_pagamento AS ( + SELECT + * + FROM + {{ ref("ordem_pagamento_servico_operador_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +id_ordem_pagamento AS ( + SELECT + data_ordem, + id_ordem_pagamento + FROM + {{ ref("ordem_pagamento_dia") }} + {% if is_incremental() %} + WHERE + data_ordem = DATE("{{var('run_date')}}") + {% endif %} +), +transacao_ordem AS ( + SELECT + COALESCE(op.data_ordem, t.data_ordem) AS data_ordem, + COALESCE(op.id_consorcio, t.id_consorcio) AS id_consorcio, + COALESCE(op.id_operadora, t.id_operadora) AS id_operadora, + COALESCE(op.id_servico_jae, t.id_servico_jae) AS id_servico_jae, + op.quantidade_total_transacao, + op.valor_total_transacao_bruto, + op.valor_total_transacao_liquido, + t.quantidade_total_transacao_captura, + SAFE_CAST(t.valor_total_transacao_captura + op.valor_rateio_credito + op.valor_rateio_debito AS NUMERIC) AS valor_total_transacao_captura, + t.indicador_servico_fora_vigencia + FROM + ordem_pagamento op + FULL OUTER JOIN + transacao_agg t + USING(data_ordem, id_servico_jae, id_operadora) +), +indicadores AS ( + SELECT + o.data_ordem, + id.id_ordem_pagamento, + o.id_consorcio, + o.id_operadora, + o.id_servico_jae, + o.quantidade_total_transacao, + o.valor_total_transacao_bruto, + o.valor_total_transacao_liquido, + o.quantidade_total_transacao_captura, + o.valor_total_transacao_captura, + COALESCE( + ( + quantidade_total_transacao_captura != quantidade_total_transacao + OR ROUND(valor_total_transacao_captura, 2) != ROUND(valor_total_transacao_bruto, 2) + ), + TRUE + ) AS indicador_captura_invalida, + o.indicador_servico_fora_vigencia + FROM + transacao_ordem o + JOIN + id_ordem_pagamento id + USING(data_ordem) +) +SELECT + *, + '{{ var("version") }}' AS versao +FROM + indicadores +WHERE + indicador_servico_fora_vigencia = TRUE + OR indicador_captura_invalida = TRUE + AND id_servico_jae NOT IN (SELECT id_linha FROM {{ ref("staging_linha_sem_ressarcimento") }}) + diff --git a/queries/models/validacao_dados_jae/transacao_invalida.sql b/queries/models/validacao_dados_jae/transacao_invalida.sql new file mode 100644 index 00000000..e8a82f42 --- /dev/null +++ b/queries/models/validacao_dados_jae/transacao_invalida.sql @@ -0,0 +1,112 @@ +{{ + config( + incremental_strategy="insert_overwrite", + partition_by={ + "field": "data", + "data_type": "date", + "granularity": "day" + }, + ) +}} +{% set transacao_table = ref('transacao') %} +{% if execute %} + {% if is_incremental() %} + + {% set partitions_query %} + SELECT + CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data + FROM + `{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + WHERE + table_name = "{{ transacao_table.identifier }}" + AND partition_id != "__NULL__" + AND DATE(last_modified_time, "America/Sao_Paulo") = DATE_SUB(DATE("{{var('run_date')}}"), INTERVAL 1 DAY) + {% endset %} + + {{ log("Running query: \n"~partitions_query, info=True) }} + {% set partitions = run_query(partitions_query) %} + + {% set partition_list = partitions.columns[0].values() %} + {{ log("trasacao partitions: \n"~partition_list, info=True) }} + {% endif %} +{% endif %} + +WITH transacao AS ( + SELECT + t.data, + t.hora, + t.datetime_transacao, + t.datetime_processamento, + t.datetime_captura, + t.modo, + t.id_consorcio, + t.consorcio, + t.id_operadora, + t.operadora, + t.id_servico_jae, + t.servico_jae, + t.descricao_servico_jae, + t.id_transacao, + t.longitude, + t.latitude, + IFNULL(t.longitude, 0) AS longitude_tratada, + IFNULL(t.latitude, 0) AS latitude_tratada, + s.longitude AS longitude_servico, + s.latitude AS latitude_servico, + s.id_servico_gtfs, + s.id_servico_jae AS id_servico_jae_cadastro + FROM + {{ ref("transacao") }} t + LEFT JOIN + {{ ref("servicos") }} s + ON + t.id_servico_jae = s.id_servico_jae + AND t.data >= s.data_inicio_vigencia AND (t.data <= s.data_fim_vigencia OR s.data_fim_vigencia IS NULL) + WHERE + {% if is_incremental() %} + {% if partition_list|length > 0 %} + data IN ({{ partition_list|join(', ') }}) + {% else %} + data = "2000-01-01" + {% endif %} + {% endif %} +), +indicadores AS ( + SELECT + * EXCEPT(id_servico_gtfs, latitude_tratada, longitude_tratada, id_servico_jae_cadastro), + latitude_tratada = 0 OR longitude_tratada = 0 AS indicador_geolocalizacao_zerada, + ( + (latitude_tratada != 0 OR longitude_tratada != 0) + AND NOT ST_INTERSECTSBOX(ST_GEOGPOINT(longitude_tratada, latitude_tratada), -43.87, -23.13, -43.0, -22.59) + ) AS indicador_geolocalizacao_fora_rio, + ( + latitude_tratada != 0 + AND longitude_tratada != 0 + AND latitude_servico IS NOT NULL + AND longitude_servico IS NOT NULL + AND modo = "BRT" + AND ST_DISTANCE(ST_GEOGPOINT(longitude_tratada, latitude_tratada), ST_GEOGPOINT(longitude_servico, latitude_servico)) > 100 + ) AS indicador_geolocalizacao_fora_stop, + id_servico_gtfs IS NULL AND id_servico_jae_cadastro IS NOT NULL AND modo IN ("Ônibus", "BRT") AS indicador_servico_fora_gtfs, + id_servico_jae_cadastro IS NULL AS indicador_servico_fora_vigencia + FROM + transacao +) +SELECT + * EXCEPT(indicador_servico_fora_gtfs, indicador_servico_fora_vigencia), + CASE + WHEN indicador_geolocalizacao_zerada = TRUE THEN "Geolocalização zerada" + WHEN indicador_geolocalizacao_fora_rio = TRUE THEN "Geolocalização fora do município" + WHEN indicador_geolocalizacao_fora_stop = TRUE THEN "Geolocalização fora do stop" + END AS descricao_geolocalizacao_invalida, + indicador_servico_fora_gtfs, + indicador_servico_fora_vigencia, + '{{ var("version") }}' as versao +FROM + indicadores +WHERE + indicador_geolocalizacao_zerada = TRUE + OR indicador_geolocalizacao_fora_rio = TRUE + OR indicador_geolocalizacao_fora_stop = TRUE + OR indicador_servico_fora_gtfs = TRUE + OR indicador_servico_fora_vigencia = TRUE \ No newline at end of file