Rename multiplier to frames_per_event and move to first dim of shape #726

Status: Open. Wants to merge 68 commits into base: main.

Showing changes from 62 of the 68 commits.
Commits:

652de13  Starting to work on ad tiff writer (jwlodek, Sep 4, 2024)
e289ee4  Resolve merge conflicts (jwlodek, Oct 1, 2024)
f36ec3a  Continue working on tiff writer (jwlodek, Oct 8, 2024)
83dff62  Further work on tiff writer, existing tests now passing. (jwlodek, Oct 8, 2024)
1a52a21  Remove functions moved to superclas from hdf writer (jwlodek, Oct 8, 2024)
489cfd8  Significant re-org and simplification of ad classes (jwlodek, Oct 9, 2024)
83c6884  Ruff formatting (jwlodek, Oct 9, 2024)
3b4f45a  Modify ad sim classes to reflect new superclasses (jwlodek, Oct 9, 2024)
7175b30  Modify vimba and kinetix classes (jwlodek, Oct 9, 2024)
faf53d6  Modify aravis and pilatus classes (jwlodek, Oct 9, 2024)
5b9f60f  Update all tests to make sure they still pass with changes (jwlodek, Oct 10, 2024)
8bbfd0e  Some cleanup (jwlodek, Oct 10, 2024)
1eab818  Merge with upstream (jwlodek, Oct 10, 2024)
f6825b4  Changes to standard detector to account for controller/writer types i… (jwlodek, Nov 22, 2024)
651b80d  Significant changes to base detector, controller, and writer classes (jwlodek, Nov 22, 2024)
38a61e8  Update detector and controller classes to reflect changes (jwlodek, Nov 22, 2024)
aecdf04  Make sure panda standard det uses new type hints (jwlodek, Nov 22, 2024)
e42fa12  Most tests passing (jwlodek, Nov 22, 2024)
07684a4  Merge with main and resolve conflicts (jwlodek, Nov 22, 2024)
6dc09f3  Revert change in test that was resolved by pydantic version update (jwlodek, Nov 22, 2024)
1f7dcd7  Remove debugging prints (jwlodek, Nov 22, 2024)
35dd1b1  Linter fixes (jwlodek, Nov 22, 2024)
8112220  Fix linter error (jwlodek, Nov 22, 2024)
ac1e509  Move creation of writer outside of base AreaDetector class init per r… (jwlodek, Nov 26, 2024)
8494da4  Make sure we don't wait for capture to be done! (jwlodek, Nov 26, 2024)
b212432  Merge with upstream (jwlodek, Nov 26, 2024)
3242d45  Merge with upstream (jwlodek, Dec 9, 2024)
488d7eb  Allow for specifying whether or not to use fileio signals for dataset… (jwlodek, Dec 9, 2024)
a76b70f  Revert "Allow for specifying whether or not to use fileio signals for… (jwlodek, Dec 10, 2024)
7da935e  Fix linter errors, remove unused enum (jwlodek, Dec 10, 2024)
0cd0ddf  Change from return to await to conform to return type (jwlodek, Dec 10, 2024)
f1b9a4e  Apply more suggestions from review (jwlodek, Dec 18, 2024)
3c2b479  Merge with upstream (jwlodek, Dec 18, 2024)
d84f2b4  Replace instances of DeviceCollector to init_devices (jwlodek, Dec 18, 2024)
a5a42c6  Started moving multiplier; 'multiplier' name change in progress (thomashopkins32, Dec 19, 2024)
05dd89c  Update src/ophyd_async/epics/adaravis/_aravis_controller.py (jwlodek, Dec 20, 2024)
3fcd541  Update src/ophyd_async/epics/adaravis/_aravis_controller.py (jwlodek, Dec 20, 2024)
87d5fdd  Update src/ophyd_async/epics/adcore/_core_writer.py (jwlodek, Dec 20, 2024)
52d712e  Update src/ophyd_async/epics/adcore/_core_logic.py (jwlodek, Dec 20, 2024)
e46cbd4  Update src/ophyd_async/epics/adcore/_core_detector.py (jwlodek, Dec 20, 2024)
fbb895e  Update src/ophyd_async/epics/adcore/_core_detector.py (jwlodek, Dec 20, 2024)
6cd79a6  Update src/ophyd_async/epics/adcore/_core_writer.py (jwlodek, Dec 20, 2024)
05e6fbd  Fix all tests aside from 3.12 unawaited coro after applying suggestions (jwlodek, Dec 23, 2024)
a616fd2  Resolve unawaited coro error on 3.12 (jwlodek, Dec 23, 2024)
1ab9588  Merge branch 'ad-tiff-writer' of https://github.com/jwlodek/ophyd-asy… (thomashopkins32, Dec 26, 2024)
7a86084  Finish changing multiplier -> batch_size; Fix some untyped imports (thomashopkins32, Dec 26, 2024)
ed02c6b  Add more type ignores (thomashopkins32, Dec 26, 2024)
cc944f9  Add extra first dimension to tests' outputs (thomashopkins32, Dec 26, 2024)
0d2ca62  Added some questions and TODOs that need to be resolved; Otherwise, f… (thomashopkins32, Dec 26, 2024)
753eee8  Get the initial frame index after opening the writer (thomashopkins32, Dec 27, 2024)
26387e5  Remove batch_size from any writer __init__ (thomashopkins32, Dec 27, 2024)
f6a4f30  Call tiff_writer.open before collect_stream_docs (thomashopkins32, Dec 27, 2024)
e7ffb80  Merge branch 'main' of https://github.com/bluesky/ophyd-async into mo… (thomashopkins32, Jan 6, 2025)
dcc1566  Fixed tests by adding mocked directory creation callback to tiff and … (thomashopkins32, Jan 6, 2025)
bc60d41  Merge branch 'main' of https://github.com/bluesky/ophyd-async into mo… (thomashopkins32, Jan 8, 2025)
2bd6186  Change batch_size -> frames_per_event (thomashopkins32, Jan 8, 2025)
df48897  Remove frames_per_event from HDFDataset, use as first dimension of sh… (thomashopkins32, Jan 8, 2025)
bdda567  Cleanup + ruff checks (thomashopkins32, Jan 8, 2025)
93f2f97  Added unit tests for describe with > 1 frames_per_event (thomashopkins32, Jan 8, 2025)
9410c4f  Add unit tests for collect with > 1 frames_per_event (thomashopkins32, Jan 8, 2025)
9ca8f2a  Fix docs indentation (thomashopkins32, Jan 8, 2025)
871326a  Merge branch 'main' of https://github.com/bluesky/ophyd-async into mo… (thomashopkins32, Jan 13, 2025)
4208d51  Merge branch 'main' of https://github.com/bluesky/ophyd-async into mo… (thomashopkins32, Jan 14, 2025)
33b6b21  Remove shape from stream resource parameters (thomashopkins32, Jan 14, 2025)
3fde1d2  Ruff check fixes (thomashopkins32, Jan 14, 2025)
161f022  Make the first dimension for scalar values always the frames_per_event (thomashopkins32, Jan 14, 2025)
2a92b9d  Ruff format (thomashopkins32, Jan 14, 2025)
1935d69  Forgot one test (thomashopkins32, Jan 14, 2025)
38 changes: 23 additions & 15 deletions src/ophyd_async/core/_detector.py
@@ -21,7 +21,7 @@
     Triggerable,
     WritesStreamAssets,
 )
-from event_model import DataKey
+from event_model import DataKey  # type: ignore
Review thread on this line:

[Member] Do we need the # type: ignore here?

[Contributor Author] I get the following mypy issue; it looks like event-model does not have a py.typed marker.
[screenshot of mypy error output]

[Collaborator] Does pyright also complain? If it needs a py.typed then please could you make an event-model PR to add it?

[Contributor Author] pyright does not seem to complain; event-model is missing a py.typed marker, so I can add it.

[Contributor Author] @coretl It looks like adding this py.typed file to event-model breaks bluesky's tests, since there are a lot of unresolved typing issues. I can work on these typing issues, but I don't think this should be a blocker for merging this PR. Let me know if you think otherwise.
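For context on the thread above: under PEP 561, a package advertises inline type information by shipping an empty `py.typed` marker file; without it, mypy treats the package as untyped, which is why the `# type: ignore` comments appear throughout this diff. A minimal sketch of adding the marker (a hypothetical setuptools layout, not event-model's actual build configuration):

```python
# Hypothetical setup.py sketch: ship an empty py.typed marker (PEP 561) so
# type checkers read event_model's inline annotations. The real event-model
# build configuration may differ.
from setuptools import find_packages, setup

setup(
    name="event-model",
    packages=find_packages("src"),
    package_dir={"": "src"},
    # The empty file src/event_model/py.typed must exist in the package.
    package_data={"event_model": ["py.typed"]},
)
```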
 from pydantic import BaseModel, Field, NonNegativeInt, computed_field

 from ._device import Device, DeviceConnector
@@ -65,11 +65,13 @@ class TriggerInfo(BaseModel):
     livetime: float | None = Field(default=None, ge=0)
     #: What is the maximum timeout on waiting for a frame
     frame_timeout: float | None = Field(default=None, gt=0)
-    #: How many triggers make up a single StreamDatum index, to allow multiple frames
-    #: from a faster detector to be zipped with a single frame from a slow detector
-    #: e.g. if num=10 and multiplier=5 then the detector will take 10 frames,
-    #: but publish 2 indices, and describe() will show a shape of (5, h, w)
-    multiplier: int = 1
+    #: The number of triggers that are grouped into a single StreamDatum index.
+    #: A frames_per_event > 1 can be useful to have frames from a faster detector
+    #: able to be zipped with a single frame from a slower detector. E.g. if
+    #: number_of_triggers=10 and frames_per_event=5 then the detector will take
+    #: 10 frames, but publish 2 StreamDatum indices, and describe() will show a
+    #: shape of (5, h, w) for each.
+    frames_per_event: NonNegativeInt = 1

     @computed_field
     @cached_property
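A minimal sketch of the semantics described by the new field docstring, using the docstring's own numbers (the other `TriggerInfo` fields are assumed to keep their defaults):

```python
from ophyd_async.core import TriggerInfo

# 10 triggers grouped 5 per event: the detector takes 10 frames but
# publishes only 2 StreamDatum indices, and describe() reports a
# per-event shape of (5, h, w).
info = TriggerInfo(number_of_triggers=10, frames_per_event=5)
assert info.number_of_triggers // info.frames_per_event == 2
```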
@@ -107,8 +109,8 @@ async def prepare(self, trigger_info: TriggerInfo) -> None:
                 exposure time.
             deadtime Defaults to None. This is the minimum deadtime between
                 triggers.
-            multiplier The number of triggers grouped into a single StreamDatum
-                index.
+            frames_per_event The number of triggers grouped into a single
+                StreamDatum index
         """

     @abstractmethod
@@ -133,13 +135,17 @@ class DetectorWriter(ABC):
     (e.g. an HDF5 file)"""

     @abstractmethod
-    async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
+    async def open(self, frames_per_event: int = 1) -> dict[str, DataKey]:
         """Open writer and wait for it to be ready for data.

         Args:
-            multiplier: Each StreamDatum index corresponds to this many
-                written exposures
-
+            frames_per_event: The number of triggers that are grouped into a single
+                StreamDatum index. A frames_per_event > 1 can be useful to have
+                frames from a faster detector able to be zipped with a single frame
+                from a slower detector. E.g. if number_of_triggers=10 and
+                frames_per_event=5 then the detector will take 10 frames, but publish
+                2 StreamDatum indices, and describe() will show a shape of (5, h, w)
+                for each.
         Returns:
             Output for ``describe()``
         """
@@ -148,7 +154,7 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
     def observe_indices_written(
         self, timeout=DEFAULT_TIMEOUT
     ) -> AsyncGenerator[int, None]:
-        """Yield the index of each frame (or equivalent data point) as it is written"""
+        """Yield the index of each frame (or batch of frames) as it is written"""

     @abstractmethod
     async def get_indices_written(self) -> int:
@@ -327,10 +333,12 @@ async def prepare(self, value: TriggerInfo) -> None:
             if isinstance(self._trigger_info.number_of_triggers, list)
             else [self._trigger_info.number_of_triggers]
         )
-        self._initial_frame = await self._writer.get_indices_written()
+        # Open the writer and prepare the controller.
         self._describe, _ = await asyncio.gather(
-            self._writer.open(value.multiplier), self._controller.prepare(value)
+            self._writer.open(value.frames_per_event), self._controller.prepare(value)
         )
+        # Get the initial frame index from the writer.
+        self._initial_frame = await self._writer.get_indices_written()
         if value.trigger != DetectorTrigger.INTERNAL:
             await self._controller.arm()
             self._fly_start = time.monotonic()
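The reordering in this hunk is the fix from commit 753eee8: the baseline index is now read only after `open()` has settled the writer's state, since reading it before opening can pick up a stale count from a previous run. A condensed sketch of the new flow (simplified from the diff above; error handling omitted):

```python
# Simplified sketch of StandardDetector.prepare() after this change.
import asyncio

async def prepare(self, value):
    # Open the writer and prepare the controller concurrently.
    self._describe, _ = await asyncio.gather(
        self._writer.open(value.frames_per_event),
        self._controller.prepare(value),
    )
    # Only now is the writer's frame counter meaningful as a baseline for
    # the StreamDatum indices published later.
    self._initial_frame = await self._writer.get_indices_written()
```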
5 changes: 2 additions & 3 deletions src/ophyd_async/core/_hdf_dataset.py
@@ -3,7 +3,7 @@
 from pathlib import Path
 from urllib.parse import urlunparse

-from event_model import (
+from event_model import (  # type: ignore
     ComposeStreamResource,
     ComposeStreamResourceBundle,
     StreamDatum,
@@ -18,7 +18,6 @@ class HDFDataset:
     dataset: str
     shape: Sequence[int] = field(default_factory=tuple)
     dtype_numpy: str = ""
-    multiplier: int = 1
     swmr: bool = False
     # Represents explicit chunk size written to disk.
     chunk_shape: tuple[int, ...] = ()
@@ -67,7 +66,7 @@ def __init__(
             parameters={
                 "dataset": ds.dataset,
                 "swmr": ds.swmr,
-                "multiplier": ds.multiplier,
+                "shape": ds.shape,
                 "chunk_shape": ds.chunk_shape,
             },
             uid=None,
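With `multiplier` gone from `HDFDataset`, downstream consumers of the StreamResource document get the event dimension through `shape` instead. An illustrative parameters payload after this change (the values are made up for the example):

```python
# Illustrative StreamResource parameters for an HDF dataset; the leading
# dimension of "shape" is frames_per_event, replacing the old "multiplier".
parameters = {
    "dataset": "/entry/data/data",
    "swmr": False,
    "shape": (5, 1024, 1024),       # (frames_per_event, height, width)
    "chunk_shape": (1, 1024, 1024),
}
```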
24 changes: 15 additions & 9 deletions src/ophyd_async/epics/adcore/_core_writer.py
@@ -5,7 +5,7 @@
 from urllib.parse import urlunparse

 from bluesky.protocols import Hints, StreamAsset
-from event_model import (
+from event_model import (  # type: ignore
     ComposeStreamResource,
     DataKey,
     StreamRange,
@@ -59,7 +59,6 @@ def __init__(
         self._emitted_resource = None

         self._capture_status: AsyncStatus | None = None
-        self._multiplier = 1
         self._filename_template = "%s%s_%6.6d"

     @classmethod
@@ -123,10 +122,10 @@ async def begin_capture(self) -> None:
             self.fileio.capture, True, wait_for_set_completion=False
         )

-    async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
+    async def open(self, frames_per_event: int = 1) -> dict[str, DataKey]:
         self._emitted_resource = None
         self._last_emitted = 0
-        self._multiplier = multiplier
+        self._frames_per_event = frames_per_event
         frame_shape = await self._dataset_describer.shape()
         dtype_numpy = await self._dataset_describer.np_datatype()

@@ -135,7 +134,7 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
         describe = {
             self._name_provider(): DataKey(
                 source=self._name_provider(),
-                shape=list(frame_shape),
+                shape=[frames_per_event, *frame_shape],
                 dtype="array",
                 dtype_numpy=dtype_numpy,
                 external="STREAM:",
@@ -148,11 +147,11 @@ async def observe_indices_written(
     ) -> AsyncGenerator[int, None]:
         """Wait until a specific index is ready to be collected"""
         async for num_captured in observe_value(self.fileio.num_captured, timeout):
-            yield num_captured // self._multiplier
+            yield num_captured // self._frames_per_event

     async def get_indices_written(self) -> int:
         num_captured = await self.fileio.num_captured.get_value()
-        return num_captured // self._multiplier
+        return num_captured // self._frames_per_event

     async def collect_stream_docs(
         self, indices_written: int
@@ -181,10 +180,17 @@ async def collect_stream_docs(
             self._emitted_resource = bundler_composer(
                 mimetype=self._mimetype,
                 uri=uri,
+                # TODO: This is confusing, I expected this to be of type `DataKey`
+                # but it is a string. Naming could be improved maybe?
                 data_key=self._name_provider(),
+                # Q: What are the parameters used for? Extra info?
                 parameters={
-                    # Assume that we always write 1 frame per file/chunk
-                    "chunk_shape": (1, *frame_shape),
+                    # Assume that we always write self._frames_per_event
+                    # frames per file/chunk
+                    # TODO: Validate this assumption and that it should
+                    # not be self._frames_per_event
+                    "chunk_shape": (self._frames_per_event, *frame_shape),
+                    "shape": (self._frames_per_event, *frame_shape),
                     # Include file template for reconstruction in consolidator
                     "template": file_template,
                 },
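Both index methods above now translate the raw captured-frame count into completed StreamDatum indices with integer division. A worked example with frames_per_event=5:

```python
frames_per_event = 5

# num_captured counts raw frames from the file plugin; an event index is
# complete only once all of its frames have been written.
for num_captured in (4, 5, 9, 10):
    print(num_captured // frames_per_event)
# Prints 0, 1, 1, 2: four frames complete no event, ten frames complete two.
```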
19 changes: 10 additions & 9 deletions src/ophyd_async/epics/adcore/_hdf_writer.py
@@ -4,7 +4,7 @@
 from xml.etree import ElementTree as ET

 from bluesky.protocols import Hints, StreamAsset
-from event_model import DataKey
+from event_model import DataKey  # type: ignore

 from ophyd_async.core import (
     DEFAULT_TIMEOUT,
@@ -48,7 +48,7 @@ def __init__(
         self._file: HDFFile | None = None
         self._include_file_number = False

-    async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
+    async def open(self, frames_per_event: int = 1) -> dict[str, DataKey]:
         self._file = None

         # Setting HDF writer specific signals
@@ -74,8 +74,9 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
         name = self._name_provider()
         detector_shape = await self._dataset_describer.shape()
         np_dtype = await self._dataset_describer.np_datatype()
-        self._multiplier = multiplier
-        outer_shape = (multiplier,) if multiplier > 1 else ()
+
+        # Used by the base class
+        self._frames_per_event = frames_per_event

         # Determine number of frames that will be saved per HDF chunk
         frames_per_chunk = await self.fileio.num_frames_chunks.get_value()
@@ -85,9 +86,10 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
             HDFDataset(
                 data_key=name,
                 dataset="/entry/data/data",
-                shape=detector_shape,
+                shape=(frames_per_event, *detector_shape)
+                if frames_per_event > 1 or detector_shape
+                else (),
                 dtype_numpy=np_dtype,
-                multiplier=multiplier,
                 chunk_shape=(frames_per_chunk, *detector_shape),
             )
         ]
@@ -112,9 +114,8 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
                 HDFDataset(
                     datakey,
                     f"/entry/instrument/NDAttributes/{datakey}",
-                    (),
+                    (frames_per_event,) if frames_per_event > 1 else (),
                     np_datatype,
-                    multiplier,
                     # NDAttributes appear to always be configured with
                     # this chunk size
                     chunk_shape=(16384,),
@@ -124,7 +125,7 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
         describe = {
             ds.data_key: DataKey(
                 source=self.fileio.full_file_name.source,
-                shape=list(outer_shape + tuple(ds.shape)),
+                shape=list(ds.shape),
                 dtype="array" if ds.shape else "number",
                 dtype_numpy=ds.dtype_numpy,
                 external="STREAM:",
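The two shape expressions in this file follow one rule: prepend the event dimension unless the dataset is a true scalar with frames_per_event == 1, so describe() can still report dtype="number" for scalars. A sketch of that rule in isolation (the helper name is local to this example):

```python
def dataset_shape(frames_per_event: int, detector_shape: tuple[int, ...]) -> tuple[int, ...]:
    # Image data, or any grouped readings, gain a leading event dimension;
    # a scalar with frames_per_event == 1 stays shapeless.
    if frames_per_event > 1 or detector_shape:
        return (frames_per_event, *detector_shape)
    return ()

assert dataset_shape(1, (1024, 1024)) == (1, 1024, 1024)  # image data
assert dataset_shape(5, (1024, 1024)) == (5, 1024, 1024)  # grouped images
assert dataset_shape(1, ()) == ()    # scalar NDAttribute -> dtype "number"
assert dataset_shape(5, ()) == (5,)  # grouped scalar readings
```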
11 changes: 6 additions & 5 deletions src/ophyd_async/epics/eiger/_odin_io.py
@@ -2,7 +2,7 @@
 from collections.abc import AsyncGenerator, AsyncIterator

 from bluesky.protocols import StreamAsset
-from event_model import DataKey
+from event_model import DataKey  # type: ignore

 from ophyd_async.core import (
     DEFAULT_TIMEOUT,
@@ -77,8 +77,9 @@ def __init__(
         self._name_provider = name_provider
         super().__init__()

-    async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
+    async def open(self, frames_per_event: int = 1) -> dict[str, DataKey]:
         info = self._path_provider(device_name=self._name_provider())
+        self._frames_per_event = frames_per_event

         await asyncio.gather(
             self._drv.file_path.set(str(info.directory_path)),
@@ -101,7 +102,7 @@ async def _describe(self) -> dict[str, DataKey]:
         return {
             "data": DataKey(
                 source=self._drv.file_name.source,
-                shape=list(data_shape),
+                shape=[self._frames_per_event, *data_shape],
                 dtype="array",
                 # TODO: Use correct type based on eiger https://github.com/bluesky/ophyd-async/issues/529
                 dtype_numpy="<u2",
@@ -113,10 +114,10 @@ async def observe_indices_written(
         self, timeout=DEFAULT_TIMEOUT
     ) -> AsyncGenerator[int, None]:
         async for num_captured in observe_value(self._drv.num_captured, timeout):
-            yield num_captured
+            yield num_captured // self._frames_per_event

     async def get_indices_written(self) -> int:
-        return await self._drv.num_captured.get_value()
+        return await self._drv.num_captured.get_value() // self._frames_per_event

     def collect_stream_docs(self, indices_written: int) -> AsyncIterator[StreamAsset]:
         # TODO: Correctly return stream https://github.com/bluesky/ophyd-async/issues/530
22 changes: 10 additions & 12 deletions src/ophyd_async/fastcs/panda/_writer.py
@@ -3,8 +3,8 @@
 from pathlib import Path

 from bluesky.protocols import StreamAsset
-from event_model import DataKey
-from p4p.client.thread import Context
+from event_model import DataKey  # type: ignore
+from p4p.client.thread import Context  # type: ignore

 from ophyd_async.core import (
     DEFAULT_TIMEOUT,
@@ -34,10 +34,9 @@ def __init__(
         self._name_provider = name_provider
         self._datasets: list[HDFDataset] = []
         self._file: HDFFile | None = None
-        self._multiplier = 1

     # Triggered on PCAP arm
-    async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
+    async def open(self, frames_per_event: int = 1) -> dict[str, DataKey]:
         """Retrieve and get descriptor of all PandA signals marked for capture"""

         # Ensure flushes are immediate
@@ -68,9 +67,9 @@ async def open(self, multiplier: int = 1) -> dict[str, DataKey]:

         # Wait for it to start, stashing the status that tells us when it finishes
         await self.panda_data_block.capture.set(True)
-        if multiplier > 1:
+        if frames_per_event > 1:
             raise ValueError(
-                "All PandA datasets should be scalar, multiplier should be 1"
+                "All PandA datasets should be scalar, frames_per_event should be 1"
             )

         return await self._describe()
@@ -84,8 +83,9 @@ async def _describe(self) -> dict[str, DataKey]:
         describe = {
             ds.data_key: DataKey(
                 source=self.panda_data_block.hdf_directory.source,
-                shape=list(ds.shape),
-                dtype="array" if ds.shape != [1] else "number",
+                # frames_per_event is always 1 for PandA
+                shape=[1, *ds.shape] if ds.shape else [],
+                dtype="array" if ds.shape else "number",
                 # PandA data should always be written as Float64
                 dtype_numpy="<f8",
                 external="STREAM:",
@@ -104,9 +104,7 @@ async def _update_datasets(self) -> None:
         self._datasets = [
             # TODO: Update chunk size to read signal once available in IOC
             # Currently PandA IOC sets chunk size to 1024 points per chunk
-            HDFDataset(
-                dataset_name, "/" + dataset_name, [1], multiplier=1, chunk_shape=(1024,)
-            )
+            HDFDataset(dataset_name, "/" + dataset_name, shape=(), chunk_shape=(1024,))
             for dataset_name in capture_table.name
         ]

@@ -141,7 +139,7 @@ async def observe_indices_written(
         async for num_captured in observe_value(
             self.panda_data_block.num_captured, timeout
         ):
-            yield num_captured // self._multiplier
+            yield num_captured

     async def collect_stream_docs(
         self, indices_written: int
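PandA captures scalar rows, one per trigger, so the writer rejects frames_per_event > 1 and every dataset gets a fixed leading dimension. The describe() shape rule from the hunk above, in isolation (helper name local to this example):

```python
def panda_data_key_shape(ds_shape: tuple[int, ...]) -> list[int]:
    # frames_per_event is always 1 for PandA: non-scalar datasets get a
    # fixed leading event dimension of 1; scalars stay shapeless.
    return [1, *ds_shape] if ds_shape else []

assert panda_data_key_shape(()) == []        # captured scalar -> dtype "number"
assert panda_data_key_shape((6,)) == [1, 6]  # hypothetical vector capture
```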
@@ -1,6 +1,6 @@
 from collections.abc import AsyncGenerator, AsyncIterator

-from event_model import DataKey
+from event_model import DataKey  # type: ignore

 from ophyd_async.core import DEFAULT_TIMEOUT, DetectorWriter, NameProvider, PathProvider

@@ -20,9 +20,10 @@ def __init__(
         self.path_provider = path_provider
         self.name_provider = name_provider

-    async def open(self, multiplier: int = 1) -> dict[str, DataKey]:
+    async def open(self, frames_per_event: int = 1) -> dict[str, DataKey]:
+        self._frames_per_event = frames_per_event
         return await self.pattern_generator.open_file(
-            self.path_provider, self.name_provider(), multiplier
+            self.path_provider, self.name_provider(), frames_per_event
         )

     async def close(self) -> None:
@@ -35,7 +36,7 @@ async def observe_indices_written(
         self, timeout=DEFAULT_TIMEOUT
     ) -> AsyncGenerator[int, None]:
         async for index in self.pattern_generator.observe_indices_written(timeout):
-            yield index
+            yield index // self._frames_per_event

     async def get_indices_written(self) -> int:
-        return self.pattern_generator.image_counter
+        return self.pattern_generator.image_counter // self._frames_per_event