feat: reformat fib session
micahwoodard committed Mar 5, 2024
1 parent 64724b6 commit 3977175
Showing 3 changed files with 103 additions and 87 deletions.
118 changes: 73 additions & 45 deletions src/aind_metadata_mapper/fib/session.py
@@ -4,6 +4,7 @@
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

from aind_data_schema.core.session import (
DetectorConfig,
@@ -18,20 +19,47 @@
PulseShape,
StimulusEpoch,
)
from pydantic import Field
from pydantic_settings import BaseSettings

from aind_metadata_mapper.core import BaseEtl
from aind_metadata_mapper.core import GenericEtl, JobResponse


class JobSettings(BaseSettings):
"""Data that needs to be input by user."""

output_directory: Optional[Path] = Field(
default=None,
description=(
"Directory where to save the json file to. If None, then json"
" contents will be returned in the Response message."
),
)

string_to_parse: str
experimenter_full_name: List[str]
session_start_time: datetime
notes: str
labtracks_id: str
iacuc_protocol: str
light_source_list: List[dict]
detector_list: List[dict]
fiber_connections_list: List[dict]

rig_id: str = "ophys_rig"
session_type: str = "Foraging_Photometry"
mouse_platform_name: str = "Disc"
active_mouse_platform: bool = False


@dataclass(frozen=True)
class ParsedInformation:
class ParsedMetadata:
"""RawImageInfo gets parsed into this data"""

teensy_str: str
experiment_data: dict
start_datetime: datetime


class FIBEtl(BaseEtl):
class FIBEtl(GenericEtl[JobSettings]):
"""This class contains the methods to write OphysScreening data"""

_dictionary_mapping = {
@@ -51,29 +79,22 @@ class FIBEtl(BaseEtl):

def __init__(
self,
output_directory: Path,
teensy_str: str,
experiment_data: dict,
start_datetime: datetime,
input_source: str = "",
job_settings: Union[JobSettings, str],
):
"""
Class constructor for Base etl class.
Parameters
----------
input_source : Union[str, PathLike]
Can be a string or a Path
output_directory : Path
The directory where to save the json files.
user_settings: UserSettings
job_settings: Union[JobSettings, str]
Variables for a particular session
"""
super().__init__(input_source, output_directory)
self.teensy_str = teensy_str
self.experiment_data = experiment_data
self.start_datetime = start_datetime
if isinstance(job_settings, str):
job_settings_model = JobSettings.model_validate_json(job_settings)
else:
job_settings_model = job_settings
super().__init__(job_settings=job_settings_model)

def _transform(self, extracted_source: ParsedInformation) -> Session:
def _transform(self, extracted_source: ParsedMetadata) -> Session:
"""
Parses params from teensy string and creates ophys session model
Parameters
@@ -87,21 +108,20 @@ def _transform(self, extracted_source: ParsedInformation) -> Session:
"""
# Process data from dictionary keys

experiment_data = extracted_source.experiment_data
string_to_parse = extracted_source.teensy_str
start_datetime = extracted_source.start_datetime

labtracks_id = experiment_data["labtracks_id"]
iacuc_protocol = experiment_data["iacuc"]
rig_id = experiment_data["rig_id"]
experimenter_full_name = experiment_data["experimenter_name"]
mouse_platform_name = experiment_data["mouse_platform_name"]
active_mouse_platform = experiment_data["active_mouse_platform"]
light_source_list = experiment_data["light_source"]
detector_list = experiment_data["detectors"]
fiber_connections_list = experiment_data["fiber_connections"]
session_type = experiment_data["session_type"]
notes = experiment_data["notes"]

session_start_time = self.job_settings.session_start_time
labtracks_id = self.job_settings.labtracks_id
iacuc_protocol = self.job_settings.iacuc_protocol
rig_id = self.job_settings.rig_id
experimenter_full_name = self.job_settings.experimenter_full_name
mouse_platform_name = self.job_settings.mouse_platform_name
active_mouse_platform = self.job_settings.active_mouse_platform
light_source_list = self.job_settings.light_source_list
detector_list = self.job_settings.detector_list
fiber_connections_list = self.job_settings.fiber_connections_list
session_type = self.job_settings.session_type
notes = self.job_settings.notes

# Use regular expressions to extract the values
frequency_match = re.search(self.frequency_regex, string_to_parse)
@@ -141,12 +161,12 @@ def _transform(self, extracted_source: ParsedInformation) -> Session:
experiment_duration = (
opto_base + opto_duration + (opto_interval * trial_num)
)
end_datetime = start_datetime + datetime.timedelta(
end_datetime = session_start_time + datetime.timedelta(
seconds=experiment_duration
)
stimulus_epochs = StimulusEpoch(
stimulus=opto_stim,
stimulus_start_time=start_datetime,
stimulus_start_time=session_start_time,
stimulus_end_time=end_datetime,
)

@@ -167,10 +187,9 @@ def _transform(self, extracted_source: ParsedInformation) -> Session:
for fc in fiber_connections_list:
cord = FiberConnectionConfig(**fc)
fiber_connections.append(cord)

data_stream = [
Stream(
stream_start_time=start_datetime,
stream_start_time=session_start_time,
stream_end_time=end_datetime,
light_sources=light_source,
stream_modalities=[Modality.FIB],
@@ -180,13 +199,12 @@ def _transform(self, extracted_source: ParsedInformation) -> Session:
fiber_connections=fiber_connections,
)
]

# and finally, create ophys session
ophys_session = Session(
stimulus_epochs=[stimulus_epochs],
subject_id=labtracks_id,
iacuc_protocol=iacuc_protocol,
session_start_time=start_datetime,
session_start_time=session_start_time,
session_end_time=end_datetime,
rig_id=rig_id,
experimenter_full_name=experimenter_full_name,
@@ -197,10 +215,20 @@ def _transform(self, extracted_source: ParsedInformation) -> Session:

return ophys_session

def _extract(self) -> ParsedInformation:
def _extract(self) -> ParsedMetadata:
"""Extract metadata from fib session."""
return ParsedInformation(
teensy_str=self.teensy_str,
experiment_data=self.experiment_data,
start_datetime=self.start_datetime,

teensy_str = self.job_settings.string_to_parse

return ParsedMetadata(
teensy_str=teensy_str,
)

def run_job(self) -> JobResponse:
"""Run the etl job and return a JobResponse."""
extracted = self._extract()
transformed = self._transform(extracted_source=extracted)
job_response = self._load(
transformed, self.job_settings.output_directory
)
return job_response
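
Taken together, the session.py changes replace the old multi-argument constructor with a single JobSettings model and add a run_job entry point. A minimal usage sketch of the new interface follows (not part of the diff; the field values are illustrative placeholders, and string_to_parse must be a genuine teensy response so the regex parsing in _transform can succeed):

from datetime import datetime

from aind_metadata_mapper.fib.session import FIBEtl, JobSettings

# Illustrative settings; a real session supplies its own teensy string,
# device configs, and (optionally) an output_directory.
settings = JobSettings(
    string_to_parse="<raw teensy response string>",  # placeholder; replace with rig output
    experimenter_full_name=["Don Key"],
    session_start_time=datetime(1999, 10, 4),
    notes="example session",
    labtracks_id="000000",
    iacuc_protocol="2115",
    light_source_list=[
        {
            "name": "470nm LED",
            "excitation_power": 0.020,
            "excitation_power_unit": "milliwatt",
        }
    ],
    detector_list=[
        {
            "name": "Hamamatsu Camera",
            "exposure_time": 10,
            "trigger_type": "Internal",
        }
    ],
    fiber_connections_list=[
        {
            "patch_cord_name": "Patch Cord A",
            "patch_cord_output_power": 40,
            "output_power_unit": "microwatt",
            "fiber_name": "Fiber A",
        }
    ],
)

# The constructor also accepts the JSON-serialized form:
# etl = FIBEtl(job_settings=settings.model_dump_json())
etl = FIBEtl(job_settings=settings)
# With output_directory left as None, the session JSON is returned in the
# JobResponse instead of being written to disk.
response = etl.run_job()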
3 changes: 2 additions & 1 deletion tests/resources/fib/000000_ophys_session.json
@@ -1,8 +1,9 @@
{
"describedBy": "https://raw.githubusercontent.com/AllenNeuralDynamics/aind-data-schema/main/src/aind_data_schema/core/session.py",
"schema_version": "0.1.4",
"protocol_id": [],
"experimenter_full_name": [
"john doe"
"Don Key"
],
"session_start_time": "1999-10-04T00:00:00",
"session_end_time": "1999-10-04T00:00:48",
69 changes: 28 additions & 41 deletions tests/test_fib.py
@@ -8,7 +8,7 @@

from aind_data_schema.core.session import Session

from aind_metadata_mapper.fib.session import FIBEtl
from aind_metadata_mapper.fib.session import FIBEtl, JobSettings

RESOURCES_DIR = (
Path(os.path.dirname(os.path.realpath(__file__))) / "resources" / "fib"
@@ -24,20 +24,19 @@ class TestSchemaWriter(unittest.TestCase):
def setUpClass(cls):
"""Load record object and user settings before running tests."""

cls.example_experiment_data = {
"labtracks_id": "000000",
"experimenter_name": [
"john doe",
],
"notes": "brabrabrabra....", #
"experimental_mode": "c",
"save_dir": "",
"iacuc": "2115",
"rig_id": "ophys_rig",
"COMPort": "COM3",
"mouse_platform_name": "Disc",
"active_mouse_platform": False,
"light_source": [
with open(EXAMPLE_MD_PATH, "r") as f:
raw_md_contents = f.read()
with open(EXPECTED_SESSION, "r") as f:
expected_session_contents = Session(**json.load(f))

cls.example_job_settings = JobSettings(
string_to_parse=raw_md_contents,
experimenter_full_name=["Don Key"],
session_start_time=datetime(1999, 10, 4),
notes="brabrabrabra....",
labtracks_id="000000",
iacuc_protocol="2115",
light_source_list=[
{
"name": "470nm LED",
"excitation_power": 0.020,
@@ -53,59 +52,47 @@ def setUpClass(cls):
"excitation_power": 0.020, # Set 0 for unused StimLED
"excitation_power_unit": "milliwatt",
},
], # default light source
"detectors": [
],
detector_list=[
{
"name": "Hamamatsu Camera",
"exposure_time": 10,
"trigger_type": "Internal",
}
],
"fiber_connections": [
fiber_connections_list=[
{
"patch_cord_name": "Patch Cord A",
"patch_cord_output_power": 40,
"output_power_unit": "microwatt",
"fiber_name": "Fiber A",
}
],
"session_type": "Foraging_Photometry",
}

with open(EXAMPLE_MD_PATH, "r") as f:
raw_md_contents = f.read()
with open(EXPECTED_SESSION, "r") as f:
expected_session_contents = Session(**json.load(f))
rig_id="ophys_rig",
session_type="Foraging_Photometry",
mouse_platform_name="Disc",
active_mouse_platform=False,
)

cls.expected_session = expected_session_contents
cls.example_metadata = raw_md_contents

def test_extract(self):
"""Tests that the teensy response and experiment
data is extracted correctly"""

etl_job1 = FIBEtl(
output_directory=RESOURCES_DIR,
teensy_str=self.example_metadata,
experiment_data=self.example_experiment_data,
start_datetime=datetime(1999, 10, 4),
)
etl_job1 = FIBEtl(job_settings=self.example_job_settings)
parsed_info = etl_job1._extract()
self.assertEqual(self.example_metadata, parsed_info.teensy_str)
self.assertEqual(
self.example_experiment_data, parsed_info.experiment_data
self.example_job_settings.string_to_parse, parsed_info.teensy_str
)
self.assertEqual(
datetime(1999, 10, 4), self.example_job_settings.session_start_time
)
self.assertEqual(datetime(1999, 10, 4), parsed_info.start_datetime)

def test_transform(self):
"""Tests that the teensy response maps correctly to ophys session."""

etl_job1 = FIBEtl(
output_directory=RESOURCES_DIR,
teensy_str=self.example_metadata,
experiment_data=self.example_experiment_data,
start_datetime=datetime(1999, 10, 4),
)
etl_job1 = FIBEtl(job_settings=self.example_job_settings)
parsed_info = etl_job1._extract()
actual_session = etl_job1._transform(parsed_info)
self.assertEqual(self.expected_session, actual_session)
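
With the fixtures consolidated into a single JobSettings object, a natural extension of this suite (not part of the commit; shown only as a hedged sketch that reuses the cls.example_job_settings fixture defined above) would be a round-trip check of the new string-accepting constructor:

    def test_constructor_from_string(self):
        """Hypothetical check: FIBEtl also accepts JSON-serialized settings."""
        etl_job1 = FIBEtl(
            job_settings=self.example_job_settings.model_dump_json()
        )
        # Pydantic models compare field-by-field, so the deserialized
        # settings should equal the original fixture.
        self.assertEqual(self.example_job_settings, etl_job1.job_settings)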
