From 222ffa6c00304f7e2c5be46392e6687682ecc694 Mon Sep 17 00:00:00 2001 From: Rafael Carvalho Pinheiro <74972217+pixuimpou@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:34:11 -0300 Subject: [PATCH] =?UTF-8?q?Adiciona=20transa=C3=A7=C3=B5es=20do=20RioCard?= =?UTF-8?q?=20na=20tabela=20`passageiros=5Fhora`=20/=20Cria=20captura=20da?= =?UTF-8?q?=20tabela=20`endereco`=20e=20`linha=5Fconsorcio=5Foperadora=5Ft?= =?UTF-8?q?ransporte`=20(#164)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * altera para ephemeral * adiciona filtre de timestamp_captura * separa materializacao transacao e passageiros_hora * corrige table_id * otimiza filtro de particao * add dependencia transacao * add captura endereco e linha_op_transporte * cria modelo * add coluna geo_point_transacao * add particoes integracao * add linha_consorcio_operadora_transporte/endereco --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../constants.py | 70 +++++++++- .../br_rj_riodejaneiro_bilhetagem/flows.py | 18 +++ .../flows.py | 2 +- .../passageiros_hora.sql | 50 +++++++ .../passageiros_tile_hora.sql | 50 +++++++ .../br_rj_riodejaneiro_bilhetagem/schema.yml | 4 + .../transacao.sql | 6 +- .../transacao_riocard.sql | 5 +- .../aux_passageiros_hora.sql | 123 +++++------------- .../staging_endereco.sql | 39 ++++++ ...g_linha_consorcio_operadora_transporte.sql | 32 +++++ queries/models/sources.yml | 3 + 12 files changed, 304 insertions(+), 98 deletions(-) create mode 100644 queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_endereco.sql create mode 100644 queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_linha_consorcio_operadora_transporte.sql diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py index 647aa1cf..3392cf27 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py @@ -445,16 +445,64 @@ class constants(Enum): # pylint: disable=c0103 WHERE DT_INCLUSAO BETWEEN '{start}' AND '{end}' - OR DT_FIM_VALIDADE BETWEEN '{start}' + OR DT_FIM_VALIDADE BETWEEN DATE('{start}') + AND DATE('{end}') + """, + }, + "primary_key": [ + "CD_CONSORCIO", + "CD_LINHA", + ], # id column to nest data on + "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, + }, + { + "table_id": "linha_consorcio_operadora_transporte", + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + LINHA_CONSORCIO_OPERADORA_TRANSPORTE + WHERE + DT_INCLUSAO BETWEEN '{start}' AND '{end}' + OR DT_FIM_VALIDADE BETWEEN DATE('{start}') + AND DATE('{end}') """, }, "primary_key": [ "CD_CONSORCIO", + "CD_OPERADORA_TRANSPORTE", "CD_LINHA", ], # id column to nest data on "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, }, + { + "table_id": "endereco", + "partition_date_only": True, + "extract_params": { + "database": "principal_db", + "query": """ + SELECT + * + FROM + ENDERECO + WHERE + DT_INCLUSAO BETWEEN '{start}' + AND '{end}' + OR + DT_INATIVACAO BETWEEN '{start}' + AND '{end}' + """, + }, + "primary_key": [ + "NR_SEQ_ENDERECO", + ], # id column to nest data on + "interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL, + "save_bucket_name": BILHETAGEM_PRIVATE_BUCKET, + }, ] BILHETAGEM_EXCLUDE = "+operadoras +consorcios +servicos" @@ -476,8 +524,8 @@ class constants(Enum): # pylint: disable=c0103 } BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = { - "dataset_id": BILHETAGEM_JAE_DASHBOARD_DATASET_ID, - "table_id": "view_passageiros_hora", + "dataset_id": smtr_constants.BILHETAGEM_DATASET_ID.value, + "table_id": "transacao", "upstream": True, "dbt_vars": { "date_range": { @@ -490,7 +538,21 @@ class constants(Enum): # pylint: disable=c0103 ordem_pagamento_dia ordem_pagamento_consorcio_dia ordem_pagamento_consorcio_operador_dia \ staging_ordem_pagamento_consorcio staging_ordem_pagamento \ ordem_pagamento_servico_operador_dia staging_ordem_pagamento_consorcio_operadora \ -aux_retorno_ordem_pagamento", +aux_retorno_ordem_pagamento staging_arquivo_retorno", + } + + BILHETAGEM_MATERIALIZACAO_PASSAGEIROS_HORA_PARAMS = { + "dataset_id": BILHETAGEM_JAE_DASHBOARD_DATASET_ID, + "table_id": "view_passageiros_hora", + "upstream": True, + "dbt_vars": { + "date_range": { + "table_run_datetime_column_name": "data", + "delay_hours": 0, + }, + "version": {}, + }, + "exclude": "+transacao", } BILHETAGEM_MATERIALIZACAO_DASHBOARD_CONTROLE_VINCULO_PARAMS = { diff --git a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py index b76001fa..8b9ac56f 100644 --- a/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py @@ -513,6 +513,24 @@ raise_final_state=True, ) + run_materializacao_passageiros_hora = create_flow_run( + flow_name=bilhetagem_materializacao_transacao.name, + project_name=PROJECT, + labels=LABELS, + upstream_tasks=[wait_materializacao_transacao], + parameters={ + "timestamp": materialize_timestamp, + } + | constants.BILHETAGEM_MATERIALIZACAO_PASSAGEIROS_HORA_PARAMS.value, + ) + + wait_materializacao_passageiros_hora = wait_for_flow_run( + run_materializacao_passageiros_hora, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + run_materializacao_gps_validador = create_flow_run( flow_name=bilhetagem_materializacao_gps_validador.name, project_name=PROJECT, diff --git a/pipelines/migration/br_rj_riodejaneiro_onibus_gps_zirix/flows.py b/pipelines/migration/br_rj_riodejaneiro_onibus_gps_zirix/flows.py index 8eae5843..6144a6f2 100644 --- a/pipelines/migration/br_rj_riodejaneiro_onibus_gps_zirix/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_onibus_gps_zirix/flows.py @@ -134,7 +134,7 @@ # Set specific run parameters # date_range = get_materialization_date_range( dataset_id=dataset_id, - table_id="gps_sppo", + table_id=table_id, raw_dataset_id=raw_dataset_id, raw_table_id=raw_table_id, table_run_datetime_column_name="timestamp_gps", diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_hora.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_hora.sql index d8ded17c..831e4700 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_hora.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_hora.sql @@ -10,12 +10,62 @@ ) }} +/* +consulta as partições a serem atualizadas com base nas transações capturadas entre date_range_start e date_range_end +e as integrações capturadas entre date_range_start e date_range_end +*/ +{% set transacao_table = ref('transacao') %} +{% set transacao_riocard_table = ref('transacao_riocard') %} +{% if execute %} + {% if is_incremental() %} + -- Transações Jaé + {% set partitions_query %} + SELECT + CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data + FROM + -- `{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + `rj-smtr.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + WHERE + table_name = "{{ transacao_table.identifier }}" + AND partition_id != "__NULL__" + AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND (DATETIME("{{var('date_range_end')}}")) + + UNION DISTINCT + + SELECT + CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data + FROM + -- `{{ transacao_riocard_table.database }}.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + `rj-smtr.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + WHERE + table_name = "{{ transacao_riocard_table.identifier }}" + AND partition_id != "__NULL__" + AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}") + + {% endset %} + + {% set partitions = run_query(partitions_query) %} + + {% set partition_list = partitions.columns[0].values() %} + {% endif %} +{% endif %} + SELECT * EXCEPT(id_transacao, geo_point_transacao), COUNT(id_transacao) AS quantidade_passageiros, '{{ var("version") }}' AS versao FROM {{ ref("aux_passageiros_hora") }} +{% if is_incremental() %} + WHERE + {% if partition_list|length > 0 %} + data IN ({{ partition_list|join(', ') }}) + {% else %} + data = "2000-01-01" + {% endif %} +{% else %} + data >= "2023-07-19" +{% endif %} GROUP BY data, hora, diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_tile_hora.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_tile_hora.sql index 9406bb3d..6f844b5c 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_tile_hora.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_tile_hora.sql @@ -10,6 +10,46 @@ ) }} +/* +consulta as partições a serem atualizadas com base nas transações capturadas entre date_range_start e date_range_end +e as integrações capturadas entre date_range_start e date_range_end +*/ +{% set transacao_table = ref('transacao') %} +{% set transacao_riocard_table = ref('transacao_riocard') %} +{% if execute %} + {% if is_incremental() %} + -- Transações Jaé + {% set partitions_query %} + SELECT + CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data + FROM + -- `{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + `rj-smtr.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + WHERE + table_name = "{{ transacao_table.identifier }}" + AND partition_id != "__NULL__" + AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND (DATETIME("{{var('date_range_end')}}")) + + UNION DISTINCT + + SELECT + CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data + FROM + -- `{{ transacao_riocard_table.database }}.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + `rj-smtr.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` + WHERE + table_name = "{{ transacao_riocard_table.identifier }}" + AND partition_id != "__NULL__" + AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}") + + {% endset %} + + {% set partitions = run_query(partitions_query) %} + + {% set partition_list = partitions.columns[0].values() %} + {% endif %} +{% endif %} + SELECT p.* EXCEPT(id_transacao, geo_point_transacao), geo.tile_id, @@ -21,6 +61,16 @@ JOIN {{ ref("aux_h3_res9") }} geo ON ST_CONTAINS(geo.geometry, geo_point_transacao) +{% if is_incremental() %} + WHERE + {% if partition_list|length > 0 %} + data IN ({{ partition_list|join(', ') }}) + {% else %} + data = "2000-01-01" + {% endif %} +{% else %} + data >= "2023-07-19" +{% endif %} GROUP BY data, hora, diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml b/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml index ac2fc7c0..30d2d3e9 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml @@ -58,6 +58,8 @@ models: description: "Latitude da transação (WGS84)" - name: longitude description: "Longitude da transação (WGS84)" + - name: geo_point_transacao + description: "Ponto geográfico do local da transação" - name: tile_id description: "Identificador do hexágono da geolocalização da transação na tabela rj-smtr.br_rj_riodejaneiro_geo.h3_res9" - name: stop_id @@ -570,6 +572,8 @@ models: description: "Latitude da transação (WGS84)" - name: longitude description: "Longitude da transação (WGS84)" + - name: geo_point_transacao + description: "Ponto geográfico do local da transação" - name: valor_transacao description: "Valor debitado na transação atual (R$)" - name: versao diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql index 125e537f..1fd7ae0c 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql @@ -14,7 +14,7 @@ -- TODO: Usar variável de run_date_hour para otimizar o numero de partições lidas em staging {% set incremental_filter %} DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") - -- AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}") + AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}") {% endset %} {% set transacao_staging = ref('staging_transacao') %} @@ -150,6 +150,7 @@ new_data AS ( NULL AS id_integracao, latitude_trx AS latitude, longitude_trx AS longitude, + ST_GEOGPOINT(longitude_trx, latitude_trx) AS geo_point_transacao, NULL AS stop_id, NULL AS stop_lat, NULL AS stop_lon, @@ -226,6 +227,7 @@ complete_partitions AS ( id_integracao, latitude, longitude, + geo_point_transacao, stop_id, stop_lat, stop_lon, @@ -263,6 +265,7 @@ complete_partitions AS ( id_integracao, latitude, longitude, + geo_point_transacao, stop_id, stop_lat, stop_lon, @@ -326,6 +329,7 @@ SELECT t.id_integracao, t.latitude, t.longitude, + t.geo_point_transacao, t.stop_id, t.stop_lat, t.stop_lon, diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/transacao_riocard.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/transacao_riocard.sql index e01d30a5..f4837745 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/transacao_riocard.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/transacao_riocard.sql @@ -1,3 +1,4 @@ +-- depends_on: {{ ref('transacao') }} {{ config( materialized="incremental", @@ -12,6 +13,7 @@ {% set incremental_filter %} DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") + AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}") {% endset %} {% set transacao_staging = ref('staging_transacao_riocard') %} @@ -35,7 +37,7 @@ WITH staging_transacao AS ( {{ transacao_staging }} {% if is_incremental() %} WHERE - DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") + {{ incremental_filter }} {% endif %} ), novos_dados AS ( @@ -63,6 +65,7 @@ novos_dados AS ( t.id AS id_transacao, t.latitude_trx AS latitude, t.longitude_trx AS longitude, + ST_GEOGPOINT(t.longitude_trx, t.latitude_trx) AS geo_point_transacao, t.valor_transacao FROM staging_transacao t diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/aux_passageiros_hora.sql b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/aux_passageiros_hora.sql index 1e2cfba4..44aecd29 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/aux_passageiros_hora.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/aux_passageiros_hora.sql @@ -1,52 +1,7 @@ {{ - config(materialized="table") + config(materialized="ephemeral") }} - -/* -consulta as partições a serem atualizadas com base nas transações capturadas entre date_range_start e date_range_end -e as integrações capturadas entre date_range_start e date_range_end -*/ -{% set transacao_table = ref('transacao') %} -{% set transacao_riocard_table = ref('transacao_riocard') %} -{% if execute %} - {% if is_incremental() %} - -- Transações Jaé - {% set partitions_query %} - SELECT - PARSE_DATE("%Y%m%d", partition_id) AS data - FROM - `{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` - WHERE - table_name = "{{ transacao_table.identifier }}" - AND partition_id != "__NULL__" - AND DATE(last_modified_time, "America/Sao_Paulo") BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") - {% endset %} - - {{ log("Running query: \n"~partitions_query) }} - {% set partitions = run_query(partitions_query) %} - - {% set partition_list = partitions.columns[0].values() %} - - -- Transações RioCard - -- {% set partitions_riocard_query %} - -- SELECT - -- PARSE_DATE("%Y%m%d", partition_id) AS data - -- FROM - -- `{{ transacao_riocard_table.database }}.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS` - -- WHERE - -- table_name = "{{ transacao_riocard_table.identifier }}" - -- AND partition_id != "__NULL__" - -- AND DATE(last_modified_time, "America/Sao_Paulo") BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}") - -- {% endset %} - - -- {{ log("Running query: \n"~partitions_riocard_query) }} - -- {% set partitions_riocard = run_query(partitions_riocard_query) %} - - -- {% set partition_riocard_list = partitions_riocard.columns[0].values() %} - {% endif %} -{% endif %} - SELECT data, hora, @@ -66,20 +21,11 @@ SELECT END AS tipo_transacao_detalhe_smtr, tipo_gratuidade, tipo_pagamento, - ST_GEOGPOINT(longitude, latitude) AS geo_point_transacao + geo_point_transacao FROM {{ transacao_table }} WHERE - {% if is_incremental() %} - {% if partition_list|length > 0 %} - data IN ({{ partition_list|join(', ') }}) - {% else %} - data = "2000-01-01" - {% endif %} - {% else %} - data >= "2023-07-19" - {% endif %} - AND id_servico_jae NOT IN ("140", "142") + id_servico_jae NOT IN ("140", "142") AND id_operadora != "2" AND ( modo = "BRT" @@ -90,38 +36,33 @@ WHERE ) AND tipo_transacao IS NOT NULL --- {% if partition_riocard_list|length > 0 or not is_incremental()%} --- UNION ALL +UNION ALL --- SELECT --- data, --- hora, --- modo, --- consorcio, --- id_servico_jae, --- servico_jae, --- descricao_servico_jae, --- sentido, --- id_transacao, --- "RioCard" AS tipo_transacao_smtr, --- "RioCard" AS tipo_transacao_detalhe_smtr, --- NULL AS tipo_gratuidade, --- "RioCard" AS tipo_pagamento, --- ST_GEOGPOINT(longitude, latitude) AS geo_point_transacao --- FROM --- {{ transacao_riocard_table }} --- WHERE --- (id_servico_jae NOT IN ("140", "142") OR id_servico_jae IS NULL) --- AND (id_operadora != "2" OR id_operadora IS NULL) --- AND ( --- modo = "BRT" --- OR (modo = "VLT" AND data >= DATE("2024-02-24")) --- OR (modo = "Ônibus" AND data >= DATE("2024-04-19")) --- OR (modo = "Van" AND consorcio = "STPC" AND data >= DATE("2024-07-01")) --- OR (modo = "Van" AND consorcio = "STPL" AND data >= DATE("2024-07-15")) --- OR modo IS NULL --- ) --- {% if is_incremental()%} --- AND data IN ({{ partition_riocard_list|join(', ') }}) --- {% endif %} --- {% endif %} \ No newline at end of file +SELECT + data, + hora, + modo, + consorcio, + id_servico_jae, + servico_jae, + descricao_servico_jae, + sentido, + id_transacao, + "RioCard" AS tipo_transacao_smtr, + "RioCard" AS tipo_transacao_detalhe_smtr, + NULL AS tipo_gratuidade, + "RioCard" AS tipo_pagamento, + ST_GEOGPOINT(longitude, latitude) AS geo_point_transacao +FROM + {{ transacao_riocard_table }} +WHERE + (id_servico_jae NOT IN ("140", "142") OR id_servico_jae IS NULL) + AND (id_operadora != "2" OR id_operadora IS NULL) + AND ( + modo = "BRT" + OR (modo = "VLT" AND data >= DATE("2024-02-24")) + OR (modo = "Ônibus" AND data >= DATE("2024-04-19")) + OR (modo = "Van" AND consorcio = "STPC" AND data >= DATE("2024-07-01")) + OR (modo = "Van" AND consorcio = "STPL" AND data >= DATE("2024-07-15")) + OR modo IS NULL + ) \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_endereco.sql b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_endereco.sql new file mode 100644 index 00000000..05110015 --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_endereco.sql @@ -0,0 +1,39 @@ +{{ + config( + alias='endereco', + ) +}} + +WITH + endereco AS ( + SELECT + data, + SAFE_CAST(NR_SEQ_ENDERECO AS STRING) AS nr_seq_endereco, + timestamp_captura, + SAFE_CAST(JSON_VALUE(content, '$.CD_CLIENTE') AS STRING) AS cd_cliente, + SAFE_CAST(JSON_VALUE(content, '$.CD_TIPO_ENDERECO') AS STRING) AS cd_tipo_endereco, + SAFE_CAST(JSON_VALUE(content, '$.CD_TIPO_LOGRADOURO') AS STRING) AS cd_tipo_logradouro, + DATETIME(PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%S%Ez', SAFE_CAST(JSON_VALUE(content, '$.DT_INCLUSAO') AS STRING)), "America/Sao_Paulo") AS dt_inclusao, + SAFE_CAST(JSON_VALUE(content, '$.NM_BAIRRO') AS STRING) AS nm_bairro, + SAFE_CAST(JSON_VALUE(content, '$.NM_CIDADE') AS STRING) AS nm_cidade, + SAFE_CAST(JSON_VALUE(content, '$.NR_CEP') AS STRING) AS nr_cep, + SAFE_CAST(JSON_VALUE(content, '$.NR_LOGRADOURO') AS STRING) AS nr_logradouro, + SAFE_CAST(JSON_VALUE(content, '$.SG_UF') AS STRING) AS sg_uf, + SAFE_CAST(JSON_VALUE(content, '$.TX_COMPLEMENTO_LOGRADOURO') AS STRING) AS tx_complemento_logradouro, + SAFE_CAST(JSON_VALUE(content, '$.TX_LOGRADOURO') AS STRING) AS tx_logradouro + FROM + {{ source("br_rj_riodejaneiro_bilhetagem_staging", "endereco") }} + ), + endereco_rn AS ( + SELECT + *, + ROW_NUMBER() OVER (PARTITION BY nr_seq_endereco ORDER BY timestamp_captura DESC) AS rn + FROM + endereco + ) +SELECT + * EXCEPT(rn) +FROM + endereco_rn +WHERE + rn = 1 \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_linha_consorcio_operadora_transporte.sql b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_linha_consorcio_operadora_transporte.sql new file mode 100644 index 00000000..6a2d02c0 --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_linha_consorcio_operadora_transporte.sql @@ -0,0 +1,32 @@ +{{ + config( + alias='linha_consorcio_operadora_transporte', + ) +}} + +WITH linha_consorcio_operadora_transporte AS ( + SELECT + data, + SAFE_CAST(CD_CONSORCIO AS STRING) AS cd_consorcio, + SAFE_CAST(CD_OPERADORA_TRANSPORTE AS STRING) AS cd_operadora_transporte, + SAFE_CAST(CD_LINHA AS STRING) AS cd_linha, + timestamp_captura, + DATETIME(PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%S%Ez', SAFE_CAST(JSON_VALUE(content, '$.DT_INCLUSAO') AS STRING)), "America/Sao_Paulo") AS dt_inclusao, + PARSE_DATE("%Y-%m-%d", SAFE_CAST(JSON_VALUE(content, '$.DT_INICIO_VALIDADE') AS STRING)) AS dt_inicio_validade, + PARSE_DATE("%Y-%m-%d", SAFE_CAST(JSON_VALUE(content, '$.DT_FIM_VALIDADE') AS STRING)) AS dt_fim_validade + FROM + {{ source("br_rj_riodejaneiro_bilhetagem_staging", "linha_consorcio_operadora_transporte") }} +), +linha_consorcio_operadora_transporte_rn AS ( + SELECT + *, + ROW_NUMBER() OVER (PARTITION BY cd_consorcio, cd_operadora_transporte, cd_linha ORDER BY timestamp_captura DESC) AS rn + FROM + linha_consorcio_operadora_transporte +) +SELECT + * EXCEPT(rn) +FROM + linha_consorcio_operadora_transporte_rn +WHERE + rn = 1 \ No newline at end of file diff --git a/queries/models/sources.yml b/queries/models/sources.yml index 198b660a..3a50d8ba 100644 --- a/queries/models/sources.yml +++ b/queries/models/sources.yml @@ -28,6 +28,9 @@ sources: - name: ordem_pagamento_consorcio_operadora - name: ordem_pagamento_consorcio - name: linha_consorcio + - name: linha_consorcio_operadora_transporte + - name: endereco + - name: br_rj_riodejaneiro_gtfs_staging database: rj-smtr-staging