Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds job to remove staging folder #7

Merged
merged 7 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions src/aind_data_upload_utils/delete_staging_folder_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""
Module to handle deleting staging folder using dask
"""

import argparse
import logging
import os
import re
import shutil
import sys
from pathlib import Path
from re import Pattern
from time import time
from typing import ClassVar, List

from dask import bag as dask_bag
from pydantic import Field
from pydantic_settings import BaseSettings

# Set log level from env var
LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING")
logging.basicConfig(level=LOG_LEVEL)


class JobSettings(BaseSettings):
"""Job settings for DeleteStagingFolderJob"""

staging_directory: Path = Field(
..., description="staging folder to delete"
)
num_of_dir_levels: int = Field(
default=4,
description="Number of subdirectory levels to remove",
)
n_partitions: int = Field(
default=20, description="Number of dask tasks to run in parallel"
)
dry_run: bool = Field(
default=False,
description="Log commands without actually deleting anything",
)

# In addition to managing permissions, the parent directory
# pattern is also hard-coded for extra security. We don't want
# requests to remove anything outside this directory.
pattern_to_match: ClassVar[Pattern] = re.compile(
r"^/allen/aind/stage/svc_aind_airflow/(?:prod|dev)/.*"
)


class DeleteStagingFolderJob:
"""Job to delete a staging folder. Uses dask to prune subdirectories."""

def __init__(self, job_settings: JobSettings):
"""
Class constructor for DeleteStagingFolderJob.

Parameters
----------
job_settings: JobSettings
"""
self.job_settings = job_settings

def _get_list_of_sub_directories(self) -> List[str]:
"""
Extracts a list from self.job_settings.staging_directory.
Will traverse self.job_settings.num_of_dir_levels deep.
Returns
-------
List[str]
List of paths rendered as posix strings

"""

sub_directories_to_remove = []
max_depth = self.job_settings.num_of_dir_levels

def do_scan(start_dir: Path, output: list, depth=0):
"""Recursively iterate through directories up to max_depth.
Modification of:
https://stackoverflow.com/a/42720847
"""
for f in start_dir.iterdir():
if f.is_dir() and not f.is_symlink() and depth < max_depth:
do_scan(f, output, depth + 1)
elif depth == max_depth and f.is_dir() and not f.is_symlink():
output.append(f)

do_scan(self.job_settings.staging_directory, sub_directories_to_remove)
return [d.as_posix() for d in sub_directories_to_remove]

def _remove_directory(self, directory: str) -> None:
"""
Removes a directory using shutil.rmtree
Parameters
----------
directory : str

Returns
-------
None
Raises an error if directory does not match regex pattern.

"""
# Verify directory to remove is under staging directory
if not re.match(self.job_settings.pattern_to_match, directory):
raise Exception(
f"Directory {directory} is not under staging folder! "
f"Will not remove automatically!"
)
elif self.job_settings.dry_run:
logging.info(f"Removing: {directory}")
else:
shutil.rmtree(directory)

def _dask_task_to_process_directory_list(
self, directories: List[str]
) -> None:
"""
Removes each directory in list
Parameters
----------
directories : List[str]

Returns
-------
None
Will raise an error if a request is made to remove directory
outside of staging folder.

"""
logging.debug(f"Removing list: {directories}")
total_to_scan = len(directories)
for dir_counter, directory in enumerate(directories, start=1):
logging.debug(
f"Removing {directory}. On {dir_counter} of {total_to_scan}"
)
self._remove_directory(directory)

def _remove_subdirectories(self, sub_directories: List[str]) -> None:
"""
Uses dask to partition list of directory paths to remove and removes
the partitioned lists in parallel.
Returns
-------
None
Will raise an error if a request is made to remove a directory
outside the staging folder.
"""
# We'll use dask to partition the sub_directories.
directory_bag = dask_bag.from_sequence(
sub_directories, npartitions=self.job_settings.n_partitions
)
mapped_partitions = dask_bag.map_partitions(
self._dask_task_to_process_directory_list, directory_bag
)
mapped_partitions.compute()

def run_job(self):
"""Main job runner. Walks num_of_dir_levels deep and removes all
subdirectories in that level. Then removes top directory."""
job_start_time = time()
# Remove batches of subdirectories in parallel
list_of_sub_dirs = self._get_list_of_sub_directories()
self._remove_subdirectories(list_of_sub_dirs)
# Remove top-level staging folder
self._remove_directory(
self.job_settings.staging_directory.as_posix().rstrip("/")
)
job_end_time = time()
execution_time = job_end_time - job_start_time
logging.debug(f"Task took {execution_time} seconds")


if __name__ == "__main__":
sys_args = sys.argv[1:]
parser = argparse.ArgumentParser()
parser.add_argument(
"-j",
"--job-settings",
required=False,
type=str,
help=(
r"""
Instead of init args the job settings can optionally be passed in
as a json string in the command line.
"""
),
)
cli_args = parser.parse_args(sys_args)
main_job_settings = JobSettings.model_validate_json(cli_args.job_settings)
main_job = DeleteStagingFolderJob(job_settings=main_job_settings)
main_job.run_job()
Loading
Loading