Skip to content

Commit

Permalink
improving docstring for functions and renaming input and output path …
Browse files Browse the repository at this point in the history
…arguments to be more intuitive.
  • Loading branch information
edyoshikun committed Jul 31, 2024
1 parent ee502b2 commit 7fc31a7
Showing 1 changed file with 164 additions and 82 deletions.
246 changes: 164 additions & 82 deletions iohub/ngff_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import multiprocessing as mp
from functools import partial
from pathlib import Path
from typing import Tuple
from typing import Tuple, Union

import click
import numpy as np
Expand All @@ -15,7 +15,7 @@
from iohub.ngff_meta import TransformationMeta


def create_empty_hcs_zarr(
def create_empty_plate(
store_path: Path,
position_keys: list[Tuple[str]],
channel_names: list[str],
Expand All @@ -26,26 +26,52 @@ def create_empty_hcs_zarr(
max_chunk_size_bytes=500e6,
) -> None:
"""
This function creates a new HCS Plate in OME-Zarr format if the plate does not exist.
If the plate exists, appends positions and channels if they are not
already in the plate.
Create a new HCS Plate in OME-Zarr format if the plate does not exist. If the plate exists, append positions and channels if they are not already in the plate.
Parameters
----------
store_path : Path
hcs plate path
Path to the HCS plate.
position_keys : list[Tuple[str]]
Position keys, will append if not present in the plate.
e.g. [("A", "1", "0"), ("A", "1", "1")]
shape : Tuple[int]
chunks : Tuple[int]
scale : Tuple[float]
Position keys to append if not present in the plate, e.g., [("A", "1", "0"), ("A", "1", "1")].
channel_names : list[str]
Channel names, will append if not present in metadata.
dtype : DTypeLike
List of channel names. If the store exists, append if not present in metadata.
shape : Tuple[int]
TCZYX shape of the plate.
chunks : Tuple[int], optional
Chunk size for the plate TCZYX. If None, the chunk size is calculated based on the shape to be <500MB. Defaults to None.
scale : Tuple[float], optional
Scale of the plate TCZYX. Defaults to (1, 1, 1, 1, 1).
dtype : DTypeLike, optional
Data type of the plate. Defaults to np.float32.
max_chunk_size_bytes : float, optional
Maximum chunk size in bytes. Defaults to 500e6 (500MB).
Examples
--------
Create a new plate with positions and channels:
create_empty_plate(
store_path=Path("/path/to/store"),
position_keys=[("A", "1", "0"), ("A", "1", "1")],
channel_names=["DAPI", "FITC"],
shape=(1, 1, 256, 256, 256)
)
Create a plate with custom chunk size and scale:
create_empty_plate(
store_path=Path("/path/to/store"),
position_keys=[("A", "1", "0")],
channel_names=["DAPI"],
shape=(1, 1, 256, 256, 256),
chunks=(1, 1, 128, 128, 128),
scale=(1, 1, 0.5, 0.5, 0.5)
)
Notes
-----
- If `chunks` is not provided, the function calculates an appropriate chunk size to keep the chunks under the specified `max_chunk_size_bytes`.
- The function ensures that positions and channels are appended to an existing plate if they are not already present.
"""
MAX_CHUNK_SIZE = max_chunk_size_bytes # in bytes
bytes_per_pixel = np.dtype(dtype).itemsize

# Limiting the chunking to 500MB
Expand All @@ -54,7 +80,8 @@ def create_empty_hcs_zarr(
# XY image is larger than MAX_CHUNK_SIZE
while (
chunk_zyx_shape[-3] > 1
and np.prod(chunk_zyx_shape) * bytes_per_pixel > MAX_CHUNK_SIZE
and np.prod(chunk_zyx_shape) * bytes_per_pixel
> max_chunk_size_bytes
):
chunk_zyx_shape[-3] = np.ceil(chunk_zyx_shape[-3] / 2).astype(int)
chunk_zyx_shape = tuple(chunk_zyx_shape)
Expand Down Expand Up @@ -95,38 +122,69 @@ def create_empty_hcs_zarr(
def apply_transform_to_zyx_and_save(
func,
position: Position,
output_path: Path,
input_channel_indices: list[int],
output_channel_indices: list[int],
output_store_path: Path,
input_channel_indices: Union[list[slice], list[list[int]]],
output_channel_indices: Union[list[slice], list[list[int]]],
t_idx_in: int,
t_idx_out: int,
**kwargs,
) -> None:
"""
Load a CZYX array from a Position object, apply a transformation to CZYX.
Load a CZYX array from a Position object, apply a transformation, and save the result.
Parameters
----------
func : CZYX -> CZYX function
The function to be applied to the data
func : Callable
The function to be applied to the data. Should take a CZYX array and return a transformed CZYX array.
position : Position
The position object to read from
output_path : Path
The path to output OME-Zarr Store
input_channel_indices : list
The channel indices to process.
The position object to read from.
output_store_path : Path
The path to output OME-Zarr Store.
input_channel_indices : Union[list[slice], list[list[int]]]
The channel indices to process. Acceptable values:
- A list of slices: [slice(0, 2), slice(2, 4), ...].
- A list of lists of integers: [[0], [1], [2, 3, 4], ...].
If empty list, process all channels.
Must match output_channel_indices if not empty
output_channel_indices : list
The channel indices to write to.
output_channel_indices : Union[list[slice], list[list[int]]]
The channel indices to write to. Acceptable values:
- A list of slices: [slice(0, 2), slice(2, 4), ...].
- A list of lists of integers: [[0], [1], [2, 3, 4], ...].
If empty list, write to all channels.
Must match input_channel_indices if not empty
t_idx_in : int
The time index to process
The time index to process.
t_idx_out : int
The time index to write to
kwargs : dict
Additional arguments to pass to the CZYX function
The time index to write to.
kwargs : dict, optional
Additional arguments to pass to the function. A dictionary with key "extra_metadata" can be passed to be stored at a FOV level, e.g., kwargs={"extra_metadata": {"Temperature": 37.5, "CO2_level": 0.5}}.
Examples
--------
Using slices for input_channel_indices:
apply_transform_to_zyx_and_save(
func=some_function,
position=some_position,
output_store_path=Path("/path/to/output"),
input_channel_indices=[slice(0, 2), slice(2, 4)],
output_channel_indices=[[0], [1]],
t_idx_in=0,
t_idx_out=0,
)
Using list of lists for input_channel_indices:
apply_transform_to_zyx_and_save(
func=some_function,
position=some_position,
output_store_path=Path("/path/to/output"),
input_channel_indices=[[0, 1], [2, 3]],
output_channel_indices=[[0], [1]],
t_idx_in=0,
t_idx_out=0,
)
Notes
-----
- If input_channel_indices or output_channel_indices contain nested lists, the indices should be integers.
- Ensure that the lengths of input_channel_indices and output_channel_indices match if they are provided.
"""

# TODO: temporary fix to slumkit issue
Expand All @@ -152,7 +210,7 @@ def apply_transform_to_zyx_and_save(
if not _check_nan_n_zeros(czyx_data):
transformed_czyx = func(czyx_data, **kwargs)
# Write to file
with open_ome_zarr(output_path, mode="r+") as output_dataset:
with open_ome_zarr(output_store_path, mode="r+") as output_dataset:
output_dataset[0].oindex[
t_idx_out, output_channel_indices
] = transformed_czyx
Expand All @@ -166,60 +224,84 @@ def apply_transform_to_zyx_and_save(
# TODO: modify how we get the time and channesl like recOrder (isinstance(input, list) or instance(input,int) or all)
def process_single_position(
func,
input_data_path: Path,
output_path: Path,
time_indices_in: list = "all",
time_indices_out: list = [],
input_channel_idx: list = [],
output_channel_idx: list = [],
num_processes: int = mp.cpu_count(),
input_position_path: Path,
output_store_path: Path,
time_indices_in: Union[list[Union[int, slice]], int] = "all",
time_indices_out: list[int] = [],
input_channel_idx: Union[list[slice], list[list[int]]] = [],
output_channel_idx: Union[list[slice], list[list[int]]] = [],
num_processes: int = 1,
**kwargs,
) -> None:
"""
Register a single position with multiprocessing parallelization over T and C
Apply function to data in an `iohub` `Position`, parallelizing over time and channel indices, and save result in an output Zarr store.
Parameters
----------
func : CZYX -> CZYX function
The function to be applied to the data
input_data_path : Path
The path to input position
output_path : Path
The path to output OME-Zarr Store
time_indices_in : list
The time indices to process.
Must match time_indices_out if not "all"
time_indices_out : list
The time indices to write to.
Must match time_indices_in if not empty.
Typically used for stabilization, which needs a per timepoint processing
input_channel_idx : list
The channel indices to process.
If empty list, process all channels.
Must match output_channel_idx if not empty
output_channel_idx : list
The channel indices to write to.
If empty list, write to all channels.
Must match input_channel_idx if not empty
num_processes : int
Number of simulatenous processes per position
kwargs : dict
Additional arguments to pass to the CZYX function
Usage:
------
Multiprocessing over T and C:
- input_channel_idx and output_channel_idx should be empty.
Multiprocessing over T only:
- input_channel_idx and output_channel_idx should be provided.
func :CZYX -> CZYX Callable
The function to be applied to the data. Should take a CZYX array and return a transformed CZYX array.
input_position_path : Path
The path to the input Position (e.g., input_position_path.zarr/0/0/0).
output_store_path : Path
The path to the output OME-Zarr store (e.g., output_store_path.zarr).
time_indices_in : Union[list[Union[int, slice]], int], optional
The time indices to process. Acceptable values:
- "all": Process all time points.
- A list of slices: [slice(0, 2), slice(2, 4), ...].
- A single integer: 0, 1, ...
If "all", time_indices_out should also be "all". Defaults to "all".
time_indices_out : list[int], optional
The time indices to write to. Must match time_indices_in if not empty.
Typically used for stabilization, which needs per timepoint processing. Defaults to an empty list.
input_channel_idx : Union[list[slice], list[list[int]]], optional
The channel indices to process. Acceptable values:
- A list of slices: [slice(0, 2), slice(2, 4), ...].
- A list of lists of integers: [[0], [1], [2, 3, 4], ...].
If empty, process all channels. Must match output_channel_idx if not empty. Defaults to an empty list.
output_channel_idx : Union[list[slice], list[list[int]]], optional
The channel indices to write to. Acceptable values:
- A list of slices: [slice(0, 2), slice(2, 4), ...].
- A list of lists of integers: [[0], [1], [2, 3, 4], ...].
If empty, write to all channels. Must match input_channel_idx if not empty. Defaults to an empty list.
num_processes : int, optional
Number of simultaneous processes per position. Defaults to 1.
kwargs : dict, optional
Additional arguments to pass to the function. A dictionary with key "extra_metadata" can be passed to be stored at a FOV level, e.g., kwargs={"extra_metadata": {"Temperature": 37.5, "CO2_level": 0.5}}.
Examples
--------
Using slices for input_channel_idx:
process_single_position(
func=some_function,
input_position_path=Path("/path/to/input"),
output_store_path=Path("/path/to/output"),
time_indices_in=[slice(1, 2), slice(2, 3)],
input_channel_idx=[slice(0, 2), slice(2, 4)],
output_channel_idx=[[0], [1]],
)
Using list of lists for input_channel_idx:
process_single_position(
func=some_function,
input_position_path=Path("/path/to/input"),
output_store_path=Path("/path/to/output"),
time_indices_in=[slice(1, 2), slice(2, 3)],
input_channel_idx=[[0, 1], [2, 3]],
output_channel_idx=[[0], [1]],
)
Notes
-----
- Multiprocessing over T and C: input_channel_idx and output_channel_idx should be empty.
- Multiprocessing over T only: input_channel_idx and output_channel_idx should be provided.
"""
# Function to be applied
click.echo(f"Function to be applied: \t{func}")

# Get the reader and writer
click.echo(f"Input data path:\t{input_data_path}")
click.echo(f"Output data path:\t{str(output_path)}")
input_dataset = open_ome_zarr(str(input_data_path))
click.echo(f"Input data path:\t{input_position_path}")
click.echo(f"Output data path:\t{str(output_store_path)}")
input_dataset = open_ome_zarr(str(input_position_path))
stdout_buffer = io.StringIO()
with contextlib.redirect_stdout(stdout_buffer):
input_dataset.print_tree()
Expand Down Expand Up @@ -255,7 +337,7 @@ def process_single_position(
# Write the settings into the metadata if existing
if "extra_metadata" in non_func_args:
# For each dictionary in the nest
with open_ome_zarr(output_path, mode="r+") as output_dataset:
with open_ome_zarr(output_store_path, mode="r+") as output_dataset:
for params_metadata_keys in kwargs["extra_metadata"].keys():
output_dataset.zattrs["extra_metadata"] = non_func_args[
"extra_metadata"
Expand All @@ -277,7 +359,7 @@ def process_single_position(
apply_transform_to_zyx_and_save,
func,
input_dataset,
output_path / Path(*input_data_path.parts[-3:]),
output_store_path / Path(*input_position_path.parts[-3:]),
**func_args,
)
else:
Expand All @@ -287,7 +369,7 @@ def process_single_position(
apply_transform_to_zyx_and_save,
func,
input_dataset,
output_path / Path(*input_data_path.parts[-3:]),
output_store_path / Path(*input_position_path.parts[-3:]),
input_channel_idx,
output_channel_idx,
**func_args,
Expand Down

0 comments on commit 7fc31a7

Please sign in to comment.