From 7f82815acd73838af72ee1ed091739d16d51b92f Mon Sep 17 00:00:00 2001 From: Rodrigo Cunha <66736583+eng-rodrigocunha@users.noreply.github.com> Date: Mon, 22 Jul 2024 21:30:14 -0300 Subject: [PATCH 01/11] initial commit (#115) --- queries/models/docs.md | 3 +++ queries/models/projeto_subsidio_sppo/schema.yml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 queries/models/docs.md diff --git a/queries/models/docs.md b/queries/models/docs.md new file mode 100644 index 00000000..805d8330 --- /dev/null +++ b/queries/models/docs.md @@ -0,0 +1,3 @@ +{% docs consorcio %} +Consórcio ao qual o serviço pertence +{% enddocs %} \ No newline at end of file diff --git a/queries/models/projeto_subsidio_sppo/schema.yml b/queries/models/projeto_subsidio_sppo/schema.yml index 2849fabb..a0be43fc 100644 --- a/queries/models/projeto_subsidio_sppo/schema.yml +++ b/queries/models/projeto_subsidio_sppo/schema.yml @@ -6,7 +6,7 @@ models: description: "Tabela histórica com detalhes das viagens planejadas a cada dia" columns: - name: consorcio - description: "Consórcio ao qual o serviço pertence" + description: "{{ doc('consorcio') }}" tests: - not_null - name: data From 81bc01defd6f5c94a3ab718b3e96067bcb5d18f8 Mon Sep 17 00:00:00 2001 From: Victor Miguel Rocha Date: Tue, 23 Jul 2024 14:12:44 -0300 Subject: [PATCH 02/11] Adiciona parametros para a captura manual, ajusta tabelas do gtfs (#82) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adiciona parametros para a captura manual do gtfs * atualiza changelog * remove case em get_last_capture_os * adiciona with case para get_last_capture_os * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * adiciona função normalizar_horario em processa_ordem_servico * adiciona parametros capture e materialize * adiciona merge apos materialize * adiciona descrições de tabelas do gtfs * refatora flow tasks * corrige case materialize * 
corrige import do pandas e case da materialização * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * verifica tipo de data_versao_gtfs * adiciona merge apos case * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * remove case data_versao_gtfs adiciona case captura e materialize false * adiciona conversão de data ao update_last_captured_os * remove antiga converção de formato de data_versao_gtfs em get_os_info * adiciona verifica_materialize * corrige verifica_materialize * adiciona merge de flag_new_os * corrige verificação de tipo de data_versao_gtfs_merge * semplifica verifica_materialize * simplifica verifica_materialize * remove merge flag_new_os * corrige nome de variaveis de flag_new_os_task * altera verificação de materialização * atualiza changelog * simplifica parametros de captura * ajusta primary key de fare_rules * ajusta verifivação de tipo para data_versao_gtfs * atualiza tratamento de fare_rule e feed_info * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- pipelines/constants.py | 2 +- .../br_rj_riodejaneiro_gtfs/CHANGELOG.md | 7 + .../br_rj_riodejaneiro_gtfs/flows.py | 160 +++++++++++------- .../br_rj_riodejaneiro_gtfs/tasks.py | 25 ++- .../br_rj_riodejaneiro_gtfs/utils.py | 59 +++++-- queries/models/gtfs/CHANGELOG.md | 8 + queries/models/gtfs/fare_rules_gtfs.sql | 4 +- queries/models/gtfs/feed_info_gtfs.sql | 67 +++++--- 8 files changed, 233 insertions(+), 99 deletions(-) diff --git a/pipelines/constants.py b/pipelines/constants.py index c26f7592..a63e7c28 100644 --- a/pipelines/constants.py +++ b/pipelines/constants.py @@ -236,7 +236,7 @@ class constants(Enum): # pylint: disable=c0103 "stops": ["stop_id"], 
"trips": ["trip_id"], "fare_attributes": ["fare_id"], - "fare_rules": [], + "fare_rules": ["fare_id", "route_id"], "stop_times": ["trip_id", "stop_sequence"], } diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md index 010e645b..7bd97103 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md @@ -1,11 +1,18 @@ # Changelog - gtfs +## [1.0.4] - 2024-07-17 + +### Adicionado + +- Adiciona parametros para a captura manual do gtfs no flow `SMTR: GTFS - Captura/Tratamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/82/) + ## [1.0.3] - 2024-07-04 ## Corrigido - Corrigido o formato da data salva no redis de d/m/y para y-m-d (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/91) + ## [1.0.2] - 2024-06-21 ### Adicionado diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py index bdd8bc59..30f91fe4 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py @@ -8,6 +8,7 @@ from prefect import Parameter, case, task from prefect.run_configs import KubernetesRun from prefect.storage import GCS +from prefect.tasks.control_flow import merge from prefect.utilities.edges import unmapped from prefeitura_rio.pipelines_utils.custom import Flow @@ -98,94 +99,131 @@ # ) with Flow("SMTR: GTFS - Captura/Tratamento") as gtfs_captura_nova: - capture = Parameter("capture", default=True) - materialize = Parameter("materialize", default=True) + materialize_only = Parameter("materialize_only", default=False) + regular_sheet_index = Parameter("regular_sheet_index", default=None) + data_versao_gtfs_param = Parameter("data_versao_gtfs", default=None) mode = get_current_flow_mode() - last_captured_os = get_last_capture_os(mode=mode, dataset_id=constants.GTFS_DATASET_ID.value) - - timestamp = 
get_scheduled_timestamp() - - flag_new_os, os_control, data_index, data_versao_gtfs = get_os_info( - last_captured_os=last_captured_os - ) - - with case(flag_new_os, True): - rename_current_flow_run_now_time( - prefix=gtfs_captura_nova.name + ' ["' + data_versao_gtfs + '"] ', now_time=timestamp + data_versao_gtfs_task = None + last_captured_os_none = None + with case(data_versao_gtfs_param, None): + last_captured_os_redis = get_last_capture_os( + mode=mode, dataset_id=constants.GTFS_DATASET_ID.value ) - data_versao_gtfs = get_current_timestamp(data_versao_gtfs) + last_captured_os = merge(last_captured_os_none, last_captured_os_redis) + with case(materialize_only, False): - partition = create_date_hour_partition( - timestamp=data_versao_gtfs, partition_date_name="data_versao", partition_date_only=True - ) + timestamp = get_scheduled_timestamp() - filename = parse_timestamp_to_string(data_versao_gtfs) - - table_ids = task(lambda: list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()))() - - local_filepaths = create_local_partition_path.map( - dataset_id=unmapped(constants.GTFS_DATASET_ID.value), - table_id=table_ids, - partitions=unmapped(partition), - filename=unmapped(filename), + flag_new_os, os_control, data_index, data_versao_gtfs_task = get_os_info( + last_captured_os=last_captured_os, data_versao_gtfs=data_versao_gtfs_param ) - raw_filepaths, primary_keys = get_raw_drive_files( - os_control=os_control, local_filepath=local_filepaths - ) - - transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map( - raw_filepath=raw_filepaths, - filepath=local_filepaths, - primary_key=primary_keys, - timestamp=unmapped(data_versao_gtfs), - error=unmapped(None), - ) + with case(flag_new_os, True): + rename_current_flow_run_now_time( + prefix=gtfs_captura_nova.name + ' ["' + data_versao_gtfs_task + '"] ', + now_time=timestamp, + ) + + data_versao_gtfs_task = get_current_timestamp(data_versao_gtfs_task) + + partition = create_date_hour_partition( + 
timestamp=data_versao_gtfs_task, + partition_date_name="data_versao", + partition_date_only=True, + ) + + filename = parse_timestamp_to_string(data_versao_gtfs_task) + + table_ids = task(lambda: list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()))() + + local_filepaths = create_local_partition_path.map( + dataset_id=unmapped(constants.GTFS_DATASET_ID.value), + table_id=table_ids, + partitions=unmapped(partition), + filename=unmapped(filename), + ) + + raw_filepaths, primary_keys = get_raw_drive_files( + os_control=os_control, + local_filepath=local_filepaths, + regular_sheet_index=regular_sheet_index, + ) + + transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map( + raw_filepath=raw_filepaths, + filepath=local_filepaths, + primary_key=primary_keys, + timestamp=unmapped(data_versao_gtfs_task), + error=unmapped(None), + ) + + errors, treated_filepaths = unpack_mapped_results_nout2( + mapped_results=transform_raw_to_nested_structure_results + ) + + errors = upload_raw_data_to_gcs.map( + dataset_id=unmapped(constants.GTFS_DATASET_ID.value), + table_id=table_ids, + raw_filepath=raw_filepaths, + partitions=unmapped(partition), + error=unmapped(None), + ) + + wait_captura_true = upload_staging_data_to_gcs.map( + dataset_id=unmapped(constants.GTFS_DATASET_ID.value), + table_id=table_ids, + staging_filepath=treated_filepaths, + partitions=unmapped(partition), + timestamp=unmapped(data_versao_gtfs_task), + error=errors, + ) + with case(materialize_only, True): + wait_captura_false = task() + + data_versao_gtfs_merge = merge(data_versao_gtfs_task, data_versao_gtfs_param) + wait_captura = merge(wait_captura_true, wait_captura_false) + + verifica_materialize = task(lambda data_versao: data_versao is not None)( + data_versao=data_versao_gtfs_merge + ) - errors, treated_filepaths = unpack_mapped_results_nout2( - mapped_results=transform_raw_to_nested_structure_results - ) + data_versao_gtfs_is_str = task(lambda data_versao: isinstance(data_versao, 
str))( + data_versao_gtfs_merge + ) - errors = upload_raw_data_to_gcs.map( - dataset_id=unmapped(constants.GTFS_DATASET_ID.value), - table_id=table_ids, - raw_filepath=raw_filepaths, - partitions=unmapped(partition), - error=unmapped(None), + with case(data_versao_gtfs_is_str, False): + string_data_versao_gtfs = parse_timestamp_to_string( + timestamp=data_versao_gtfs_merge, pattern="%Y-%m-%d" ) - wait_upload_staging_data_to_gcs = upload_staging_data_to_gcs.map( - dataset_id=unmapped(constants.GTFS_DATASET_ID.value), - table_id=table_ids, - staging_filepath=treated_filepaths, - partitions=unmapped(partition), - timestamp=unmapped(data_versao_gtfs), - error=errors, - ) + data_versao_gtfs = merge(string_data_versao_gtfs, data_versao_gtfs_merge) - string_data_versao_gtfs = parse_timestamp_to_string( - timestamp=data_versao_gtfs, pattern="%Y-%m-%d" - ) + with case(verifica_materialize, True): version = fetch_dataset_sha(dataset_id=constants.GTFS_MATERIALIZACAO_DATASET_ID.value) - dbt_vars = get_join_dict([{"data_versao_gtfs": string_data_versao_gtfs}], version)[0] + dbt_vars = get_join_dict([{"data_versao_gtfs": data_versao_gtfs}], version)[0] wait_run_dbt_model = run_dbt_model( dataset_id=constants.GTFS_MATERIALIZACAO_DATASET_ID.value, _vars=dbt_vars, - ).set_upstream(task=wait_upload_staging_data_to_gcs) + ).set_upstream(task=wait_captura) - update_last_captured_os( + wait_materialize_true = update_last_captured_os( dataset_id=constants.GTFS_DATASET_ID.value, data_index=data_index, mode=mode, ).set_upstream(task=wait_run_dbt_model) + with case(verifica_materialize, False): + wait_materialize_false = task() + + wait_materialize = merge(wait_materialize_true, wait_materialize_false) + with case(flag_new_os, False): rename_current_flow_run_now_time( prefix=gtfs_captura_nova.name + " [SKIPPED] ", now_time=timestamp - ) + ).set_upstream(task=wait_materialize) gtfs_captura_nova.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git 
a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py index c407f0f8..35661375 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py @@ -79,12 +79,24 @@ def update_last_captured_os(dataset_id: str, data_index: str, mode: str = "prod" fetch_key = f"{dataset_id}.last_captured_os" if mode != "prod": fetch_key = f"{mode}.{fetch_key}" - + last_captured_os = redis_client.get(fetch_key) + # verifica se last_capture_os tem formado dia/mes/ano_index e converte para ano-mes-dia_index + if last_captured_os is not None: + if "/" in last_captured_os: + index = last_captured_os.split("_")[1] + data = datetime.strptime(last_captured_os.split("_")[0], "%d/%m/%Y").strftime( + "%Y-%m-%d" + ) + last_captured_os = data + "_" + index + # verifica se a ultima os capturada é maior que a nova + if last_captured_os is not None: + if last_captured_os["last_captured_os"] > data_index: + return redis_client.set(fetch_key, {"last_captured_os": data_index}) @task(nout=4) -def get_os_info(last_captured_os: str) -> dict: +def get_os_info(last_captured_os: str = None, data_versao_gtfs: str = None) -> dict: """ Retrieves information about the OS. 
@@ -118,9 +130,13 @@ def get_os_info(last_captured_os: str) -> dict: # Ordena por data e index df = df.sort_values(by=["data_index"], ascending=True) - if last_captured_os is None: + if data_versao_gtfs is not None: + df = df.loc[(df["Início da Vigência da OS"] == data_versao_gtfs)] + + elif last_captured_os is None: last_captured_os = df["data_index"].max() df = df.loc[(df["data_index"] == last_captured_os)] + else: # Filtra linhas onde 'data_index' é maior que o último capturado df = df.loc[(df["data_index"] > last_captured_os)] @@ -136,7 +152,7 @@ def get_os_info(last_captured_os: str) -> dict: @task(nout=2) -def get_raw_drive_files(os_control, local_filepath: list): +def get_raw_drive_files(os_control, local_filepath: list, regular_sheet_index: int = None): """ Downloads raw files from Google Drive and processes them. @@ -182,6 +198,7 @@ def get_raw_drive_files(os_control, local_filepath: list): file_bytes=file_bytes_os, local_filepath=local_filepath, raw_filepaths=raw_filepaths, + regular_sheet_index=regular_sheet_index, ) elif filename == "ordem_servico_trajeto_alternativo": diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py index 14736f69..0accdfa1 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py @@ -152,24 +152,59 @@ def download_xlsx(file_link, drive_service): return file_bytes -def processa_ordem_servico(sheetnames, file_bytes, local_filepath, raw_filepaths): +def normalizar_horario(horario): """ - Process the order of service data from multiple sheets in an Excel file. + Normalizes the given time string. + + If the time string contains the word "day", it splits the string into days and time. + It converts the days, hours, minutes, and seconds into total hours and returns the normalized time string. + + If the time string does not contain the word "day", it returns the time string as is. 
Args: - sheetnames (list): List of sheet names in the Excel file. - file_bytes (bytes): Bytes of the Excel file. - local_filepath (str): Local file path. - raw_filepaths (list): List of raw file paths. + horario (str): The time string to be normalized. Returns: - None + str: The normalized time string. + + """ + if "day" in horario: + days, time = horario.split(", ") + days = int(days.split(" ")[0]) + hours, minutes, seconds = map(int, time.split(":")) + total_hours = days * 24 + hours + return f"{total_hours:02}:{minutes:02}:{seconds:02}" + else: + return horario.split(" ")[1] if " " in horario else horario + + +def processa_ordem_servico( + sheetnames, file_bytes, local_filepath, raw_filepaths, regular_sheet_index=None +): + """ + Process the OS data from an Excel file. + + Args: + sheetnames (list): List of sheet names in the Excel file. + file_bytes (bytes): The Excel file in bytes format. + local_filepath (str): The local file path where the processed data will be saved. + raw_filepaths (list): List of raw file paths. + regular_sheet_index (int, optional): The index of the regular sheet. Defaults to 0. Raises: - Exception: If there are missing or duplicated columns in the OS data. + Exception: If there are more than 2 tabs in the file or if there are missing or duplicated columns in the order of service data. Exception: If the validation of 'km_test' and 'km_dia_util' fails. + + Returns: + None """ - # conta quantos sheets tem no arquivo sem o nome de Anexo II + + if len(sheetnames) != 2 and regular_sheet_index is None: + raise Exception("More than 2 tabs in the file. 
Please specify the regular sheet index.") + + if regular_sheet_index is None: + regular_sheet_index = 0 + sheets_range = len(sheetnames) - len([x for x in sheetnames if "ANEXO II" in x]) quadro_geral = pd.DataFrame() @@ -220,7 +255,7 @@ def processa_ordem_servico(sheetnames, file_bytes, local_filepath, raw_filepaths quadro[hora_cols] = quadro[hora_cols].astype(str) for hora_col in hora_cols: - quadro[hora_col] = quadro[hora_col].apply(lambda x: x.split(" ")[1] if " " in x else x) + quadro[hora_col] = quadro[hora_col].apply(normalizar_horario) cols = [ coluna @@ -238,10 +273,10 @@ def processa_ordem_servico(sheetnames, file_bytes, local_filepath, raw_filepaths quadro["extensao_ida"] = quadro["extensao_ida"] / 1000 quadro["extensao_volta"] = quadro["extensao_volta"] / 1000 - if i == 0: + if i == regular_sheet_index: quadro["tipo_os"] = "Regular" - quadro_geral = pd.concat([quadro_geral, quadro]) + quadro_geral = pd.concat([quadro_geral, quadro]) # Verificações columns_in_dataframe = set(quadro_geral.columns) diff --git a/queries/models/gtfs/CHANGELOG.md b/queries/models/gtfs/CHANGELOG.md index ad718a4f..3ef4716a 100644 --- a/queries/models/gtfs/CHANGELOG.md +++ b/queries/models/gtfs/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog - gtfs +## [1.1.6] - 2024-07-22 + +### Alterado + +- Alterada a tabela `feed_info` de table para incremental e adicionada a coluna `feed_update_datetime`(https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/82) + +- Alterada a tabela `fare_rules` para refletir a alteração nas primary keys (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/82) + ## [1.1.5] - 2024-07-15 ### Adicionado diff --git a/queries/models/gtfs/fare_rules_gtfs.sql b/queries/models/gtfs/fare_rules_gtfs.sql index 167bca8f..fe043ab3 100644 --- a/queries/models/gtfs/fare_rules_gtfs.sql +++ b/queries/models/gtfs/fare_rules_gtfs.sql @@ -14,8 +14,8 @@ SELECT fi.feed_version, SAFE_CAST(fr.data_versao AS DATE) feed_start_date, fi.feed_end_date, - 
SAFE_CAST(JSON_VALUE(fr.content, '$.fare_id') AS STRING) fare_id, - SAFE_CAST(JSON_VALUE(fr.content, '$.route_id') AS STRING) route_id, + SAFE_CAST(fr.fare_id AS STRING) fare_id, + SAFE_CAST(fr.route_id AS STRING) route_id, SAFE_CAST(JSON_VALUE(fr.content, '$.origin_id') AS STRING) origin_id, SAFE_CAST(JSON_VALUE(fr.content, '$.destination_id') AS STRING) destination_id, SAFE_CAST(JSON_VALUE(fr.content, '$.contains_id') AS STRING) contains_id, diff --git a/queries/models/gtfs/feed_info_gtfs.sql b/queries/models/gtfs/feed_info_gtfs.sql index a85e9401..20230094 100644 --- a/queries/models/gtfs/feed_info_gtfs.sql +++ b/queries/models/gtfs/feed_info_gtfs.sql @@ -1,25 +1,54 @@ {{config( - materialized='table', + materialized='incremental', partition_by = { 'field' :'feed_start_date', 'data_type' :'date', 'granularity': 'day' }, - alias = 'feed_info' + alias = 'feed_info', + unique_key = 'feed_start_date', + incremental_strategy = 'insert_overwrite' )}} - -SELECT - SAFE_CAST(timestamp_captura AS STRING) AS feed_version, - SAFE_CAST(data_versao AS DATE) AS feed_start_date, - DATE_SUB(LEAD(DATE(data_versao)) OVER (ORDER BY data_versao), INTERVAL 1 DAY) AS feed_end_date, - SAFE_CAST(feed_publisher_name AS STRING) feed_publisher_name, - SAFE_CAST(JSON_VALUE(content, '$.feed_publisher_url') AS STRING) feed_publisher_url, - SAFE_CAST(JSON_VALUE(content, '$.feed_lang') AS STRING) feed_lang, - SAFE_CAST(JSON_VALUE(content, '$.default_lang') AS STRING) default_lang, - SAFE_CAST(JSON_VALUE(content, '$.feed_contact_email') AS STRING) feed_contact_email, - SAFE_CAST(JSON_VALUE(content, '$.feed_contact_url') AS STRING) feed_contact_url, - '{{ var("version") }}' AS versao_modelo - FROM - {{ source( - 'br_rj_riodejaneiro_gtfs_staging', - 'feed_info' - ) }} +WITH feed_info AS ( + SELECT + SAFE_CAST(timestamp_captura AS STRING) AS feed_version, + SAFE_CAST(data_versao AS DATE) AS feed_start_date, + NULL AS feed_end_date, + SAFE_CAST(feed_publisher_name AS STRING) feed_publisher_name, + 
SAFE_CAST(JSON_VALUE(content, '$.feed_publisher_url') AS STRING) feed_publisher_url, + SAFE_CAST(JSON_VALUE(content, '$.feed_lang') AS STRING) feed_lang, + SAFE_CAST(JSON_VALUE(content, '$.default_lang') AS STRING) default_lang, + SAFE_CAST(JSON_VALUE(content, '$.feed_contact_email') AS STRING) feed_contact_email, + SAFE_CAST(JSON_VALUE(content, '$.feed_contact_url') AS STRING) feed_contact_url, + CURRENT_DATETIME("America/Sao_Paulo") AS feed_update_datetime, + '{{ var("version") }}' AS versao_modelo + FROM + {{ source( + 'br_rj_riodejaneiro_gtfs_staging', + 'feed_info' + ) }} + {% if is_incremental() %} + WHERE + data_versao = '{{ var("data_versao_gtfs") }}' + UNION ALL + SELECT + * + FROM + {{ this }} + WHERE + feed_start_date != DATE('{{ var("data_versao_gtfs") }}') + {% endif %} + ) + SELECT + feed_version, + feed_start_date, + DATE_SUB(LEAD(DATE(feed_version)) OVER (ORDER BY feed_version), INTERVAL 1 DAY) AS feed_end_date, + feed_publisher_name, + feed_publisher_url, + feed_lang, + default_lang, + feed_contact_email, + feed_contact_url, + feed_update_datetime, + versao_modelo + FROM + feed_info \ No newline at end of file From c820ec2aa26b56eb5e1476f4ed9bceb8fe563a39 Mon Sep 17 00:00:00 2001 From: Victor Miguel Rocha Date: Tue, 23 Jul 2024 15:18:56 -0300 Subject: [PATCH 03/11] [HOTFIX] Corrige parse de data_versao_gtfs (#118) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adiciona parametros para a captura manual do gtfs * atualiza changelog * remove case em get_last_capture_os * adiciona with case para get_last_capture_os * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * adiciona função normalizar_horario em processa_ordem_servico * adiciona parametros capture e materialize * adiciona merge apos materialize * adiciona descrições de tabelas do gtfs * refatora flow tasks * corrige case materialize * corrige import do pandas e case da materialização * 
[pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * verifica tipo de data_versao_gtfs * adiciona merge apos case * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * remove case data_versao_gtfs adiciona case captura e materialize false * adiciona conversão de data ao update_last_captured_os * remove antiga converção de formato de data_versao_gtfs em get_os_info * adiciona verifica_materialize * corrige verifica_materialize * adiciona merge de flag_new_os * corrige verificação de tipo de data_versao_gtfs_merge * semplifica verifica_materialize * simplifica verifica_materialize * remove merge flag_new_os * corrige nome de variaveis de flag_new_os_task * altera verificação de materialização * atualiza changelog * simplifica parametros de captura * ajusta primary key de fare_rules * ajusta verifivação de tipo para data_versao_gtfs * atualiza tratamento de fare_rule e feed_info * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * move verificação de tipo do gtfs para dentro do case * remove verificação desnecessaria * atualiza changelog --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .../br_rj_riodejaneiro_gtfs/CHANGELOG.md | 6 ++++++ .../migration/br_rj_riodejaneiro_gtfs/flows.py | 17 ++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md index 7bd97103..2a4b55ad 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - gtfs +## Corrigido + +## [1.0.5] - 2024-07-23 + +- Corrigido o parse da data_versao_gtf 
(https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/118) + ## [1.0.4] - 2024-07-17 ### Adicionado diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py index 30f91fe4..ffdca824 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py @@ -189,18 +189,17 @@ data_versao=data_versao_gtfs_merge ) - data_versao_gtfs_is_str = task(lambda data_versao: isinstance(data_versao, str))( - data_versao_gtfs_merge - ) - - with case(data_versao_gtfs_is_str, False): - string_data_versao_gtfs = parse_timestamp_to_string( - timestamp=data_versao_gtfs_merge, pattern="%Y-%m-%d" + with case(verifica_materialize, True): + data_versao_gtfs_is_str = task(lambda data_versao: isinstance(data_versao, str))( + data_versao_gtfs_merge ) + with case(data_versao_gtfs_is_str, False): + string_data_versao_gtfs = parse_timestamp_to_string( + timestamp=data_versao_gtfs_merge, pattern="%Y-%m-%d" + ) - data_versao_gtfs = merge(string_data_versao_gtfs, data_versao_gtfs_merge) + data_versao_gtfs = merge(string_data_versao_gtfs, data_versao_gtfs_merge) - with case(verifica_materialize, True): version = fetch_dataset_sha(dataset_id=constants.GTFS_MATERIALIZACAO_DATASET_ID.value) dbt_vars = get_join_dict([{"data_versao_gtfs": data_versao_gtfs}], version)[0] From f958f57d6b63df60eacf6155d8e385295d94fc65 Mon Sep 17 00:00:00 2001 From: Victor Miguel Rocha Date: Tue, 23 Jul 2024 17:07:43 -0300 Subject: [PATCH 04/11] =?UTF-8?q?Adiciona=20descri=C3=A7=C3=A3o=20da=20col?= =?UTF-8?q?una=20feed=5Fupdate=5Fdatetime=20em=20feed=5Finfo=20(#119)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adiciona descrição da coluna feed_update_datetime em feed_info * atualiza changelog --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py | 2 +- 
queries/models/gtfs/CHANGELOG.md | 7 +++++++ queries/models/gtfs/schema.yml | 2 ++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py index ffdca824..6e387cd2 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py @@ -2,7 +2,7 @@ """ Flows for gtfs -DBT: 2024-07-15 +DBT: 2024-07-23 """ from prefect import Parameter, case, task diff --git a/queries/models/gtfs/CHANGELOG.md b/queries/models/gtfs/CHANGELOG.md index 3ef4716a..a153ff17 100644 --- a/queries/models/gtfs/CHANGELOG.md +++ b/queries/models/gtfs/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog - gtfs +## [1.1.7] - 2024-07-23 + +### Adicionado + +- Adiciona descrição da coluna `feed_update_datetime` em `feed_info` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/119) + + ## [1.1.6] - 2024-07-22 ### Alterado diff --git a/queries/models/gtfs/schema.yml b/queries/models/gtfs/schema.yml index 0e5f1342..5aef6e79 100644 --- a/queries/models/gtfs/schema.yml +++ b/queries/models/gtfs/schema.yml @@ -146,6 +146,8 @@ models: description: "URL para informações de contato, um formulário web, uma mesa de suporte ou outras ferramentas de comunicação relativas ao conjunto de dados GTFS e práticas de publicação de dados." - name: feed_start_date description: "Data de referência do feed (versão)." + - name: feed_update_datetime + description: "Data e hora da última atualização do feed." - name: versao_modelo description: "Código de controle de versão (SHA do GitHub)." 
From 9bfd7d9b821d2bbdadb718ac0dfda226078aeee1 Mon Sep 17 00:00:00 2001 From: hellcassius Date: Fri, 26 Jul 2024 14:56:30 -0300 Subject: [PATCH 05/11] update readme --- README.md | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 25fe3498..f6c326b8 100644 --- a/README.md +++ b/README.md @@ -1 +1,54 @@ -# Pipelines rj-smtr \ No newline at end of file +# Pipelines rj-smtr + + +## Setup + +### Etapa 1 - Preparação de Ambiente + +- Na raiz do projeto, crie um ambiente virtual para isolar as dependencias: + - `python3.10 -m venv .pipelines` + +- Ative o ambiente virtual: + - `. .pipelines/bin/activate` + +- Instale as dependencias do projeto: + - `poetry install --all-extras` + - `pip install -e .` + +- Crie um arquivo `.env` na raiz do projeto, contendo as seguintes variáveis: + - ```INFISICAL_ADDRESS = '' + INFISICAL_TOKEN = '' + ``` + - Solicite os valores a serem utilizados para a equipe de devops + +- Adicione as variáveis de ambiente à sua sessão de terminal: + - `set -a && source .env && set +a` + +### Testando flows localmente: + +- Adicione um arquivo `test.py` na raiz do projeto: + - Neste arquivo, você deve importar o flow a ser testado + - Importar a função `run_local` + - `from pipelines.utils.prefect import run_local` + - A assinatura da função é a seguinte: + `run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None)` + Permitindo que se varie os parâmetros a serem passados ao flow durante + uma execução local + - Use `run_local()` e execute o arquivo: + - `python test.py` + - Uma dica interessante que pode ajudar no processo de teste e debug é adicionar + `| tee logs.txt` + ao executar seu teste. + - Isso gerará um arquivo com os logs daquela execução, para que você possa + analisar esses logs mais facilmente do que usando somente o terminal. 
+ +### Etapa 2 - Deploy para staging e PR + + - Sempre trabalhe com branchs `staging/` + - Dê push e abra seu Pull Request. + - Cada commit nesta branch irá disparar as rotinas do Github que: + - Verificam formatação + - Fazem Deploy + - Registram flows em staging (ambiente de testes) + - Você acompanha o status destas rotinas na própria página do seu PR + - Flows registrados aparecem no servidor Prefect. Eles podem ser rodados por lá \ No newline at end of file From e8ef6b031e969b933188ae3910f381067ec90083 Mon Sep 17 00:00:00 2001 From: Caio R S dos Santos Date: Fri, 26 Jul 2024 14:59:46 -0300 Subject: [PATCH 06/11] Update README.md --- README.md | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index f6c326b8..15c48097 100644 --- a/README.md +++ b/README.md @@ -16,9 +16,10 @@ - `pip install -e .` - Crie um arquivo `.env` na raiz do projeto, contendo as seguintes variáveis: - - ```INFISICAL_ADDRESS = '' - INFISICAL_TOKEN = '' - ``` + - ``` + INFISICAL_ADDRESS = '' + INFISICAL_TOKEN = '' + - Solicite os valores a serem utilizados para a equipe de devops - Adicione as variáveis de ambiente à sua sessão de terminal: @@ -44,11 +45,11 @@ ### Etapa 2 - Deploy para staging e PR - - Sempre trabalhe com branchs `staging/` - - Dê push e abra seu Pull Request. - - Cada commit nesta branch irá disparar as rotinas do Github que: - - Verificam formatação - - Fazem Deploy - - Registram flows em staging (ambiente de testes) - - Você acompanha o status destas rotinas na própria página do seu PR - - Flows registrados aparecem no servidor Prefect. Eles podem ser rodados por lá \ No newline at end of file +- Sempre trabalhe com branchs `staging/` +- Dê push e abra seu Pull Request. 
+- Cada commit nesta branch irá disparar as rotinas do Github que: +- Verificam formatação +- Fazem Deploy +- Registram flows em staging (ambiente de testes) +- Você acompanha o status destas rotinas na própria página do seu PR +- Flows registrados aparecem no servidor Prefect. Eles podem ser rodados por lá From 4305911892ee4e6fdb87e62ad2f5d5032e1f69fe Mon Sep 17 00:00:00 2001 From: Rafael Carvalho Pinheiro <74972217+pixuimpou@users.noreply.github.com> Date: Mon, 29 Jul 2024 11:23:13 -0300 Subject: [PATCH 07/11] =?UTF-8?q?Migra=C3=A7=C3=A3o=20-=20Refatora=20arqui?= =?UTF-8?q?vos=20com=20erro=20no=20pre-commit=20(#122)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * corrige arquivos com erro * corrige whitespaces * correções pre commit * add global flake8 configs * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * add global flake8 configs * corrige readme --------- Co-authored-by: hellcassius Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .flake8 | 11 ++++++++++- README.md | 6 +++--- pipelines/capture/templates/tasks.py | 8 ++++---- pipelines/janitor/tasks.py | 6 ++++-- pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py | 1 - pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py | 2 -- pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py | 6 ++++-- pipelines/migration/controle_financeiro/tasks.py | 1 - pipelines/tasks.py | 1 - pipelines/treatment/bilhetagem/flows.py | 1 - pipelines/treatment/templates/tasks.py | 1 - pipelines/utils/dbt_datetime_vars.py | 1 - 12 files changed, 25 insertions(+), 20 deletions(-) diff --git a/.flake8 b/.flake8 index 51b50a04..b6d8482c 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,11 @@ [flake8] -max-line-length = 100 \ No newline at end of file +max-line-length = 100 +extend-ignore = + E231, + E221, + E702, + E202, 
+ E271, + E272, + E225, + E222 \ No newline at end of file diff --git a/README.md b/README.md index 15c48097..5d43c244 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ - Adicione um arquivo `test.py` na raiz do projeto: - Neste arquivo, você deve importar o flow a ser testado - - Importar a função `run_local` + - Importar a função `run_local` - `from pipelines.utils.prefect import run_local` - A assinatura da função é a seguinte: `run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None)` @@ -37,12 +37,12 @@ uma execução local - Use `run_local()` e execute o arquivo: - `python test.py` - - Uma dica interessante que pode ajudar no processo de teste e debug é adicionar + - Uma dica interessante que pode ajudar no processo de teste e debug é adicionar `| tee logs.txt` ao executar seu teste. - Isso gerará um arquivo com os logs daquela execução, para que você possa analisar esses logs mais facilmente do que usando somente o terminal. - + ### Etapa 2 - Deploy para staging e PR - Sempre trabalhe com branchs `staging/` diff --git a/pipelines/capture/templates/tasks.py b/pipelines/capture/templates/tasks.py index 005dde7d..a554b0e2 100644 --- a/pipelines/capture/templates/tasks.py +++ b/pipelines/capture/templates/tasks.py @@ -195,9 +195,9 @@ def transform_raw_to_nested_structure( if print_inputs: log( f""" - Received inputs: - - timestamp:\n{timestamp} - - data:\n{data.head()}""" +Received inputs: +- timestamp:\n{timestamp} +- data:\n{data.head()}""" ) if data.empty: @@ -220,7 +220,7 @@ def transform_raw_to_nested_structure( log(f"timestamp column = {timestamp}", level="info") log( - f"Finished nested structure! Data:\n{data_info_str(data)}", + f"Finished nested structure! 
Data: \n{data_info_str(data)}", level="info", ) diff --git a/pipelines/janitor/tasks.py b/pipelines/janitor/tasks.py index 0990f4f8..5a939082 100644 --- a/pipelines/janitor/tasks.py +++ b/pipelines/janitor/tasks.py @@ -2,7 +2,6 @@ # from typing import Dict, List import traceback from datetime import datetime -from typing import Dict, List import requests from prefect import task @@ -11,6 +10,8 @@ from pipelines.utils.secret import get_secret from pipelines.utils.utils import log +# from typing import Dict, List + @task def query_active_flow_names(prefix="%SMTR%", prefect_client=None, prefect_project="production"): @@ -54,7 +55,8 @@ def query_not_active_flows(flows, prefect_client=None, prefect_project="producti flow_name, last_version = flows now = datetime.now().isoformat() query = """ -query($flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int, $project_name:String){ +query(\ +$flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int, $project_name:String){ flow( where:{ name: {_eq:$flow_name}, diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py index 6e387cd2..d3ebb6bd 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py @@ -113,7 +113,6 @@ last_captured_os = merge(last_captured_os_none, last_captured_os_redis) with case(materialize_only, False): - timestamp = get_scheduled_timestamp() flag_new_os, os_control, data_index, data_versao_gtfs_task = get_os_info( diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py index 35661375..04488ed1 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py @@ -192,7 +192,6 @@ def get_raw_drive_files(os_control, local_filepath: list, regular_sheet_index: i with zipfile.ZipFile(file_bytes_gtfs, "r") as zipped_file: for 
filename in list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()): if filename == "ordem_servico": - processa_ordem_servico( sheetnames=sheetnames, file_bytes=file_bytes_os, @@ -201,7 +200,6 @@ def get_raw_drive_files(os_control, local_filepath: list, regular_sheet_index: i regular_sheet_index=regular_sheet_index, ) elif filename == "ordem_servico_trajeto_alternativo": - processa_ordem_servico_trajeto_alternativo( sheetnames=sheetnames, file_bytes=file_bytes_os, diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py index 0accdfa1..18f2a20a 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py @@ -157,7 +157,8 @@ def normalizar_horario(horario): Normalizes the given time string. If the time string contains the word "day", it splits the string into days and time. - It converts the days, hours, minutes, and seconds into total hours and returns the normalized time string. + It converts the days, hours, minutes, and seconds into total hours and returns + the normalized time string. If the time string does not contain the word "day", it returns the time string as is. @@ -192,7 +193,8 @@ def processa_ordem_servico( regular_sheet_index (int, optional): The index of the regular sheet. Defaults to 0. Raises: - Exception: If there are more than 2 tabs in the file or if there are missing or duplicated columns in the order of service data. + Exception: If there are more than 2 tabs in the file or if there are missing or + duplicated columns in the order of service data. Exception: If the validation of 'km_test' and 'km_dia_util' fails. 
Returns: diff --git a/pipelines/migration/controle_financeiro/tasks.py b/pipelines/migration/controle_financeiro/tasks.py index 3ab5cce3..f8d1aa50 100644 --- a/pipelines/migration/controle_financeiro/tasks.py +++ b/pipelines/migration/controle_financeiro/tasks.py @@ -83,7 +83,6 @@ def create_cct_arquivo_retorno_params( } else: - params = { "dt_inicio": ( date.fromisoformat(redis_return["last_date"]) + timedelta(days=1) diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 651a01e4..3a252ee2 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -187,7 +187,6 @@ def run_subflow( flow_run_results = [] for idx, param_list in enumerate(parameters): - if not isinstance(param_list, list): param_list = [param_list] diff --git a/pipelines/treatment/bilhetagem/flows.py b/pipelines/treatment/bilhetagem/flows.py index 2447f889..dfcb7c8c 100644 --- a/pipelines/treatment/bilhetagem/flows.py +++ b/pipelines/treatment/bilhetagem/flows.py @@ -40,7 +40,6 @@ ) with Flow("Bilhetagem - Tratamento") as bilhetagem_tratamento: - timestamp = get_scheduled_timestamp() AUXILIAR_CAPTURE = run_subflow( diff --git a/pipelines/treatment/templates/tasks.py b/pipelines/treatment/templates/tasks.py index 8485ee89..6b3e3492 100644 --- a/pipelines/treatment/templates/tasks.py +++ b/pipelines/treatment/templates/tasks.py @@ -243,7 +243,6 @@ def run_data_quality_checks( log("Executando testes de qualidade de dados") for check in data_quality_checks: - dataplex = DataQuality( data_scan_id=check.check_id, project_id="rj-smtr", diff --git a/pipelines/utils/dbt_datetime_vars.py b/pipelines/utils/dbt_datetime_vars.py index e7fa6cde..47eef13d 100644 --- a/pipelines/utils/dbt_datetime_vars.py +++ b/pipelines/utils/dbt_datetime_vars.py @@ -55,7 +55,6 @@ def create_var( table: BQTable, timestamp: datetime, ) -> dict: - last_run = ( self.get_last_run_from_redis(redis_key=redis_key) or self.get_last_run_from_bq(table=table) From 88cf68941ad983864187265e450ac9cfb662eb6b Mon Sep 17 00:00:00 2001 From: 
Rodrigo Cunha <66736583+eng-rodrigocunha@users.noreply.github.com> Date: Thu, 1 Aug 2024 18:51:39 -0300 Subject: [PATCH 08/11] =?UTF-8?q?Materializa=C3=A7=C3=A3o=20de=20`viagem?= =?UTF-8?q?=5Fplanejada`=20em=20D+0=20(#125)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * commit inicial * update changelog --- queries/models/projeto_subsidio_sppo/CHANGELOG.md | 6 ++++++ .../projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql | 4 ++-- queries/models/projeto_subsidio_sppo/viagem_planejada.sql | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/queries/models/projeto_subsidio_sppo/CHANGELOG.md b/queries/models/projeto_subsidio_sppo/CHANGELOG.md index 14b408c7..cc87df5b 100644 --- a/queries/models/projeto_subsidio_sppo/CHANGELOG.md +++ b/queries/models/projeto_subsidio_sppo/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - projeto_subsidio_sppo +## [6.0.3] - 2024-08-01 + +### Alterado + +- Alterados modelos `viagem_planejada.sql` e `subsidio_data_versao_efetiva.sql` para materializar sempre em D+0 e permitir acompanhamento pelos operadores (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/125) + ## [6.0.2] - 2024-04-22 ### Adicionado diff --git a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql index 5674df59..18b6a900 100644 --- a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql +++ b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql @@ -368,9 +368,9 @@ WITH (feed_version) WHERE {% if is_incremental() %} - data = DATE_SUB(DATE("{{ var("run_date") }}"), INTERVAL 1 DAY) + data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) AND DATE("{{ var('run_date') }}") {% else %} - data <= DATE_SUB(DATE("{{ var("run_date") }}"), INTERVAL 1 DAY) + data <= DATE("{{ var('run_date') }}") {% endif %} ) SELECT diff --git a/queries/models/projeto_subsidio_sppo/viagem_planejada.sql 
b/queries/models/projeto_subsidio_sppo/viagem_planejada.sql index d47c52bc..cabcc315 100644 --- a/queries/models/projeto_subsidio_sppo/viagem_planejada.sql +++ b/queries/models/projeto_subsidio_sppo/viagem_planejada.sql @@ -213,7 +213,7 @@ WITH {{ ref("subsidio_data_versao_efetiva") }} -- rj-smtr-dev.projeto_subsidio_sppo.subsidio_data_versao_efetiva WHERE - data = DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) ) + data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) AND DATE("{{ var('run_date') }}")) SELECT d.data, CASE From f13d72d84d86f4eebbfedad1c9180bd017c50593 Mon Sep 17 00:00:00 2001 From: Rodrigo Cunha <66736583+eng-rodrigocunha@users.noreply.github.com> Date: Fri, 2 Aug 2024 18:27:14 -0300 Subject: [PATCH 09/11] =?UTF-8?q?Cria=20cat=C3=A1logo=20de=20dados=20(#127?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Altera dbt_project * Cria macro `get_models_with_tags` * Insere tags de 'geolocalizacao' * Insere tags de 'identificacao' * Cria modelos de `catalogo` * Insere nova função em `dev/utils.py` * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Atualiza CHANGELOG * Atualiza CHANGELOG * Atualiza model `operadoras.sql` * Atualiza changelog --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- queries/dbt_project.yml | 5 ++- queries/dev/utils.py | 15 +++++++++ queries/macros/get_models_with_tags.sql | 18 ++++++++++ .../CHANGELOG.md | 6 ++++ .../gps_validador.sql | 1 + .../gps_validador_van.sql | 1 + .../staging_cliente.sql | 1 + .../CHANGELOG.md | 5 +++ .../gps_sppo_zirix.sql | 3 +- .../br_rj_riodejaneiro_veiculos/CHANGELOG.md | 6 ++++ .../br_rj_riodejaneiro_veiculos/gps_brt.sql | 3 +- .../br_rj_riodejaneiro_veiculos/gps_sppo.sql | 3 +- queries/models/cadastro/CHANGELOG.md | 6 ++++ queries/models/cadastro/operadoras.sql | 3 +- queries/models/cadastro/servicos.sql | 5 +-- 
.../models/catalogo/ed_metadado_coluna.sql | 20 +++++++++++ queries/models/catalogo/metadado_coluna.sql | 9 +++++ queries/models/catalogo/schema.yml | 33 +++++++++++++++++++ queries/models/docs.md | 26 ++++++++++++++- queries/models/gtfs/CHANGELOG.md | 5 +++ queries/models/gtfs/shapes_geom_gtfs.sql | 3 +- queries/models/gtfs/shapes_gtfs.sql | 3 +- queries/models/gtfs/stops_gtfs.sql | 3 +- 23 files changed, 172 insertions(+), 11 deletions(-) create mode 100644 queries/macros/get_models_with_tags.sql create mode 100644 queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md create mode 100644 queries/models/catalogo/ed_metadado_coluna.sql create mode 100644 queries/models/catalogo/metadado_coluna.sql create mode 100644 queries/models/catalogo/schema.yml diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index 4ae666c1..d6a5a324 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -285,4 +285,7 @@ models: +schema: validacao_dados_jae staging: +materialized: incremental - +schema: validacao_dados_jae_staging \ No newline at end of file + +schema: validacao_dados_jae_staging + catalogo: + +materialized: view + +schema: catalogo \ No newline at end of file diff --git a/queries/dev/utils.py b/queries/dev/utils.py index db021da6..439e0ce2 100644 --- a/queries/dev/utils.py +++ b/queries/dev/utils.py @@ -5,6 +5,8 @@ # from datetime import timedelta from typing import Dict, List, Union +import requests + # import pandas as pd @@ -62,3 +64,16 @@ def run_dbt_model( print(f"\n>>> RUNNING: {run_command}\n") os.system(run_command) + + +def fetch_dataset_sha(dataset_id: str): + """Fetches the SHA of a branch from Github""" + url = "https://api.github.com/repos/prefeitura-rio/queries-rj-smtr" + url += f"/commits?queries-rj-smtr/rj_smtr/{dataset_id}" + response = requests.get(url) + + if response.status_code != 200: + return None + + dataset_version = response.json()[0]["sha"] + return {"version": dataset_version} diff --git 
a/queries/macros/get_models_with_tags.sql b/queries/macros/get_models_with_tags.sql new file mode 100644 index 00000000..f68dce50 --- /dev/null +++ b/queries/macros/get_models_with_tags.sql @@ -0,0 +1,18 @@ +/* https://discourse.getdbt.com/t/get-all-dbt-table-model-names-from-a-tag-inside-another-model/7703 (modificado) */ +{% macro get_models_with_tags(tags) %} + +{% set models_with_tag = [] %} + +{% for model in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} + + {% for tag in tags %} + {% if tag in model.config.tags %} + {{ models_with_tag.append(model) }} + {% endif %} + {% endfor %} + +{% endfor %} + +{{ return(models_with_tag) }} + +{% endmacro %} \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md index a74e347f..defd4fb6 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - bilhetagem +## [2.1.4] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` aos modelos `gps_validador_van.sql` e `gps_validador.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) +- Adiciona tag `identificacao` ao modelo `staging_cliente.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + ## [2.1.3] - 2024-07-18 ### Adicionado diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql index ce3b4b71..9ec4fec0 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql @@ -6,6 +6,7 @@ "data_type":"date", "granularity": "day" }, + tags=['geolocalizacao'] ) }} diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql index 84aaa992..601cd3ae 100644 --- 
a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql @@ -6,6 +6,7 @@ "data_type":"date", "granularity": "day" }, + tags=['geolocalizacao'] ) }} diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql index 7b9c0d93..4feda28c 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql @@ -1,6 +1,7 @@ {{ config( alias='cliente', + tags=['identificacao'] ) }} diff --git a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md index d1b45343..3cf994d5 100644 --- a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog - onibus_gps_zirix +## [1.0.3] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` ao modelo `gps_sppo_zirix.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + ## [1.0.2] - 2024-07-02 ### Adicionado diff --git a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql index 0c2ac39b..6990d828 100644 --- a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql +++ b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql @@ -6,7 +6,8 @@ 'data_type':'date', 'granularity': 'day' }, - alias='gps_sppo' + alias='gps_sppo', + tags=['geolocalizacao'] ) }} /* diff --git a/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md new file mode 100644 index 00000000..946c270f --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md @@ -0,0 +1,6 @@ +# Changelog - 
br_rj_riodejaneiro_veiculos + +## [1.0.1] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` aos modelos `gps_brt.sql` e `gps_sppo.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql b/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql index e0d5ace8..4b19400e 100644 --- a/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql +++ b/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql @@ -5,7 +5,8 @@ 'field': 'data', 'data_type': 'date', 'granularity': 'day' - } + }, + tags=['geolocalizacao'] ) }} /* diff --git a/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql b/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql index 41e56c81..b62e6353 100644 --- a/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql +++ b/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql @@ -5,7 +5,8 @@ 'field':"data", 'data_type':'date', 'granularity': 'day' - } + }, + tags=['geolocalizacao'] ) }} /* diff --git a/queries/models/cadastro/CHANGELOG.md b/queries/models/cadastro/CHANGELOG.md index 49cc575a..f496cc7a 100644 --- a/queries/models/cadastro/CHANGELOG.md +++ b/queries/models/cadastro/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - cadastro +## [1.2.1] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` ao modelo `servicos.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) +- Adiciona tag `identificacao` ao modelo `operadoras.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + ## [1.2.0] - 2024-07-17 ### Adicionado diff --git a/queries/models/cadastro/operadoras.sql b/queries/models/cadastro/operadoras.sql index 65dcdd79..aa1c7b7c 100644 --- a/queries/models/cadastro/operadoras.sql +++ b/queries/models/cadastro/operadoras.sql @@ -1,6 +1,7 @@ {{ config( - materialized="table" + materialized="table", + tags=["identificacao"] ) }} diff --git a/queries/models/cadastro/servicos.sql 
b/queries/models/cadastro/servicos.sql index 667125fe..a32bd988 100644 --- a/queries/models/cadastro/servicos.sql +++ b/queries/models/cadastro/servicos.sql @@ -1,7 +1,8 @@ {{ config( - materialized='table' - ) + materialized='table', + tags=['geolocalizacao'] + ), }} SELECT diff --git a/queries/models/catalogo/ed_metadado_coluna.sql b/queries/models/catalogo/ed_metadado_coluna.sql new file mode 100644 index 00000000..6915b4ea --- /dev/null +++ b/queries/models/catalogo/ed_metadado_coluna.sql @@ -0,0 +1,20 @@ +{% if execute %} + {% set models_with_tag = get_models_with_tags(["geolocalizacao", "identificacao"]) %} + {% do log("Models: \n", info=true) %} + {% for model in models_with_tag %} + {% do log(model.schema~"."~model.alias~"\n", info=true) %} + {% endfor %} +{% endif %} + +SELECT + * +FROM + {{ ref("metadado_coluna") }} +WHERE + {% for model in models_with_tag %} + {% if not loop.first %}OR {% endif %}(dataset_id = "{{ model.schema }}" + AND table_id = "{{ model.alias }}") + {% endfor %} + + OR (dataset_id = "br_rj_riodejaneiro_stpl_gps" + AND table_id = "registros") \ No newline at end of file diff --git a/queries/models/catalogo/metadado_coluna.sql b/queries/models/catalogo/metadado_coluna.sql new file mode 100644 index 00000000..34a20e45 --- /dev/null +++ b/queries/models/catalogo/metadado_coluna.sql @@ -0,0 +1,9 @@ +SELECT + table_catalog AS project_id, + table_schema AS dataset_id, + table_name AS table_id, + column_name, + data_type, + description +FROM + rj-smtr.`region-US`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS \ No newline at end of file diff --git a/queries/models/catalogo/schema.yml b/queries/models/catalogo/schema.yml new file mode 100644 index 00000000..3b2233c7 --- /dev/null +++ b/queries/models/catalogo/schema.yml @@ -0,0 +1,33 @@ +version: 2 + +models: + - name: ed_metadado_coluna + description: "Catálogo de dados de geolocalização e identificação do data lake da SMTR destinados ao Escritório de Dados (GP/ED)" + columns: + - name: project_id + 
description: "{{ doc('project_id') }}" + - name: dataset_id + description: "{{ doc('dataset_id') }}" + - name: table_id + description: "{{ doc('table_id') }}" + - name: column_name + description: "{{ doc('column_name') }}" + - name: data_type + description: "{{ doc('data_type') }}" + - name: description + description: "{{ doc('metadado_descricao') }}" + - name: metadado_coluna + description: "Catálogo de dados do data lake da SMTR" + columns: + - name: project_id + description: "{{ doc('project_id') }}" + - name: dataset_id + description: "{{ doc('dataset_id') }}" + - name: table_id + description: "{{ doc('table_id') }}" + - name: column_name + description: "{{ doc('column_name') }}" + - name: data_type + description: "{{ doc('data_type') }}" + - name: description + description: "{{ doc('metadado_descricao') }}" \ No newline at end of file diff --git a/queries/models/docs.md b/queries/models/docs.md index 805d8330..56bbb32f 100644 --- a/queries/models/docs.md +++ b/queries/models/docs.md @@ -1,3 +1,27 @@ {% docs consorcio %} Consórcio ao qual o serviço pertence -{% enddocs %} \ No newline at end of file +{% enddocs %} + +{% docs project_id %} +Nome do projeto (rj-smtr) +{% enddocs %} + +{% docs dataset_id %} +Nome do conjunto de dados +{% enddocs %} + +{% docs table_id %} +Nome da tabela +{% enddocs %} + +{% docs column_name %} +Nome da coluna +{% enddocs %} + +{% docs data_type %} +Tipo de dado da coluna +{% enddocs %} + +{% docs metadado_descricao %} +Descrição da coluna +{% enddocs %} diff --git a/queries/models/gtfs/CHANGELOG.md b/queries/models/gtfs/CHANGELOG.md index a153ff17..2790158e 100644 --- a/queries/models/gtfs/CHANGELOG.md +++ b/queries/models/gtfs/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog - gtfs +## [1.1.8] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` aos modelos `shapes_geom_gtfs.sql`, `shapes_gtfs.sql` e `stops_gtfs.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + ## [1.1.7] - 2024-07-23 ### Adicionado diff 
--git a/queries/models/gtfs/shapes_geom_gtfs.sql b/queries/models/gtfs/shapes_geom_gtfs.sql index e6098470..4c84c80c 100644 --- a/queries/models/gtfs/shapes_geom_gtfs.sql +++ b/queries/models/gtfs/shapes_geom_gtfs.sql @@ -3,7 +3,8 @@ 'data_type' :'date', 'granularity': 'day' }, unique_key = ['shape_id', 'feed_start_date'], - alias = 'shapes_geom' + alias = 'shapes_geom', + tags=['geolocalizacao'] ) }} {% if execute and is_incremental() %} diff --git a/queries/models/gtfs/shapes_gtfs.sql b/queries/models/gtfs/shapes_gtfs.sql index 926e725a..c9409a45 100644 --- a/queries/models/gtfs/shapes_gtfs.sql +++ b/queries/models/gtfs/shapes_gtfs.sql @@ -3,7 +3,8 @@ 'data_type' :'date', 'granularity': 'day' }, unique_key = ['shape_id', 'shape_pt_sequence', 'feed_start_date'], - alias = 'shapes' + alias = 'shapes', + tags=['geolocalizacao'] )}} {% if execute and is_incremental() %} diff --git a/queries/models/gtfs/stops_gtfs.sql b/queries/models/gtfs/stops_gtfs.sql index 98a1b69f..38f73004 100644 --- a/queries/models/gtfs/stops_gtfs.sql +++ b/queries/models/gtfs/stops_gtfs.sql @@ -3,7 +3,8 @@ 'data_type' :'date', 'granularity': 'day' }, unique_key = ['stop_id', 'feed_start_date'], - alias = 'stops' + alias = 'stops', + tags=['geolocalizacao'] )}} {% if execute and is_incremental() %} From f428b55c7d99824318cb847a5ddc5f5f6802509a Mon Sep 17 00:00:00 2001 From: Victor Miguel Rocha Date: Fri, 2 Aug 2024 19:08:04 -0300 Subject: [PATCH 10/11] [HOTFIX] Adiciona filtro para os nomes de tabs da planilha de controle os (#128) * adiciona filtro para os nomes de tabs da planilha de controle os * atualiza changelog --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md | 6 ++++++ pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py | 2 ++ 2 files changed, 8 insertions(+) diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md 
index 2a4b55ad..3fa0e770 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - gtfs +## Adicionado + +## [1.0.6] - 2024-08-02 + +- Adiciona filtro para os nomes de tabs da planilha de controle os na task `get_raw_drive_files` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/128/files) + ## Corrigido ## [1.0.5] - 2024-07-23 diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py index 04488ed1..b40d1251 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py @@ -188,6 +188,8 @@ def get_raw_drive_files(os_control, local_filepath: list, regular_sheet_index: i # Salva os nomes das planilhas sheetnames = xl.load_workbook(file_bytes_os).sheetnames + sheetnames = [name for name in sheetnames if "ANEXO" in name] + log(f"tabs encontradas na planilha Controle OS: {sheetnames}") with zipfile.ZipFile(file_bytes_gtfs, "r") as zipped_file: for filename in list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()): From b0090f7e17ff93584eeab9735f6e69c596fd802d Mon Sep 17 00:00:00 2001 From: Victor Miguel Rocha Date: Fri, 2 Aug 2024 21:04:57 -0300 Subject: [PATCH 11/11] =?UTF-8?q?[HOTFIX]=20Adiciona=20etapa=20de=20remove?= =?UTF-8?q?r=20pontos=20antes=20da=20conver=C3=A7=C3=A3o=20de=20metro=20pa?= =?UTF-8?q?ra=20km=20(#129)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adiciona filtro para os nomes de tabs da planilha de controle os * atualiza changelog * adiociona etapa de remover pontos antes da converção de metro para km * atualiza changelog --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md | 2 ++ pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py | 2 ++ 2 files changed, 4 insertions(+) 
diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md index 3fa0e770..2df6721a 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md @@ -6,6 +6,8 @@ - Adiciona filtro para os nomes de tabs da planilha de controle os na task `get_raw_drive_files` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/128/files) +- Adiciona etapa de remover pontos antes da conversão de metro para km no processamento da OS (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/129) + ## Corrigido ## [1.0.5] - 2024-07-23 diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py index 18f2a20a..fa3cd8d1 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py @@ -270,6 +270,8 @@ def processa_ordem_servico( extensao_cols = ["extensao_ida", "extensao_volta"] quadro[extensao_cols] = quadro[extensao_cols].astype(str) + for col in extensao_cols: + quadro[col] = quadro[col].str.replace(".", "", regex=False) quadro[extensao_cols] = quadro[extensao_cols].apply(pd.to_numeric) quadro["extensao_ida"] = quadro["extensao_ida"] / 1000