Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migra GPS 15 minutos #135

Merged
merged 29 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
edb2e20
add schedule every_15_minute
akaBotelho Aug 6, 2024
cf20308
cria flow gps 15 min
akaBotelho Aug 7, 2024
b155128
altera label schedule
akaBotelho Aug 7, 2024
9f4f09f
remove parametro
akaBotelho Aug 7, 2024
47e3133
altera GPS_SPPO_15_MIN_TREATED_TABLE_ID
akaBotelho Aug 7, 2024
4a5fe4f
add data dbt
akaBotelho Aug 7, 2024
a942356
cria modelo gps_sppo_15_minutos
akaBotelho Aug 7, 2024
4d8f00f
altera para prod
akaBotelho Aug 7, 2024
0600500
add changelogs
akaBotelho Aug 7, 2024
7c67745
Merge branch 'main' into staging/gps-sppo-15-min
mergify[bot] Aug 7, 2024
4cedbc2
altera materialized e filtros
akaBotelho Aug 7, 2024
17122d4
Merge branch 'staging/gps-sppo-15-min' of https://github.com/prefeitu…
akaBotelho Aug 7, 2024
974bf8f
teste var fifteen_minutes
akaBotelho Aug 7, 2024
3de4832
corrige _vars
akaBotelho Aug 8, 2024
a8dc03e
altera para dev
akaBotelho Aug 8, 2024
44aa9d7
altera materialized
akaBotelho Aug 8, 2024
5b03f22
altera condições where
akaBotelho Aug 8, 2024
e11fc21
corrige refs
akaBotelho Aug 8, 2024
acb7151
altera refs
akaBotelho Aug 8, 2024
82aca06
altera materialized
akaBotelho Aug 8, 2024
1303744
volta materialized para view
akaBotelho Aug 8, 2024
cab3d6f
Merge branch 'main' into staging/gps-sppo-15-min
mergify[bot] Aug 8, 2024
f18e604
altera schedule do flow recaptura_15min
akaBotelho Aug 8, 2024
e4711dd
adiciona truncate_minutes
akaBotelho Aug 9, 2024
170a795
altera materialized para ephemeral
akaBotelho Aug 12, 2024
1faf457
Merge branch 'main' into staging/gps-sppo-15-min
mergify[bot] Aug 12, 2024
a467df4
Merge branch 'main' into staging/gps-sppo-15-min
mergify[bot] Aug 14, 2024
89d744d
altera para prod
akaBotelho Aug 16, 2024
4382396
Merge branch 'staging/gps-sppo-15-min' of https://github.com/prefeitu…
akaBotelho Aug 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class constants(Enum): # pylint: disable=c0103
GPS_SPPO_RAW_TABLE_ID = "registros"
GPS_SPPO_DATASET_ID = "br_rj_riodejaneiro_veiculos"
GPS_SPPO_TREATED_TABLE_ID = "gps_sppo"
GPS_SPPO_15_MIN_TREATED_TABLE_ID = "gps_sppo_15_minutos"
GPS_SPPO_CAPTURE_DELAY_V1 = 1
GPS_SPPO_CAPTURE_DELAY_V2 = 60
GPS_SPPO_RECAPTURE_DELAY_V2 = 6
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Changelog - br_rj_riodejaneiro_onibus_gps

## [1.0.0] - 2024-08-07

### Adicionado
- Migra flow de tratamento de gps dos ônibus a cada 15 minutos (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/135)
74 changes: 69 additions & 5 deletions pipelines/migration/br_rj_riodejaneiro_onibus_gps/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
"""
Flows for br_rj_riodejaneiro_onibus_gps

DBT 2024-07-02
DBT: 2024-08-16
"""

from copy import deepcopy

from prefect import Parameter, case, task
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
Expand Down Expand Up @@ -52,7 +54,12 @@
set_last_run_timestamp,
upload_logs_to_bq,
)
from pipelines.schedules import every_10_minutes, every_hour_minute_six, every_minute
from pipelines.schedules import (
every_10_minutes,
every_15_minutes,
every_hour_minute_six,
every_minute,
)

# from pipelines.utils.execute_dbt_model.tasks import get_k8s_dbt_client

Expand Down Expand Up @@ -150,6 +157,12 @@
rematerialization = Parameter("rematerialization", default=False)
date_range_start = Parameter("date_range_start", default=None)
date_range_end = Parameter("date_range_end", default=None)
fifteen_minutes = Parameter("fifteen_minutes", default="")
materialize_delay_hours = Parameter(
"materialize_delay_hours",
default=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
)
truncate_minutes = Parameter("truncate_minutes", default=True)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode()
Expand All @@ -168,7 +181,8 @@
raw_table_id=raw_table_id,
table_run_datetime_column_name="timestamp_gps",
mode=MODE,
delay_hours=constants.GPS_SPPO_MATERIALIZE_DELAY_HOURS.value,
delay_hours=materialize_delay_hours,
truncate_minutes=truncate_minutes,
)

RUN_CLEAN_FALSE = task(
Expand Down Expand Up @@ -203,7 +217,7 @@
table_id=table_id,
upstream=True,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha],
_vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
flags="--full-refresh",
)

Expand All @@ -213,7 +227,7 @@
dataset_id=dataset_id,
table_id=table_id,
exclude="+data_versao_efetiva",
_vars=[date_range, dataset_sha],
_vars=[date_range, dataset_sha, {"fifteen_minutes": fifteen_minutes}],
upstream=True,
)

Expand Down Expand Up @@ -520,3 +534,53 @@
handler_initialize_sentry,
handler_skip_if_running,
]


materialize_gps_15_min = deepcopy(materialize_sppo)
materialize_gps_15_min.name = "SMTR: GPS SPPO 15 Minutos - Materialização (subflow)"

with Flow("SMTR: GPS SPPO 15 Minutos - Tratamento") as recaptura_15min:
version = Parameter("version", default=2)
datetime_filter_gps = Parameter("datetime_filter_gps", default=None)
rebuild = Parameter("rebuild", default=False)
# SETUP #
LABELS = get_current_flow_labels()
PROJECT = get_flow_project()

rename_flow_run = rename_current_flow_run_now_time(
prefix=recaptura.name + ": ", now_time=get_now_time()
)

materialize_no_error = create_flow_run(
flow_name=materialize_gps_15_min.name,
project_name=PROJECT,
labels=LABELS,
run_name=materialize_sppo.name,
parameters={
"table_id": constants.GPS_SPPO_15_MIN_TREATED_TABLE_ID.value,
"rebuild": rebuild,
"materialize_delay_hours": 0,
"truncate_minutes": False,
"fifteen_minutes": "_15_minutos",
},
)

wait_materialize_no_error = wait_for_flow_run(
materialize_no_error,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)


recaptura_15min.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
recaptura_15min.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
recaptura_15min.schedule = every_15_minutes
recaptura_15min.state_handlers = [
handler_inject_bd_credentials,
handler_initialize_sentry,
handler_skip_if_running,
]
11 changes: 8 additions & 3 deletions pipelines/migration/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,7 @@ def get_materialization_date_range( # pylint: disable=R0913
mode: str = "prod",
delay_hours: int = 0,
end_ts: datetime = None,
truncate_minutes: bool = True,
):
"""
Task for generating dict with variables to be passed to the
Expand Down Expand Up @@ -1323,16 +1324,20 @@ def get_materialization_date_range( # pylint: disable=R0913
last_run = datetime(last_run.year, last_run.month, last_run.day)

# set start to last run hour (H)
start_ts = last_run.replace(minute=0, second=0, microsecond=0).strftime(timestr)
start_ts = last_run.replace(second=0, microsecond=0).strftime(timestr)
if truncate_minutes:
start_ts = start_ts.replace(minute=0)

# set end to now - delay

if not end_ts:
end_ts = pendulum.now(constants.TIMEZONE.value).replace(
tzinfo=None, minute=0, second=0, microsecond=0
tzinfo=None, second=0, microsecond=0
)

end_ts = (end_ts - timedelta(hours=delay_hours)).replace(minute=0, second=0, microsecond=0)
end_ts = (end_ts - timedelta(hours=delay_hours)).replace(second=0, microsecond=0)
if truncate_minutes:
end_ts = end_ts.replace(minute=0)

end_ts = end_ts.strftime(timestr)

Expand Down
12 changes: 12 additions & 0 deletions pipelines/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,15 @@ def generate_interval_schedule(
)
]
)

every_15_minutes = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=15),
start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)),
labels=[
emd_constants.RJ_SMTR_AGENT_LABEL.value,
],
),
]
)
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
{% if var("fifteen_minutes") == "_15_minutos" %}
{{
config(
materialized='ephemeral',
)
}}
{% else %}
{{
config(
materialized='incremental',
Expand All @@ -8,6 +15,8 @@
}
)
}}
{% endif %}

/*
Descrição:
Filtragem e tratamento básico de registros de gps.
Expand All @@ -31,11 +40,9 @@ gps AS (
ST_GEOGPOINT(longitude, latitude) posicao_veiculo_geo
FROM
{{ ref('sppo_registros') }}
{% if is_incremental() -%}
WHERE
data between DATE("{{var('date_range_start')}}") and DATE("{{var('date_range_end')}}")
AND timestamp_gps > "{{var('date_range_start')}}" and timestamp_gps <="{{var('date_range_end')}}"
{%- endif -%}
),
realocacao as (
SELECT
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
{% if var("fifteen_minutes") == "_15_minutos" %}
{{
config(
materialized='ephemeral',
)
}}
{% else %}
{{
config(
materialized='incremental',
Expand All @@ -8,6 +15,7 @@
}
)
}}
{% endif %}

-- 1. Filtra realocações válidas dentro do intervalo de GPS avaliado
with realocacao as (
Expand All @@ -21,16 +29,11 @@ with realocacao as (
{{ ref('sppo_realocacao') }}
where
-- Realocação deve acontecer após o registro de GPS e até 1 hora depois
datetime_diff(datetime_operacao, datetime_entrada, minute) between 0 and 60
{% if is_incremental() -%}
and
data between DATE("{{var('date_range_start')}}")
and DATE(datetime_add("{{var('date_range_end')}}", interval 1
hour))
and
datetime_operacao between datetime("{{var('date_range_start')}}")
and datetime_add("{{var('date_range_end')}}", interval 1 hour)
{%- endif -%}
datetime_diff(datetime_operacao, datetime_entrada, minute) between 0 and 60
and data between DATE("{{var('date_range_start')}}")
and DATE(datetime_add("{{var('date_range_end')}}", interval 1 hour))
and datetime_operacao between datetime("{{var('date_range_start')}}")
and datetime_add("{{var('date_range_end')}}", interval 1 hour)
),
-- 2. Altera registros de GPS com servicos realocados
gps as (
Expand Down
5 changes: 5 additions & 0 deletions queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog - br_rj_riodejaneiro_veiculos

## [1.0.2] - 2024-08-07

### Adicionado
- Cria modelo `gps_sppo_15_minutos.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/135)

## [1.0.1] - 2024-08-02

### Alterado
Expand Down
Loading
Loading