Merge pull request #393 from dyvenia/dev

Release 0.4.3 PR

m-paz authored Apr 28, 2022
2 parents 99a4c5b + ebef39b commit 9c7db6d

Showing 28 changed files with 1,352 additions and 111 deletions.
30 changes: 29 additions & 1 deletion CHANGELOG.md
@@ -4,11 +4,39 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [Unreleased]
## [0.4.3] - 2022-04-28
### Added
- Added `adls_file_name` parameter in `SupermetricsToADLS` and `SharepointToADLS` flows
- Added `BigQueryToADLS` flow class which enables extracting data from BigQuery.
- Added `Salesforce` source
- Added `SalesforceUpsert` task
- Added `SalesforceBulkUpsert` task
- Added C4C secret handling to `CloudForCustomersReportToADLS` flow (`c4c_credentials_secret` parameter)

### Fixed
- Fixed `get_flow_last_run_date()` incorrectly parsing the date
- Fixed C4C secret handling (tasks now read the secret as the credentials themselves, rather than assuming the secret is a container holding credentials for all environments and trying to access a specific key inside it). In other words, tasks now assume the secret holds credentials, rather than a dict of the form `{env: credentials, env2: credentials2}`
- Fixed `utils.gen_bulk_insert_query_from_df()` failing with more than 1,000 rows, due to the limit on rows in a single INSERT clause, by chunking the data into multiple INSERTs (see the sketch below)
- Fixed `MultipleFlows` failing when only one flow is passed and when the last flow fails.
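
A minimal sketch of the chunking approach behind the `gen_bulk_insert_query_from_df()` fix (the helper name and signature below are hypothetical, not viadot's actual API, and no SQL escaping or parametrization is shown):

```python
from typing import Iterator, List, Tuple


def gen_chunked_insert_queries(
    table: str, columns: List[str], rows: List[Tuple], chunk_size: int = 1000
) -> Iterator[str]:
    """Yield one INSERT per chunk of at most `chunk_size` rows, working
    around the 1,000-row limit of a single INSERT ... VALUES clause."""
    cols = ", ".join(columns)
    for i in range(0, len(rows), chunk_size):
        chunk = rows[i : i + chunk_size]
        # repr() stands in for proper SQL literal quoting in this sketch.
        values = ",\n".join(
            "(" + ", ".join(repr(v) for v in row) + ")" for row in chunk
        )
        yield f"INSERT INTO {table} ({cols})\nVALUES\n{values};"
```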

## [0.4.2] - 2022-04-08
### Added
- Added `AzureDataLakeRemove` task

### Changed
- Changed name of task file from `prefect` to `prefect_date_range`

### Fixed
- Fixed out of range issue in `prefect_date_range`


## [0.4.1] - 2022-04-07
### Changed
- Bumped version


## [0.4.0] - 2022-04-07
### Added
- Added `custom_mail_state_handler` function that sends a mail notification using a custom SMTP server.
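
A minimal sketch of what such a state handler might look like under Prefect 1.x (the SMTP host and addresses are placeholders, and the real implementation may differ, e.g. by sending through SendGrid, which appears in `requirements.txt`):

```python
import smtplib
from email.message import EmailMessage


def custom_mail_state_handler(tracked_obj, old_state, new_state):
    """Prefect 1.x state handler: mail a notification when the tracked
    flow or task enters a failed state, then pass the new state through."""
    if new_state.is_failed():
        msg = EmailMessage()
        msg["Subject"] = f"{tracked_obj.name} failed: {new_state.message}"
        msg["From"] = "viadot@example.com"  # placeholder sender
        msg["To"] = "ops@example.com"  # placeholder recipient
        with smtplib.SMTP("smtp.example.com") as server:  # placeholder host
            server.send_message(msg)
    return new_state
```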
3 changes: 2 additions & 1 deletion requirements.txt
@@ -26,8 +26,9 @@ Shapely==1.8.0
imagehash==4.2.1
visions==0.7.4
sharepy==1.3.0
simple_salesforce==1.11.5
sql-metadata==2.3.0
duckdb==0.3.2
google-cloud==0.34.0
google-auth==2.6.2
sendgrid==6.9.7
pandas-gbq==0.17.4
23 changes: 23 additions & 0 deletions tests/integration/flows/test_bigquery_to_adls.py
@@ -0,0 +1,23 @@
from viadot.flows import BigQueryToADLS
from prefect.tasks.secrets import PrefectSecret


def test_bigquery_to_adls():
    credentials_secret = PrefectSecret(
        "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
    ).run()

    flow_bigquery = BigQueryToADLS(
        name="BigQuery to ADLS",
        dataset_name="official_empty",
        table_name="space",
        credentials_key="BIGQUERY_TESTS",
        adls_dir_path="raw/tests",
        adls_sp_credentials_secret=credentials_secret,
    )

    result = flow_bigquery.run()
    assert result.is_successful()

    task_results = result.result.values()
    assert all([task_result.is_successful() for task_result in task_results])
27 changes: 23 additions & 4 deletions tests/integration/flows/test_multiple_flows.py
@@ -3,23 +3,42 @@


 def test_multiple_flows_working(caplog):
-    list = [
+    flow_list = [
         ["Flow of flows 1 test", "dev"],
         ["Flow of flows 2 - working", "dev"],
         ["Flow of flows 3", "dev"],
     ]
-    flow = MultipleFlows(name="test", flows_list=list)
+    flow = MultipleFlows(name="test", flows_list=flow_list)
     with caplog.at_level(logging.INFO):
         flow.run()
     assert "All of the tasks succeeded." in caplog.text


 def test_multiple_flows_not_working(caplog):
-    list = [
+    flow_list = [
         ["Flow of flows 1 test", "dev"],
         ["Flow of flows 2 test - not working", "dev"],
         ["Flow of flows 3", "dev"],
     ]
-    flow = MultipleFlows(name="test", flows_list=list)
+    flow = MultipleFlows(name="test", flows_list=flow_list)
     flow.run()
     assert "One of the flows has failed!" in caplog.text
+
+
+def test_multiple_flows_working_one_flow(caplog):
+    flow_list = [["Flow of flows 1 test", "dev"]]
+    flow = MultipleFlows(name="test", flows_list=flow_list)
+    with caplog.at_level(logging.INFO):
+        flow.run()
+    assert "All of the tasks succeeded." in caplog.text
+
+
+def test_multiple_flows_last_not_working(caplog):
+    flow_list = [
+        ["Flow of flows 1 test", "dev"],
+        ["Flow of flows 2 - working", "dev"],
+        ["Flow of flows 2 test - not working", "dev"],
+    ]
+    flow = MultipleFlows(name="test", flows_list=flow_list)
+    flow.run()
+    assert "One of the flows has failed!" in caplog.text
33 changes: 33 additions & 0 deletions tests/integration/flows/test_supermetrics_to_adls.py
@@ -76,3 +76,36 @@ def test_supermetrics_to_adls(expectation_suite):

    task_results = result.result.values()
    assert all([task_result.is_successful() for task_result in task_results])


def test_supermetrics_to_adls_file_name(expectation_suite):
    flow = SupermetricsToADLS(
        "Google Analytics Load Times extract test file name",
        ds_id="GA",
        ds_segments=[
            "R1fbzFNQQ3q_GYvdpRr42w",
            "I8lnFFvdSFKc50lP7mBKNA",
            "Lg7jR0VWS5OqGPARtGYKrw",
            "h8ViuGLfRX-cCL4XKk6yfQ",
            "-1",
        ],
        ds_accounts=["8326007", "58338899"],
        date_range_type="last_year_inc",
        fields=[
            {"id": "Date"},
            {"id": "segment", "split": "column"},
            {"id": "AvgPageLoadTime_calc"},
        ],
        settings={"avoid_sampling": "true"},
        order_columns="alphabetic",
        max_columns=100,
        max_rows=10,
        evaluation_parameters=dict(previous_run_row_count=9),
        adls_dir_path=adls_dir_path,
        expectation_suite=expectation_suite,
        adls_file_name="test_file.csv",
        parallel=False,
        storage=STORAGE,
    )
    result = flow.run()
    assert result.is_successful()
46 changes: 46 additions & 0 deletions tests/integration/tasks/test_bigquery.py
@@ -0,0 +1,46 @@
import logging

import pandas as pd

from viadot.tasks import BigQueryToDF

logger = logging.getLogger(__name__)
DATASET_NAME = "official_empty"
TABLE_NAME = "space"


def test_bigquery_to_df_success():
    bigquery_to_df_task = BigQueryToDF(
        dataset_name=DATASET_NAME,
        table_name=TABLE_NAME,
        date_column_name="date",
        credentials_key="BIGQUERY_TESTS",
    )
    df = bigquery_to_df_task.run()
    expectation_columns = ["date", "name", "count", "refresh"]

    assert isinstance(df, pd.DataFrame)
    assert expectation_columns == list(df.columns)


def test_bigquery_to_df_wrong_table_name(caplog):
    bigquery_to_df_task = BigQueryToDF()
    with caplog.at_level(logging.WARNING):
        bigquery_to_df_task.run(
            dataset_name=DATASET_NAME,
            table_name="wrong_table_name",
            date_column_name="date",
            credentials_key="BIGQUERY_TESTS",
        )
    assert "Returning empty data frame." in caplog.text


def test_bigquery_to_df_wrong_column_name(caplog):
    bigquery_to_df_task = BigQueryToDF(
        dataset_name=DATASET_NAME,
        table_name=TABLE_NAME,
        date_column_name="wrong_column_name",
        credentials_key="BIGQUERY_TESTS",
    )
    with caplog.at_level(logging.WARNING):
        bigquery_to_df_task.run()
    assert "'wrong_column_name' column is not recognized." in caplog.text
27 changes: 27 additions & 0 deletions tests/integration/tasks/test_salesforce.py
@@ -0,0 +1,27 @@
import pandas as pd
import pytest

from viadot.tasks import SalesforceUpsert


@pytest.fixture(scope="session")
def test_df():
    data = {
        "Id": ["111"],
        "LastName": ["John Tester-External 3"],
        "SAPContactId__c": [111],
    }
    df = pd.DataFrame(data=data)
    yield df


def test_salesforce_upsert(test_df):
    """
    Id and SAPContactId__c are unique values; you can update only non-unique
    values in this test. If the combination of Id and SAPContactId__c does not
    exist, the test will fail. The Id and SAPContactId__c value '111' needs to
    be replaced with a proper one (one that exists in the testing system).
    """
    try:
        sf = SalesforceUpsert()
        sf.run(test_df, table="Contact")
    except Exception as exception:
        assert False, exception
48 changes: 48 additions & 0 deletions tests/integration/test_bigquery.py
@@ -0,0 +1,48 @@
import pandas as pd

from viadot.sources import BigQuery

BIGQ = BigQuery(credentials_key="BIGQUERY_TESTS")


def test_list_project():
    project = BIGQ.get_project_id()
    assert project == "manifest-geode-341308"


def test_list_datasets():
    datasets = list(BIGQ.list_datasets())
    assert datasets == ["manigeo", "official_empty"]


def test_list_tables():
    datasets = BIGQ.list_datasets()
    tables = list(BIGQ.list_tables(datasets[0]))
    assert tables == ["test_data", "manigeo_tab"]


def test_query_is_df():
    query = """
        SELECT name, SUM(number) AS total
        FROM `bigquery-public-data.usa_names.usa_1910_2013`
        GROUP BY name, gender
        ORDER BY total DESC
        LIMIT 4
        """
    df = BIGQ.query_to_df(query)

    assert isinstance(df, pd.DataFrame)


def test_query():
    query = """
        SELECT name, SUM(number) AS total
        FROM `bigquery-public-data.usa_names.usa_1910_2013`
        GROUP BY name, gender
        ORDER BY total DESC
        LIMIT 4
        """
    df = BIGQ.query_to_df(query)
    total_received = df["total"].values

    # Compare as a list: asserting directly on a NumPy boolean array is ambiguous.
    assert list(total_received) == [4924235, 4818746, 4703680, 4280040]
63 changes: 63 additions & 0 deletions tests/integration/test_salesforce.py
@@ -0,0 +1,63 @@
import pandas as pd
import pytest

from viadot.sources import Salesforce


@pytest.fixture(scope="session")
def salesforce():
    s = Salesforce()
    yield s


@pytest.fixture(scope="session")
def test_df_external():
    data = {
        "Id": ["111"],
        "LastName": ["John Tester-External"],
        "SAPContactId__c": ["112"],
    }
    df = pd.DataFrame(data=data)
    yield df


def test_upsert_empty(salesforce):
    try:
        df = pd.DataFrame()
        salesforce.upsert(df=df, table="Contact")
    except Exception as exception:
        assert False, exception


def test_upsert_external_id_correct(salesforce, test_df_external):
    try:
        salesforce.upsert(
            df=test_df_external, table="Contact", external_id="SAPContactId__c"
        )
    except Exception as exception:
        assert False, exception
    result = salesforce.download(table="Contact")
    exists = list(
        filter(lambda contact: contact["LastName"] == "John Tester-External", result)
    )
    # filter() always yields a list here, so check that it is non-empty
    # rather than comparing to None.
    assert exists


def test_upsert_external_id_wrong(salesforce, test_df_external):
    with pytest.raises(ValueError):
        salesforce.upsert(df=test_df_external, table="Contact", external_id="SAPId")


def test_download_no_query(salesforce):
    ordered_dict = salesforce.download(table="Account")
    assert len(ordered_dict) > 0


def test_download_with_query(salesforce):
    query = "SELECT Id, Name FROM Account"
    ordered_dict = salesforce.download(query=query)
    assert len(ordered_dict) > 0


def test_to_df(salesforce):
    df = salesforce.to_df(table="Account")
    assert not df.empty
2 changes: 1 addition & 1 deletion tests/test_viadot.py
@@ -2,4 +2,4 @@


 def test_version():
-    assert __version__ == "0.4.2"
+    assert __version__ == "0.4.3"
2 changes: 1 addition & 1 deletion viadot/__init__.py
@@ -1 +1 @@
-__version__ = "0.4.2"
+__version__ = "0.4.3"
4 changes: 4 additions & 0 deletions viadot/exceptions.py
@@ -8,3 +8,7 @@ class APIError(Exception):

class CredentialError(Exception):
    pass


class DBDataAccessError(Exception):
    pass
1 change: 1 addition & 0 deletions viadot/flows/__init__.py
@@ -10,6 +10,7 @@
 from .sharepoint_to_adls import SharepointToADLS
 from .cloud_for_customers_report_to_adls import CloudForCustomersReportToADLS
 from .aselite_to_adls import ASELiteToADLS
+from .bigquery_to_adls import BigQueryToADLS

 try:
     from .sap_to_duckdb import SAPToDuckDB