From a4f5222b5a7a075c9b48591d434f8e18075b3c85 Mon Sep 17 00:00:00 2001 From: ThiagoTrabach Date: Thu, 9 Nov 2023 12:31:15 +0000 Subject: [PATCH] =?UTF-8?q?Deploying=20to=20gh-pages=20from=20@=20prefeitu?= =?UTF-8?q?ra-rio/pipelines@ea74b3a48271da46c7a98d9ae96431fa3ea82c6e=20?= =?UTF-8?q?=F0=9F=9A=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rj_sms/dump_ftp_cnes/constants.html | 182 ++++++ rj_sms/dump_ftp_cnes/flows.html | 205 +++++++ rj_sms/dump_ftp_cnes/index.html | 126 ++++ rj_sms/dump_ftp_cnes/schedules.html | 127 ++++ rj_sms/dump_ftp_cnes/tasks.html | 868 ++++++++++++++++++++++++++++ rj_sms/index.html | 5 + rj_sms/utils.html | 174 ++++-- 7 files changed, 1653 insertions(+), 34 deletions(-) create mode 100644 rj_sms/dump_ftp_cnes/constants.html create mode 100644 rj_sms/dump_ftp_cnes/flows.html create mode 100644 rj_sms/dump_ftp_cnes/index.html create mode 100644 rj_sms/dump_ftp_cnes/schedules.html create mode 100644 rj_sms/dump_ftp_cnes/tasks.html diff --git a/rj_sms/dump_ftp_cnes/constants.html b/rj_sms/dump_ftp_cnes/constants.html new file mode 100644 index 000000000..7404c3915 --- /dev/null +++ b/rj_sms/dump_ftp_cnes/constants.html @@ -0,0 +1,182 @@ + + + + + + +pipelines.rj_sms.dump_ftp_cnes.constants API documentation + + + + + + + + + + + +
+
+
+

Module pipelines.rj_sms.dump_ftp_cnes.constants

+
+
+

Constants for the CNES dump pipeline.

+
# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+Constants for the CNES dump pipeline.
+"""
+from enum import Enum
+
+
+class constants(Enum):
+    """
+    Constant values for the dump CNES flows
+    """
+
+    FTP_SERVER = "ftp.datasus.gov.br"
+    FTP_FILE_PATH = "/cnes"
+    BASE_FILE = "BASE_DE_DADOS_CNES"
+    DATASET_ID = "brutos_cnes"
+
+
+
+
+
+
+
+
+
+
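The members are plain Enum values, so callers read the underlying strings through .value. A minimal usage sketch, mirroring the import style used in pipelines.rj_sms.dump_ftp_cnes.flows:

from pipelines.rj_sms.dump_ftp_cnes.constants import constants as cnes_constants

# Enum members expose the configured strings via .value
ftp_server = cnes_constants.FTP_SERVER.value    # "ftp.datasus.gov.br"
ftp_path = cnes_constants.FTP_FILE_PATH.value   # "/cnes"
base_file = cnes_constants.BASE_FILE.value      # "BASE_DE_DADOS_CNES"
dataset_id = cnes_constants.DATASET_ID.value    # "brutos_cnes"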

Classes

+
+
+class constants +(value, names=None, *, module=None, qualname=None, type=None, start=1) +
+
+

Constant values for the dump CNES flows

+
class constants(Enum):
+    """
+    Constant values for the dump CNES flows
+    """
+
+    FTP_SERVER = "ftp.datasus.gov.br"
+    FTP_FILE_PATH = "/cnes"
+    BASE_FILE = "BASE_DE_DADOS_CNES"
+    DATASET_ID = "brutos_cnes"
+
+

Ancestors

+
    +
  • enum.Enum
  • +
+

Class variables

+
+
var BASE_FILE
+
+
+
+
var DATASET_ID
+
+
+
+
var FTP_FILE_PATH
+
+
+
+
var FTP_SERVER
+
+
+
+
+
+
+
+
+ +
+ + + \ No newline at end of file diff --git a/rj_sms/dump_ftp_cnes/flows.html b/rj_sms/dump_ftp_cnes/flows.html new file mode 100644 index 000000000..64e26334a --- /dev/null +++ b/rj_sms/dump_ftp_cnes/flows.html @@ -0,0 +1,205 @@ + + + + + + +pipelines.rj_sms.dump_ftp_cnes.flows API documentation + + + + + + + + + + + +
+
+
+

Module pipelines.rj_sms.dump_ftp_cnes.flows

+
+
+

CNES dumping flows

+
# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+CNES dumping flows
+"""
+
+from prefect import Parameter
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from pipelines.utils.decorators import Flow
+from pipelines.constants import constants
+from pipelines.rj_sms.dump_ftp_cnes.constants import constants as cnes_constants
+from pipelines.rj_sms.utils import create_folders, unzip_file
+from pipelines.rj_sms.dump_ftp_cnes.tasks import (
+    conform_csv_to_gcp,
+    create_partitions_and_upload_multiple_tables_to_datalake,
+    add_multiple_date_column,
+    download_ftp_cnes,
+    check_file_to_download,
+)
+from pipelines.rj_sms.dump_ftp_cnes.schedules import every_sunday_at_six_am
+
+
+with Flow(
+    name="SMS: Dump CNES - Captura de dados CNES", code_owners=["thiago"]
+) as dump_cnes:
+    # Parameters
+    # Parameters for GCP
+    dataset_id = cnes_constants.DATASET_ID.value
+    # Parameters for CNES
+    ftp_server = cnes_constants.FTP_SERVER.value
+    ftp_file_path = cnes_constants.FTP_FILE_PATH.value
+    base_file = cnes_constants.BASE_FILE.value
+    download_newest = Parameter("download_newest", default=True)
+
+    # Additional parameters for CNES, used when download_newest is False
+    file_to_download = Parameter("file_to_download", default=None, required=False)
+    partition_date = Parameter(
+        "partition_date", default=None, required=False
+    )  # format YYYY-MM-DD or YYYY-MM; must match the partition level
+
+    # Start run
+    file_to_download_task = check_file_to_download(
+        download_newest=download_newest,
+        file_to_download=file_to_download,
+        partition_date=partition_date,
+        host=ftp_server,
+        user="",
+        password="",
+        directory=ftp_file_path,
+        file_name=base_file,
+    )
+
+    create_folders_task = create_folders()
+    create_folders_task.set_upstream(file_to_download_task)
+
+    download_task = download_ftp_cnes(
+        host=ftp_server,
+        user="",
+        password="",
+        directory=ftp_file_path,
+        file_name=file_to_download_task["file"],
+        output_path=create_folders_task["raw"],
+    )
+    download_task.set_upstream(create_folders_task)
+
+    unzip_task = unzip_file(
+        file_path=download_task, output_path=create_folders_task["raw"]
+    )
+    unzip_task.set_upstream(download_task)
+
+    conform_task = conform_csv_to_gcp(create_folders_task["raw"])
+    conform_task.set_upstream(unzip_task)
+
+    add_multiple_date_column_task = add_multiple_date_column(
+        directory=create_folders_task["raw"],
+        snapshot_date=file_to_download_task["snapshot"],
+        sep=";",
+    )
+    add_multiple_date_column_task.set_upstream(conform_task)
+
+    upload_to_datalake_task = create_partitions_and_upload_multiple_tables_to_datalake(
+        path_files=conform_task,
+        partition_folder=create_folders_task["partition_directory"],
+        partition_date=file_to_download_task["snapshot"],
+        dataset_id=dataset_id,
+        dump_mode="append",
+    )
+    upload_to_datalake_task.set_upstream(add_multiple_date_column_task)
+
+
+dump_cnes.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+dump_cnes.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value,
+    labels=[
+        constants.RJ_SMS_AGENT_LABEL.value,
+    ],
+)
+
+dump_cnes.schedule = every_sunday_at_six_am
+
+
+
+
+
+
+
+
+
+
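For an ad-hoc re-run of a specific snapshot, the optional parameters above can be passed to a local execution. A sketch assuming Prefect's Flow.run interface; the ZIP name is illustrative and only needs the BASE_DE_DADOS_CNES prefix plus a six-digit competência, which is what the downstream tasks expect:

if __name__ == "__main__":
    dump_cnes.schedule = None  # skip the weekly schedule for a one-off run
    dump_cnes.run(
        parameters={
            "download_newest": False,
            "file_to_download": "BASE_DE_DADOS_CNES_202309.ZIP",  # assumed file name
            "partition_date": "2023-09",  # month-level partition, format YYYY-MM
        }
    )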
+
+ +
+ + + \ No newline at end of file diff --git a/rj_sms/dump_ftp_cnes/index.html b/rj_sms/dump_ftp_cnes/index.html new file mode 100644 index 000000000..f3f6ab602 --- /dev/null +++ b/rj_sms/dump_ftp_cnes/index.html @@ -0,0 +1,126 @@ + + + + + + +pipelines.rj_sms.dump_ftp_cnes API documentation + + + + + + + + + + + +
+ + +
+ + + \ No newline at end of file diff --git a/rj_sms/dump_ftp_cnes/schedules.html b/rj_sms/dump_ftp_cnes/schedules.html new file mode 100644 index 000000000..7d473b36d --- /dev/null +++ b/rj_sms/dump_ftp_cnes/schedules.html @@ -0,0 +1,127 @@ + + + + + + +pipelines.rj_sms.dump_ftp_cnes.schedules API documentation + + + + + + + + + + + +
+
+
+

Module pipelines.rj_sms.dump_ftp_cnes.schedules

+
+
+

Schedules for the cnes dump pipeline

+
# -*- coding: utf-8 -*-
+# pylint: disable=C0103
+"""
+Schedules for the cnes dump pipeline
+"""
+from datetime import timedelta
+import pendulum
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import IntervalClock
+from pipelines.constants import constants
+
+every_sunday_at_six_am = Schedule(
+    clocks=[
+        IntervalClock(
+            interval=timedelta(days=7),
+            start_date=pendulum.datetime(2023, 10, 8, 6, 0, 0, tz="America/Sao_Paulo"),
+            labels=[
+                constants.RJ_SMS_AGENT_LABEL.value,
+            ],
+        )
+    ]
+)
+
+
+
+
+
+
+
+
+
+
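The clock ticks every seven days from the start date, so runs land on Sundays at 06:00 America/Sao_Paulo. A small illustrative computation of the first occurrences (outside Prefect, purely for reference):

from datetime import timedelta
import pendulum

start = pendulum.datetime(2023, 10, 8, 6, 0, 0, tz="America/Sao_Paulo")  # a Sunday
first_runs = [start + timedelta(days=7 * i) for i in range(3)]
# -> 2023-10-08 06:00, 2023-10-15 06:00, 2023-10-22 06:00 (local time)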
+
+ +
+ + + \ No newline at end of file diff --git a/rj_sms/dump_ftp_cnes/tasks.html b/rj_sms/dump_ftp_cnes/tasks.html new file mode 100644 index 000000000..10ba5c017 --- /dev/null +++ b/rj_sms/dump_ftp_cnes/tasks.html @@ -0,0 +1,868 @@ + + + + + + +pipelines.rj_sms.dump_ftp_cnes.tasks API documentation + + + + + + + + + + + +
+
+
+

Module pipelines.rj_sms.dump_ftp_cnes.tasks

+
+
+

Tasks for dump_ftp_cnes

+
# -*- coding: utf-8 -*-
+"""
+Tasks for dump_ftp_cnes
+"""
+
+import os
+import shutil
+from datetime import datetime, timedelta
+import tempfile
+import re
+import pandas as pd
+import pytz
+from prefect import task
+from pipelines.utils.utils import log
+from pipelines.rj_sms.dump_ftp_cnes.constants import constants
+from pipelines.rj_sms.utils import (
+    list_files_ftp,
+    upload_to_datalake,
+    download_ftp,
+    create_partitions,
+)
+
+
+@task
+def check_newest_file_version(
+    host: str, user: str, password: str, directory: str, file_name: str
+):
+    """
+    Check the newest version of a file in a given FTP directory.
+
+    Args:
+        host (str): FTP server hostname.
+        user (str): FTP server username.
+        password (str): FTP server password.
+        directory (str): FTP directory path.
+        file_name (str): Base name of the file to check.
+
+    Returns:
+        dict: The newest file name ("file") and its snapshot date ("snapshot").
+    """
+    file_name = constants.BASE_FILE.value
+    files = list_files_ftp.run(host, user, password, directory)
+
+    # filter a list of files that contains the base file name
+    files = [file for file in files if file_name in file]
+
+    # sort list descending
+    files.sort(reverse=True)
+    newest_file = files[0]
+
+    # extract snapshot date from file
+    snapshot_date = re.findall(r"\d{6}", newest_file)[0]
+    snapshot_date = f"{snapshot_date[:4]}-{snapshot_date[-2:]}"
+
+    log(f"Newest file: {newest_file}, snapshot_date: {snapshot_date}")
+    return {"file": newest_file, "snapshot": snapshot_date}
+
+
+@task
+def check_file_to_download(
+    download_newest: bool,
+    file_to_download: None,
+    partition_date: None,
+    host: str,
+    user: str,
+    password: str,
+    directory: str,
+    file_name: str,
+):
+    """
+    Check which file to download based on the given parameters.
+
+    Args:
+        download_newest (bool): Whether to download the newest file or not.
+        file_to_download (None or str): The name of the file to download.
+        partition_date (None or str): The partition date of the file to download.
+        host (str): The FTP host to connect to.
+        user (str): The FTP username.
+        password (str): The FTP password.
+        directory (str): The directory where the file is located.
+        file_name (str): The name of the file to check.
+
+    Returns:
+        dict: A dictionary with the file name ("file") and its snapshot date
+                    ("snapshot"), taken from the newest FTP file when download_newest
+                    is True or from the given parameters otherwise.
+    Raises:
+        ValueError: If download_newest is False and file_to_download or partition_date is
+        not provided.
+    """
+    if download_newest:
+        newest_file = check_newest_file_version.run(
+            host=host,
+            user=user,
+            password=password,
+            directory=directory,
+            file_name=file_name,
+        )
+        return newest_file
+    elif file_to_download is not None and partition_date is not None:
+        return {"file": file_to_download, "snapshot": partition_date}
+    else:
+        raise ValueError(
+            "If download_newest is False, file_to_download and partition_date must be provided"
+        )
+
+
+@task
+def conform_csv_to_gcp(directory: str):
+    """
+    Conform CSV files in the given directory to be compatible with Google Cloud Storage.
+
+    Args:
+        directory (str): The directory containing the CSV files to be conformed.
+
+    Returns:
+        List[str]: A list of filepaths of the conformed CSV files.
+    """
+    # list all csv files in the directory
+    csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]
+
+    log(f"Conforming {len(csv_files)} files...")
+
+    files_conform = []
+
+    # iterate over each csv file
+    for csv_file in csv_files:
+        # construct the full file path
+        filepath = os.path.join(directory, csv_file)
+
+        # create a temporary file
+        with tempfile.NamedTemporaryFile(mode="w", delete=False) as tf:
+            # open the original file in read mode
+            with open(filepath, "r", encoding="iso8859-1") as f:
+                # read the first line
+                first_line = f.readline()
+
+                # modify the first line
+                modified_first_line = first_line.replace("TO_CHAR(", "")
+                modified_first_line = modified_first_line.replace(",'DD/MM/YYYY')", "")
+
+                # write the modified first line to the temporary file
+                tf.write(modified_first_line)
+
+                # copy the rest of the lines from the original file to the temporary file
+                shutil.copyfileobj(f, tf)
+
+        # replace the original file with the temporary file
+        shutil.move(tf.name, filepath)
+        files_conform.append(filepath)
+
+    log("Conform done.")
+
+    return files_conform
+
+
+@task
+def upload_multiple_tables_to_datalake(
+    path_files: str, dataset_id: str, dump_mode: str
+):
+    """
+    Uploads multiple tables to datalake.
+
+    Args:
+        path_files (str): The path to the files to be uploaded.
+        dataset_id (str): The ID of the dataset to upload the files to.
+        dump_mode (str): The dump mode to use for the upload.
+
+    Returns:
+        None
+    """
+    for n, file in enumerate(path_files):
+        log(f"Uploading {n+1}/{len(path_files)} files to datalake...")
+
+        # retrieve file name from path
+        file_name = os.path.basename(file)
+
+        # replace 6 digits numbers from string
+        table_id = re.sub(r"\d{6}", "", file_name)
+        table_id = table_id.replace(".csv", "")
+
+        upload_to_datalake.run(
+            input_path=file,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            if_exists="replace",
+            csv_delimiter=";",
+            if_storage_data_exists="replace",
+            biglake_table=True,
+            dump_mode=dump_mode,
+        )
+
+
+@task
+def add_multiple_date_column(directory: str, sep=";", snapshot_date=None):
+    """
+    Adds date metadata columns to all CSV files in a given directory.
+
+    Args:
+        directory (str): The directory containing the CSV files.
+        sep (str, optional): The delimiter used in the CSV files. Defaults to ";".
+        snapshot_date (str, optional): The date of the snapshot. Defaults to None.
+    """
+    tz = pytz.timezone("Brazil/East")
+    now = datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
+
+    # list all csv files in the directory
+    csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]
+
+    # iterate over each csv file
+    for n, csv_file in enumerate(csv_files):
+        log(f"Adding date metadata to {n+1}/{len(csv_files)} files ...")
+        # construct the full file path
+
+        filepath = os.path.join(directory, csv_file)
+
+        df = pd.read_csv(filepath, sep=sep, keep_default_na=False, dtype="str")
+        df["_data_carga"] = now
+        df["_data_snapshot"] = snapshot_date
+
+        df.to_csv(filepath, index=False, sep=sep, encoding="utf-8")
+        log(f"Column added to {filepath}")
+
+
+@task(max_retries=5, retry_delay=timedelta(seconds=5), timeout=timedelta(seconds=600))
+def download_ftp_cnes(host, user, password, directory, file_name, output_path):
+    """
+    Downloads a file from an FTP server.
+
+    Args:
+        host (str): The FTP server hostname.
+        user (str): The FTP server username.
+        password (str): The FTP server password.
+        directory (str): The directory where the file is located.
+        file_name (str): The name of the file to download.
+        output_path (str): The local path where the file will be saved.
+
+    Returns:
+        str: The local path where the downloaded file was saved.
+    """
+    return download_ftp.run(
+        host=host,
+        user=user,
+        password=password,
+        directory=directory,
+        file_name=file_name,
+        output_path=output_path,
+    )
+
+
+@task
+def create_partitions_and_upload_multiple_tables_to_datalake(
+    path_files: str,
+    partition_folder: str,
+    partition_date: str,
+    dataset_id: str,
+    dump_mode: str,
+):
+    """
+    Creates month partitions for each table and uploads them to the datalake.
+
+    Args:
+        path_files (str): The path to the files to be uploaded.
+        partition_folder (str): Base folder where a month partition is created per table.
+        partition_date (str): Snapshot date used to build the partitions.
+        dataset_id (str): The ID of the dataset to upload the files to.
+        dump_mode (str): The dump mode to use for the upload.
+
+    Returns:
+        None
+    """
+    for n, file in enumerate(path_files):
+        log(f"Uploading {n+1}/{len(path_files)} files to datalake...")
+
+        # retrieve file name from path
+        file_name = os.path.basename(file)
+
+        # replace 6 digits numbers from string
+        table_id = re.sub(r"\d{6}", "", file_name)
+        table_id = table_id.replace(".csv", "")
+
+        table_partition_folder = os.path.join(partition_folder, table_id)
+
+        create_partitions.run(
+            data_path=file,
+            partition_directory=table_partition_folder,
+            level="month",
+            partition_date=partition_date,
+        )
+
+        upload_to_datalake.run(
+            input_path=table_partition_folder,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            if_exists="replace",
+            csv_delimiter=";",
+            if_storage_data_exists="replace",
+            biglake_table=True,
+            dump_mode=dump_mode,
+        )
+
+
+
+
+
+
+
+

Functions

+
+
+def add_multiple_date_column(directory: str, sep=';', snapshot_date=None) +
+
+

Adds date metadata columns to all CSV files in a given directory.

+

Args

+
+
directory : str
+
The directory containing the CSV files.
+
sep : str, optional
+
The delimiter used in the CSV files. Defaults to ";".
+
snapshot_date : str, optional
+
The date of the snapshot. Defaults to None.
+
+
@task
+def add_multiple_date_column(directory: str, sep=";", snapshot_date=None):
+    """
+    Adds date metadata columns to all CSV files in a given directory.
+
+    Args:
+        directory (str): The directory containing the CSV files.
+        sep (str, optional): The delimiter used in the CSV files. Defaults to ";".
+        snapshot_date (str, optional): The date of the snapshot. Defaults to None.
+    """
+    tz = pytz.timezone("Brazil/East")
+    now = datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
+
+    # list all csv files in the directory
+    csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]
+
+    # iterate over each csv file
+    for n, csv_file in enumerate(csv_files):
+        log(f"Adding date metadata to {n+1}/{len(csv_files)} files ...")
+        # construct the full file path
+
+        filepath = os.path.join(directory, csv_file)
+
+        df = pd.read_csv(filepath, sep=sep, keep_default_na=False, dtype="str")
+        df["_data_carga"] = now
+        df["_data_snapshot"] = snapshot_date
+
+        df.to_csv(filepath, index=False, sep=sep, encoding="utf-8")
+        log(f"Column added to {filepath}")
+
+
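In effect each CSV gains two metadata columns: the load timestamp and the snapshot passed in by the flow. A sketch of the result on one file; the CNES column names, values and file name are assumptions:

import pandas as pd

df = pd.DataFrame({"CO_UNIDADE": ["2269311"], "NO_FANTASIA": ["UNIDADE EXEMPLO"]})  # illustrative
df["_data_carga"] = "2023-11-09 12:31:15"  # datetime.now() in the configured timezone
df["_data_snapshot"] = "2023-10"           # snapshot date received from the flow
df.to_csv("ESTABELECIMENTO202310.csv", index=False, sep=";", encoding="utf-8")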
+
+def check_file_to_download(download_newest: bool, file_to_download: None, partition_date: None, host: str, user: str, password: str, directory: str, file_name: str) +
+
+

Check which file to download based on the given parameters.

+

Args

+
+
download_newest : bool
+
Whether to download the newest file or not.
+
file_to_download : None or str
+
The name of the file to download.
+
partition_date : None or str
+
The partition date of the file to download.
+
host : str
+
The FTP host to connect to.
+
user : str
+
The FTP username.
+
password : str
+
The FTP password.
+
directory : str
+
The directory where the file is located.
+
file_name : str
+
The name of the file to check.
+
+

Returns

+
+
dict
+
A dictionary with the file name ("file") and its snapshot date ("snapshot"), taken from the newest FTP file when download_newest is True or from the given parameters otherwise.
+
+

Raises

+
+
ValueError
+
If download_newest is False and file_to_download or partition_date is not provided.
+
+


+
@task
+def check_file_to_download(
+    download_newest: bool,
+    file_to_download: None,
+    partition_date: None,
+    host: str,
+    user: str,
+    password: str,
+    directory: str,
+    file_name: str,
+):
+    """
+    Check which file to download based on the given parameters.
+
+    Args:
+        download_newest (bool): Whether to download the newest file or not.
+        file_to_download (None or str): The name of the file to download.
+        partition_date (None or str): The partition date of the file to download.
+        host (str): The FTP host to connect to.
+        user (str): The FTP username.
+        password (str): The FTP password.
+        directory (str): The directory where the file is located.
+        file_name (str): The name of the file to check.
+
+    Returns:
+        dict: A dictionary with the file name ("file") and its snapshot date
+                    ("snapshot"), taken from the newest FTP file when download_newest
+                    is True or from the given parameters otherwise.
+    Raises:
+        ValueError: If download_newest is False and file_to_download or partition_date is
+        not provided.
+    """
+    if download_newest:
+        newest_file = check_newest_file_version.run(
+            host=host,
+            user=user,
+            password=password,
+            directory=directory,
+            file_name=file_name,
+        )
+        return newest_file
+    elif file_to_download is not None and partition_date is not None:
+        return {"file": file_to_download, "snapshot": partition_date}
+    else:
+        raise ValueError(
+            "If download_newest is False, file_to_download and partition_date must be provided"
+        )
+
+
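A usage sketch of the manual branch, calling the task function directly with .run() in the same way this module calls other tasks; host, directory and base file name reuse the values from the module constants, while the ZIP name is illustrative:

result = check_file_to_download.run(
    download_newest=False,
    file_to_download="BASE_DE_DADOS_CNES_202309.ZIP",  # assumed file name
    partition_date="2023-09",
    host="ftp.datasus.gov.br",
    user="",
    password="",
    directory="/cnes",
    file_name="BASE_DE_DADOS_CNES",
)
# result == {"file": "BASE_DE_DADOS_CNES_202309.ZIP", "snapshot": "2023-09"}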
+
+def check_newest_file_version(host: str, user: str, password: str, directory: str, file_name: str) +
+
+

Check the newest version of a file in a given FTP directory.

+

Args

+
+
host : str
+
FTP server hostname.
+
user : str
+
FTP server username.
+
password : str
+
FTP server password.
+
directory : str
+
FTP directory path.
+
file_name : str
+
Base name of the file to check.
+
+

Returns

+
+
dict
+
A dictionary with the newest file name ("file") and its snapshot date ("snapshot").
+
+
@task
+def check_newest_file_version(
+    host: str, user: str, password: str, directory: str, file_name: str
+):
+    """
+    Check the newest version of a file in a given FTP directory.
+
+    Args:
+        host (str): FTP server hostname.
+        user (str): FTP server username.
+        password (str): FTP server password.
+        directory (str): FTP directory path.
+        file_name (str): Base name of the file to check.
+
+    Returns:
+        dict: The newest file name ("file") and its snapshot date ("snapshot").
+    """
+    file_name = constants.BASE_FILE.value
+    files = list_files_ftp.run(host, user, password, directory)
+
+    # filter a list of files that contains the base file name
+    files = [file for file in files if file_name in file]
+
+    # sort list descending
+    files.sort(reverse=True)
+    newest_file = files[0]
+
+    # extract snapshot date from file
+    snapshot_date = re.findall(r"\d{6}", newest_file)[0]
+    snapshot_date = f"{snapshot_date[:4]}-{snapshot_date[-2:]}"
+
+    log(f"Newest file: {newest_file}, snapshot_date: {snapshot_date}")
+    return {"file": newest_file, "snapshot": snapshot_date}
+
+
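The snapshot date comes from the six-digit competência embedded in the file name. A standalone sketch of that derivation, with an assumed FTP file name:

import re

newest_file = "BASE_DE_DADOS_CNES_202310.ZIP"   # assumed file name on the FTP server
raw = re.findall(r"\d{6}", newest_file)[0]      # "202310"
snapshot_date = f"{raw[:4]}-{raw[-2:]}"         # "2023-10"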
+
+def conform_csv_to_gcp(directory: str) +
+
+

Conform CSV files in the given directory to be compatible with Google Cloud Storage.

+

Args

+
+
directory : str
+
The directory containing the CSV files to be conformed.
+
+

Returns

+
+
List[str]
+
A list of filepaths of the conformed CSV files.
+
+
@task
+def conform_csv_to_gcp(directory: str):
+    """
+    Conform CSV files in the given directory to be compatible with Google Cloud Storage.
+
+    Args:
+        directory (str): The directory containing the CSV files to be conformed.
+
+    Returns:
+        List[str]: A list of filepaths of the conformed CSV files.
+    """
+    # list all csv files in the directory
+    csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]
+
+    log(f"Conforming {len(csv_files)} files...")
+
+    files_conform = []
+
+    # iterate over each csv file
+    for csv_file in csv_files:
+        # construct the full file path
+        filepath = os.path.join(directory, csv_file)
+
+        # create a temporary file
+        with tempfile.NamedTemporaryFile(mode="w", delete=False) as tf:
+            # open the original file in read mode
+            with open(filepath, "r", encoding="iso8859-1") as f:
+                # read the first line
+                first_line = f.readline()
+
+                # modify the first line
+                modified_first_line = first_line.replace("TO_CHAR(", "")
+                modified_first_line = modified_first_line.replace(",'DD/MM/YYYY')", "")
+
+                # write the modified first line to the temporary file
+                tf.write(modified_first_line)
+
+                # copy the rest of the lines from the original file to the temporary file
+                shutil.copyfileobj(f, tf)
+
+        # replace the original file with the temporary file
+        shutil.move(tf.name, filepath)
+        files_conform.append(filepath)
+
+    log("Conform done.")
+
+    return files_conform
+
+
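Only the header line changes: Oracle-style TO_CHAR(...) wrappers are stripped so the date columns keep plain names in the datalake upload. A tiny sketch of the two replacements, with an assumed header:

header = "CO_UNIDADE;TO_CHAR(DT_ATUALIZACAO,'DD/MM/YYYY');NO_FANTASIA\n"  # illustrative header
header = header.replace("TO_CHAR(", "").replace(",'DD/MM/YYYY')", "")
# -> "CO_UNIDADE;DT_ATUALIZACAO;NO_FANTASIA\n"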
+
+def create_partitions_and_upload_multiple_tables_to_datalake(path_files: str, partition_folder: str, partition_date: str, dataset_id: str, dump_mode: str) +
+
+

Creates month partitions for each table and uploads them to the datalake.

+

Args

+
+
path_files : str
+
The path to the files to be uploaded.
+
partition_folder : str
+
Base folder where a month partition is created per table.
+
partition_date : str
+
Snapshot date used to build the partitions.
+
dataset_id : str
+
The ID of the dataset to upload the files to.
+
dump_mode : str
+
The dump mode to use for the upload.
+
+

Returns

+

None

+
@task
+def create_partitions_and_upload_multiple_tables_to_datalake(
+    path_files: str,
+    partition_folder: str,
+    partition_date: str,
+    dataset_id: str,
+    dump_mode: str,
+):
+    """
+    Creates month partitions for each table and uploads them to the datalake.
+
+    Args:
+        path_files (str): The path to the files to be uploaded.
+        partition_folder (str): Base folder where a month partition is created per table.
+        partition_date (str): Snapshot date used to build the partitions.
+        dataset_id (str): The ID of the dataset to upload the files to.
+        dump_mode (str): The dump mode to use for the upload.
+
+    Returns:
+        None
+    """
+    for n, file in enumerate(path_files):
+        log(f"Uploading {n+1}/{len(path_files)} files to datalake...")
+
+        # retrieve file name from path
+        file_name = os.path.basename(file)
+
+        # replace 6 digits numbers from string
+        table_id = re.sub(r"\d{6}", "", file_name)
+        table_id = table_id.replace(".csv", "")
+
+        table_partition_folder = os.path.join(partition_folder, table_id)
+
+        create_partitions.run(
+            data_path=file,
+            partition_directory=table_partition_folder,
+            level="month",
+            partition_date=partition_date,
+        )
+
+        upload_to_datalake.run(
+            input_path=table_partition_folder,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            if_exists="replace",
+            csv_delimiter=";",
+            if_storage_data_exists="replace",
+            biglake_table=True,
+            dump_mode=dump_mode,
+        )
+
+
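Each CSV is mapped to its own table and month partition before upload; the table_id is the file name with the six-digit competência stripped. A sketch of the derived paths, with illustrative folders and file names:

import os
import re

file = "/tmp/raw/ESTABELECIMENTO202310.csv"      # illustrative conformed CSV
partition_folder = "/tmp/partition_directory"    # illustrative base folder
table_id = re.sub(r"\d{6}", "", os.path.basename(file)).replace(".csv", "")  # "ESTABELECIMENTO"
table_partition_folder = os.path.join(partition_folder, table_id)
# create_partitions(level="month") then writes to something like:
# /tmp/partition_directory/ESTABELECIMENTO/ano_particao=2023/mes_particao=2023-10/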
+
+def download_ftp_cnes(host, user, password, directory, file_name, output_path) +
+
+

Downloads a file from an FTP server.

+

Args

+
+
host : str
+
The FTP server hostname.
+
user : str
+
The FTP server username.
+
password : str
+
The FTP server password.
+
directory : str
+
The directory where the file is located.
+
file_name : str
+
The name of the file to download.
+
output_path : str
+
The local path where the file will be saved.
+
+

Returns

+
+
str
+
The local path where the downloaded file was saved.
+
+
@task(max_retries=5, retry_delay=timedelta(seconds=5), timeout=timedelta(seconds=600))
+def download_ftp_cnes(host, user, password, directory, file_name, output_path):
+    """
+    Downloads a file from an FTP server.
+
+    Args:
+        host (str): The FTP server hostname.
+        user (str): The FTP server username.
+        password (str): The FTP server password.
+        directory (str): The directory where the file is located.
+        file_name (str): The name of the file to download.
+        output_path (str): The local path where the file will be saved.
+
+    Returns:
+        str: The local path where the downloaded file was saved.
+    """
+    return download_ftp.run(
+        host=host,
+        user=user,
+        password=password,
+        directory=directory,
+        file_name=file_name,
+        output_path=output_path,
+    )
+
+
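The wrapper exists mainly to attach retries and a timeout to a long FTP download; the flow calls it with anonymous credentials. A call sketch with an assumed file name and output folder:

local_path = download_ftp_cnes.run(
    host="ftp.datasus.gov.br",
    user="",                 # anonymous FTP, as configured in the flow
    password="",
    directory="/cnes",
    file_name="BASE_DE_DADOS_CNES_202310.ZIP",  # assumed file name
    output_path="/tmp/raw",                     # assumed local folder
)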
+
+def upload_multiple_tables_to_datalake(path_files: str, dataset_id: str, dump_mode: str) +
+
+

Uploads multiple tables to datalake.

+

Args

+
+
path_files : str
+
The path to the files to be uploaded.
+
dataset_id : str
+
The ID of the dataset to upload the files to.
+
dump_mode : str
+
The dump mode to use for the upload.
+
+

Returns

+

None

+
@task
+def upload_multiple_tables_to_datalake(
+    path_files: str, dataset_id: str, dump_mode: str
+):
+    """
+    Uploads multiple tables to datalake.
+
+    Args:
+        path_files (str): The path to the files to be uploaded.
+        dataset_id (str): The ID of the dataset to upload the files to.
+        dump_mode (str): The dump mode to use for the upload.
+
+    Returns:
+        None
+    """
+    for n, file in enumerate(path_files):
+        log(f"Uploading {n+1}/{len(path_files)} files to datalake...")
+
+        # retrieve file name from path
+        file_name = os.path.basename(file)
+
+        # replace 6 digits numbers from string
+        table_id = re.sub(r"\d{6}", "", file_name)
+        table_id = table_id.replace(".csv", "")
+
+        upload_to_datalake.run(
+            input_path=file,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            if_exists="replace",
+            csv_delimiter=";",
+            if_storage_data_exists="replace",
+            biglake_table=True,
+            dump_mode=dump_mode,
+        )
+
+
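Each file becomes a separate table whose table_id is the base name without the six-digit competência or the .csv suffix. A sketch of that mapping, with illustrative paths:

import os
import re

path_files = ["/tmp/raw/ESTABELECIMENTO202310.csv", "/tmp/raw/PROFISSIONAL202310.csv"]  # illustrative
table_ids = [re.sub(r"\d{6}", "", os.path.basename(f)).replace(".csv", "") for f in path_files]
# -> ["ESTABELECIMENTO", "PROFISSIONAL"]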
+
+
+
+
+
+ +
+ + + \ No newline at end of file diff --git a/rj_sms/index.html b/rj_sms/index.html index 87c25b632..84fb71da8 100644 --- a/rj_sms/index.html +++ b/rj_sms/index.html @@ -47,6 +47,10 @@

Sub-modules

+
pipelines.rj_sms.dump_ftp_cnes
+
+
+
pipelines.rj_sms.utils

General utilities for SMS pipelines.

@@ -121,6 +125,7 @@

Index

diff --git a/rj_sms/utils.html b/rj_sms/utils.html index 9f2ed4a15..66df47065 100644 --- a/rj_sms/utils.html +++ b/rj_sms/utils.html @@ -148,14 +148,13 @@

Module pipelines.rj_sms.utils

log(f"API data downloaded to {destination_file_path}") + return destination_file_path + else: - log( - f"API call failed. Error: {response.status_code} - {response.reason}", - level="error", + raise ValueError( + f"API call failed, error: {response.status_code} - {response.reason}" ) - return destination_file_path - @task def download_azure_blob( @@ -420,7 +419,9 @@

Module pipelines.rj_sms.utils

@task -def create_partitions(data_path: str, partition_directory: str): +def create_partitions( + data_path: str, partition_directory: str, level="day", partition_date=None +): """ Creates partitions for each file in the given data path and saves them in the partition directory. @@ -428,26 +429,75 @@

Module pipelines.rj_sms.utils

Args: data_path (str): The path to the data directory. partition_directory (str): The path to the partition directory. + level (str): The level of partitioning. Can be "day" or "month". Defaults to "day". Raises: FileExistsError: If the partition directory already exists. + ValueError: If the partition level is not "day" or "month". Returns: None """ - data_path = Path(data_path) - # Load data - files = data_path.glob("*.csv") + + # check if data_path is a directory or a file + if os.path.isdir(data_path): + data_path = Path(data_path) + files = data_path.glob("*.csv") + else: + files = [data_path] # # Create partition directories for each file for file_name in files: - date_str = re.search(r"\d{4}-\d{2}-\d{2}", str(file_name)).group() - parsed_date = datetime.strptime(date_str, "%Y-%m-%d") - ano_particao = parsed_date.strftime("%Y") - mes_particao = parsed_date.strftime("%m") - data_particao = parsed_date.strftime("%Y-%m-%d") + if level == "day": + if partition_date is None: + try: + date_str = re.search(r"\d{4}-\d{2}-\d{2}", str(file_name)).group() + parsed_date = datetime.strptime(date_str, "%Y-%m-%d") + except ValueError: + log( + "Filename must contain a date in the format YYYY-MM-DD to match partition level", + level="error", + ) # noqa: E501") + else: + try: + parsed_date = datetime.strptime(partition_date, "%Y-%m-%d") + except ValueError: + log( + "Partition date must be in the format YYYY-MM-DD to match partition level", # noqa: E501 + level="error", + ) + + ano_particao = parsed_date.strftime("%Y") + mes_particao = parsed_date.strftime("%m") + data_particao = parsed_date.strftime("%Y-%m-%d") + + output_directory = f"{partition_directory}/ano_particao={int(ano_particao)}/mes_particao={int(mes_particao)}/data_particao={data_particao}" # noqa: E501 + + elif level == "month": + if partition_date is None: + try: + date_str = re.search(r"\d{4}-\d{2}", str(file_name)).group() + parsed_date = datetime.strptime(date_str, "%Y-%m") + except ValueError: + log( + "File must contain a date in the format YYYY-MM", level="error" + ) # noqa: E501") + else: + try: + parsed_date = datetime.strptime(partition_date, "%Y-%m") + except ValueError: + log( + "Partition date must match be in the format YYYY-MM to match partitio level", # noqa: E501 + level="error", + ) + + ano_particao = parsed_date.strftime("%Y") + mes_particao = parsed_date.strftime("%Y-%m") + + output_directory = f"{partition_directory}/ano_particao={int(ano_particao)}/mes_particao={mes_particao}" # noqa: E501 - output_directory = f"{partition_directory}/ano_particao={int(ano_particao)}/mes_particao={int(mes_particao)}/data_particao={data_particao}" # noqa: E501 + else: + raise ValueError("Partition level must be day or month") # Create partition directory if not os.path.exists(output_directory): @@ -455,7 +505,8 @@

Module pipelines.rj_sms.utils

# Copy file(s) to partition directory shutil.copy(file_name, output_directory) - log("Partitions created successfully") + + log("Partitions created successfully") @task @@ -713,7 +764,7 @@

Returns

-def create_partitions(data_path: str, partition_directory: str) +def create_partitions(data_path: str, partition_directory: str, level='day', partition_date=None)

Creates partitions for each file in the given data path and saves them in the @@ -724,11 +775,15 @@

Args

The path to the data directory.
partition_directory : str
The path to the partition directory.
+
level : str
+
The level of partitioning. Can be "day" or "month". Defaults to "day".

Raises

FileExistsError
If the partition directory already exists.
+
ValueError
+
If the partition level is not "day" or "month".

Returns

None

@@ -737,7 +792,9 @@

Returns

@task
-def create_partitions(data_path: str, partition_directory: str):
+def create_partitions(
+    data_path: str, partition_directory: str, level="day", partition_date=None
+):
     """
     Creates partitions for each file in the given data path and saves them in the
     partition directory.
@@ -745,26 +802,75 @@ 

Returns

Args: data_path (str): The path to the data directory. partition_directory (str): The path to the partition directory. + level (str): The level of partitioning. Can be "day" or "month". Defaults to "day". Raises: FileExistsError: If the partition directory already exists. + ValueError: If the partition level is not "day" or "month". Returns: None """ - data_path = Path(data_path) - # Load data - files = data_path.glob("*.csv") + + # check if data_path is a directory or a file + if os.path.isdir(data_path): + data_path = Path(data_path) + files = data_path.glob("*.csv") + else: + files = [data_path] # # Create partition directories for each file for file_name in files: - date_str = re.search(r"\d{4}-\d{2}-\d{2}", str(file_name)).group() - parsed_date = datetime.strptime(date_str, "%Y-%m-%d") - ano_particao = parsed_date.strftime("%Y") - mes_particao = parsed_date.strftime("%m") - data_particao = parsed_date.strftime("%Y-%m-%d") + if level == "day": + if partition_date is None: + try: + date_str = re.search(r"\d{4}-\d{2}-\d{2}", str(file_name)).group() + parsed_date = datetime.strptime(date_str, "%Y-%m-%d") + except ValueError: + log( + "Filename must contain a date in the format YYYY-MM-DD to match partition level", + level="error", + ) # noqa: E501") + else: + try: + parsed_date = datetime.strptime(partition_date, "%Y-%m-%d") + except ValueError: + log( + "Partition date must be in the format YYYY-MM-DD to match partition level", # noqa: E501 + level="error", + ) + + ano_particao = parsed_date.strftime("%Y") + mes_particao = parsed_date.strftime("%m") + data_particao = parsed_date.strftime("%Y-%m-%d") + + output_directory = f"{partition_directory}/ano_particao={int(ano_particao)}/mes_particao={int(mes_particao)}/data_particao={data_particao}" # noqa: E501 + + elif level == "month": + if partition_date is None: + try: + date_str = re.search(r"\d{4}-\d{2}", str(file_name)).group() + parsed_date = datetime.strptime(date_str, "%Y-%m") + except ValueError: + log( + "File must contain a date in the format YYYY-MM", level="error" + ) # noqa: E501") + else: + try: + parsed_date = datetime.strptime(partition_date, "%Y-%m") + except ValueError: + log( + "Partition date must match be in the format YYYY-MM to match partitio level", # noqa: E501 + level="error", + ) + + ano_particao = parsed_date.strftime("%Y") + mes_particao = parsed_date.strftime("%Y-%m") + + output_directory = f"{partition_directory}/ano_particao={int(ano_particao)}/mes_particao={mes_particao}" # noqa: E501 - output_directory = f"{partition_directory}/ano_particao={int(ano_particao)}/mes_particao={int(mes_particao)}/data_particao={data_particao}" # noqa: E501 + else: + raise ValueError("Partition level must be day or month") # Create partition directory if not os.path.exists(output_directory): @@ -772,7 +878,8 @@

Returns

# Copy file(s) to partition directory shutil.copy(file_name, output_directory) - log("Partitions created successfully")
+ + log("Partitions created successfully")
@@ -970,13 +1077,12 @@

Returns

log(f"API data downloaded to {destination_file_path}") - else: - log( - f"API call failed. Error: {response.status_code} - {response.reason}", - level="error", - ) + return destination_file_path - return destination_file_path
+ else: + raise ValueError( + f"API call failed, error: {response.status_code} - {response.reason}" + )