diff --git a/pyproject.toml b/pyproject.toml index cd7f02b0..a304cf34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,8 +17,9 @@ readme = "README.md" dynamic = ["version"] dependencies = [ - "aind-data-schema==0.13.95", - "aind-metadata-service[client]" + "aind-data-schema==0.15.9", + "aind-metadata-service[client]", + "scanimage-tiff-reader==1.4.1.4" ] [project.optional-dependencies] diff --git a/src/aind_metadata_mapper/bergamo/__init__.py b/src/aind_metadata_mapper/bergamo/__init__.py new file mode 100644 index 00000000..bde8cab7 --- /dev/null +++ b/src/aind_metadata_mapper/bergamo/__init__.py @@ -0,0 +1 @@ +"""Maps bergamo metadata into a session model""" diff --git a/src/aind_metadata_mapper/bergamo/session.py b/src/aind_metadata_mapper/bergamo/session.py new file mode 100644 index 00000000..d7a38bed --- /dev/null +++ b/src/aind_metadata_mapper/bergamo/session.py @@ -0,0 +1,602 @@ +"""Module to map bergamo metadata into a session model""" + +import argparse +import json +import logging +import os +import re +import sys +from dataclasses import dataclass +from datetime import datetime, time +from os import PathLike +from pathlib import Path +from typing import Dict, List, Tuple, Union + +import numpy as np +from aind_data_schema.session import ( + Detector, + FieldOfView, + Laser, + Modality, + Session, + Stream, +) +from aind_data_schema.stimulus import ( + PhotoStimulation, + PhotoStimulationGroup, + StimulusEpoch, +) +from aind_data_schema.utils.units import PowerUnit, SizeUnit +from pydantic import BaseSettings, Extra +from ScanImageTiffReader import ScanImageTiffReader + +from aind_metadata_mapper.core import BaseEtl + + +class UserSettings(BaseSettings): + """Data that needs to be input by user. 
Can be pulled from env vars with + BERGAMO prefix or set explicitly.""" + + experimenter_full_name: List[str] + subject_id: str + # TODO: Look into if the following can be extracted from tif directory + session_start_time: datetime + session_end_time: datetime + stream_start_time: datetime + stream_end_time: datetime + stimulus_start_time: time + stimulus_end_time: time + + # Data that might change but can have default values + session_type: str = "BCI" + iacuc_protocol: str = "2115" + rig_id: str = "Bergamo photostim." + camera_names: Tuple[str, ...] = ("Side Camera",) # one or more cameras + laser_a_name: str = "Laser A" + laser_a_wavelength: int = 920 + laser_a_wavelength_unit: SizeUnit = SizeUnit.NM + detector_a_name: str = "PMT A" + detector_a_exposure_time: float = 0.1 + detector_a_trigger_type: str = "Internal" + fov_0_index: int = 0 + fov_0_imaging_depth: int = 150 + fov_0_targeted_structure: str = "M1" + fov_0_coordinate_ml: float = 1.5 + fov_0_coordinate_ap: float = 1.5 + fov_0_reference: str = "Bregma" + fov_0_magnification: str = "16x" + photo_stim_inter_trial_interval: int = 10 + photo_stim_groups: List[Dict[str, int]] = [ + {"group_index": 0, "number_trials": 5}, + {"group_index": 0, "number_trials": 5}, + ] + + @property + def num_of_photo_stim_groups(self): + """Compute number of photo stimulation groups from list of groups""" + return len(self.photo_stim_groups) + + class Config: + """Config to set env var prefix to BERGAMO""" + + extra = Extra.forbid + env_prefix = "BERGAMO_" + + +@dataclass(frozen=True) +class RawImageInfo: + """Metadata from tif files""" + + metadata: str + description0: str + shape: List[int] + + +@dataclass(frozen=True) +class ParsedMetadata: + """RawImageInfo gets parsed into this data""" + + metadata: dict + roi_data: dict + roi_metadata: dict + frame_rate: str + num_planes: int + shape: List[int] + description_first_frame: dict + movie_start_time: datetime + + +class BergamoEtl(BaseEtl): + """Class to manage transforming bergamo data files into a 
Session object""" + + def __init__( + self, + input_source: Union[str, PathLike], + output_directory: Path, + user_settings: UserSettings, + ): + """ + Class constructor for Base etl class. + Parameters + ---------- + input_source : Union[str, PathLike] + Can be a string or a Path + output_directory : Path + The directory where to save the json files. + user_settings: UserSettings + Variables for a particular session + """ + super().__init__(input_source, output_directory) + self.user_settings = user_settings + + @staticmethod + def _flat_dict_to_nested(flat: dict, key_delim: str = ".") -> dict: + """ + Utility method to convert a flat dictionary into a nested dictionary. + Modified from https://stackoverflow.com/a/50607551 + Parameters + ---------- + flat : dict + Example {"a.b.c": 1, "a.b.d": 2, "e.f": 3} + key_delim : str + Delimiter on dictionary keys. Default is '.'. + + Returns + ------- + dict + A nested dictionary like {"a": {"b": {"c":1, "d":2}, "e": {"f":3}} + """ + + def __nest_dict_rec(k, v, out) -> None: + """Simple recursive method being called.""" + k, *rest = k.split(key_delim, 1) + if rest: + __nest_dict_rec(rest[0], v, out.setdefault(k, {})) + else: + out[k] = v + + result = {} + for flat_key, flat_val in flat.items(): + __nest_dict_rec(flat_key, flat_val, result) + return result + + def _parse_raw_image_info( + self, raw_image_info: RawImageInfo + ) -> ParsedMetadata: + """ + Parses metadata from raw image info. + Parameters + ---------- + raw_image_info : RawImageInfo + + Returns + ------- + ParsedMetadata + """ + + # The metadata contains two parts separated by \n\n. The top part + # looks like + # 'SI.abc.def = 1\n SI.abc.ghf=2' + # We'll convert that to a nested dict. 
+ metadata_first_part = raw_image_info.metadata.split("\n\n")[0] + flat_metadata_header_dict = dict( + [ + (s.split(" = ", 1)[0], s.split(" = ", 1)[1]) + for s in metadata_first_part.split("\n") + ] + ) + metadata = self._flat_dict_to_nested(flat_metadata_header_dict) + # Move SI dictionary up one level + if "SI" in metadata.keys(): + si_contents = metadata.pop("SI") + metadata.update(si_contents) + + # The second part is a standard json string. We'll extract it and + # append it to our dictionary + metadata_json = json.loads(raw_image_info.metadata.split("\n\n")[1]) + metadata["json"] = metadata_json + + # Convert description string to a dictionary + first_frame_description_str = raw_image_info.description0.strip() + description_first_image_dict = dict( + [ + (s.split(" = ", 1)[0], s.split(" = ", 1)[1]) + for s in first_frame_description_str.split("\n") + ] + ) + frame_rate = metadata["hRoiManager"]["scanVolumeRate"] + # TODO: Use .get instead of try/except and add coverage test + try: + z_collection = metadata["hFastZ"]["userZs"] + num_planes = len(z_collection) # pragma: no cover + except Exception as e: # new scanimage version + logging.error( + f"Multiple planes not handled in metadata collection. 
" + f"HANDLE ME!!!: {repr(e)}" + ) + # TODO: Check if this if/else is necessary + if metadata["hFastZ"]["enable"] == "true": + num_planes = 1 # pragma: no cover + else: + num_planes = 1 + + roi_metadata = metadata["json"]["RoiGroups"]["imagingRoiGroup"]["rois"] + + if isinstance(roi_metadata, dict): + roi_metadata = [roi_metadata] + num_rois = len(roi_metadata) + roi = {} + w_px = [] + h_px = [] + cXY = [] + szXY = [] + for r in range(num_rois): + roi[r] = {} + roi[r]["w_px"] = roi_metadata[r]["scanfields"][ + "pixelResolutionXY" + ][0] + w_px.append(roi[r]["w_px"]) + roi[r]["h_px"] = roi_metadata[r]["scanfields"][ + "pixelResolutionXY" + ][1] + h_px.append(roi[r]["h_px"]) + roi[r]["center"] = roi_metadata[r]["scanfields"]["centerXY"] + cXY.append(roi[r]["center"]) + roi[r]["size"] = roi_metadata[r]["scanfields"]["sizeXY"] + szXY.append(roi[r]["size"]) + + w_px = np.asarray(w_px) + h_px = np.asarray(h_px) + szXY = np.asarray(szXY) + cXY = np.asarray(cXY) + cXY = cXY - szXY / 2 + cXY = cXY - np.amin(cXY, axis=0) + mu = np.median(np.transpose(np.asarray([w_px, h_px])) / szXY, axis=0) + imin = cXY * mu + + n_rows_sum = np.sum(h_px) + n_flyback = (raw_image_info.shape[1] - n_rows_sum) / np.max( + [1, num_rois - 1] + ) + + irow = np.insert(np.cumsum(np.transpose(h_px) + n_flyback), 0, 0) + irow = np.delete(irow, -1) + irow = np.vstack((irow, irow + np.transpose(h_px))) + + data = {"fs": frame_rate, "nplanes": num_planes, "nrois": num_rois} + if data["nrois"] == 1: + data["mesoscan"] = 0 + else: + # TODO: Add coverage example + data["mesoscan"] = 1 # pragma: no cover + # TODO: Add coverage example + if data["mesoscan"]: # pragma: no cover + # data['nrois'] = num_rois #or irow.shape[1]? 
+ data["dx"] = [] + data["dy"] = [] + data["lines"] = [] + for i in range(num_rois): + data["dx"] = np.hstack((data["dx"], imin[i, 1])) + data["dy"] = np.hstack((data["dy"], imin[i, 0])) + # TODO: NOT QUITE RIGHT YET + data["lines"] = list( + range( + irow[0, i].astype("int32"), + irow[1, i].astype("int32") - 1, + ) + ) + data["dx"] = data["dx"].astype("int32") + data["dy"] = data["dy"].astype("int32") + logging.debug(f"data[dx]: {data['dx']}") + logging.debug(f"data[dy]: {data['dy']}") + logging.debug(f"data[lines]: {data['lines']}") + movie_start_time = datetime.strptime( + description_first_image_dict["epoch"], "[%Y %m %d %H %M %S.%f]" + ) + + return ParsedMetadata( + metadata=metadata, + roi_data=data, + roi_metadata=roi_metadata, + frame_rate=frame_rate, + num_planes=num_planes, + shape=raw_image_info.shape, + description_first_frame=description_first_image_dict, + movie_start_time=movie_start_time, + ) + + @staticmethod + def _get_si_file_from_dir( + source_dir: Path, regex_pattern: str = r"^.*?(\d+).tif+$" + ) -> Path: + """ + Utility method to scan top level of source_dir for .tif or .tiff files. + Sorts them by file number and collects the first one. The directory + contains files that look like neuron50_00001.tif, neuron50_00002.tif. + Parameters + ---------- + source_dir : Path + Directory where the tif files are located + regex_pattern : str + Format of how files are expected to be organized. Default matches + against something that ends with a series of digits and .tif(f) + + Returns + ------- + Path + File path of the first tif file. 
+ + """ + compiled_regex = re.compile(regex_pattern) + tif_filepath = None + old_tif_number = None + for root, dirs, files in os.walk(source_dir): + for name in files: + matched = re.match(compiled_regex, name) + if matched: + tif_number = int(matched.group(1)) # compare as int + if old_tif_number is None or tif_number < old_tif_number: + old_tif_number = tif_number + tif_filepath = Path(os.path.join(root, name)) + + # Only scan the top level files + break + if tif_filepath is None: + raise FileNotFoundError("Directory must contain tif or tiff file!") + else: + return tif_filepath + + def _extract(self) -> RawImageInfo: + """Extract metadata from bergamo session. If input source is a file, + will extract data from file. If input source is a directory, will + attempt to find a file.""" + if isinstance(self.input_source, str): + input_source = Path(self.input_source) + else: + input_source = self.input_source + + if os.path.isfile(input_source): + file_with_metadata = input_source + else: + file_with_metadata = self._get_si_file_from_dir(input_source) + # Not sure if a custom header was appended, but we can't use + # o=json.loads(reader.metadata()) directly + with ScanImageTiffReader(str(file_with_metadata)) as reader: + img_metadata = reader.metadata() + img_description = reader.description(0) + img_shape = reader.shape() + return RawImageInfo( + metadata=img_metadata, + description0=img_description, + shape=img_shape, + ) + + def _transform(self, extracted_source: RawImageInfo) -> Session: + """ + Transforms the raw data extracted from the tif directory into a + Session object. 
+ Parameters + ---------- + extracted_source : RawImageInfo + + Returns + ------- + Session + + """ + siHeader = self._parse_raw_image_info(extracted_source) + photostim_groups = siHeader.metadata["json"]["RoiGroups"][ + "photostimRoiGroups" + ] + + data_stream = Stream( + stream_start_time=self.user_settings.stream_start_time, + stream_end_time=self.user_settings.stream_end_time, + stream_modalities=[Modality.POPHYS], + camera_names=list(self.user_settings.camera_names), + light_sources=[ + Laser( + name=self.user_settings.laser_a_name, + wavelength=self.user_settings.laser_a_wavelength, + wavelength_unit=self.user_settings.laser_a_wavelength_unit, + excitation_power=int( + siHeader.metadata["hBeams"]["powers"][1:-1].split()[0] + ), + excitation_power_unit=PowerUnit.PERCENT, + ), + ], + detectors=[ + Detector( + name=self.user_settings.detector_a_name, + exposure_time=self.user_settings.detector_a_exposure_time, + trigger_type=self.user_settings.detector_a_trigger_type, + ), + ], + ophys_fovs=[ + FieldOfView( + index=self.user_settings.fov_0_index, + imaging_depth=self.user_settings.fov_0_imaging_depth, + targeted_structure=( + self.user_settings.fov_0_targeted_structure + ), + fov_coordinate_ml=self.user_settings.fov_0_coordinate_ml, + fov_coordinate_ap=self.user_settings.fov_0_coordinate_ap, + fov_reference=self.user_settings.fov_0_reference, + fov_width=int( + siHeader.metadata["hRoiManager"]["pixelsPerLine"] + ), + fov_height=int( + siHeader.metadata["hRoiManager"]["linesPerFrame"] + ), + magnification=self.user_settings.fov_0_magnification, + fov_scale_factor=float( + siHeader.metadata["hRoiManager"]["scanZoomFactor"] + ), + frame_rate=float( + siHeader.metadata["hRoiManager"]["scanFrameRate"] + ), + ), + ], + ) + return Session( + experimenter_full_name=self.user_settings.experimenter_full_name, + session_start_time=self.user_settings.session_start_time, + session_end_time=self.user_settings.session_end_time, + subject_id=self.user_settings.subject_id, + 
session_type=self.user_settings.session_type, + iacuc_protocol=self.user_settings.iacuc_protocol, + rig_id=self.user_settings.rig_id, + data_streams=[data_stream], + stimulus_epochs=[ + StimulusEpoch( + stimulus=PhotoStimulation( + stimulus_name="PhotoStimulation", + number_groups=( + self.user_settings.num_of_photo_stim_groups + ), + groups=[ + PhotoStimulationGroup( + group_index=( + self.user_settings.photo_stim_groups[0][ + "group_index" + ] + ), + number_of_neurons=int( + np.array( + photostim_groups[0]["rois"][1][ + "scanfields" + ]["slmPattern"] + ).shape[0] + ), + stimulation_laser_power=int( + photostim_groups[0]["rois"][1][ + "scanfields" + ]["powers"] + ), + number_trials=( + self.user_settings.photo_stim_groups[0][ + "number_trials" + ] + ), + number_spirals=int( + photostim_groups[0]["rois"][1][ + "scanfields" + ]["repetitions"] + ), + spiral_duration=photostim_groups[0]["rois"][1][ + "scanfields" + ]["duration"], + inter_spiral_interval=photostim_groups[0][ + "rois" + ][2]["scanfields"]["duration"], + ), + PhotoStimulationGroup( + group_index=( + self.user_settings.photo_stim_groups[1][ + "group_index" + ] + ), + number_of_neurons=int( + np.array( + photostim_groups[0]["rois"][1][ + "scanfields" + ]["slmPattern"] + ).shape[0] + ), + stimulation_laser_power=int( + photostim_groups[0]["rois"][1][ + "scanfields" + ]["powers"] + ), + number_trials=( + self.user_settings.photo_stim_groups[1][ + "number_trials" + ] + ), + number_spirals=int( + photostim_groups[0]["rois"][1][ + "scanfields" + ]["repetitions"] + ), + spiral_duration=photostim_groups[0]["rois"][1][ + "scanfields" + ]["duration"], + inter_spiral_interval=photostim_groups[0][ + "rois" + ][2]["scanfields"]["duration"], + ), + ], + inter_trial_interval=( + self.user_settings.photo_stim_inter_trial_interval + ), + ), + stimulus_start_time=( + self.user_settings.stimulus_start_time + ), + stimulus_end_time=self.user_settings.stimulus_end_time, + ) + ], + ) + + @classmethod + def from_args(cls, args: 
list): + """ + Adds ability to construct settings from a list of arguments. + Parameters + ---------- + args : list + A list of command line arguments to parse. + """ + + parser = argparse.ArgumentParser() + parser.add_argument( + "-i", + "--input-source", + required=True, # no usable default exists + type=str, + help="Directory where tif files are located", + ) + parser.add_argument( + "-o", + "--output-directory", + required=False, + default=".", + type=str, + help=( + "Directory to save json file to. Defaults to current working " + "directory." + ), + ) + parser.add_argument( + "-u", + "--user-settings", + required=True, + type=json.loads, + help=( + r""" + Custom settings defined by the user defined as a json + string. For example: -u + '{"experimenter_full_name":["John Smith","Jane Smith"], + "subject_id":"12345", + "session_start_time":"2023-10-10T10:10:10", + "session_end_time":"2023-10-10T18:10:10", + "stream_start_time": "2023-10-10T11:10:10", + "stream_end_time":"2023-10-10T17:10:10", + "stimulus_start_time":"12:10:10", + "stimulus_end_time":"13:10:10"}' + """ + ), + ) + job_args = parser.parse_args(args) + user_settings_from_args = UserSettings(**job_args.user_settings) + return cls( + input_source=Path(job_args.input_source), + output_directory=Path(job_args.output_directory), + user_settings=user_settings_from_args, + ) + + +if __name__ == "__main__": + sys_args = sys.argv[1:] + etl = BergamoEtl.from_args(sys_args) + etl.run_job() diff --git a/src/aind_metadata_mapper/core.py b/src/aind_metadata_mapper/core.py index fda677ae..a575dad9 100644 --- a/src/aind_metadata_mapper/core.py +++ b/src/aind_metadata_mapper/core.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from os import PathLike from pathlib import Path -from typing import Any +from typing import Any, Union from aind_data_schema.base import AindCoreModel from pydantic import validate_model @@ -15,7 +15,9 @@ class BaseEtl(ABC): """Base etl class. 
Defines interface for extracting, transforming, and loading input sources into a json file saved locally.""" - def __init__(self, input_source: PathLike, output_directory: Path): + def __init__( + self, input_source: Union[PathLike, str], output_directory: Path + ): """ Class constructor for Base etl class. Parameters diff --git a/tests/resources/bergamo/cropped_neuron50_00001.tif b/tests/resources/bergamo/cropped_neuron50_00001.tif new file mode 100644 index 00000000..b05bcbb5 Binary files /dev/null and b/tests/resources/bergamo/cropped_neuron50_00001.tif differ diff --git a/tests/resources/bergamo/example_description0.txt b/tests/resources/bergamo/example_description0.txt new file mode 100644 index 00000000..1d3244eb --- /dev/null +++ b/tests/resources/bergamo/example_description0.txt @@ -0,0 +1,16 @@ +frameNumbers = 1 +acquisitionNumbers = 1 +frameNumberAcquisition = 1 +frameTimestamps_sec = 0.000000000 +acqTriggerTimestamps_sec = -0.000021560 +nextFileMarkerTimestamps_sec = -1.000000000 +endOfAcquisition = 0 +endOfAcquisitionMode = 0 +dcOverVoltage = 0 +epoch = [2023 7 24 14 14 17.854] +auxTrigger0 = [] +auxTrigger1 = [] +auxTrigger2 = [] +auxTrigger3 = [] +I2CData = {} + \ No newline at end of file diff --git a/tests/resources/bergamo/example_metadata.txt.gz b/tests/resources/bergamo/example_metadata.txt.gz new file mode 100644 index 00000000..17463649 Binary files /dev/null and b/tests/resources/bergamo/example_metadata.txt.gz differ diff --git a/tests/resources/bergamo/expected_session.json b/tests/resources/bergamo/expected_session.json new file mode 100644 index 00000000..55526019 --- /dev/null +++ b/tests/resources/bergamo/expected_session.json @@ -0,0 +1,127 @@ +{ + "describedBy": "https://raw.githubusercontent.com/AllenNeuralDynamics/aind-data-schema/main/src/aind_data_schema/session.py", + "schema_version": "0.0.1", + "experimenter_full_name": [ + "John Smith", + "Jane Smith" + ], + "session_start_time": "2023-10-10T14:00:00", + "session_end_time": 
"2023-10-10T17:00:00", + "session_type": "BCI", + "iacuc_protocol": "2115", + "rig_id": "Bergamo photostim.", + "calibrations": null, + "maintenance": null, + "subject_id": 12345, + "animal_weight_prior": null, + "animal_weight_post": null, + "weight_unit": "gram", + "data_streams": [ + { + "stream_start_time": "2023-10-10T15:00:00", + "stream_end_time": "2023-10-10T16:00:00", + "stream_modalities": [ + { + "name": "Planar optical physiology", + "abbreviation": "ophys" + } + ], + "daq_names": null, + "camera_names": [ + "Side Camera" + ], + "light_sources": [ + { + "device_type": "Laser", + "name": "Laser A", + "wavelength": 920, + "wavelength_unit": "nanometer", + "excitation_power": 15, + "excitation_power_unit": "percent" + } + ], + "ephys_modules": null, + "manipulator_modules": null, + "detectors": [ + { + "name": "PMT A", + "exposure_time": 0.1, + "exposure_time_unit": "millisecond", + "trigger_type": "Internal" + } + ], + "fiber_photometry_assemblies": null, + "ophys_fovs": [ + { + "index": 0, + "imaging_depth": 150, + "imaging_depth_unit": "micrometer", + "targeted_structure": "M1", + "fov_coordinate_ml": 1.5, + "fov_coordinate_ap": 1.5, + "fov_coordinate_unit": "micrometer", + "fov_reference": "Bregma", + "fov_width": 512, + "fov_height": 512, + "fov_size_unit": "pixel", + "magnification": "16x", + "fov_scale_factor": 1.2, + "fov_scale_factor_unit": "um/pixel", + "frame_rate": 30.0119, + "frame_rate_unit": "hertz" + } + ], + "stack_parameters": null, + "stimulus_device_names": null, + "notes": null + } + ], + "stimulus_epochs": [ + { + "stimulus": { + "stimulus_type": "PhotoStimulation", + "stimulus_name": "PhotoStimulation", + "number_groups": 2, + "groups": [ + { + "group_index": 0, + "number_of_neurons": 10, + "stimulation_laser_power": 20, + "stimulation_laser_power_unit": "milliwatt", + "number_trials": 5, + "number_spirals": 10, + "spiral_duration": 0.01, + "spiral_duration_unit": "second", + "inter_spiral_interval": 0.001, + 
"inter_spiral_interval_unit": "second", + "other_parameters": null, + "notes": null + }, + { + "group_index": 0, + "number_of_neurons": 10, + "stimulation_laser_power": 20, + "stimulation_laser_power_unit": "milliwatt", + "number_trials": 5, + "number_spirals": 10, + "spiral_duration": 0.01, + "spiral_duration_unit": "second", + "inter_spiral_interval": 0.001, + "inter_spiral_interval_unit": "second", + "other_parameters": null, + "notes": null + } + ], + "inter_trial_interval": 10, + "inter_trial_interval_unit": "second", + "other_parameters": null, + "notes": null + }, + "stimulus_start_time": "15:15:00", + "stimulus_end_time": "15:45:00" + } + ], + "reward_delivery": null, + "stick_microscopes": null, + "notes": null +} \ No newline at end of file diff --git a/tests/resources/procedures_examples/basic_response.json b/tests/resources/procedures_examples/basic_response.json index 2f5e3921..73e54022 100644 --- a/tests/resources/procedures_examples/basic_response.json +++ b/tests/resources/procedures_examples/basic_response.json @@ -2,7 +2,7 @@ "message": "Valid Model.", "data": { "describedBy": "https://raw.githubusercontent.com/AllenNeuralDynamics/aind-data-schema/main/src/aind_data_schema/procedures.py", - "schema_version": "0.9.3", + "schema_version": "0.9.5", "subject_id": "632269", "subject_procedures": [ { diff --git a/tests/resources/subject_examples/basic_response.json b/tests/resources/subject_examples/basic_response.json index 272a8e52..b2971bb8 100644 --- a/tests/resources/subject_examples/basic_response.json +++ b/tests/resources/subject_examples/basic_response.json @@ -2,7 +2,7 @@ "message":"Valid Model.", "data": { "describedBy":"https://raw.githubusercontent.com/AllenNeuralDynamics/aind-data-schema/main/src/aind_data_schema/subject.py", - "schema_version":"0.3.0", + "schema_version": "0.9.5", "species":"Mus musculus", "subject_id":"632269", "sex":"Female", diff --git a/tests/test_bergamo.py b/tests/test_bergamo.py new file mode 100644 index 
00000000..04ab2159 --- /dev/null +++ b/tests/test_bergamo.py @@ -0,0 +1,209 @@ +"""Tests parsing of session information from bergamo rig.""" + +import gzip +import json +import os +import unittest +from datetime import datetime, time +from pathlib import Path +from unittest.mock import MagicMock, patch + +from aind_metadata_mapper.bergamo.session import ( + BergamoEtl, + RawImageInfo, + UserSettings, +) + +RESOURCES_DIR = ( + Path(os.path.dirname(os.path.realpath(__file__))) / "resources" / "bergamo" +) +EXAMPLE_MD_PATH = RESOURCES_DIR / "example_metadata.txt.gz" +EXAMPLE_DES_PATH = RESOURCES_DIR / "example_description0.txt" +EXAMPLE_IMG_PATH = RESOURCES_DIR / "cropped_neuron50_00001.tif" +EXPECTED_SESSION = RESOURCES_DIR / "expected_session.json" + + +class TestBergamoEtl(unittest.TestCase): + """Test methods in BergamoEtl class.""" + + @classmethod + def setUpClass(cls): + """Load record object and user settings before running tests.""" + with gzip.open(EXAMPLE_MD_PATH, "rt") as f: + raw_md_contents = f.read() + with open(EXAMPLE_DES_PATH, "r") as f: + raw_des0_contents = f.read() + with open(EXPECTED_SESSION, "r") as f: + expected_session_contents = json.load(f) + cls.example_metadata = raw_md_contents + cls.example_description0 = raw_des0_contents + cls.example_shape = [347, 512, 512] + cls.example_user_settings = UserSettings( + experimenter_full_name=["John Smith", "Jane Smith"], + subject_id="12345", + session_start_time=datetime(2023, 10, 10, 14, 0, 0), + session_end_time=datetime(2023, 10, 10, 17, 0, 0), + stream_start_time=datetime(2023, 10, 10, 15, 0, 0), + stream_end_time=datetime(2023, 10, 10, 16, 0, 0), + stimulus_start_time=time(15, 15, 0), + stimulus_end_time=time(15, 45, 0), + ) + cls.expected_session = expected_session_contents + + @patch("aind_metadata_mapper.bergamo.session.ScanImageTiffReader") + def test_extract(self, mock_reader: MagicMock): + """Tests that the raw image info is extracted correctly.""" + mock_context = 
mock_reader.return_value.__enter__.return_value + mock_context.metadata.return_value = self.example_metadata + mock_context.description.return_value = self.example_description0 + mock_context.shape.return_value = self.example_shape + # Test extracting where input source is a directory + etl_job1 = BergamoEtl( + input_source=RESOURCES_DIR, + output_directory=RESOURCES_DIR, + user_settings=self.example_user_settings, + ) + raw_image_info1 = etl_job1._extract() + self.assertEqual(2310025, len(raw_image_info1.metadata)) + self.assertEqual(2000, len(raw_image_info1.description0)) + self.assertEqual([347, 512, 512], raw_image_info1.shape) + + # Test extracting where input source is a file + etl_job2 = BergamoEtl( + input_source=EXAMPLE_IMG_PATH, + output_directory=RESOURCES_DIR, + user_settings=self.example_user_settings, + ) + raw_image_info2 = etl_job2._extract() + self.assertEqual(2310025, len(raw_image_info2.metadata)) + self.assertEqual(2000, len(raw_image_info2.description0)) + self.assertEqual([347, 512, 512], raw_image_info2.shape) + + # Test extracting where input source is a str + etl_job3 = BergamoEtl( + input_source=str(EXAMPLE_IMG_PATH), + output_directory=RESOURCES_DIR, + user_settings=self.example_user_settings, + ) + raw_image_info3 = etl_job3._extract() + self.assertEqual(2310025, len(raw_image_info3.metadata)) + self.assertEqual(2000, len(raw_image_info3.description0)) + self.assertEqual([347, 512, 512], raw_image_info3.shape) + + # Test error is raised if no tif file in dir + etl_job4 = BergamoEtl( + input_source=str(RESOURCES_DIR / ".."), + output_directory=RESOURCES_DIR, + user_settings=self.example_user_settings, + ) + with self.assertRaises(FileNotFoundError) as e: + etl_job4._extract() + self.assertEqual( + "Directory must contain tif or tiff file!", str(e.exception) + ) + + def test_flat_dict_to_nested(self): + """Test util method to convert dictionaries from flat to nested.""" + original_input = { + "SI.LINE_FORMAT_VERSION": 1, + 
"SI.VERSION_UPDATE": 0, + "SI.acqState": "loop", + "SI.acqsPerLoop": "10000", + "SI.errorMsg": "", + "SI.extTrigEnable": "1", + "SI.fieldCurvatureRxs": "[]", + "SI.fieldCurvatureZs": "[]", + "SI.hBeams.enablePowerBox": "false", + "SI.hBeams.errorMsg": "", + "SI.hBeams.lengthConstants": "[200 Inf]", + "SI.hBeams.name": "SI Beams", + } + + expected_output = { + "SI": { + "LINE_FORMAT_VERSION": 1, + "VERSION_UPDATE": 0, + "acqState": "loop", + "acqsPerLoop": "10000", + "errorMsg": "", + "extTrigEnable": "1", + "fieldCurvatureRxs": "[]", + "fieldCurvatureZs": "[]", + "hBeams": { + "enablePowerBox": "false", + "errorMsg": "", + "lengthConstants": "[200 Inf]", + "name": "SI Beams", + }, + } + } + + actual_output = BergamoEtl._flat_dict_to_nested(original_input) + self.assertEqual(expected_output, actual_output) + + @patch("logging.error") + def test_parse_raw_image_info(self, mock_log: MagicMock): + """Tests that raw image info is parsed correctly.""" + raw_image_info = RawImageInfo( + metadata=self.example_metadata, + description0=self.example_description0, + shape=self.example_shape, + ) + + etl_job1 = BergamoEtl( + input_source=RESOURCES_DIR, + output_directory=RESOURCES_DIR, + user_settings=self.example_user_settings, + ) + actual_parsed_data = etl_job1._parse_raw_image_info(raw_image_info) + mock_log.assert_called_once_with( + "Multiple planes not handled in metadata collection. 
" + "HANDLE ME!!!: KeyError('userZs')" + ) + self.assertEqual([347, 512, 512], actual_parsed_data.shape) + self.assertEqual(1, actual_parsed_data.num_planes) + self.assertEqual( + datetime(2023, 7, 24, 14, 14, 17, 854000), + actual_parsed_data.movie_start_time, + ) + self.assertEqual("30.0119", actual_parsed_data.frame_rate) + self.assertEqual( + {"fs": "30.0119", "nplanes": 1, "nrois": 1, "mesoscan": 0}, + actual_parsed_data.roi_data, + ) + self.assertEqual( + "1", actual_parsed_data.description_first_frame["frameNumbers"] + ) + self.assertEqual( + "false", actual_parsed_data.metadata["hBeams"]["enablePowerBox"] + ) + self.assertEqual( + "1", actual_parsed_data.metadata["LINE_FORMAT_VERSION"] + ) + + @patch("logging.error") + def test_transform(self, mock_log: MagicMock): + """Tests raw image info is parsed into a Session object correctly""" + raw_image_info = RawImageInfo( + metadata=self.example_metadata, + description0=self.example_description0, + shape=self.example_shape, + ) + + etl_job1 = BergamoEtl( + input_source=RESOURCES_DIR, + output_directory=RESOURCES_DIR, + user_settings=self.example_user_settings, + ) + actual_session = etl_job1._transform(raw_image_info) + self.assertEqual( + self.expected_session, json.loads(actual_session.json()) + ) + mock_log.assert_called_once_with( + "Multiple planes not handled in metadata collection. " + "HANDLE ME!!!: KeyError('userZs')" + ) + + +if __name__ == "__main__": + unittest.main()