Commit 7c0e534

Merge pull request #524 from dyvenia/dev

Release 0.4.9 PR

m-paz authored Sep 27, 2022
2 parents 361e998 + 782f873 commit 7c0e534
Showing 26 changed files with 1,479 additions and 18 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -156,4 +156,6 @@ desktop.ini
sap_netweaver_rfc

# Databricks-connect

.databricks-connect

13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -6,9 +6,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]


## [0.4.9] - 2022-09-27
### Added
- Added new `_viadot_downloaded_at_utc` column to Genesys files, containing the UTC datetime at which the file was created.
- Added SFTP source class `SftpConnector`.
- Added SFTP tasks `SftpToDF` and `SftpList`.
- Added SFTP flows `SftpToAzureSQL` and `SftpToADLS`.
- Added new source file `mindful` to connect with the Mindful API.
- Added new task file `mindful` to be called by the Mindful flow.
- Added new flow file `mindful_to_adls` to upload data from the Mindful API to ADLS.
- Added `recursive` parameter to the `AzureDataLakeList` task (usage sketched after this diff).

## [0.4.8] - 2022-09-06
### Added
- Added `protobuf` library to requirements

## [0.4.7] - 2022-09-06
### Added
- Added new flow `SQLServerTransform` and new task `SQLServerQuery` to run queries on SQL Server.
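
To illustrate the additions above, here is a minimal usage sketch. The recursive listing call mirrors the new integration test further down; the `MindfulToADLS` arguments are illustrative assumptions, not a confirmed signature.

```python
# Sketch of the 0.4.9 additions. The recursive listing mirrors the new
# integration test; the MindfulToADLS arguments are assumptions.
from viadot.flows import MindfulToADLS
from viadot.tasks import AzureDataLakeList

# Recursively list every file under a lake directory (new `recursive` flag).
files = AzureDataLakeList().run(path="raw/tests/alds_test_new_fnc/", recursive=True)
print(files)

# Upload Mindful API data to ADLS with the new flow (hypothetical arguments).
flow = MindfulToADLS(name="mindful_to_adls", adls_file_path="raw/mindful")
flow.run()
```
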
5 changes: 0 additions & 5 deletions tests/integration/flows/test_sharepoint_to_adls.py
@@ -83,8 +83,3 @@ def test_sharepoint_to_adls_run_flow_overwrite_false():
assert result.is_failed()
os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.csv")
os.remove("test_sharepoint_to_adls_run_flow_overwrite_false.json")

rm = AzureDataLakeRemove(
path=ADLS_DIR_PATH + ADLS_FILE_NAME, vault_name="azuwevelcrkeyv001s"
)
rm.run(sp_credentials_secret=CREDENTIALS_SECRET)
36 changes: 36 additions & 0 deletions tests/integration/tasks/test_azure_data_lake.py
@@ -1,6 +1,8 @@
import os
import uuid
import pytest
from unittest import mock


from viadot.sources import AzureDataLake
from viadot.tasks import (
@@ -24,6 +26,23 @@
FILE_NAME_PARQUET = f"test_file_{uuid_4}.parquet"
ADLS_PATH_PARQUET = f"raw/supermetrics/{FILE_NAME_PARQUET}"

ADLS_TEST_PATHS = [
"raw/tests/alds_test_new_fnc/2020/02/01/final_df2.csv",
"raw/tests/alds_test_new_fnc/2020/02/01/test_new_fnc.csv",
"raw/tests/alds_test_new_fnc/2020/02/02/final_df2.csv",
"raw/tests/alds_test_new_fnc/2020/02/02/test_new_fnc.csv",
"raw/tests/alds_test_new_fnc/2021/12/01/final_df2.csv",
"raw/tests/alds_test_new_fnc/2021/12/01/test_new_fnc.csv",
"raw/tests/alds_test_new_fnc/2022/06/21/final_df2.csv",
"raw/tests/alds_test_new_fnc/2022/06/21/test_new_fnc.csv",
"raw/tests/alds_test_new_fnc/2022/08/12/final_df2.csv",
"raw/tests/alds_test_new_fnc/2022/08/12/test_new_fnc.csv",
"raw/tests/alds_test_new_fnc/test_folder/final_df2.csv",
"raw/tests/alds_test_new_fnc/test_folder/test_new_fnc.csv",
"raw/tests/alds_test_new_fnc/test_folder_2/final_df2.csv",
"raw/tests/alds_test_new_fnc/test_folder_2/test_new_fnc.csv",
]


@pytest.mark.dependency()
def test_azure_data_lake_upload(TEST_CSV_FILE_PATH):
@@ -74,6 +93,23 @@ def test_azure_data_lake_list():
assert ADLS_PATH in files


def test_azure_data_lake_list_recursive():
list_task = AzureDataLakeList()
files = list_task.run(path="raw/tests/alds_test_new_fnc/", recursive=True)
assert isinstance(files, list)


def test_azure_data_lake_list_paths():
    # Mock the task's `run` method so the expected shape of a recursive
    # listing can be asserted without touching the lake.
    with mock.patch.object(
        AzureDataLakeList, "run", return_value=ADLS_TEST_PATHS
    ) as mock_method:
        list_task = AzureDataLakeList(path="raw/tests/alds_test_new_fnc/")
        files = list_task.run(recursive=True)
        assert files == ADLS_TEST_PATHS


@pytest.mark.dependency(depends=["test_azure_data_lake_upload"])
def test_azure_data_lake_remove():
file = AzureDataLake(ADLS_PATH)
27 changes: 27 additions & 0 deletions tests/integration/tasks/test_mindful.py
@@ -0,0 +1,27 @@
import os
import pytest
from unittest import mock
from viadot.tasks import MindfulToCSV


class MockClass:
status_code = 200
content = b'[{"id":7277599,"survey_id":505,"phone_number":"","survey_type":"inbound"},{"id":7277294,"survey_id":504,"phone_number":"","survey_type":"web"}]'


@pytest.mark.init
def test_instance_mindful():
mf = MindfulToCSV()
assert isinstance(mf, MindfulToCSV)


@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass)
@pytest.mark.run
def test_mindful_run(mock_interactions):
mf = MindfulToCSV()
mf.run()
    assert mock_interactions.call_count == 2
assert os.path.exists("interactions.csv")
os.remove("interactions.csv")
assert os.path.exists("responses.csv")
os.remove("responses.csv")
85 changes: 85 additions & 0 deletions tests/integration/test_mindful.py
@@ -0,0 +1,85 @@
import os
import pytest
from unittest import mock
from viadot.sources import Mindful
from viadot.config import local_config

credentials_mindful = local_config["MINDFUL"]


class MockClass:
    # Minimal stand-in for the response object returned by `handle_api_response`.
    status_code = 200
    content = b'[{"id":7277599,"survey_id":505,"phone_number":"","survey_type":"inbound"},{"id":7277294,"survey_id":504,"phone_number":"","survey_type":"web"}]'

    @staticmethod
    def json():
test = [
{
"id": 7277599,
"survey_id": 505,
"phone_number": "",
"survey_type": "inbound",
},
{"id": 7277294, "survey_id": 504, "phone_number": "", "survey_type": "web"},
]
return test


@pytest.mark.init
def test_instance_mindful():
mf = Mindful(credentials_mindful=credentials_mindful)
assert isinstance(mf, Mindful)


@pytest.mark.init
def test_credentials_instance():
mf = Mindful(credentials_mindful=credentials_mindful)
    assert mf.credentials_mindful is not None and isinstance(mf.credentials_mindful, dict)


@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass)
@pytest.mark.connect
def test_mindful_api_response(mock_connection):
mf = Mindful(credentials_mindful=credentials_mindful)
mf.get_interactions_list()
mf.get_responses_list()
    assert mock_connection.call_count == 2


@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass)
@pytest.mark.connect
def test_mindful_api_response2(mock_api_response):
mf = Mindful(credentials_mindful=credentials_mindful)
response = mf.get_interactions_list()

assert response.status_code == 200 and isinstance(response.json(), list)
assert mf.endpoint == "interactions"


@mock.patch("viadot.sources.mindful.handle_api_response", return_value=MockClass)
@pytest.mark.connect
def test_mindful_api_response3(mock_api_response):
mf = Mindful(credentials_mindful=credentials_mindful)
response = mf.get_responses_list()

assert response.status_code == 200 and isinstance(response.json(), list)
assert mf.endpoint == "responses"


@mock.patch("viadot.sources.Mindful._mindful_api_response", return_value=MockClass)
@pytest.mark.save
def test_mindful_interactions(mock_connection):
mf = Mindful(credentials_mindful=credentials_mindful)
response = mf.get_interactions_list()
mf.response_to_file(response)
assert os.path.exists("interactions.csv")
os.remove("interactions.csv")


@mock.patch("viadot.sources.Mindful._mindful_api_response", return_value=MockClass)
@pytest.mark.save
def test_mindful_responses(mock_connection):
mf = Mindful(credentials_mindful=credentials_mindful)
response = mf.get_responses_list()
mf.response_to_file(response)
assert os.path.exists("responses.csv")
os.remove("responses.csv")
4 changes: 2 additions & 2 deletions tests/integration/test_outlook.py
@@ -1,5 +1,6 @@
from viadot.config import local_config
from viadot.sources import Outlook
import pandas as pd


def test_outlook_to_df():
@@ -10,5 +11,4 @@ def test_outlook_to_df():
end_date="2022-04-29",
)
df = outlook.to_df()
assert df.shape[1] == 10
assert df.empty == False
assert isinstance(df, pd.DataFrame)
123 changes: 123 additions & 0 deletions tests/integration/test_sftp.py
@@ -0,0 +1,123 @@
import io
from unittest import mock

import pandas as pd
import pytest

from viadot.sources.sftp import SftpConnector
from viadot.tasks.sftp import SftpToDF, SftpList


@pytest.fixture
def tmp_df():
data = {"country": [1, 2], "sales": [3, 4]}
df = pd.DataFrame(data=data)
return df


@pytest.fixture
def list_of_paths():
    list_of_paths = [
"Country__Context30##exported.tsv",
"Country__Context31##exported.tsv",
"Country__Context32##exported.tsv",
"Country__Context33##exported.tsv",
"Country__Context4##exported.tsv",
"Country__Context6##exported.tsv",
"Country__Context7##exported.tsv",
"Country__Context8##exported.tsv",
"Local_Products.csv",
"Products Checkup.tsv",
"Products.tsv",
"RewardTest.csv",
]
return list_of_paths


@pytest.fixture
def df_buf():
s_buf = io.StringIO()

data = {"country": [1, 2], "sales": [3, 4]}
df = pd.DataFrame(data=data)
df.to_csv(s_buf)
return s_buf


def test_create_sftp_instance():
s = SftpConnector(
credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4}
)
assert s


def test_connection_sftp(tmp_df):
with mock.patch("viadot.sources.sftp.SftpConnector.to_df") as mock_method:
mock_method.return_value = tmp_df
s = SftpConnector(
credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4}
)

final_df = s.to_df()
assert isinstance(final_df, pd.DataFrame)


def test_getfo_file(df_buf):
with mock.patch("viadot.sources.sftp.SftpConnector.getfo_file") as mock_method:
mock_method.return_value = df_buf
s = SftpConnector(
credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4}
)

buffer_df = s.getfo_file()
buffer_df.seek(0)
df = pd.read_csv(buffer_df)
assert isinstance(df, pd.DataFrame)


def test_ls_sftp(list_of_paths):
with mock.patch("viadot.sources.sftp.SftpConnector.list_directory") as mock_method:
mock_method.return_value = list_of_paths
s = SftpConnector(
credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4}
)

paths = s.list_directory()

assert isinstance(paths, list)


def test_get_exported_files(list_of_paths):
with mock.patch.object(
SftpConnector, "get_exported_files", return_value=list_of_paths
) as mock_method:

s = SftpConnector(
credentials_sftp={"HOSTNAME": 1, "USERNAME": 2, "PASSWORD": 3, "PORT": 4}
)
filtered_paths = s.get_exported_files()

assert isinstance(filtered_paths, list)


def test_task_sftptodf(tmp_df):
task = SftpToDF()
with mock.patch.object(SftpToDF, "run", return_value=tmp_df) as mock_method:
df = task.run()

assert isinstance(df, pd.DataFrame)


def test_task_sftplist(list_of_paths):
task = SftpList()
with mock.patch.object(SftpList, "run", return_value=list_of_paths) as mock_method:
list_directory = task.run()

assert isinstance(list_directory, list)


def test_example(tmp_df):
    # `tmp_df` must be requested as a fixture parameter; referencing the bare
    # name would otherwise point at the fixture function, not the DataFrame.
    with mock.patch("viadot.sources.sftp.SftpConnector.to_df") as mock_method:
        mock_method.return_value = tmp_df
        t = SftpToDF()
        t.run()
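
For reference, a minimal sketch of the new SFTP tasks used outside the mocks above. The credential keys mirror the test dictionaries; the `run` parameter names are assumptions.

```python
# Hypothetical usage of the new SFTP tasks. Credential keys follow the test
# dictionaries above; the `run` parameter names are assumptions.
from viadot.tasks.sftp import SftpToDF, SftpList

credentials_sftp = {
    "HOSTNAME": "sftp.example.com",  # hypothetical host
    "USERNAME": "user",
    "PASSWORD": "***",
    "PORT": 22,
}

# List the remote directory, then pull one file into a DataFrame.
paths = SftpList().run(credentials_sftp=credentials_sftp)  # assumed parameter
df = SftpToDF().run(
    credentials_sftp=credentials_sftp,  # assumed parameter
    file_name=paths[0],  # assumed parameter
)
print(df.head())
```
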
2 changes: 1 addition & 1 deletion tests/test_viadot.py
@@ -2,4 +2,4 @@


def test_version():
assert __version__ == "0.4.8"
assert __version__ == "0.4.9"
2 changes: 1 addition & 1 deletion viadot/__init__.py
@@ -1 +1 @@
__version__ = "0.4.8"
__version__ = "0.4.9"
3 changes: 3 additions & 0 deletions viadot/flows/__init__.py
@@ -33,3 +33,6 @@
from .epicor_to_duckdb import EpicorOrdersToDuckDB
from .sql_server_transform import SQLServerTransform
from .sql_server_to_duckdb import SQLServerToDuckDB

from .sftp_operations import SftpToAzureSQL, SftpToADLS
from .mindful_to_adls import MindfulToADLS
2 changes: 1 addition & 1 deletion viadot/flows/adls_to_azure_sql.py
@@ -142,6 +142,7 @@ def __init__(
tags (List[str], optional): Flow tags to use, e.g. to control flow concurrency. Defaults to ["promotion"].
vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
"""

adls_path = adls_path.strip("/")
# Read parquet
if adls_path.split(".")[-1] in ["csv", "parquet"]:
@@ -299,7 +300,6 @@ def gen_flow(self) -> Flow:
df_reorder.set_upstream(lake_to_df_task, flow=self)
df_to_csv.set_upstream(df_reorder, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
create_table_task.set_upstream(df_to_csv, flow=self)
promote_to_operations_task.set_upstream(promote_to_conformed_task, flow=self)
bulk_insert_task.set_upstream(create_table_task, flow=self)