From 7ec6ab17e36725615126e2d6bb8435543ff3d65d Mon Sep 17 00:00:00 2001 From: Rafael Date: Fri, 5 Jan 2024 11:48:26 -0300 Subject: [PATCH] criar novos arquivos utils --- pipelines/br_rj_riodejaneiro_brt_gps/tasks.py | 18 +- pipelines/schedules.py | 52 +- pipelines/tasks.py | 117 +-- pipelines/templates/__init__.py | 0 pipelines/utils/backup/__init__.py | 0 pipelines/utils/backup/utils.py | 927 +++++++++++++++++ pipelines/utils/capture.py | 0 pipelines/utils/fs.py | 0 pipelines/utils/gcp.py | 0 pipelines/utils/logging.py | 0 pipelines/utils/pre_treatment.py | 0 pipelines/utils/redis.py | 0 pipelines/utils/secret.py | 12 +- pipelines/utils/utils.py | 959 ------------------ 14 files changed, 1006 insertions(+), 1079 deletions(-) create mode 100644 pipelines/templates/__init__.py create mode 100644 pipelines/utils/backup/__init__.py create mode 100644 pipelines/utils/backup/utils.py create mode 100644 pipelines/utils/capture.py create mode 100644 pipelines/utils/fs.py create mode 100644 pipelines/utils/gcp.py create mode 100644 pipelines/utils/logging.py create mode 100644 pipelines/utils/pre_treatment.py create mode 100644 pipelines/utils/redis.py diff --git a/pipelines/br_rj_riodejaneiro_brt_gps/tasks.py b/pipelines/br_rj_riodejaneiro_brt_gps/tasks.py index 7cfe41f5..aee2a7b2 100644 --- a/pipelines/br_rj_riodejaneiro_brt_gps/tasks.py +++ b/pipelines/br_rj_riodejaneiro_brt_gps/tasks.py @@ -3,19 +3,21 @@ Tasks for br_rj_riodejaneiro_brt_gps """ -from datetime import timedelta import traceback +from datetime import timedelta + import pandas as pd from prefect import task +from prefeitura_rio.pipelines_utils.logging import log + +from pipelines.constants import constants +from pipelines.utils.backup.utils import log_critical, map_dict_keys # EMD Imports # -from prefeitura_rio.pipelines_utils.logging import log # SMTR Imports # -from pipelines.constants import constants -from pipelines.utils.utils import log_critical, map_dict_keys # Tasks # @@ -53,9 +55,7 @@ def pre_treatment_br_rj_riodejaneiro_brt_gps(status: dict, timestamp): df = pd.DataFrame(columns=columns) # pylint: disable=c0103 # map_dict_keys change data keys to match project data structure - df["content"] = [ - map_dict_keys(piece, constants.GPS_BRT_MAPPING_KEYS.value) for piece in data - ] + df["content"] = [map_dict_keys(piece, constants.GPS_BRT_MAPPING_KEYS.value) for piece in data] df[key_column] = [piece[key_column] for piece in data] df["timestamp_gps"] = [piece["timestamp_gps"] for piece in data] df["timestamp_captura"] = timestamp @@ -64,9 +64,7 @@ def pre_treatment_br_rj_riodejaneiro_brt_gps(status: dict, timestamp): # Remove timezone and force it to be config timezone log(f"Before converting, timestamp_gps was: \n{df['timestamp_gps']}") df["timestamp_gps"] = ( - pd.to_datetime(df["timestamp_gps"], unit="ms") - .dt.tz_localize("UTC") - .dt.tz_convert(timezone) + pd.to_datetime(df["timestamp_gps"], unit="ms").dt.tz_localize("UTC").dt.tz_convert(timezone) ) log(f"After converting the timezone, timestamp_gps is: \n{df['timestamp_gps']}") diff --git a/pipelines/schedules.py b/pipelines/schedules.py index 4312feb9..2cadec57 100644 --- a/pipelines/schedules.py +++ b/pipelines/schedules.py @@ -3,21 +3,21 @@ Schedules for rj_smtr """ -from datetime import timedelta, datetime -from pytz import timezone +from datetime import datetime, timedelta + from prefect.schedules import Schedule -from prefect.schedules.clocks import IntervalClock, CronClock -from pipelines.constants import constants as emd_constants -from pipelines.utils.utils import generate_ftp_schedules +from prefect.schedules.clocks import CronClock, IntervalClock +from pytz import timezone + from pipelines.constants import constants +from pipelines.constants import constants as emd_constants +from pipelines.utils.backup.utils import generate_ftp_schedules every_minute = Schedule( clocks=[ IntervalClock( interval=timedelta(minutes=1), - start_date=datetime( - 2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], @@ -28,9 +28,7 @@ clocks=[ IntervalClock( interval=timedelta(minutes=1), - start_date=datetime( - 2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_DEV_AGENT_LABEL.value, ], @@ -42,9 +40,7 @@ clocks=[ IntervalClock( interval=timedelta(minutes=10), - start_date=datetime( - 2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], @@ -57,9 +53,7 @@ clocks=[ IntervalClock( interval=timedelta(hours=1), - start_date=datetime( - 2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], @@ -71,9 +65,7 @@ clocks=[ IntervalClock( interval=timedelta(hours=1), - start_date=datetime( - 2021, 1, 1, 0, 6, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2021, 1, 1, 0, 6, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], @@ -85,9 +77,7 @@ clocks=[ IntervalClock( interval=timedelta(days=1), - start_date=datetime( - 2021, 1, 1, 0, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2021, 1, 1, 0, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], @@ -103,9 +93,7 @@ clocks=[ IntervalClock( interval=timedelta(days=1), - start_date=datetime( - 2022, 11, 30, 5, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2022, 11, 30, 5, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], @@ -117,9 +105,7 @@ clocks=[ IntervalClock( interval=timedelta(days=1), - start_date=datetime( - 2022, 11, 30, 7, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2022, 11, 30, 7, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], @@ -131,18 +117,14 @@ clocks=[ CronClock( cron="0 12 16 * *", - start_date=datetime( - 2022, 12, 16, 12, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2022, 12, 16, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], ), CronClock( cron="0 12 1 * *", - start_date=datetime( - 2023, 1, 1, 12, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), + start_date=datetime(2023, 1, 1, 12, 0, tzinfo=timezone(constants.TIMEZONE.value)), labels=[ emd_constants.RJ_SMTR_AGENT_LABEL.value, ], diff --git a/pipelines/tasks.py b/pipelines/tasks.py index 0f75221c..fd953cfb 100644 --- a/pipelines/tasks.py +++ b/pipelines/tasks.py @@ -3,49 +3,49 @@ """ Tasks for rj_smtr """ -from datetime import datetime, timedelta, date +import io import json import os -from pathlib import Path import traceback -from typing import Dict, List, Union, Iterable, Any -import io +from datetime import date, datetime, timedelta +from pathlib import Path +from typing import Any, Dict, Iterable, List, Union -from basedosdados import Storage, Table import basedosdados as bd import pandas as pd import pendulum import prefect -from prefect import task, Client +import requests +from basedosdados import Storage, Table +from prefect import Client, task from prefect.backend import FlowRunView +from prefeitura_rio.pipelines_utils.dbt import run_dbt_model +from prefeitura_rio.pipelines_utils.infisical import inject_bd_credentials +from prefeitura_rio.pipelines_utils.logging import log +from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client from pytz import timezone -import requests from pipelines.constants import constants -from pipelines.utils.utils import ( - create_or_append_table, +from pipelines.utils.backup.utils import ( # normalize_keys, bq_project, - get_table_min_max_value, - get_last_run_timestamp, + create_or_append_table, data_info_str, dict_contains_keys, + get_datetime_range, + get_last_run_timestamp, get_raw_data_api, - get_raw_data_gcs, get_raw_data_db, + get_raw_data_gcs, get_raw_recursos, - upload_run_logs_to_bq, - get_datetime_range, + get_table_min_max_value, + log_critical, read_raw_data, - save_treated_local_func, save_raw_local_func, - log_critical, - normalize_keys + save_treated_local_func, + upload_run_logs_to_bq, ) from pipelines.utils.secret import get_secret -from prefeitura_rio.pipelines_utils.dbt import run_dbt_model -from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client -from prefeitura_rio.pipelines_utils.infisical import inject_bd_credentials -from prefeitura_rio.pipelines_utils.logging import log + ############### # @@ -56,6 +56,7 @@ def setup_task(): return inject_bd_credentials() + @task def get_current_flow_labels() -> List[str]: """ @@ -65,14 +66,16 @@ def get_current_flow_labels() -> List[str]: flow_run_view = FlowRunView.from_flow_run_id(flow_run_id) return flow_run_view.labels + ############### # # DBT # ############### + @task -def run_dbt_model_task( +def run_dbt_model_task( dataset_id: str = None, table_id: str = None, dbt_alias: bool = False, @@ -80,8 +83,8 @@ def run_dbt_model_task( downstream: bool = None, exclude: str = None, flags: str = None, - _vars: dict | List[Dict] = None - ): + _vars: dict | List[Dict] = None, +): return run_dbt_model( dataset_id=dataset_id, table_id=table_id, @@ -90,7 +93,7 @@ def run_dbt_model_task( downstream=downstream, exclude=exclude, flags=flags, - _vars=_vars + _vars=_vars, ) @@ -140,7 +143,7 @@ def build_incremental_model( # pylint: disable=too-many-arguments if refresh: log("Running in full refresh mode") log(f"DBT will run the following command:\n{run_command+' --full-refresh'}") - run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id, flags='--full-refresh') + run_dbt_model(dataset_id=dataset_id, table_id=mat_table_id, flags="--full-refresh") last_mat_date = get_table_min_max_value( query_project_id, dataset_id, mat_table_id, field_name, "max" ) @@ -208,9 +211,7 @@ def create_dbt_run_vars( log("Creating date_range variable") # Set date_range variable manually - if dict_contains_keys( - dbt_vars["date_range"], ["date_range_start", "date_range_end"] - ): + if dict_contains_keys(dbt_vars["date_range"], ["date_range_start", "date_range_end"]): date_var = { "date_range_start": dbt_vars["date_range"]["date_range_start"], "date_range_end": dbt_vars["date_range"]["date_range_end"], @@ -490,13 +491,9 @@ def query_logs( """ if not datetime_filter: - datetime_filter = pendulum.now(constants.TIMEZONE.value).replace( - second=0, microsecond=0 - ) + datetime_filter = pendulum.now(constants.TIMEZONE.value).replace(second=0, microsecond=0) elif isinstance(datetime_filter, str): - datetime_filter = datetime.fromisoformat(datetime_filter).replace( - second=0, microsecond=0 - ) + datetime_filter = datetime.fromisoformat(datetime_filter).replace(second=0, microsecond=0) datetime_filter = datetime_filter.strftime("%Y-%m-%d %H:%M:%S") @@ -638,13 +635,9 @@ def get_raw( # pylint: disable=R0912 elif filetype in ("txt", "csv"): if csv_args is None: csv_args = {} - data = pd.read_csv(io.StringIO(response.text), **csv_args).to_dict( - orient="records" - ) + data = pd.read_csv(io.StringIO(response.text), **csv_args).to_dict(orient="records") else: - error = ( - "Unsupported raw file extension. Supported only: json, csv and txt" - ) + error = "Unsupported raw file extension. Supported only: json, csv and txt" except Exception: error = traceback.format_exc() @@ -750,9 +743,7 @@ def get_raw_from_sources( source_values = source_type.split("-", 1) - source_type, filetype = ( - source_values if len(source_values) == 2 else (source_values[0], None) - ) + source_type, filetype = source_values if len(source_values) == 2 else (source_values[0], None) log(f"Getting raw data from source type: {source_type}") @@ -779,9 +770,7 @@ def get_raw_from_sources( else: raise NotImplementedError(f"{source_type} not supported") - filepath = save_raw_local_func( - data=data, filepath=local_filepath, filetype=filetype - ) + filepath = save_raw_local_func(data=data, filepath=local_filepath, filetype=filetype) except NotImplementedError: error = traceback.format_exc() @@ -927,9 +916,7 @@ def upload_logs_to_bq( # pylint: disable=R0913 # Create partition directory filename = f"{table_id}_{timestamp.isoformat()}" partition = f"data={timestamp.date()}" - filepath = Path( - f"""data/staging/{dataset_id}/{table_id}/{partition}/{filename}.csv""" - ) + filepath = Path(f"""data/staging/{dataset_id}/{table_id}/{partition}/{filename}.csv""") filepath.parent.mkdir(exist_ok=True, parents=True) # Create dataframe to be uploaded if not error and recapture is True: @@ -1099,9 +1086,7 @@ def get_materialization_date_range( # pylint: disable=R0913 """ timestr = "%Y-%m-%dT%H:%M:%S" # get start from redis - last_run = get_last_run_timestamp( - dataset_id=dataset_id, table_id=table_id, mode=mode - ) + last_run = get_last_run_timestamp(dataset_id=dataset_id, table_id=table_id, mode=mode) # if there's no timestamp set on redis, get max timestamp on source table if last_run is None: log("Failed to fetch key from Redis...\n Querying tables for last suceeded run") @@ -1151,9 +1136,7 @@ def get_materialization_date_range( # pylint: disable=R0913 tzinfo=None, minute=0, second=0, microsecond=0 ) - end_ts = (end_ts - timedelta(hours=delay_hours)).replace( - minute=0, second=0, microsecond=0 - ) + end_ts = (end_ts - timedelta(hours=delay_hours)).replace(minute=0, second=0, microsecond=0) end_ts = end_ts.strftime(timestr) @@ -1343,15 +1326,9 @@ def transform_raw_to_nested_structure( pk_cols = primary_key + ["timestamp_captura"] data = ( data.groupby(pk_cols) - .apply( - lambda x: x[data.columns.difference(pk_cols)].to_json( - orient="records" - ) - ) + .apply(lambda x: x[data.columns.difference(pk_cols)].to_json(orient="records")) .str.strip("[]") - .reset_index(name="content")[ - primary_key + ["content", "timestamp_captura"] - ] + .reset_index(name="content")[primary_key + ["content", "timestamp_captura"]] ) log( @@ -1360,9 +1337,7 @@ def transform_raw_to_nested_structure( ) # save treated local - filepath = save_treated_local_func( - data=data, error=error, filepath=filepath - ) + filepath = save_treated_local_func(data=data, error=error, filepath=filepath) except Exception: # pylint: disable=W0703 error = traceback.format_exc() @@ -1462,13 +1437,12 @@ def get_scheduled_start_times( last_schedule = timestamp for param in parameters[1:]: - last_schedule += intervals.get( - param.get("table_id", "default"), intervals["default"] - ) + last_schedule += intervals.get(param.get("table_id", "default"), intervals["default"]) timestamps.append(last_schedule) return timestamps + @task def rename_current_flow_run_now_time(prefix: str, now_time=None, wait=None) -> None: """ @@ -1478,6 +1452,7 @@ def rename_current_flow_run_now_time(prefix: str, now_time=None, wait=None) -> N client = Client() return client.set_flow_run_name(flow_run_id, f"{prefix}{now_time}") + @prefect.task(checkpoint=False) def get_now_time(): """ @@ -1487,6 +1462,7 @@ def get_now_time(): return f"{now.hour}:{f'0{now.minute}' if len(str(now.minute))==1 else now.minute}" + @prefect.task(checkpoint=False) def get_now_date(): """ @@ -1496,6 +1472,7 @@ def get_now_date(): return now.to_date_string() + @task def get_current_flow_mode(labels: List[str]) -> str: """ diff --git a/pipelines/templates/__init__.py b/pipelines/templates/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/backup/__init__.py b/pipelines/utils/backup/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/backup/utils.py b/pipelines/utils/backup/utils.py new file mode 100644 index 00000000..80cc11c3 --- /dev/null +++ b/pipelines/utils/backup/utils.py @@ -0,0 +1,927 @@ +# -*- coding: utf-8 -*- +# flake8: noqa: E501 +""" +General purpose functions for rj_smtr +""" + +import io +import json +import math +import time +import traceback +import zipfile +from datetime import date, datetime, timedelta +from ftplib import FTP +from pathlib import Path +from typing import Any, List, Union + +import basedosdados as bd +import pandas as pd +import psycopg2 +import psycopg2.extras +import pymysql +import pytz +import requests +from basedosdados import Storage, Table +from google.cloud.storage.blob import Blob +from prefect.schedules.clocks import IntervalClock +from prefeitura_rio.pipelines_utils.infisical import get_secret +from prefeitura_rio.pipelines_utils.logging import log # TODO: add or relocate imports +from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client +from pytz import timezone +from redis_pal import RedisPal + +from pipelines.constants import constants +from pipelines.utils.implicit_ftp import ImplicitFtpTls + +# Set BD config to run on cloud # +bd.config.from_file = True + + +def send_discord_message( + message: str, + webhook_url: str, +) -> None: + """ + Sends a message to a Discord channel. + """ + requests.post( + webhook_url, + data={"content": message}, + ) + + +def log_critical(message: str, secret_path: str = constants.CRITICAL_SECRET_PATH.value): + """Logs message to critical discord channel specified + + Args: + message (str): Message to post on the channel + secret_path (str, optional): Secret path storing the webhook to critical channel. + Defaults to constants.CRITICAL_SECRETPATH.value. + + """ + url = get_secret(secret_path=secret_path)["data"]["url"] + return send_discord_message(message=message, webhook_url=url) + + +def create_or_append_table(dataset_id: str, table_id: str, path: str, partitions: str = None): + """Conditionally create table or append data to its relative GCS folder. + + Args: + dataset_id (str): target dataset_id on BigQuery + table_id (str): target table_id on BigQuery + path (str): Path to .csv data file + """ + tb_obj = Table(table_id=table_id, dataset_id=dataset_id) + if not tb_obj.table_exists("staging"): + log("Table does not exist in STAGING, creating table...") + dirpath = path.split(partitions)[0] + tb_obj.create( + path=dirpath, + if_table_exists="pass", + if_storage_data_exists="replace", + ) + log("Table created in STAGING") + else: + log("Table already exists in STAGING, appending to it...") + tb_obj.append(filepath=path, if_exists="replace", timeout=600, partitions=partitions) + log("Appended to table on STAGING successfully.") + + +def generate_df_and_save(data: dict, fname: Path): + """Save DataFrame as csv + + Args: + data (dict): dict with the data which to build the DataFrame + fname (Path): _description_ + """ + # Generate dataframe + dataframe = pd.DataFrame() + dataframe[data["key_column"]] = [piece[data["key_column"]] for piece in data["data"]] + dataframe["content"] = list(data["data"]) + + # Save dataframe to CSV + dataframe.to_csv(fname, index=False) + + +def bq_project(kind: str = "bigquery_prod"): + """Get the set BigQuery project_id + + Args: + kind (str, optional): Which client to get the project name from. + Options are 'bigquery_staging', 'bigquery_prod' and 'storage_staging' + Defaults to 'bigquery_prod'. + + Returns: + str: the requested project_id + """ + return bd.upload.base.Base().client[kind].project + + +def get_table_min_max_value( # pylint: disable=R0913 + query_project_id: str, + dataset_id: str, + table_id: str, + field_name: str, + kind: str, + wait=None, # pylint: disable=unused-argument +): + """Query a table to get the maximum value for the chosen field. + Useful to incrementally materialize tables via DBT + + Args: + dataset_id (str): dataset_id on BigQuery + table_id (str): table_id on BigQuery + field_name (str): column name to query + kind (str): which value to get. Accepts min and max + """ + log(f"Getting {kind} value for {table_id}") + query = f""" + SELECT + {kind}({field_name}) + FROM {query_project_id}.{dataset_id}.{table_id} + """ + log(f"Will run query:\n{query}") + result = bd.read_sql(query=query, billing_project_id=bq_project()) + + return result.iloc[0][0] + + +def get_last_run_timestamp(dataset_id: str, table_id: str, mode: str = "prod") -> str: + """ + Query redis to retrive the time for when the last materialization + ran. + + Args: + dataset_id (str): dataset_id on BigQuery + table_id (str): model filename on the queries repo. + eg: if you have a model defined in the file .sql, + the table_id should be + mode (str): + + Returns: + Union[str, None]: _description_ + """ + redis_client = get_redis_client() + key = dataset_id + "." + table_id + log(f"Fetching key {key} from redis, working on mode {mode}") + if mode == "dev": + key = f"{mode}.{key}" + runs = redis_client.get(key) + # if runs is None: + # redis_client.set(key, "") + try: + last_run_timestamp = runs["last_run_timestamp"] + except KeyError: + return None + except TypeError: + return None + log(f"Got value {last_run_timestamp}") + return last_run_timestamp + + +def map_dict_keys(data: dict, mapping: dict) -> None: + """ + Map old keys to new keys in a dict. + """ + for old_key, new_key in mapping.items(): + data[new_key] = data.pop(old_key) + return data + + +def normalize_keys(data: dict): + _data = {key.lower(): value for key, value in data.items()} + return _data + + +def connect_ftp(secret_path: str = None, secure: bool = True): + """Connect to FTP + + Returns: + ImplicitFTP_TLS: ftp client + """ + + ftp_data = get_secret(secret_path)["data"] + if secure: + ftp_client = ImplicitFtpTls() + else: + ftp_client = FTP() + ftp_client.connect(host=ftp_data["host"], port=int(ftp_data["port"])) + ftp_client.login(user=ftp_data["username"], passwd=ftp_data["pwd"]) + if secure: + ftp_client.prot_p() + return ftp_client + + +def safe_cast(val, to_type, default=None): + """ + Safe cast value. + """ + try: + return to_type(val) + except ValueError: + return default + + +def set_redis_rdo_files(redis_client, dataset_id: str, table_id: str): + """ + Register downloaded files to Redis + + Args: + redis_client (_type_): _description_ + dataset_id (str): dataset_id on BigQuery + table_id (str): table_id on BigQuery + + Returns: + bool: if the key was properly set + """ + try: + content = redis_client.get(f"{dataset_id}.{table_id}")["files"] + except TypeError as e: + log(f"Caught error {e}. Will set unexisting key") + # set key to empty dict for filling later + redis_client.set(f"{dataset_id}.{table_id}", {"files": []}) + content = redis_client.get(f"{dataset_id}.{table_id}") + # update content + st_client = bd.Storage(dataset_id=dataset_id, table_id=table_id) + blob_names = [ + blob.name + for blob in st_client.client["storage_staging"].list_blobs( + st_client.bucket, prefix=f"staging/{dataset_id}/{table_id}" + ) + ] + files = [blob_name.split("/")[-1].replace(".csv", "") for blob_name in blob_names] + log(f"When setting key, found {len(files)} files. Will register on redis...") + content["files"] = files + # set key + return redis_client.set(f"{dataset_id}.{table_id}", content) + + +# PRE TREAT # + + +def check_not_null(data: pd.DataFrame, columns: list, subset_query: str = None): + """ + Check if there are null values in columns. + + Args: + columns (list): list of columns to check + subset_query (str): query to check if there are important data + being removed + + Returns: + None + """ + + for col in columns: + remove = data.query(f"{col} != {col}") # null values + log( + f"[data-check] There are {len(remove)} rows with null values in '{col}'", + level="info", + ) + + if subset_query is not None: + # Check if there are important data being removed + remove = remove.query(subset_query) + if len(remove) > 0: + log( + f"""[data-check] There are {len(remove)} critical rows with + null values in '{col}' (query: {subset_query})""", + level="warning", + ) + + +def filter_null(data: pd.DataFrame, columns: list, subset_query: str = None): + """ + Filter null values in columns. + + Args: + columns (list): list of columns to check + subset_query (str): query to check if there are important data + being removed + + Returns: + pandas.DataFrame: data without null values + """ + + for col in columns: + remove = data.query(f"{col} != {col}") # null values + data = data.drop(remove.index) + log( + f"[data-filter] Removed {len(remove)} rows with null '{col}'", + level="info", + ) + + if subset_query is not None: + # Check if there are important data being removed + remove = remove.query(subset_query) + if len(remove) > 0: + log( + f"[data-filter] Removed {len(remove)} critical rows with null '{col}'", + level="warning", + ) + + return data + + +def filter_data(data: pd.DataFrame, filters: list, subset_query: str = None): + """ + Filter data from a dataframe + + Args: + data (pd.DataFrame): data DataFrame + filters (list): list of queries to filter data + + Returns: + pandas.DataFrame: data without filter data + """ + for item in filters: + remove = data.query(item) + data = data.drop(remove.index) + log( + f"[data-filter] Removed {len(remove)} rows from filter: {item}", + level="info", + ) + + if subset_query is not None: + # Check if there are important data being removed + remove = remove.query(subset_query) + if len(remove) > 0: + log( + f"""[data-filter] Removed {len(remove)} critical rows + from filter: {item} (subquery: {subset_query})""", + level="warning", + ) + + return data + + +def check_relation(data: pd.DataFrame, columns: list): + """ + Check relation between collumns. + + Args: + data (pd.DataFrame): dataframe to be modified + columns (list): list of lists of columns to be checked + + Returns: + None + """ + + for cols in columns: + df_dup = data[~data.duplicated(subset=cols)].groupby(cols).count().reset_index().iloc[:, :1] + + for col in cols: + df_dup_col = ( + data[~data.duplicated(subset=col)].groupby(col).count().reset_index().iloc[:, :1] + ) + + if len(df_dup_col[~df_dup_col[col].duplicated()]) == len(df_dup): + log( + f"[data-check] Comparing '{col}' in '{cols}', there are no duplicated values", + level="info", + ) + else: + log( + f"[data-check] Comparing '{col}' in '{cols}', there are duplicated values", + level="warning", + ) + + +def data_info_str(data: pd.DataFrame): + """ + Return dataframe info as a str to log + + Args: + data (pd.DataFrame): dataframe + + Returns: + data.info() as a string + """ + buffer = io.StringIO() + data.info(buf=buffer) + return buffer.getvalue() + + +def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-locals + clock_interval: timedelta, + labels: List[str], + table_parameters: Union[list[dict], dict], + runs_interval_minutes: int = 15, + start_date: datetime = datetime(2020, 1, 1, tzinfo=pytz.timezone(constants.TIMEZONE.value)), + **general_flow_params, +) -> List[IntervalClock]: + """ + Generates multiple schedules + + Args: + clock_interval (timedelta): The interval to run the schedule + labels (List[str]): The labels to be added to the schedule + table_parameters (list): The table parameters to iterate over + runs_interval_minutes (int, optional): The interval between each schedule. Defaults to 15. + start_date (datetime, optional): The start date of the schedule. + Defaults to datetime(2020, 1, 1, tzinfo=pytz.timezone(constants.TIMEZONE.value)). + general_flow_params: Any param that you want to pass to the flow + Returns: + List[IntervalClock]: The list of schedules + + """ + if isinstance(table_parameters, dict): + table_parameters = [table_parameters] + + clocks = [] + for count, parameters in enumerate(table_parameters): + parameter_defaults = parameters | general_flow_params + clocks.append( + IntervalClock( + interval=clock_interval, + start_date=start_date + timedelta(minutes=runs_interval_minutes * count), + labels=labels, + parameter_defaults=parameter_defaults, + ) + ) + return clocks + + +def dict_contains_keys(input_dict: dict, keys: list[str]) -> bool: + """ + Test if the input dict has all keys present in the list + + Args: + input_dict (dict): the dict to test if has the keys + keys (list[str]): the list containing the keys to check + Returns: + bool: True if the input_dict has all the keys otherwise False + """ + return all(x in input_dict.keys() for x in keys) + + +def custom_serialization(obj: Any) -> Any: + """ + Function to serialize not JSON serializable objects + + Args: + obj (Any): Object to serialize + + Returns: + Any: Serialized object + """ + if isinstance(obj, (pd.Timestamp, date)): + if isinstance(obj, pd.Timestamp): + if obj.tzinfo is None: + obj = obj.tz_localize("UTC").tz_convert(constants.TIMEZONE.value) + return obj.isoformat() + + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + + +def save_raw_local_func( + data: Union[dict, str], + filepath: str, + mode: str = "raw", + filetype: str = "json", +) -> str: + """ + Saves json response from API to .json file. + Args: + data (Union[dict, str]): Raw data to save + filepath (str): Path which to save raw file + mode (str, optional): Folder to save locally, later folder which to upload to GCS. + filetype (str, optional): The file format + Returns: + str: Path to the saved file + """ + + # diferentes tipos de arquivos para salvar + _filepath = filepath.format(mode=mode, filetype=filetype) + Path(_filepath).parent.mkdir(parents=True, exist_ok=True) + + if filetype == "json": + if isinstance(data, str): + data = json.loads(data) + with Path(_filepath).open("w", encoding="utf-8") as fi: + json.dump(data, fi, default=custom_serialization) + + if filetype in ("txt", "csv"): + with open(_filepath, "w", encoding="utf-8") as file: + file.write(data) + + log(f"Raw data saved to: {_filepath}") + return _filepath + + +def get_raw_data_api( # pylint: disable=R0912 + url: str, + secret_path: str = None, + api_params: dict = None, + filetype: str = None, +) -> tuple[str, str, str]: + """ + Request data from URL API + + Args: + url (str): URL to request data + secret_path (str, optional): Secret path to get headers. Defaults to None. + api_params (dict, optional): Parameters to pass to API. Defaults to None. + filetype (str, optional): Filetype to save raw file. Defaults to None. + + Returns: + tuple[str, str, str]: Error, data and filetype + """ + error = None + data = None + try: + if secret_path is None: + headers = secret_path + else: + headers = get_secret(secret_path)["data"] + + response = requests.get( + url, + headers=headers, + timeout=constants.MAX_TIMEOUT_SECONDS.value, + params=api_params, + ) + + response.raise_for_status() + + if filetype == "json": + data = response.json() + else: + data = response.text + + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error, data, filetype + + +def get_upload_storage_blob( + dataset_id: str, + filename: str, +) -> Blob: + """ + Get a blob from upload zone in storage + + Args: + dataset_id (str): The dataset id on BigQuery. + filename (str): The filename in GCS. + + Returns: + Blob: blob object + """ + bucket = bd.Storage(dataset_id="", table_id="") + log(f"Filename: {filename}, dataset_id: {dataset_id}") + blob_list = list( + bucket.client["storage_staging"] + .bucket(bucket.bucket_name) + .list_blobs(prefix=f"upload/{dataset_id}/{filename}.") + ) + + return blob_list[0] + + +def get_raw_data_gcs( + dataset_id: str, + table_id: str, + zip_filename: str = None, +) -> tuple[str, str, str]: + """ + Get raw data from GCS + + Args: + dataset_id (str): The dataset id on BigQuery. + table_id (str): The table id on BigQuery. + zip_filename (str, optional): The zip file name. Defaults to None. + + Returns: + tuple[str, str, str]: Error, data and filetype + """ + error = None + data = None + filetype = None + + try: + blob_search_name = zip_filename or table_id + blob = get_upload_storage_blob(dataset_id=dataset_id, filename=blob_search_name) + + filename = blob.name + filetype = filename.split(".")[-1] + + data = blob.download_as_bytes() + + if filetype == "zip": + with zipfile.ZipFile(io.BytesIO(data), "r") as zipped_file: + filenames = zipped_file.namelist() + filename = list(filter(lambda x: x.split(".")[0] == table_id, filenames))[0] + filetype = filename.split(".")[-1] + data = zipped_file.read(filename) + + data = data.decode(encoding="utf-8") + + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error, data, filetype + + +def get_raw_data_db( + query: str, engine: str, host: str, secret_path: str, database: str +) -> tuple[str, str, str]: + """ + Get data from Databases + + Args: + query (str): the SQL Query to execute + engine (str): The datase management system + host (str): The database host + secret_path (str): Secret path to get credentials + database (str): The database to connect + + Returns: + tuple[str, str, str]: Error, data and filetype + """ + connector_mapping = { + "postgresql": psycopg2.connect, + "mysql": pymysql.connect, + } + + data = None + error = None + filetype = "json" + + try: + credentials = get_secret(secret_path)["data"] + + with connector_mapping[engine]( + host=host, + user=credentials["user"], + password=credentials["password"], + database=database, + ) as connection: + data = pd.read_sql(sql=query, con=connection).to_dict(orient="records") + + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error, data, filetype + + +def save_treated_local_func( + filepath: str, data: pd.DataFrame, error: str, mode: str = "staging" +) -> str: + """ + Save treated file to CSV. + + Args: + filepath (str): Path to save file + data (pd.DataFrame): Dataframe to save + error (str): Error catched during execution + mode (str, optional): Folder to save locally, later folder which to upload to GCS. + + Returns: + str: Path to the saved file + """ + _filepath = filepath.format(mode=mode, filetype="csv") + Path(_filepath).parent.mkdir(parents=True, exist_ok=True) + if error is None: + data.to_csv(_filepath, index=False) + log(f"Treated data saved to: {_filepath}") + return _filepath + + +def upload_run_logs_to_bq( # pylint: disable=R0913 + dataset_id: str, + parent_table_id: str, + timestamp: str, + error: str = None, + previous_error: str = None, + recapture: bool = False, + mode: str = "raw", +): + """ + Upload execution status table to BigQuery. + Table is uploaded to the same dataset, named {parent_table_id}_logs. + If passing status_dict, should not pass timestamp and error. + + Args: + dataset_id (str): dataset_id on BigQuery + parent_table_id (str): table_id on BigQuery + timestamp (str): timestamp to get datetime range + error (str): error catched during execution + previous_error (str): previous error catched during execution + recapture (bool): if the execution was a recapture + mode (str): folder to save locally, later folder which to upload to GCS + + Returns: + None + """ + table_id = parent_table_id + "_logs" + # Create partition directory + filename = f"{table_id}_{timestamp.isoformat()}" + partition = f"data={timestamp.date()}" + filepath = Path(f"""data/{mode}/{dataset_id}/{table_id}/{partition}/{filename}.csv""") + filepath.parent.mkdir(exist_ok=True, parents=True) + # Create dataframe to be uploaded + if not error and recapture is True: + # if the recapture is succeeded, update the column erro + dataframe = pd.DataFrame( + { + "timestamp_captura": [timestamp], + "sucesso": [True], + "erro": [f"[recapturado]{previous_error}"], + } + ) + log(f"Recapturing {timestamp} with previous error:\n{previous_error}") + else: + # not recapturing or error during flow execution + dataframe = pd.DataFrame( + { + "timestamp_captura": [timestamp], + "sucesso": [error is None], + "erro": [error], + } + ) + # Save data local + dataframe.to_csv(filepath, index=False) + # Upload to Storage + create_or_append_table( + dataset_id=dataset_id, + table_id=table_id, + path=filepath.as_posix(), + partitions=partition, + ) + if error is not None: + raise Exception(f"Pipeline failed with error: {error}") + + +def get_datetime_range( + timestamp: datetime, + interval: timedelta, +) -> dict: + """ + Task to get datetime range in UTC + + Args: + timestamp (datetime): timestamp to get datetime range + interval (timedelta): interval to get datetime range + + Returns: + dict: datetime range + """ + + start = (timestamp - interval).astimezone(tz=pytz.timezone("UTC")).strftime("%Y-%m-%d %H:%M:%S") + + end = timestamp.astimezone(tz=pytz.timezone("UTC")).strftime("%Y-%m-%d %H:%M:%S") + + return {"start": start, "end": end} + + +def read_raw_data(filepath: str, csv_args: dict = None) -> tuple[str, pd.DataFrame]: + """ + Read raw data from file + + Args: + filepath (str): filepath to read + csv_args (dict): arguments to pass to pandas.read_csv + + Returns: + tuple[str, pd.DataFrame]: error and data + """ + error = None + data = None + try: + file_type = filepath.split(".")[-1] + + if file_type == "json": + data = pd.read_json(filepath) + + # data = json.loads(data) + elif file_type in ("txt", "csv"): + if csv_args is None: + csv_args = {} + data = pd.read_csv(filepath, **csv_args) + else: + error = "Unsupported raw file extension. Supported only: json, csv and txt" + + except Exception: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + + return error, data + + +def get_raw_recursos(request_url: str, request_params: dict) -> tuple[str, str, str]: + """ + Returns a dataframe with recursos data from movidesk api. + """ + all_records = False + top = 1000 + skip = 0 + error = None + filetype = "json" + data = [] + + while not all_records: + try: + request_params["$top"] = top + request_params["$skip"] = skip + + log(f"Request url {request_url}") + + response = requests.get( + request_url, + params=request_params, + timeout=constants.MAX_TIMEOUT_SECONDS.value, + ) + response.raise_for_status() + + paginated_data = response.json() + + if isinstance(paginated_data, dict): + paginated_data = [paginated_data] + + if len(paginated_data) == top: + skip += top + time.sleep(36) + else: + if len(paginated_data) == 0: + log("Nenhum dado para tratar.") + break + all_records = True + data += paginated_data + + log(f"Dados (paginados): {len(data)}") + + except Exception as error: + error = traceback.format_exc() + log(f"[CATCHED] Task failed with error: \n{error}", level="error") + data = [] + break + + log(f"Request concluĂ­do, tamanho dos dados: {len(data)}.") + + return error, data, filetype + + +def build_table_id(mode: str, report_type: str): + """Build table_id based on which table is the target + of current flow run + + Args: + mode (str): SPPO or STPL + report_type (str): RHO or RDO + + Returns: + str: table_id + """ + if mode == "SPPO": + if report_type == "RDO": + table_id = constants.SPPO_RDO_TABLE_ID.value + else: + table_id = constants.SPPO_RHO_TABLE_ID.value + if mode == "STPL": + # slice the string to get rid of V at end of + # STPL reports filenames + if report_type[:3] == "RDO": + table_id = constants.STPL_RDO_TABLE_ID.value + else: + table_id = constants.STPL_RHO_TABLE_ID.value + return table_id + + +def generate_ftp_schedules(interval_minutes: int, label: str = constants.RJ_SMTR_AGENT_LABEL.value): + """Generates IntervalClocks with the parameters needed to capture + each report. + + Args: + interval_minutes (int): interval which this flow will be run. + label (str, optional): Prefect label, defines which agent to use when launching flow run. + Defaults to constants.RJ_SMTR_AGENT_LABEL.value. + + Returns: + List(IntervalClock): containing the clocks for scheduling runs + """ + modes = ["SPPO", "STPL"] + reports = ["RDO", "RHO"] + clocks = [] + for mode in modes: + for report in reports: + clocks.append( + IntervalClock( + interval=timedelta(minutes=interval_minutes), + start_date=datetime( + 2022, 12, 16, 5, 0, tzinfo=timezone(constants.TIMEZONE.value) + ), + parameter_defaults={ + "transport_mode": mode, + "report_type": report, + "table_id": build_table_id(mode=mode, report_type=report), + }, + labels=[label], + ) + ) + return clocks diff --git a/pipelines/utils/capture.py b/pipelines/utils/capture.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/fs.py b/pipelines/utils/fs.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/gcp.py b/pipelines/utils/gcp.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/logging.py b/pipelines/utils/logging.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/pre_treatment.py b/pipelines/utils/pre_treatment.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/redis.py b/pipelines/utils/redis.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/utils/secret.py b/pipelines/utils/secret.py index e4120538..12d2c55d 100644 --- a/pipelines/utils/secret.py +++ b/pipelines/utils/secret.py @@ -1,8 +1,10 @@ +# -*- coding: utf-8 -*- from prefeitura_rio.pipelines_utils.infisical import get_infisical_client -from pipelines.utils.utils import normalize_keys +from pipelines.utils.backup.utils import normalize_keys -def get_secret(secret_path:str='/', secret_name:str=None, environment:str='dev'): + +def get_secret(secret_path: str = "/", secret_name: str = None, environment: str = "dev"): """ Fetches secrets from Infisical. If passing only `secret_path` and no `secret_name`, returns all secrets inside a folder. @@ -16,10 +18,10 @@ def get_secret(secret_path:str='/', secret_name:str=None, environment:str='dev') _type_: _description_ """ client = get_infisical_client() - if not secret_path.startswith('/'): + if not secret_path.startswith("/"): secret_path = f"/{secret_path}" if secret_path and not secret_name: secrets = client.get_all_secrets(path=secret_path) - return normalize_keys({s.secret_name:s.secret_value for s in secrets}) + return normalize_keys({s.secret_name: s.secret_value for s in secrets}) secret = client.get_secret(secret_name=secret_name, path=secret_path, environment=environment) - return {secret_name:secret.secret_value} \ No newline at end of file + return {secret_name: secret.secret_value} diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 8468307a..e69de29b 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -1,959 +0,0 @@ -# -*- coding: utf-8 -*- -# flake8: noqa: E501 -""" -General purpose functions for rj_smtr -""" - -from ftplib import FTP -from pathlib import Path - -from datetime import timedelta, datetime, date -from typing import List, Union, Any -import traceback -import io -import json -import zipfile -import pytz -import requests -import basedosdados as bd -from basedosdados import Table -from basedosdados import Storage -from pytz import timezone -import math -import pandas as pd -from google.cloud.storage.blob import Blob -import pymysql -import psycopg2 -import psycopg2.extras -from redis_pal import RedisPal -import time - - -from prefect.schedules.clocks import IntervalClock - -from pipelines.constants import constants - - -from pipelines.utils.implicit_ftp import ImplicitFtpTls -from pipelines.constants import constants - -from prefeitura_rio.pipelines_utils.logging import log #TODO: add or relocate imports -from prefeitura_rio.pipelines_utils.infisical import get_secret -from prefeitura_rio.pipelines_utils.redis_pal import get_redis_client - -# Set BD config to run on cloud # -bd.config.from_file = True - -def send_discord_message( - message: str, - webhook_url: str, -) -> None: - """ - Sends a message to a Discord channel. - """ - requests.post( - webhook_url, - data={"content": message}, - ) - - -def log_critical(message: str, secret_path: str = constants.CRITICAL_SECRET_PATH.value): - """Logs message to critical discord channel specified - - Args: - message (str): Message to post on the channel - secret_path (str, optional): Secret path storing the webhook to critical channel. - Defaults to constants.CRITICAL_SECRETPATH.value. - - """ - url = get_secret(secret_path=secret_path)["data"]["url"] - return send_discord_message(message=message, webhook_url=url) - - -def create_or_append_table( - dataset_id: str, table_id: str, path: str, partitions: str = None -): - """Conditionally create table or append data to its relative GCS folder. - - Args: - dataset_id (str): target dataset_id on BigQuery - table_id (str): target table_id on BigQuery - path (str): Path to .csv data file - """ - tb_obj = Table(table_id=table_id, dataset_id=dataset_id) - if not tb_obj.table_exists("staging"): - log("Table does not exist in STAGING, creating table...") - dirpath = path.split(partitions)[0] - tb_obj.create( - path=dirpath, - if_table_exists="pass", - if_storage_data_exists="replace", - ) - log("Table created in STAGING") - else: - log("Table already exists in STAGING, appending to it...") - tb_obj.append( - filepath=path, if_exists="replace", timeout=600, partitions=partitions - ) - log("Appended to table on STAGING successfully.") - - -def generate_df_and_save(data: dict, fname: Path): - """Save DataFrame as csv - - Args: - data (dict): dict with the data which to build the DataFrame - fname (Path): _description_ - """ - # Generate dataframe - dataframe = pd.DataFrame() - dataframe[data["key_column"]] = [ - piece[data["key_column"]] for piece in data["data"] - ] - dataframe["content"] = list(data["data"]) - - # Save dataframe to CSV - dataframe.to_csv(fname, index=False) - - -def bq_project(kind: str = "bigquery_prod"): - """Get the set BigQuery project_id - - Args: - kind (str, optional): Which client to get the project name from. - Options are 'bigquery_staging', 'bigquery_prod' and 'storage_staging' - Defaults to 'bigquery_prod'. - - Returns: - str: the requested project_id - """ - return bd.upload.base.Base().client[kind].project - - -def get_table_min_max_value( # pylint: disable=R0913 - query_project_id: str, - dataset_id: str, - table_id: str, - field_name: str, - kind: str, - wait=None, # pylint: disable=unused-argument -): - """Query a table to get the maximum value for the chosen field. - Useful to incrementally materialize tables via DBT - - Args: - dataset_id (str): dataset_id on BigQuery - table_id (str): table_id on BigQuery - field_name (str): column name to query - kind (str): which value to get. Accepts min and max - """ - log(f"Getting {kind} value for {table_id}") - query = f""" - SELECT - {kind}({field_name}) - FROM {query_project_id}.{dataset_id}.{table_id} - """ - log(f"Will run query:\n{query}") - result = bd.read_sql(query=query, billing_project_id=bq_project()) - - return result.iloc[0][0] - -def get_last_run_timestamp(dataset_id: str, table_id: str, mode: str = "prod") -> str: - """ - Query redis to retrive the time for when the last materialization - ran. - - Args: - dataset_id (str): dataset_id on BigQuery - table_id (str): model filename on the queries repo. - eg: if you have a model defined in the file .sql, - the table_id should be - mode (str): - - Returns: - Union[str, None]: _description_ - """ - redis_client = get_redis_client() - key = dataset_id + "." + table_id - log(f"Fetching key {key} from redis, working on mode {mode}") - if mode == "dev": - key = f"{mode}.{key}" - runs = redis_client.get(key) - # if runs is None: - # redis_client.set(key, "") - try: - last_run_timestamp = runs["last_run_timestamp"] - except KeyError: - return None - except TypeError: - return None - log(f"Got value {last_run_timestamp}") - return last_run_timestamp - - -def map_dict_keys(data: dict, mapping: dict) -> None: - """ - Map old keys to new keys in a dict. - """ - for old_key, new_key in mapping.items(): - data[new_key] = data.pop(old_key) - return data - -def normalize_keys(data:dict): - _data = {key.lower():value for key, value in data.items()} - return _data - -def connect_ftp(secret_path: str = None, secure: bool = True): - """Connect to FTP - - Returns: - ImplicitFTP_TLS: ftp client - """ - - ftp_data = get_secret(secret_path)["data"] - if secure: - ftp_client = ImplicitFtpTls() - else: - ftp_client = FTP() - ftp_client.connect(host=ftp_data["host"], port=int(ftp_data["port"])) - ftp_client.login(user=ftp_data["username"], passwd=ftp_data["pwd"]) - if secure: - ftp_client.prot_p() - return ftp_client - - -def safe_cast(val, to_type, default=None): - """ - Safe cast value. - """ - try: - return to_type(val) - except ValueError: - return default - - -def set_redis_rdo_files(redis_client, dataset_id: str, table_id: str): - """ - Register downloaded files to Redis - - Args: - redis_client (_type_): _description_ - dataset_id (str): dataset_id on BigQuery - table_id (str): table_id on BigQuery - - Returns: - bool: if the key was properly set - """ - try: - content = redis_client.get(f"{dataset_id}.{table_id}")["files"] - except TypeError as e: - log(f"Caught error {e}. Will set unexisting key") - # set key to empty dict for filling later - redis_client.set(f"{dataset_id}.{table_id}", {"files": []}) - content = redis_client.get(f"{dataset_id}.{table_id}") - # update content - st_client = bd.Storage(dataset_id=dataset_id, table_id=table_id) - blob_names = [ - blob.name - for blob in st_client.client["storage_staging"].list_blobs( - st_client.bucket, prefix=f"staging/{dataset_id}/{table_id}" - ) - ] - files = [blob_name.split("/")[-1].replace(".csv", "") for blob_name in blob_names] - log(f"When setting key, found {len(files)} files. Will register on redis...") - content["files"] = files - # set key - return redis_client.set(f"{dataset_id}.{table_id}", content) - - -# PRE TREAT # - - -def check_not_null(data: pd.DataFrame, columns: list, subset_query: str = None): - """ - Check if there are null values in columns. - - Args: - columns (list): list of columns to check - subset_query (str): query to check if there are important data - being removed - - Returns: - None - """ - - for col in columns: - remove = data.query(f"{col} != {col}") # null values - log( - f"[data-check] There are {len(remove)} rows with null values in '{col}'", - level="info", - ) - - if subset_query is not None: - # Check if there are important data being removed - remove = remove.query(subset_query) - if len(remove) > 0: - log( - f"""[data-check] There are {len(remove)} critical rows with - null values in '{col}' (query: {subset_query})""", - level="warning", - ) - - -def filter_null(data: pd.DataFrame, columns: list, subset_query: str = None): - """ - Filter null values in columns. - - Args: - columns (list): list of columns to check - subset_query (str): query to check if there are important data - being removed - - Returns: - pandas.DataFrame: data without null values - """ - - for col in columns: - remove = data.query(f"{col} != {col}") # null values - data = data.drop(remove.index) - log( - f"[data-filter] Removed {len(remove)} rows with null '{col}'", - level="info", - ) - - if subset_query is not None: - # Check if there are important data being removed - remove = remove.query(subset_query) - if len(remove) > 0: - log( - f"[data-filter] Removed {len(remove)} critical rows with null '{col}'", - level="warning", - ) - - return data - - -def filter_data(data: pd.DataFrame, filters: list, subset_query: str = None): - """ - Filter data from a dataframe - - Args: - data (pd.DataFrame): data DataFrame - filters (list): list of queries to filter data - - Returns: - pandas.DataFrame: data without filter data - """ - for item in filters: - remove = data.query(item) - data = data.drop(remove.index) - log( - f"[data-filter] Removed {len(remove)} rows from filter: {item}", - level="info", - ) - - if subset_query is not None: - # Check if there are important data being removed - remove = remove.query(subset_query) - if len(remove) > 0: - log( - f"""[data-filter] Removed {len(remove)} critical rows - from filter: {item} (subquery: {subset_query})""", - level="warning", - ) - - return data - - -def check_relation(data: pd.DataFrame, columns: list): - """ - Check relation between collumns. - - Args: - data (pd.DataFrame): dataframe to be modified - columns (list): list of lists of columns to be checked - - Returns: - None - """ - - for cols in columns: - df_dup = ( - data[~data.duplicated(subset=cols)] - .groupby(cols) - .count() - .reset_index() - .iloc[:, :1] - ) - - for col in cols: - df_dup_col = ( - data[~data.duplicated(subset=col)] - .groupby(col) - .count() - .reset_index() - .iloc[:, :1] - ) - - if len(df_dup_col[~df_dup_col[col].duplicated()]) == len(df_dup): - log( - f"[data-check] Comparing '{col}' in '{cols}', there are no duplicated values", - level="info", - ) - else: - log( - f"[data-check] Comparing '{col}' in '{cols}', there are duplicated values", - level="warning", - ) - - -def data_info_str(data: pd.DataFrame): - """ - Return dataframe info as a str to log - - Args: - data (pd.DataFrame): dataframe - - Returns: - data.info() as a string - """ - buffer = io.StringIO() - data.info(buf=buffer) - return buffer.getvalue() - - -def generate_execute_schedules( # pylint: disable=too-many-arguments,too-many-locals - clock_interval: timedelta, - labels: List[str], - table_parameters: Union[list[dict], dict], - runs_interval_minutes: int = 15, - start_date: datetime = datetime( - 2020, 1, 1, tzinfo=pytz.timezone(constants.TIMEZONE.value) - ), - **general_flow_params, -) -> List[IntervalClock]: - """ - Generates multiple schedules - - Args: - clock_interval (timedelta): The interval to run the schedule - labels (List[str]): The labels to be added to the schedule - table_parameters (list): The table parameters to iterate over - runs_interval_minutes (int, optional): The interval between each schedule. Defaults to 15. - start_date (datetime, optional): The start date of the schedule. - Defaults to datetime(2020, 1, 1, tzinfo=pytz.timezone(constants.TIMEZONE.value)). - general_flow_params: Any param that you want to pass to the flow - Returns: - List[IntervalClock]: The list of schedules - - """ - if isinstance(table_parameters, dict): - table_parameters = [table_parameters] - - clocks = [] - for count, parameters in enumerate(table_parameters): - parameter_defaults = parameters | general_flow_params - clocks.append( - IntervalClock( - interval=clock_interval, - start_date=start_date - + timedelta(minutes=runs_interval_minutes * count), - labels=labels, - parameter_defaults=parameter_defaults, - ) - ) - return clocks - - -def dict_contains_keys(input_dict: dict, keys: list[str]) -> bool: - """ - Test if the input dict has all keys present in the list - - Args: - input_dict (dict): the dict to test if has the keys - keys (list[str]): the list containing the keys to check - Returns: - bool: True if the input_dict has all the keys otherwise False - """ - return all(x in input_dict.keys() for x in keys) - - -def custom_serialization(obj: Any) -> Any: - """ - Function to serialize not JSON serializable objects - - Args: - obj (Any): Object to serialize - - Returns: - Any: Serialized object - """ - if isinstance(obj, (pd.Timestamp, date)): - if isinstance(obj, pd.Timestamp): - if obj.tzinfo is None: - obj = obj.tz_localize("UTC").tz_convert( - constants.TIMEZONE.value - ) - return obj.isoformat() - - raise TypeError(f"Object of type {type(obj)} is not JSON serializable") - - -def save_raw_local_func( - data: Union[dict, str], - filepath: str, - mode: str = "raw", - filetype: str = "json", -) -> str: - """ - Saves json response from API to .json file. - Args: - data (Union[dict, str]): Raw data to save - filepath (str): Path which to save raw file - mode (str, optional): Folder to save locally, later folder which to upload to GCS. - filetype (str, optional): The file format - Returns: - str: Path to the saved file - """ - - # diferentes tipos de arquivos para salvar - _filepath = filepath.format(mode=mode, filetype=filetype) - Path(_filepath).parent.mkdir(parents=True, exist_ok=True) - - if filetype == "json": - if isinstance(data, str): - data = json.loads(data) - with Path(_filepath).open("w", encoding="utf-8") as fi: - json.dump(data, fi, default=custom_serialization) - - if filetype in ("txt", "csv"): - with open(_filepath, "w", encoding="utf-8") as file: - file.write(data) - - log(f"Raw data saved to: {_filepath}") - return _filepath - - -def get_raw_data_api( # pylint: disable=R0912 - url: str, - secret_path: str = None, - api_params: dict = None, - filetype: str = None, -) -> tuple[str, str, str]: - """ - Request data from URL API - - Args: - url (str): URL to request data - secret_path (str, optional): Secret path to get headers. Defaults to None. - api_params (dict, optional): Parameters to pass to API. Defaults to None. - filetype (str, optional): Filetype to save raw file. Defaults to None. - - Returns: - tuple[str, str, str]: Error, data and filetype - """ - error = None - data = None - try: - if secret_path is None: - headers = secret_path - else: - headers = get_secret(secret_path)["data"] - - response = requests.get( - url, - headers=headers, - timeout=constants.MAX_TIMEOUT_SECONDS.value, - params=api_params, - ) - - response.raise_for_status() - - if filetype == "json": - data = response.json() - else: - data = response.text - - except Exception: - error = traceback.format_exc() - log(f"[CATCHED] Task failed with error: \n{error}", level="error") - - return error, data, filetype - - -def get_upload_storage_blob( - dataset_id: str, - filename: str, -) -> Blob: - """ - Get a blob from upload zone in storage - - Args: - dataset_id (str): The dataset id on BigQuery. - filename (str): The filename in GCS. - - Returns: - Blob: blob object - """ - bucket = bd.Storage(dataset_id="", table_id="") - log(f"Filename: {filename}, dataset_id: {dataset_id}") - blob_list = list( - bucket.client["storage_staging"] - .bucket(bucket.bucket_name) - .list_blobs(prefix=f"upload/{dataset_id}/{filename}.") - ) - - return blob_list[0] - - -def get_raw_data_gcs( - dataset_id: str, - table_id: str, - zip_filename: str = None, -) -> tuple[str, str, str]: - """ - Get raw data from GCS - - Args: - dataset_id (str): The dataset id on BigQuery. - table_id (str): The table id on BigQuery. - zip_filename (str, optional): The zip file name. Defaults to None. - - Returns: - tuple[str, str, str]: Error, data and filetype - """ - error = None - data = None - filetype = None - - try: - blob_search_name = zip_filename or table_id - blob = get_upload_storage_blob(dataset_id=dataset_id, filename=blob_search_name) - - filename = blob.name - filetype = filename.split(".")[-1] - - data = blob.download_as_bytes() - - if filetype == "zip": - with zipfile.ZipFile(io.BytesIO(data), "r") as zipped_file: - filenames = zipped_file.namelist() - filename = list( - filter(lambda x: x.split(".")[0] == table_id, filenames) - )[0] - filetype = filename.split(".")[-1] - data = zipped_file.read(filename) - - data = data.decode(encoding="utf-8") - - except Exception: - error = traceback.format_exc() - log(f"[CATCHED] Task failed with error: \n{error}", level="error") - - return error, data, filetype - - -def get_raw_data_db( - query: str, engine: str, host: str, secret_path: str, database: str -) -> tuple[str, str, str]: - """ - Get data from Databases - - Args: - query (str): the SQL Query to execute - engine (str): The datase management system - host (str): The database host - secret_path (str): Secret path to get credentials - database (str): The database to connect - - Returns: - tuple[str, str, str]: Error, data and filetype - """ - connector_mapping = { - "postgresql": psycopg2.connect, - "mysql": pymysql.connect, - } - - data = None - error = None - filetype = "json" - - try: - credentials = get_secret(secret_path)["data"] - - with connector_mapping[engine]( - host=host, - user=credentials["user"], - password=credentials["password"], - database=database, - ) as connection: - data = pd.read_sql(sql=query, con=connection).to_dict(orient="records") - - except Exception: - error = traceback.format_exc() - log(f"[CATCHED] Task failed with error: \n{error}", level="error") - - return error, data, filetype - - -def save_treated_local_func( - filepath: str, data: pd.DataFrame, error: str, mode: str = "staging" -) -> str: - """ - Save treated file to CSV. - - Args: - filepath (str): Path to save file - data (pd.DataFrame): Dataframe to save - error (str): Error catched during execution - mode (str, optional): Folder to save locally, later folder which to upload to GCS. - - Returns: - str: Path to the saved file - """ - _filepath = filepath.format(mode=mode, filetype="csv") - Path(_filepath).parent.mkdir(parents=True, exist_ok=True) - if error is None: - data.to_csv(_filepath, index=False) - log(f"Treated data saved to: {_filepath}") - return _filepath - - -def upload_run_logs_to_bq( # pylint: disable=R0913 - dataset_id: str, - parent_table_id: str, - timestamp: str, - error: str = None, - previous_error: str = None, - recapture: bool = False, - mode: str = "raw", -): - """ - Upload execution status table to BigQuery. - Table is uploaded to the same dataset, named {parent_table_id}_logs. - If passing status_dict, should not pass timestamp and error. - - Args: - dataset_id (str): dataset_id on BigQuery - parent_table_id (str): table_id on BigQuery - timestamp (str): timestamp to get datetime range - error (str): error catched during execution - previous_error (str): previous error catched during execution - recapture (bool): if the execution was a recapture - mode (str): folder to save locally, later folder which to upload to GCS - - Returns: - None - """ - table_id = parent_table_id + "_logs" - # Create partition directory - filename = f"{table_id}_{timestamp.isoformat()}" - partition = f"data={timestamp.date()}" - filepath = Path( - f"""data/{mode}/{dataset_id}/{table_id}/{partition}/{filename}.csv""" - ) - filepath.parent.mkdir(exist_ok=True, parents=True) - # Create dataframe to be uploaded - if not error and recapture is True: - # if the recapture is succeeded, update the column erro - dataframe = pd.DataFrame( - { - "timestamp_captura": [timestamp], - "sucesso": [True], - "erro": [f"[recapturado]{previous_error}"], - } - ) - log(f"Recapturing {timestamp} with previous error:\n{previous_error}") - else: - # not recapturing or error during flow execution - dataframe = pd.DataFrame( - { - "timestamp_captura": [timestamp], - "sucesso": [error is None], - "erro": [error], - } - ) - # Save data local - dataframe.to_csv(filepath, index=False) - # Upload to Storage - create_or_append_table( - dataset_id=dataset_id, - table_id=table_id, - path=filepath.as_posix(), - partitions=partition, - ) - if error is not None: - raise Exception(f"Pipeline failed with error: {error}") - - -def get_datetime_range( - timestamp: datetime, - interval: timedelta, -) -> dict: - """ - Task to get datetime range in UTC - - Args: - timestamp (datetime): timestamp to get datetime range - interval (timedelta): interval to get datetime range - - Returns: - dict: datetime range - """ - - start = ( - (timestamp - interval) - .astimezone(tz=pytz.timezone("UTC")) - .strftime("%Y-%m-%d %H:%M:%S") - ) - - end = timestamp.astimezone(tz=pytz.timezone("UTC")).strftime("%Y-%m-%d %H:%M:%S") - - return {"start": start, "end": end} - - -def read_raw_data(filepath: str, csv_args: dict = None) -> tuple[str, pd.DataFrame]: - """ - Read raw data from file - - Args: - filepath (str): filepath to read - csv_args (dict): arguments to pass to pandas.read_csv - - Returns: - tuple[str, pd.DataFrame]: error and data - """ - error = None - data = None - try: - file_type = filepath.split(".")[-1] - - if file_type == "json": - data = pd.read_json(filepath) - - # data = json.loads(data) - elif file_type in ("txt", "csv"): - if csv_args is None: - csv_args = {} - data = pd.read_csv(filepath, **csv_args) - else: - error = "Unsupported raw file extension. Supported only: json, csv and txt" - - except Exception: - error = traceback.format_exc() - log(f"[CATCHED] Task failed with error: \n{error}", level="error") - - return error, data - - -def get_raw_recursos(request_url: str, request_params: dict) -> tuple[str, str, str]: - """ - Returns a dataframe with recursos data from movidesk api. - """ - all_records = False - top = 1000 - skip = 0 - error = None - filetype = "json" - data = [] - - while not all_records: - try: - request_params["$top"] = top - request_params["$skip"] = skip - - log(f"Request url {request_url}") - - response = requests.get( - request_url, - params=request_params, - timeout=constants.MAX_TIMEOUT_SECONDS.value, - ) - response.raise_for_status() - - paginated_data = response.json() - - if isinstance(paginated_data, dict): - paginated_data = [paginated_data] - - if len(paginated_data) == top: - skip += top - time.sleep(36) - else: - if len(paginated_data) == 0: - log("Nenhum dado para tratar.") - break - all_records = True - data += paginated_data - - log(f"Dados (paginados): {len(data)}") - - except Exception as error: - error = traceback.format_exc() - log(f"[CATCHED] Task failed with error: \n{error}", level="error") - data = [] - break - - log(f"Request concluĂ­do, tamanho dos dados: {len(data)}.") - - return error, data, filetype - -def build_table_id(mode: str, report_type: str): - """Build table_id based on which table is the target - of current flow run - - Args: - mode (str): SPPO or STPL - report_type (str): RHO or RDO - - Returns: - str: table_id - """ - if mode == "SPPO": - if report_type == "RDO": - table_id = constants.SPPO_RDO_TABLE_ID.value - else: - table_id = constants.SPPO_RHO_TABLE_ID.value - if mode == "STPL": - # slice the string to get rid of V at end of - # STPL reports filenames - if report_type[:3] == "RDO": - table_id = constants.STPL_RDO_TABLE_ID.value - else: - table_id = constants.STPL_RHO_TABLE_ID.value - return table_id - -def generate_ftp_schedules( - interval_minutes: int, label: str = constants.RJ_SMTR_AGENT_LABEL.value -): - """Generates IntervalClocks with the parameters needed to capture - each report. - - Args: - interval_minutes (int): interval which this flow will be run. - label (str, optional): Prefect label, defines which agent to use when launching flow run. - Defaults to constants.RJ_SMTR_AGENT_LABEL.value. - - Returns: - List(IntervalClock): containing the clocks for scheduling runs - """ - modes = ["SPPO", "STPL"] - reports = ["RDO", "RHO"] - clocks = [] - for mode in modes: - for report in reports: - clocks.append( - IntervalClock( - interval=timedelta(minutes=interval_minutes), - start_date=datetime( - 2022, 12, 16, 5, 0, tzinfo=timezone(constants.TIMEZONE.value) - ), - parameter_defaults={ - "transport_mode": mode, - "report_type": report, - "table_id": build_table_id(mode=mode, report_type=report), - }, - labels=[label], - ) - ) - return clocks \ No newline at end of file