-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Container cache download #3163
base: dev
Are you sure you want to change the base?
Container cache download #3163
Changes from all commits
4f7a3c6
78650af
113430d
ac771d8
6293dd0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1086,12 +1086,13 @@ def get_singularity_images(self, current_revision: str = "") -> None: | |
|
||
# Organise containers based on what we need to do with them | ||
containers_exist: List[str] = [] | ||
containers_cache: List[Tuple[str, str, Optional[str]]] = [] | ||
containers_cache: List[Tuple[str, str, str]] = [] | ||
containers_library: List[Tuple[str, str, str, Optional[str]]] = [] | ||
containers_download: List[Tuple[str, str, Optional[str]]] = [] | ||
containers_pull: List[Tuple[str, str, Optional[str]]] = [] | ||
for container in self.containers: | ||
# Fetch the output and cached filenames for this container | ||
out_path, cache_path = self.singularity_image_filenames(container) | ||
out_path, cache_path, library_path = self.singularity_image_filenames(container) | ||
|
||
# Check that the directories exist | ||
out_path_dir = os.path.dirname(out_path) | ||
|
@@ -1109,11 +1110,16 @@ def get_singularity_images(self, current_revision: str = "") -> None: | |
containers_exist.append(container) | ||
continue | ||
|
||
# We have a copy of this in the NXF_SINGULARITY_CACHE dir | ||
# We have a copy of this in NXF_SINGULARITY_CACHEDIR | ||
if cache_path and os.path.exists(cache_path): | ||
containers_cache.append((container, out_path, cache_path)) | ||
continue | ||
|
||
# We have a copy of this in NXF_SINGULARITY_LIBRARYDIR | ||
if library_path and os.path.exists(library_path): | ||
containers_library.append((container, library_path, out_path, cache_path)) | ||
continue | ||
|
||
# Direct download within Python | ||
if container.startswith("http"): | ||
containers_download.append((container, out_path, cache_path)) | ||
|
@@ -1145,6 +1151,12 @@ def get_singularity_images(self, current_revision: str = "") -> None: | |
self.singularity_copy_cache_image(*container) | ||
progress.update(task, advance=1) | ||
|
||
if containers_library: | ||
for container in containers_library: | ||
progress.update(task, description="Copying singularity images from library") | ||
self.singularity_copy_library_image(*container) | ||
progress.update(task, advance=1) | ||
|
||
if containers_download or containers_pull: | ||
# if clause gives slightly better UX, because Download is no longer displayed if nothing is left to be downloaded. | ||
with concurrent.futures.ThreadPoolExecutor(max_workers=self.parallel_downloads) as pool: | ||
|
@@ -1226,19 +1238,20 @@ def get_singularity_images(self, current_revision: str = "") -> None: | |
# Task should advance in any case. Failure to pull will not kill the download process. | ||
progress.update(task, advance=1) | ||
|
||
def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str]]: | ||
def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str], Optional[str]]: | ||
"""Check Singularity cache for image, copy to destination folder if found. | ||
|
||
Args: | ||
container (str): A pipeline's container name. Can be direct download URL | ||
or a Docker Hub repository ID. | ||
|
||
Returns: | ||
tuple (str, str): Returns a tuple of (out_path, cache_path). | ||
(str, str, str): Returns a tuple of (out_path, cache_path, library_path). | ||
out_path is the final target output path. it may point to the NXF_SINGULARITY_CACHEDIR, if cache utilisation was set to 'amend'. | ||
If cache utilisation was set to 'copy', it will point to the target folder, a subdirectory of the output directory. In the latter case, | ||
cache_path may either be None (image is not yet cached locally) or point to the image in the NXF_SINGULARITY_CACHEDIR, so it will not be | ||
downloaded from the web again, but directly copied from there. See get_singularity_images() for implementation. | ||
library_path is the points to the container in NXF_SINGULARITY_LIBRARYDIR, if the latter is defined. | ||
""" | ||
|
||
# Generate file paths | ||
|
@@ -1281,16 +1294,33 @@ def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str | |
elif self.container_cache_utilisation in ["amend", "copy"]: | ||
raise FileNotFoundError("Singularity cache is required but no '$NXF_SINGULARITY_CACHEDIR' set!") | ||
|
||
return (out_path, cache_path) | ||
library_path = None | ||
if os.environ.get("NXF_SINGULARITY_LIBRARYDIR"): | ||
library_path = os.path.join(os.environ["NXF_SINGULARITY_LIBRARYDIR"], out_name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That will not work. The registries are stripped from the You will have to use the name prior to trimming the registries. |
||
|
||
def singularity_copy_cache_image(self, container: str, out_path: str, cache_path: Optional[str]) -> None: | ||
return (out_path, cache_path, library_path) | ||
|
||
def singularity_copy_cache_image(self, container: str, out_path: str, cache_path: str) -> None: | ||
"""Copy Singularity image from NXF_SINGULARITY_CACHEDIR to target folder.""" | ||
# Copy to destination folder if we have a cached version | ||
if cache_path and os.path.exists(cache_path): | ||
log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'") | ||
shutil.copyfile(cache_path, out_path) | ||
# Create symlinks to ensure that the images are found even with different registries being used. | ||
self.symlink_singularity_images(out_path) | ||
self.singularity_copy_image(container, cache_path, out_path) | ||
# Create symlinks to ensure that the images are found even with different registries being used. | ||
self.symlink_singularity_images(cache_path) | ||
|
||
def singularity_copy_library_image( | ||
self, container: str, library_path: str, out_path: str, cache_path: Optional[str] | ||
) -> None: | ||
"""Copy Singularity image from NXF_SINGULARITY_LIBRARYDIR to target folder, and possibly NXF_SINGULARITY_CACHEDIR.""" | ||
self.singularity_copy_image(container, library_path, out_path) | ||
if cache_path: | ||
self.singularity_copy_image(container, library_path, cache_path) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have not used the Also mind that in the case of |
||
|
||
def singularity_copy_image(self, container: str, from_path: str, to_path: str) -> None: | ||
"""Copy Singularity image between folders. This function is used seamlessly | ||
across the target directory, NXF_SINGULARITY_CACHEDIR, and NXF_SINGULARITY_LIBRARYDIR.""" | ||
log.debug(f"Copying {container} to cache: '{os.path.basename(from_path)}'") | ||
shutil.copyfile(from_path, to_path) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, but this function misses important functionality. If you factor out the copying process, you need to consider that copies may be interrupted by exceptions or by the user using SIGINT (CTRL+C). Previously, in case of incomplete/corrupted downloads, the local files were deleted by the But now that you changed the logic, it seems more likely to me that a user could amass corrupted images in the cache folder persistently. Therefore, it is important to ensure that the copy process removes partially copied files etc. in case of exceptions or SIGINT, e.g. with a Something along this line: import signal
import sys
# Example
def cleanup_temp_files():
if os.path.exists('temp_file'):
os.remove('temp_file')
# Define a signal handler for SIGINT (CTRL+C)
def abort_download(sig, frame):
cleanup_temp_files()
raise DownloadError("Aborting pipeline download due to user interruption.")
signal.signal(signal.SIGINT, abort_download)
# Example of using try-finally for cleanup in case of exceptions
try:
# File copy code here
# ...
except Exception as e:
cleanup_temp_files()
raise DownloadError(e) from e
finally:
cleanup_temp_files() |
||
# Create symlinks to ensure that the images are found even with different registries being used. | ||
self.symlink_singularity_images(to_path) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mind that the cleanup function should also cover the symlinks in the cache. |
||
|
||
def singularity_download_image( | ||
self, container: str, out_path: str, cache_path: Optional[str], progress: DownloadProgress | ||
|
@@ -1309,8 +1339,7 @@ def singularity_download_image( | |
log.debug(f"Downloading Singularity image: '{container}'") | ||
|
||
# Set output path to save file to | ||
output_path = cache_path or out_path | ||
output_path_tmp = f"{output_path}.partial" | ||
output_path_tmp = f"{out_path}.partial" | ||
log.debug(f"Downloading to: '{output_path_tmp}'") | ||
|
||
# Set up progress bar | ||
|
@@ -1340,16 +1369,15 @@ def singularity_download_image( | |
fh.write(data) | ||
|
||
# Rename partial filename to final filename | ||
os.rename(output_path_tmp, output_path) | ||
os.rename(output_path_tmp, out_path) | ||
|
||
# Copy cached download if we are using the cache | ||
# Copy download to cache if one is defined | ||
if cache_path: | ||
log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'") | ||
progress.update(task, description="Copying from cache to target directory") | ||
shutil.copyfile(cache_path, out_path) | ||
progress.update(task, description="Copying from target directory to cache") | ||
self.singularity_copy_image(container, out_path, cache_path) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to check now, if the cache_path actually exists on the file system: Previously, the if cache_path and os.path.exists(cache_path):
containers_cache.append((container, out_path, cache_path)) Hence, it was safe to copy it without further checks. Now, the |
||
|
||
# Create symlinks to ensure that the images are found even with different registries being used. | ||
self.symlink_singularity_images(output_path) | ||
self.symlink_singularity_images(out_path) | ||
|
||
progress.remove_task(task) | ||
|
||
|
@@ -1361,8 +1389,8 @@ def singularity_download_image( | |
log.debug(f"Deleting incompleted singularity image download:\n'{output_path_tmp}'") | ||
if output_path_tmp and os.path.exists(output_path_tmp): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the cleanup function I mentioned. Now, it is not sufficient anymore, because the cache is not considered. |
||
os.remove(output_path_tmp) | ||
if output_path and os.path.exists(output_path): | ||
os.remove(output_path) | ||
if out_path and os.path.exists(out_path): | ||
os.remove(out_path) | ||
# Re-raise the caught exception | ||
raise | ||
finally: | ||
|
@@ -1383,8 +1411,6 @@ def singularity_pull_image( | |
Raises: | ||
Various exceptions possible from `subprocess` execution of Singularity. | ||
""" | ||
output_path = cache_path or out_path | ||
|
||
# where the output of 'singularity pull' is first generated before being copied to the NXF_SINGULARITY_CACHDIR. | ||
# if not defined by the Singularity administrators, then use the temporary directory to avoid storing the images in the work directory. | ||
if os.environ.get("SINGULARITY_CACHEDIR") is None: | ||
|
@@ -1406,11 +1432,11 @@ def singularity_pull_image( | |
"singularity", | ||
"pull", | ||
"--name", | ||
output_path, | ||
out_path, | ||
address, | ||
] | ||
elif shutil.which("apptainer"): | ||
singularity_command = ["apptainer", "pull", "--name", output_path, address] | ||
singularity_command = ["apptainer", "pull", "--name", out_path, address] | ||
else: | ||
raise OSError("Singularity/Apptainer is needed to pull images, but it is not installed or not in $PATH") | ||
log.debug(f"Building singularity image: {address}") | ||
|
@@ -1453,14 +1479,13 @@ def singularity_pull_image( | |
error_msg=lines, | ||
) | ||
|
||
# Copy cached download if we are using the cache | ||
# Copy download to cache if one is defined | ||
if cache_path: | ||
log.debug(f"Copying {container} from cache: '{os.path.basename(out_path)}'") | ||
progress.update(task, current_log="Copying from cache to target directory") | ||
shutil.copyfile(cache_path, out_path) | ||
progress.update(task, current_log="Copying from target directory to cache") | ||
self.singularity_copy_image(container, out_path, cache_path) | ||
|
||
# Create symlinks to ensure that the images are found even with different registries being used. | ||
self.symlink_singularity_images(output_path) | ||
self.symlink_singularity_images(out_path) | ||
|
||
progress.remove_task(task) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, the cache is never optional, so the type
List[Tuple[str, str, Optional[str]]]
can be simplified toList[Tuple[str, str, str]]
forcontainers_cache.
My rationale was that I wanted an identical type for
containers_cache
,containers_download
andcontainers_pull
to somewhat standardize the functions consuming them. But with the newcontainers_library
variable, the heterogeneity is anyway given, so I have no real objections against.Ultimately, this comment is just to explain why I initially had done it differently.