diff --git a/.flake8 b/.flake8
index 51b50a04..b6d8482c 100644
--- a/.flake8
+++ b/.flake8
@@ -1,2 +1,11 @@
 [flake8]
-max-line-length = 100
\ No newline at end of file
+max-line-length = 100
+extend-ignore =
+    E231,
+    E221,
+    E702,
+    E202,
+    E271,
+    E272,
+    E225,
+    E222
\ No newline at end of file
diff --git a/README.md b/README.md
index 25fe3498..5d43c244 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,55 @@
-# Pipelines rj-smtr
\ No newline at end of file
+# Pipelines rj-smtr
+
+
+## Setup
+
+### Etapa 1 - Preparação de Ambiente
+
+- Na raiz do projeto, crie um ambiente virtual para isolar as dependências:
+  - `python3.10 -m venv .pipelines`
+
+- Ative o ambiente virtual:
+  - `. .pipelines/bin/activate`
+
+- Instale as dependências do projeto:
+  - `poetry install --all-extras`
+  - `pip install -e .`
+
+- Crie um arquivo `.env` na raiz do projeto, contendo as seguintes variáveis:
+  - ```
+    INFISICAL_ADDRESS = ''
+    INFISICAL_TOKEN = ''
+    ```
+  - Solicite os valores a serem utilizados à equipe de devops
+
+- Adicione as variáveis de ambiente à sua sessão de terminal:
+  - `set -a && source .env && set +a`
+
+### Testando flows localmente
+
+- Adicione um arquivo `test.py` na raiz do projeto:
+  - Neste arquivo, você deve importar o flow a ser testado
+  - Importar a função `run_local`:
+    - `from pipelines.utils.prefect import run_local`
+  - A assinatura da função é a seguinte:
+    `run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None)`,
+    permitindo que se variem os parâmetros a serem passados ao flow durante
+    uma execução local
+  - Use `run_local()` e execute o arquivo (veja um esboço de exemplo ao final deste documento):
+    - `python test.py`
+  - Uma dica interessante que pode ajudar no processo de teste e debug é adicionar
+    `| tee logs.txt` ao executar seu teste.
+    - Isso gerará um arquivo com os logs daquela execução, para que você possa
+      analisar esses logs mais facilmente do que usando somente o terminal.
+
+### Etapa 2 - Deploy para staging e PR
+
+- Sempre trabalhe com branches `staging/`
+- Dê push e abra seu Pull Request.
+- Cada commit nesta branch irá disparar as rotinas do GitHub que:
+  - Verificam formatação
+  - Fazem deploy
+  - Registram flows em staging (ambiente de testes)
+- Você acompanha o status destas rotinas na própria página do seu PR
+- Flows registrados aparecem no servidor Prefect. Eles podem ser rodados por lá
diff --git a/pipelines/capture/templates/tasks.py b/pipelines/capture/templates/tasks.py
index 005dde7d..a554b0e2 100644
--- a/pipelines/capture/templates/tasks.py
+++ b/pipelines/capture/templates/tasks.py
@@ -195,9 +195,9 @@ def transform_raw_to_nested_structure(
     if print_inputs:
         log(
             f"""
-            Received inputs:
-            - timestamp:\n{timestamp}
-            - data:\n{data.head()}"""
+Received inputs:
+- timestamp:\n{timestamp}
+- data:\n{data.head()}"""
         )
 
     if data.empty:
@@ -220,7 +220,7 @@ def transform_raw_to_nested_structure(
         log(f"timestamp column = {timestamp}", level="info")
 
         log(
-            f"Finished nested structure! Data:\n{data_info_str(data)}",
+            f"Finished nested structure! Data: \n{data_info_str(data)}",
             level="info",
         )
 
diff --git a/pipelines/constants.py b/pipelines/constants.py
index c26f7592..a63e7c28 100644
--- a/pipelines/constants.py
+++ b/pipelines/constants.py
@@ -236,7 +236,7 @@ class constants(Enum):  # pylint: disable=c0103
         "stops": ["stop_id"],
         "trips": ["trip_id"],
         "fare_attributes": ["fare_id"],
-        "fare_rules": [],
+        "fare_rules": ["fare_id", "route_id"],
         "stop_times": ["trip_id", "stop_sequence"],
     }
 
diff --git a/pipelines/janitor/tasks.py b/pipelines/janitor/tasks.py
index 0990f4f8..5a939082 100644
--- a/pipelines/janitor/tasks.py
+++ b/pipelines/janitor/tasks.py
@@ -2,7 +2,6 @@
 # from typing import Dict, List
 import traceback
 from datetime import datetime
-from typing import Dict, List
 
 import requests
 from prefect import task
@@ -11,6 +10,8 @@
 
 from pipelines.utils.secret import get_secret
 from pipelines.utils.utils import log
+# from typing import Dict, List
+
 
 @task
 def query_active_flow_names(prefix="%SMTR%", prefect_client=None, prefect_project="production"):
@@ -54,7 +55,8 @@ def query_not_active_flows(flows, prefect_client=None, prefect_project="producti
     flow_name, last_version = flows
     now = datetime.now().isoformat()
     query = """
-query($flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int, $project_name:String){
+query(\
+$flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int, $project_name:String){
     flow(
         where:{
             name: {_eq:$flow_name},
diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
index 010e645b..2df6721a 100644
--- a/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
+++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/CHANGELOG.md
@@ -1,11 +1,32 @@
 # Changelog - gtfs
 
+## [1.0.6] - 2024-08-02
+
+### Adicionado
+
+- Adiciona filtro para os nomes de tabs da planilha de Controle OS na task `get_raw_drive_files` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/128/files)
+
+- Adiciona etapa de remoção de pontos antes da conversão de metros para km no processamento da OS (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/129)
+
+## [1.0.5] - 2024-07-23
+
+### Corrigido
+
+- Corrigido o parse da data_versao_gtfs (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/118)
+
+## [1.0.4] - 2024-07-17
+
+### Adicionado
+
+- Adiciona parâmetros para a captura manual do GTFS no flow `SMTR: GTFS - Captura/Tratamento` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/82/)
+
 ## [1.0.3] - 2024-07-04
 
 ## Corrigido
 
 - Corrigido o formato da data salva no redis de d/m/y para y-m-d (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/91)
 
+
 ## [1.0.2] - 2024-06-21
 
 ### Adicionado
diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
index bdd8bc59..d3ebb6bd 100644
--- a/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
+++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/flows.py
@@ -2,12 +2,13 @@
 """
 Flows for gtfs
 
-DBT: 2024-07-15
+DBT: 2024-07-23
 """
 
 from prefect import Parameter, case, task
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
+from prefect.tasks.control_flow import merge
 from prefect.utilities.edges import unmapped
 from prefeitura_rio.pipelines_utils.custom import Flow
 
@@ -98,94 +99,129 @@
 # )
 
 with Flow("SMTR: GTFS - Captura/Tratamento") as gtfs_captura_nova:
-    capture = Parameter("capture", default=True)
-    materialize = Parameter("materialize", default=True)
+    materialize_only =
Parameter("materialize_only", default=False) + regular_sheet_index = Parameter("regular_sheet_index", default=None) + data_versao_gtfs_param = Parameter("data_versao_gtfs", default=None) mode = get_current_flow_mode() - last_captured_os = get_last_capture_os(mode=mode, dataset_id=constants.GTFS_DATASET_ID.value) - - timestamp = get_scheduled_timestamp() - - flag_new_os, os_control, data_index, data_versao_gtfs = get_os_info( - last_captured_os=last_captured_os - ) - - with case(flag_new_os, True): - rename_current_flow_run_now_time( - prefix=gtfs_captura_nova.name + ' ["' + data_versao_gtfs + '"] ', now_time=timestamp - ) - - data_versao_gtfs = get_current_timestamp(data_versao_gtfs) - - partition = create_date_hour_partition( - timestamp=data_versao_gtfs, partition_date_name="data_versao", partition_date_only=True - ) - - filename = parse_timestamp_to_string(data_versao_gtfs) - - table_ids = task(lambda: list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()))() - - local_filepaths = create_local_partition_path.map( - dataset_id=unmapped(constants.GTFS_DATASET_ID.value), - table_id=table_ids, - partitions=unmapped(partition), - filename=unmapped(filename), + data_versao_gtfs_task = None + last_captured_os_none = None + with case(data_versao_gtfs_param, None): + last_captured_os_redis = get_last_capture_os( + mode=mode, dataset_id=constants.GTFS_DATASET_ID.value ) - raw_filepaths, primary_keys = get_raw_drive_files( - os_control=os_control, local_filepath=local_filepaths - ) + last_captured_os = merge(last_captured_os_none, last_captured_os_redis) + with case(materialize_only, False): + timestamp = get_scheduled_timestamp() - transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map( - raw_filepath=raw_filepaths, - filepath=local_filepaths, - primary_key=primary_keys, - timestamp=unmapped(data_versao_gtfs), - error=unmapped(None), + flag_new_os, os_control, data_index, data_versao_gtfs_task = get_os_info( + last_captured_os=last_captured_os, data_versao_gtfs=data_versao_gtfs_param ) - errors, treated_filepaths = unpack_mapped_results_nout2( - mapped_results=transform_raw_to_nested_structure_results - ) + with case(flag_new_os, True): + rename_current_flow_run_now_time( + prefix=gtfs_captura_nova.name + ' ["' + data_versao_gtfs_task + '"] ', + now_time=timestamp, + ) + + data_versao_gtfs_task = get_current_timestamp(data_versao_gtfs_task) + + partition = create_date_hour_partition( + timestamp=data_versao_gtfs_task, + partition_date_name="data_versao", + partition_date_only=True, + ) + + filename = parse_timestamp_to_string(data_versao_gtfs_task) + + table_ids = task(lambda: list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()))() + + local_filepaths = create_local_partition_path.map( + dataset_id=unmapped(constants.GTFS_DATASET_ID.value), + table_id=table_ids, + partitions=unmapped(partition), + filename=unmapped(filename), + ) + + raw_filepaths, primary_keys = get_raw_drive_files( + os_control=os_control, + local_filepath=local_filepaths, + regular_sheet_index=regular_sheet_index, + ) + + transform_raw_to_nested_structure_results = transform_raw_to_nested_structure.map( + raw_filepath=raw_filepaths, + filepath=local_filepaths, + primary_key=primary_keys, + timestamp=unmapped(data_versao_gtfs_task), + error=unmapped(None), + ) + + errors, treated_filepaths = unpack_mapped_results_nout2( + mapped_results=transform_raw_to_nested_structure_results + ) + + errors = upload_raw_data_to_gcs.map( + dataset_id=unmapped(constants.GTFS_DATASET_ID.value), + table_id=table_ids, 
+ raw_filepath=raw_filepaths, + partitions=unmapped(partition), + error=unmapped(None), + ) + + wait_captura_true = upload_staging_data_to_gcs.map( + dataset_id=unmapped(constants.GTFS_DATASET_ID.value), + table_id=table_ids, + staging_filepath=treated_filepaths, + partitions=unmapped(partition), + timestamp=unmapped(data_versao_gtfs_task), + error=errors, + ) + with case(materialize_only, True): + wait_captura_false = task() + + data_versao_gtfs_merge = merge(data_versao_gtfs_task, data_versao_gtfs_param) + wait_captura = merge(wait_captura_true, wait_captura_false) + + verifica_materialize = task(lambda data_versao: data_versao is not None)( + data_versao=data_versao_gtfs_merge + ) - errors = upload_raw_data_to_gcs.map( - dataset_id=unmapped(constants.GTFS_DATASET_ID.value), - table_id=table_ids, - raw_filepath=raw_filepaths, - partitions=unmapped(partition), - error=unmapped(None), + with case(verifica_materialize, True): + data_versao_gtfs_is_str = task(lambda data_versao: isinstance(data_versao, str))( + data_versao_gtfs_merge ) + with case(data_versao_gtfs_is_str, False): + string_data_versao_gtfs = parse_timestamp_to_string( + timestamp=data_versao_gtfs_merge, pattern="%Y-%m-%d" + ) - wait_upload_staging_data_to_gcs = upload_staging_data_to_gcs.map( - dataset_id=unmapped(constants.GTFS_DATASET_ID.value), - table_id=table_ids, - staging_filepath=treated_filepaths, - partitions=unmapped(partition), - timestamp=unmapped(data_versao_gtfs), - error=errors, - ) + data_versao_gtfs = merge(string_data_versao_gtfs, data_versao_gtfs_merge) - string_data_versao_gtfs = parse_timestamp_to_string( - timestamp=data_versao_gtfs, pattern="%Y-%m-%d" - ) version = fetch_dataset_sha(dataset_id=constants.GTFS_MATERIALIZACAO_DATASET_ID.value) - dbt_vars = get_join_dict([{"data_versao_gtfs": string_data_versao_gtfs}], version)[0] + dbt_vars = get_join_dict([{"data_versao_gtfs": data_versao_gtfs}], version)[0] wait_run_dbt_model = run_dbt_model( dataset_id=constants.GTFS_MATERIALIZACAO_DATASET_ID.value, _vars=dbt_vars, - ).set_upstream(task=wait_upload_staging_data_to_gcs) + ).set_upstream(task=wait_captura) - update_last_captured_os( + wait_materialize_true = update_last_captured_os( dataset_id=constants.GTFS_DATASET_ID.value, data_index=data_index, mode=mode, ).set_upstream(task=wait_run_dbt_model) + with case(verifica_materialize, False): + wait_materialize_false = task() + + wait_materialize = merge(wait_materialize_true, wait_materialize_false) + with case(flag_new_os, False): rename_current_flow_run_now_time( prefix=gtfs_captura_nova.name + " [SKIPPED] ", now_time=timestamp - ) + ).set_upstream(task=wait_materialize) gtfs_captura_nova.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py index c407f0f8..b40d1251 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/tasks.py @@ -79,12 +79,24 @@ def update_last_captured_os(dataset_id: str, data_index: str, mode: str = "prod" fetch_key = f"{dataset_id}.last_captured_os" if mode != "prod": fetch_key = f"{mode}.{fetch_key}" - + last_captured_os = redis_client.get(fetch_key) + # verifica se last_capture_os tem formado dia/mes/ano_index e converte para ano-mes-dia_index + if last_captured_os is not None: + if "/" in last_captured_os: + index = last_captured_os.split("_")[1] + data = datetime.strptime(last_captured_os.split("_")[0], "%d/%m/%Y").strftime( + "%Y-%m-%d" + ) + 
last_captured_os = data + "_" + index + # verifica se a ultima os capturada é maior que a nova + if last_captured_os is not None: + if last_captured_os["last_captured_os"] > data_index: + return redis_client.set(fetch_key, {"last_captured_os": data_index}) @task(nout=4) -def get_os_info(last_captured_os: str) -> dict: +def get_os_info(last_captured_os: str = None, data_versao_gtfs: str = None) -> dict: """ Retrieves information about the OS. @@ -118,9 +130,13 @@ def get_os_info(last_captured_os: str) -> dict: # Ordena por data e index df = df.sort_values(by=["data_index"], ascending=True) - if last_captured_os is None: + if data_versao_gtfs is not None: + df = df.loc[(df["Início da Vigência da OS"] == data_versao_gtfs)] + + elif last_captured_os is None: last_captured_os = df["data_index"].max() df = df.loc[(df["data_index"] == last_captured_os)] + else: # Filtra linhas onde 'data_index' é maior que o último capturado df = df.loc[(df["data_index"] > last_captured_os)] @@ -136,7 +152,7 @@ def get_os_info(last_captured_os: str) -> dict: @task(nout=2) -def get_raw_drive_files(os_control, local_filepath: list): +def get_raw_drive_files(os_control, local_filepath: list, regular_sheet_index: int = None): """ Downloads raw files from Google Drive and processes them. @@ -172,19 +188,20 @@ def get_raw_drive_files(os_control, local_filepath: list): # Salva os nomes das planilhas sheetnames = xl.load_workbook(file_bytes_os).sheetnames + sheetnames = [name for name in sheetnames if "ANEXO" in name] + log(f"tabs encontradas na planilha Controle OS: {sheetnames}") with zipfile.ZipFile(file_bytes_gtfs, "r") as zipped_file: for filename in list(constants.GTFS_TABLE_CAPTURE_PARAMS.value.keys()): if filename == "ordem_servico": - processa_ordem_servico( sheetnames=sheetnames, file_bytes=file_bytes_os, local_filepath=local_filepath, raw_filepaths=raw_filepaths, + regular_sheet_index=regular_sheet_index, ) elif filename == "ordem_servico_trajeto_alternativo": - processa_ordem_servico_trajeto_alternativo( sheetnames=sheetnames, file_bytes=file_bytes_os, diff --git a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py index 14736f69..fa3cd8d1 100644 --- a/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py +++ b/pipelines/migration/br_rj_riodejaneiro_gtfs/utils.py @@ -152,24 +152,61 @@ def download_xlsx(file_link, drive_service): return file_bytes -def processa_ordem_servico(sheetnames, file_bytes, local_filepath, raw_filepaths): +def normalizar_horario(horario): """ - Process the order of service data from multiple sheets in an Excel file. + Normalizes the given time string. + + If the time string contains the word "day", it splits the string into days and time. + It converts the days, hours, minutes, and seconds into total hours and returns + the normalized time string. + + If the time string does not contain the word "day", it returns the time string as is. Args: - sheetnames (list): List of sheet names in the Excel file. - file_bytes (bytes): Bytes of the Excel file. - local_filepath (str): Local file path. - raw_filepaths (list): List of raw file paths. + horario (str): The time string to be normalized. Returns: - None + str: The normalized time string. 
+ + """ + if "day" in horario: + days, time = horario.split(", ") + days = int(days.split(" ")[0]) + hours, minutes, seconds = map(int, time.split(":")) + total_hours = days * 24 + hours + return f"{total_hours:02}:{minutes:02}:{seconds:02}" + else: + return horario.split(" ")[1] if " " in horario else horario + + +def processa_ordem_servico( + sheetnames, file_bytes, local_filepath, raw_filepaths, regular_sheet_index=None +): + """ + Process the OS data from an Excel file. + + Args: + sheetnames (list): List of sheet names in the Excel file. + file_bytes (bytes): The Excel file in bytes format. + local_filepath (str): The local file path where the processed data will be saved. + raw_filepaths (list): List of raw file paths. + regular_sheet_index (int, optional): The index of the regular sheet. Defaults to 0. Raises: - Exception: If there are missing or duplicated columns in the OS data. + Exception: If there are more than 2 tabs in the file or if there are missing or + duplicated columns in the order of service data. Exception: If the validation of 'km_test' and 'km_dia_util' fails. + + Returns: + None """ - # conta quantos sheets tem no arquivo sem o nome de Anexo II + + if len(sheetnames) != 2 and regular_sheet_index is None: + raise Exception("More than 2 tabs in the file. Please specify the regular sheet index.") + + if regular_sheet_index is None: + regular_sheet_index = 0 + sheets_range = len(sheetnames) - len([x for x in sheetnames if "ANEXO II" in x]) quadro_geral = pd.DataFrame() @@ -220,7 +257,7 @@ def processa_ordem_servico(sheetnames, file_bytes, local_filepath, raw_filepaths quadro[hora_cols] = quadro[hora_cols].astype(str) for hora_col in hora_cols: - quadro[hora_col] = quadro[hora_col].apply(lambda x: x.split(" ")[1] if " " in x else x) + quadro[hora_col] = quadro[hora_col].apply(normalizar_horario) cols = [ coluna @@ -233,15 +270,17 @@ def processa_ordem_servico(sheetnames, file_bytes, local_filepath, raw_filepaths extensao_cols = ["extensao_ida", "extensao_volta"] quadro[extensao_cols] = quadro[extensao_cols].astype(str) + for col in extensao_cols: + quadro[col] = quadro[col].str.replace(".", "", regex=False) quadro[extensao_cols] = quadro[extensao_cols].apply(pd.to_numeric) quadro["extensao_ida"] = quadro["extensao_ida"] / 1000 quadro["extensao_volta"] = quadro["extensao_volta"] / 1000 - if i == 0: + if i == regular_sheet_index: quadro["tipo_os"] = "Regular" - quadro_geral = pd.concat([quadro_geral, quadro]) + quadro_geral = pd.concat([quadro_geral, quadro]) # Verificações columns_in_dataframe = set(quadro_geral.columns) diff --git a/pipelines/migration/controle_financeiro/tasks.py b/pipelines/migration/controle_financeiro/tasks.py index 3ab5cce3..f8d1aa50 100644 --- a/pipelines/migration/controle_financeiro/tasks.py +++ b/pipelines/migration/controle_financeiro/tasks.py @@ -83,7 +83,6 @@ def create_cct_arquivo_retorno_params( } else: - params = { "dt_inicio": ( date.fromisoformat(redis_return["last_date"]) + timedelta(days=1) diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 651a01e4..3a252ee2 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -187,7 +187,6 @@ def run_subflow( flow_run_results = [] for idx, param_list in enumerate(parameters): - if not isinstance(param_list, list): param_list = [param_list] diff --git a/pipelines/treatment/bilhetagem/flows.py b/pipelines/treatment/bilhetagem/flows.py index 2447f889..dfcb7c8c 100644 --- a/pipelines/treatment/bilhetagem/flows.py +++ b/pipelines/treatment/bilhetagem/flows.py @@ -40,7 +40,6 @@ ) with 
Flow("Bilhetagem - Tratamento") as bilhetagem_tratamento: - timestamp = get_scheduled_timestamp() AUXILIAR_CAPTURE = run_subflow( diff --git a/pipelines/treatment/templates/tasks.py b/pipelines/treatment/templates/tasks.py index 8485ee89..6b3e3492 100644 --- a/pipelines/treatment/templates/tasks.py +++ b/pipelines/treatment/templates/tasks.py @@ -243,7 +243,6 @@ def run_data_quality_checks( log("Executando testes de qualidade de dados") for check in data_quality_checks: - dataplex = DataQuality( data_scan_id=check.check_id, project_id="rj-smtr", diff --git a/pipelines/utils/dbt_datetime_vars.py b/pipelines/utils/dbt_datetime_vars.py index e7fa6cde..47eef13d 100644 --- a/pipelines/utils/dbt_datetime_vars.py +++ b/pipelines/utils/dbt_datetime_vars.py @@ -55,7 +55,6 @@ def create_var( table: BQTable, timestamp: datetime, ) -> dict: - last_run = ( self.get_last_run_from_redis(redis_key=redis_key) or self.get_last_run_from_bq(table=table) diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index 4ae666c1..d6a5a324 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -285,4 +285,7 @@ models: +schema: validacao_dados_jae staging: +materialized: incremental - +schema: validacao_dados_jae_staging \ No newline at end of file + +schema: validacao_dados_jae_staging + catalogo: + +materialized: view + +schema: catalogo \ No newline at end of file diff --git a/queries/dev/utils.py b/queries/dev/utils.py index db021da6..439e0ce2 100644 --- a/queries/dev/utils.py +++ b/queries/dev/utils.py @@ -5,6 +5,8 @@ # from datetime import timedelta from typing import Dict, List, Union +import requests + # import pandas as pd @@ -62,3 +64,16 @@ def run_dbt_model( print(f"\n>>> RUNNING: {run_command}\n") os.system(run_command) + + +def fetch_dataset_sha(dataset_id: str): + """Fetches the SHA of a branch from Github""" + url = "https://api.github.com/repos/prefeitura-rio/queries-rj-smtr" + url += f"/commits?queries-rj-smtr/rj_smtr/{dataset_id}" + response = requests.get(url) + + if response.status_code != 200: + return None + + dataset_version = response.json()[0]["sha"] + return {"version": dataset_version} diff --git a/queries/macros/get_models_with_tags.sql b/queries/macros/get_models_with_tags.sql new file mode 100644 index 00000000..f68dce50 --- /dev/null +++ b/queries/macros/get_models_with_tags.sql @@ -0,0 +1,18 @@ +/* https://discourse.getdbt.com/t/get-all-dbt-table-model-names-from-a-tag-inside-another-model/7703 (modificado) */ +{% macro get_models_with_tags(tags) %} + +{% set models_with_tag = [] %} + +{% for model in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} + + {% for tag in tags %} + {% if tag in model.config.tags %} + {{ models_with_tag.append(model) }} + {% endif %} + {% endfor %} + +{% endfor %} + +{{ return(models_with_tag) }} + +{% endmacro %} \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md index a74e347f..defd4fb6 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - bilhetagem +## [2.1.4] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` aos modelos `gps_validador_van.sql` e `gps_validador.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) +- Adiciona tag `identificacao` ao modelo `staging_cliente.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + ## [2.1.3] - 2024-07-18 
### Adicionado diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql index ce3b4b71..9ec4fec0 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador.sql @@ -6,6 +6,7 @@ "data_type":"date", "granularity": "day" }, + tags=['geolocalizacao'] ) }} diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql index 84aaa992..601cd3ae 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem/gps_validador_van.sql @@ -6,6 +6,7 @@ "data_type":"date", "granularity": "day" }, + tags=['geolocalizacao'] ) }} diff --git a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql index 7b9c0d93..4feda28c 100644 --- a/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql +++ b/queries/models/br_rj_riodejaneiro_bilhetagem_staging/staging_cliente.sql @@ -1,6 +1,7 @@ {{ config( alias='cliente', + tags=['identificacao'] ) }} diff --git a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md index d1b45343..3cf994d5 100644 --- a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md +++ b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog - onibus_gps_zirix +## [1.0.3] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` ao modelo `gps_sppo_zirix.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + ## [1.0.2] - 2024-07-02 ### Adicionado diff --git a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql index 0c2ac39b..6990d828 100644 --- a/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql +++ b/queries/models/br_rj_riodejaneiro_onibus_gps_zirix/gps_sppo_zirix.sql @@ -6,7 +6,8 @@ 'data_type':'date', 'granularity': 'day' }, - alias='gps_sppo' + alias='gps_sppo', + tags=['geolocalizacao'] ) }} /* diff --git a/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md new file mode 100644 index 00000000..946c270f --- /dev/null +++ b/queries/models/br_rj_riodejaneiro_veiculos/CHANGELOG.md @@ -0,0 +1,6 @@ +# Changelog - br_rj_riodejaneiro_veiculos + +## [1.0.1] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` aos modelos `gps_brt.sql` e `gps_sppo.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) \ No newline at end of file diff --git a/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql b/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql index e0d5ace8..4b19400e 100644 --- a/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql +++ b/queries/models/br_rj_riodejaneiro_veiculos/gps_brt.sql @@ -5,7 +5,8 @@ 'field': 'data', 'data_type': 'date', 'granularity': 'day' - } + }, + tags=['geolocalizacao'] ) }} /* diff --git a/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql b/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql index 41e56c81..b62e6353 100644 --- a/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql +++ b/queries/models/br_rj_riodejaneiro_veiculos/gps_sppo.sql @@ -5,7 +5,8 @@ 'field':"data", 'data_type':'date', 
'granularity': 'day' - } + }, + tags=['geolocalizacao'] ) }} /* diff --git a/queries/models/cadastro/CHANGELOG.md b/queries/models/cadastro/CHANGELOG.md index 49cc575a..f496cc7a 100644 --- a/queries/models/cadastro/CHANGELOG.md +++ b/queries/models/cadastro/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - cadastro +## [1.2.1] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` ao modelo `servicos.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) +- Adiciona tag `identificacao` ao modelo `operadoras.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + ## [1.2.0] - 2024-07-17 ### Adicionado diff --git a/queries/models/cadastro/operadoras.sql b/queries/models/cadastro/operadoras.sql index 65dcdd79..aa1c7b7c 100644 --- a/queries/models/cadastro/operadoras.sql +++ b/queries/models/cadastro/operadoras.sql @@ -1,6 +1,7 @@ {{ config( - materialized="table" + materialized="table", + tags=["identificacao"] ) }} diff --git a/queries/models/cadastro/servicos.sql b/queries/models/cadastro/servicos.sql index 667125fe..a32bd988 100644 --- a/queries/models/cadastro/servicos.sql +++ b/queries/models/cadastro/servicos.sql @@ -1,7 +1,8 @@ {{ config( - materialized='table' - ) + materialized='table', + tags=['geolocalizacao'] + ), }} SELECT diff --git a/queries/models/catalogo/ed_metadado_coluna.sql b/queries/models/catalogo/ed_metadado_coluna.sql new file mode 100644 index 00000000..6915b4ea --- /dev/null +++ b/queries/models/catalogo/ed_metadado_coluna.sql @@ -0,0 +1,20 @@ +{% if execute %} + {% set models_with_tag = get_models_with_tags(["geolocalizacao", "identificacao"]) %} + {% do log("Models: \n", info=true) %} + {% for model in models_with_tag %} + {% do log(model.schema~"."~model.alias~"\n", info=true) %} + {% endfor %} +{% endif %} + +SELECT + * +FROM + {{ ref("metadado_coluna") }} +WHERE + {% for model in models_with_tag %} + {% if not loop.first %}OR {% endif %}(dataset_id = "{{ model.schema }}" + AND table_id = "{{ model.alias }}") + {% endfor %} + + OR (dataset_id = "br_rj_riodejaneiro_stpl_gps" + AND table_id = "registros") \ No newline at end of file diff --git a/queries/models/catalogo/metadado_coluna.sql b/queries/models/catalogo/metadado_coluna.sql new file mode 100644 index 00000000..34a20e45 --- /dev/null +++ b/queries/models/catalogo/metadado_coluna.sql @@ -0,0 +1,9 @@ +SELECT + table_catalog AS project_id, + table_schema AS dataset_id, + table_name AS table_id, + column_name, + data_type, + description +FROM + rj-smtr.`region-US`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS \ No newline at end of file diff --git a/queries/models/catalogo/schema.yml b/queries/models/catalogo/schema.yml new file mode 100644 index 00000000..3b2233c7 --- /dev/null +++ b/queries/models/catalogo/schema.yml @@ -0,0 +1,33 @@ +version: 2 + +models: + - name: ed_metadado_coluna + description: "Catálogo de dados de geolocalização e identificação do data lake da SMTR destinados ao Escritório de Dados (GP/ED)" + columns: + - name: project_id + description: "{{ doc('project_id') }}" + - name: dataset_id + description: "{{ doc('dataset_id') }}" + - name: table_id + description: "{{ doc('table_id') }}" + - name: column_name + description: "{{ doc('column_name') }}" + - name: data_type + description: "{{ doc('data_type') }}" + - name: description + description: "{{ doc('metadado_descricao') }}" + - name: metadado_coluna + description: "Catálogo de dados do data lake da SMTR" + columns: + - name: project_id + description: "{{ doc('project_id') }}" + - name: dataset_id + description: "{{ 
doc('dataset_id') }}" + - name: table_id + description: "{{ doc('table_id') }}" + - name: column_name + description: "{{ doc('column_name') }}" + - name: data_type + description: "{{ doc('data_type') }}" + - name: description + description: "{{ doc('metadado_descricao') }}" \ No newline at end of file diff --git a/queries/models/docs.md b/queries/models/docs.md new file mode 100644 index 00000000..56bbb32f --- /dev/null +++ b/queries/models/docs.md @@ -0,0 +1,27 @@ +{% docs consorcio %} +Consórcio ao qual o serviço pertence +{% enddocs %} + +{% docs project_id %} +Nome do projeto (rj-smtr) +{% enddocs %} + +{% docs dataset_id %} +Nome do conjunto de dados +{% enddocs %} + +{% docs table_id %} +Nome da tabela +{% enddocs %} + +{% docs column_name %} +Nome da coluna +{% enddocs %} + +{% docs data_type %} +Tipo de dado da coluna +{% enddocs %} + +{% docs metadado_descricao %} +Descrição da coluna +{% enddocs %} diff --git a/queries/models/gtfs/CHANGELOG.md b/queries/models/gtfs/CHANGELOG.md index ad718a4f..2790158e 100644 --- a/queries/models/gtfs/CHANGELOG.md +++ b/queries/models/gtfs/CHANGELOG.md @@ -1,5 +1,25 @@ # Changelog - gtfs +## [1.1.8] - 2024-08-02 + +### Alterado +- Adiciona tag `geolocalizacao` aos modelos `shapes_geom_gtfs.sql`, `shapes_gtfs.sql` e `stops_gtfs.sql` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/127) + +## [1.1.7] - 2024-07-23 + +### Adicionado + +- Adiciona descrição da coluna `feed_update_datetime` em `feed_info` (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/119) + + +## [1.1.6] - 2024-07-22 + +### Alterado + +- Alterada a tabela `feed_info` de table para incremental e adicionada a coluna `feed_update_datetime`(https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/82) + +- Alterada a tabela `fare_rules` para refletir a alteração nas primary keys (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/82) + ## [1.1.5] - 2024-07-15 ### Adicionado diff --git a/queries/models/gtfs/fare_rules_gtfs.sql b/queries/models/gtfs/fare_rules_gtfs.sql index 167bca8f..fe043ab3 100644 --- a/queries/models/gtfs/fare_rules_gtfs.sql +++ b/queries/models/gtfs/fare_rules_gtfs.sql @@ -14,8 +14,8 @@ SELECT fi.feed_version, SAFE_CAST(fr.data_versao AS DATE) feed_start_date, fi.feed_end_date, - SAFE_CAST(JSON_VALUE(fr.content, '$.fare_id') AS STRING) fare_id, - SAFE_CAST(JSON_VALUE(fr.content, '$.route_id') AS STRING) route_id, + SAFE_CAST(fr.fare_id AS STRING) fare_id, + SAFE_CAST(fr.route_id AS STRING) route_id, SAFE_CAST(JSON_VALUE(fr.content, '$.origin_id') AS STRING) origin_id, SAFE_CAST(JSON_VALUE(fr.content, '$.destination_id') AS STRING) destination_id, SAFE_CAST(JSON_VALUE(fr.content, '$.contains_id') AS STRING) contains_id, diff --git a/queries/models/gtfs/feed_info_gtfs.sql b/queries/models/gtfs/feed_info_gtfs.sql index a85e9401..20230094 100644 --- a/queries/models/gtfs/feed_info_gtfs.sql +++ b/queries/models/gtfs/feed_info_gtfs.sql @@ -1,25 +1,54 @@ {{config( - materialized='table', + materialized='incremental', partition_by = { 'field' :'feed_start_date', 'data_type' :'date', 'granularity': 'day' }, - alias = 'feed_info' + alias = 'feed_info', + unique_key = 'feed_start_date', + incremental_strategy = 'insert_overwrite' )}} - -SELECT - SAFE_CAST(timestamp_captura AS STRING) AS feed_version, - SAFE_CAST(data_versao AS DATE) AS feed_start_date, - DATE_SUB(LEAD(DATE(data_versao)) OVER (ORDER BY data_versao), INTERVAL 1 DAY) AS feed_end_date, - SAFE_CAST(feed_publisher_name AS STRING) feed_publisher_name, - SAFE_CAST(JSON_VALUE(content, 
'$.feed_publisher_url') AS STRING) feed_publisher_url, - SAFE_CAST(JSON_VALUE(content, '$.feed_lang') AS STRING) feed_lang, - SAFE_CAST(JSON_VALUE(content, '$.default_lang') AS STRING) default_lang, - SAFE_CAST(JSON_VALUE(content, '$.feed_contact_email') AS STRING) feed_contact_email, - SAFE_CAST(JSON_VALUE(content, '$.feed_contact_url') AS STRING) feed_contact_url, - '{{ var("version") }}' AS versao_modelo - FROM - {{ source( - 'br_rj_riodejaneiro_gtfs_staging', - 'feed_info' - ) }} +WITH feed_info AS ( + SELECT + SAFE_CAST(timestamp_captura AS STRING) AS feed_version, + SAFE_CAST(data_versao AS DATE) AS feed_start_date, + NULL AS feed_end_date, + SAFE_CAST(feed_publisher_name AS STRING) feed_publisher_name, + SAFE_CAST(JSON_VALUE(content, '$.feed_publisher_url') AS STRING) feed_publisher_url, + SAFE_CAST(JSON_VALUE(content, '$.feed_lang') AS STRING) feed_lang, + SAFE_CAST(JSON_VALUE(content, '$.default_lang') AS STRING) default_lang, + SAFE_CAST(JSON_VALUE(content, '$.feed_contact_email') AS STRING) feed_contact_email, + SAFE_CAST(JSON_VALUE(content, '$.feed_contact_url') AS STRING) feed_contact_url, + CURRENT_DATETIME("America/Sao_Paulo") AS feed_update_datetime, + '{{ var("version") }}' AS versao_modelo + FROM + {{ source( + 'br_rj_riodejaneiro_gtfs_staging', + 'feed_info' + ) }} + {% if is_incremental() %} + WHERE + data_versao = '{{ var("data_versao_gtfs") }}' + UNION ALL + SELECT + * + FROM + {{ this }} + WHERE + feed_start_date != DATE('{{ var("data_versao_gtfs") }}') + {% endif %} + ) + SELECT + feed_version, + feed_start_date, + DATE_SUB(LEAD(DATE(feed_version)) OVER (ORDER BY feed_version), INTERVAL 1 DAY) AS feed_end_date, + feed_publisher_name, + feed_publisher_url, + feed_lang, + default_lang, + feed_contact_email, + feed_contact_url, + feed_update_datetime, + versao_modelo + FROM + feed_info \ No newline at end of file diff --git a/queries/models/gtfs/schema.yml b/queries/models/gtfs/schema.yml index 0e5f1342..5aef6e79 100644 --- a/queries/models/gtfs/schema.yml +++ b/queries/models/gtfs/schema.yml @@ -146,6 +146,8 @@ models: description: "URL para informações de contato, um formulário web, uma mesa de suporte ou outras ferramentas de comunicação relativas ao conjunto de dados GTFS e práticas de publicação de dados." - name: feed_start_date description: "Data de referência do feed (versão)." + - name: feed_update_datetime + description: "Data e hora da última atualização do feed." - name: versao_modelo description: "Código de controle de versão (SHA do GitHub)." 
diff --git a/queries/models/gtfs/shapes_geom_gtfs.sql b/queries/models/gtfs/shapes_geom_gtfs.sql index e6098470..4c84c80c 100644 --- a/queries/models/gtfs/shapes_geom_gtfs.sql +++ b/queries/models/gtfs/shapes_geom_gtfs.sql @@ -3,7 +3,8 @@ 'data_type' :'date', 'granularity': 'day' }, unique_key = ['shape_id', 'feed_start_date'], - alias = 'shapes_geom' + alias = 'shapes_geom', + tags=['geolocalizacao'] ) }} {% if execute and is_incremental() %} diff --git a/queries/models/gtfs/shapes_gtfs.sql b/queries/models/gtfs/shapes_gtfs.sql index 926e725a..c9409a45 100644 --- a/queries/models/gtfs/shapes_gtfs.sql +++ b/queries/models/gtfs/shapes_gtfs.sql @@ -3,7 +3,8 @@ 'data_type' :'date', 'granularity': 'day' }, unique_key = ['shape_id', 'shape_pt_sequence', 'feed_start_date'], - alias = 'shapes' + alias = 'shapes', + tags=['geolocalizacao'] )}} {% if execute and is_incremental() %} diff --git a/queries/models/gtfs/stops_gtfs.sql b/queries/models/gtfs/stops_gtfs.sql index 98a1b69f..38f73004 100644 --- a/queries/models/gtfs/stops_gtfs.sql +++ b/queries/models/gtfs/stops_gtfs.sql @@ -3,7 +3,8 @@ 'data_type' :'date', 'granularity': 'day' }, unique_key = ['stop_id', 'feed_start_date'], - alias = 'stops' + alias = 'stops', + tags=['geolocalizacao'] )}} {% if execute and is_incremental() %} diff --git a/queries/models/projeto_subsidio_sppo/CHANGELOG.md b/queries/models/projeto_subsidio_sppo/CHANGELOG.md index 14b408c7..cc87df5b 100644 --- a/queries/models/projeto_subsidio_sppo/CHANGELOG.md +++ b/queries/models/projeto_subsidio_sppo/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog - projeto_subsidio_sppo +## [6.0.3] - 2024-08-01 + +### Alterado + +- Alterados modelos `viagem_planejada.sql` e `subsidio_data_versao_efetiva.sql` para materializar sempre em D+0 e permitir acompanhamento pelos operadores (https://github.com/prefeitura-rio/pipelines_rj_smtr/pull/125) + ## [6.0.2] - 2024-04-22 ### Adicionado diff --git a/queries/models/projeto_subsidio_sppo/schema.yml b/queries/models/projeto_subsidio_sppo/schema.yml index 2849fabb..a0be43fc 100644 --- a/queries/models/projeto_subsidio_sppo/schema.yml +++ b/queries/models/projeto_subsidio_sppo/schema.yml @@ -6,7 +6,7 @@ models: description: "Tabela histórica com detalhes das viagens planejadas a cada dia" columns: - name: consorcio - description: "Consórcio ao qual o serviço pertence" + description: "{{ doc('consorcio') }}" tests: - not_null - name: data diff --git a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql index 5674df59..18b6a900 100644 --- a/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql +++ b/queries/models/projeto_subsidio_sppo/subsidio_data_versao_efetiva.sql @@ -368,9 +368,9 @@ WITH (feed_version) WHERE {% if is_incremental() %} - data = DATE_SUB(DATE("{{ var("run_date") }}"), INTERVAL 1 DAY) + data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) AND DATE("{{ var('run_date') }}") {% else %} - data <= DATE_SUB(DATE("{{ var("run_date") }}"), INTERVAL 1 DAY) + data <= DATE("{{ var('run_date') }}") {% endif %} ) SELECT diff --git a/queries/models/projeto_subsidio_sppo/viagem_planejada.sql b/queries/models/projeto_subsidio_sppo/viagem_planejada.sql index d47c52bc..cabcc315 100644 --- a/queries/models/projeto_subsidio_sppo/viagem_planejada.sql +++ b/queries/models/projeto_subsidio_sppo/viagem_planejada.sql @@ -213,7 +213,7 @@ WITH {{ ref("subsidio_data_versao_efetiva") }} -- rj-smtr-dev.projeto_subsidio_sppo.subsidio_data_versao_efetiva 
 WHERE
-    data = DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) )
+    data BETWEEN DATE_SUB("{{ var('run_date') }}", INTERVAL 1 DAY) AND DATE("{{ var('run_date') }}"))
 SELECT
     d.data,
     CASE
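
As a complement to the local-testing steps described in the README above, a minimal `test.py` sketch is shown below. It is only an illustration of the `run_local(flow, parameters)` signature documented there: the choice of flow (`gtfs_captura_nova`) and the parameter value are assumptions taken from this same diff, and any other flow and parameter combination would be passed the same way.

```python
# test.py - minimal local-run sketch (hypothetical example; adjust the flow and parameters to your case)
from pipelines.migration.br_rj_riodejaneiro_gtfs.flows import gtfs_captura_nova
from pipelines.utils.prefect import run_local

if __name__ == "__main__":
    # run_local(flow: prefect.Flow, parameters: Dict[str, Any] = None)
    # The parameters dict overrides the flow's Parameter defaults for this local execution only.
    run_local(gtfs_captura_nova, parameters={"data_versao_gtfs": "2024-08-02"})
```

Running it as `python test.py | tee logs.txt` keeps a copy of the execution logs on disk, as suggested in the README.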