Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/dyvenia/viadot into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
m-paz committed Apr 8, 2022
2 parents c4bbc60 + 72f5af0 commit 835422d
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `chunksize` parameter to `BCPTask` task to allow more control over the load process
- Added support for SQL Server's custom `datetimeoffset` type
- Added `AzureSQLToDF` task
- Added `AzureDataLakeRemove` task
- Added `AzureSQLUpsert` task

### Changed
Expand Down
9 changes: 9 additions & 0 deletions tests/integration/tasks/test_azure_data_lake.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
AzureDataLakeCopy,
AzureDataLakeList,
)
from viadot.tasks.azure_data_lake import AzureDataLakeRemove

uuid_4 = uuid.uuid4()
uuid_4_2 = uuid.uuid4()
Expand Down Expand Up @@ -65,3 +66,11 @@ def test_azure_data_lake_list():
list_task = AzureDataLakeList()
files = list_task.run(path="raw/supermetrics")
assert adls_path in files


def test_azure_data_lake_remove():
file = AzureDataLake(adls_path)
assert file.exists()
remove_task = AzureDataLakeRemove()
remove_task.run(path=adls_path)
assert not file.exists()
2 changes: 1 addition & 1 deletion tests/integration/tasks/test_prefect.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import date

from viadot.tasks import GetFlowNewDateRange
from viadot.tasks.prefect import (
from viadot.tasks.prefect_date_range import (
get_time_from_last_successful_run,
calculate_difference,
check_if_scheduled_run,
Expand Down
2 changes: 1 addition & 1 deletion viadot/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .supermetrics import SupermetricsToCSV, SupermetricsToDF
from .sharepoint import SharepointToDF
from .cloud_for_customers import C4CReportToDF, C4CToDF
from .prefect import GetFlowNewDateRange
from .prefect_date_range import GetFlowNewDateRange
from .aselite import ASELiteToDF

try:
Expand Down
96 changes: 96 additions & 0 deletions viadot/tasks/azure_data_lake.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,99 @@ def run(
self.logger.info(f"Successfully listed files in {full_dl_path}.")

return files


class AzureDataLakeRemove(Task):
"""
Task for removing objects from Azure Data Lake.
Args:
path (str, optional): The path to the directory from which to delete files. Defaults to None.
recursive (bool): Set this to true if removing entire directories.
gen (int, optional): The generation of the Azure Data Lake. Defaults to 2.
vault_name (str, optional): The name of the vault from which to fetch the secret. Defaults to None.
max_retries (int, optional): Maximum number of retries before failing. Defaults to 3.
retry_delay (timedelta, optional): Time to wait before the next retry attempt. Defaults to timedelta(seconds=10).
"""

def __init__(
self,
path: str = None,
recursive: bool = False,
gen: int = 2,
vault_name: str = None,
max_retries: int = 3,
retry_delay: timedelta = timedelta(seconds=10),
*args,
**kwargs,
):
self.path = path
self.recursive = recursive
self.gen = gen
self.vault_name = vault_name

super().__init__(
name="adls_rm",
max_retries=max_retries,
retry_delay=retry_delay,
*args,
**kwargs,
)

@defaults_from_attrs(
"path",
"recursive",
"gen",
"vault_name",
"max_retries",
"retry_delay",
)
def run(
self,
path: str = None,
recursive: bool = None,
gen: int = None,
sp_credentials_secret: str = None,
vault_name: str = None,
max_retries: int = None,
retry_delay: timedelta = None,
) -> List[str]:
"""Task run method.
Args:
path (str): The path to the directory contents of which you want to delete. Defaults to None.
recursive (bool): Set this to True if removing files recursively. Defaults to False.
gen (int): The generation of the Azure Data Lake. Defaults to None.
sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
"""

if not sp_credentials_secret:
# attempt to read a default for the service principal secret name
try:
sp_credentials_secret = PrefectSecret(
"AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
).run()
except ValueError:
pass

if sp_credentials_secret:
azure_secret_task = AzureKeyVaultSecret()
credentials_str = azure_secret_task.run(
secret=sp_credentials_secret, vault_name=vault_name
)
credentials = json.loads(credentials_str)
else:
credentials = {
"ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
"AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
"AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
}
lake = AzureDataLake(gen=gen, credentials=credentials)

full_path = os.path.join(credentials["ACCOUNT_NAME"], path)
self.logger.info(f"Deleting files from {full_path}...")
lake.rm(path, recursive=recursive)
self.logger.info(f"Successfully deleted files from {full_path}.")
26 changes: 24 additions & 2 deletions viadot/tasks/prefect.py → viadot/tasks/prefect_date_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,32 @@ def get_formatted_date(


class GetFlowNewDateRange(Task):
"""
Get the flow date_range_type and add the days if the last run of the flow was unsuccessful.
A task that checks the status of the last flow run and change it if the last flow has failed.
If it is originally 'last_14_days' but the flow has run in Failed state for the last 2 days
then from 'last_14_days' it will be 'last_16_days' in the next flow_run.
Note that here is the check whether the flow run has been planned.
Only If the flow was run by a prefect (according to the planned schedule), a change of date dates is possible.
"""

def __init__(
self,
flow_name: str = None,
date_range_type: bool = None,
date_range_type: str = None,
*args,
**kwargs,
):
"""
Initialize GetFlowNewDateRange class.
Args:
flow_name (str, optional): Prefect flow name. Defaults to None.
date_range_type (str, optional): String argument that should look like this: 'last_X_days' -
X is a number of days. Defaults to None.
"""

self.flow_name = flow_name
self.date_range_type = date_range_type
Expand Down Expand Up @@ -206,7 +225,10 @@ def run(
flow_runs = client.graphql(query)
flow_runs_details = flow_runs.data.flow

time_schedule = flow_runs_details[0]["flow_runs"][0]["scheduled_start_time"]
for flow_run in iter_throught_flow_runs(flow_runs_details=flow_runs_details):
if flow_run["scheduled_start_time"]:
time_schedule = flow_run["scheduled_start_time"]
break
last_success_start_time = get_time_from_last_successful_run(flow_runs_details)
is_scheduled = check_if_scheduled_run(
time_run=last_success_start_time,
Expand Down

0 comments on commit 835422d

Please sign in to comment.