diff --git a/nf_core/download.py b/nf_core/download.py
index 31192676c0..d08e0ba40e 100644
--- a/nf_core/download.py
+++ b/nf_core/download.py
@@ -10,6 +10,7 @@
 import tarfile
 import textwrap
 from datetime import datetime
+from typing import List, Optional, Tuple
 from zipfile import ZipFile
 
 import git
@@ -978,7 +979,7 @@ def prioritize_direct_download(self, container_list):
             d[k] = c
         return sorted(list(d.values()))
 
-    def gather_registries(self, workflow_directory):
+    def gather_registries(self, workflow_directory: str) -> None:
         """Fetch the registries from the pipeline config and CLI arguments and store them in a set.
         This is needed to symlink downloaded container images so Nextflow will find them.
         """
@@ -1002,7 +1003,7 @@ def gather_registries(self, workflow_directory):
         # add depot.galaxyproject.org to the set, because it is the default registry for singularity hardcoded in modules
         self.registry_set.add("depot.galaxyproject.org")
 
-    def symlink_singularity_images(self, image_out_path):
+    def symlink_singularity_images(self, image_out_path: str) -> None:
        """Create a symlink for each registry in the registry set that points to the image.
         We have dropped the explicit registries from the modules in favor of the configurable registries.
         Unfortunately, Nextflow still expects the registry to be part of the file name, so a symlink is needed.
@@ -1042,7 +1043,7 @@ def symlink_singularity_images(self, image_out_path):
         finally:
             os.close(image_dir)
 
-    def get_singularity_images(self, current_revision=""):
+    def get_singularity_images(self, current_revision: str = "") -> None:
         """Loop through container names and download Singularity images"""
 
         if len(self.containers) == 0:
@@ -1060,10 +1061,10 @@ def get_singularity_images(self, current_revision=""):
         )
 
         # Organise containers based on what we need to do with them
-        containers_exist = []
-        containers_cache = []
-        containers_download = []
-        containers_pull = []
+        containers_exist: List[str] = []
+        containers_cache: List[Tuple[str, str, Optional[str]]] = []
+        containers_download: List[Tuple[str, str, Optional[str]]] = []
+        containers_pull: List[Tuple[str, str, Optional[str]]] = []
         for container in self.containers:
             # Fetch the output and cached filenames for this container
             out_path, cache_path = self.singularity_image_filenames(container)
@@ -1086,16 +1087,16 @@ def get_singularity_images(self, current_revision=""):
 
             # We have a copy of this in the NXF_SINGULARITY_CACHE dir
             if cache_path and os.path.exists(cache_path):
-                containers_cache.append([container, out_path, cache_path])
+                containers_cache.append((container, out_path, cache_path))
                 continue
 
             # Direct download within Python
             if container.startswith("http"):
-                containers_download.append([container, out_path, cache_path])
+                containers_download.append((container, out_path, cache_path))
                 continue
 
             # Pull using singularity
-            containers_pull.append([container, out_path, cache_path])
+            containers_pull.append((container, out_path, cache_path))
 
         # Exit if we need to pull images and Singularity is not installed
         if len(containers_pull) > 0:
@@ -1127,8 +1128,8 @@ def get_singularity_images(self, current_revision=""):
 
                 # Kick off concurrent downloads
                 future_downloads = [
-                    pool.submit(self.singularity_download_image, *container, progress)
-                    for container in containers_download
+                    pool.submit(self.singularity_download_image, *containers, progress)
+                    for containers in containers_download
                 ]
 
                 # Make ctrl-c work with multi-threading
@@ -1153,13 +1154,13 @@ def get_singularity_images(self, current_revision=""):
                         # Re-raise exception on the main thread
                         raise
 
-            for container in containers_pull:
+            for containers in containers_pull:
                 progress.update(task, description="Pulling singularity images")
                 # it is possible to try multiple registries / mirrors if multiple were specified.
                 # Iteration happens over a copy of self.container_library[:], as I want to be able to remove failing registries for subsequent images.
                 for library in self.container_library[:]:
                     try:
-                        self.singularity_pull_image(*container, library, progress)
+                        self.singularity_pull_image(*containers, library, progress)
                         # Pulling the image was successful, no ContainerError was raised, break the library loop
                         break
                     except ContainerError.ImageExistsError:
@@ -1196,12 +1197,12 @@ def get_singularity_images(self, current_revision=""):
                     # The else clause executes after the loop completes normally.
                     # This means the library loop completed without breaking, indicating failure for all libraries (registries)
                     log.error(
-                        f"Not able to pull image of {container}. Service might be down or internet connection is dead."
+                        f"Not able to pull image of {containers}. Service might be down or internet connection is dead."
                     )
                 # Task should advance in any case. Failure to pull will not kill the download process.
                 progress.update(task, advance=1)
 
-    def singularity_image_filenames(self, container):
+    def singularity_image_filenames(self, container: str) -> Tuple[str, Optional[str]]:
         """Check Singularity cache for image, copy to destination folder if found.
 
         Args:
@@ -1258,7 +1259,7 @@ def singularity_image_filenames(self, container):
 
         return (out_path, cache_path)
 
-    def singularity_copy_cache_image(self, container, out_path, cache_path):
+    def singularity_copy_cache_image(self, container: str, out_path: str, cache_path: Optional[str]) -> None:
         """Copy Singularity image from NXF_SINGULARITY_CACHEDIR to target folder."""
         # Copy to destination folder if we have a cached version
         if cache_path and os.path.exists(cache_path):
@@ -1267,7 +1268,9 @@ def singularity_copy_cache_image(self, container, out_path, cache_path):
             # Create symlinks to ensure that the images are found even with different registries being used.
             self.symlink_singularity_images(out_path)
 
-    def singularity_download_image(self, container, out_path, cache_path, progress):
+    def singularity_download_image(
+        self, container: str, out_path: str, cache_path: Optional[str], progress: DownloadProgress
+    ) -> None:
         """Download a singularity image from the web.
 
         Use native Python to download the file.
@@ -1314,7 +1317,6 @@ def singularity_download_image(self, container, out_path, cache_path, progress):
 
             # Rename partial filename to final filename
            os.rename(output_path_tmp, output_path)
-            output_path_tmp = None
 
             # Copy cached download if we are using the cache
             if cache_path:
@@ -1339,8 +1341,12 @@ def singularity_download_image(self, container, out_path, cache_path, progress):
                 os.remove(output_path)
             # Re-raise the caught exception
             raise
+        finally:
+            del output_path_tmp
 
-    def singularity_pull_image(self, container, out_path, cache_path, library, progress):
+    def singularity_pull_image(
+        self, container: str, out_path: str, cache_path: Optional[str], library: List[str], progress: DownloadProgress
+    ) -> None:
         """Pull a singularity image using ``singularity pull``
 
         Attempt to use a local installation of singularity to pull the image.
@@ -1355,6 +1361,11 @@ def singularity_pull_image(self, container, out_path, cache_path, library, progr
         """
         output_path = cache_path or out_path
 
+        # where the output of 'singularity pull' is first generated before being copied to the NXF_SINGULARITY_CACHDIR.
+        # if not defined by the Singularity administrators, then use the temporary directory to avoid storing the images in the work directory.
+        if os.environ.get("SINGULARITY_CACHEDIR") is None:
+            os.environ["SINGULARITY_CACHEDIR"] = NFCORE_CACHE_DIR
+
         # Sometimes, container still contain an explicit library specification, which
         # resulted in attempted pulls e.g. from docker://quay.io/quay.io/qiime2/core:2022.11
         # Thus, if an explicit registry is specified, the provided -l value is ignored.
@@ -1399,9 +1410,10 @@ def singularity_pull_image(self, container, out_path, cache_path, library, progr
             bufsize=1,
         ) as proc:
             lines = []
-            for line in proc.stdout:
-                lines.append(line)
-                progress.update(task, current_log=line.strip())
+            if proc.stdout is not None:
+                for line in proc.stdout:
+                    lines.append(line)
+                    progress.update(task, current_log=line.strip())
 
             if lines:
                 # something went wrong with the container retrieval
@@ -1428,7 +1440,7 @@ def singularity_pull_image(self, container, out_path, cache_path, library, progr
 
         progress.remove_task(task)
 
-    def compress_download(self):
+    def compress_download(self) -> None:
         """Take the downloaded files and make a compressed .tar.gz archive."""
 
         log.debug(f"Creating archive: {self.output_filename}")
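
The switch from untyped lists to `Tuple[str, str, Optional[str]]` triples, and the `*containers` unpacking into `pool.submit`, follow a common pattern for fanning out work over fixed-shape records. The following is a minimal, self-contained sketch of that pattern only; `fetch_image` and the example values are hypothetical stand-ins for `singularity_download_image` and real container sources, not nf-core code.

```python
import concurrent.futures
from typing import List, Optional, Tuple


def fetch_image(container: str, out_path: str, cache_path: Optional[str]) -> str:
    """Hypothetical worker standing in for singularity_download_image."""
    return f"{container} -> {out_path} (cache: {cache_path})"


# Fixed-shape records: (container source, output path, optional cache path)
containers_download: List[Tuple[str, str, Optional[str]]] = [
    ("https://example.org/images/tool-a.sif", "tool-a.sif", None),
    ("https://example.org/images/tool-b.sif", "tool-b.sif", "/cache/tool-b.sif"),
]

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    # Each triple is unpacked into the worker's positional parameters,
    # mirroring pool.submit(self.singularity_download_image, *containers, progress).
    futures = [pool.submit(fetch_image, *triple) for triple in containers_download]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
```

Typing the lists as tuples also lets a type checker verify that every `*containers` unpacking supplies exactly the three arguments the worker expects, which plain lists cannot guarantee.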
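The new `SINGULARITY_CACHEDIR` handling in `singularity_pull_image` only sets the variable when it is not already defined, so an administrator-configured cache directory is left untouched. A compact way to express the same check is `os.environ.setdefault`; the sketch below uses a placeholder directory rather than nf-core's `NFCORE_CACHE_DIR` constant.

```python
import os
import tempfile

# Placeholder fallback; the diff uses nf-core's NFCORE_CACHE_DIR constant instead.
FALLBACK_CACHE_DIR = os.path.join(tempfile.gettempdir(), "singularity-cache")

# setdefault() writes the fallback only when the variable is unset,
# which is equivalent to the explicit `if os.environ.get(...) is None` check.
os.environ.setdefault("SINGULARITY_CACHEDIR", FALLBACK_CACHE_DIR)
print(os.environ["SINGULARITY_CACHEDIR"])
```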
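The guard around `proc.stdout` exists because `subprocess.Popen.stdout` is typed as `Optional[IO[...]]`: unless the type checker can see a `None` check, iterating over it is flagged as an error, even though passing `stdout=subprocess.PIPE` guarantees a stream at runtime. A minimal sketch of the same pattern, using a harmless `echo` command in place of the `singularity pull` invocation:

```python
import subprocess

with subprocess.Popen(
    ["echo", "hello from a child process"],  # stand-in for the singularity pull command line
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    universal_newlines=True,  # text mode, so lines are str
    bufsize=1,
) as proc:
    lines = []
    if proc.stdout is not None:  # narrows Optional[IO[str]] for the type checker
        for line in proc.stdout:
            lines.append(line.strip())
print(lines)
```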