diff --git a/pipelines/constants.py b/pipelines/constants.py index a63e7c28..30cd70cb 100644 --- a/pipelines/constants.py +++ b/pipelines/constants.py @@ -72,6 +72,7 @@ class constants(Enum): # pylint: disable=c0103 GPS_SPPO_RAW_TABLE_ID = "registros" GPS_SPPO_DATASET_ID = "br_rj_riodejaneiro_veiculos" GPS_SPPO_TREATED_TABLE_ID = "gps_sppo" + GPS_SPPO_15_MIN_TREATED_TABLE_ID = "gps_sppo_15_minutos" GPS_SPPO_CAPTURE_DELAY_V1 = 1 GPS_SPPO_CAPTURE_DELAY_V2 = 60 GPS_SPPO_RECAPTURE_DELAY_V2 = 6 diff --git a/pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md new file mode 100644 index 00000000..ea424c45 --- /dev/null +++ b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/CHANGELOG.md @@ -0,0 +1,6 @@ +# Changelog - br_rj_riodejaneiro_onibus_gps + +## [1.0.0] - 2024-08-07 + +### Adicionado +- Migra flow de tratamento de gps dos ônibus a cada 15 minutos (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/135) \ No newline at end of file diff --git a/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py index 9527422e..15f9f9b4 100644 --- a/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py @@ -2,9 +2,11 @@ """ Flows for br_rj_riodejaneiro_onibus_gps -DBT 2024-07-02 +DBT: 2024-08-16 """ +from copy import deepcopy + from prefect import Parameter, case, task from prefect.run_configs import KubernetesRun from prefect.storage import GCS @@ -52,7 +54,12 @@ set_last_run_timestamp, upload_logs_to_bq, ) -from pipelines.schedules import every_10_minutes, every_hour_minute_six, every_minute +from pipelines.schedules import ( + every_10_minutes, + every_15_minutes, + every_hour_minute_six, + every_minute, +) # from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client @@ -150,6 +157,12 @@ rematerialization = Parameter("rematerialization", default=False) date_range_start = Parameter("date_range_start", default=None) date_range_end = Parameter("date_range_end", default=None) + fifteen_minutes = Parameter("fifteen_minutes", default="") + materialize_delay_hours = Parameter( + "materialize_delay_hours", + default=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value, + ) + truncate_minutes = Parameter("truncate_minutes", default=True) LABELS = get_current_flow_labels() MODE = get_current_flow_mode() @@ -168,7 +181,8 @@ raw_table_id=raw_table_id, table_run_datetime_column_name="timestamp_gps", mode=MODE, - delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value, + delay_hours=materialize_delay_hours, + truncate_minutes=truncate_minutes, ) RUN_CLEAN_FALSE = task( @@ -203,7 +217,7 @@ table_id=table_id, upstream=True, exclude="+data_versao_efetiva", - _vars=[date_range, dataset_sha], + _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}], flags="--full-refresh", ) @@ -213,7 +227,7 @@ dataset_id=dataset_id, table_id=table_id, exclude="+data_versao_efetiva", - _vars=[date_range, dataset_sha], + _vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}], upstream=True, ) @@ -520,3 +534,53 @@ handler_initialize_sentry, handler_skip_if_running, ] + + +materialize_gps_15_min = deepcopy(materialize_sppo) +materialize_gps_15_min.name = "SMTR: GPS SPPO 15 Minutos - Materialização (subflow)" + +with Flow("SMTR: GPS SPPO 15 Minutos - Tratamento") as recaptura_15min: + version = Parameter("version", default=2) + datetime_filter_gps = Parameter("datetime_filter_gps", default=None) + rebuild = Parameter("rebuild", default=False) + # SETUP # + LABELS = get_current_flow_labels() + PROJECT = get_flow_project() + + rename_flow_run = rename_current_flow_run_now_time( + prefix=recaptura.name + ": ", now_time=get_now_time() + ) + + materialize_no_error = create_flow_run( + flow_name=materialize_gps_15_min.name, + project_name=PROJECT, + labels=LABELS, + run_name=materialize_sppo.name, + parameters={ + "table_id": constants.GPS_SPPO_15_MIN_TREATED_TABLE_ID.value, + "rebuild": rebuild, + "materialize_delay_hours": 0, + "truncate_minutes": False, + "fifteen_minutes": "_15_minutos", + }, + ) + + wait_materialize_no_error = wait_for_flow_run( + materialize_no_error, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + + +recaptura_15min.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +recaptura_15min.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value], +) +recaptura_15min.schedule = every_15_minutes +recaptura_15min.state_handlers = [ + handler_inject_bd_credentials, + handler_initialize_sentry, + handler_skip_if_running, +] diff --git a/pipelines/migration/tasks.py b/pipelines/migration/tasks.py index 4868d341..b7c2d84e 100644 --- a/pipelines/migration/tasks.py +++ b/pipelines/migration/tasks.py @@ -1260,6 +1260,7 @@ def get_materialization_date_range( # pylint: disable=R0913 mode: str = "prod", delay_hours: int = 0, end_ts: datetime = None, + truncate_minutes: bool = True, ): """ Task for generating dict with variables to be passed to the @@ -1323,16 +1324,20 @@ def get_materialization_date_range( # pylint: disable=R0913 last_run = datetime(last_run.year, last_run.month, last_run.day) # set start to last run hour (H) - start_ts = last_run.replace(minute=0, second=0, microsecond=0).strftime(timestr) + start_ts = last_run.replace(second=0, microsecond=0).strftime(timestr) + if truncate_minutes: + start_ts = start_ts.replace(minute=0) # set end to now - delay if not end_ts: end_ts = pendulum.now(constants.TIMEZONE.value).replace( - tzinfo=None, minute=0, second=0, microsecond=0 + tzinfo=None, second=0, microsecond=0 ) - end_ts = (end_ts - timedelta(hours=delay_hours)).replace(minute=0, second=0, microsecond=0) + end_ts = (end_ts - timedelta(hours=delay_hours)).replace(second=0, microsecond=0) + if truncate_minutes: + end_ts = end_ts.replace(minute=0) end_ts = end_ts.strftime(timestr) diff --git a/pipelines/schedules.py b/pipelines/schedules.py index e8921874..516c3163 100644 --- a/pipelines/schedules.py +++ b/pipelines/schedules.py @@ -204,3 +204,15 @@ def generate_interval_schedule( ) ] ) + +every_15_minutes = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(minutes=15), + start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)), + labels=[ + emd_constants.RJ_SMTR_AGENT_LABEL.value, + ], + ), + ] +) diff --git a/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_filtrada.sql b/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_filtrada.sql index 8ac8cff9..4e37cfcc 100644 --- a/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_filtrada.sql +++ b/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_filtrada.sql @@ -1,3 +1,10 @@ +{% if var("fifteen_minutes") == "_15_minutos" %} +{{ + config( + materialized='ephemeral', + ) +}} +{% else %} {{ config( materialized='incremental', @@ -8,6 +15,8 @@ } ) }} +{% endif %} + /* Descrição: Filtragem e tratamento básico de registros de gps. @@ -31,11 +40,9 @@ gps AS ( ST_GEOGPOINT(longitude, latitude) posicao_veiculo_geo FROM {{ ref('sppo_registros') }} - {% if is_incremental() -%} WHERE data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}") AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}" - {%- endif -%} ), realocacao as ( SELECT diff --git a/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_realocacao.sql b/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_realocacao.sql index 9fa59556..fa1a1131 100644 --- a/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_realocacao.sql +++ b/queries/models/br_rj_riodejaneiro_onibus_gps/sppo_aux_registros_realocacao.sql @@ -1,3 +1,10 @@ +{% if var("fifteen_minutes") == "_15_minutos" %} +{{ + config( + materialized='ephemeral', + ) +}} +{% else %} {{ config( materialized='incremental', @@ -8,6 +15,7 @@ } ) }} +{% endif %} -- 1. Filtra realocações válidas dentro do intervalo de GPS avaliado with realocacao as ( @@ -21,16 +29,11 @@ with realocacao as ( {{ ref('sppo_realocacao') }} where -- Realocação deve acontecer após o registro de GPS e até 1 hora depois - datetime_diff(datetime_operacao, datetime_entrada, minute) between 0 and 60 - {% if is_incremental() -%} - and - data between DATE("{{var('date_range_start')}}") - and DATE(datetime_add("{{var('date_range_end')}}", interval 1 - hour)) - and - datetime_operacao between datetime("{{var('date_range_start')}}") - and datetime_add("{{var('date_range_end')}}", interval 1 hour) - {%- endif -%} + datetime_diff(datetime_operacao, datetime_entrada, minute) between 0 and 60 + and data between DATE("{{var('date_range_start')}}") + and DATE(datetime_add("{{var('date_range_end')}}", interval 1 hour)) + and datetime_operacao between datetime("{{var('date_range_start')}}") + and datetime_add("{{var('date_range_end')}}", interval 1 hour) ), -- 2. Altera registros de GPS com servicos realocados gps as ( diff --git a/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md index 946c270f..ff18a460 100644 --- a/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog - br_rj_riodejaneiro_veiculos +## [1.0.2] - 2024-08-07 + +### Adicionado +- Cria modelo `gps_sppo_15_minutos.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/135) + ## [1.0.1] - 2024-08-02 ### Alterado diff --git a/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo_15_minutos.sql b/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo_15_minutos.sql new file mode 100644 index 00000000..968da1bb --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo_15_minutos.sql @@ -0,0 +1,138 @@ +{{ + config( + materialized='table', + partition_by={ + 'field':"data", + 'data_type':'date', + 'granularity': 'day' + }, + tags=['geolocalizacao'] + ) +}} +/* +Descrição: +Junção dos passos de tratamento, junta as informações extras que definimos a partir dos registros +capturados. +Para descrição detalhada de como cada coluna é calculada, consulte a documentação de cada uma das tabelas +utilizadas abaixo. +1. registros_filtrada: filtragem e tratamento básico dos dados brutos capturados. +2. aux_registros_velocidade: estimativa da velocidade de veículo a cada ponto registrado e identificação +do estado de movimento ('parado', 'andando') +3. aux_registros_parada: identifica veículos parados em terminais ou garagens conhecidas +4. aux_registros_flag_trajeto_correto: calcula intersecções das posições registradas para cada veículo +com o traçado da linha informada. +5. As junções (joins) são feitas sobre o id_veículo e a timestamp_gps. +*/ +WITH + registros as ( + -- 1. registros_filtrada + SELECT + id_veiculo, + timestamp_gps, + timestamp_captura, + velocidade, + linha, + latitude, + longitude, + FROM {{ ref('sppo_aux_registros_filtrada') }} + WHERE + data = DATE("{{var('date_range_end')}}") + AND timestamp_gps > DATETIME_SUB("{{var('date_range_end')}}", INTERVAL 75 MINUTE) + AND timestamp_gps <= "{{var('date_range_end')}}" + ), + velocidades AS ( + -- 2. velocidades + SELECT + id_veiculo, timestamp_gps, linha, velocidade, distancia, flag_em_movimento + FROM + {{ ref('sppo_aux_registros_velocidade') }} + ), + paradas as ( + -- 3. paradas + SELECT + id_veiculo, timestamp_gps, linha, tipo_parada, + FROM {{ ref('sppo_aux_registros_parada') }} + ), + flags AS ( + -- 4. flag_trajeto_correto + SELECT + id_veiculo, + timestamp_gps, + linha, + route_id, + flag_linha_existe_sigmob, + flag_trajeto_correto, + flag_trajeto_correto_hist + FROM + {{ ref('sppo_aux_registros_flag_trajeto_correto') }} + ) +-- 5. Junção final +SELECT + "SPPO" modo, + r.timestamp_gps, + date(r.timestamp_gps) data, + extract(time from r.timestamp_gps) hora, + r.id_veiculo, + r.linha as servico, + r.latitude, + r.longitude, + CASE + WHEN + flag_em_movimento IS true AND flag_trajeto_correto_hist is true + THEN true + ELSE false + END flag_em_operacao, + v.flag_em_movimento, + p.tipo_parada, + flag_linha_existe_sigmob, + flag_trajeto_correto, + flag_trajeto_correto_hist, + CASE + WHEN flag_em_movimento IS true AND flag_trajeto_correto_hist is true + THEN 'Em operação' + WHEN flag_em_movimento is true and flag_trajeto_correto_hist is false + THEN 'Operando fora trajeto' + WHEN flag_em_movimento is false + THEN + CASE + WHEN tipo_parada is not null + THEN concat("Parado ", tipo_parada) + ELSE + CASE + WHEN flag_trajeto_correto_hist is true + THEN 'Parado trajeto correto' + ELSE 'Parado fora trajeto' + END + END + END status, + r.velocidade velocidade_instantanea, + v.velocidade velocidade_estimada_10_min, + v.distancia, + "{{ var("version") }}" as versao +FROM + registros r + +JOIN + flags f +ON + r.id_veiculo = f.id_veiculo + AND r.timestamp_gps = f.timestamp_gps + AND r.linha = f.linha + +JOIN + velocidades v +ON + r.id_veiculo = v.id_veiculo + AND r.timestamp_gps = v.timestamp_gps + AND r.linha = v.linha + +JOIN + paradas p +ON + r.id_veiculo = p.id_veiculo + AND r.timestamp_gps = p.timestamp_gps + AND r.linha = p.linha +WHERE + DATE(r.timestamp_gps) = DATE("{{var('date_range_end')}}") + AND r.timestamp_gps > DATETIME_SUB("{{var('date_range_end')}}", INTERVAL 75 MINUTE) + AND r.timestamp_gps <= "{{var('date_range_end')}}" diff --git a/queries/models/br_rj_riodejaneiro_veiculos/schema.yaml b/queries/models/br_rj_riodejaneiro_veiculos/schema.yaml index 433840b5..2377f843 100644 --- a/queries/models/br_rj_riodejaneiro_veiculos/schema.yaml +++ b/queries/models/br_rj_riodejaneiro_veiculos/schema.yaml @@ -7,3 +7,5 @@ models: description: "Tabela com os dados tratados de registros de GPS do SPPO, incluindo velocidade estimada, estado de movimento, parada em terminal ou garagem e interseção com o traçado da linha informada." - name: gps_brt_15_minutos description: "Tabela com os dados tratados de registros de GPS do BRT, incluindo velocidade estimada, estado de movimento, parada em terminal ou garagem e interseção com o traçado da linha informada." + - name: gps_sppo_15_minutos + description: "Tabela com os dados tratados de registros de GPS do SPPO da última hora e 15 minutos, incluindo velocidade estimada, estado de movimento, parada em terminal ou garagem e interseção com o traçado da linha informada." \ No newline at end of file