Skip to content

Commit

Permalink
Merge pull request #185 from dyvenia/dev
Browse files Browse the repository at this point in the history
Release 0.2.11
  • Loading branch information
m-paz authored Oct 30, 2021
2 parents 12fe77b + bf09f9c commit 9978c74
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 30 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.2.11] - 2021-10-30
### Fixed
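- `ADLSToAzureSQL` - fixed the local CSV path issue: the flow now always writes the local file with `df_to_csv_task` instead of choosing the writer from the ADLS path's file extension.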
- `SupermetricsToADLS` - fixed the local JSON path issue by removing the `shutil.rmtree(self.local_json_path)` call from `gen_flow`.

## [0.2.10] - 2021-10-29
### Release due to CI/CD error

## [0.2.9] - 2021-10-29
### Release due to CI/CD error

## [0.2.8] - 2021-10-29
### Changed
- CI/CD: `dev` image is now only published on push to the `dev` branch
Expand Down
2 changes: 1 addition & 1 deletion tests/test_viadot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


def test_version():
assert __version__ == "0.2.10"
assert __version__ == "0.2.11"
2 changes: 1 addition & 1 deletion viadot/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.10"
__version__ = "0.2.11"
32 changes: 6 additions & 26 deletions viadot/flows/adls_to_azure_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ def df_to_csv_task(df, path: str, sep: str = "\t"):
df.to_csv(path, sep=sep, index=False)


@task
def df_to_parquet_task(df, path: str):
df.to_parquet(path)


class ADLSToAzureSQL(Flow):
def __init__(
self,
Expand Down Expand Up @@ -200,23 +195,6 @@ def get_promoted_path(self, env: str) -> str:

return promoted_path

def create_to_file_task(self, df, file_type):
df_to_type = None
if file_type == "csv":
df_to_type = df_to_csv_task.bind(
df=df,
path=self.local_file_path,
sep=self.write_sep,
flow=self,
)
else:
df_to_type = df_to_parquet_task.bind(
df=df,
path=self.local_file_path,
flow=self,
)
return df_to_type

def gen_flow(self) -> Flow:
df = lake_to_df_task.bind(
path=self.adls_path,
Expand All @@ -237,8 +215,9 @@ def gen_flow(self) -> Flow:
else:
dtypes = self.dtypes

adls_file_type = self.adls_path.split(".")[-1]
df_to_type = self.create_to_file_task(df, adls_file_type)
df_to_csv = df_to_csv_task.bind(
df=df, path=self.local_file_path, sep=self.write_sep, flow=self
)

promote_to_conformed_task.bind(
from_path=self.local_file_path,
Expand Down Expand Up @@ -274,8 +253,9 @@ def gen_flow(self) -> Flow:
)

# dtypes.set_upstream(download_json_file_task, flow=self)
promote_to_conformed_task.set_upstream(df_to_type, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
promote_to_conformed_task.set_upstream(df_to_csv, flow=self)
# map_data_types_task.set_upstream(download_json_file_task, flow=self)
create_table_task.set_upstream(df_to_type, flow=self)
create_table_task.set_upstream(df_to_csv, flow=self)
promote_to_operations_task.set_upstream(promote_to_conformed_task, flow=self)
bulk_insert_task.set_upstream(create_table_task, flow=self)
2 changes: 0 additions & 2 deletions viadot/flows/supermetrics_to_adls.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,5 +346,3 @@ def gen_flow(self) -> Flow:
file_to_adls_task.set_upstream(df_to_file, flow=self)
json_to_adls_task.set_upstream(dtypes_to_json_task, flow=self)
set_key_value(key=self.adls_dir_path, value=self.adls_file_path)

shutil.rmtree(self.local_json_path)

0 comments on commit 9978c74

Please sign in to comment.