Merge pull request #439 from dyvenia/dev
Release 0.4.5 PR
Rafalz13 authored Jun 23, 2022
2 parents 7d8f8b0 + 0e4a0b9 commit f35e6df
Showing 26 changed files with 831 additions and 57 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
@@ -6,9 +6,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.4.5] - 2022-06-23
### Added
- Added `error_log_file_path` parameter in `BCPTask` that enables setting the name of the error log file
- Added `on_error` parameter in `BCPTask` that specifies what to do if a BCP error occurs (see the sketch after this list)
- Added an error log file and `on_bcp_error` parameter in `ADLSToAzureSQL`
- Added handling of POST requests in `handle_api_response()` and added it to the `Epicor` source
- Added `SalesforceToDF` task
- Added `SalesforceToADLS` flow
- Added `overwrite_adls` option to `BigQueryToADLS` and `SharepointToADLS`
- Added `cast_df_to_str` task in `utils.py` and added this to `EpicorToDuckDB`, `SAPToDuckDB`, `SQLServerToDuckDB`
- Added `if_empty` parameter in the `DuckDBCreateTableFromParquet` task and in the `EpicorToDuckDB`, `SAPToDuckDB`,
`SQLServerToDuckDB` flows to check whether the output Parquet file is empty and handle it properly.
- Added `check_if_empty_file()` and `handle_if_empty_file()` in `utils.py`
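
For illustration, a minimal sketch of how the new `BCPTask` error-handling parameters might be invoked; the CSV path, schema, and table names, and the `"skip"` value passed to `on_error` are assumptions for this example, not taken from the PR:

```python
from viadot.tasks import BCPTask

bcp_task = BCPTask()
bcp_task.run(
    path="contacts.csv",  # hypothetical CSV produced earlier in a flow
    schema="sandbox",  # hypothetical target schema and table
    table="contacts",
    error_log_file_path="bcp_errors.log",  # new in 0.4.5: rejected rows are logged here
    on_error="skip",  # new in 0.4.5: behaviour on bcp errors ("skip" is an assumed value)
)
```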


## [0.4.4] - 2022-06-09
### Added
- Added new connector - Outlook. Created `Outlook` source, `OutlookToDF` task and `OutlookToADLS` flow.
- Added new connector - Epicor. Created `Epicor` source, `EpicorToDF` task and `EpicorToDuckDB` flow.
- Enabled Databricks Connect in the image. To enable, [follow this guide](./README.md#executing-spark-jobs)
67 changes: 60 additions & 7 deletions tests/integration/flows/test_bigquery_to_adls.py
@@ -1,23 +1,76 @@
from viadot.flows import BigQueryToADLS
from viadot.tasks import AzureDataLakeRemove
from prefect.tasks.secrets import PrefectSecret
import pendulum
import os

ADLS_DIR_PATH = "raw/tests/"
ADLS_FILE_NAME = str(pendulum.now("utc")) + ".parquet"
BIGQ_CREDENTIAL_KEY = "BIGQUERY_TESTS"
ADLS_CREDENTIAL_SECRET = PrefectSecret(
    "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()


def test_bigquery_to_adls():
    flow_bigquery = BigQueryToADLS(
        name="Test BigQuery to ADLS extract",
        dataset_name="official_empty",
        table_name="space",
        adls_file_name=ADLS_FILE_NAME,
        credentials_key=BIGQ_CREDENTIAL_KEY,
        adls_dir_path=ADLS_DIR_PATH,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
    )

    result = flow_bigquery.run()
    assert result.is_successful()

    task_results = result.result.values()
    assert all([task_result.is_successful() for task_result in task_results])

    os.remove("test_bigquery_to_adls_extract.parquet")
    os.remove("test_bigquery_to_adls_extract.json")


def test_bigquery_to_adls_overwrite_true():
    flow_bigquery = BigQueryToADLS(
        name="Test BigQuery to ADLS overwrite true",
        dataset_name="official_empty",
        table_name="space",
        credentials_key=BIGQ_CREDENTIAL_KEY,
        adls_file_name=ADLS_FILE_NAME,
        overwrite_adls=True,
        adls_dir_path=ADLS_DIR_PATH,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
    )

    result = flow_bigquery.run()
    assert result.is_successful()

    task_results = result.result.values()
    assert all([task_result.is_successful() for task_result in task_results])
    os.remove("test_bigquery_to_adls_overwrite_true.parquet")
    os.remove("test_bigquery_to_adls_overwrite_true.json")


def test_bigquery_to_adls_false():
    flow_bigquery = BigQueryToADLS(
        name="Test BigQuery to ADLS overwrite false",
        dataset_name="official_empty",
        table_name="space",
        adls_file_name=ADLS_FILE_NAME,
        overwrite_adls=False,
        credentials_key=BIGQ_CREDENTIAL_KEY,
        adls_dir_path=ADLS_DIR_PATH,
        adls_sp_credentials_secret=ADLS_CREDENTIAL_SECRET,
    )

    result = flow_bigquery.run()
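    # The file was already uploaded by the tests above, so with overwrite_adls=False
    # the flow is expected to fail.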
    assert result.is_failed()
    os.remove("test_bigquery_to_adls_overwrite_false.parquet")
    os.remove("test_bigquery_to_adls_overwrite_false.json")
    rm = AzureDataLakeRemove(
        path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s"
    )
    rm.run(sp_credentials_secret=ADLS_CREDENTIAL_SECRET)
33 changes: 33 additions & 0 deletions tests/integration/flows/test_salesforce_to_adls.py
@@ -0,0 +1,33 @@
from viadot.flows import SalesforceToADLS
from viadot.tasks import AzureDataLakeRemove
from prefect.tasks.secrets import PrefectSecret
import os

ADLS_FILE_NAME = "test_salesforce.parquet"
ADLS_DIR_PATH = "raw/tests/"


def test_salesforce_to_adls():

    credentials_secret = PrefectSecret(
        "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
    ).run()

    flow = SalesforceToADLS(
        "test_salesforce_to_adls_run_flow",
        query="SELECT IsDeleted, FiscalYear FROM Opportunity LIMIT 50",
        adls_sp_credentials_secret=credentials_secret,
        adls_dir_path=ADLS_DIR_PATH,
        adls_file_name=ADLS_FILE_NAME,
    )

    result = flow.run()
    assert result.is_successful()

    os.remove("test_salesforce_to_adls_run_flow.parquet")
    os.remove("test_salesforce_to_adls_run_flow.json")
    rm = AzureDataLakeRemove(
        path=ADLS_DIR_PATH + ADLS_FILE_NAME,
        vault_name="azuwevelcrkeyv001s",
    )
    rm.run(sp_credentials_secret=credentials_secret)
71 changes: 64 additions & 7 deletions tests/integration/flows/test_sharepoint_to_adls.py
@@ -1,19 +1,21 @@
from viadot.flows import SharepointToADLS
from viadot.tasks import AzureDataLakeRemove
from prefect.tasks.secrets import PrefectSecret
from unittest import mock
import pandas as pd
import os
import pendulum

ADLS_FILE_NAME = str(pendulum.now("utc")) + ".csv"
ADLS_DIR_PATH = "raw/tests/"
CREDENTIALS_SECRET = PrefectSecret("AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET").run()


def test_sharepoint_to_adls_run_flow():

    d = {"country": [1, 2], "sales": [3, 4]}
    df = pd.DataFrame(data=d)

    with mock.patch(
        "viadot.flows.sharepoint_to_adls.excel_to_df_task.bind"
    ) as excel_to_df_task_mock:
@@ -22,10 +24,65 @@ def test_sharepoint_to_adls_run_flow():
        flow = SharepointToADLS(
            "test_sharepoint_to_adls_run_flow",
            output_file_extension=".csv",
            adls_sp_credentials_secret=CREDENTIALS_SECRET,
            adls_dir_path=ADLS_DIR_PATH,
            adls_file_name=ADLS_FILE_NAME,
        )
        result = flow.run()
        assert result.is_successful()
        os.remove("test_sharepoint_to_adls_run_flow.csv")
        os.remove("test_sharepoint_to_adls_run_flow.json")


def test_sharepoint_to_adls_run_flow_overwrite_true():

    d = {"country": [1, 2], "sales": [3, 4]}
    df = pd.DataFrame(data=d)

    with mock.patch(
        "viadot.flows.sharepoint_to_adls.excel_to_df_task.bind"
    ) as excel_to_df_task_mock:
        excel_to_df_task_mock.return_value = df

        flow = SharepointToADLS(
            "test_sharepoint_to_adls_run_flow_overwrite_true",
            output_file_extension=".csv",
            adls_sp_credentials_secret=CREDENTIALS_SECRET,
            adls_dir_path=ADLS_DIR_PATH,
            adls_file_name=ADLS_FILE_NAME,
            overwrite_adls=True,
        )
        result = flow.run()
        assert result.is_successful()
        os.remove("test_sharepoint_to_adls_run_flow_overwrite_true.csv")
        os.remove("test_sharepoint_to_adls_run_flow_overwrite_true.json")


def test_sharepoint_to_adls_run_flow_overwrite_false():

    d = {"country": [1, 2], "sales": [3, 4]}
    df = pd.DataFrame(data=d)

    with mock.patch(
        "viadot.flows.sharepoint_to_adls.excel_to_df_task.bind"
    ) as excel_to_df_task_mock:
        excel_to_df_task_mock.return_value = df

        flow = SharepointToADLS(
            "test_sharepoint_to_adls_run_flow_overwrite_false",
            output_file_extension=".csv",
            adls_sp_credentials_secret=CREDENTIALS_SECRET,
            adls_dir_path=ADLS_DIR_PATH,
            adls_file_name=ADLS_FILE_NAME,
            overwrite_adls=False,
        )
        result = flow.run()
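        # The target file already exists in ADLS from the tests above, so with
        # overwrite_adls=False the flow is expected to fail.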

        assert result.is_failed()
        os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.csv")
        os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.json")

        rm = AzureDataLakeRemove(
            path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s"
        )
        rm.run(sp_credentials_secret=CREDENTIALS_SECRET)
40 changes: 39 additions & 1 deletion tests/integration/tasks/test_bcp.py
@@ -1,4 +1,6 @@
import pytest
import os

from prefect.engine.signals import FAIL

from viadot.tasks import BCPTask
@@ -7,6 +9,8 @@
SCHEMA = "sandbox"
TABLE = "test_bcp"
FAIL_TABLE = "nonexistent_table"
ERROR_TABLE = "test_bcp_error"
ERROR_LOG_FILE = "log_file.log"


@pytest.fixture(scope="function")
@@ -23,16 +27,36 @@ def test_table():
    sql_query_task.run(f"DROP TABLE {SCHEMA}.{TABLE};")


@pytest.fixture(scope="function")
def test_error_table():
    create_table_task = AzureSQLCreateTable()
    create_table_task.run(
        schema=SCHEMA,
        table=ERROR_TABLE,
        dtypes={"country": "INT", "sales": "INT"},
        if_exists="replace",
    )
    yield
    sql_query_task = AzureSQLDBQuery()
    sql_query_task.run(f"DROP TABLE {SCHEMA}.{ERROR_TABLE};")


def test_bcp(TEST_CSV_FILE_PATH, test_table):

    bcp_task = BCPTask()

    try:
        result = bcp_task.run(
            path=TEST_CSV_FILE_PATH,
            schema=SCHEMA,
            table=TABLE,
            error_log_file_path=ERROR_LOG_FILE,
        )
    except FAIL:
        result = False

    assert result is not False
    os.remove(ERROR_LOG_FILE)


def test_bcp_fail(TEST_CSV_FILE_PATH, test_table):
@@ -41,3 +65,17 @@ def test_bcp_fail(TEST_CSV_FILE_PATH, test_table):

    with pytest.raises(FAIL):
        bcp_task.run(path=TEST_CSV_FILE_PATH, schema=SCHEMA, table=FAIL_TABLE)


def test_bcp_log_error(TEST_CSV_FILE_PATH, test_error_table):
    bcp_task = BCPTask()
    bcp_task.run(
        path=TEST_CSV_FILE_PATH,
        schema=SCHEMA,
        table=ERROR_TABLE,
        error_log_file_path=ERROR_LOG_FILE,
    )
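    # After the load, the error log file should exist and contain the rows bcp rejected.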
    assert (
        os.path.exists(ERROR_LOG_FILE) is True and os.path.getsize(ERROR_LOG_FILE) != 0
    )
    os.remove(ERROR_LOG_FILE)
12 changes: 11 additions & 1 deletion tests/integration/tasks/test_salesforce.py
@@ -1,6 +1,6 @@
import pandas as pd
import pytest
from viadot.tasks import SalesforceUpsert, SalesforceToDF


@pytest.fixture(scope="session")
@@ -25,3 +25,13 @@ def test_salesforce_upsert(test_df):
        sf.run(test_df, table="Contact")
    except Exception as exception:
        assert False, exception


def test_salesforce_to_df():
    sf_to_df = SalesforceToDF(
        query="SELECT IsDeleted, FiscalYear FROM Opportunity LIMIT 50"
    )
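    # Two selected columns with LIMIT 50 should come back as a 50 x 2 DataFrame.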
    df = sf_to_df.run()

    assert isinstance(df, pd.DataFrame)
    assert (50, 2) == df.shape
2 changes: 1 addition & 1 deletion tests/test_viadot.py
@@ -2,4 +2,4 @@


def test_version():
    assert __version__ == "0.4.5"
36 changes: 36 additions & 0 deletions tests/unit/tasks/test_duckdb.py
@@ -0,0 +1,36 @@
import os
import pytest

from viadot.tasks import DuckDBCreateTableFromParquet
from viadot.sources.duckdb import DuckDB

TABLE = "test_table"
SCHEMA = "test_schema"
DATABASE_PATH = "test_db_123.duckdb"


@pytest.fixture(scope="module")
def duckdb():
    duckdb = DuckDB(credentials=dict(database=DATABASE_PATH))
    yield duckdb


def test_create_table_empty_file(duckdb):
    path = "empty.parquet"
    with open(path, "w"):
        pass
    duckdb_creds = {"database": DATABASE_PATH}
    task = DuckDBCreateTableFromParquet(credentials=duckdb_creds)
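    # if_empty="skip" (new in 0.4.5) makes the task a no-op when the Parquet file is empty.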
    task.run(schema=SCHEMA, table=TABLE, path=path, if_empty="skip")

    assert duckdb._check_if_table_exists(TABLE, schema=SCHEMA) == False
    os.remove(path)


def test_create_table(duckdb, TEST_PARQUET_FILE_PATH):
    duckdb_creds = {"database": DATABASE_PATH}
    task = DuckDBCreateTableFromParquet(credentials=duckdb_creds)
    task.run(schema=SCHEMA, table=TABLE, path=TEST_PARQUET_FILE_PATH, if_empty="skip")

    assert duckdb._check_if_table_exists(TABLE, schema=SCHEMA)
    os.remove(DATABASE_PATH)
2 changes: 1 addition & 1 deletion tests/unit/test_duckdb.py
@@ -10,7 +10,7 @@
DATABASE_PATH = "test_db_123.duckdb"


@pytest.fixture(scope="module")
def duckdb():
    duckdb = DuckDB(credentials=dict(database=DATABASE_PATH))
    yield duckdb