Merge pull request #508 from dyvenia/dev

Release 0.4.7 PR

Rafalz13 authored Sep 6, 2022
2 parents 6ecebec + 1d07f6d commit 520bb67

Showing 27 changed files with 1,852 additions and 190 deletions.
1 change: 0 additions & 1 deletion .gitignore
@@ -157,4 +157,3 @@ sap_netweaver_rfc

# Databricks-connect
.databricks-connect
-
19 changes: 18 additions & 1 deletion CHANGELOG.md
@@ -6,6 +6,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.4.7] - 2022-09-06
### Added
- Added new flow `SQLServerTransform` and new task `SQLServerQuery` to run queries on SQL Server
- Added `duckdb_query` parameter to the `DuckDBToSQLServer` flow, enabling a table to be created from the output of a SQL query (see the sketch after this list)
- Added handling of an empty DataFrame in the `set_new_kv()` task
- Added `update_kv` and `filter_column` params to the `SAPRFCToADLS` and `SAPToDuckDB` flows and added the `set_new_kv()` task in `task_utils`
- Added Genesys API source `Genesys`
- Added tasks `GenesysToCSV` and `GenesysToDF`
- Added flows `GenesysToADLS` and `GenesysReportToADLS`
- Added `query` parameter to the `PrefectLogs` flow
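
A minimal sketch of how the new `duckdb_query` parameter might be used, modeled on the flow signature exercised in `test_duckdb_to_sql_server.py` below; the query string, credentials, and table names are placeholder assumptions:

```python
from viadot.flows import DuckDBToSQLServer

# Sketch only: `duckdb_query` is the new parameter named above; every value
# here is illustrative, and SQL Server credential handling is omitted.
flow = DuckDBToSQLServer(
    "duckdb_query_to_sql_server",
    duckdb_credentials=dict(database="test_db_123.duckdb"),
    duckdb_query="SELECT * FROM test_schema.test_table WHERE amount > 0",
    sql_server_schema="sandbox",
    sql_server_table="test_table_from_query",
)
flow.run()
```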

### Changed
- Updated `requirements.txt`
- Changed the `handle_api_response()` method: added support for more request methods and a context manager (a sketch follows below)
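
A rough sketch of what the reworked helper could look like; the signature and defaults here are assumptions for illustration, not the actual viadot implementation:

```python
import requests


def handle_api_response(
    url: str,
    method: str = "GET",  # assumed default; "more request methods" per the changelog
    timeout: int = 10,  # assumed default
    **kwargs,
) -> requests.Response:
    # The context manager guarantees the session is closed, even on errors.
    with requests.Session() as session:
        response = session.request(method=method, url=url, timeout=timeout, **kwargs)
        response.raise_for_status()
        return response
```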


## [0.4.6] - 2022-07-21
### Added
- Added `rfc_character_limit` parameter in `SAPRFCToDF` task, `SAPRFC` source, `SAPRFCToADLS` and `SAPToDuckDB` flows
@@ -48,7 +66,6 @@ DF to string before adding metadata
`SQLServerToDuckDB` flows to check if output Parquet is empty and handle it properly.
- Added `check_if_empty_file()` and `handle_if_empty_file()` in `utils.py`


## [0.4.4] - 2022-06-09
### Added
- Added new connector - Outlook. Created `Outlook` source, `OutlookToDF` task and `OutlookToADLS` flow.
2 changes: 2 additions & 0 deletions requirements.txt
@@ -38,3 +38,5 @@ paramiko==2.11.0
sshtunnel==0.4.0
databricks-connect==10.4.0b0
O365==2.0.18.1
+aiohttp==3.8.1
+aiolimiter==1.0.0
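
The two new dependencies pair naturally for rate-limited asynchronous HTTP calls, presumably in support of the new Genesys connector; a minimal sketch under that assumption (endpoint URL and rate limit invented):

```python
import asyncio

import aiohttp
from aiolimiter import AsyncLimiter

# Assumed limit: at most 5 requests per second.
limiter = AsyncLimiter(max_rate=5, time_period=1)


async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    async with limiter:  # wait for a free slot before sending the request
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch_json(session, f"https://api.example.com/items/{i}") for i in range(5))
        )
    print(len(results))


asyncio.run(main())
```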
57 changes: 3 additions & 54 deletions tests/integration/flows/test_duckdb_to_sql_server.py
@@ -1,78 +1,27 @@
-import json
-import logging
-import os
from unittest import mock

import pytest
-from prefect.tasks.secrets import PrefectSecret

from viadot.flows import DuckDBToSQLServer
-from viadot.sources import DuckDB
-from viadot.tasks.azure_key_vault import AzureKeyVaultSecret

TABLE = "test_table"
-SCHEMA = "test_schema"
+DUCKDB_SCHEMA = "test_schema"
+SQL_SERVER_SCHEMA = "sandbox"
-TABLE_MULTIPLE_PARQUETS = "test_multiple_parquets"
-DATABASE_PATH = "test_db_123.duckdb"


-@pytest.fixture(scope="session")
-def duckdb():
-    duckdb = DuckDB(credentials=dict(database=DATABASE_PATH))
-    yield duckdb
-    os.remove(DATABASE_PATH)


-def test__check_if_schema_exists(duckdb):
-    duckdb.run(f"CREATE SCHEMA {SCHEMA}")
-    assert not duckdb._check_if_schema_exists(SCHEMA)


-def test_create_table_from_parquet(duckdb, TEST_PARQUET_FILE_PATH, caplog):
-    with caplog.at_level(logging.INFO):
-        duckdb.create_table_from_parquet(
-            schema=SCHEMA, table=TABLE, path=TEST_PARQUET_FILE_PATH
-        )

-    assert "created successfully" in caplog.text


def test_duckdb_sql_server_init():

    flow = DuckDBToSQLServer("test_duckdb_init")
    assert flow


-def test_duckdb_sql_server_flow():

-    credentials_secret = PrefectSecret(
-        "AZURE_DEFAULT_SQLDB_SERVICE_PRINCIPAL_SECRET"
-    ).run()
-    vault_name = PrefectSecret("AZURE_DEFAULT_KEYVAULT").run()
-    azure_secret_task = AzureKeyVaultSecret()
-    credentials_str = azure_secret_task.run(
-        secret=credentials_secret, vault_name=vault_name
-    )

-    flow = DuckDBToSQLServer(
-        "test_duckdb_flow_run",
-        duckdb_credentials=dict(database=DATABASE_PATH),
-        sql_server_credentials=json.loads(credentials_str),
-        sql_server_schema="sandbox",
-        sql_server_table=TABLE,
-        duckdb_schema=SCHEMA,
-        duckdb_table=TABLE,
-    )
-    r = flow.run()
-    assert r.is_successful()


def test_duckdb_sql_server_flow_mocked():
    with mock.patch.object(DuckDBToSQLServer, "run", return_value=True) as mock_method:
        flow = DuckDBToSQLServer(
            "test_duckdb_flow_run",
            sql_server_table=TABLE,
-            duckdb_schema=SCHEMA,
+            duckdb_schema=DUCKDB_SCHEMA,
            duckdb_table=TABLE,
        )
        flow.run()
157 changes: 132 additions & 25 deletions tests/integration/flows/test_prefect_logs.py
@@ -7,35 +7,110 @@ def expectation_suite():
    expectation_suite = {
        "data": {
            "project": [
-                {"id": "1d3c5246-61e5-4aff-a07b-4b74959a46e4", "name": "dev_cdl"},
-                {"id": "e2a926e2-ec86-4900-a24e-330a44b6cb19", "name": "cic_test"},
-                {"id": "667d5026-2f01-452a-b6fe-5437ca833066", "name": "cic_dev"},
-            ],
-            "flow": [
-                {
-                    "project_id": "1d3c5246-61e5-4aff-a07b-4b74959a46e4",
-                    "name": "3-operational korea billing_data_to_sql",
-                    "version": 20,
-                    "flow_runs": [
-                        {
-                            "id": "f94a55f6-d1d7-4be5-b2a4-7600a820a256",
-                            "end_time": "2022-07-14T12:15:11.011063+00:00",
-                            "start_time": "2022-07-14T12:14:39.282656+00:00",
-                            "state": "Cancelled",
-                            "scheduled_start_time": "2022-07-14T12:14:21.258614+00:00",
-                            "created_by_user_id": "c99c87c4-f680-496d-90fc-05dfaf08154b",
-                        },
-                        {
-                            "id": "fdfd2233-a8f0-4bb3-946f-060f91a655df",
-                            "end_time": "2022-07-14T11:51:39.635059+00:00",
-                            "start_time": "2022-07-14T11:50:52.362002+00:00",
-                            "state": "Failed",
-                            "scheduled_start_time": "2022-07-14T11:50:30.687881+00:00",
-                            "created_by_user_id": "c99c87c4-f680-496d-90fc-05dfaf08154b",
-                        },
-                    ],
-                }
-            ],
+                {
+                    "id": "6f413380-e228-4d64-8e1b-41c6cd434a2a",
+                    "name": "Installer Engagement",
+                    "flows": [],
+                },
+                {
+                    "id": "223a8acf-4cf0-4cf7-ae1f-b66f78e28813",
+                    "name": "oso_reporting",
+                    "flows": [
+                        {
+                            "id": "b13dcc6d-b621-4acd-88be-2cf28715a7c5",
+                            "name": "1-raw dakvenster_order_prod_info extract",
+                            "version": 3,
+                            "flow_runs": [
+                                {
+                                    "id": "d1d76bbd-b494-4cf2-bf59-3dfccf520039",
+                                    "scheduled_start_time": "2022-09-06T09:19:47.937928+00:00",
+                                    "start_time": "2022-09-06T09:20:06.944586+00:00",
+                                    "end_time": "2022-09-06T09:20:39.386856+00:00",
+                                    "state": "Cancelled",
+                                    "created_by_user_id": "5878be75-ee66-42f4-8179-997450063ea4",
+                                }
+                            ],
+                        },
+                        {
+                            "id": "14b1a89e-f902-48a1-b6df-43cacdb91e1a",
+                            "name": "1-raw dakvenster_order_prod_info extract",
+                            "version": 2,
+                            "flow_runs": [],
+                        },
+                        {
+                            "id": "a1eace09-38b4-46bf-bacf-a5d29bdbb633",
+                            "name": "1-raw dakvenster_order_prod_info extract",
+                            "version": 1,
+                            "flow_runs": [],
+                        },
+                    ],
+                },
+                {
+                    "id": "844372db-2d22-495d-a343-b8f8cbcf8963",
+                    "name": "sap",
+                    "flows": [],
+                },
+                {
+                    "id": "512d0f29-2ceb-4177-b7d8-c5908da666ef",
+                    "name": "integrations",
+                    "flows": [],
+                },
+                {
+                    "id": "1d3c5246-61e5-4aff-a07b-4b74959a46e4",
+                    "name": "dev_cdl",
+                    "flows": [],
+                },
+                {
+                    "id": "e2a926e2-ec86-4900-a24e-330a44b6cb19",
+                    "name": "cic_test",
+                    "flows": [],
+                },
+                {
+                    "id": "667d5026-2f01-452a-b6fe-5437ca833066",
+                    "name": "cic_dev",
+                    "flows": [],
+                },
+                {
+                    "id": "eac9b6d4-725a-4354-bf8f-25e7828ea2d8",
+                    "name": "Admin",
+                    "flows": [],
+                },
+                {
+                    "id": "52217a3c-f42a-4448-afc4-2a325415b8e8",
+                    "name": "test",
+                    "flows": [],
+                },
+                {
+                    "id": "516b3fe9-1c26-47a4-b797-e6a77bee390c",
+                    "name": "cic",
+                    "flows": [],
+                },
+                {
+                    "id": "dd2ccc32-2163-4f55-a746-1dbc6b28aaa4",
+                    "name": "Hyperlocal",
+                    "flows": [],
+                },
+                {
+                    "id": "7131c357-bad7-43cf-aabc-87f9cf045384",
+                    "name": "Installer Segmentation",
+                    "flows": [],
+                },
+                {
+                    "id": "94a8b8bf-14fa-4b64-ab78-af1d332dedd4",
+                    "name": "Marketing KPI",
+                    "flows": [],
+                },
+                {
+                    "id": "ebe0e5aa-4add-4440-8c1a-6f9c74eb29fe",
+                    "name": "dev",
+                    "flows": [],
+                },
+                {
+                    "id": "b5d924b0-4116-479f-a8f5-e28f9a9051ca",
+                    "name": "velux",
+                    "flows": [],
+                },
+            ]
        }
    }

@@ -46,7 +121,39 @@ def test_prefect_logs(expectation_suite):

    flow = PrefectLogs(
        name="Extract prefect data test",
-        scheduled_start_time="2022-07-10",
+        query="""
+            {
+                project {
+                    id
+                    name
+                    flows (
+                        where : {name: {_eq: "1-raw google_analytics_oso_sps_gb extract"}}
+                    ) {
+                        id
+                        name
+                        version
+                        flow_runs(
+                            order_by: {end_time: desc}
+                            where: {_and:
+                                [
+                                    {scheduled_start_time:{ %s: "%s" }},
+                                    {state: {_neq: "Scheduled"}}
+                                ]
+                            }
+                        )
+                        {
+                            id
+                            scheduled_start_time
+                            start_time
+                            end_time
+                            state
+                            created_by_user_id
+                        }
+                    }
+                }
+            }
+        """,
+        scheduled_start_time="2022-09-05",
        filter_type="_gte",
        local_file_path=f"prefect_extract_logs.parquet",
        adls_path=f"raw/supermetrics/mp/prefect_extract_logs.parquet",
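
The `query` template contains `%s` placeholders; presumably `PrefectLogs` substitutes `filter_type` and `scheduled_start_time` into them with old-style string formatting, roughly like this (an assumption, since the substitution itself is not shown in this diff):

```python
# Hypothetical illustration of how the two placeholders could be filled in:
clause = '{scheduled_start_time:{ %s: "%s" }}'
print(clause % ("_gte", "2022-09-05"))
# -> {scheduled_start_time:{ _gte: "2022-09-05" }}
```
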
30 changes: 30 additions & 0 deletions tests/integration/flows/test_sql_server_transform.py
@@ -0,0 +1,30 @@
import pytest
from viadot.flows.sql_server_transform import SQLServerTransform
from viadot.tasks.sql_server import SQLServerQuery

SCHEMA = "sandbox"
TABLE = "test_sql_server"


def test_sql_server_transform():
    query_task = SQLServerQuery("AZURE_SQL")
    query_task.run(f"DROP TABLE IF EXISTS {SCHEMA}.{TABLE}")
    flow = SQLServerTransform(
        name="test flow sql transform",
        config_key="AZURE_SQL",
        query=f"CREATE TABLE {SCHEMA}.{TABLE} (Id INT, Name VARCHAR (10))",
    )
    result = flow.run()
    assert result.is_successful()
    query_task.run(f"DROP TABLE {SCHEMA}.{TABLE}")


def test_sql_server_transform_fail():
    flow = SQLServerTransform(
        name="test flow sql transform",
        config_key="AZURE_SQL",
        query=f"CREATE TABLE {SCHEMA}.{TABLE}",
    )

    result = flow.run()
    assert result.is_failed()
1 change: 1 addition & 0 deletions tests/integration/tasks/test_bcp.py
@@ -74,6 +74,7 @@ def test_bcp_log_error(TEST_CSV_FILE_PATH, test_error_table):
        schema=SCHEMA,
        table=ERROR_TABLE,
        error_log_file_path=ERROR_LOG_FILE,
+        on_error="skip",
    )
    assert (
        os.path.exists(ERROR_LOG_FILE) is True and os.path.getsize(ERROR_LOG_FILE) != 0
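
The new `on_error` argument presumably tells the BCP task what to do with rows that fail to load; a toy sketch of such a dispatch, with option names assumed rather than taken from the source:

```python
def handle_row_error(on_error: str, error: Exception) -> None:
    # Hypothetical illustration; the task's real options are not shown here.
    if on_error == "skip":
        return  # drop the failing row and keep loading
    if on_error == "fail":
        raise error
    raise ValueError(f"Unknown 'on_error' value: {on_error}")
```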