From 7fc31a7ef2ffbbafb905aa9dd394e96e395c65e7 Mon Sep 17 00:00:00 2001 From: Eduardo Hirata-Miyasaki Date: Wed, 31 Jul 2024 12:58:10 -0700 Subject: [PATCH] improving docstring for functions and renaming input and output path arguments to be more intuitive. --- iohub/ngff_utils.py | 246 +++++++++++++++++++++++++++++--------------- 1 file changed, 164 insertions(+), 82 deletions(-) diff --git a/iohub/ngff_utils.py b/iohub/ngff_utils.py index f60a9226..8c035856 100644 --- a/iohub/ngff_utils.py +++ b/iohub/ngff_utils.py @@ -5,7 +5,7 @@ import multiprocessing as mp from functools import partial from pathlib import Path -from typing import Tuple +from typing import Tuple, Union import click import numpy as np @@ -15,7 +15,7 @@ from iohub.ngff_meta import TransformationMeta -def create_empty_hcs_zarr( +def create_empty_plate( store_path: Path, position_keys: list[Tuple[str]], channel_names: list[str], @@ -26,26 +26,52 @@ def create_empty_hcs_zarr( max_chunk_size_bytes=500e6, ) -> None: """ - This function creates a new HCS Plate in OME-Zarr format if the plate does not exist. - - If the plate exists, appends positions and channels if they are not - already in the plate. + Create a new HCS Plate in OME-Zarr format if the plate does not exist. If the plate exists, append positions and channels if they are not already in the plate. Parameters ---------- store_path : Path - hcs plate path + Path to the HCS plate. position_keys : list[Tuple[str]] - Position keys, will append if not present in the plate. - e.g. [("A", "1", "0"), ("A", "1", "1")] - shape : Tuple[int] - chunks : Tuple[int] - scale : Tuple[float] + Position keys to append if not present in the plate, e.g., [("A", "1", "0"), ("A", "1", "1")]. channel_names : list[str] - Channel names, will append if not present in metadata. - dtype : DTypeLike + List of channel names. If the store exists, append if not present in metadata. + shape : Tuple[int] + TCZYX shape of the plate. + chunks : Tuple[int], optional + Chunk size for the plate TCZYX. If None, the chunk size is calculated based on the shape to be <500MB. Defaults to None. + scale : Tuple[float], optional + Scale of the plate TCZYX. Defaults to (1, 1, 1, 1, 1). + dtype : DTypeLike, optional + Data type of the plate. Defaults to np.float32. + max_chunk_size_bytes : float, optional + Maximum chunk size in bytes. Defaults to 500e6 (500MB). + + Examples + -------- + Create a new plate with positions and channels: + create_empty_plate( + store_path=Path("/path/to/store"), + position_keys=[("A", "1", "0"), ("A", "1", "1")], + channel_names=["DAPI", "FITC"], + shape=(1, 1, 256, 256, 256) + ) + + Create a plate with custom chunk size and scale: + create_empty_plate( + store_path=Path("/path/to/store"), + position_keys=[("A", "1", "0")], + channel_names=["DAPI"], + shape=(1, 1, 256, 256, 256), + chunks=(1, 1, 128, 128, 128), + scale=(1, 1, 0.5, 0.5, 0.5) + ) + + Notes + ----- + - If `chunks` is not provided, the function calculates an appropriate chunk size to keep the chunks under the specified `max_chunk_size_bytes`. + - The function ensures that positions and channels are appended to an existing plate if they are not already present. """ - MAX_CHUNK_SIZE = max_chunk_size_bytes # in bytes bytes_per_pixel = np.dtype(dtype).itemsize # Limiting the chunking to 500MB @@ -54,7 +80,8 @@ def create_empty_hcs_zarr( # XY image is larger than MAX_CHUNK_SIZE while ( chunk_zyx_shape[-3] > 1 - and np.prod(chunk_zyx_shape) * bytes_per_pixel > MAX_CHUNK_SIZE + and np.prod(chunk_zyx_shape) * bytes_per_pixel + > max_chunk_size_bytes ): chunk_zyx_shape[-3] = np.ceil(chunk_zyx_shape[-3] / 2).astype(int) chunk_zyx_shape = tuple(chunk_zyx_shape) @@ -95,38 +122,69 @@ def create_empty_hcs_zarr( def apply_transform_to_zyx_and_save( func, position: Position, - output_path: Path, - input_channel_indices: list[int], - output_channel_indices: list[int], + output_store_path: Path, + input_channel_indices: Union[list[slice], list[list[int]]], + output_channel_indices: Union[list[slice], list[list[int]]], t_idx_in: int, t_idx_out: int, **kwargs, ) -> None: """ - Load a CZYX array from a Position object, apply a transformation to CZYX. + Load a CZYX array from a Position object, apply a transformation, and save the result. Parameters ---------- - func : CZYX -> CZYX function - The function to be applied to the data + func : Callable + The function to be applied to the data. Should take a CZYX array and return a transformed CZYX array. position : Position - The position object to read from - output_path : Path - The path to output OME-Zarr Store - input_channel_indices : list - The channel indices to process. + The position object to read from. + output_store_path : Path + The path to output OME-Zarr Store. + input_channel_indices : Union[list[slice], list[list[int]]] + The channel indices to process. Acceptable values: + - A list of slices: [slice(0, 2), slice(2, 4), ...]. + - A list of lists of integers: [[0], [1], [2, 3, 4], ...]. If empty list, process all channels. - Must match output_channel_indices if not empty - output_channel_indices : list - The channel indices to write to. + output_channel_indices : Union[list[slice], list[list[int]]] + The channel indices to write to. Acceptable values: + - A list of slices: [slice(0, 2), slice(2, 4), ...]. + - A list of lists of integers: [[0], [1], [2, 3, 4], ...]. If empty list, write to all channels. - Must match input_channel_indices if not empty t_idx_in : int - The time index to process + The time index to process. t_idx_out : int - The time index to write to - kwargs : dict - Additional arguments to pass to the CZYX function + The time index to write to. + kwargs : dict, optional + Additional arguments to pass to the function. A dictionary with key "extra_metadata" can be passed to be stored at a FOV level, e.g., kwargs={"extra_metadata": {"Temperature": 37.5, "CO2_level": 0.5}}. + + Examples + -------- + Using slices for input_channel_indices: + apply_transform_to_zyx_and_save( + func=some_function, + position=some_position, + output_store_path=Path("/path/to/output"), + input_channel_indices=[slice(0, 2), slice(2, 4)], + output_channel_indices=[[0], [1]], + t_idx_in=0, + t_idx_out=0, + ) + + Using list of lists for input_channel_indices: + apply_transform_to_zyx_and_save( + func=some_function, + position=some_position, + output_store_path=Path("/path/to/output"), + input_channel_indices=[[0, 1], [2, 3]], + output_channel_indices=[[0], [1]], + t_idx_in=0, + t_idx_out=0, + ) + + Notes + ----- + - If input_channel_indices or output_channel_indices contain nested lists, the indices should be integers. + - Ensure that the lengths of input_channel_indices and output_channel_indices match if they are provided. """ # TODO: temporary fix to slumkit issue @@ -152,7 +210,7 @@ def apply_transform_to_zyx_and_save( if not _check_nan_n_zeros(czyx_data): transformed_czyx = func(czyx_data, **kwargs) # Write to file - with open_ome_zarr(output_path, mode="r+") as output_dataset: + with open_ome_zarr(output_store_path, mode="r+") as output_dataset: output_dataset[0].oindex[ t_idx_out, output_channel_indices ] = transformed_czyx @@ -166,60 +224,84 @@ def apply_transform_to_zyx_and_save( # TODO: modify how we get the time and channesl like recOrder (isinstance(input, list) or instance(input,int) or all) def process_single_position( func, - input_data_path: Path, - output_path: Path, - time_indices_in: list = "all", - time_indices_out: list = [], - input_channel_idx: list = [], - output_channel_idx: list = [], - num_processes: int = mp.cpu_count(), + input_position_path: Path, + output_store_path: Path, + time_indices_in: Union[list[Union[int, slice]], int] = "all", + time_indices_out: list[int] = [], + input_channel_idx: Union[list[slice], list[list[int]]] = [], + output_channel_idx: Union[list[slice], list[list[int]]] = [], + num_processes: int = 1, **kwargs, ) -> None: """ - Register a single position with multiprocessing parallelization over T and C + Apply function to data in an `iohub` `Position`, parallelizing over time and channel indices, and save result in an output Zarr store. Parameters ---------- - func : CZYX -> CZYX function - The function to be applied to the data - input_data_path : Path - The path to input position - output_path : Path - The path to output OME-Zarr Store - time_indices_in : list - The time indices to process. - Must match time_indices_out if not "all" - time_indices_out : list - The time indices to write to. - Must match time_indices_in if not empty. - Typically used for stabilization, which needs a per timepoint processing - input_channel_idx : list - The channel indices to process. - If empty list, process all channels. - Must match output_channel_idx if not empty - output_channel_idx : list - The channel indices to write to. - If empty list, write to all channels. - Must match input_channel_idx if not empty - num_processes : int - Number of simulatenous processes per position - kwargs : dict - Additional arguments to pass to the CZYX function - - Usage: - ------ - Multiprocessing over T and C: - - input_channel_idx and output_channel_idx should be empty. - Multiprocessing over T only: - - input_channel_idx and output_channel_idx should be provided. + func :CZYX -> CZYX Callable + The function to be applied to the data. Should take a CZYX array and return a transformed CZYX array. + input_position_path : Path + The path to the input Position (e.g., input_position_path.zarr/0/0/0). + output_store_path : Path + The path to the output OME-Zarr store (e.g., output_store_path.zarr). + time_indices_in : Union[list[Union[int, slice]], int], optional + The time indices to process. Acceptable values: + - "all": Process all time points. + - A list of slices: [slice(0, 2), slice(2, 4), ...]. + - A single integer: 0, 1, ... + If "all", time_indices_out should also be "all". Defaults to "all". + time_indices_out : list[int], optional + The time indices to write to. Must match time_indices_in if not empty. + Typically used for stabilization, which needs per timepoint processing. Defaults to an empty list. + input_channel_idx : Union[list[slice], list[list[int]]], optional + The channel indices to process. Acceptable values: + - A list of slices: [slice(0, 2), slice(2, 4), ...]. + - A list of lists of integers: [[0], [1], [2, 3, 4], ...]. + If empty, process all channels. Must match output_channel_idx if not empty. Defaults to an empty list. + output_channel_idx : Union[list[slice], list[list[int]]], optional + The channel indices to write to. Acceptable values: + - A list of slices: [slice(0, 2), slice(2, 4), ...]. + - A list of lists of integers: [[0], [1], [2, 3, 4], ...]. + If empty, write to all channels. Must match input_channel_idx if not empty. Defaults to an empty list. + num_processes : int, optional + Number of simultaneous processes per position. Defaults to 1. + kwargs : dict, optional + Additional arguments to pass to the function. A dictionary with key "extra_metadata" can be passed to be stored at a FOV level, e.g., kwargs={"extra_metadata": {"Temperature": 37.5, "CO2_level": 0.5}}. + + Examples + -------- + Using slices for input_channel_idx: + process_single_position( + func=some_function, + input_position_path=Path("/path/to/input"), + output_store_path=Path("/path/to/output"), + time_indices_in=[slice(1, 2), slice(2, 3)], + input_channel_idx=[slice(0, 2), slice(2, 4)], + output_channel_idx=[[0], [1]], + ) + + Using list of lists for input_channel_idx: + process_single_position( + func=some_function, + input_position_path=Path("/path/to/input"), + output_store_path=Path("/path/to/output"), + time_indices_in=[slice(1, 2), slice(2, 3)], + input_channel_idx=[[0, 1], [2, 3]], + output_channel_idx=[[0], [1]], + ) + + Notes + ----- + - Multiprocessing over T and C: input_channel_idx and output_channel_idx should be empty. + - Multiprocessing over T only: input_channel_idx and output_channel_idx should be provided. """ # Function to be applied click.echo(f"Function to be applied: \t{func}") # Get the reader and writer - click.echo(f"Input data path:\t{input_data_path}") - click.echo(f"Output data path:\t{str(output_path)}") - input_dataset = open_ome_zarr(str(input_data_path)) + click.echo(f"Input data path:\t{input_position_path}") + click.echo(f"Output data path:\t{str(output_store_path)}") + input_dataset = open_ome_zarr(str(input_position_path)) stdout_buffer = io.StringIO() with contextlib.redirect_stdout(stdout_buffer): input_dataset.print_tree() @@ -255,7 +337,7 @@ def process_single_position( # Write the settings into the metadata if existing if "extra_metadata" in non_func_args: # For each dictionary in the nest - with open_ome_zarr(output_path, mode="r+") as output_dataset: + with open_ome_zarr(output_store_path, mode="r+") as output_dataset: for params_metadata_keys in kwargs["extra_metadata"].keys(): output_dataset.zattrs["extra_metadata"] = non_func_args[ "extra_metadata" @@ -277,7 +359,7 @@ def process_single_position( apply_transform_to_zyx_and_save, func, input_dataset, - output_path / Path(*input_data_path.parts[-3:]), + output_store_path / Path(*input_position_path.parts[-3:]), **func_args, ) else: @@ -287,7 +369,7 @@ def process_single_position( apply_transform_to_zyx_and_save, func, input_dataset, - output_path / Path(*input_data_path.parts[-3:]), + output_store_path / Path(*input_position_path.parts[-3:]), input_channel_idx, output_channel_idx, **func_args,