diff --git a/.gitignore b/.gitignore index ae72bccb3..c40e0e688 100644 --- a/.gitignore +++ b/.gitignore @@ -156,4 +156,6 @@ desktop.ini sap_netweaver_rfc # Databricks-connect + .databricks-connect + diff --git a/CHANGELOG.md b/CHANGELOG.md index 529cf5c22..a25b37732 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] + +## [0.4.9] - 2022-09-27 +### Added +- Added a new column named `_viadot_downloaded_at_utc` to Genesys files, containing the UTC datetime at which the file was created. +- Added SFTP source class `SftpConnector`. +- Added SFTP tasks `SftpToDF` and `SftpList`. +- Added SFTP flows `SftpToAzureSQL` and `SftpToADLS`. +- Added new source file `mindful` to connect with the Mindful API. +- Added new task file `mindful` to be called by the Mindful flow. +- Added new flow file `mindful_to_adls` to upload data from the Mindful API to ADLS. +- Added `recursive` parameter to the `AzureDataLakeList` task. + ## [0.4.8] - 2022-09-06 ### Added - Added `protobuf` library to requirements + ## [0.4.7] - 2022-09-06 ### Added - Added new flow - `SQLServerTransform` and new task `SQLServerQuery` to run queries on SQLServer diff --git a/tests/integration/flows/test_sharepoint_to_adls.py b/tests/integration/flows/test_sharepoint_to_adls.py index 14930a412..eaef177ec 100644 --- a/tests/integration/flows/test_sharepoint_to_adls.py +++ b/tests/integration/flows/test_sharepoint_to_adls.py @@ -83,8 +83,3 @@ def test_sharepoint_to_adls_run_flow_overwrite_false(): assert result.is_failed() os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.csv") os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.json") - - rm = AzureDataLakeRemove( - path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s" - ) - rm.run(sp_credentials_secret=CREDENTIALS_SECRET) diff --git a/tests/integration/tasks/test_azure_data_lake.py b/tests/integration/tasks/test_azure_data_lake.py index 77f97dee5..e7185af38 100644 --- a/tests/integration/tasks/test_azure_data_lake.py +++ b/tests/integration/tasks/test_azure_data_lake.py @@ -1,6 +1,8 @@ import os import uuid import pytest +from unittest import mock + from viadot.sources import AzureDataLake from viadot.tasks import ( @@ -24,6 +26,23 @@ FILE_NAME_PARQUET = f"test_file_{uuid_4}.parquet" ADLS_PATH_PARQUET = f"raw/supermetrics/{FILE_NAME_PARQUET}" +ADLS_TEST_PATHS = [ + "raw/tests/alds_test_new_fnc/2020/02/01/final_df2.csv", + "raw/tests/alds_test_new_fnc/2020/02/01/test_new_fnc.csv", + "raw/tests/alds_test_new_fnc/2020/02/02/final_df2.csv", + "raw/tests/alds_test_new_fnc/2020/02/02/test_new_fnc.csv", + "raw/tests/alds_test_new_fnc/2021/12/01/final_df2.csv", + "raw/tests/alds_test_new_fnc/2021/12/01/test_new_fnc.csv", + "raw/tests/alds_test_new_fnc/2022/06/21/final_df2.csv", + "raw/tests/alds_test_new_fnc/2022/06/21/test_new_fnc.csv", + "raw/tests/alds_test_new_fnc/2022/08/12/final_df2.csv", + "raw/tests/alds_test_new_fnc/2022/08/12/test_new_fnc.csv", + "raw/tests/alds_test_new_fnc/test_folder/final_df2.csv", + "raw/tests/alds_test_new_fnc/test_folder/test_new_fnc.csv", + "raw/tests/alds_test_new_fnc/test_folder_2/final_df2.csv", + "raw/tests/alds_test_new_fnc/test_folder_2/test_new_fnc.csv", +] + + @pytest.mark.dependency() def test_azure_data_lake_upload(TEST_CSV_FILE_PATH): @@ -74,6 +93,23 @@ def test_azure_data_lake_list(): assert ADLS_PATH in files +def test_azure_data_lake_list_recursive(): + list_task = AzureDataLakeList() + files = list_task.run(path="raw/tests/alds_test_new_fnc/",
recursive=True) + assert isinstance(files, list) + + +def test_azure_data_lake_list_paths(): + + with mock.patch.object( + AzureDataLakeList, "run", return_value=ADLS_TEST_PATHS + ) as mock_method: + + list_task = AzureDataLakeList(path="raw/tests/alds_test_new_fnc/") + files = list_task.run(recursive=True) + assert files == ADLS_TEST_PATHS + + + @pytest.mark.dependency(depends=["test_azure_data_lake_upload"]) def test_azure_data_lake_remove(): file = AzureDataLake(ADLS_PATH) diff --git a/tests/integration/tasks/test_mindful.py b/tests/integration/tasks/test_mindful.py new file mode 100644 index 000000000..0f5a4e275 --- /dev/null +++ b/tests/integration/tasks/test_mindful.py @@ -0,0 +1,27 @@ +import os +import pytest +from unittest import mock +from viadot.tasks import MindfulToCSV + + +class MockClass: + status_code = 200 + content = b'[{"id":7277599,"survey_id":505,"phone_number":"","survey_type":"inbound"},{"id":7277294,"survey_id":504,"phone_number":"","survey_type":"web"}]' + + +@pytest.mark.init +def test_instance_mindful(): + mf = MindfulToCSV() + assert isinstance(mf, MindfulToCSV) + + +@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) +@pytest.mark.run +def test_mindful_run(mock_interactions): + mf = MindfulToCSV() + mf.run() + assert mock_interactions.call_count == 2 + assert os.path.exists("interactions.csv") + os.remove("interactions.csv") + assert os.path.exists("responses.csv") + os.remove("responses.csv") diff --git a/tests/integration/test_mindful.py b/tests/integration/test_mindful.py new file mode 100644 index 000000000..c63069a3d --- /dev/null +++ b/tests/integration/test_mindful.py @@ -0,0 +1,85 @@ +import os +import pytest +from unittest import mock +from viadot.sources import Mindful +from viadot.config import local_config + +credentials_mindful = local_config["MINDFUL"] + + +class MockClass: + status_code = 200 + content = b'[{"id":7277599,"survey_id":505,"phone_number":"","survey_type":"inbound"},{"id":7277294,"survey_id":504,"phone_number":"","survey_type":"web"}]' + + def json(): + test = [ + { + "id": 7277599, + "survey_id": 505, + "phone_number": "", + "survey_type": "inbound", + }, + {"id": 7277294, "survey_id": 504, "phone_number": "", "survey_type": "web"}, + ] + return test + + +@pytest.mark.init +def test_instance_mindful(): + mf = Mindful(credentials_mindful=credentials_mindful) + assert isinstance(mf, Mindful) + + +@pytest.mark.init +def test_credentials_instance(): + mf = Mindful(credentials_mindful=credentials_mindful) + assert mf.credentials_mindful is not None and isinstance(mf.credentials_mindful, dict) + + +@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) +@pytest.mark.connect +def test_mindful_api_response(mock_connection): + mf = Mindful(credentials_mindful=credentials_mindful) + mf.get_interactions_list() + mf.get_responses_list() + assert mock_connection.call_count == 2 + + +@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) +@pytest.mark.connect +def test_mindful_api_response2(mock_api_response): + mf = Mindful(credentials_mindful=credentials_mindful) + response = mf.get_interactions_list() + + assert response.status_code == 200 and isinstance(response.json(), list) + assert mf.endpoint == "interactions" + + +@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass) +@pytest.mark.connect +def test_mindful_api_response3(mock_api_response): + mf = Mindful(credentials_mindful=credentials_mindful) + response = mf.get_responses_list() + + assert
response.status_code == 200 and isinstance(response.json(), list) + assert mf.endpoint == "responses" + + +@mock.patch("viadot.sources.Mindful._mindful_api_response", return_value=MockClass) +@pytest.mark.save +def test_mindful_interactions(mock_connection): + mf = Mindful(credentials_mindful=credentials_mindful) + response = mf.get_interactions_list() + mf.response_to_file(response) + assert os.path.exists("interactions.csv") + os.remove("interactions.csv") + + +@mock.patch("viadot.sources.Mindful._mindful_api_response", return_value=MockClass) +@pytest.mark.save +def test_mindful_responses(mock_connection): + mf = Mindful(credentials_mindful=credentials_mindful) + response = mf.get_responses_list() + mf.response_to_file(response) + assert os.path.exists("responses.csv") + os.remove("responses.csv") diff --git a/tests/integration/test_outlook.py b/tests/integration/test_outlook.py index 883d8c2ad..e435a6c89 100644 --- a/tests/integration/test_outlook.py +++ b/tests/integration/test_outlook.py @@ -1,5 +1,6 @@ from viadot.config import local_config from viadot.sources import Outlook +import pandas as pd def test_outlook_to_df(): @@ -10,5 +11,4 @@ def test_outlook_to_df(): end_date="2022-04-29", ) df = outlook.to_df() - assert df.shape[1] == 10 - assert df.empty == False + assert isinstance(df, pd.DataFrame) diff --git a/tests/integration/test_sftp.py b/tests/integration/test_sftp.py new file mode 100644 index 000000000..eb0921790 --- /dev/null +++ b/tests/integration/test_sftp.py @@ -0,0 +1,123 @@ +from viadot.sources.sftp import SftpConnector +from viadot.tasks.sftp import SftpToDF, SftpList +import pytest +import pandas as pd +from unittest import mock +from pytest import fixture, raises +import io + + +@pytest.fixture +def tmp_df(): + data = {"country": [1, 2], "sales": [3, 4]} + df = pd.DataFrame(data=data) + return df + + +@pytest.fixture +def list_of_paths(): + + list_of_paths = [ + "Country__Context30##exported.tsv", + "Country__Context31##exported.tsv", + "Country__Context32##exported.tsv", + "Country__Context33##exported.tsv", + "Country__Context4##exported.tsv", + "Country__Context6##exported.tsv", + "Country__Context7##exported.tsv", + "Country__Context8##exported.tsv", + "Local_Products.csv", + "Products Checkup.tsv", + "Products.tsv", + "RewardTest.csv", + ] + return list_of_paths + + +@pytest.fixture +def df_buf(): + s_buf = io.StringIO() + + data = {"country": [1, 2], "sales": [3, 4]} + df = pd.DataFrame(data=data) + df.to_csv(s_buf) + return s_buf + + +def test_create_sftp_instance(): + s = SftpConnector( + credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4} + ) + assert s + + +def test_connection_sftp(tmp_df): + with mock.patch("viadot.sources.sftp.SftpConnector.to_df") as mock_method: + mock_method.return_value = tmp_df + s = SftpConnector( + credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4} + ) + + final_df = s.to_df() + assert isinstance(final_df, pd.DataFrame) + + +def test_getfo_file(df_buf): + with mock.patch("viadot.sources.sftp.SftpConnector.getfo_file") as mock_method: + mock_method.return_value = df_buf + s = SftpConnector( + credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4} + ) + + buffer_df = s.getfo_file() + buffer_df.seek(0) + df = pd.read_csv(buffer_df) + assert isinstance(df, pd.DataFrame) + + +def test_ls_sftp(list_of_paths): + with mock.patch("viadot.sources.sftp.SftpConnector.list_directory") as mock_method: + mock_method.return_value = list_of_paths + s = SftpConnector( + 
credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4} + ) + + paths = s.list_directory() + + assert isinstance(paths, list) + + +def test_get_exported_files(list_of_paths): + with mock.patch.object( + SftpConnector, "get_exported_files", return_value=list_of_paths + ) as mock_method: + + s = SftpConnector( + credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4} + ) + filtered_paths = s.get_exported_files() + + assert isinstance(filtered_paths, list) + + +def test_task_sftptodf(tmp_df): + task = SftpToDF() + with mock.patch.object(SftpToDF, "run", return_value=tmp_df) as mock_method: + df = task.run() + + assert isinstance(df, pd.DataFrame) + + +def test_task_sftplist(list_of_paths): + task = SftpList() + with mock.patch.object(SftpList, "run", return_value=list_of_paths) as mock_method: + list_directory = task.run() + + assert isinstance(list_directory, list) + + +def test_example(): + with mock.patch("viadot.sources.sftp.SftpConnector.to_df") as mock_method: + mock_method.return_value = tmp_df + t = SftpToDF() + t.run() diff --git a/tests/test_viadot.py b/tests/test_viadot.py index d9bcf2dc3..e63109bf9 100644 --- a/tests/test_viadot.py +++ b/tests/test_viadot.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.4.8" + assert __version__ == "0.4.9" diff --git a/viadot/__init__.py b/viadot/__init__.py index a3a9bd544..574c06661 100644 --- a/viadot/__init__.py +++ b/viadot/__init__.py @@ -1 +1 @@ -__version__ = "0.4.8" +__version__ = "0.4.9" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index ec43d8523..bdba508a3 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -33,3 +33,6 @@ from .epicor_to_duckdb import EpicorOrdersToDuckDB from .sql_server_transform import SQLServerTransform from .sql_server_to_duckdb import SQLServerToDuckDB + +from .sftp_operations import SftpToAzureSQL, SftpToADLS +from .mindful_to_adls import MindfulToADLS diff --git a/viadot/flows/adls_to_azure_sql.py b/viadot/flows/adls_to_azure_sql.py index 26c8e67aa..9cc371b5e 100644 --- a/viadot/flows/adls_to_azure_sql.py +++ b/viadot/flows/adls_to_azure_sql.py @@ -142,6 +142,7 @@ def __init__( tags (List[str], optional): Flow tags to use, eg. to control flow concurrency. Defaults to ["promotion"]. vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None. """ + adls_path = adls_path.strip("/") # Read parquet if adls_path.split(".")[-1] in ["csv", "parquet"]: @@ -299,7 +300,6 @@ def gen_flow(self) -> Flow: df_reorder.set_upstream(lake_to_df_task, flow=self) df_to_csv.set_upstream(df_reorder, flow=self) promote_to_conformed_task.set_upstream(df_to_csv, flow=self) - promote_to_conformed_task.set_upstream(df_to_csv, flow=self) create_table_task.set_upstream(df_to_csv, flow=self) promote_to_operations_task.set_upstream(promote_to_conformed_task, flow=self) bulk_insert_task.set_upstream(create_table_task, flow=self) diff --git a/viadot/flows/genesys_to_adls.py b/viadot/flows/genesys_to_adls.py index 2dc39f08f..9e71131fb 100644 --- a/viadot/flows/genesys_to_adls.py +++ b/viadot/flows/genesys_to_adls.py @@ -1,5 +1,6 @@ from typing import Any, Dict, List, Literal +import pandas as pd from prefect import Flow, task from viadot.task_utils import df_to_csv @@ -44,6 +45,20 @@ def adls_bulk_upload( ) +@task +def add_timestamp(files_names: List = None, sep: str = None) -> None: + """Add new column _viadot_downloaded_at_utc into every genesys file. 
+ + Args: + files_names (List, optional): All file names of downloaded files. Defaults to None. + sep (str, optional): Separator in csv file. Defaults to None. + """ + for file in files_names: + df = pd.read_csv(file, sep=sep) + df_updated = add_ingestion_metadata_task.run(df) + df_updated.to_csv(file, index=False, sep=sep) + + class GenesysToADLS(Flow): def __init__( self, @@ -54,6 +69,7 @@ def __init__( start_date: str = None, end_date: str = None, days_interval: int = 1, + sep: str = "\t", environment: str = None, schedule_id: str = None, report_url: str = None, @@ -77,6 +93,7 @@ def __init__( start_date (str, optional): Start date of the report. Defaults to None. end_date (str, optional): End date of the report. Defaults to None. days_interval (int, optional): How many days report should include. Defaults to 1. + sep (str, optional): Separator in csv file. Defaults to "\t". environment (str, optional): Adress of host server. Defaults to None than will be used enviroment from credentials. schedule_id (str, optional): The ID of report. Defaults to None. @@ -101,6 +118,7 @@ def __init__( self.start_date = start_date self.end_date = end_date self.days_interval = days_interval + self.sep = sep # AzureDataLake self.local_file_path = local_file_path self.adls_file_path = adls_file_path @@ -128,6 +146,8 @@ def gen_flow(self) -> Flow: flow=self, ) + add_timestamp.bind(file_names, sep=self.sep, flow=self) + uploader = adls_bulk_upload( file_names=file_names, adls_file_path=self.adls_file_path, @@ -135,7 +155,8 @@ def gen_flow(self) -> Flow: flow=self, ) - uploader.set_upstream(file_names, flow=self) + add_timestamp.set_upstream(file_names, flow=self) + uploader.set_upstream(add_timestamp, flow=self) class GenesysReportToADLS(Flow): diff --git a/viadot/flows/mindful_to_adls.py b/viadot/flows/mindful_to_adls.py new file mode 100644 index 000000000..def23cd95 --- /dev/null +++ b/viadot/flows/mindful_to_adls.py @@ -0,0 +1,119 @@ +import os +from typing import Any, Dict, List, Literal + +from datetime import datetime +from prefect import Flow, task + +from viadot.tasks import MindfulToCSV +from viadot.tasks import AzureDataLakeUpload + +file_to_adls_task = AzureDataLakeUpload() + + +@task +def adls_bulk_upload( + file_names: List[str], + file_name_relative_path: str = "", + adls_file_path: str = None, + adls_sp_credentials_secret: str = None, + adls_overwrite: bool = True, +) -> List[str]: + """Function that upload files to defined path in ADLS. + + Args: + file_names (List[str]): List of file names to generate paths. + file_name_relative_path (str, optional): Path where to save the file locally. Defaults to ''. + adls_file_path (str, optional): Azure Data Lake path. Defaults to None. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. + adls_overwrite (bool, optional): Whether to overwrite files in the data lake. Defaults to True. 
+ + Returns: + List[str]: List of paths + """ + + for file in file_names: + file_path = str(adls_file_path + "/" + file) + file_to_adls_task.run( + from_path=os.path.join(file_name_relative_path, file), + to_path=file_path, + sp_credentials_secret=adls_sp_credentials_secret, + overwrite=adls_overwrite, + ) + + +class MindfulToADLS(Flow): + def __init__( + self, + name: str, + credentials_mindful: Dict[str, Any] = None, + credentials_secret: str = None, + start_date: datetime = None, + end_date: datetime = None, + date_interval: int = 1, + file_extension: Literal["parquet", "csv"] = "csv", + file_path: str = "", + adls_file_path: str = None, + adls_overwrite: bool = True, + adls_sp_credentials_secret: str = None, + *args: List[any], + **kwargs: Dict[str, Any] + ): + """Mindful flow to download the CSV files and upload them to ADLS. + + Args: + name (str): The name of the Flow. + credentials_mindful (Dict[str, Any], optional): Credentials to connect with Mindful API. Defaults to None. + credentials_secret (str, optional): Name of the credential secret to retreave the credentials. Defaults to None. + start_date (datetime, optional): Start date of the request. Defaults to None. + end_date (datetime, optional): End date of the resquest. Defaults to None. + date_interval (int, optional): How many days are included in the request. + If end_date is passed as an argument, date_interval will be invalidated. Defaults to 1. + file_extension (Literal[parquet, csv], optional): File extensions for storing responses. Defaults to "csv". + file_path (str, optional): Path where to save the file locally. Defaults to ''. + adls_file_path (str, optional): The destination path at ADLS. Defaults to None. + adls_overwrite (bool, optional): Whether to overwrite files in the data lake. Defaults to True. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. 
+ """ + + self.credentials_mindful = credentials_mindful + self.credentials_secret = credentials_secret + self.start_date = start_date + self.end_date = end_date + self.date_interval = date_interval + self.file_extension = file_extension + self.file_path = file_path + + self.adls_file_path = adls_file_path + self.adls_overwrite = adls_overwrite + self.adls_sp_credentials_secret = adls_sp_credentials_secret + + super().__init__(*args, name=name, **kwargs) + + self.mind_flow() + + def mind_flow(self) -> Flow: + to_csv = MindfulToCSV() + + file_names = to_csv.bind( + credentials_mindful=self.credentials_mindful, + credentials_secret=self.credentials_secret, + start_date=self.start_date, + end_date=self.end_date, + date_interval=self.date_interval, + file_extension=self.file_extension, + file_path=self.file_path, + flow=self, + ) + + uploader = adls_bulk_upload( + file_names=file_names, + file_name_relative_path=self.file_path, + adls_file_path=self.adls_file_path, + adls_sp_credentials_secret=self.adls_sp_credentials_secret, + adls_overwrite=self.adls_overwrite, + flow=self, + ) + + uploader.set_upstream(file_names, flow=self) diff --git a/viadot/flows/outlook_to_adls.py b/viadot/flows/outlook_to_adls.py index c79a4412b..bd9e3cf8d 100644 --- a/viadot/flows/outlook_to_adls.py +++ b/viadot/flows/outlook_to_adls.py @@ -12,7 +12,6 @@ from ..tasks import AzureDataLakeUpload, OutlookToDF file_to_adls_task = AzureDataLakeUpload() -outlook_to_df = OutlookToDF() class OutlookToADLS(Flow): @@ -28,6 +27,7 @@ def __init__( overwrite_adls: bool = True, adls_sp_credentials_secret: str = None, limit: int = 10000, + timeout: int = 1200, if_exists: Literal["append", "replace", "skip"] = "append", *args: List[Any], **kwargs: Dict[str, Any], @@ -47,6 +47,7 @@ def __init__( adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. limit (int, optional): Number of fetched top messages. Defaults to 10000. + timeout (int, optional): The amount of time (in seconds) to wait while running this task before a timeout occurs. Defaults to 1200. if_exists (Literal['append', 'replace', 'skip'], optional): What to do if the local file already exists. Defaults to "append". 
""" @@ -54,6 +55,7 @@ def __init__( self.start_date = start_date self.end_date = end_date self.limit = limit + self.timeout = timeout self.local_file_path = local_file_path self.if_exsists = if_exists @@ -71,6 +73,8 @@ def gen_outlook_df( self, mailbox_list: Union[str, List[str]], flow: Flow = None ) -> Task: + outlook_to_df = OutlookToDF(timeout=self.timeout) + df = outlook_to_df.bind( mailbox_name=mailbox_list, start_date=self.start_date, diff --git a/viadot/flows/sftp_operations.py b/viadot/flows/sftp_operations.py new file mode 100644 index 000000000..0eda548be --- /dev/null +++ b/viadot/flows/sftp_operations.py @@ -0,0 +1,249 @@ +import json +from typing import Any, Dict, List, Literal + +import prefect +from prefect import Task, Flow +from prefect.tasks.secrets import PrefectSecret +from viadot.flows.adls_to_azure_sql import df_to_csv_task + +from viadot.config import local_config +from viadot.sources.sftp import SftpConnector +from viadot.tasks.sftp import SftpToDF +from viadot.tasks import AzureDataLakeUpload, AzureSQLCreateTable, BCPTask +from viadot.task_utils import add_ingestion_metadata_task + + +upload_to_adls = AzureDataLakeUpload() +create_table_task = AzureSQLCreateTable() +bulk_insert_task = BCPTask() + + +class SftpToAzureSQL(Flow): + def __init__( + self, + name: str, + from_path: str = None, + file_name: str = None, + columns: List[str] = None, + sep: str = "\t", + remove_tab: bool = True, + dtypes: Dict[str, Any] = None, + table: str = None, + schema: str = None, + if_exists: Literal["fail", "replace", "append", "delete"] = "fail", + sftp_credentials_secret: Dict[str, Any] = None, + sftp_credentials: Dict[str, Any] = None, + sqldb_credentials_secret: str = None, + on_bcp_error: Literal["skip", "fail"] = "fail", + error_log_file_path: str = "SFTP_logs.log", + vault_name: str = None, + *args, + **kwargs, + ): + """ + Bulk insert data from SFTP server into an Azure SQL Database table. + This task also creates the table if it doesn't exist. + + Args: + name (str): The name of the flow. + from_path (str, optional): Path to the file in SFTP server. Defaults to None. + file_name (str, optional): File name for local file. Defaults to None. + columns (List[str], optional): Columns to read from the file. Defaults to None. + sep (str, optional): The separator to use to read the CSV file. Defaults to "\t". + remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False. + dtypes (dict, optional): Which custom data types should be used for SQL table creation task. + table (str, optional): Destination table. Defaults to None. + schema (str, optional): Destination schema. Defaults to None. + if_exists (Literal, optional): What to do if the table already exists. Defaults to "replace". + sftp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary credentials for SFTP connection. Defaults to None. + sftp_credentials (Dict[str, Any], optional): SFTP server credentials. Defaults to None. + sqldb_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + on_bcp_error (Literal["skip", "fail"], optional): What to do if error occurs. Defaults to "fail". + error_log_file_path (string, optional): Full path of an error file. Defaults to "./log_file.log". + vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None. 
+ """ + # SFTP + self.from_path = from_path + self.sftp_credentials_secret = sftp_credentials_secret + self.sftp_credentials = sftp_credentials + self.columns = columns + # File args + if file_name is None: + self.file_name = from_path.split("/")[-1] + + else: + self.file_name = file_name + + self.sep = sep + self.remove_tab = remove_tab + + # Read schema + self.schema = schema + self.table = table + self.dtypes = dtypes + + # AzureSQLCreateTable + self.table = table + self.schema = schema + self.if_exists = if_exists + self.vault_name = vault_name + + # BCPTask + self.sqldb_credentials_secret = sqldb_credentials_secret + self.on_bcp_error = on_bcp_error + self.error_log_file_path = error_log_file_path + + super().__init__( + name=name, + *args, + **kwargs, + ) + + self.gen_flow() + + @staticmethod + def _map_if_exists(if_exists: str) -> str: + mapping = {"append": "skip"} + return mapping.get(if_exists, if_exists) + + def __call__(self, *args, **kwargs): + """Download file-like object from SFTP server and load data into Azure SQL database.""" + return super().__call__(*args, **kwargs) + + def gen_flow(self) -> Flow: + ftp = SftpToDF( + sftp_credentials_secret=self.sftp_credentials_secret, + credentials=self.sftp_credentials, + ) + df = ftp.bind( + from_path=self.from_path, + columns=self.columns, + flow=self, + ) + df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + df_to_csv_task.bind( + df=df_with_metadata, + remove_tab=self.remove_tab, + path=self.file_name, + flow=self, + ) + + create_table_task.bind( + schema=self.schema, + table=self.table, + dtypes=self.dtypes, + if_exists=self._map_if_exists(self.if_exists), + credentials_secret=self.sqldb_credentials_secret, + vault_name=self.vault_name, + flow=self, + ) + + bulk_insert_task.bind( + path=self.file_name, + schema=self.schema, + table=self.table, + error_log_file_path=self.error_log_file_path, + on_error=self.on_bcp_error, + credentials_secret=self.sqldb_credentials_secret, + flow=self, + ) + + df_with_metadata.set_upstream(df, flow=self) + df_to_csv_task.set_upstream(df_with_metadata, flow=self) + create_table_task.set_upstream(df_to_csv_task, flow=self) + bulk_insert_task.set_upstream(create_table_task, flow=self) + + +class SftpToADLS(Flow): + def __init__( + self, + name: str, + from_path: str = None, + file_name: str = None, + sep: str = "\t", + remove_tab: bool = True, + overwrite: bool = True, + to_path: str = None, + columns: List[str] = None, + sftp_credentials_secret: Dict[str, Any] = None, + sftp_credentials: Dict[str, Any] = None, + sp_credentials_secret: str = None, + vault_name: str = None, + *args, + **kwargs, + ): + """ + Bulk insert data from SFTP server into an Azure SQL Database table. + This task also creates the table if it doesn't exist. + + Args: + name (str): The name of the flow. + from_path (str, optional): Path to the file in SFTP server. Defaults to None. + file_name (str, optional): File name for local file. Defaults to None. + sep (str, optional): The separator to use to read the CSV file. Defaults to "\t". + remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False. + overwrite (bool, optional): Whether to overwrite files in the lake. Defaults to False. + to_path (str, optional): The destination path in ADLS. Defaults to None. + columns (List[str], optional): Columns to read from the file. Defaults to None. + sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary. 
+ sftp_credentials (Dict[str, Any], optional): SFTP server credentials. Defaults to None. + vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None. + """ + # SFTP + self.from_path = from_path + self.sftp_credentials_secret = sftp_credentials_secret + self.sftp_credentials = sftp_credentials + self.columns = columns + # File args + if file_name is None: + self.file_name = from_path.split("/")[-1] + + else: + self.file_name = file_name + + self.sep = sep + self.remove_tab = remove_tab + + # ADLS + self.to_path = to_path + self.sp_credentials_secret = sp_credentials_secret + self.vault_name = vault_name + self.overwrite = overwrite + + super().__init__( + name=name, + *args, + **kwargs, + ) + + self.gen_flow() + + def __call__(self, *args, **kwargs): + """Download file-like object from SFTP server and load data into ADLS.""" + return super().__call__(*args, **kwargs) + + def gen_flow(self) -> Flow: + ftp = SftpToDF( + sftp_credentials_secret=self.sftp_credentials_secret, + credentials=self.sftp_credentials, + ) + df = ftp.bind( + from_path=self.from_path, + columns=self.columns, + flow=self, + ) + df_to_csv_task.bind( + df=df, remove_tab=self.remove_tab, path=self.file_name, flow=self + ) + + upload_df = upload_to_adls.bind( + from_path=self.file_name, + to_path=self.to_path, + sp_credentials_secret=self.sp_credentials_secret, + overwrite=self.overwrite, + vault_name=self.vault_name, + flow=self, + ) + + df_to_csv_task.set_upstream(df, flow=self) + upload_df.set_upstream(df_to_csv_task, flow=self) diff --git a/viadot/sources/__init__.py b/viadot/sources/__init__.py index 1d2d9164e..c64f2c335 100644 --- a/viadot/sources/__init__.py +++ b/viadot/sources/__init__.py @@ -8,6 +8,8 @@ from .sharepoint import Sharepoint from .supermetrics import Supermetrics from .genesys import Genesys +from .sftp import SftpConnector + try: from .sap_rfc import SAPRFC @@ -18,6 +20,7 @@ from .epicor import Epicor from .sql_server import SQLServer from .sqlite import SQLite +from .mindful import Mindful # APIS from .uk_carbon_intensity import UKCarbonIntensity diff --git a/viadot/sources/azure_data_lake.py b/viadot/sources/azure_data_lake.py index 4e53cdd1a..130a163cb 100644 --- a/viadot/sources/azure_data_lake.py +++ b/viadot/sources/azure_data_lake.py @@ -181,8 +181,26 @@ def ls(self, path: str = None) -> List[str]: path (str, optional): Path to a folder. Defaults to None. """ path = path or self.path + return self.fs.ls(path) + def find(self, path: str = None) -> List[str]: + """ + Returns list of files in a path using recursive method. + + Args: + path (str, optional): Path to a folder. Defaults to None. + + Returns: + List[str]: List of paths. + + """ + path = path or self.path + + files_path_list_raw = self.fs.find(path) + paths_list = [p for p in files_path_list_raw if not p.endswith("/")] + return paths_list + def rm(self, path: str = None, recursive: bool = False): """ Deletes a file or directory from the path. 
diff --git a/viadot/sources/mindful.py b/viadot/sources/mindful.py new file mode 100644 index 000000000..2d5af0b32 --- /dev/null +++ b/viadot/sources/mindful.py @@ -0,0 +1,225 @@ +import os +from io import StringIO +from typing import Any, Dict, Literal + +import prefect +import pandas as pd +from datetime import datetime, timedelta +from requests.models import Response + +from viadot.sources.base import Source +from viadot.utils import handle_api_response +from viadot.exceptions import APIError + + +class Mindful(Source): + def __init__( + self, + credentials_mindful: Dict[str, Any] = None, + region: Literal["us1", "us2", "us3", "ca1", "eu1", "au1"] = "eu1", + start_date: datetime = None, + end_date: datetime = None, + date_interval: int = 1, + file_extension: Literal["parquet", "csv"] = "csv", + *args, + **kwargs, + ) -> None: + """Mindful connector which allows listing and downloading into Data Frame or specified format output. + + Args: + credentials_mindful (Dict[str, Any], optional): Credentials to connect with Mindful API. Defaults to None. + region (Literal[us1, us2, us3, ca1, eu1, au1], optional): SD region from where to interact with the mindful API. Defaults to "eu1". + start_date (datetime, optional): Start date of the request. Defaults to None. + end_date (datetime, optional): End date of the resquest. Defaults to None. + date_interval (int, optional): How many days are included in the request. + If end_date is passed as an argument, date_interval will be invalidated. Defaults to 1. + file_extension (Literal[parquet, csv], optional): File extensions for storing responses. Defaults to "csv". + """ + self.logger = prefect.context.get("logger") + + self.credentials_mindful = credentials_mindful + + super().__init__(*args, credentials=self.credentials_mindful, **kwargs) + + if region != "us1": + self.region = region + "." + else: + self.region = "" + + if not isinstance(start_date, datetime): + self.start_date = datetime.now() - timedelta(days=date_interval) + self.end_date = self.start_date + timedelta(days=date_interval) + self.logger.info( + f"Mindful start_date variable is None or not in datetime format, it has been taken as: {self.start_date}." + ) + self.logger.info( + f"Mindful end_date variable has been also taken as: {self.end_date}." + ) + elif isinstance(start_date, datetime) and not isinstance(end_date, datetime): + self.start_date = start_date + self.end_date = start_date + timedelta(days=date_interval) + if self.end_date > datetime.now(): + self.end_date = datetime.now() + self.logger.info( + f"Mindful end_date variable is None or not in datetime format, it has been taken as: {self.end_date}." + ) + elif start_date >= end_date: + raise ValueError( + f"start_date variable must be lower than end_date variable." + ) + else: + self.start_date = start_date + self.end_date = end_date + self.logger.info( + f"Mindful files to download will store data from {self.start_date} to {self.end_date}." + ) + + self.file_extension = file_extension + self.header = { + "Authorization": f"Bearer {self.credentials.get('VAULT')}", + } + + def _mindful_api_response( + self, + params: Dict[str, Any] = None, + endpoint: str = "", + **kwargs, + ) -> Response: + """Basic call to Mindful API given an endpoint. + + Args: + params (Dict[str, Any], optional): Parameters to be passed into the request. Defaults to None. + endpoint (str, optional): API endpoint for an individual request. Defaults to "". + + Returns: + Response: request object with the response from the Mindful API. 
+ """ + + response = handle_api_response( + url=f"https://{self.region}surveydynamix.com/api/{endpoint}", + params=params, + headers=self.header, + method="GET", + ) + + return response + + def get_interactions_list( + self, + limit: int = 1000, + **kwargs, + ) -> Response: + """Gets a list of survey interactions as a JSON array of interaction resources. + + Args: + limit (int, optional): The number of matching interactions to return. Defaults to 1000. + + Returns: + Response: request object with the response from the Mindful API. + """ + + self.endpoint = "interactions" + params = { + "_limit": limit, + "start_date": f"{self.start_date}", + "end_date": f"{self.end_date}", + } + + response = self._mindful_api_response( + endpoint=self.endpoint, + params=params, + ) + + if response.status_code == 200: + self.logger.info( + "Succesfully downloaded interactions data from mindful API." + ) + elif response.status_code == 204 and not response.content.decode(): + self.logger.warning( + f"Thera are not interactions data to download from {self.start_date} to {self.end_date}." + ) + else: + self.logger.error( + f"Failed to downloaded interactions data. - {response.content}" + ) + raise APIError("Failed to downloaded interactions data.") + + return response + + def get_responses_list( + self, + limit: int = 1000, + **kwargs, + ) -> Response: + """Gets a list of survey responses associated with a survey, question, or interaction resource. + + Args: + limit (int, optional): The number of matching interactions to return. Defaults to 1000. + + Returns: + Response: request object with the response from the Mindful API. + """ + + self.endpoint = "responses" + params = { + "_limit": limit, + "start_date": f"{self.start_date}", + "end_date": f"{self.end_date}", + } + + response = self._mindful_api_response( + endpoint=self.endpoint, + params=params, + ) + + if response.status_code == 200: + self.logger.info("Succesfully downloaded responses data from mindful API.") + elif response.status_code == 204 and not response.content.decode(): + self.logger.warning( + f"Thera are not responses data to download from {self.start_date} to {self.end_date}." + ) + else: + self.logger.error( + f"Failed to downloaded responses data. - {response.content}" + ) + raise APIError("Failed to downloaded responses data.") + + return response + + def response_to_file( + self, + response: Response, + file_name: str = None, + file_path: str = "", + sep: str = "\t", + ) -> str: + """Save Mindful response data to file and return filename. + + Args: + response (Response): Request object with the response from the Mindful API. + file_name (str, optional): Name of the file where saving data. Defaults to None. + file_path (str, optional): Path where to save the file locally. Defaults to ''. + sep (str, optional): Separator in csv file. Defaults to "\t". + + returns + str: the absolute path of the downloaded file. 
+ """ + + data_frame = pd.read_json(StringIO(response.content.decode("utf-8"))) + if file_name is None: + complete_file_name = f"{self.endpoint}.{self.file_extension}" + absolute_path = os.path.join(file_path, complete_file_name) + else: + complete_file_name = f"{file_name}.{self.file_extension}" + absolute_path = os.path.join(file_path, complete_file_name) + + if self.file_extension == "csv": + data_frame.to_csv(absolute_path, index=False, sep=sep) + elif self.file_extension == "parquet": + data_frame.to_parquet(absolute_path, index=False) + else: + self.logger.warning( + "File extension is not available, please choose file_extension: 'parquet' or 'csv' (def.) at Mindful instance." + ) + + return complete_file_name diff --git a/viadot/sources/sftp.py b/viadot/sources/sftp.py new file mode 100644 index 000000000..6c51fc037 --- /dev/null +++ b/viadot/sources/sftp.py @@ -0,0 +1,216 @@ +import paramiko +import prefect +from typing import Any, Dict, List +from io import StringIO, BytesIO +import prefect +import pandas as pd +from viadot.sources.base import Source +from pathlib import Path +import time +import os +from stat import S_ISDIR +from collections import defaultdict +from ..exceptions import CredentialError +import itertools + + +class SftpConnector(Source): + def __init__( + self, + file_name: str = None, + credentials_sftp: Dict[str, Any] = None, + *args: List[Any], + **kwargs: Dict[str, Any], + ): + """ + STFP connector which allows for download data files, listing and downloading into Data Frame. + + Args: + file_name (str, optional): File name to download. Defaults to None. + credentials_sftp (Dict[str, Any], optional): SFTP server credentials. Defaults to None. + + Raises: + CredentialError: If credentials are not provided in local_config or directly as a parameter. + """ + + self.logger = prefect.context.get("logger") + + self.credentials_sftp = credentials_sftp + + if self.credentials_sftp is None: + raise CredentialError("Credentials not found.") + + self.file_name = file_name + + self.conn = None + self.hostname = self.credentials_sftp.get("HOSTNAME") + self.username = self.credentials_sftp.get("USERNAME") + self.password = self.credentials_sftp.get("PASSWORD") + self.port = self.credentials_sftp.get("PORT") + self.rsa_key = self.credentials_sftp.get("RSA_KEY") + + self.recursive_files_list = {} + self.file_name_list = [] + self.recursive_files = [] + + def get_conn(self): + """Returns a SFTP connection object. + + Returns: paramiko.SFTPClient + """ + + ssh = paramiko.SSHClient() + + if self.conn is None: + if self.rsa_key is None: + transport = paramiko.Transport((self.hostname, self.port)) + transport.connect(None, self.username, self.password) + + self.conn = paramiko.SFTPClient.from_transport(transport) + + else: + keyfile = StringIO(self.rsa_key) + mykey = paramiko.RSAKey.from_private_key(keyfile) + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(self.hostname, username=self.username, pkey=mykey) + time.sleep(1) + self.conn = ssh.open_sftp() + return self.conn + + def get_cwd(self): + """Return the current working directory for SFTP session. + + Returns: + str: current working directory + """ + return self.conn.getcwd() + + def getfo_file(self, file_name: str): + """Copy a remote file from the SFTP server and write to a file-like object. + + Args: + file_name (str, optional): File name to copy. 
+ + Returns: + BytesIO: file-like object + """ + flo = BytesIO() + try: + self.conn.getfo(file_name, flo) + return flo + except Exception as e: + print(e) + + def to_df(self, file_name: str, sep: str = "\t", columns: List[str] = None): + """Copy a remote file from the SFTP server and write it to Pandas dataframe. + + Args: + file_name (str, optional): File name to download. + sep (str, optional): The delimiter for the source file. Defaults to "\t". + columns (List[str], optional): List of columns to select from file. Defaults to None. + + Returns: + pd.DataFrame: Pandas dataframe. + """ + byte_file = self.getfo_file(file_name=file_name) + byte_file.seek(0) + if columns is None: + if Path(file_name).suffix == ".csv": + df = pd.read_csv(byte_file, sep=sep) + + elif Path(file_name).suffix == ".parquet": + df = pd.read_parquet(byte_file) + if columns is not None: + if Path(file_name).suffix == ".csv": + df = pd.read_csv(byte_file, sep=sep, usecols=columns) + + elif Path(file_name).suffix == ".parquet": + df = pd.read_parquet(byte_file, usecols=columns) + + return df + + def get_exported_files(self): + """List only exported files in current working directory. + + Returns: + List: List of exported files + """ + self.file_name_list.clear() + + directory_structure = self.conn.listdir() + for file in directory_structure: + if "##exported.tsv" in file: + self.file_name_list.append(file) + + return self.file_name_list + + def list_directory(self, path: str = None) -> List[str]: + """Returns a list of files on the remote system. + + Args: + path (str, optional): full path to the remote directory to list. Defaults to None. + + Returns: + List: List of files + + """ + + if path is None: + files = self.conn.listdir() + else: + files = self.conn.listdir(path) + return files + + def recursive_listdir(self, path=".", files=None): + """Recursively returns a defaultdict of files on the remote system. + + Args: + path (str, optional): full path to the remote directory to list. Defaults to None. + files (any, optional): parameter for calling function recursively. + Returns: + defaultdict(list): List of files + + """ + + if files is None: + files = defaultdict(list) + + # loop over list of SFTPAttributes (files with modes) + for attr in self.conn.listdir_attr(path): + + if S_ISDIR(attr.st_mode): + # If the file is a directory, recurse it + self.recursive_listdir(os.path.join(path, attr.filename), files) + + else: + # if the file is a file, add it to our dict + files[path].append(attr.filename) + + self.recursive_files = files + return files + + def process_defaultdict(self, defaultdict: any = None) -> list: + """Process defaultdict to list of files. + + Args: + defaultdict (any, optional): defaultdict of recursive files. Defaults to None. 
+ + Returns: + List: list of files + """ + path_list = [] + if defaultdict is None: + defaultdict = self.recursive_files + + for item in list(defaultdict.items()): + tuple_list_path = list(itertools.product([item[0]], item[1])) + path_list.extend( + [os.path.join(*tuple_path) for tuple_path in tuple_list_path] + ) + return path_list + + def close_conn(self) -> None: + """Closes the connection""" + if self.conn is not None: + self.conn.close() + self.conn = None diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 6d8752b16..bb3ed7998 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -44,3 +44,7 @@ from .sql_server import SQLServerCreateTable, SQLServerToDF, SQLServerQuery from .epicor import EpicorOrdersToDF + +from .sftp import SftpToDF, SftpList +from .mindful import MindfulToCSV + diff --git a/viadot/tasks/azure_data_lake.py b/viadot/tasks/azure_data_lake.py index e1a72bfd6..9ca86ed0d 100644 --- a/viadot/tasks/azure_data_lake.py +++ b/viadot/tasks/azure_data_lake.py @@ -4,6 +4,7 @@ from typing import List import pandas as pd +import numpy as np from prefect import Task from prefect.tasks.secrets import PrefectSecret from prefect.utilities.tasks import defaults_from_attrs @@ -519,6 +520,8 @@ def __init__( def run( self, path: str = None, + recursive: bool = False, + file_to_match: str = None, gen: int = None, sp_credentials_secret: str = None, vault_name: str = None, @@ -528,7 +531,9 @@ def run( """Task run method. Args: - from_path (str): The path to the directory which contents you want to list. Defaults to None. + path (str, optional): The path to the directory which contents you want to list. Defaults to None. + recursive (bool, optional): If True, recursively list all subdirectories and files. Defaults to False. + file_to_match (str, optional): If exist it only returns files with that name. Defaults to None. gen (int): The generation of the Azure Data Lake. Defaults to None. sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. @@ -567,10 +572,24 @@ def run( full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], path) - self.logger.info(f"Listing files in {full_dl_path}...") - files = lake.ls(path) - self.logger.info(f"Successfully listed files in {full_dl_path}.") + self.logger.info(f"Listing files in {full_dl_path}.") + if recursive: + self.logger.info("Loading ADLS directories recursively.") + files = lake.find(path) + if file_to_match: + conditions = [file_to_match in item for item in files] + valid_files = np.array([]) + if any(conditions): + index = np.where(conditions)[0] + files = list(np.append(valid_files, [files[i] for i in index])) + else: + raise FileExistsError( + f"There are not any available file named {file_to_match}." 
+ ) + else: + files = lake.ls(path) + self.logger.info(f"Successfully listed files in {full_dl_path}.") return files diff --git a/viadot/tasks/genesys.py b/viadot/tasks/genesys.py index 4fd17300b..2d7861494 100644 --- a/viadot/tasks/genesys.py +++ b/viadot/tasks/genesys.py @@ -172,8 +172,6 @@ def run( else: logger.info("Succesfully loaded all exports.") - genesys.get_reporting_exports_data() - file_names = genesys.download_all_reporting_exports() logger.info("Downloaded the data from the Genesys into the CSV.") # in order to wait for API GET request call it diff --git a/viadot/tasks/mindful.py b/viadot/tasks/mindful.py new file mode 100644 index 000000000..bc09a9b21 --- /dev/null +++ b/viadot/tasks/mindful.py @@ -0,0 +1,145 @@ +import time +import json +from typing import Any, Dict, List, Literal + +from datetime import datetime, timedelta +from prefect import Task +from prefect.utilities import logging +from prefect.utilities.tasks import defaults_from_attrs + +from viadot.sources import Mindful +from viadot.config import local_config +from viadot.tasks import AzureKeyVaultSecret +from viadot.exceptions import CredentialError + +logger = logging.get_logger() + + +class MindfulToCSV(Task): + def __init__( + self, + report_name: str = "mindful_to_csv", + start_date: datetime = None, + end_date: datetime = None, + date_interval: int = 1, + file_extension: Literal["parquet", "csv"] = "csv", + file_path: str = "", + *args: List[Any], + **kwargs: Dict[str, Any], + ): + """Task for downloading data from Mindful API to CSV. + + Args: + report_name (str, optional): The name of this task. Defaults to "mindful_to_csv". + start_date (datetime, optional): Start date of the request. Defaults to None. + end_date (datetime, optional): End date of the resquest. Defaults to None. + date_interval (int, optional): How many days are included in the request. + If end_date is passed as an argument, date_interval will be invalidated. Defaults to 1. + file_extension (Literal[parquet, csv], optional): File extensions for storing responses. Defaults to "csv". + file_path (str, optional): Path where to save the file locally. Defaults to ''. + + Raises: + CredentialError: If credentials are not provided in local_config or directly as a parameter inside run method. + """ + + self.start_date = start_date + self.end_date = end_date + self.date_interval = date_interval + self.file_extension = file_extension + self.file_path = file_path + + super().__init__( + name=report_name, + *args, + **kwargs, + ) + + if not isinstance(start_date, datetime): + self.start_date = datetime.now() - timedelta(days=date_interval) + self.end_date = self.start_date + timedelta(days=date_interval) + elif isinstance(start_date, datetime) and not isinstance(end_date, datetime): + self.start_date = start_date + self.end_date = start_date + timedelta(days=date_interval) + if self.end_date > datetime.now(): + self.end_date = datetime.now() + elif start_date >= end_date: + raise ValueError( + f"start_date variable must be lower than end_date variable." 
+ ) + else: + self.start_date = start_date + self.end_date = end_date + + def __call__(self, *args, **kwargs): + """Download Mindful data to CSV""" + return super().__call__(*args, **kwargs) + + @defaults_from_attrs( + "start_date", + "end_date", + "date_interval", + "file_extension", + "file_path", + ) + def run( + self, + credentials_mindful: Dict[str, Any] = None, + credentials_secret: str = None, + vault_name: str = None, + start_date: datetime = None, + end_date: datetime = None, + date_interval: int = 1, + file_extension: Literal["parquet", "csv"] = "csv", + file_path: str = "", + ): + + if credentials_mindful is not None: + self.logger.info("Mindful credentials provided by user") + elif credentials_mindful is None and credentials_secret is not None: + credentials_str = AzureKeyVaultSecret( + credentials_secret, vault_name=vault_name + ).run() + credentials_mindful = json.loads(credentials_str) + logger.info("Loaded credentials from Key Vault.") + else: + try: + credentials_mindful = local_config["MINDFUL"] + self.logger.info("Mindful credentials loaded from local config") + except KeyError: + credentials_mindful = None + raise CredentialError("Credentials not found.") + + mindful = Mindful( + credentials_mindful=credentials_mindful, + region="eu1", + start_date=start_date, + end_date=end_date, + date_interval=date_interval, + file_extension=file_extension, + ) + + file_names = [] + interactions_response = mindful.get_interactions_list() + if interactions_response.status_code == 200: + interaction_file_name = mindful.response_to_file( + interactions_response, + file_path=file_path, + ) + file_names.append(interaction_file_name) + logger.info( + "Successfully downloaded interactions data from the Mindful API." + ) + time.sleep(0.5) + responses_response = mindful.get_responses_list() + if responses_response.status_code == 200: + response_file_name = mindful.response_to_file( + responses_response, + file_path=file_path, + ) + file_names.append(response_file_name) + logger.info("Successfully downloaded responses data from the Mindful API.") + + if not file_names: + raise TypeError("Files were not created.") + else: + return file_names diff --git a/viadot/tasks/outlook.py b/viadot/tasks/outlook.py index 15bf574a6..e99fdeb36 100644 --- a/viadot/tasks/outlook.py +++ b/viadot/tasks/outlook.py @@ -21,6 +21,7 @@ def __init__( credentials: Dict[str, Any] = None, output_file_extension: str = ".csv", limit: int = 10000, + timeout: int = 1200, *args: List[Any], **kwargs: Dict[str, Any], ): @@ -40,6 +41,7 @@ def __init__( super().__init__( name="outlook_to_csv", + timeout=timeout, *args, **kwargs, ) diff --git a/viadot/tasks/sftp.py b/viadot/tasks/sftp.py new file mode 100644 index 000000000..8197d4b9d --- /dev/null +++ b/viadot/tasks/sftp.py @@ -0,0 +1,154 @@ +import json +from typing import Any, Dict, List, Literal + +import prefect +from prefect import Task +from prefect.tasks.secrets import PrefectSecret + +from viadot.config import local_config +from viadot.sources.sftp import SftpConnector + +from .azure_key_vault import AzureKeyVaultSecret + + +class SftpToDF(Task): + def __init__( + self, + credentials: Dict[str, Any] = None, + sftp_credentials_secret: str = None, + vault_name: str = None, + *args, + **kwargs, + ): + """Task for downloading data from SFTP server to pandas DataFrame. + + Args: + credentials (Dict[str, Any], optional): SFTP credentials. Defaults to None. 
+ sftp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary credentials for SFTP connection. Defaults to None. + vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None. + + Returns: Pandas DataFrame + """ + self.credentials = credentials + self.sftp_credentials_secret = sftp_credentials_secret + self.vault_name = vault_name + + super().__init__( + name="SftpToDF", + *args, + **kwargs, + ) + + def __call__(self, *args, **kwargs): + """Download file-like object from SFTP server to df""" + return super().__call__(*args, **kwargs) + + def run( + self, + from_path: str = None, + sep: str = "\t", + columns: List[str] = None, + ): + logger = prefect.context.get("logger") + if not self.credentials: + if not self.sftp_credentials_secret: + try: + self.sftp_credentials_secret = PrefectSecret("QWASI-SFTP").run() + logger.info("Loaded credentials from Key Vault.") + except ValueError: + pass + + if self.sftp_credentials_secret: + credentials_str = AzureKeyVaultSecret( + self.sftp_credentials_secret, vault_name=self.vault_name + ).run() + credentials_sftp = json.loads(credentials_str) + logger.info("Loaded credentials from Key Vault.") + else: + credentials_sftp = local_config.get("QWASI-SFTP") + logger.info("Loaded credentials from local source.") + else: + credentials_sftp = self.credentials + + sftp = SftpConnector(credentials_sftp=credentials_sftp) + sftp.get_conn() + logger.info("Connected to SFTP server.") + df = sftp.to_df(file_name=from_path, sep=sep, columns=columns) + logger.info("Succefully downloaded file from SFTP server.") + return df + + +class SftpList(Task): + def __init__( + self, + credentials: Dict[str, Any] = None, + sftp_credentials_secret: str = None, + vault_name: str = None, + *args, + **kwargs, + ): + """Task for listing files in SFTP server. + + Args: + credentials (Dict[str, Any], optional): SFTP credentials. Defaults to None. + sftp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary credentials for SFTP connection. Defaults to None. + vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None. + + Returns: + files_list (List): List of files in SFTP server. 
+ """ + self.credentials = credentials + self.sftp_credentials_secret = sftp_credentials_secret + self.vault_name = vault_name + + super().__init__( + name="SftpList", + *args, + **kwargs, + ) + + def __call__(self, *args, **kwargs): + """Task for listing files in SFTP server.""" + return super().__call__(*args, **kwargs) + + def run( + self, + path: str = None, + recursive: bool = False, + matching_path: str = None, + ): + logger = prefect.context.get("logger") + if not self.credentials: + if not self.sftp_credentials_secret: + try: + self.sftp_credentials_secret = PrefectSecret("QWASI-SFTP").run() + logger.info("Loaded credentials from Key Vault.") + except ValueError: + pass + + if self.sftp_credentials_secret: + credentials_str = AzureKeyVaultSecret( + self.sftp_credentials_secret, vault_name=self.vault_name + ).run() + credentials_sftp = json.loads(credentials_str) + logger.info("Loaded credentials from Key Vault.") + else: + credentials_sftp = local_config.get("QWASI-SFTP") + logger.info("Loaded credentials from local source.") + else: + credentials_sftp = self.credentials + + sftp = SftpConnector(credentials_sftp=credentials_sftp) + sftp.get_conn() + logger.info("Connected to SFTP server.") + if recursive is False: + files_list = sftp.list_directory(path=path) + + else: + files_list = sftp.recursive_listdir(path=path) + files_list = sftp.process_defaultdict(defaultdict=files_list) + if matching_path is not None: + files_list = [f for f in files_list if matching_path in f] + + logger.info("Succefully loaded file list from SFTP server.") + return files_list