diff --git a/CHANGELOG.md b/CHANGELOG.md index c6ca36533..ab02a5447 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -## [0.4.4] - 2022-06-09 +## [0.4.5] - 2022-06-23 ### Added +- Added `error_log_file_path` parameter in `BCPTask` that enables setting the name of the error log file +- Added `on_error` parameter in `BCPTask` that tells what to do if a BCP error occurs. +- Added error log file and `on_bcp_error` parameter in `ADLSToAzureSQL` +- Added handling POST requests in `handle_api_response()` and added it to `Epicor` source. +- Added `SalesforceToDF` task +- Added `SalesforceToADLS` flow +- Added `overwrite_adls` option to `BigQueryToADLS` and `SharepointToADLS` +- Added `cast_df_to_str` task in `utils.py` and added this to `EpicorToDuckDB`, `SAPToDuckDB`, `SQLServerToDuckDB` +- Added `if_empty` parameter in `DuckDBCreateTableFromParquet` task and in `EpicorToDuckDB`, `SAPToDuckDB`, +`SQLServerToDuckDB` flows to check if output Parquet is empty and handle it properly. +- Added `check_if_empty_file()` and `handle_if_empty_file()` in `utils.py` + +## [0.4.4] - 2022-06-09 +### Added - Added new connector - Outlook. Created `Outlook` source, `OutlookToDF` task and `OutlookToADLS` flow. - Added new connector - Epicor. Created `Epicor` source, `EpicorToDF` task and `EpicorToDuckDB` flow. - Enabled Databricks Connect in the image. 
To enable, [follow this guide](./README.md#executing-spark-jobs) diff --git a/tests/integration/flows/test_bigquery_to_adls.py b/tests/integration/flows/test_bigquery_to_adls.py index 25cba2317..2060a884a 100644 --- a/tests/integration/flows/test_bigquery_to_adls.py +++ b/tests/integration/flows/test_bigquery_to_adls.py @@ -1,19 +1,48 @@ from viadot.flows import BigQueryToADLS +from viadot.tasks import AzureDataLakeRemove from prefect.tasks.secrets import PrefectSecret +import pendulum +import os + +ADLS_DIR_PATH = "raw/tests/" +ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet" +BIGQ_CREDENTIAL_KEY = "BIGQUERY_TESTS" +ADLS_CREDENTIAL_SECRET = PrefectSecret( + "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" +).run() def test_bigquery_to_adls(): - credentials_secret = PrefectSecret( - "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" - ).run() + flow_bigquery = BigQueryToADLS( + name="Test BigQuery to ADLS extract", + dataset_name="official_empty", + table_name="space", + adls_file_name=ADLS_FILE_NAME, + credentials_key=BIGQ_CREDENTIAL_KEY, + adls_dir_path=ADLS_DIR_PATH, + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, + ) + + result = flow_bigquery.run() + assert result.is_successful() + + task_results = result.result.values() + assert all([task_result.is_successful() for task_result in task_results]) + os.remove("test_bigquery_to_adls_extract.parquet") + os.remove("test_bigquery_to_adls_extract.json") + + +def test_bigquery_to_adls_overwrite_true(): flow_bigquery = BigQueryToADLS( - name="BigQuery to ADLS", + name="Test BigQuery to ADLS overwrite true", dataset_name="official_empty", table_name="space", - credentials_key="BIGQUERY_TESTS", - adls_dir_path="raw/tests", - adls_sp_credentials_secret=credentials_secret, + credentials_key=BIGQ_CREDENTIAL_KEY, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + adls_dir_path=ADLS_DIR_PATH, + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, ) result = flow_bigquery.run() @@ -21,3 +50,27 @@ def 
test_bigquery_to_adls(): task_results = result.result.values() assert all([task_result.is_successful() for task_result in task_results]) + os.remove("test_bigquery_to_adls_overwrite_true.parquet") + os.remove("test_bigquery_to_adls_overwrite_true.json") + + +def test_bigquery_to_adls_false(): + flow_bigquery = BigQueryToADLS( + name="Test BigQuery to ADLS overwrite false", + dataset_name="official_empty", + table_name="space", + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=False, + credentials_key=BIGQ_CREDENTIAL_KEY, + adls_dir_path=ADLS_DIR_PATH, + adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET, + ) + + result = flow_bigquery.run() + assert result.is_failed() + os.remove("test_bigquery_to_adls_overwrite_false.parquet") + os.remove("test_bigquery_to_adls_overwrite_false.json") + rm = AzureDataLakeRemove( + path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s" + ) + rm.run(sp_credentials_secret=ADLS_CREDENTIAL_SECRET) diff --git a/tests/integration/flows/test_salesforce_to_adls.py b/tests/integration/flows/test_salesforce_to_adls.py new file mode 100644 index 000000000..c4487620a --- /dev/null +++ b/tests/integration/flows/test_salesforce_to_adls.py @@ -0,0 +1,33 @@ +from viadot.flows import SalesforceToADLS +from viadot.tasks import AzureDataLakeRemove +from prefect.tasks.secrets import PrefectSecret +import os + +ADLS_FILE_NAME = "test_salesforce.parquet" +ADLS_DIR_PATH = "raw/tests/" + + +def test_salesforce_to_adls(): + + credentials_secret = PrefectSecret( + "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" + ).run() + + flow = SalesforceToADLS( + "test_salesforce_to_adls_run_flow", + query="SELECT IsDeleted, FiscalYear FROM Opportunity LIMIT 50", + adls_sp_credentials_secret=credentials_secret, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + ) + + result = flow.run() + assert result.is_successful() + + os.remove("test_salesforce_to_adls_run_flow.parquet") + os.remove("test_salesforce_to_adls_run_flow.json") + rm = 
AzureDataLakeRemove( + path=ADLS_DIR_PATH + ADLS_FILE_NAME, + vault_name="azuwevelcrkeyv001s", + ) + rm.run(sp_credentials_secret=credentials_secret) diff --git a/tests/integration/flows/test_sharepoint_to_adls.py b/tests/integration/flows/test_sharepoint_to_adls.py index 31e1047db..2f88a9050 100644 --- a/tests/integration/flows/test_sharepoint_to_adls.py +++ b/tests/integration/flows/test_sharepoint_to_adls.py @@ -1,8 +1,14 @@ from viadot.flows import SharepointToADLS +from viadot.tasks import AzureDataLakeRemove +from prefect.tasks.secrets import PrefectSecret from unittest import mock import pandas as pd -from prefect.tasks.secrets import PrefectSecret import os +import pendulum + +ADLS_FILE_NAME = str(pendulum.now("utc")) + ".csv" +ADLS_DIR_PATH = "raw/tests/" +CREDENTIALS_SECRET = PrefectSecret("AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET").run() def test_sharepoint_to_adls_run_flow(): @@ -10,10 +16,6 @@ def test_sharepoint_to_adls_run_flow(): d = {"country": [1, 2], "sales": [3, 4]} df = pd.DataFrame(data=d) - credentials_secret = PrefectSecret( - "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" - ).run() - with mock.patch( "viadot.flows.sharepoint_to_adls.excel_to_df_task.bind" ) as excel_to_df_task_mock: @@ -22,10 +24,65 @@ def test_sharepoint_to_adls_run_flow(): flow = SharepointToADLS( "test_sharepoint_to_adls_run_flow", output_file_extension=".csv", - adls_sp_credentials_secret=credentials_secret, - adls_dir_path="raw/tests/test.csv", + adls_sp_credentials_secret=CREDENTIALS_SECRET, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, ) result = flow.run() assert result.is_successful() os.remove("test_sharepoint_to_adls_run_flow.csv") os.remove("test_sharepoint_to_adls_run_flow.json") + + +def test_sharepoint_to_adls_run_flow_overwrite_true(): + + d = {"country": [1, 2], "sales": [3, 4]} + df = pd.DataFrame(data=d) + + with mock.patch( + "viadot.flows.sharepoint_to_adls.excel_to_df_task.bind" + ) as excel_to_df_task_mock: + 
excel_to_df_task_mock.return_value = df + + flow = SharepointToADLS( + "test_sharepoint_to_adls_run_flow_overwrite_true", + output_file_extension=".csv", + adls_sp_credentials_secret=CREDENTIALS_SECRET, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=True, + ) + result = flow.run() + assert result.is_successful() + os.remove("test_sharepoint_to_adls_run_flow_overwrite_true.csv") + os.remove("test_sharepoint_to_adls_run_flow_overwrite_true.json") + + +def test_sharepoint_to_adls_run_flow_overwrite_false(): + + d = {"country": [1, 2], "sales": [3, 4]} + df = pd.DataFrame(data=d) + + with mock.patch( + "viadot.flows.sharepoint_to_adls.excel_to_df_task.bind" + ) as excel_to_df_task_mock: + excel_to_df_task_mock.return_value = df + + flow = SharepointToADLS( + "test_sharepoint_to_adls_run_flow_overwrite_false", + output_file_extension=".csv", + adls_sp_credentials_secret=CREDENTIALS_SECRET, + adls_dir_path=ADLS_DIR_PATH, + adls_file_name=ADLS_FILE_NAME, + overwrite_adls=False, + ) + result = flow.run() + + assert result.is_failed() + os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.csv") + os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.json") + + rm = AzureDataLakeRemove( + path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s" + ) + rm.run(sp_credentials_secret=CREDENTIALS_SECRET) diff --git a/tests/integration/tasks/test_bcp.py b/tests/integration/tasks/test_bcp.py index 45b425224..f8ac3ac89 100644 --- a/tests/integration/tasks/test_bcp.py +++ b/tests/integration/tasks/test_bcp.py @@ -1,4 +1,6 @@ import pytest +import os + from prefect.engine.signals import FAIL from viadot.tasks import BCPTask @@ -7,6 +9,8 @@ SCHEMA = "sandbox" TABLE = "test_bcp" FAIL_TABLE = "nonexistent_table" +ERROR_TABLE = "test_bcp_error" +ERROR_LOG_FILE = "log_file.log" @pytest.fixture(scope="function") @@ -23,16 +27,36 @@ def test_table(): sql_query_task.run(f"DROP TABLE {SCHEMA}.{TABLE};") 
+@pytest.fixture(scope="function") +def test_error_table(): + create_table_task = AzureSQLCreateTable() + create_table_task.run( + schema=SCHEMA, + table=ERROR_TABLE, + dtypes={"country": "INT", "sales": "INT"}, + if_exists="replace", + ) + yield + sql_query_task = AzureSQLDBQuery() + sql_query_task.run(f"DROP TABLE {SCHEMA}.{ERROR_TABLE};") + + def test_bcp(TEST_CSV_FILE_PATH, test_table): bcp_task = BCPTask() try: - result = bcp_task.run(path=TEST_CSV_FILE_PATH, schema=SCHEMA, table=TABLE) + result = bcp_task.run( + path=TEST_CSV_FILE_PATH, + schema=SCHEMA, + table=TABLE, + error_log_file_path=ERROR_LOG_FILE, + ) except FAIL: result = False assert result is not False + os.remove(ERROR_LOG_FILE) def test_bcp_fail(TEST_CSV_FILE_PATH, test_table): @@ -41,3 +65,17 @@ def test_bcp_fail(TEST_CSV_FILE_PATH, test_table): with pytest.raises(FAIL): bcp_task.run(path=TEST_CSV_FILE_PATH, schema=SCHEMA, table=FAIL_TABLE) + + +def test_bcp_log_error(TEST_CSV_FILE_PATH, test_error_table): + bcp_task = BCPTask() + bcp_task.run( + path=TEST_CSV_FILE_PATH, + schema=SCHEMA, + table=ERROR_TABLE, + error_log_file_path=ERROR_LOG_FILE, + ) + assert ( + os.path.exists(ERROR_LOG_FILE) is True and os.path.getsize(ERROR_LOG_FILE) != 0 + ) + os.remove(ERROR_LOG_FILE) diff --git a/tests/integration/tasks/test_salesforce.py b/tests/integration/tasks/test_salesforce.py index df659eae0..dafe0acd6 100644 --- a/tests/integration/tasks/test_salesforce.py +++ b/tests/integration/tasks/test_salesforce.py @@ -1,6 +1,6 @@ import pandas as pd import pytest -from viadot.tasks import SalesforceUpsert +from viadot.tasks import SalesforceUpsert, SalesforceToDF @pytest.fixture(scope="session") @@ -25,3 +25,13 @@ def test_salesforce_upsert(test_df): sf.run(test_df, table="Contact") except Exception as exception: assert False, exception + + +def test_salesforce_to_df(): + sf_to_df = SalesforceToDF( + query="SELECT IsDeleted, FiscalYear FROM Opportunity LIMIT 50" + ) + df = sf_to_df.run() + + assert 
isinstance(df, pd.DataFrame) + assert (50, 2) == df.shape diff --git a/tests/test_viadot.py b/tests/test_viadot.py index 8890de84e..087b72ddc 100644 --- a/tests/test_viadot.py +++ b/tests/test_viadot.py @@ -2,4 +2,4 @@ def test_version(): - assert __version__ == "0.4.4" + assert __version__ == "0.4.5" diff --git a/tests/unit/tasks/test_duckdb.py b/tests/unit/tasks/test_duckdb.py new file mode 100644 index 000000000..6242cd813 --- /dev/null +++ b/tests/unit/tasks/test_duckdb.py @@ -0,0 +1,36 @@ +import os +import pytest + +from viadot.tasks import DuckDBCreateTableFromParquet +from viadot.sources.duckdb import DuckDB + +TABLE = "test_table" +SCHEMA = "test_schema" +DATABASE_PATH = "test_db_123.duckdb" + + +@pytest.fixture(scope="module") +def duckdb(): + duckdb = DuckDB(credentials=dict(database=DATABASE_PATH)) + yield duckdb + + +def test_create_table_empty_file(duckdb): + path = "empty.parquet" + with open(path, "w"): + pass + duckdb_creds = {f"database": DATABASE_PATH} + task = DuckDBCreateTableFromParquet(credentials=duckdb_creds) + task.run(schema=SCHEMA, table=TABLE, path=path, if_empty="skip") + + assert duckdb._check_if_table_exists(TABLE, schema=SCHEMA) == False + os.remove(path) + + +def test_create_table(duckdb, TEST_PARQUET_FILE_PATH): + duckdb_creds = {f"database": DATABASE_PATH} + task = DuckDBCreateTableFromParquet(credentials=duckdb_creds) + task.run(schema=SCHEMA, table=TABLE, path=TEST_PARQUET_FILE_PATH, if_empty="skip") + + assert duckdb._check_if_table_exists(TABLE, schema=SCHEMA) + os.remove(DATABASE_PATH) diff --git a/tests/unit/test_duckdb.py b/tests/unit/test_duckdb.py index e31d4acf4..f40126350 100644 --- a/tests/unit/test_duckdb.py +++ b/tests/unit/test_duckdb.py @@ -10,7 +10,7 @@ DATABASE_PATH = "test_db_123.duckdb" -@pytest.fixture(scope="session") +@pytest.fixture(scope="module") def duckdb(): duckdb = DuckDB(credentials=dict(database=DATABASE_PATH)) yield duckdb diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 
02957b0d5..7e934b5fe 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,5 +1,13 @@ -from viadot.utils import gen_bulk_insert_query_from_df import pandas as pd +import pytest +import logging +import os + +from viadot.utils import gen_bulk_insert_query_from_df, check_if_empty_file +from viadot.signals import SKIP + +EMPTY_CSV_PATH = "empty.csv" +EMPTY_PARQUET_PATH = "empty.parquet" def test_single_quotes_inside(): @@ -66,3 +74,81 @@ def test_double_quotes_inside(): VALUES ({TEST_VALUE_ESCAPED}, 'c')""" ), test_insert_query + + +def test_check_if_empty_file_csv(caplog): + with open(EMPTY_CSV_PATH, "w"): + pass + + with caplog.at_level(logging.WARNING): + check_if_empty_file(path=EMPTY_CSV_PATH, if_empty="warn", file_extension=".csv") + assert f"Input file - '{EMPTY_CSV_PATH}' is empty." in caplog.text + with pytest.raises(ValueError): + check_if_empty_file(path=EMPTY_CSV_PATH, if_empty="fail", file_extension=".csv") + with pytest.raises(SKIP): + check_if_empty_file(path=EMPTY_CSV_PATH, if_empty="skip", file_extension=".csv") + + os.remove(EMPTY_CSV_PATH) + + +def test_check_if_empty_file_one_column_csv(caplog): + df = pd.DataFrame(columns=["_viadot_downloaded_at_utc"], index=[0]) + df.to_csv(EMPTY_CSV_PATH, index=False) + + with caplog.at_level(logging.WARNING): + check_if_empty_file(path=EMPTY_CSV_PATH, if_empty="warn", file_extension=".csv") + assert ( + f"Input file - '{EMPTY_CSV_PATH}' has only one column '_viadot_downloaded_at_utc'." 
+ in caplog.text + ) + with pytest.raises(ValueError): + check_if_empty_file(path=EMPTY_CSV_PATH, if_empty="fail", file_extension=".csv") + with pytest.raises(SKIP): + check_if_empty_file(path=EMPTY_CSV_PATH, if_empty="skip", file_extension=".csv") + + os.remove(EMPTY_CSV_PATH) + + +def test_check_if_empty_file_parquet(caplog): + with open(EMPTY_PARQUET_PATH, "w"): + pass + + with caplog.at_level(logging.WARNING): + check_if_empty_file( + path=EMPTY_PARQUET_PATH, if_empty="warn", file_extension=".parquet" + ) + assert f"Input file - '{EMPTY_PARQUET_PATH}' is empty." in caplog.text + with pytest.raises(ValueError): + check_if_empty_file( + path=EMPTY_PARQUET_PATH, if_empty="fail", file_extension=".parquet" + ) + with pytest.raises(SKIP): + check_if_empty_file( + path=EMPTY_PARQUET_PATH, if_empty="skip", file_extension=".parquet" + ) + + os.remove(EMPTY_PARQUET_PATH) + + +def test_check_if_empty_file_one_column_parquet(caplog): + df = pd.DataFrame(columns=["_viadot_downloaded_at_utc"], index=[0]) + df.to_parquet(EMPTY_PARQUET_PATH, index=False) + + with caplog.at_level(logging.WARNING): + check_if_empty_file( + path=EMPTY_PARQUET_PATH, if_empty="warn", file_extension=".parquet" + ) + assert ( + f"Input file - '{EMPTY_PARQUET_PATH}' has only one column '_viadot_downloaded_at_utc'." 
+ in caplog.text + ) + with pytest.raises(ValueError): + check_if_empty_file( + path=EMPTY_PARQUET_PATH, if_empty="fail", file_extension=".parquet" + ) + with pytest.raises(SKIP): + check_if_empty_file( + path=EMPTY_PARQUET_PATH, if_empty="skip", file_extension=".parquet" + ) + + os.remove(EMPTY_PARQUET_PATH) diff --git a/viadot/__init__.py b/viadot/__init__.py index cd1ee63b7..98a433b31 100644 --- a/viadot/__init__.py +++ b/viadot/__init__.py @@ -1 +1 @@ -__version__ = "0.4.4" +__version__ = "0.4.5" diff --git a/viadot/flows/__init__.py b/viadot/flows/__init__.py index 4ba4ff72a..d077086cb 100644 --- a/viadot/flows/__init__.py +++ b/viadot/flows/__init__.py @@ -12,6 +12,7 @@ from .aselite_to_adls import ASELiteToADLS from .bigquery_to_adls import BigQueryToADLS from .outlook_to_adls import OutlookToADLS +from .salesforce_to_adls import SalesforceToADLS try: from .sap_to_duckdb import SAPToDuckDB diff --git a/viadot/flows/adls_to_azure_sql.py b/viadot/flows/adls_to_azure_sql.py index aae3ed869..0b85ec45d 100644 --- a/viadot/flows/adls_to_azure_sql.py +++ b/viadot/flows/adls_to_azure_sql.py @@ -105,6 +105,7 @@ def __init__( if_exists: Literal["fail", "replace", "append", "delete"] = "replace", check_col_order: bool = True, sqldb_credentials_secret: str = None, + on_bcp_error: Literal["skip", "fail"] = "skip", max_download_retries: int = 5, tags: List[str] = ["promotion"], vault_name: str = None, @@ -115,7 +116,6 @@ def __init__( Flow for downloading data from different marketing APIs to a local CSV using Supermetrics API, then uploading it to Azure Data Lake, and finally inserting into Azure SQL Database. - Args: name (str): The name of the flow. local_file_path (str, optional): Local destination path. Defaults to None. @@ -137,6 +137,7 @@ def __init__( check_col_order (bool, optional): Whether to check column order. Defaults to True. 
sqldb_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with Azure SQL Database credentials. Defaults to None. + on_bcp_error (Literal["skip", "fail"], optional): What to do if error occurs. Defaults to "skip". max_download_retries (int, optional): How many times to retry the download. Defaults to 5. tags (List[str], optional): Flow tags to use, eg. to control flow concurrency. Defaults to ["promotion"]. vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None. @@ -181,6 +182,7 @@ def __init__( # BCPTask self.sqldb_credentials_secret = sqldb_credentials_secret + self.on_bcp_error = on_bcp_error # Global self.max_download_retries = max_download_retries @@ -287,6 +289,8 @@ def gen_flow(self) -> Flow: path=self.local_file_path, schema=self.schema, table=self.table, + error_log_file_path=self.name.replace(" ", "_") + ".log", + on_error=self.on_bcp_error, credentials_secret=self.sqldb_credentials_secret, vault_name=self.vault_name, flow=self, diff --git a/viadot/flows/bigquery_to_adls.py b/viadot/flows/bigquery_to_adls.py index 5625f8141..e8ec300ba 100644 --- a/viadot/flows/bigquery_to_adls.py +++ b/viadot/flows/bigquery_to_adls.py @@ -43,6 +43,7 @@ def __init__( local_file_path: str = None, adls_file_name: str = None, adls_sp_credentials_secret: str = None, + overwrite_adls: bool = False, if_exists: str = "replace", *args: List[Any], **kwargs: Dict[str, Any], @@ -57,7 +58,7 @@ def __init__( If the column that looks like a date does not exist in the table, get all the data from the table. Args: - name (str, optional): _description_. Defaults to None. + name (str): The name of the flow. dataset_name (str, optional): Dataset name. Defaults to None. table_name (str, optional): Table name. Defaults to None. 
date_column_name (str, optional): The query is based on a date, the user can provide the name @@ -77,6 +78,7 @@ def __init__( adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_exists (str, optional): What to do if the file exists. Defaults to "replace". """ # BigQueryToDF @@ -90,6 +92,7 @@ def __init__( self.credentials_secret = credentials_secret # AzureDataLakeUpload + self.overwrite = overwrite_adls self.adls_sp_credentials_secret = adls_sp_credentials_secret self.if_exists = if_exists self.output_file_extension = output_file_extension @@ -159,6 +162,7 @@ def gen_flow(self) -> Flow: file_to_adls_task.bind( from_path=self.local_file_path, to_path=self.adls_file_path, + overwrite=self.overwrite, sp_credentials_secret=self.adls_sp_credentials_secret, flow=self, ) @@ -171,6 +175,7 @@ def gen_flow(self) -> Flow: json_to_adls_task.bind( from_path=self.local_json_path, to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite, sp_credentials_secret=self.adls_sp_credentials_secret, flow=self, ) diff --git a/viadot/flows/epicor_to_duckdb.py b/viadot/flows/epicor_to_duckdb.py index aa5ed6f36..2a9ba48bb 100644 --- a/viadot/flows/epicor_to_duckdb.py +++ b/viadot/flows/epicor_to_duckdb.py @@ -2,7 +2,7 @@ from typing import Any, Dict, List, Literal from ..tasks import EpicorOrdersToDF, DuckDBCreateTableFromParquet -from ..task_utils import df_to_parquet, add_ingestion_metadata_task +from ..task_utils import df_to_parquet, add_ingestion_metadata_task, cast_df_to_str class EpicorOrdersToDuckDB(Flow): @@ -19,6 +19,7 @@ def __init__( duckdb_table: str = None, duckdb_schema: str = None, if_exists: Literal["fail", "replace", "append", "skip", "delete"] = "fail", + if_empty: Literal["warn", 
"skip", "fail"] = "skip", duckdb_credentials: dict = None, *args: List[any], **kwargs: Dict[str, Any], @@ -38,6 +39,7 @@ def __init__( duckdb_table (str, optional): Destination table in DuckDB. Defaults to None. duckdb_schema (str, optional): Destination schema in DuckDB. Defaults to None. if_exists (Literal, optional): What to do if the table already exists. Defaults to "fail". + if_empty (Literal, optional): What to do if Parquet file is empty. Defaults to "skip". duckdb_credentials (dict, optional): Credentials for the DuckDB connection. Defaults to None. """ self.base_url = base_url @@ -50,6 +52,7 @@ def __init__( self.duckdb_table = duckdb_table self.duckdb_schema = duckdb_schema self.if_exists = if_exists + self.if_empty = if_empty self.duckdb_credentials = duckdb_credentials super().__init__(*args, name=name, **kwargs) @@ -73,9 +76,9 @@ def gen_flow(self) -> Flow: start_date_field=self.start_date_field, ) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) - + df_mapped = cast_df_to_str.bind(df_with_metadata, flow=self) parquet = df_to_parquet.bind( - df=df_with_metadata, + df=df_mapped, path=self.local_file_path, if_exists=self.if_exists, flow=self, @@ -85,6 +88,7 @@ def gen_flow(self) -> Flow: schema=self.duckdb_schema, table=self.duckdb_table, if_exists=self.if_exists, + if_empty=self.if_empty, flow=self, ) create_duckdb_table.set_upstream(parquet, flow=self) diff --git a/viadot/flows/salesforce_to_adls.py b/viadot/flows/salesforce_to_adls.py new file mode 100644 index 000000000..e6cbd4927 --- /dev/null +++ b/viadot/flows/salesforce_to_adls.py @@ -0,0 +1,184 @@ +from pathlib import Path +from typing import Any, Dict, List +import os +import pendulum +from prefect import Flow +from prefect.backend import set_key_value +from prefect.utilities import logging + +from ..task_utils import ( + df_get_data_types_task, + add_ingestion_metadata_task, + df_to_csv, + df_to_parquet, + dtypes_to_json_task, + df_map_mixed_dtypes_for_parquet, + 
update_dtypes_dict, + df_clean_column, +) + +from ..tasks import SalesforceToDF +from ..tasks import AzureDataLakeUpload + +salesforce_to_df_task = SalesforceToDF() +file_to_adls_task = AzureDataLakeUpload() +json_to_adls_task = AzureDataLakeUpload() + +logger = logging.get_logger(__name__) + + +class SalesforceToADLS(Flow): + def __init__( + self, + name: str = None, + query: str = None, + table: str = None, + columns: List[str] = None, + domain: str = "test", + client_id: str = "viadot", + env: str = "DEV", + vault_name: str = None, + credentials_secret: str = None, + output_file_extension: str = ".parquet", + overwrite_adls: bool = True, + adls_dir_path: str = None, + local_file_path: str = None, + adls_file_name: str = None, + adls_sp_credentials_secret: str = None, + if_exists: str = "replace", + *args: List[Any], + **kwargs: Dict[str, Any], + ): + """ + Flow for downloading data from Salesforce table to a local CSV or Parquet file, then uploading it to Azure Data Lake. + + Args: + name (str, optional): Flow name. Defaults to None. + query (str, optional): Query for download the data if specific download is needed. Defaults to None. + table (str, optional): Table name. Can be used instead of query. Defaults to None. + columns (List[str], optional): List of columns which are needed - table argument is needed. Defaults to None. + domain (str, optional): Domain of a connection; defaults to 'test' (sandbox). + Can only be added if built-in username/password/security token is provided. Defaults to None. + client_id (str, optional): Client id to keep the track of API calls. Defaults to None. + env (str, optional): Environment information, provides information about credential + and connection configuration. Defaults to 'DEV'. + credentials_secret (str, optional): The name of the Azure Key Vault secret for Salesforce. Defaults to None. + vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None. 
+ output_file_extension (str, optional): Output file extension - to allow selection of CSV for data + which is not easy to handle with parquet. Defaults to ".parquet". + overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True. + adls_dir_path (str, optional): Azure Data Lake destination folder/catalog path. Defaults to None. + local_file_path (str, optional): Local destination path. Defaults to None. + adls_file_name (str, optional): Name of file in ADLS. Defaults to None. + adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. + Defaults to None. + if_exists (str, optional): What to do if the file exists. Defaults to "replace". + """ + # SalesforceToDF + self.query = query + self.table = table + self.columns = columns + self.domain = domain + self.client_id = client_id + self.env = env + self.vault_name = vault_name + self.credentials_secret = credentials_secret + + # AzureDataLakeUpload + self.adls_sp_credentials_secret = adls_sp_credentials_secret + self.if_exists = if_exists + self.output_file_extension = output_file_extension + self.now = str(pendulum.now("utc")) + + self.local_file_path = ( + local_file_path or self.slugify(name) + self.output_file_extension + ) + self.local_json_path = self.slugify(name) + ".json" + self.adls_dir_path = adls_dir_path + + if adls_file_name is not None: + self.adls_file_path = os.path.join(adls_dir_path, adls_file_name) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", Path(adls_file_name).stem + ".json" + ) + else: + self.adls_file_path = os.path.join( + adls_dir_path, self.now + self.output_file_extension + ) + self.adls_schema_file_dir_file = os.path.join( + adls_dir_path, "schema", self.now + ".json" + ) + self.overwrite_adls = overwrite_adls + + super().__init__(*args, name=name, **kwargs) + + 
self.gen_flow() + + @staticmethod + def slugify(name): + return name.replace(" ", "_").lower() + + def gen_flow(self) -> Flow: + df = salesforce_to_df_task.bind( + query=self.query, + table=self.table, + columns=self.columns, + domain=self.domain, + client_id=self.client_id, + env=self.env, + vault_name=self.vault_name, + credentials_secret=self.credentials_secret, + flow=self, + ) + + df_clean = df_clean_column.bind(df=df, flow=self) + df_with_metadata = add_ingestion_metadata_task.bind(df_clean, flow=self) + dtypes_dict = df_get_data_types_task.bind(df_with_metadata, flow=self) + df_to_be_loaded = df_map_mixed_dtypes_for_parquet( + df_with_metadata, dtypes_dict, flow=self + ) + + if self.output_file_extension == ".parquet": + df_to_file = df_to_parquet.bind( + df=df_to_be_loaded, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + else: + df_to_file = df_to_csv.bind( + df=df_with_metadata, + path=self.local_file_path, + if_exists=self.if_exists, + flow=self, + ) + + file_to_adls_task.bind( + from_path=self.local_file_path, + to_path=self.adls_file_path, + overwrite=self.overwrite_adls, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + dtypes_updated = update_dtypes_dict(dtypes_dict, flow=self) + dtypes_to_json_task.bind( + dtypes_dict=dtypes_updated, local_json_path=self.local_json_path, flow=self + ) + + json_to_adls_task.bind( + from_path=self.local_json_path, + to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite_adls, + sp_credentials_secret=self.adls_sp_credentials_secret, + flow=self, + ) + + df_clean.set_upstream(df, flow=self) + df_with_metadata.set_upstream(df_clean, flow=self) + dtypes_dict.set_upstream(df_with_metadata, flow=self) + df_to_be_loaded.set_upstream(dtypes_dict, flow=self) + file_to_adls_task.set_upstream(df_to_file, flow=self) + json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self) + set_key_value(key=self.adls_dir_path, value=self.adls_file_path) diff --git 
a/viadot/flows/sap_to_duckdb.py b/viadot/flows/sap_to_duckdb.py index bf332ab56..e6348deae 100644 --- a/viadot/flows/sap_to_duckdb.py +++ b/viadot/flows/sap_to_duckdb.py @@ -4,10 +4,7 @@ logger = logging.get_logger() -from ..task_utils import ( - add_ingestion_metadata_task, - df_to_parquet, -) +from ..task_utils import add_ingestion_metadata_task, df_to_parquet, cast_df_to_str from ..tasks import SAPRFCToDF, DuckDBCreateTableFromParquet @@ -24,6 +21,7 @@ def __init__( table_if_exists: Literal[ "fail", "replace", "append", "skip", "delete" ] = "fail", + if_empty: Literal["warn", "skip", "fail"] = "skip", sap_credentials: dict = None, duckdb_credentials: dict = None, *args: List[any], @@ -41,6 +39,7 @@ def __init__( multiple options are automatically tried. Defaults to None. schema (str, optional): Destination schema in DuckDB. Defaults to None. table_if_exists (Literal, optional): What to do if the table already exists. Defaults to "fail". + if_empty (Literal, optional): What to do if Parquet file is empty. Defaults to "skip". sap_credentials (dict, optional): The credentials to use to authenticate with SAP. By default, they're taken from the local viadot config. duckdb_credentials (dict, optional): The config to use for connecting with DuckDB. Defaults to None. 
@@ -56,6 +55,7 @@ def __init__( self.table = table self.schema = schema self.if_exists = table_if_exists + self.if_empty = if_empty self.local_file_path = local_file_path or self.slugify(name) + ".parquet" self.duckdb_credentials = duckdb_credentials @@ -79,8 +79,9 @@ def gen_flow(self) -> Flow: df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) + df_mapped = cast_df_to_str.bind(df_with_metadata, flow=self) parquet = df_to_parquet.bind( - df=df_with_metadata, + df=df_mapped, path=self.local_file_path, if_exists=self.if_exists, flow=self, @@ -91,6 +92,7 @@ def gen_flow(self) -> Flow: schema=self.schema, table=self.table, if_exists=self.if_exists, + if_empty=self.if_empty, flow=self, ) diff --git a/viadot/flows/sharepoint_to_adls.py b/viadot/flows/sharepoint_to_adls.py index 567d9ec85..1f435c456 100644 --- a/viadot/flows/sharepoint_to_adls.py +++ b/viadot/flows/sharepoint_to_adls.py @@ -27,10 +27,10 @@ class SharepointToADLS(Flow): def __init__( self, - name: str = None, + name: str, + url_to_file: str = None, nrows_to_df: int = None, path_to_file: str = None, - url_to_file: str = None, sheet_number: int = None, validate_excel_file: bool = False, output_file_extension: str = ".csv", @@ -38,6 +38,7 @@ def __init__( adls_dir_path: str = None, adls_file_name: str = None, adls_sp_credentials_secret: str = None, + overwrite_adls: bool = False, if_empty: str = "warn", if_exists: str = "replace", *args: List[any], @@ -47,11 +48,11 @@ def __init__( Flow for downloading Excel file from Sharepoint then uploading it to Azure Data Lake. Args: - name (str, optional): The name of the flow. Defaults to None. + name (str): The name of the flow. + url_to_file (str, optional): Link to a file on Sharepoint. Defaults to None. + (e.g : https://{tenant_name}.sharepoint.com/sites/{folder}/Shared%20Documents/Dashboard/file). nrows_to_df (int, optional): Number of rows to read at a time. Defaults to 50000. Defaults to None. 
path_to_file (str, optional): Path to local Excel file. Defaults to None. - url_to_file (str, optional): Link to a file on Sharepoint. - (e.g : https://{tenant_name}.sharepoint.com/sites/{folder}/Shared%20Documents/Dashboard/file). Defaults to None. sheet_number (int, optional): Sheet number to be extracted from file. Counting from 0, if None all sheets are extracted. Defaults to None. validate_excel_file (bool, optional): Check if columns in separate sheets are the same. Defaults to False. output_file_extension (str, optional): Output file extension - to allow selection of .csv for data which is not easy to handle with parquet. Defaults to ".csv". @@ -61,6 +62,7 @@ def __init__( adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET) for the Azure Data Lake. Defaults to None. + overwrite_adls (bool, optional): Whether to overwrite files in the lake. Defaults to False. if_empty (str, optional): What to do if query returns no data. Defaults to "warn".
""" # SharepointToDF @@ -73,6 +75,7 @@ def __init__( self.validate_excel_file = validate_excel_file # AzureDataLakeUpload + self.overwrite = overwrite_adls self.adls_sp_credentials_secret = adls_sp_credentials_secret self.if_exists = if_exists self.output_file_extension = output_file_extension @@ -136,6 +139,7 @@ def gen_flow(self) -> Flow: file_to_adls_task.bind( from_path=self.local_file_path, to_path=self.adls_file_path, + overwrite=self.overwrite, sp_credentials_secret=self.adls_sp_credentials_secret, flow=self, ) @@ -146,6 +150,7 @@ def gen_flow(self) -> Flow: json_to_adls_task.bind( from_path=self.local_json_path, to_path=self.adls_schema_file_dir_file, + overwrite=self.overwrite, sp_credentials_secret=self.adls_sp_credentials_secret, flow=self, ) diff --git a/viadot/flows/sql_server_to_duckdb.py b/viadot/flows/sql_server_to_duckdb.py index d852331d0..935d58332 100644 --- a/viadot/flows/sql_server_to_duckdb.py +++ b/viadot/flows/sql_server_to_duckdb.py @@ -2,7 +2,7 @@ from typing import Any, Dict, List, Literal -from ..task_utils import df_to_parquet, add_ingestion_metadata_task +from ..task_utils import df_to_parquet, add_ingestion_metadata_task, cast_df_to_str from ..tasks import SQLServerToDF, DuckDBCreateTableFromParquet df_task = SQLServerToDF() @@ -18,6 +18,7 @@ def __init__( duckdb_table: str = None, duckdb_schema: str = None, if_exists: Literal["fail", "replace", "append", "skip", "delete"] = "fail", + if_empty: Literal["warn", "skip", "fail"] = "skip", duckdb_credentials: dict = None, *args: List[any], **kwargs: Dict[str, Any], @@ -34,6 +35,7 @@ def __init__( duckdb_table (str, optional): Destination table in DuckDB. Defaults to None. duckdb_schema (str, optional): Destination schema in DuckDB. Defaults to None. if_exists (Literal, optional): What to do if the table already exists. Defaults to "fail". + if_empty (Literal, optional): What to do if Parquet file is empty. Defaults to "skip". 
duckdb_credentials (dict, optional): Credentials for the DuckDB connection. Defaults to None. """ @@ -46,6 +48,7 @@ def __init__( self.duckdb_table = duckdb_table self.duckdb_schema = duckdb_schema self.if_exists = if_exists + self.if_empty = if_empty self.duckdb_credentials = duckdb_credentials super().__init__(*args, name=name, **kwargs) @@ -61,9 +64,9 @@ def gen_flow(self) -> Flow: config_key=self.sqlserver_config_key, query=self.sql_query, flow=self ) df_with_metadata = add_ingestion_metadata_task.bind(df, flow=self) - + df_mapped = cast_df_to_str.bind(df_with_metadata, flow=self) parquet = df_to_parquet.bind( - df=df_with_metadata, + df=df_mapped, path=self.local_file_path, if_exists=self.if_exists, flow=self, @@ -73,6 +76,7 @@ def gen_flow(self) -> Flow: schema=self.duckdb_schema, table=self.duckdb_table, if_exists=self.if_exists, + if_empty=self.if_empty, flow=self, ) create_duckdb_table.set_upstream(parquet, flow=self) diff --git a/viadot/sources/epicor.py b/viadot/sources/epicor.py index 05c2d1132..45ebfe510 100644 --- a/viadot/sources/epicor.py +++ b/viadot/sources/epicor.py @@ -1,3 +1,4 @@ +from urllib import response import requests import pandas as pd from typing import Any, Dict, Optional @@ -5,6 +6,7 @@ from .base import Source from ..config import local_config +from ..utils import handle_api_response from ..exceptions import CredentialError, DataRangeError from pydantic import BaseModel @@ -232,7 +234,7 @@ def generate_token(self) -> str: "password": self.credentials["password"], } - response = requests.request("POST", url, headers=headers) + response = handle_api_response(url=url, headers=headers, method="POST") root = ET.fromstring(response.text) token = root.find("AccessToken").text return token @@ -272,8 +274,9 @@ def get_xml_response(self): "Content-Type": "application/xml", "Authorization": "Bearer " + self.generate_token(), } - response = requests.request("POST", url, headers=headers, data=payload) - + response = handle_api_response( + 
url=url, headers=headers, body=payload, method="POST" + ) return response def to_df(self): diff --git a/viadot/task_utils.py b/viadot/task_utils.py index 51ae395a4..47a9d3753 100644 --- a/viadot/task_utils.py +++ b/viadot/task_utils.py @@ -439,10 +439,11 @@ def df_clean_column( columns_to_clean (List[str]): A list of columns to clean. Defaults is None. Returns: - pd.DataFrame: The cleaned DataFrame + pd.DataFrame: The cleaned DataFrame. """ df = df.copy() + logger.info(f"Removing special characters from dataframe columns...") if columns_to_clean is None: df.replace( @@ -465,7 +466,7 @@ def df_clean_column( @task def concat_dfs(dfs: List[pd.DataFrame]): """ - Task to combine list of data frames into one + Task to combine list of data frames into one. Args: dfs (List[pd.DataFrame]): List of dataframes to concat. @@ -475,11 +476,28 @@ def concat_dfs(dfs: List[pd.DataFrame]): return pd.concat(dfs, axis=1) +@task +def cast_df_to_str(df: pd.DataFrame) -> pd.DataFrame: + """ + Task for casting an entire DataFrame to a string data type. Task is needed + when data is being uploaded from Parquet file to DuckDB because empty columns + can be casted to INT instead of default VARCHAR. + + Args: + df (pd.DataFrame): Input DataFrame. + + Returns: + df_mapped (pd.DataFrame): Pandas DataFrame casted to string. + """ + df_mapped = df.astype("string") + return df_mapped + + class Git(Git): @property def git_clone_url(self): """ - Build the git url to clone + Build the git url to clone. 
""" if self.use_ssh: return f"git@{self.repo_host}:{self.repo}" diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index 1e859fa00..14c374e42 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -31,7 +31,7 @@ from .prefect_date_range import GetFlowNewDateRange from .aselite import ASELiteToDF from .bigquery import BigQueryToDF -from .salesforce import SalesforceUpsert, SalesforceBulkUpsert +from .salesforce import SalesforceUpsert, SalesforceBulkUpsert, SalesforceToDF from .outlook import OutlookToDF try: diff --git a/viadot/tasks/bcp.py b/viadot/tasks/bcp.py index 0d64111a1..e6976f2db 100644 --- a/viadot/tasks/bcp.py +++ b/viadot/tasks/bcp.py @@ -1,22 +1,37 @@ import json from datetime import timedelta +from typing import Literal from prefect.tasks.secrets import PrefectSecret from prefect.tasks.shell import ShellTask from prefect.utilities.tasks import defaults_from_attrs +from prefect.utilities import logging from .azure_key_vault import AzureKeyVaultSecret +logger = logging.get_logger() + + +def parse_logs(log_file_path: str): + with open(log_file_path) as log_file: + log_file = log_file.readlines() + for line in log_file: + if "#" in line: + line = line.replace("#", "") + line = line.replace("@", "") + logger.warning(line) + class BCPTask(ShellTask): """ Task for bulk inserting data into SQL Server-compatible databases. - Args: - path (str, optional): The path to the local CSV file to be inserted. - schema (str, optional): The destination schema. - table (str, optional): The destination table. - chunksize (int, optional): The chunk size to use. + - error_log_file_path (string, optional): Full path of an error file. Defaults to "log_file.log". + - on_error (Literal["skip", "fail"], optional): What to do if error occurs. Defaults to "skip". - credentials (dict, optional): The credentials to use for connecting with the database. - vault_name (str): The name of the vault from which to fetch the secret. 
- **kwargs (dict, optional): Additional keyword arguments to pass to the Task constructor. @@ -28,6 +43,8 @@ def __init__( schema: str = None, table: str = None, chunksize: int = 5000, + error_log_file_path: str = "log_file.log", + on_error: Literal["skip", "fail"] = "skip", credentials: dict = None, vault_name: str = None, max_retries: int = 3, @@ -39,6 +56,8 @@ def __init__( self.schema = schema self.table = table self.chunksize = chunksize + self.error_log_file_path = error_log_file_path + self.on_error = on_error self.credentials = credentials self.vault_name = vault_name @@ -57,6 +76,8 @@ def __init__( "schema", "table", "chunksize", + "error_log_file_path", + "on_error", "credentials", "vault_name", "max_retries", @@ -68,6 +89,8 @@ def run( schema: str = None, table: str = None, chunksize: int = None, + error_log_file_path: str = None, + on_error: Literal = None, credentials: dict = None, credentials_secret: str = None, vault_name: str = None, @@ -77,17 +100,17 @@ def run( ) -> str: """ Task run method. - Args: - path (str, optional): The path to the local CSV file to be inserted. - schema (str, optional): The destination schema. - table (str, optional): The destination table. - chunksize (int, optional): The chunk size to use. By default 5000. + - error_log_file_path (string, optional): Full path of an error file. Defaults to "log_file.log". + - on_error (Literal, optional): What to do if error occur. Defaults to None. - credentials (dict, optional): The credentials to use for connecting with SQL Server. - credentials_secret (str, optional): The name of the Key Vault secret containing database credentials. (server, db_name, user, password) - vault_name (str): The name of the vault from which to fetch the secret. - Returns: str: The output of the bcp CLI command. """ @@ -119,5 +142,15 @@ def run( # but not in BCP's 'server' argument. 
server = server.replace(" ", "") - command = f"/opt/mssql-tools/bin/bcp {fqn} in '{path}' -S {server} -d {db_name} -U {uid} -P '{pwd}' -c -F 2 -b {chunksize} -h 'TABLOCK'" - return super().run(command=command, **kwargs) + if on_error == "skip": + max_error = 0 + elif on_error == "fail": + max_error = 1 + else: + raise ValueError( + "Please provide correct 'on_error' parameter value - 'skip' or 'fail'. " + ) + command = f"/opt/mssql-tools/bin/bcp {fqn} in '{path}' -S {server} -d {db_name} -U {uid} -P '{pwd}' -c -F 2 -b {chunksize} -h 'TABLOCK' -e '{error_log_file_path}' -m {max_error}" + run_command = super().run(command=command, **kwargs) + parse_logs(error_log_file_path) + return run_command diff --git a/viadot/tasks/duckdb.py b/viadot/tasks/duckdb.py index fa040a82d..2894afb78 100644 --- a/viadot/tasks/duckdb.py +++ b/viadot/tasks/duckdb.py @@ -5,6 +5,8 @@ import pandas as pd from ..sources import DuckDB +from ..utils import check_if_empty_file +from ..signals import SKIP Record = Tuple[Any] @@ -65,10 +67,12 @@ class DuckDBCreateTableFromParquet(Task): also allowed here (eg. `my_folder/*.parquet`). schema (str, optional): Destination schema. if_exists (Literal, optional): What to do if the table already exists. + if_empty (Literal, optional): What to do if ".parquet" file is emty. Defaults to "skip". credentials(dict, optional): The config to use for connecting with the db. Raises: - ValueError: If the table exists and `if_exists` is set to `fail`. + ValueError: If the table exists and `if_exists`is set to `fail` or when parquet file + is empty and `if_empty` is set to `fail`. Returns: NoReturn: Does not return anything. 
@@ -78,12 +82,14 @@ def __init__( self, schema: str = None, if_exists: Literal["fail", "replace", "append", "skip", "delete"] = "fail", + if_empty: Literal["warn", "skip", "fail"] = "skip", credentials: dict = None, *args, **kwargs, ): self.schema = schema self.if_exists = if_exists + self.if_empty = if_empty self.credentials = credentials super().__init__( @@ -92,13 +98,14 @@ def __init__( **kwargs, ) - @defaults_from_attrs("schema", "if_exists") + @defaults_from_attrs("schema", "if_exists", "if_empty") def run( self, table: str, path: str, schema: str = None, if_exists: Literal["fail", "replace", "append", "skip", "delete"] = None, + if_empty: Literal["warn", "skip", "fail"] = None, ) -> NoReturn: """ Create a DuckDB table with a CTAS from Parquet file(s). @@ -109,13 +116,20 @@ def run( also allowed here (eg. `my_folder/*.parquet`). schema (str, optional): Destination schema. if_exists (Literal, optional): What to do if the table already exists. + if_empty (Literal, optional): What to do if Parquet file is empty. Defaults to "skip". Raises: - ValueError: If the table exists and `if_exists` is set to `fail`. + ValueError: If the table exists and `if_exists`is set to `fail` or when parquet file + is empty and `if_empty` is set to `fail`. Returns: NoReturn: Does not return anything. """ + try: + check_if_empty_file(path=path, if_empty=if_empty, file_extension=".parquet") + except SKIP: + self.logger.info("The input file is empty. 
Skipping.") + return duckdb = DuckDB(credentials=self.credentials) diff --git a/viadot/tasks/salesforce.py b/viadot/tasks/salesforce.py index 7c5b1f607..a4a4527ce 100644 --- a/viadot/tasks/salesforce.py +++ b/viadot/tasks/salesforce.py @@ -1,5 +1,6 @@ import json from datetime import timedelta +from typing import Any, Dict, List import pandas as pd from prefect import Task @@ -227,3 +228,97 @@ def run( raise_on_error=raise_on_error, ) self.logger.info(f"Successfully upserted {df.shape[0]} rows to Salesforce.") + + +class SalesforceToDF(Task): + """ + The task for querying Salesforce and saving data as the data frame. + + Args: + query (str, optional): Query for download the data if specific download is needed. Defaults to None. + table (str, optional): Table name. Can be used instead of query. Defaults to None. + columns (List[str], optional): List of columns which are needed - table argument is needed. Defaults to None. + domain (str, optional): Domain of a connection. defaults to 'test' (sandbox). + Can only be added if built-in username/password/security token is provided. Defaults to None. + client_id (str, optional): Client id to keep the track of API calls. Defaults to None. + env (str, optional): Environment information, provides information about credential + and connection configuration. Defaults to 'DEV'. 
+ """ + + def __init__( + self, + query: str = None, + table: str = None, + columns: List[str] = None, + domain: str = "test", + client_id: str = "viadot", + env: str = "DEV", + *args: List[Any], + **kwargs: Dict[str, Any], + ): + self.query = query + self.table = table + self.columns = columns + self.domain = domain + self.client_id = client_id + self.env = env + + super().__init__( + name="salesforce_to_df", + *args, + **kwargs, + ) + + def __call__(self): + """Download Salesforce data to a DF""" + super().__call__(self) + + @defaults_from_attrs( + "query", + "table", + "columns", + "domain", + "client_id", + "env", + ) + def run( + self, + query: str = None, + table: str = None, + columns: List[str] = None, + env: str = None, + domain: str = None, + client_id: str = None, + credentials_secret: str = None, + vault_name: str = None, + **kwargs: Dict[str, Any], + ) -> None: + """ + Task run method. + + Args: + query (str, optional): Query for download the data if specific download is needed. Defaults to None. + table (str, optional): Table name. Can be used instead of query. Defaults to None. + columns (List[str], optional): List of columns which are needed - table argument is needed. Defaults to None. + env (str, optional): Environment information, provides information about credential + and connection configuration. Defaults to 'DEV'. + domain (str, optional): Domain of a connection. defaults to 'test' (sandbox). + Can only be added if built-in username/password/security token is provided. Defaults to None. + client_id (str, optional): Client id to keep the track of API calls. Defaults to None. + credentials_secret (str, optional): The name of the Azure Key Vault secret for Salesforce. Defaults to None. + vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None. 
+ """ + credentials = get_credentials(credentials_secret, vault_name=vault_name) + salesforce = Salesforce( + credentials=credentials, + env=env, + domain=domain, + client_id=client_id, + ) + self.logger.info(f"Retreiving the data from Salesforce...") + df = salesforce.to_df( + query=query, table=table, columns=columns, if_empty="replace" + ) + self.logger.info(f"Successfully downloaded data from Salesforce.") + + return df diff --git a/viadot/utils.py b/viadot/utils.py index 087b6fd7d..dc2ff7571 100644 --- a/viadot/utils.py +++ b/viadot/utils.py @@ -1,10 +1,12 @@ import re -from typing import Any, Dict, List +from typing import Any, Dict, List, Literal import pandas as pd import prefect import pyodbc import requests +import os +from prefect.utilities import logging from prefect.utilities.graphql import EnumValue, with_args from requests.adapters import HTTPAdapter from requests.exceptions import ConnectionError, HTTPError, ReadTimeout, Timeout @@ -12,6 +14,10 @@ from urllib3.exceptions import ProtocolError from itertools import chain from .exceptions import APIError +from .signals import SKIP + + +logger = logging.get_logger(__name__) def slugify(name: str) -> str: @@ -24,6 +30,8 @@ def handle_api_response( params: Dict[str, Any] = None, headers: Dict[str, Any] = None, timeout: tuple = (3.05, 60 * 30), + method: Literal["GET", "POST"] = "GET", + body: str = None, ) -> requests.models.Response: """Handle and raise Python exceptions during request with retry strategy for specyfic status. @@ -33,8 +41,11 @@ def handle_api_response( params (Dict[str, Any], optional): the request params also includes parameters such as the content type. Defaults to None. headers: (Dict[str, Any], optional): the request headers. Defaults to None. timeout (tuple, optional): the request times out. Defaults to (3.05, 60 * 30). + method (Literal ["GET", "POST"], optional): REST API method to use. Defaults to "GET". + body (str, optional): Data to send using POST method. Defaults to None. 
Raises: + ValueError: raises when 'method' parameter value hasn't been specified ReadTimeout: stop waiting for a response after a given number of seconds with the timeout parameter. HTTPError: exception that indicates when HTTP status codes returned values different than 200. ConnectionError: exception that indicates when client is unable to connect to the server. @@ -43,6 +54,10 @@ Returns: requests.models.Response """ + if method.upper() not in ["GET", "POST"]: + raise ValueError( + f"Method not found. Please use one of the available methods: 'GET', 'POST'." + ) try: session = requests.Session() retry_strategy = Retry( @@ -54,12 +69,15 @@ session.mount("http://", adapter) session.mount("https://", adapter) - response = session.get( - url, + + response = session.request( + url=url, auth=auth, params=params, headers=headers, timeout=timeout, + data=body, + method=method, ) response.raise_for_status() @@ -345,3 +363,60 @@ def union_dict(*dicts): """ return dict(chain.from_iterable(dct.items() for dct in dicts)) + + +def handle_if_empty_file( + if_empty: Literal["warn", "skip", "fail"] = "warn", + message: str = None, +): + """ + Task for handling empty file. + Args: + if_empty (Literal, optional): What to do if file is empty. Defaults to "warn". + message (str, optional): Message to show in warning and error messages. Defaults to None. + Raises: + ValueError: If `if_empty` is set to `fail`. + SKIP: If `if_empty` is set to `skip`. + """ + if if_empty == "warn": + logger.warning(message) + elif if_empty == "skip": + raise SKIP(message) + elif if_empty == "fail": + raise ValueError(message) + + +def check_if_empty_file( + path: str, + if_empty: Literal["warn", "skip", "fail"] = "warn", + file_extension: Literal[".parquet", ".csv"] = ".parquet", + file_sep: str = "\t", +): + """ + Task for checking if the file is empty and handling it.
If there is only one column + "_viadot_downloaded_at_utc" in the file it's treated as empty one. + + Args: + path (str, required): Path to the local file. + if_empty (Literal, optional): What to do if file is empty. Defaults to "warn". + file_extension (Literal, optional): File extension. Defaults to ".parquet". + file_sep (str, optional): File separator to use while checking .csv file. Defaults to "\t". + + """ + if os.stat(path).st_size == 0: + handle_if_empty_file(if_empty, message=f"Input file - '{path}' is empty.") + + elif file_extension == ".parquet": + df = pd.read_parquet(path) + if "_viadot_downloaded_at_utc" in df.columns and len(df.columns) == 1: + handle_if_empty_file( + if_empty=if_empty, + message=f"Input file - '{path}' has only one column '_viadot_downloaded_at_utc'.", + ) + elif file_extension == ".csv": + df = pd.read_csv(path, sep=file_sep) + if "_viadot_downloaded_at_utc" in df.columns and len(df.columns) == 1: + handle_if_empty_file( + if_empty=if_empty, + message=f"Input file - '{path}' has only one column '_viadot_downloaded_at_utc'.", + )