Skip to content

Commit

Permalink
feat: adds bergamo etl job
Browse files Browse the repository at this point in the history
  • Loading branch information
jtyoung84 committed Oct 24, 2023
1 parent 45a420e commit 53d07bf
Show file tree
Hide file tree
Showing 4 changed files with 401 additions and 49 deletions.
259 changes: 221 additions & 38 deletions src/aind_metadata_mapper/bergamo/session.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,69 @@
"""Module to map bergamo metadata into a session model"""

import argparse
import json
import logging
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime, time
from os import PathLike
from pathlib import Path
from typing import Any
import numpy as np
from typing import List, Tuple, Union

from aind_data_schema.session import Session, Stream, Modality, TriggerType, Laser, Detector, FieldOfView
from aind_data_schema.utils.units import SizeUnit, PowerUnit
import numpy as np
from aind_data_schema.session import (
Detector,
FieldOfView,
Laser,
Modality,
Session,
Stream,
)
from aind_data_schema.stimulus import (
PhotoStimulation,
PhotoStimulationGroup,
StimulusEpoch,
)
from ScanImageTiffReader import ScanImageTiffReader
from typing import Any, List

from aind_metadata_mapper.core import BaseEtl
from dataclasses import dataclass
from datetime import datetime, time


@dataclass(frozen=True)
class UserSettings:
"""Data that need to be input by user or pulled from config file"""

experimenter_full_name: List[str]
subject_id: str
# TODO: Look into if the following can be extracted from tif directory
session_start_time: datetime
session_end_time: datetime
stream_start_time: datetime
stream_end_time: datetime
stimulus_start_time: time
stimulus_end_time: time

# Dat that might change but can have default values
iacuc_protocol: str = "2115"
rig_id: str = "Bergamo photostim."
camera_names: Tuple[str] = ("Side Camera",)


@dataclass(frozen=True)
class RawImageInfo:
"""Metadata from tif files"""

metadata: str
description0: str
shape: List[int]


@dataclass(frozen=True)
class ParsedMetadata:
"""RawImageInfo gets parsed into this data"""

metadata: dict
roi_data: dict
roi_metadata: dict
Expand All @@ -42,11 +75,48 @@ class ParsedMetadata:


class BergamoEtl(BaseEtl):
"""Class to manage transforming bergamo data files into a Session object"""

def __init__(
self,
input_source: Union[str, PathLike],
output_directory: Path,
user_settings: UserSettings,
):
"""
Class constructor for Base etl class.
Parameters
----------
input_source : Union[str, PathLike]
Can be a string or a Path
output_directory : Path
The directory where to save the json files.
user_settings: UserSettings
Variables for a particular session
"""
super().__init__(input_source, output_directory)
self.user_settings = user_settings

@staticmethod
def _flat_dict_to_nested(flat: dict, key_delim="."):
# https://stackoverflow.com/a/50607551
def _flat_dict_to_nested(flat: dict, key_delim: str = ".") -> dict:
"""
Utility method to convert a flat dictionary into a nested dictionary.
Modified from https://stackoverflow.com/a/50607551
Parameters
----------
flat : dict
Example {"a.b.c": 1, "a.b.d": 2, "e.f": 3}
key_delim : str
Delimiter on dictionary keys. Default is '.'.
Returns
-------
dict
A nested dictionary like {"a": {"b": {"c":1, "d":2}, "e": {"f":3}}
"""

def __nest_dict_rec(k, v, out) -> None:
"""Simple recursive method being called."""
k, *rest = k.split(key_delim, 1)
if rest:
__nest_dict_rec(rest[0], v, out.setdefault(k, {}))
Expand All @@ -61,7 +131,16 @@ def __nest_dict_rec(k, v, out) -> None:
def _parse_raw_image_info(
self, raw_image_info: RawImageInfo
) -> ParsedMetadata:
"""Parses metadata from tiff_reader."""
"""
Parses metadata from raw image info.
Parameters
----------
raw_image_info : RawImageInfo
Returns
-------
ParsedMetadata
"""

# The metadata contains two parts separated by \n\n. The top part
# looks like
Expand Down Expand Up @@ -94,16 +173,18 @@ def _parse_raw_image_info(
]
)
frame_rate = metadata["hRoiManager"]["scanVolumeRate"]
# TODO: Use .get instead of try/except and add coverage test
try:
z_collection = metadata["hFastZ"]["userZs"]
num_planes = len(z_collection)
num_planes = len(z_collection) # pragma: no cover
except Exception as e: # new scanimage version
logging.error(
f"Multiple planes not handled in metadata collection. "
f"HANDLE ME!!!: {repr(e)}"
)
# TODO: Check if this if/else is necessary
if metadata["hFastZ"]["enable"] == "true":
num_planes = 1
num_planes = 1 # pragma: no cover
else:
num_planes = 1

Expand Down Expand Up @@ -150,16 +231,14 @@ def _parse_raw_image_info(
irow = np.delete(irow, -1)
irow = np.vstack((irow, irow + np.transpose(h_px)))

data = {}
data["fs"] = frame_rate
data["nplanes"] = num_planes
data["nrois"] = num_rois # or irow.shape[1]?
data = {"fs": frame_rate, "nplanes": num_planes, "nrois": num_rois}
if data["nrois"] == 1:
data["mesoscan"] = 0
else:
data["mesoscan"] = 1

if data["mesoscan"]:
# TODO: Add coverage example
data["mesoscan"] = 1 # pragma: no cover
# TODO: Add coverage example
if data["mesoscan"]: # pragma: no cover
# data['nrois'] = num_rois #or irow.shape[1]?
data["dx"] = []
data["dy"] = []
Expand Down Expand Up @@ -195,12 +274,39 @@ def _parse_raw_image_info(
)

@staticmethod
def _get_si_file_from_dir(source_dir: Path) -> Path:
def _get_si_file_from_dir(
source_dir: Path, regex_pattern: str = r"^.*?(\d+).tif+$"
) -> Path:
"""
Utility method to scan top level of source_dir for .tif or .tiff files.
Sorts them by file number and collects the first one. The directory
contains files that look like neuron50_00001.tif, neuron50_00002.tif.
Parameters
----------
source_dir : Path
Directory where the tif files are located
regex_pattern : str
Format of how files are expected to be organized. Default matches
against something that ends with a series of digits and .tif(f)
Returns
-------
Path
File path of the first tif file.
"""
compiled_regex = re.compile(regex_pattern)
tif_filepath = None
old_tif_number = None
for root, dirs, files in os.walk(source_dir):
for name in files:
if name.endswith("tif") or name.endswith("tiff"):
tif_filepath = Path(os.path.join(root, name))
matched = re.match(compiled_regex, name)
if matched:
tif_number = matched.group(1)
if old_tif_number is None or tif_number < old_tif_number:
old_tif_number = tif_number
tif_filepath = Path(os.path.join(root, name))

# Only scan the top level files
break
if tif_filepath is None:
Expand Down Expand Up @@ -234,20 +340,28 @@ def _extract(self) -> RawImageInfo:
)

def _transform(self, extracted_source: RawImageInfo) -> Session:
"""
Transforms the raw data extracted from the tif directory into a
Session object.
Parameters
----------
extracted_source : RawImageInfo
Returns
-------
Session
"""
siHeader = self._parse_raw_image_info(extracted_source)
photostim_groups = siHeader.metadata["json"]["RoiGroups"][
"photostimRoiGroups"
]

# TODO: Set default values and extract variables
t = datetime(2022, 7, 12, 7, 00, 00)
t2 = time(7, 00, 00)

data_stream = Stream(
stream_start_time=t,
stream_end_time=t,
stream_start_time=self.user_settings.stream_start_time,
stream_end_time=self.user_settings.stream_end_time,
stream_modalities=[Modality.POPHYS],
camera_names=["Side Camera"],
camera_names=list(self.user_settings.camera_names),
light_sources=[
Laser(
name="Laser A",
Expand All @@ -260,7 +374,11 @@ def _transform(self, extracted_source: RawImageInfo) -> Session:
),
],
detectors=[
Detector(name="PMT A",exposure_time=0.1, trigger_type="Internal",),
Detector(
name="PMT A",
exposure_time=0.1,
trigger_type="Internal",
),
],
ophys_fovs=[
FieldOfView(
Expand All @@ -287,13 +405,13 @@ def _transform(self, extracted_source: RawImageInfo) -> Session:
],
)
return Session(
experimenter_full_name=["John Doe"],
session_start_time=t,
session_end_time=t,
subject_id="652567",
experimenter_full_name=self.user_settings.experimenter_full_name,
session_start_time=self.user_settings.session_start_time,
session_end_time=self.user_settings.session_end_time,
subject_id=self.user_settings.subject_id,
session_type="BCI",
iacuc_protocol="2115",
rig_id="Bergamo photostim.",
iacuc_protocol=self.user_settings.iacuc_protocol,
rig_id=self.user_settings.rig_id,
data_streams=[data_stream],
stimulus_epochs=[
StimulusEpoch(
Expand Down Expand Up @@ -358,8 +476,73 @@ def _transform(self, extracted_source: RawImageInfo) -> Session:
],
inter_trial_interval=10,
),
stimulus_start_time=t2,
stimulus_end_time=t2,
stimulus_start_time=(
self.user_settings.stimulus_start_time
),
stimulus_end_time=self.user_settings.stimulus_end_time,
)
],
)

@classmethod
def from_args(cls, args: list):
"""
Adds ability to construct settings from a list of arguments.
Parameters
----------
args : list
A list of command line arguments to parse.
"""

parser = argparse.ArgumentParser()
parser.add_argument(
"-i",
"--input-source",
required=False,
type=str,
help="Directory where tif files are located",
)
parser.add_argument(
"-o",
"--output-directory",
required=False,
default=".",
type=str,
help=(
"Directory to save json file to. Defaults to current working "
"directory."
),
)
parser.add_argument(
"-u",
"--user-settings",
required=True,
type=json.loads,
help=(
r"""
Custom settings defined by the user defined as a json
string. For example: -u
'{"experimenter_full_name":["John Smith","Jane Smith"],
"subject_id":"12345",
"session_start_time":"2023-10-10T10:10:10",
"session_end_time":"2023-10-10T18:10:10",
"stream_start_time": "2023-10-10T11:10:10",
"stream_end_time":"2023-10-10T17:10:10",
"stimulus_start_time":"12:10:10",
"stimulus_end_time":"13:10:10"}'
"""
),
)
job_args = parser.parse_args(args)
user_settings_from_args = UserSettings(**job_args.user_settings)
return cls(
input_source=Path(job_args.input_source),
output_directory=Path(job_args.output_directory),
user_settings=user_settings_from_args,
)


if __name__ == "__main__":
sys_args = sys.argv[1:]
etl = BergamoEtl.from_args(sys_args)
etl.run_job()
3 changes: 1 addition & 2 deletions src/aind_metadata_mapper/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
from abc import ABC, abstractmethod
from os import PathLike
from pathlib import Path
from typing import Any
from typing import Any, Union

from aind_data_schema.base import AindCoreModel
from typing import Union
from pydantic import validate_model


Expand Down
Loading

0 comments on commit 53d07bf

Please sign in to comment.