Merge pull request #425 from dyvenia/dev
Release 0.4.4 PR
Rafalz13 authored Jun 9, 2022
2 parents 9c7db6d + 8718803 commit 7d8f8b0
Showing 54 changed files with 2,350 additions and 45 deletions.
6 changes: 5 additions & 1 deletion .gitignore
@@ -152,4 +152,8 @@ desktop.ini
.viminfo

# SAP RFC lib
sap_netweaver_rfc
sap_netweaver_rfc

# Databricks-connect
.databricks-connect

21 changes: 20 additions & 1 deletion CHANGELOG.md
@@ -5,8 +5,26 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [0.4.4] - 2022-06-09
### Added

- Added new connector - Outlook. Created `Outlook` source, `OutlookToDF` task and `OutlookToADLS` flow.
- Added new connector - Epicor. Created `Epicor` source, `EpicorToDF` task and `EpicorToDuckDB` flow.
- Enabled Databricks Connect in the image. To set it up, [follow this guide](./README.md#executing-spark-jobs)
- Added `MySQL` source and `MySqlToADLS` flow
- Added `SQLServerToDF` task
- Added `SQLServerToDuckDB` flow which downloads data from a SQL Server table, saves it to a Parquet file, and then loads it into DuckDB
- Added a complete proxy setup in the `SAPRFC` example (`viadot/examples/sap_rfc`)

### Changed
- Changed default name for the Prefect secret holding the name of the Azure KV secret storing Sendgrid credentials


## [0.4.3] - 2022-04-28
### Added
- Added `func` parameter to `SAPRFC`
- Added `SAPRFCToADLS` flow which downloads data from an SAP database into a pandas DataFrame, exports it to CSV, and uploads the CSV to Azure Data Lake.
- Added `adls_file_name` in `SupermetricsToADLS` and `SharepointToADLS` flows
- Added `BigQueryToADLS` flow class which enables extracting data from BigQuery.
- Added `Salesforce` source
@@ -21,6 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed `get_flow_last_run_date()` incorrectly parsing the date
- Fixed `MultipleFlows` when one flow is passed and when last flow fails.


## [0.4.2] - 2022-04-08
### Added
- Added `AzureDataLakeRemove` task
@@ -39,7 +58,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [0.4.0] - 2022-04-07
### Added
- Added `custom_mail_state_handler` function that sends mail notification using custom smtp server.
- Added `custom_mail_state_handler` task that sends email notification using a custom SMTP server.
- Added new function `df_clean_column` that cleans data frame columns from special characters
- Added `df_clean_column` util task that removes special characters from a pandas DataFrame
- Added `MultipleFlows` flow class which enables running multiple flows in a given order.
34 changes: 32 additions & 2 deletions README.md
@@ -82,7 +82,6 @@ Install the library in development mode (repeat for the `viadot_jupyter_lab` con
docker exec -it viadot_testing pip install -e . --user
```


## Running tests

To run tests, log into the container and run pytest:
@@ -101,6 +100,36 @@ FLOW_NAME=hello_world; python -m viadot.examples.$FLOW_NAME

However, when developing, the easiest way is to use the provided Jupyter Lab container available in the browser at `http://localhost:9000/`.

## Executing Spark jobs
### Setting up
To begin using Spark, you must first set the following environment variables; they can then be read in Python, for example:
```
import os

DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_API_TOKEN = os.getenv("DATABRICKS_API_TOKEN")
DATABRICKS_ORG_ID = os.getenv("DATABRICKS_ORG_ID")
DATABRICKS_PORT = os.getenv("DATABRICKS_PORT")
DATABRICKS_CLUSTER_ID = os.getenv("DATABRICKS_CLUSTER_ID")
```

Alternatively, you can create a file called `.databricks-connect` in the root directory of viadot and add the required variables there. It should have the following format:
```
{
"host": "",
"token": "",
"cluster_id": "",
"org_id": "",
"port": ""
}
```
To retrieve the values, follow step 2 of the [Databricks Connect setup guide](https://docs.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect).
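
If you already keep these values in environment variables (as in the previous section), the config file can be generated with a short script. This is only a convenience sketch, not part of viadot, and it assumes the `DATABRICKS_*` variables listed above are set:
```
# Sketch: write .databricks-connect from environment variables.
# Assumes the DATABRICKS_* variables from the previous section are set.
import json
import os

config = {
    "host": os.getenv("DATABRICKS_HOST", ""),
    "token": os.getenv("DATABRICKS_API_TOKEN", ""),
    "cluster_id": os.getenv("DATABRICKS_CLUSTER_ID", ""),
    "org_id": os.getenv("DATABRICKS_ORG_ID", ""),
    "port": os.getenv("DATABRICKS_PORT", ""),
}

with open(".databricks-connect", "w") as f:
    json.dump(config, f, indent=4)
```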

### Executing Spark functions
To begin using Spark, you must first create a Spark session: `spark = SparkSession.builder.appName('session_name').getOrCreate()`. The `spark` object is then used to access all Spark methods. Here is a list of commonly used Spark methods (WIP); a short end-to-end sketch follows the list:
* `spark.createDataFrame(df)`: creates a Spark DataFrame from a pandas DataFrame
* `sparkdf.write.saveAsTable("schema.table")`: takes a Spark DataFrame and saves it as a table in Databricks
    * Make sure to use the correct schema, as it should be created and specified by the administrator
* `table = spark.sql("select * from schema.table")`: an example of a simple query run from Python
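
Putting these together, here is a minimal sketch of a Spark round trip. It assumes Databricks Connect is configured as described above; the `sandbox.example_sales` table name is a placeholder for illustration only:
```
# Minimal sketch of a Spark round trip via Databricks Connect.
# The schema/table name below is a placeholder, not an existing table.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("viadot_example").getOrCreate()

# Create a Spark DataFrame from a pandas DataFrame.
pandas_df = pd.DataFrame({"country": ["PL", "DE"], "sales": [100, 200]})
spark_df = spark.createDataFrame(pandas_df)

# Save it as a table in Databricks, replacing it if it already exists.
spark_df.write.mode("overwrite").saveAsTable("sandbox.example_sales")

# Run a simple SQL query and bring the result back as a pandas DataFrame.
result = spark.sql("SELECT * FROM sandbox.example_sales").toPandas()
print(result)
```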


## How to contribute

@@ -147,6 +176,7 @@ git merge <branch_to_merge>

Please follow the standards and best practices used within the library (e.g., when adding tasks, see how other tasks are constructed). For any questions, please reach out to us here on GitHub.
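
For illustration, here is a minimal sketch of a Prefect (0.x) task in the general style used by the library. The class and parameter names are placeholders, not actual viadot APIs; see the existing tasks in `viadot/tasks` for the real conventions:
```
# A hypothetical, minimal Prefect 0.x task sketch; not an actual viadot task.
import pandas as pd
from prefect import Task


class ExampleToDF(Task):
    """Return example data as a pandas DataFrame."""

    def __init__(self, nrows: int = 10, *args, **kwargs):
        self.nrows = nrows
        super().__init__(*args, **kwargs)

    def run(self, nrows: int = None) -> pd.DataFrame:
        nrows = nrows or self.nrows
        # In a real task, the data would come from a source class in viadot.sources.
        return pd.DataFrame({"value": list(range(nrows))})
```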


### Style guidelines
- the code should be formatted with Black using default settings (easiest way is to use the VSCode extension)
- commit messages should:
@@ -163,4 +193,4 @@ pip install black
2. Go to the settings (gear icon in the bottom left corner) and select `Settings`, or press "Ctrl" + ",".
3. Find the `Format On Save` setting - check the box.
4. Find the `Python Formatting Provider` and select "black" in the drop-down list.
5. Your code should auto format on save now.
5. Your code should auto format on save now.
35 changes: 19 additions & 16 deletions docker/Dockerfile
@@ -1,6 +1,4 @@
FROM prefecthq/prefect:0.15.11-python3.8


# Add user
RUN useradd --create-home viadot && \
chown -R viadot /home/viadot && \
@@ -11,20 +9,18 @@ RUN useradd --create-home viadot && \
RUN groupadd docker && \
usermod -aG docker viadot


# Release File Error
# https://stackoverflow.com/questions/63526272/release-file-is-not-valid-yet-docker
RUN echo "Acquire::Check-Valid-Until \"false\";\nAcquire::Check-Date \"false\";" | cat > /etc/apt/apt.conf.d/10no--check-valid-until


# System packages
RUN apt update -q && yes | apt install -q vim unixodbc-dev build-essential \
curl python3-dev libboost-all-dev libpq-dev graphviz python3-gi sudo git
curl python3-dev libboost-all-dev libpq-dev graphviz python3-gi sudo git software-properties-common
RUN pip install --upgrade cffi

RUN curl http://archive.ubuntu.com/ubuntu/pool/main/g/glibc/multiarch-support_2.27-3ubuntu1_amd64.deb \
-o multiarch-support_2.27-3ubuntu1_amd64.deb && \
apt install ./multiarch-support_2.27-3ubuntu1_amd64.deb
apt install -q ./multiarch-support_2.27-3ubuntu1_amd64.deb

# Fix for old SQL Servers still using TLS < 1.2
RUN chmod +rwx /usr/lib/ssl/openssl.cnf && \
@@ -33,22 +29,20 @@ RUN chmod +rwx /usr/lib/ssl/openssl.cnf && \
# ODBC -- make sure to pin driver version as it's reflected in odbcinst.ini
RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
apt update && \
apt install libsqliteodbc && \
ACCEPT_EULA=Y apt install -y msodbcsql17=17.8.1.1-1 && \
ACCEPT_EULA=Y apt install -y mssql-tools && \
apt update -q && \
apt install -q libsqliteodbc && \
ACCEPT_EULA=Y apt install -q -y msodbcsql17=17.8.1.1-1 && \
ACCEPT_EULA=Y apt install -q -y mssql-tools && \
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc

COPY docker/odbcinst.ini /etc


# Python env

# This one's needed for the SAP RFC connector.
# It must be installed here as the SAP package does not define its dependencies,
# so `pip install pyrfc` breaks if all deps are not already present.
RUN pip install cython==0.29.24

# Python env
WORKDIR /code
COPY requirements.txt /code/
RUN pip install --upgrade pip
@@ -57,16 +51,25 @@ RUN pip install -r requirements.txt
COPY . .
RUN pip install .

RUN rm -rf /code
## Install Java 11
RUN curl https://adoptopenjdk.jfrog.io/adoptopenjdk/api/gpg/key/public | apt-key add - && \
add-apt-repository --yes https://adoptopenjdk.jfrog.io/adoptopenjdk/deb/ && \
apt update -q && \
apt install -q adoptopenjdk-11-hotspot -y && \
find /usr/bin/java -type d -exec chmod 777 {} \;

### Export env variable
ENV SPARK_HOME /usr/local/lib/python3.8/site-packages/pyspark
RUN export SPARK_HOME

RUN rm -rf /code

# Workdir
ENV USER viadot
ENV HOME="/home/$USER"

WORKDIR ${HOME}

USER ${USER}



EXPOSE 8000
8 changes: 7 additions & 1 deletion requirements.txt
@@ -31,4 +31,10 @@ sql-metadata==2.3.0
duckdb==0.3.2
google-auth==2.6.2
sendgrid==6.9.7
pandas-gbq==0.17.4
pandas-gbq==0.17.4
pydantic==1.9.0
PyMySQL==1.0.2
paramiko==2.11.0
sshtunnel==0.4.0
databricks-connect==10.4.0b0
O365==2.0.18.1
34 changes: 34 additions & 0 deletions tests/integration/flows/test_adls_gen1_to_azure_sql.py
@@ -0,0 +1,34 @@
from viadot.flows import ADLSGen1ToAzureSQL
from unittest import mock


def test_adls_gen1_to_azure_sql_new_init(
TEST_CSV_FILE_BLOB_PATH, TEST_PARQUET_FILE_PATH
):
instance = ADLSGen1ToAzureSQL(
name="test_adls_gen1_azure_sql_flow",
path=TEST_PARQUET_FILE_PATH,
blob_path=TEST_CSV_FILE_BLOB_PATH,
schema="sandbox",
table="test_bcp",
dtypes={"country": "VARCHAR(25)", "sales": "INT"},
if_exists="replace",
)
assert instance


def test_adls_gen1_to_azure_sql_new_mock(
TEST_CSV_FILE_BLOB_PATH, TEST_PARQUET_FILE_PATH
):
with mock.patch.object(ADLSGen1ToAzureSQL, "run", return_value=True) as mock_method:
instance = ADLSGen1ToAzureSQL(
name="test_adls_gen1_azure_sql_flow",
path=TEST_PARQUET_FILE_PATH,
blob_path=TEST_CSV_FILE_BLOB_PATH,
schema="sandbox",
table="test_bcp",
dtypes={"country": "VARCHAR(25)", "sales": "INT"},
if_exists="replace",
)
instance.run()
mock_method.assert_called_with()
69 changes: 69 additions & 0 deletions tests/integration/flows/test_adls_gen1_to_azure_sql_new.py
@@ -0,0 +1,69 @@
import os
import pytest
import pandas as pd
from unittest import mock
from viadot.flows import ADLSGen1ToAzureSQLNew


d = {"country": [1, 2], "sales": [3, 4]}
df = pd.DataFrame(data=d)
SCHEMA = "sandbox"
TABLE = "test_bcp"


def test_adls_gen1_to_azure_sql_new_init_args():

flow = ADLSGen1ToAzureSQLNew(
name="test_adls_gen1_gen2_flow",
gen1_path="test_file_1.csv",
gen2_path="test_file_2.csv",
schema=SCHEMA,
table=TABLE,
dtypes={"country": "INT", "sales": "INT"},
if_exists="replace",
)

assert flow


def test_adls_gen1_to_azure_sql_new_mock():
with mock.patch.object(
ADLSGen1ToAzureSQLNew, "run", return_value=True
) as mock_method:
instance = ADLSGen1ToAzureSQLNew(
name="test_adls_gen1_gen2_flow",
gen1_path="folder1/example_file.csv",
gen2_path="folder2/example_file.csv",
schema="sandbox",
table="test_bcp",
dtypes={"country": "VARCHAR(25)", "sales": "INT"},
if_exists="replace",
)
instance.run()
mock_method.assert_called_with()


def test_adls_gen1_to_azure_sql_new_flow_run_mock():

d = {"country": [1, 2], "sales": [3, 4]}
df = pd.DataFrame(data=d)

with mock.patch(
"viadot.flows.adls_gen1_to_azure_sql_new.gen1_to_df_task.bind"
) as gen1_to_df_task_mock_bind_method_mock:
gen1_to_df_task_mock_bind_method_mock.return_value = df

flow = ADLSGen1ToAzureSQLNew(
name="test_adls_g1g2",
gen1_path="example_path",
gen2_path="raw/test/test.csv",
dtypes={"country": "VARCHAR(25)", "sales": "INT"},
if_exists="replace",
table="test",
schema="sandbox",
)

result = flow.run()

assert result.is_successful()
os.remove("test_adls_g1g2.csv")
25 changes: 25 additions & 0 deletions tests/integration/flows/test_adls_gen1_to_gen2.py
@@ -0,0 +1,25 @@
from viadot.flows import ADLSGen1ToGen2
from unittest import mock


def test_adls_gen1_gen2_init(TEST_PARQUET_FILE_PATH_2):

flow = ADLSGen1ToGen2(
"test_adls_gen1_gen2_init",
gen1_path=TEST_PARQUET_FILE_PATH_2,
gen2_path=TEST_PARQUET_FILE_PATH_2,
)
assert flow


def test_adls_gen1_to_azure_sql_new_mock(
TEST_PARQUET_FILE_PATH, TEST_PARQUET_FILE_PATH_2
):
with mock.patch.object(ADLSGen1ToGen2, "run", return_value=True) as mock_method:
instance = ADLSGen1ToGen2(
"test_adls_gen1_gen2_init",
gen1_path=TEST_PARQUET_FILE_PATH,
gen2_path=TEST_PARQUET_FILE_PATH_2,
)
instance.run()
mock_method.assert_called_with()
40 changes: 40 additions & 0 deletions tests/integration/flows/test_duck_transform.py
@@ -0,0 +1,40 @@
from viadot.flows import DuckDBTransform
import pytest
import pandas as pd
from unittest import mock
from viadot.sources import DuckDB
import os

TABLE = "test_table"
SCHEMA = "test_schema"
TABLE_MULTIPLE_PARQUETS = "test_multiple_parquets"
DATABASE_PATH = "test_db_123.duckdb"


@pytest.fixture(scope="session")
def duckdb():
duckdb = DuckDB(credentials=dict(database=DATABASE_PATH))
yield duckdb
os.remove(DATABASE_PATH)


def test_create_table_from_parquet(duckdb, TEST_PARQUET_FILE_PATH):
duckdb.create_table_from_parquet(
schema=SCHEMA, table=TABLE, path=TEST_PARQUET_FILE_PATH
)


def test_duckdb_transform_init():
instance = DuckDBTransform("test_duckdb_transform", query="select * from test")

assert instance


def test_duckdb_transform_flow_run():
instance = DuckDBTransform(
"test_duckdb_transform",
query=f"select * from {SCHEMA}.{TABLE}",
credentials=dict(database=DATABASE_PATH),
)
result = instance.run()
assert result.is_successful()