diff --git a/CHANGELOG.md b/CHANGELOG.md
index cb016bfe1..379c77414 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
-
+## [0.4.1] - 2022-04-07
+### Changed
+- Bumped version
 ## [0.4.0] - 2022-04-07
 ### Added
 - Added `custom_mail_state_handler` function that sends mail notification using custom smtp server.
@@ -32,6 +34,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added `chunksize` parameter to `BCPTask` task to allow more control over the load process
 - Added support for SQL Server's custom `datetimeoffset` type
 - Added `AzureSQLToDF` task
+- Added `AzureDataLakeRemove` task
 - Added `AzureSQLUpsert` task
 
 ### Changed
diff --git a/tests/integration/tasks/test_azure_data_lake.py b/tests/integration/tasks/test_azure_data_lake.py
index e051ca2ac..de3ecf49e 100644
--- a/tests/integration/tasks/test_azure_data_lake.py
+++ b/tests/integration/tasks/test_azure_data_lake.py
@@ -9,6 +9,7 @@
     AzureDataLakeCopy,
     AzureDataLakeList,
 )
+from viadot.tasks.azure_data_lake import AzureDataLakeRemove
 
 uuid_4 = uuid.uuid4()
 uuid_4_2 = uuid.uuid4()
@@ -65,3 +66,11 @@ def test_azure_data_lake_list():
     list_task = AzureDataLakeList()
     files = list_task.run(path="raw/supermetrics")
     assert adls_path in files
+
+
+def test_azure_data_lake_remove():
+    file = AzureDataLake(adls_path)
+    assert file.exists()
+    remove_task = AzureDataLakeRemove()
+    remove_task.run(path=adls_path)
+    assert not file.exists()
diff --git a/tests/integration/tasks/test_prefect.py b/tests/integration/tasks/test_prefect.py
index f16a2eab1..78362853f 100644
--- a/tests/integration/tasks/test_prefect.py
+++ b/tests/integration/tasks/test_prefect.py
@@ -4,7 +4,7 @@
 from datetime import date
 
 from viadot.tasks import GetFlowNewDateRange
-from viadot.tasks.prefect import (
+from viadot.tasks.prefect_date_range import (
     get_time_from_last_successful_run,
     calculate_difference,
     check_if_scheduled_run,
diff --git a/tests/test_viadot.py b/tests/test_viadot.py
index ecd7b4939..f54230635 100644
--- a/tests/test_viadot.py
+++ b/tests/test_viadot.py
@@ -2,4 +2,4 @@
 
 
 def test_version():
-    assert __version__ == "0.4.1"
+    assert __version__ == "0.4.2"
diff --git a/viadot/__init__.py b/viadot/__init__.py
index 3d26edf77..df1243329 100644
--- a/viadot/__init__.py
+++ b/viadot/__init__.py
@@ -1 +1 @@
-__version__ = "0.4.1"
+__version__ = "0.4.2"
diff --git a/viadot/tasks/__init__.py b/viadot/tasks/__init__.py
index eee05fe43..7525fb854 100644
--- a/viadot/tasks/__init__.py
+++ b/viadot/tasks/__init__.py
@@ -27,7 +27,7 @@
 from .supermetrics import SupermetricsToCSV, SupermetricsToDF
 from .sharepoint import SharepointToDF
 from .cloud_for_customers import C4CReportToDF, C4CToDF
-from .prefect import GetFlowNewDateRange
+from .prefect_date_range import GetFlowNewDateRange
 from .aselite import ASELiteToDF
 
 try:
diff --git a/viadot/tasks/azure_data_lake.py b/viadot/tasks/azure_data_lake.py
index b05d45f30..b369df0dc 100644
--- a/viadot/tasks/azure_data_lake.py
+++ b/viadot/tasks/azure_data_lake.py
@@ -572,3 +572,99 @@ def run(
         self.logger.info(f"Successfully listed files in {full_dl_path}.")
 
         return files
+
+
+class AzureDataLakeRemove(Task):
+    """
+    Task for removing objects from Azure Data Lake.
+
+    Args:
+        path (str, optional): The path to the directory from which to delete files. Defaults to None.
+        recursive (bool, optional): Set this to True if removing entire directories. Defaults to False.
+        gen (int, optional): The generation of the Azure Data Lake. Defaults to 2.
+        vault_name (str, optional): The name of the vault from which to fetch the secret. Defaults to None.
+        max_retries (int, optional): Maximum number of retries before failing. Defaults to 3.
+        retry_delay (timedelta, optional): Time to wait before the next retry attempt. Defaults to timedelta(seconds=10).
+    """
+
+    def __init__(
+        self,
+        path: str = None,
+        recursive: bool = False,
+        gen: int = 2,
+        vault_name: str = None,
+        max_retries: int = 3,
+        retry_delay: timedelta = timedelta(seconds=10),
+        *args,
+        **kwargs,
+    ):
+        self.path = path
+        self.recursive = recursive
+        self.gen = gen
+        self.vault_name = vault_name
+
+        super().__init__(
+            name="adls_rm",
+            max_retries=max_retries,
+            retry_delay=retry_delay,
+            *args,
+            **kwargs,
+        )
+
+    @defaults_from_attrs(
+        "path",
+        "recursive",
+        "gen",
+        "vault_name",
+        "max_retries",
+        "retry_delay",
+    )
+    def run(
+        self,
+        path: str = None,
+        recursive: bool = None,
+        gen: int = None,
+        sp_credentials_secret: str = None,
+        vault_name: str = None,
+        max_retries: int = None,
+        retry_delay: timedelta = None,
+    ) -> None:
+        """Task run method.
+
+        Args:
+            path (str): The path to the directory whose contents should be deleted. Defaults to None.
+            recursive (bool): Set this to True if removing files recursively. Defaults to False.
+            gen (int): The generation of the Azure Data Lake. Defaults to None.
+            sp_credentials_secret (str, optional): The name of the Azure Key Vault secret containing a dictionary with
+                ACCOUNT_NAME and Service Principal credentials (TENANT_ID, CLIENT_ID, CLIENT_SECRET). Defaults to None.
+            vault_name (str, optional): The name of the vault from which to obtain the secret. Defaults to None.
+        """
+
+        if not sp_credentials_secret:
+            # Attempt to read the default service principal secret name from a Prefect secret.
+            try:
+                sp_credentials_secret = PrefectSecret(
+                    "AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET"
+                ).run()
+            except ValueError:
+                pass
+
+        if sp_credentials_secret:
+            azure_secret_task = AzureKeyVaultSecret()
+            credentials_str = azure_secret_task.run(
+                secret=sp_credentials_secret, vault_name=vault_name
+            )
+            credentials = json.loads(credentials_str)
+        else:
+            credentials = {
+                "ACCOUNT_NAME": os.environ["AZURE_ACCOUNT_NAME"],
+                "AZURE_TENANT_ID": os.environ["AZURE_TENANT_ID"],
+                "AZURE_CLIENT_ID": os.environ["AZURE_CLIENT_ID"],
+                "AZURE_CLIENT_SECRET": os.environ["AZURE_CLIENT_SECRET"],
+            }
+        lake = AzureDataLake(gen=gen, credentials=credentials)
+
+        full_path = os.path.join(credentials["ACCOUNT_NAME"], path)
+        self.logger.info(f"Deleting files from {full_path}...")
+        lake.rm(path, recursive=recursive)
+        self.logger.info(f"Successfully deleted files from {full_path}.")
diff --git a/viadot/tasks/prefect.py b/viadot/tasks/prefect_date_range.py
similarity index 86%
rename from viadot/tasks/prefect.py
rename to viadot/tasks/prefect_date_range.py
index af000db2c..1b4224860 100644
--- a/viadot/tasks/prefect.py
+++ b/viadot/tasks/prefect_date_range.py
@@ -129,13 +129,32 @@ def get_formatted_date(
 
 
 class GetFlowNewDateRange(Task):
+    """
+    A task that checks the status of the last flow run and, if that run failed,
+    extends the flow's date_range_type by the number of days since the failure.
+
+    For example, if the date range is originally 'last_14_days' but the flow has been
+    failing for the last 2 days, the next flow run will use 'last_16_days'.
+
+    Note that the task checks whether the flow run was scheduled. The date range
+    is adjusted only if the flow was run by Prefect according to its schedule.
+    """
+
     def __init__(
         self,
         flow_name: str = None,
-        date_range_type: bool = None,
+        date_range_type: str = None,
         *args,
         **kwargs,
     ):
+        """
+        Initialize the GetFlowNewDateRange task.
+
+        Args:
+            flow_name (str, optional): Prefect flow name. Defaults to None.
+            date_range_type (str, optional): A string of the form 'last_X_days',
+                where X is the number of days. Defaults to None.
+        """
         self.flow_name = flow_name
         self.date_range_type = date_range_type
 
@@ -206,7 +225,10 @@ def run(
         flow_runs = client.graphql(query)
         flow_runs_details = flow_runs.data.flow
 
-        time_schedule = flow_runs_details[0]["flow_runs"][0]["scheduled_start_time"]
+        for flow_run in iter_throught_flow_runs(flow_runs_details=flow_runs_details):
+            if flow_run["scheduled_start_time"]:
+                time_schedule = flow_run["scheduled_start_time"]
+                break
         last_success_start_time = get_time_from_last_successful_run(flow_runs_details)
         is_scheduled = check_if_scheduled_run(
             time_run=last_success_start_time,
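
A minimal usage sketch of the new `AzureDataLakeRemove` task, mirroring the integration test above. The ADLS path is hypothetical, and the `AzureDataLake` import path is assumed from the test file's usage; credentials are resolved from the `AZURE_DEFAULT_ADLS_SERVICE_PRINCIPAL_SECRET` Prefect secret or the `AZURE_*` environment variables, exactly as in the task body:

```python
from viadot.sources import AzureDataLake  # assumed import path for the source class
from viadot.tasks.azure_data_lake import AzureDataLakeRemove

# Hypothetical path; substitute a file that exists in your lake.
adls_path = "raw/supermetrics/test_file.csv"

# Delete a single file; pass recursive=True to remove an entire directory.
remove_task = AzureDataLakeRemove()
remove_task.run(path=adls_path, recursive=False)

# The file should no longer exist.
assert not AzureDataLake(adls_path).exists()
```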
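
The date-range adjustment performed by `GetFlowNewDateRange` is simple string surgery on the 'last_X_days' argument. A standalone sketch of the idea (the helper below is illustrative, not the module's actual implementation):

```python
def extend_date_range(date_range_type: str, days_to_add: int) -> str:
    """Extend e.g. 'last_14_days' by 2 days, yielding 'last_16_days'."""
    prefix, days, suffix = date_range_type.split("_")  # ["last", "14", "days"]
    return f"{prefix}_{int(days) + days_to_add}_{suffix}"


assert extend_date_range("last_14_days", 2) == "last_16_days"
```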