
Commit

create flow to reprocess fare_rules
vtr363 committed Jul 19, 2024
1 parent 1843db3 commit 593439e
Showing 4 changed files with 111 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pipelines/constants.py
@@ -236,7 +236,7 @@ class constants(Enum): # pylint: disable=c0103
"stops": ["stop_id"],
"trips": ["trip_id"],
"fare_attributes": ["fare_id"],
"fare_rules": [],
"fare_rules": ["fare_id", "route_id"],
"stop_times": ["trip_id", "stop_sequence"],
}
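With fare_rules now mapped to the primary keys fare_id and route_id, the capture transform can keep those two fields as dedicated columns and leave the remaining GTFS attributes in the JSON content payload, which is what the updated fare_rules_gtfs.sql below reads. A minimal sketch of that split, assuming a plain pandas DataFrame as input; split_primary_keys is a hypothetical helper, not the real transform_raw_to_nested_structure:

import json

import pandas as pd


# Hypothetical helper (not the actual transform_raw_to_nested_structure):
# keep the configured primary keys as top-level columns and serialize the
# remaining GTFS fields into a JSON "content" column.
def split_primary_keys(df: pd.DataFrame, primary_key: list) -> pd.DataFrame:
    other_columns = [c for c in df.columns if c not in primary_key]
    nested = df[primary_key].copy()
    nested["content"] = df[other_columns].apply(
        lambda row: json.dumps(row.to_dict()), axis=1
    )
    return nested


raw = pd.DataFrame(
    [{"fare_id": "F1", "route_id": "R10", "origin_id": "A", "destination_id": "B"}]
)
print(split_primary_keys(raw, ["fare_id", "route_id"]))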

84 changes: 84 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -25,8 +25,13 @@
get_last_capture_os,
get_os_info,
get_raw_drive_files,
get_raw_staging_data_gcs,
update_last_captured_os,
)
from pipelines.migration.br_rj_riodejaneiro_onibus_gps.tasks import (
create_source_path,
)
from prefeitura_rio.pipelines_utils.logging import log
from pipelines.migration.tasks import (
create_date_hour_partition,
create_local_partition_path,
@@ -36,6 +41,7 @@
get_join_dict,
rename_current_flow_run_now_time,
run_dbt_model,
save_raw_local,
transform_raw_to_nested_structure,
unpack_mapped_results_nout2,
upload_raw_data_to_gcs,
@@ -329,3 +335,81 @@
# handler_inject_bd_credentials,
# handler_initialize_sentry,
# ]

with Flow("SMTR: GTFS fare_rules - Recaptura") as recaptura_gtfs_fare_rules:

data_versao_gtfs_list = Parameter("data_versao_gtfs_list", default=[])

# SETUP #
GTFS_DATASET_ID = "br_rj_riodejaneiro_gtfs"
GTFS_TABLE_ID = "fare_rules"

data_versao_gtfs_list = get_current_timestamp.map(data_versao_gtfs_list)
filenames = parse_timestamp_to_string.map(data_versao_gtfs_list)

partitions = create_date_hour_partition.map(
timestamp=data_versao_gtfs_list,
partition_date_name=unmapped("data_versao"),
partition_date_only=unmapped(True),
)

local_filepaths = create_local_partition_path.map(
dataset_id=unmapped(GTFS_DATASET_ID),
table_id=unmapped(GTFS_TABLE_ID),
partitions=partitions,
filename=filenames,
)

source_paths = create_source_path.map(
dataset_id=unmapped(GTFS_DATASET_ID),
table_id=unmapped(GTFS_TABLE_ID),
partitions=partitions,
filename=filenames,
)

# EXTRACT #
    ## fetching from staging ##
raw_status = get_raw_staging_data_gcs.map(source_path=source_paths)

raw_filepaths = save_raw_local.map(status=raw_status, file_path=local_filepaths)

# CLEAN #
transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map(
raw_filepath=raw_filepaths,
filepath=local_filepaths,
primary_key=unmapped(list(constants.GTFS_TABLE_CAPTURE_PARAMS.value["fare_rules"])),
        timestamp=data_versao_gtfs_list,
error=unmapped(None),
)

errors, treated_filepaths = unpack_mapped_results_nout2(
mapped_results=transform_raw_to_nested_structure_results
)

# LOAD #
upload_staging_data_to_gcs.map(
dataset_id=unmapped(constants.GTFS_DATASET_ID.value),
table_id=unmapped(GTFS_TABLE_ID),
staging_filepath=treated_filepaths,
partitions=partitions,
        timestamp=data_versao_gtfs_list,
error=errors,
)

# upload_logs_to_bq(
# dataset_id=constants.GPS_SPPO_RAW_DATASET_ID.value,
# parent_table_id=constants.GPS_SPPO_RAW_TABLE_ID.value,
# error=error,
# timestamp=timestamp,
# )

recaptura_gtfs_fare_rules.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
recaptura_gtfs_fare_rules.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_SMTR_DEV_AGENT_LABEL.value],
)
# recaptura_sppo_v2.schedule = every_minute
recaptura_gtfs_fare_rules.state_handlers = [
handler_inject_bd_credentials,
handler_initialize_sentry,
]
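For a quick local sanity check of the new flow (a sketch, assuming a Prefect 1.x environment with GCP credentials available locally; the feed-version date format below is an assumption about what get_current_timestamp accepts):

from pipelines.migration.br_rj_riodejaneiro_gtfs.flows import (
    recaptura_gtfs_fare_rules,
)

# Run once for a single (assumed) feed version; pass several dates to
# recapture more than one GTFS version in the same run.
state = recaptura_gtfs_fare_rules.run(
    parameters={"data_versao_gtfs_list": ["2024-07-15"]},
    run_on_schedule=False,
)
print(state)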
22 changes: 22 additions & 0 deletions pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py
@@ -2,7 +2,9 @@
"""
Tasks for gtfs
"""
import io
import os
import traceback
import zipfile
from datetime import datetime

@@ -13,6 +15,7 @@
from prefect import task
from prefeitura_rio.pipelines_utils.logging import log
from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client
import basedosdados as bd

from pipelines.constants import constants
from pipelines.migration.br_rj_riodejaneiro_gtfs.utils import (
@@ -208,3 +211,22 @@ def get_raw_drive_files(os_control, local_filepath: list):
raw_filepaths.append(raw_file_path)

return raw_filepaths, list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.values())


@task
def get_raw_staging_data_gcs(source_path: str) -> dict:
    """
    Downloads raw data for the given source path from the rj-smtr-staging GCS bucket.

    Returns:
        dict: {"data": list of records, "error": traceback string or None}
    """
    try:
bucket = bd.Storage(dataset_id="", table_id="", bucket_name="rj-smtr-staging")
log(f"Downloading data from {source_path}...")

blob = bucket.bucket.get_blob(blob_name=f"{source_path}txt")
bytes_data = blob.download_as_bytes()
data = pd.read_csv(io.BytesIO(bytes_data))

log(f"Data downloaded from {source_path}\n{data.head()}")
data = data.to_dict(orient="records")
except Exception:
e = traceback.format_exc()
log(f"Error downloading data from {source_path}: {e}", level="error")
return {"data": [], "error": e}
return {"data": data, "error": None}
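The task returns the same status shape consumed by save_raw_local in the flow. A standalone check could look like this (a sketch; the source_path value is a hypothetical example — the real path is built by create_source_path, and the task appends "txt" to it):

from pipelines.migration.br_rj_riodejaneiro_gtfs.tasks import get_raw_staging_data_gcs

# .run() calls the task's underlying function outside of a flow context.
status = get_raw_staging_data_gcs.run(
    source_path="br_rj_riodejaneiro_gtfs/fare_rules/data_versao=2024-07-15/fare_rules."
)
if status["error"] is None:
    print(f"{len(status['data'])} records downloaded")
else:
    print(status["error"])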
6 changes: 4 additions & 2 deletions queries/models/gtfs/fare_rules_gtfs.sql
@@ -14,8 +14,10 @@ SELECT
fi.feed_version,
SAFE_CAST(fr.data_versao AS DATE) feed_start_date,
fi.feed_end_date,
SAFE_CAST(JSON_VALUE(fr.content, '$.fare_id') AS STRING) fare_id,
SAFE_CAST(JSON_VALUE(fr.content, '$.route_id') AS STRING) route_id,
SAFE_CAST(fr.fare_id AS STRING) fare_id,
SAFE_CAST(fr.route_id AS STRING) route_id,
-- SAFE_CAST(JSON_VALUE(fr.content, '$.fare_id') AS STRING) fare_id,
-- SAFE_CAST(JSON_VALUE(fr.content, '$.route_id') AS STRING) route_id,
SAFE_CAST(JSON_VALUE(fr.content, '$.origin_id') AS STRING) origin_id,
SAFE_CAST(JSON_VALUE(fr.content, '$.destination_id') AS STRING) destination_id,
SAFE_CAST(JSON_VALUE(fr.content, '$.contains_id') AS STRING) contains_id,
