Skip to content

Commit

Permalink
Merge branch 'main' into staging/add_gtfs_validator
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Aug 26, 2024
2 parents 166aee1 + 222ffa6 commit ce0ef37
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 98 deletions.
70 changes: 66 additions & 4 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,16 +445,64 @@ class constants(Enum): # pylint: disable=c0103
WHERE
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
OR DT_FIM_VALIDADE BETWEEN '{start}'
OR DT_FIM_VALIDADE BETWEEN DATE('{start}')
AND DATE('{end}')
""",
},
"primary_key": [
"CD_CONSORCIO",
"CD_LINHA",
], # id column to nest data on
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
},
{
"table_id": "linha_consorcio_operadora_transporte",
"partition_date_only": True,
"extract_params": {
"database": "principal_db",
"query": """
SELECT
*
FROM
LINHA_CONSORCIO_OPERADORA_TRANSPORTE
WHERE
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
OR DT_FIM_VALIDADE BETWEEN DATE('{start}')
AND DATE('{end}')
""",
},
"primary_key": [
"CD_CONSORCIO",
"CD_OPERADORA_TRANSPORTE",
"CD_LINHA",
], # id column to nest data on
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
},
{
"table_id": "endereco",
"partition_date_only": True,
"extract_params": {
"database": "principal_db",
"query": """
SELECT
*
FROM
ENDERECO
WHERE
DT_INCLUSAO BETWEEN '{start}'
AND '{end}'
OR
DT_INATIVACAO BETWEEN '{start}'
AND '{end}'
""",
},
"primary_key": [
"NR_SEQ_ENDERECO",
], # id column to nest data on
"interval_minutes": BILHETAGEM_TRATAMENTO_INTERVAL,
"save_bucket_name": BILHETAGEM_PRIVATE_BUCKET,
},
]

BILHETAGEM_EXCLUDE = "+operadoras +consorcios +servicos"
Expand All @@ -476,8 +524,8 @@ class constants(Enum): # pylint: disable=c0103
}

BILHETAGEM_MATERIALIZACAO_TRANSACAO_PARAMS = {
"dataset_id": BILHETAGEM_JAE_DASHBOARD_DATASET_ID,
"table_id": "view_passageiros_hora",
"dataset_id": smtr_constants.BILHETAGEM_DATASET_ID.value,
"table_id": "transacao",
"upstream": True,
"dbt_vars": {
"date_range": {
Expand All @@ -490,7 +538,21 @@ class constants(Enum): # pylint: disable=c0103
ordem_pagamento_dia ordem_pagamento_consorcio_dia ordem_pagamento_consorcio_operador_dia \
staging_ordem_pagamento_consorcio staging_ordem_pagamento \
ordem_pagamento_servico_operador_dia staging_ordem_pagamento_consorcio_operadora \
aux_retorno_ordem_pagamento",
aux_retorno_ordem_pagamento staging_arquivo_retorno",
}

BILHETAGEM_MATERIALIZACAO_PASSAGEIROS_HORA_PARAMS = {
"dataset_id": BILHETAGEM_JAE_DASHBOARD_DATASET_ID,
"table_id": "view_passageiros_hora",
"upstream": True,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "data",
"delay_hours": 0,
},
"version": {},
},
"exclude": "+transacao",
}

BILHETAGEM_MATERIALIZACAO_DASHBOARD_CONTROLE_VINCULO_PARAMS = {
Expand Down
18 changes: 18 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,24 @@
raise_final_state=True,
)

run_materializacao_passageiros_hora = create_flow_run(
flow_name=bilhetagem_materializacao_transacao.name,
project_name=PROJECT,
labels=LABELS,
upstream_tasks=[wait_materializacao_transacao],
parameters={
"timestamp": materialize_timestamp,
}
| constants.BILHETAGEM_MATERIALIZACAO_PASSAGEIROS_HORA_PARAMS.value,
)

wait_materializacao_passageiros_hora = wait_for_flow_run(
run_materializacao_passageiros_hora,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

run_materializacao_gps_validador = create_flow_run(
flow_name=bilhetagem_materializacao_gps_validador.name,
project_name=PROJECT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@
# Set specific run parameters #
date_range = get_materialization_date_range(
dataset_id=dataset_id,
table_id="gps_sppo",
table_id=table_id,
raw_dataset_id=raw_dataset_id,
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
Expand Down
50 changes: 50 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/passageiros_hora.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,62 @@
)
}}

/*
consulta as partições a serem atualizadas com base nas transações capturadas entre date_range_start e date_range_end
e as integrações capturadas entre date_range_start e date_range_end
*/
{% set transacao_table = ref('transacao') %}
{% set transacao_riocard_table = ref('transacao_riocard') %}
{% if execute %}
{% if is_incremental() %}
-- Transações Jaé
{% set partitions_query %}
SELECT
CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data
FROM
-- `{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
`rj-smtr.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
table_name = "{{ transacao_table.identifier }}"
AND partition_id != "__NULL__"
AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND (DATETIME("{{var('date_range_end')}}"))

UNION DISTINCT

SELECT
CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data
FROM
-- `{{ transacao_riocard_table.database }}.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
`rj-smtr.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
table_name = "{{ transacao_riocard_table.identifier }}"
AND partition_id != "__NULL__"
AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")

{% endset %}

{% set partitions = run_query(partitions_query) %}

{% set partition_list = partitions.columns[0].values() %}
{% endif %}
{% endif %}

SELECT
* EXCEPT(id_transacao, geo_point_transacao),
COUNT(id_transacao) AS quantidade_passageiros,
'{{ var("version") }}' AS versao
FROM
{{ ref("aux_passageiros_hora") }}
{% if is_incremental() %}
WHERE
{% if partition_list|length > 0 %}
data IN ({{ partition_list|join(', ') }})
{% else %}
data = "2000-01-01"
{% endif %}
{% else %}
data >= "2023-07-19"
{% endif %}
GROUP BY
data,
hora,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,46 @@
)
}}

/*
consulta as partições a serem atualizadas com base nas transações capturadas entre date_range_start e date_range_end
e as integrações capturadas entre date_range_start e date_range_end
*/
{% set transacao_table = ref('transacao') %}
{% set transacao_riocard_table = ref('transacao_riocard') %}
{% if execute %}
{% if is_incremental() %}
-- Transações Jaé
{% set partitions_query %}
SELECT
CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data
FROM
-- `{{ transacao_table.database }}.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
`rj-smtr.{{ transacao_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
table_name = "{{ transacao_table.identifier }}"
AND partition_id != "__NULL__"
AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND (DATETIME("{{var('date_range_end')}}"))

UNION DISTINCT

SELECT
CONCAT("'", PARSE_DATE("%Y%m%d", partition_id), "'") AS data
FROM
-- `{{ transacao_riocard_table.database }}.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
`rj-smtr.{{ transacao_riocard_table.schema }}.INFORMATION_SCHEMA.PARTITIONS`
WHERE
table_name = "{{ transacao_riocard_table.identifier }}"
AND partition_id != "__NULL__"
AND DATETIME(last_modified_time, "America/Sao_Paulo") BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")

{% endset %}

{% set partitions = run_query(partitions_query) %}

{% set partition_list = partitions.columns[0].values() %}
{% endif %}
{% endif %}

SELECT
p.* EXCEPT(id_transacao, geo_point_transacao),
geo.tile_id,
Expand All @@ -21,6 +61,16 @@ JOIN
{{ ref("aux_h3_res9") }} geo
ON
ST_CONTAINS(geo.geometry, geo_point_transacao)
{% if is_incremental() %}
WHERE
{% if partition_list|length > 0 %}
data IN ({{ partition_list|join(', ') }})
{% else %}
data = "2000-01-01"
{% endif %}
{% else %}
data >= "2023-07-19"
{% endif %}
GROUP BY
data,
hora,
Expand Down
4 changes: 4 additions & 0 deletions queries/models/br_rj_riodejaneiro_bilhetagem/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ models:
description: "Latitude da transação (WGS84)"
- name: longitude
description: "Longitude da transação (WGS84)"
- name: geo_point_transacao
description: "Ponto geográfico do local da transação"
- name: tile_id
description: "Identificador do hexágono da geolocalização da transação na tabela rj-smtr.br_rj_riodejaneiro_geo.h3_res9"
- name: stop_id
Expand Down Expand Up @@ -570,6 +572,8 @@ models:
description: "Latitude da transação (WGS84)"
- name: longitude
description: "Longitude da transação (WGS84)"
- name: geo_point_transacao
description: "Ponto geográfico do local da transação"
- name: valor_transacao
description: "Valor debitado na transação atual (R$)"
- name: versao
Expand Down
6 changes: 5 additions & 1 deletion queries/models/br_rj_riodejaneiro_bilhetagem/transacao.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
-- TODO: Usar variável de run_date_hour para otimizar o numero de partições lidas em staging
{% set incremental_filter %}
DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}")
-- AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")
AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")
{% endset %}

{% set transacao_staging = ref('staging_transacao') %}
Expand Down Expand Up @@ -150,6 +150,7 @@ new_data AS (
NULL AS id_integracao,
latitude_trx AS latitude,
longitude_trx AS longitude,
ST_GEOGPOINT(longitude_trx, latitude_trx) AS geo_point_transacao,
NULL AS stop_id,
NULL AS stop_lat,
NULL AS stop_lon,
Expand Down Expand Up @@ -226,6 +227,7 @@ complete_partitions AS (
id_integracao,
latitude,
longitude,
geo_point_transacao,
stop_id,
stop_lat,
stop_lon,
Expand Down Expand Up @@ -263,6 +265,7 @@ complete_partitions AS (
id_integracao,
latitude,
longitude,
geo_point_transacao,
stop_id,
stop_lat,
stop_lon,
Expand Down Expand Up @@ -326,6 +329,7 @@ SELECT
t.id_integracao,
t.latitude,
t.longitude,
t.geo_point_transacao,
t.stop_id,
t.stop_lat,
t.stop_lon,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- depends_on: {{ ref('transacao') }}
{{
config(
materialized="incremental",
Expand All @@ -12,6 +13,7 @@

{% set incremental_filter %}
DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}")
AND timestamp_captura BETWEEN DATETIME("{{var('date_range_start')}}") AND DATETIME("{{var('date_range_end')}}")
{% endset %}

{% set transacao_staging = ref('staging_transacao_riocard') %}
Expand All @@ -35,7 +37,7 @@ WITH staging_transacao AS (
{{ transacao_staging }}
{% if is_incremental() %}
WHERE
DATE(data) BETWEEN DATE("{{var('date_range_start')}}") AND DATE("{{var('date_range_end')}}")
{{ incremental_filter }}
{% endif %}
),
novos_dados AS (
Expand Down Expand Up @@ -63,6 +65,7 @@ novos_dados AS (
t.id AS id_transacao,
t.latitude_trx AS latitude,
t.longitude_trx AS longitude,
ST_GEOGPOINT(t.longitude_trx, t.latitude_trx) AS geo_point_transacao,
t.valor_transacao
FROM
staging_transacao t
Expand Down
Loading

0 comments on commit ce0ef37

Please sign in to comment.