Skip to content

Commit

Permalink
Merge pull request #286 from dyvenia/dev
Browse files Browse the repository at this point in the history
Release 0.3.1
  • Loading branch information
m-paz authored Feb 17, 2022
2 parents da3a5a9 + 59eb99b commit bf12741
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.3.1] - 2022-02-17
### Changed
- `ADLSToAzureSQL` - added `remove_tab` parameter to remove unnecessary tab separators from data.

### Fixed
- Fixed an issue with the returned `df` in the `CheckColumnOrder` class.

## [0.3.0] - 2022-02-16
### Added
Expand Down
2 changes: 1 addition & 1 deletion tests/test_viadot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


def test_version():
assert __version__ == "0.3.0"
assert __version__ == "0.3.1"
2 changes: 1 addition & 1 deletion viadot/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.3.0"
__version__ = "0.3.1"
22 changes: 15 additions & 7 deletions viadot/flows/adls_to_azure_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ def map_data_types_task(json_shema_path: str):


@task
def df_to_csv_task(df, path: str, sep: str = "\t"):
for col in range(len(df.columns)):
df[df.columns[col]] = (
df[df.columns[col]].astype(str).str.replace(r"\t", "", regex=True)
)
df.to_csv(path, sep=sep, index=False)
def df_to_csv_task(df, remove_tab, path: str, sep: str = "\t"):
    """Write a DataFrame to a delimited text file, optionally stripping tabs.

    Args:
        df (pd.DataFrame): The data to write. It is not modified.
        remove_tab (bool): Whether to remove tab characters from string values
            before writing, so that embedded tabs cannot be mistaken for
            field separators in a tab-delimited file.
        path (str): The path of the output file.
        sep (str, optional): The field delimiter of the output file.
            Defaults to a tab character.
    """
    if remove_tab:
        # DataFrame.replace with regex=True only rewrites string cells and
        # returns a new frame: the caller's DataFrame stays untouched, and
        # numeric columns and missing values are not turned into text
        # (a missing value is written as an empty field, not as "nan").
        df = df.replace(to_replace=r"\t", value="", regex=True)
    df.to_csv(path, sep=sep, index=False)


class ADLSToAzureSQL(Flow):
Expand All @@ -90,6 +93,7 @@ def __init__(
adls_path: str = None,
read_sep: str = "\t",
write_sep: str = "\t",
remove_tab: bool = False,
overwrite_adls: bool = True,
if_empty: str = "warn",
adls_sp_credentials_secret: str = None,
Expand All @@ -116,6 +120,7 @@ def __init__(
the latest file from that directory will be loaded. We assume that the files are named using timestamps.
read_sep (str, optional): The delimiter for the source file. Defaults to "\t".
write_sep (str, optional): The delimiter for the output CSV file. Defaults to "\t".
remove_tab (bool, optional): Whether to remove tab delimiters from the data. Defaults to False.
overwrite_adls (bool, optional): Whether to overwrite the file in ADLS. Defaults to True.
if_empty (str, optional): What to do if the Supermetrics query returns no data. Defaults to "warn".
adls_sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
Expand Down Expand Up @@ -167,6 +172,8 @@ def __init__(
self.schema = schema
self.if_exists = self._map_if_exists(if_exists)

# Generate CSV
self.remove_tab = remove_tab
# BCPTask
self.sqldb_credentials_secret = sqldb_credentials_secret

Expand Down Expand Up @@ -235,11 +242,12 @@ def gen_flow(self) -> Flow:
df=df_reorder,
path=self.local_file_path,
sep=self.write_sep,
remove_tab=self.remove_tab,
flow=self,
)

promote_to_conformed_task.bind(
from_path=self.local_file_path,
from_path=self.adls_path,
to_path=self.adls_path_conformed,
sp_credentials_secret=self.adls_sp_credentials_secret,
vault_name=self.vault_name,
Expand Down
4 changes: 4 additions & 0 deletions viadot/tasks/azure_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,5 +347,9 @@ def run(
"Detected column order difference between the CSV file and the table. Reordering..."
)
df = self.df_change_order(df=df, sql_column_list=sql_column_list)
print(df)
else:
return df
else:
self.logger.info("The table will be replaced.")
return df

0 comments on commit bf12741

Please sign in to comment.