From 578ea9751b3f93c0e861f9b59076f833bf58c3f2 Mon Sep 17 00:00:00 2001 From: Piotr Zawal Date: Thu, 7 Apr 2022 10:00:26 +0200 Subject: [PATCH 1/5] :heavy_plus_sign: Added AzureDataLakeRemove task --- CHANGELOG.md | 1 + viadot/tasks/azure_data_lake.py | 96 +++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index db0e2183b..d496597ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `chunksize` parameter to `BCPTask` task to allow more control over the load process - Added support for SQL Server's custom `datetimeoffset` type - Added `AzureSQLToDF` task +- Added `AzureDataLakeRemove` task ### Changed - Changed the base class of `AzureSQL` to `SQLServer` diff --git a/viadot/tasks/azure_data_lake.py b/viadot/tasks/azure_data_lake.py index b05d45f30..83684d7cb 100644 --- a/viadot/tasks/azure_data_lake.py +++ b/viadot/tasks/azure_data_lake.py @@ -572,3 +572,99 @@ def run( self.logger.info(f"Successfully listed files in {full_dl_path}.") return files + + +class AzureDataLakeRemove(Task): + """ + Task for removing files in Azure Data Lake. + + Args: + path (str, optional): The path to the directory from which to delete files. Defaults to None. + recursive (bool): Set this to true if removing entire directories. + gen (int, optional): The generation of the Azure Data Lake. Defaults to 2. + vault_name (str, optional): The name of the vault from which to fetch the secret. Defaults to None. + max_retries (int, optional): [description]. Defaults to 3. + retry_delay (timedelta, optional): [description]. Defaults to timedelta(seconds=10). + """ + + def __init__( + self, + path: str = None, + recursive: bool = False, + gen: int = 2, + vault_name: str = None, + max_retries: int = 3, + retry_delay: timedelta = timedelta(seconds=10), + *args, + **kwargs, + ): + self.path = path + self.recursive = recursive + self.gen = gen + self.vault_name = vault_name + + super().__init__( + name="adls_rm", + max_retries=max_retries, + retry_delay=retry_delay, + *args, + **kwargs, + ) + + @defaults_from_attrs( + "path", + "recursive", + "gen", + "vault_name", + "max_retries", + "retry_delay", + ) + def run( + self, + path: str = None, + recursive: bool = None, + gen: int = None, + sp_credentials_secret: str = None, + vault_name: str = None, + max_retries: int = None, + retry_delay: timedelta = None, + ) -> List[str]: + """Task run method. + + Args: + path (str): The path to the directory contents of which you want to delete. Defaults to None. + recursive (bool): Set this to True if removing files recursively. Defaults to False. + gen (int): The generation of the Azure Data Lake. Defaults to None. + sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with + ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None. + vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None. 
+ """ + + if not sp_credentials_secret: + # attempt to read a default for the service principal secret name + try: + sp_credentials_secret = PrefectSecret( + "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET" + ).run() + except ValueError: + pass + + if sp_credentials_secret: + azure_secret_task = AzureKeyVaultSecret() + credentials_str = azure_secret_task.run( + secret=sp_credentials_secret, vault_name=vault_name + ) + credentials = json.loads(credentials_str) + else: + credentials = { + "ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"], + "AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"], + "AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"], + "AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"], + } + lake = AzureDataLake(gen=gen, credentials=credentials) + + full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], path) + self.logger.info(f"Deleting files from {full_dl_path}...") + lake.rm(path, recursive=recursive) + self.logger.info(f"Successfully deleted files from {full_dl_path}.") From 27826c72dbbb16bd69fcaaff044bc8d59271bd39 Mon Sep 17 00:00:00 2001 From: Piotr Zawal Date: Thu, 7 Apr 2022 13:18:47 +0200 Subject: [PATCH 2/5] :recycle: Modified docstrings and refactored code --- tests/integration/tasks/test_azure_data_lake.py | 9 +++++++++ viadot/tasks/azure_data_lake.py | 12 ++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/integration/tasks/test_azure_data_lake.py b/tests/integration/tasks/test_azure_data_lake.py index e051ca2ac..83d2ba058 100644 --- a/tests/integration/tasks/test_azure_data_lake.py +++ b/tests/integration/tasks/test_azure_data_lake.py @@ -9,6 +9,7 @@ AzureDataLakeCopy, AzureDataLakeList, ) +from viadot.tasks.azure_data_lake import AzureDataLakeRemove uuid_4 = uuid.uuid4() uuid_4_2 = uuid.uuid4() @@ -65,3 +66,11 @@ def test_azure_data_lake_list(): list_task = AzureDataLakeList() files = list_task.run(path="raw/supermetrics") assert adls_path in files + + +def test_azure_data_lake_remove(): + file_1 = AzureDataLake(adls_path) + file_2 = AzureDataLake(adls_path_2) + remove_task = AzureDataLakeRemove() + remove_task.run(path=adls_path) + assert not file_1.exists() and not file_2.exists() diff --git a/viadot/tasks/azure_data_lake.py b/viadot/tasks/azure_data_lake.py index 83684d7cb..b369df0dc 100644 --- a/viadot/tasks/azure_data_lake.py +++ b/viadot/tasks/azure_data_lake.py @@ -576,15 +576,15 @@ def run( class AzureDataLakeRemove(Task): """ - Task for removing files in Azure Data Lake. + Task for removing objects from Azure Data Lake. Args: path (str, optional): The path to the directory from which to delete files. Defaults to None. recursive (bool): Set this to true if removing entire directories. gen (int, optional): The generation of the Azure Data Lake. Defaults to 2. vault_name (str, optional): The name of the vault from which to fetch the secret. Defaults to None. - max_retries (int, optional): [description]. Defaults to 3. - retry_delay (timedelta, optional): [description]. Defaults to timedelta(seconds=10). + max_retries (int, optional): Maximum number of retries before failing. Defaults to 3. + retry_delay (timedelta, optional): Time to wait before the next retry attempt. Defaults to timedelta(seconds=10). 
""" def __init__( @@ -664,7 +664,7 @@ def run( } lake = AzureDataLake(gen=gen, credentials=credentials) - full_dl_path = os.path.join(credentials["ACCOUNT_NAME"], path) - self.logger.info(f"Deleting files from {full_dl_path}...") + full_path = os.path.join(credentials["ACCOUNT_NAME"], path) + self.logger.info(f"Deleting files from {full_path}...") lake.rm(path, recursive=recursive) - self.logger.info(f"Successfully deleted files from {full_dl_path}.") + self.logger.info(f"Successfully deleted files from {full_path}.") From 3176a14263c90bee045d587aa5ff7eafeca2717c Mon Sep 17 00:00:00 2001 From: Piotr Zawal Date: Thu, 7 Apr 2022 15:35:00 +0200 Subject: [PATCH 3/5] =?UTF-8?q?=E2=9C=85=20Corrected=20the=20test=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/integration/tasks/test_azure_data_lake.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/tasks/test_azure_data_lake.py b/tests/integration/tasks/test_azure_data_lake.py index 83d2ba058..de3ecf49e 100644 --- a/tests/integration/tasks/test_azure_data_lake.py +++ b/tests/integration/tasks/test_azure_data_lake.py @@ -69,8 +69,8 @@ def test_azure_data_lake_list(): def test_azure_data_lake_remove(): - file_1 = AzureDataLake(adls_path) - file_2 = AzureDataLake(adls_path_2) + file = AzureDataLake(adls_path) + assert file.exists() remove_task = AzureDataLakeRemove() remove_task.run(path=adls_path) - assert not file_1.exists() and not file_2.exists() + assert not file.exists() From f187c0ac07dd7a3ec31d2c27772767edde6e236f Mon Sep 17 00:00:00 2001 From: Rafalz13 Date: Fri, 8 Apr 2022 09:40:44 +0200 Subject: [PATCH 4/5] =?UTF-8?q?=F0=9F=90=9B=20Fixed=20finding=20time=5Fsch?= =?UTF-8?q?edule=20value?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- viadot/tasks/__init__.py | 2 +- .../{prefect.py => prefect_date_range.py} | 26 +++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) rename viadot/tasks/{prefect.py => prefect_date_range.py} (86%) diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py index eee05fe43..7525fb854 100644 --- a/viadot/tasks/__init__.py +++ b/viadot/tasks/__init__.py @@ -27,7 +27,7 @@ from .supermetrics import SupermetricsToCSV, SupermetricsToDF from .sharepoint import SharepointToDF from .cloud_for_customers import C4CReportToDF, C4CToDF -from .prefect import GetFlowNewDateRange +from .prefect_date_range import GetFlowNewDateRange from .aselite import ASELiteToDF try: diff --git a/viadot/tasks/prefect.py b/viadot/tasks/prefect_date_range.py similarity index 86% rename from viadot/tasks/prefect.py rename to viadot/tasks/prefect_date_range.py index af000db2c..1b4224860 100644 --- a/viadot/tasks/prefect.py +++ b/viadot/tasks/prefect_date_range.py @@ -129,13 +129,32 @@ def get_formatted_date( class GetFlowNewDateRange(Task): + """ + Get the flow date_range_type and add the days if the last run of the flow was unsuccessful. + A task that checks the status of the last flow run and change it if the last flow has failed. + + If it is originally 'last_14_days' but the flow has run in Failed state for the last 2 days + then from 'last_14_days' it will be 'last_16_days' in the next flow_run. + + Note that here is the check whether the flow run has been planned. + Only If the flow was run by a prefect (according to the planned schedule), a change of date dates is possible. 
+ """ + def __init__( self, flow_name: str = None, - date_range_type: bool = None, + date_range_type: str = None, *args, **kwargs, ): + """ + Initialize GetFlowNewDateRange class. + + Args: + flow_name (str, optional): Prefect flow name. Defaults to None. + date_range_type (str, optional): String argument that should look like this: 'last_X_days' - + X is a number of days. Defaults to None. + """ self.flow_name = flow_name self.date_range_type = date_range_type @@ -206,7 +225,10 @@ def run( flow_runs = client.graphql(query) flow_runs_details = flow_runs.data.flow - time_schedule = flow_runs_details[0]["flow_runs"][0]["scheduled_start_time"] + for flow_run in iter_throught_flow_runs(flow_runs_details=flow_runs_details): + if flow_run["scheduled_start_time"]: + time_schedule = flow_run["scheduled_start_time"] + break last_success_start_time = get_time_from_last_successful_run(flow_runs_details) is_scheduled = check_if_scheduled_run( time_run=last_success_start_time, From ba22e40f6bd6a36905593f21aee32467ece99d93 Mon Sep 17 00:00:00 2001 From: Rafalz13 Date: Fri, 8 Apr 2022 09:47:24 +0200 Subject: [PATCH 5/5] =?UTF-8?q?=F0=9F=90=9B=20Changed=20import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/integration/tasks/test_prefect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/tasks/test_prefect.py b/tests/integration/tasks/test_prefect.py index f16a2eab1..78362853f 100644 --- a/tests/integration/tasks/test_prefect.py +++ b/tests/integration/tasks/test_prefect.py @@ -4,7 +4,7 @@ from datetime import date from viadot.tasks import GetFlowNewDateRange -from viadot.tasks.prefect import ( +from viadot.tasks.prefect_date_range import ( get_time_from_last_successful_run, calculate_difference, check_if_scheduled_run,
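
For reference, the snippet below is a minimal usage sketch of the AzureDataLakeRemove task added in this series; it is not part of the patches above. It assumes ADLS credentials are already configured for viadot (for example via an Azure Key Vault secret or the AZURE_* environment variables the task falls back to), and the ADLS path is hypothetical. The import mirrors the integration test in this series, which pulls the task from viadot.tasks.azure_data_lake.

from viadot.sources import AzureDataLake
from viadot.tasks.azure_data_lake import AzureDataLakeRemove

# Hypothetical path -- point this at a real file in your lake.
adls_path = "raw/supermetrics/example_file.csv"

# Remove a single file; pass recursive=True to delete an entire directory tree.
remove_task = AzureDataLakeRemove()
remove_task.run(path=adls_path, recursive=False, gen=2)

# Optionally confirm the deletion, mirroring the integration test above.
file = AzureDataLake(adls_path)
assert not file.exists()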
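
A similarly hedged sketch of the reworked GetFlowNewDateRange task from patches 4-5. The flow name is a placeholder, and run() is assumed to accept the same flow_name / date_range_type arguments as the constructor and to return the adjusted date range.

from viadot.tasks import GetFlowNewDateRange

date_range_task = GetFlowNewDateRange()

# "Supermetrics Flow" is a placeholder; date_range_type must follow the
# 'last_X_days' pattern the task expects.
new_date_range = date_range_task.run(
    flow_name="Supermetrics Flow",
    date_range_type="last_14_days",
)

# If the latest scheduled runs failed, the range is assumed to be widened,
# e.g. 'last_14_days' -> 'last_16_days'.
print(new_date_range)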