diff --git a/docs/images/data_bobfile.png b/docs/images/data_bobfile.png
new file mode 100644
index 00000000..4f4747c9
Binary files /dev/null and b/docs/images/data_bobfile.png differ
diff --git a/docs/user/how-to/capture-hdf.rst b/docs/user/how-to/capture-hdf.rst
new file mode 100644
index 00000000..e966340e
--- /dev/null
+++ b/docs/user/how-to/capture-hdf.rst
@@ -0,0 +1,38 @@
+
+Capture data
+============
+
+The ``:DATA`` PVs are used to capture data from the PandA.
+They can be viewed on the DATA screen.
+
+.. image:: /images/data_bobfile.png
+ :alt: The data screen
+ :align: center
+
+
+* The file directory and name are chosen with ``:DATA:HDFDirectory`` and ``:DATA:HDFFileName``.
+* ``:DATA:NumCapture`` is the number of frames to capture in the file.
+* ``:DATA:NumCaptured`` is the number of frames written to file.
+* ``:DATA:NumReceived`` is the number of frames received from the PandA.
+* ``:DATA:FlushPeriod`` is the period (in seconds) at which received data is flushed into frames in the client.
+* ``:DATA:Capture`` starts (1) and stops (0) data capture.
+* ``:DATA:CaptureMode`` selects one of the three capture modes described below (see the example that follows).
+
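+A minimal sketch of configuring a capture from Python, using the ``aioca`` client
+(the ``PANDA`` prefix, file locations and values below are hypothetical and depend
+on how the IOC was started):
+
+.. code-block:: python
+
+    import asyncio
+
+    from aioca import DBR_CHAR_STR, caget, caput
+
+    PREFIX = "PANDA:DATA"  # hypothetical record prefix
+
+    async def capture_first_n():
+        # Choose where the HDF5 file will be written
+        await caput(PREFIX + ":HDFDirectory", "/tmp", datatype=DBR_CHAR_STR, wait=True)
+        await caput(PREFIX + ":HDFFileName", "capture.h5", datatype=DBR_CHAR_STR, wait=True)
+
+        # Capture 1000 frames then stop automatically (0 = FIRST_N mode)
+        await caput(PREFIX + ":NumCapture", 1000, wait=True)
+        await caput(PREFIX + ":CaptureMode", 0, wait=True)
+        await caput(PREFIX + ":Capture", 1, wait=True)
+
+        # Report the capture status
+        print(await caget(PREFIX + ":Status", datatype=DBR_CHAR_STR))
+
+    asyncio.run(capture_first_n())
+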
+
+First N mode
+------------
+
+Begin capturing data and writing it to file as soon as it is received. Capture stops once ``NumCapture``
+frames have been written or the PandA has been disarmed.
+
+
+Last N mode
+-----------
+
+Capture data into a buffer; once capturing has finished, write the last ``NumCapture`` frames to disk.
+
+
+Forever mode
+------------
+
+Keep capturing and writing frames. Once the PandA has been disarmed, wait for it to be armed again and continue writing.
\ No newline at end of file
diff --git a/docs/user/index.rst b/docs/user/index.rst
index 2c94a0c0..31da93ae 100644
--- a/docs/user/index.rst
+++ b/docs/user/index.rst
@@ -26,6 +26,7 @@ side-bar.
:maxdepth: 1
how-to/run-container
+ how-to/capture-hdf
diff --git a/pyproject.toml b/pyproject.toml
index 576799c3..c44d9067 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,7 @@ dependencies = [
"click",
"h5py",
"softioc>=4.4.0",
- "pandablocks>=0.5.3",
+ "pandablocks~=0.7.0",
"pvi~=0.7.0",
] # Add project dependencies here, e.g. ["click", "numpy"]
dynamic = ["version"]
diff --git a/src/pandablocks_ioc/_hdf_ioc.py b/src/pandablocks_ioc/_hdf_ioc.py
index f3543039..034adae0 100644
--- a/src/pandablocks_ioc/_hdf_ioc.py
+++ b/src/pandablocks_ioc/_hdf_ioc.py
@@ -2,8 +2,10 @@
import logging
import os
from asyncio import CancelledError
+from collections import deque
+from enum import Enum
from importlib.util import find_spec
-from typing import List, Optional
+from typing import Callable, Deque, Optional, Union
from pandablocks.asyncio import AsyncioClient
from pandablocks.hdf import (
@@ -18,26 +20,305 @@
from softioc import alarm, builder
from softioc.pythonSoftIoc import RecordWrapper
-from ._pvi import PviGroup, add_pvi_info
+from ._pvi import PviGroup, add_automatic_pvi_info, add_data_capture_pvi_info
from ._types import ONAM_STR, ZNAM_STR, EpicsName
+HDFReceived = Union[ReadyData, StartData, FrameData, EndData]
+
+
+class CaptureMode(Enum):
+ """
+    The mode used to buffer received frames and flush them to file
+ """
+
+    #: Wait until N frames are received, then write them
+    #: to file and finish capture
+ FIRST_N = 0
+
+ #: On EndData write the last N frames
+ LAST_N = 1
+
+ #: Write data as received until Capture set to 0
+ FOREVER = 2
+
+
+class NumCapturedSetter(Pipeline):
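+    """Pipeline element that passes the number of rows written to file on to a
+    setter callback (used here to update the :DATA:NumCaptured record)."""
+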
+ def __init__(self, number_captured_setter: Callable) -> None:
+ self.number_captured_setter = number_captured_setter
+ self.number_captured_setter(0)
+ super().__init__()
+
+ self.what_to_do = {int: self.set_record}
+
+ def set_record(self, value: int):
+ self.number_captured_setter(value)
+
+
+class HDF5Buffer:
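+    """Buffer HDF data received from the PandA and write it to file according
+    to the configured CaptureMode."""
+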
+ _buffer_index = None
+ start_data = None
+ number_of_received_rows = 0
+ finish_capturing = False
+ number_of_rows_in_circular_buffer = 0
+
+ def __init__(
+ self,
+ capture_mode: CaptureMode,
+ filepath: str,
+ number_of_rows_to_capture: int,
+ status_message_setter: Callable,
+ number_received_setter: Callable,
+ number_captured_setter_pipeline: NumCapturedSetter,
+ ):
+ # Only one filename - user must stop capture and set new FileName/FilePath
+ # for new files
+
+ self.circular_buffer: Deque[FrameData] = deque()
+ self.capture_mode = capture_mode
+
+ match capture_mode:
+ case CaptureMode.FIRST_N:
+ self._handle_FrameData = self._capture_first_n
+ case CaptureMode.LAST_N:
+ self._handle_FrameData = self._capture_last_n
+ case CaptureMode.FOREVER:
+ self._handle_FrameData = self._capture_forever
+ case _:
+ raise RuntimeError("Invalid capture mode")
+
+ self.filepath = filepath
+ self.number_of_rows_to_capture = number_of_rows_to_capture
+ self.status_message_setter = status_message_setter
+ self.number_received_setter = number_received_setter
+ self.number_captured_setter_pipeline = number_captured_setter_pipeline
+ self.number_captured_setter_pipeline.number_captured_setter(0)
+
+ if (
+ self.capture_mode == CaptureMode.LAST_N
+ and self.number_of_rows_to_capture <= 0
+ ):
+ raise RuntimeError("Number of rows to capture must be > 0 on LAST_N mode")
+
+ self.start_pipeline()
+
+ def __del__(self):
+ if self.pipeline[0].is_alive():
+ stop_pipeline(self.pipeline)
+
+ def put_data_to_file(self, data: HDFReceived):
+ try:
+ self.pipeline[0].queue.put_nowait(data)
+ except Exception as ex:
+ logging.exception(f"Failed to save the data to HDF5 file: {ex}")
+
+ def start_pipeline(self):
+ self.pipeline = create_default_pipeline(
+ iter([self.filepath]), self.number_captured_setter_pipeline
+ )
+
+ def _handle_StartData(self, data: StartData):
+ if self.start_data and data != self.start_data:
+ # PandA was disarmed, had config changed, and rearmed.
+ # Cannot process to the same file with different start data.
+ logging.error(
+ "New start data detected, differs from previous start "
+ "data for this file. Aborting HDF5 data capture."
+ )
+
+ self.status_message_setter(
+ "Mismatched StartData packet for file",
+ severity=alarm.MAJOR_ALARM,
+ alarm=alarm.STATE_ALARM,
+ )
+ self.put_data_to_file(
+ EndData(self.number_of_received_rows, EndReason.START_DATA_MISMATCH)
+ )
+
+ self.finish_capturing = True
+
+ # Only pass StartData to pipeline if we haven't previously
+ else:
+ # In LAST_N mode, wait till the end of capture to write
+ # the StartData to file.
+ # In FOREVER mode write the StartData to file if it's the first received.
+            if self.capture_mode == CaptureMode.FIRST_N or (
+                self.capture_mode == CaptureMode.FOREVER
+                and not self.start_data
+            ):
+ self.put_data_to_file(data)
+
+ self.start_data = data
+
+ def _capture_first_n(self, data: FrameData):
+ """
+        Capture FrameData as it comes in. Stop when the number of rows exceeds
+        number_of_rows_to_capture, cutting off the data so that its length is
+        exactly number_of_rows_to_capture.
+ """
+ self.number_of_received_rows += len(data.data)
+
+ if (
+ self.number_of_rows_to_capture > 0
+ and self.number_of_received_rows > self.number_of_rows_to_capture
+ ):
+ # Discard extra collected data points if necessary
+ data.data = data.data[
+ : self.number_of_rows_to_capture - self.number_of_received_rows
+ ].copy()
+ self.number_of_received_rows = self.number_of_rows_to_capture
+
+ self.put_data_to_file(data)
+ self.number_received_setter(self.number_of_received_rows)
+
+ if (
+ self.number_of_rows_to_capture > 0
+ and self.number_of_received_rows == self.number_of_rows_to_capture
+ ):
+ # Reached configured capture limit, stop the file
+ logging.info(
+ f"Requested number of frames ({self.number_of_rows_to_capture}) "
+ "captured, disabling Capture."
+ )
+ self.status_message_setter("Requested number of frames captured")
+ self.put_data_to_file(EndData(self.number_of_received_rows, EndReason.OK))
+ self.finish_capturing = True
+
+ def _capture_forever(self, data: FrameData):
+ self.put_data_to_file(data)
+ self.number_of_received_rows += len(data.data)
+ self.number_received_setter(self.number_of_received_rows)
+
+ def _capture_last_n(self, data: FrameData):
+ """
+        Append every FrameData to a buffer. Once the number of rows exceeds
+        `:NumCapture`, discard the oldest rows so that only the last
+        `:NumCapture` rows are kept.
+
+        The buffered data is only written to file once capture has finished.
+ """
+ self.circular_buffer.append(data)
+ self.number_of_received_rows += len(data.data)
+ self.number_of_rows_in_circular_buffer += len(data.data)
+
+ if self.number_of_rows_in_circular_buffer > self.number_of_rows_to_capture:
+ self.status_message_setter(
+ "NumCapture received, rewriting first frames received"
+ )
+
+ else:
+            self.status_message_setter("Filling buffer to NumCapture")
+
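+        # Discard rows from the front of the buffer until at most
+        # `number_of_rows_to_capture` rows remain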
+ while self.number_of_rows_in_circular_buffer > self.number_of_rows_to_capture:
+ first_frame_data = self.circular_buffer.popleft()
+ first_frame_data_length = len(first_frame_data.data)
+
+ if first_frame_data_length > self.number_of_rows_to_capture:
+ # More data than we want to capture, all in a single FrameData
+ # We can just slice with the NumCapture since this has to be the
+ # only FrameData in the buffer at this point
+ assert len(self.circular_buffer) == 0
+                shrunk_data = first_frame_data.data[
+                    -self.number_of_rows_to_capture :
+                ].copy()
+                first_frame_data.data = shrunk_data
+ self.circular_buffer.appendleft(first_frame_data)
+ self.number_of_rows_in_circular_buffer = self.number_of_rows_to_capture
+ elif (
+ first_frame_data_length
+ > self.number_of_rows_in_circular_buffer
+ - self.number_of_rows_to_capture
+ ):
+ # We can slice from the beginning of the FrameData to have the desired
+ # number of rows
+ indices_to_discard = (
+ self.number_of_rows_in_circular_buffer
+ - self.number_of_rows_to_capture
+ )
+                shrunk_data = first_frame_data.data[indices_to_discard:].copy()
+                first_frame_data.data = shrunk_data
+ self.circular_buffer.appendleft(first_frame_data)
+ self.number_of_rows_in_circular_buffer -= indices_to_discard
+ assert (
+ self.number_of_rows_in_circular_buffer
+ == self.number_of_rows_to_capture
+ )
+ else:
+                # If we remove the entire first frame data then the buffer will still
+ # be too big, or it will be exactly the number of rows we want
+ self.number_of_rows_in_circular_buffer -= first_frame_data_length
+
+ self.number_received_setter(self.number_of_received_rows)
+
+ def _handle_EndData(self, data: EndData):
+ match self.capture_mode:
+ case CaptureMode.LAST_N:
+                # In LAST_N only write FrameData if EndReason is OK or MANUALLY_STOPPED
+ if data.reason not in (EndReason.OK, EndReason.MANUALLY_STOPPED):
+ self.status_message_setter(
+ f"Stopped capturing with reason {data.reason}, "
+ "skipping writing of buffered frames"
+ )
+ self.finish_capturing = True
+ return
+
+ self.status_message_setter(
+ "Finishing capture, writing buffered frames to file"
+ )
+ self.put_data_to_file(self.start_data)
+ for frame_data in self.circular_buffer:
+ self.put_data_to_file(frame_data)
+
+ case CaptureMode.FOREVER:
+ if data.reason != EndReason.MANUALLY_STOPPED:
+ self.status_message_setter(
+ "Finished capture, waiting for next ReadyData"
+ )
+ return
+
+ case CaptureMode.FIRST_N:
+ pass # Frames will have already been written in FirstN
+
+ case _:
+ raise RuntimeError("Unknown capture mode")
+
+ self.status_message_setter("Finished capture")
+ self.finish_capturing = True
+ self.put_data_to_file(data)
+
+ def handle_data(self, data: HDFReceived):
+ match data:
+ case ReadyData():
+ pass
+ case StartData():
+ self.status_message_setter("Starting capture")
+ self._handle_StartData(data)
+ case FrameData():
+ self._handle_FrameData(data)
+ case EndData():
+ self._handle_EndData(data)
+ case _:
+ raise RuntimeError(
+                    f"Data was received that was of type {type(data)}, not "
+                    "StartData, EndData, ReadyData, or FrameData"
+ )
+
class HDF5RecordController:
"""Class to create and control the records that handle HDF5 processing"""
- _HDF5_PREFIX = "HDF5"
+ _DATA_PREFIX = "DATA"
_client: AsyncioClient
- _file_path_record: RecordWrapper
+ _directory_record: RecordWrapper
_file_name_record: RecordWrapper
_file_number_record: RecordWrapper
_file_format_record: RecordWrapper
_num_capture_record: RecordWrapper
+ _num_captured_record: RecordWrapper
_flush_period_record: RecordWrapper
_capture_control_record: RecordWrapper # Turn capture on/off
_status_message_record: RecordWrapper # Reports status and error messages
- _currently_capturing_record: RecordWrapper # If HDF5 file currently being written
_handle_hdf5_data_task: Optional[asyncio.Task] = None
@@ -53,32 +334,34 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
# Create the records, including an uppercase alias for each
# Naming convention and settings (mostly) copied from FSCN2 HDF5 records
- file_path_record_name = EpicsName(self._HDF5_PREFIX + ":FilePath")
- self._file_path_record = builder.longStringOut(
- file_path_record_name,
+ directory_record_name = EpicsName(self._DATA_PREFIX + ":HDFDirectory")
+ self._directory_record = builder.longStringOut(
+ directory_record_name,
length=path_length,
DESC="File path for HDF5 files",
validate=self._parameter_validate,
+ on_update=self._update_full_file_path,
)
- add_pvi_info(
- PviGroup.INPUTS,
- self._file_path_record,
- file_path_record_name,
+ add_automatic_pvi_info(
+ PviGroup.HDF,
+ self._directory_record,
+ directory_record_name,
builder.longStringOut,
)
- self._file_path_record.add_alias(
- record_prefix + ":" + file_path_record_name.upper()
+ self._directory_record.add_alias(
+ record_prefix + ":" + directory_record_name.upper()
)
- file_name_record_name = EpicsName(self._HDF5_PREFIX + ":FileName")
+ file_name_record_name = EpicsName(self._DATA_PREFIX + ":HDFFileName")
self._file_name_record = builder.longStringOut(
file_name_record_name,
length=filename_length,
DESC="File name prefix for HDF5 files",
validate=self._parameter_validate,
+ on_update=self._update_full_file_path,
)
- add_pvi_info(
- PviGroup.INPUTS,
+ add_automatic_pvi_info(
+ PviGroup.HDF,
self._file_name_record,
file_name_record_name,
builder.longStringOut,
@@ -87,7 +370,23 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
record_prefix + ":" + file_name_record_name.upper()
)
- num_capture_record_name = EpicsName(self._HDF5_PREFIX + ":NumCapture")
+ full_file_path_record_name = EpicsName(self._DATA_PREFIX + ":HDFFullFilePath")
+ self._full_file_path_record = builder.longStringIn(
+ full_file_path_record_name,
+ length=path_length + 1 + filename_length,
+ DESC="Full HDF5 file name with directory",
+ )
+ add_automatic_pvi_info(
+ PviGroup.HDF,
+ self._full_file_path_record,
+ full_file_path_record_name,
+ builder.longStringIn,
+ )
+    self._full_file_path_record.add_alias(
+ record_prefix + ":" + full_file_path_record_name.upper()
+ )
+
+ num_capture_record_name = EpicsName(self._DATA_PREFIX + ":NumCapture")
self._num_capture_record = builder.longOut(
num_capture_record_name,
initial_value=0, # Infinite capture
@@ -95,8 +394,8 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
DRVL=0,
)
- add_pvi_info(
- PviGroup.INPUTS,
+ add_automatic_pvi_info(
+ PviGroup.CAPTURE,
self._num_capture_record,
num_capture_record_name,
builder.longOut,
@@ -106,14 +405,49 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
record_prefix + ":" + num_capture_record_name.upper()
)
- flush_period_record_name = EpicsName(self._HDF5_PREFIX + ":FlushPeriod")
+ num_captured_record_name = EpicsName(self._DATA_PREFIX + ":NumCaptured")
+ self._num_captured_record = builder.longIn(
+ num_captured_record_name,
+ initial_value=0,
+ DESC="Number of frames written to file.",
+ )
+
+ add_automatic_pvi_info(
+ PviGroup.CAPTURE,
+ self._num_captured_record,
+ num_captured_record_name,
+ builder.longIn,
+ )
+ self._num_captured_record.add_alias(
+ record_prefix + ":" + num_captured_record_name.upper()
+ )
+
+ num_received_record_name = EpicsName(self._DATA_PREFIX + ":NumReceived")
+ self._num_received_record = builder.longIn(
+ num_received_record_name,
+ initial_value=0,
+        DESC="Number of frames received from PandA.",
+ )
+
+ add_automatic_pvi_info(
+ PviGroup.CAPTURE,
+ self._num_received_record,
+ num_received_record_name,
+ builder.longIn,
+ )
+ self._num_received_record.add_alias(
+ record_prefix + ":" + num_received_record_name.upper()
+ )
+
+ flush_period_record_name = EpicsName(self._DATA_PREFIX + ":FlushPeriod")
self._flush_period_record = builder.aOut(
flush_period_record_name,
initial_value=1.0,
DESC="Frequency that data is flushed (seconds)",
+ EGU="s",
)
- add_pvi_info(
- PviGroup.INPUTS,
+ add_automatic_pvi_info(
+ PviGroup.CAPTURE,
self._flush_period_record,
flush_period_record_name,
builder.aOut,
@@ -122,7 +456,7 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
record_prefix + ":" + flush_period_record_name.upper()
)
- capture_control_record_name = EpicsName(self._HDF5_PREFIX + ":Capture")
+ capture_control_record_name = EpicsName(self._DATA_PREFIX + ":Capture")
self._capture_control_record = builder.boolOut(
capture_control_record_name,
ZNAM=ZNAM_STR,
@@ -131,23 +465,40 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
validate=self._capture_validate,
DESC="Start/stop HDF5 capture",
)
- add_pvi_info(
- PviGroup.INPUTS,
- self._capture_control_record,
+ add_data_capture_pvi_info(
+ PviGroup.CAPTURE,
capture_control_record_name,
- builder.boolOut,
+ self._capture_control_record,
)
self._capture_control_record.add_alias(
record_prefix + ":" + capture_control_record_name.upper()
)
- status_message_record_name = EpicsName(self._HDF5_PREFIX + ":Status")
- self._status_message_record = builder.stringIn(
+ capture_mode_record_name = EpicsName(self._DATA_PREFIX + ":CaptureMode")
+ self._capture_mode_record = builder.mbbOut(
+ capture_mode_record_name,
+ *[capture_mode.name for capture_mode in CaptureMode],
+ initial_value=0,
+        DESC="Choose how the HDF writer flushes",
+ )
+ add_automatic_pvi_info(
+ PviGroup.CAPTURE,
+ self._capture_mode_record,
+ capture_mode_record_name,
+ builder.mbbOut,
+ )
+ self._capture_mode_record.add_alias(
+ record_prefix + ":" + capture_mode_record_name.upper()
+ )
+
+ status_message_record_name = EpicsName(self._DATA_PREFIX + ":Status")
+ self._status_message_record = builder.longStringIn(
status_message_record_name,
initial_value="OK",
+ length=200,
DESC="Reports current status of HDF5 capture",
)
- add_pvi_info(
+ add_automatic_pvi_info(
PviGroup.OUTPUTS,
self._status_message_record,
status_message_record_name,
@@ -157,23 +508,6 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
record_prefix + ":" + status_message_record_name.upper()
)
- currently_capturing_record_name = EpicsName(self._HDF5_PREFIX + ":Capturing")
- self._currently_capturing_record = builder.boolIn(
- currently_capturing_record_name,
- ZNAM=ZNAM_STR,
- ONAM=ONAM_STR,
- DESC="If HDF5 file is currently being written",
- )
- add_pvi_info(
- PviGroup.OUTPUTS,
- self._currently_capturing_record,
- currently_capturing_record_name,
- builder.boolIn,
- )
- self._currently_capturing_record.add_alias(
- record_prefix + ":" + currently_capturing_record_name.upper()
- )
-
def _parameter_validate(self, record: RecordWrapper, new_val) -> bool:
"""Control when values can be written to parameter records
(file name etc.) based on capturing record's value"""
@@ -187,101 +521,49 @@ def _parameter_validate(self, record: RecordWrapper, new_val) -> bool:
return False
return True
+ async def _update_full_file_path(self, new_val) -> None:
+ self._full_file_path_record.set(self._get_filepath())
+
async def _handle_hdf5_data(self) -> None:
"""Handles writing HDF5 data from the PandA to file, based on configuration
in the various HDF5 records.
This method expects to be run as an asyncio Task."""
try:
- # Keep the start data around to compare against, for the case where a new
- # capture, and thus new StartData, is sent without Capture ever being
- # disabled
- start_data: Optional[StartData] = None
- captured_frames: int = 0
- # Only one filename - user must stop capture and set new FileName/FilePath
- # for new files
- pipeline: List[Pipeline] = create_default_pipeline(
- iter([self._get_filename()])
+ # Set up the hdf buffer
+ num_capture: int = self._num_capture_record.get()
+ capture_mode: CaptureMode = CaptureMode(self._capture_mode_record.get())
+ filepath = self._get_filepath()
+
+ number_captured_setter_pipeline = NumCapturedSetter(
+ self._num_captured_record.set
+ )
+ buffer = HDF5Buffer(
+ capture_mode,
+ filepath,
+ num_capture,
+ self._status_message_record.set,
+ self._num_received_record.set,
+ number_captured_setter_pipeline,
)
flush_period: float = self._flush_period_record.get()
-
async for data in self._client.data(
scaled=False, flush_period=flush_period
):
logging.debug(f"Received data packet: {data}")
- if isinstance(data, ReadyData):
- self._currently_capturing_record.set(1)
- self._status_message_record.set("Starting capture")
- elif isinstance(data, StartData):
- if start_data and data != start_data:
- # PandA was disarmed, had config changed, and rearmed.
- # Cannot process to the same file with different start data.
- logging.error(
- "New start data detected, differs from previous start "
- "data for this file. Aborting HDF5 data capture."
- )
-
- self._status_message_record.set(
- "Mismatched StartData packet for file",
- severity=alarm.MAJOR_ALARM,
- alarm=alarm.STATE_ALARM,
- )
- pipeline[0].queue.put_nowait(
- EndData(captured_frames, EndReason.START_DATA_MISMATCH)
- )
-
- break
- if start_data is None:
- # Only pass StartData to pipeline if we haven't previously
- # - if we have there will already be an in-progress HDF file
- # that we should just append data to
- start_data = data
- pipeline[0].queue.put_nowait(data)
-
- elif isinstance(data, FrameData):
- captured_frames += len(data.data)
-
- num_frames_to_capture: int = self._num_capture_record.get()
- if (
- num_frames_to_capture > 0
- and captured_frames > num_frames_to_capture
- ):
- # Discard extra collected data points if necessary
- data.data = data.data[: num_frames_to_capture - captured_frames]
- captured_frames = num_frames_to_capture
-
- pipeline[0].queue.put_nowait(data)
-
- if (
- num_frames_to_capture > 0
- and captured_frames >= num_frames_to_capture
- ):
- # Reached configured capture limit, stop the file
- logging.info(
- f"Requested number of frames ({num_frames_to_capture}) "
- "captured, disabling Capture."
- )
- self._status_message_record.set(
- "Requested number of frames captured"
- )
- pipeline[0].queue.put_nowait(
- EndData(captured_frames, EndReason.OK)
- )
- break
- elif not isinstance(data, EndData):
- raise RuntimeError(
- f"Data was recieved that was of type {type(data)}, not"
- "StartData, EndData, ReadyData or FrameData"
- )
- # Ignore EndData - handle terminating capture with the Capture
- # record or when we capture the requested number of frames
+
+ buffer.handle_data(data)
+ if buffer.finish_capturing:
+ break
except CancelledError:
logging.info("Capturing task cancelled, closing HDF5 file")
self._status_message_record.set("Capturing disabled")
# Only send EndData if we know the file was opened - could be cancelled
# before PandA has actually send any data
- if start_data:
- pipeline[0].queue.put_nowait(EndData(captured_frames, EndReason.OK))
+ if buffer.capture_mode != CaptureMode.LAST_N:
+ buffer.put_data_to_file(
+ EndData(buffer.number_of_received_rows, EndReason.MANUALLY_STOPPED)
+ )
except Exception:
logging.exception("HDF5 data capture terminated due to unexpected error")
@@ -292,22 +574,21 @@ async def _handle_hdf5_data(self) -> None:
)
# Only send EndData if we know the file was opened - exception could happen
# before file was opened
- if start_data:
- pipeline[0].queue.put_nowait(
- EndData(captured_frames, EndReason.UNKNOWN_EXCEPTION)
+ if buffer.start_data and buffer.capture_mode != CaptureMode.LAST_N:
+ buffer.put_data_to_file(
+ EndData(buffer.number_of_received_rows, EndReason.UNKNOWN_EXCEPTION)
)
finally:
logging.debug("Finishing processing HDF5 PandA data")
- stop_pipeline(pipeline)
+ self._num_received_record.set(buffer.number_of_received_rows)
self._capture_control_record.set(0)
- self._currently_capturing_record.set(0)
- def _get_filename(self) -> str:
+ def _get_filepath(self) -> str:
"""Create the file path for the HDF5 file from the relevant records"""
return "/".join(
(
- self._file_path_record.get(),
+ self._directory_record.get(),
self._file_name_record.get(),
)
)
@@ -330,7 +611,7 @@ def _capture_validate(self, record: RecordWrapper, new_val: int) -> bool:
"""Check the required records have been set before allowing Capture=1"""
if new_val:
try:
- self._get_filename()
+ self._get_filepath()
except ValueError:
logging.exception("At least 1 required record had no value")
return False
diff --git a/src/pandablocks_ioc/_pvi.py b/src/pandablocks_ioc/_pvi.py
index ce74d379..7ddf8bab 100644
--- a/src/pandablocks_ioc/_pvi.py
+++ b/src/pandablocks_ioc/_pvi.py
@@ -39,6 +39,7 @@ class PviGroup(Enum):
READBACKS = "Readbacks"
OUTPUTS = "Outputs"
CAPTURE = "Capture"
+ HDF = "HDF"
TABLE = "Table" # TODO: May not need this anymore
@@ -53,13 +54,64 @@ class PviInfo:
component: Component
-def add_pvi_info(
+def add_pvi_info_to_record(
+ record: RecordWrapper,
+ record_name: EpicsName,
+ access: str,
+):
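+    """Add Q:group info to a record so it is published as part of the
+    ``<block>:PVI`` record."""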
+ block, field = record_name.split(":", maxsplit=1)
+ block_name_suffixed = f"pvi.{field.lower().replace(':', '_')}.{access}"
+ record.add_info(
+ "Q:group",
+ {
+ RecordName(f"{block}:PVI"): {
+ block_name_suffixed: {
+ "+channel": "NAME",
+ "+type": "plain",
+ "+trigger": block_name_suffixed,
+ }
+ }
+ },
+ )
+
+
+def add_data_capture_pvi_info(
+ group: PviGroup,
+ data_capture_record_name: EpicsName,
+ data_capture_pvi_record: RecordWrapper,
+):
+ component = SignalRW(
+ name=epics_to_pvi_name(data_capture_record_name),
+ pv=data_capture_record_name,
+ widget=ButtonPanel(actions=dict(Start="1", Stop="0")),
+ read_widget=LED(),
+ )
+ add_pvi_info_to_record(data_capture_pvi_record, data_capture_record_name, "rw")
+ Pvi.add_pvi_info(
+ record_name=data_capture_record_name, group=group, component=component
+ )
+
+
+def add_pcap_arm_pvi_info(group: PviGroup, pcap_arm_pvi_record: RecordWrapper):
+ pcap_arm_record_name = EpicsName("PCAP:ARM")
+ component = SignalRW(
+ name=epics_to_pvi_name(pcap_arm_record_name),
+ pv=pcap_arm_record_name,
+ widget=ButtonPanel(actions=dict(Arm="1", Disarm="0")),
+ read_widget=LED(),
+ )
+ add_pvi_info_to_record(pcap_arm_pvi_record, pcap_arm_record_name, "rw")
+ Pvi.add_pvi_info(record_name=pcap_arm_record_name, group=group, component=component)
+
+
+def add_automatic_pvi_info(
group: PviGroup,
record: RecordWrapper,
record_name: EpicsName,
record_creation_func: Callable,
) -> None:
- """Create the most common forms of the `PviInfo` structure"""
+ """Create the most common forms of the `PviInfo` structure.
+    Generates generic components based on the record creation function."""
component: Component
writeable: bool = record_creation_func in OUT_RECORD_FUNCTIONS
useComboBox: bool = record_creation_func == builder.mbbOut
@@ -83,7 +135,10 @@ def add_pvi_info(
if useComboBox:
widget = ComboBox()
else:
- if record_creation_func in (builder.longStringOut, builder.stringOut):
+ if record_creation_func in (
+ builder.longStringOut,
+ builder.stringOut,
+ ):
widget = TextWrite(format=TextFormat.string)
else:
widget = TextWrite(format=None)
@@ -91,22 +146,18 @@ def add_pvi_info(
component = SignalRW(name=pvi_name, pv=record_name, widget=widget)
access = "rw"
else:
+ if record_creation_func in (
+ builder.longStringIn,
+ builder.stringIn,
+ ):
+ widget = TextRead(format=TextFormat.string)
+ else:
+ widget = TextRead(format=None)
+
-        component = SignalR(name=pvi_name, pv=record_name, widget=TextRead())
+        component = SignalR(name=pvi_name, pv=record_name, widget=widget)
access = "r"
- block, field = record_name.split(":", maxsplit=1)
- block_name_suffixed = f"pvi.{field.lower().replace(':', '_')}.{access}"
- record.add_info(
- "Q:group",
- {
- RecordName(f"{block}:PVI"): {
- block_name_suffixed: {
- "+channel": "NAME",
- "+type": "plain",
- "+trigger": block_name_suffixed,
- }
- }
- },
- )
+
+ add_pvi_info_to_record(record, record_name, access)
Pvi.add_pvi_info(record_name=record_name, group=group, component=component)
diff --git a/src/pandablocks_ioc/ioc.py b/src/pandablocks_ioc/ioc.py
index 148488f5..f65625a6 100644
--- a/src/pandablocks_ioc/ioc.py
+++ b/src/pandablocks_ioc/ioc.py
@@ -41,7 +41,13 @@
from softioc.pythonSoftIoc import RecordWrapper
from ._hdf_ioc import HDF5RecordController
-from ._pvi import Pvi, PviGroup, add_positions_table_row, add_pvi_info
+from ._pvi import (
+ Pvi,
+ PviGroup,
+ add_automatic_pvi_info,
+ add_pcap_arm_pvi_info,
+ add_positions_table_row,
+)
from ._tables import TableRecordWrapper, TableUpdater
from ._types import (
ONAM_STR,
@@ -652,7 +658,7 @@ def _create_record_info(
record_name, *labels, *args, **extra_kwargs, **kwargs
)
- add_pvi_info(
+ add_automatic_pvi_info(
group=group,
record=record,
record_name=record_name,
@@ -1727,9 +1733,7 @@ def create_block_records(
DESC="Arm/Disarm the PandA",
)
- add_pvi_info(
- PviGroup.INPUTS, pcap_arm_record, EpicsName("PCAP:ARM"), builder.Action
- )
+ add_pcap_arm_pvi_info(PviGroup.INPUTS, pcap_arm_record)
HDF5RecordController(self._client, self._record_prefix)
diff --git a/tests/fixtures/mocked_panda.py b/tests/fixtures/mocked_panda.py
index 495e2944..7c2b9c0c 100644
--- a/tests/fixtures/mocked_panda.py
+++ b/tests/fixtures/mocked_panda.py
@@ -227,15 +227,12 @@ async def data(
flush_every_frame = flush_period is None
conn = DataConnection()
conn.connect(scaled)
- try:
- f = open(Path(__file__).parent.parent / "raw_dump.txt", "rb")
+ with open(Path(__file__).parent.parent / "raw_dump.txt", "rb") as f:
for raw in chunked_read(f, 200000):
for data in conn.receive_bytes(
raw, flush_every_frame=flush_every_frame
):
yield data
- finally:
- f.close()
def get_multiprocessing_context():
diff --git a/tests/test-bobfiles/HDF5.bob b/tests/test-bobfiles/DATA.bob
similarity index 50%
rename from tests/test-bobfiles/HDF5.bob
rename to tests/test-bobfiles/DATA.bob
index 51d60d04..1cabfce4 100644
--- a/tests/test-bobfiles/HDF5.bob
+++ b/tests/test-bobfiles/DATA.bob
@@ -3,13 +3,13 @@
0
0
426
- 277
+ 413
4
4
Title
TITLE
- HDF5 - TEST_PREFIX:
+ DATA - TEST_PREFIX:
0
0
426
@@ -26,15 +26,15 @@
1
- INPUTS
+ HDF
5
30
416
- 156
+ 106
true
Label
- Filepath
+ Hdfdirectory
0
0
250
@@ -42,7 +42,7 @@
TextEntry
- TEST_PREFIX:HDF5:FilePath
+ TEST_PREFIX:DATA:HDFDirectory
255
0
125
@@ -52,7 +52,7 @@
Label
- Filename
+ Hdffilename
0
25
250
@@ -60,7 +60,7 @@
TextEntry
- TEST_PREFIX:HDF5:FileName
+ TEST_PREFIX:DATA:HDFFileName
255
25
125
@@ -70,76 +70,84 @@
Label
- Numcapture
+ Hdffullfilepath
0
50
250
20
-
- TextEntry
- TEST_PREFIX:HDF5:NumCapture
+
+ TextUpdate
+ TEST_PREFIX:DATA:HDFFullFilePath
255
50
125
20
+
+
+
+
1
+
+
+ CAPTURE
+ 5
+ 141
+ 416
+ 206
+ true
Label
- Flushperiod
+ Numcapture
0
- 75
+ 0
250
20
TextEntry
- TEST_PREFIX:HDF5:FlushPeriod
+ TEST_PREFIX:DATA:NumCapture
255
- 75
+ 0
125
20
1
Label
- Capture
+ Numcaptured
0
- 100
+ 25
250
20
-
- TextEntry
- TEST_PREFIX:HDF5:Capture
+
+ TextUpdate
+ TEST_PREFIX:DATA:NumCaptured
255
- 100
+ 25
125
20
+
+
+
+
1
-
-
- OUTPUTS
- 5
- 191
- 416
- 81
- true
Label
- Status
+ Numreceived
0
- 0
+ 50
250
20
TextUpdate
- TEST_PREFIX:HDF5:Status
+ TEST_PREFIX:DATA:NumReceived
255
- 0
+ 50
125
20
@@ -150,17 +158,132 @@
Label
- Capturing
+ Flushperiod
0
- 25
+ 75
+ 250
+ 20
+
+
+ TextEntry
+ TEST_PREFIX:DATA:FlushPeriod
+ 255
+ 75
+ 125
+ 20
+ 1
+
+
+ Label
+ Capture
+ 0
+ 100
+ 250
+ 20
+
+
+ WritePV
+ TEST_PREFIX:DATA:Capture
+
+
+ $(pv_name)
+ 1
+ $(name)
+
+
+ Start
+ 255
+ 100
+ 38
+ 20
+ $(actions)
+
+
+ WritePV
+ TEST_PREFIX:DATA:Capture
+
+
+ $(pv_name)
+ 0
+ $(name)
+
+
+ Stop
+ 298
+ 100
+ 38
+ 20
+ $(actions)
+
+
+ LED
+ TEST_PREFIX:DATA:Capture
+ 350
+ 100
+ 20
+ 20
+
+
+ Label
+ Capturemode
+ 0
+ 125
+ 250
+ 20
+
+
+ ComboBox
+ TEST_PREFIX:DATA:CaptureMode
+ 255
+ 125
+ 125
+ 20
+
+
+ Label
+ All Postion Capture Parameters
+ 0
+ 150
+ 250
+ 20
+
+
+ OpenDisplay
+
+
+ PandA_PositionsTable.bob
+ tab
+ Open Display
+
+
+ All Postion Capture Parameters
+ 255
+ 150
+ 125
+ 20
+ $(actions)
+
+
+
+ OUTPUTS
+ 5
+ 352
+ 416
+ 56
+ true
+
+ Label
+ Status
+ 0
+ 0
250
20
TextUpdate
- TEST_PREFIX:HDF5:Capturing
+ TEST_PREFIX:DATA:Status
255
- 25
+ 0
125
20
diff --git a/tests/test-bobfiles/index.bob b/tests/test-bobfiles/index.bob
index 77167918..7f9a10ae 100644
--- a/tests/test-bobfiles/index.bob
+++ b/tests/test-bobfiles/index.bob
@@ -51,7 +51,7 @@
Label
- HDF5
+ DATA
23
55
250
@@ -61,12 +61,12 @@
OpenDisplay
- HDF5.bob
+ DATA.bob
tab
Open Display
- HDF5
+ DATA
278
55
125
diff --git a/tests/test_hdf_ioc.py b/tests/test_hdf_ioc.py
index 36ef4912..1d6d8e6a 100644
--- a/tests/test_hdf_ioc.py
+++ b/tests/test_hdf_ioc.py
@@ -3,6 +3,7 @@
import asyncio
import logging
from asyncio import CancelledError
+from collections import deque
from multiprocessing.connection import Connection
from pathlib import Path
from typing import AsyncGenerator, Generator
@@ -11,7 +12,7 @@
import numpy
import pytest
import pytest_asyncio
-from aioca import caget, camonitor, caput
+from aioca import DBR_CHAR_STR, CANothing, caget, caput
from fixtures.mocked_panda import (
TIMEOUT,
MockedAsyncioClient,
@@ -34,7 +35,12 @@
)
from softioc import asyncio_dispatcher, builder, softioc
-from pandablocks_ioc._hdf_ioc import HDF5RecordController
+from pandablocks_ioc._hdf_ioc import (
+ CaptureMode,
+ HDF5Buffer,
+ HDF5RecordController,
+ NumCapturedSetter,
+)
NAMESPACE_PREFIX = "HDF-RECORD-PREFIX"
@@ -42,7 +48,7 @@
@pytest.fixture
def new_random_hdf5_prefix():
test_prefix = append_random_uppercase(NAMESPACE_PREFIX)
- hdf5_test_prefix = test_prefix + ":HDF5"
+ hdf5_test_prefix = test_prefix + ":DATA"
return test_prefix, hdf5_test_prefix
@@ -207,7 +213,7 @@ def fast_dump_expected():
[8, 58, 58, 174, 0.570000056, 58, 116],
)
),
- EndData(58, EndReason.DISARMED),
+ EndData(58, EndReason.OK),
]
@@ -318,14 +324,14 @@ async def test_hdf5_ioc(hdf5_subprocess_ioc):
test_prefix, hdf5_test_prefix = hdf5_subprocess_ioc
- val = await caget(hdf5_test_prefix + ":FilePath")
+ val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR)
# Default value of longStringOut is an array of a single NULL byte
- assert val.size == 1
+ assert val == ""
# Mix and match between CamelCase and UPPERCASE to check aliases work
- val = await caget(hdf5_test_prefix + ":FILENAME")
- assert val.size == 1 # As above for longStringOut
+ val = await caget(hdf5_test_prefix + ":HDFFILENAME", datatype=DBR_CHAR_STR)
+ assert val == ""
val = await caget(hdf5_test_prefix + ":NumCapture")
assert val == 0
@@ -336,20 +342,13 @@ async def test_hdf5_ioc(hdf5_subprocess_ioc):
val = await caget(hdf5_test_prefix + ":CAPTURE")
assert val == 0
- val = await caget(hdf5_test_prefix + ":Status")
+ val = await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR)
assert val == "OK"
- val = await caget(hdf5_test_prefix + ":Capturing")
- assert val == 0
-
-
-def _string_to_buffer(string: str):
- """Convert a python string into a numpy buffer suitable for caput'ing to a Waveform
- record"""
- return numpy.frombuffer(string.encode(), dtype=numpy.uint8)
-
-async def test_hdf5_ioc_parameter_validate_works(hdf5_subprocess_ioc_no_logging_check):
+async def test_hdf5_ioc_parameter_validate_works(
+ hdf5_subprocess_ioc_no_logging_check, tmp_path
+):
"""Run the HDF5 module as its own IOC and check the _parameter_validate method
does not stop updates, then stops when capture record is changed"""
@@ -357,52 +356,155 @@ async def test_hdf5_ioc_parameter_validate_works(hdf5_subprocess_ioc_no_logging_
# EPICS bug means caputs always appear to succeed, so do a caget to prove it worked
await caput(
- hdf5_test_prefix + ":FilePath", _string_to_buffer("/new/path"), wait=True
+ hdf5_test_prefix + ":HDFDirectory",
+ str(tmp_path),
+ datatype=DBR_CHAR_STR,
+ wait=True,
)
- val = await caget(hdf5_test_prefix + ":FilePath")
- assert val.tobytes().decode() == "/new/path"
+ val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR)
+ assert val == str(tmp_path)
- await caput(hdf5_test_prefix + ":FileName", _string_to_buffer("name.h5"), wait=True)
- val = await caget(hdf5_test_prefix + ":FileName")
- assert val.tobytes().decode() == "name.h5"
+ await caput(
+ hdf5_test_prefix + ":HDFFileName", "name.h5", wait=True, datatype=DBR_CHAR_STR
+ )
+ val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR)
+ assert val == "name.h5"
await caput(hdf5_test_prefix + ":Capture", 1, wait=True)
assert await caget(hdf5_test_prefix + ":Capture") == 1
+ with pytest.raises(CANothing):
+ await caput(
+ hdf5_test_prefix + ":HDFFullFilePath",
+ "/second/path/name.h5",
+ wait=True,
+ datatype=DBR_CHAR_STR,
+ )
+ val = await caget(hdf5_test_prefix + ":HDFFullFilePath", datatype=DBR_CHAR_STR)
+ assert val == str(tmp_path) + "/name.h5" # put should have been stopped
+
+
+@pytest.mark.parametrize("num_capture", [1, 1000, 10000])
+async def test_hdf5_file_writing_first_n(
+ hdf5_subprocess_ioc, tmp_path: Path, caplog, num_capture
+):
+ """Test that an HDF5 file is written when Capture is enabled"""
+
+ test_prefix, hdf5_test_prefix = hdf5_subprocess_ioc
+
+ val = await caget(hdf5_test_prefix + ":CaptureMode")
+ assert val == CaptureMode.FIRST_N.value
+
+ test_dir = tmp_path
+ test_filename = "test.h5"
await caput(
- hdf5_test_prefix + ":FilePath", _string_to_buffer("/second/path"), wait=True
+ hdf5_test_prefix + ":HDFDirectory",
+ str(test_dir),
+ wait=True,
+ datatype=DBR_CHAR_STR,
+ )
+ val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR)
+ assert val == str(test_dir)
+
+ await caput(
+ hdf5_test_prefix + ":HDFFileName", "name.h5", wait=True, datatype=DBR_CHAR_STR
+ )
+ val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR)
+ assert val == "name.h5"
+
+ await caput(
+ hdf5_test_prefix + ":HDFFileName",
+ test_filename,
+ wait=True,
+ timeout=TIMEOUT,
+ datatype=DBR_CHAR_STR,
+ )
+ val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR)
+ assert val == test_filename
+
+ val = await caget(hdf5_test_prefix + ":HDFFullFilePath", datatype=DBR_CHAR_STR)
+ assert val == "/".join([str(tmp_path), test_filename])
+
+ # Only a single FrameData in the example data
+ assert await caget(hdf5_test_prefix + ":NumCapture") == 0
+ await caput(
+ hdf5_test_prefix + ":NumCapture", num_capture, wait=True, timeout=TIMEOUT
+ )
+ assert await caget(hdf5_test_prefix + ":NumCapture") == num_capture
+
+ await caput(hdf5_test_prefix + ":Capture", 1, wait=True, timeout=TIMEOUT)
+ assert await caget(hdf5_test_prefix + ":NumReceived") <= num_capture
+
+ await asyncio.sleep(1)
+ # Capture should have closed by itself
+ assert await caget(hdf5_test_prefix + ":Capture") == 0
+
+ assert await caget(hdf5_test_prefix + ":NumReceived") == num_capture
+ assert await caget(hdf5_test_prefix + ":NumCaptured") == num_capture
+ # Confirm file contains data we expect
+ with h5py.File(tmp_path / test_filename, "r") as hdf_file:
+ assert list(hdf_file) == [
+ "COUNTER1.OUT.Max",
+ "COUNTER1.OUT.Mean",
+ "COUNTER1.OUT.Min",
+ "COUNTER2.OUT.Mean",
+ "COUNTER3.OUT.Value",
+ "PCAP.BITS2.Value",
+ "PCAP.SAMPLES.Value",
+ "PCAP.TS_START.Value",
+ ]
+
+ assert len(hdf_file["/COUNTER1.OUT.Max"]) == num_capture
+
+ assert (
+ await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR)
+ == "Requested number of frames captured"
)
- val = await caget(hdf5_test_prefix + ":FilePath")
- assert val.tobytes().decode() == "/new/path" # put should have been stopped
@pytest.mark.parametrize("num_capture", [1, 1000, 10000])
-async def test_hdf5_file_writing(
+async def test_hdf5_file_writing_last_n_endreason_not_ok(
hdf5_subprocess_ioc, tmp_path: Path, caplog, num_capture
 ):
-    """Test that an HDF5 file is written when Capture is enabled"""
+    """Test that no HDF5 file is written in LAST_N mode when the EndReason
+    is not OK"""
test_prefix, hdf5_test_prefix = hdf5_subprocess_ioc
- test_dir = str(tmp_path) + "\0"
- test_filename = "test.h5\0"
+ val = await caget(hdf5_test_prefix + ":CaptureMode")
+ assert val == CaptureMode.FIRST_N.value
+ await caput(hdf5_test_prefix + ":CaptureMode", 1, wait=True)
+ val = await caget(hdf5_test_prefix + ":CaptureMode")
+ assert val == CaptureMode.LAST_N.value
+
+ test_dir = tmp_path
+ test_filename = "test.h5"
await caput(
- hdf5_test_prefix + ":FilePath",
- _string_to_buffer(str(test_dir)),
+ hdf5_test_prefix + ":HDFDirectory",
+ str(test_dir),
wait=True,
- timeout=TIMEOUT,
+ datatype=DBR_CHAR_STR,
+ )
+ val = await caget(hdf5_test_prefix + ":HDFDirectory", datatype=DBR_CHAR_STR)
+ assert val == str(test_dir)
+
+ await caput(
+ hdf5_test_prefix + ":HDFFileName", "name.h5", wait=True, datatype=DBR_CHAR_STR
)
- val = await caget(hdf5_test_prefix + ":FilePath")
- assert val.tobytes().decode() == test_dir
+ val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR)
+ assert val == "name.h5"
await caput(
- hdf5_test_prefix + ":FileName",
- _string_to_buffer(test_filename),
+ hdf5_test_prefix + ":HDFFileName",
+ test_filename,
wait=True,
timeout=TIMEOUT,
+ datatype=DBR_CHAR_STR,
)
- val = await caget(hdf5_test_prefix + ":FileName")
- assert val.tobytes().decode() == test_filename
+ val = await caget(hdf5_test_prefix + ":HDFFileName", datatype=DBR_CHAR_STR)
+ assert val == test_filename
+
+ val = await caget(hdf5_test_prefix + ":HDFFullFilePath", datatype=DBR_CHAR_STR)
+ assert val == "/".join([str(tmp_path), test_filename])
# Only a single FrameData in the example data
assert await caget(hdf5_test_prefix + ":NumCapture") == 0
@@ -411,44 +513,301 @@ async def test_hdf5_file_writing(
)
assert await caget(hdf5_test_prefix + ":NumCapture") == num_capture
- # The queue expects to see Capturing go 0 -> 1 -> 0 as Capture is enabled
- # and subsequently finishes
- capturing_queue: asyncio.Queue = asyncio.Queue()
- m = camonitor(
- hdf5_test_prefix + ":Capturing",
- capturing_queue.put,
+ # Initially Status should be "OK"
+ val = await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR)
+ assert val == "OK"
+
+ await caput(hdf5_test_prefix + ":Capture", 1, wait=True, timeout=TIMEOUT)
+
+ await asyncio.sleep(1)
+ # Capture should have closed by itself
+ assert await caget(hdf5_test_prefix + ":Capture") == 0
+
+ val = await caget(hdf5_test_prefix + ":Status", datatype=DBR_CHAR_STR)
+ assert (
+ val == "Stopped capturing with reason EndReason.DISARMED, "
+ "skipping writing of buffered frames"
)
- # Initially Capturing should be 0
- assert await capturing_queue.get() == 0
+ # We received all 10000 frames even if we asked to capture fewer.
+ assert await caget(hdf5_test_prefix + ":NumReceived") == 10000
- await caput(hdf5_test_prefix + ":Capture", 1, wait=True, timeout=TIMEOUT)
+    # We didn't write any frames since the end reason was `EndReason.DISARMED`,
+    # not `EndReason.OK`
+ assert await caget(hdf5_test_prefix + ":NumCaptured") == 0
+
+ # Confirm no data was written
+ assert not (tmp_path / test_filename).exists()
+
+
+@pytest_asyncio.fixture
+def differently_sized_framedata():
+ yield [
+ ReadyData(),
+ StartData(DUMP_FIELDS, 0, "Scaled", "Framed", 52),
+ FrameData(
+ numpy.array(
+ [
+ [0, 1, 1, 3, 5.6e-08, 1, 2],
+ [0, 2, 2, 6, 0.010000056, 2, 4],
+ [8, 3, 3, 9, 0.020000056, 3, 6],
+ [8, 4, 4, 12, 0.030000056, 4, 8],
+ [8, 5, 5, 15, 0.040000056, 5, 10],
+ [8, 6, 6, 18, 0.050000056, 6, 12],
+ [8, 7, 7, 21, 0.060000056, 7, 14],
+ [8, 8, 8, 24, 0.070000056, 8, 16],
+ [8, 9, 9, 27, 0.080000056, 9, 18],
+ [8, 10, 10, 30, 0.090000056, 10, 20],
+ ]
+ )
+ ),
+ FrameData(
+ numpy.array(
+ [
+ [0, 11, 11, 33, 0.100000056, 11, 22],
+ [8, 12, 12, 36, 0.110000056, 12, 24],
+ [8, 13, 13, 39, 0.120000056, 13, 26],
+ [8, 14, 14, 42, 0.130000056, 14, 28],
+ [8, 15, 15, 45, 0.140000056, 15, 30],
+ [8, 16, 16, 48, 0.150000056, 16, 32],
+ [8, 17, 17, 51, 0.160000056, 17, 34],
+ [8, 18, 18, 54, 0.170000056, 18, 36],
+ [8, 19, 19, 57, 0.180000056, 19, 38],
+ [0, 20, 20, 60, 0.190000056, 20, 40],
+ [8, 21, 21, 63, 0.200000056, 21, 42],
+ ]
+ )
+ ),
+ FrameData(
+ numpy.array(
+ [
+ [8, 22, 22, 66, 0.210000056, 22, 44],
+ [8, 23, 23, 69, 0.220000056, 23, 46],
+ [8, 24, 24, 72, 0.230000056, 24, 48],
+ [8, 25, 25, 75, 0.240000056, 25, 50],
+ [8, 26, 26, 78, 0.250000056, 26, 52],
+ [8, 27, 27, 81, 0.260000056, 27, 54],
+ [8, 28, 28, 84, 0.270000056, 28, 56],
+ [0, 29, 29, 87, 0.280000056, 29, 58],
+ [8, 30, 30, 90, 0.290000056, 30, 60],
+ [8, 31, 31, 93, 0.300000056, 31, 62],
+ ]
+ )
+ ),
+ FrameData(
+ numpy.array(
+ [
+ [8, 32, 32, 96, 0.310000056, 32, 64],
+ [8, 33, 33, 99, 0.320000056, 33, 66],
+ [8, 34, 34, 102, 0.330000056, 34, 68],
+ [8, 35, 35, 105, 0.340000056, 35, 70],
+ [8, 36, 36, 108, 0.350000056, 36, 72],
+ [8, 37, 37, 111, 0.360000056, 37, 74],
+ [0, 38, 38, 114, 0.370000056, 38, 76],
+ [8, 39, 39, 117, 0.380000056, 39, 78],
+ [8, 40, 40, 120, 0.390000056, 40, 80],
+ [8, 41, 41, 123, 0.400000056, 41, 82],
+ ]
+ )
+ ),
+ FrameData(
+ numpy.array(
+ [
+ [8, 42, 42, 126, 0.410000056, 42, 84],
+ [8, 43, 43, 129, 0.420000056, 43, 86],
+ [8, 44, 44, 132, 0.430000056, 44, 88],
+ [8, 45, 45, 135, 0.440000056, 45, 90],
+ [8, 46, 46, 138, 0.450000056, 46, 92],
+ [0, 47, 47, 141, 0.460000056, 47, 94],
+ [8, 48, 48, 144, 0.470000056, 48, 96],
+ [8, 49, 49, 147, 0.480000056, 49, 98],
+ [8, 50, 50, 150, 0.490000056, 50, 100],
+ [8, 51, 51, 153, 0.500000056, 51, 102],
+ ]
+ )
+ ),
+ FrameData(
+ numpy.array(
+ [
+ [8, 52, 52, 156, 0.510000056, 52, 104],
+ [8, 53, 53, 159, 0.520000056, 53, 106],
+ [8, 54, 54, 162, 0.530000056, 54, 108],
+ [8, 55, 55, 165, 0.540000056, 55, 110],
+ [0, 56, 56, 168, 0.550000056, 56, 112],
+ [8, 57, 57, 171, 0.560000056, 57, 114],
+ [8, 58, 58, 174, 0.570000056, 58, 116],
+ ]
+ )
+ ),
+ EndData(58, EndReason.OK),
+ ]
- assert await capturing_queue.get() == 1
- # The HDF5 data will be processed, and when it's done Capturing is set to 0
- assert await asyncio.wait_for(capturing_queue.get(), timeout=TIMEOUT) == 0
+def test_hdf_buffer_forever(differently_sized_framedata, tmp_path):
+ filepath = str(tmp_path / "test_file.h5")
+ status_output = []
+ num_received_output = []
+ num_captured_output = []
+ frames_written_to_file = []
+ num_captured_setter_pipeline = NumCapturedSetter(num_captured_output.append)
+ buffer = HDF5Buffer(
+ CaptureMode.FOREVER,
+ filepath,
+ 21,
+ status_output.append,
+ num_received_output.append,
+ num_captured_setter_pipeline,
+ )
+ buffer.put_data_to_file = frames_written_to_file.append
+
+ for data in differently_sized_framedata:
+ buffer.handle_data(data)
+
+ assert buffer.number_of_received_rows == 58
+ assert not buffer.finish_capturing
+
+ differently_sized_framedata[-1] = EndData(58, EndReason.MANUALLY_STOPPED)
+
+ for data in differently_sized_framedata:
+ buffer.handle_data(data)
+
+ assert buffer.number_of_received_rows == 116
+ assert buffer.finish_capturing
+
+ assert len(frames_written_to_file) == 14
+    assert (
+        sum(
+            len(frame.data)
+            for frame in frames_written_to_file
+            if isinstance(frame, FrameData)
+        )
+        == 116
+    )
+
+
+def test_hdf_buffer_last_n(differently_sized_framedata, tmp_path):
+ filepath = str(tmp_path / "test_file.h5")
+ status_output = []
+ num_received_output = []
+ num_captured_output = []
+ frames_written_to_file = []
+ num_captured_setter_pipeline = NumCapturedSetter(num_captured_output.append)
+ buffer = HDF5Buffer(
+ CaptureMode.LAST_N,
+ filepath,
+ 21,
+ status_output.append,
+ num_received_output.append,
+ num_captured_setter_pipeline,
+ )
+ buffer.put_data_to_file = frames_written_to_file.append
+
+ for data in differently_sized_framedata:
+ buffer.handle_data(data)
+
+ assert buffer.number_of_received_rows == 58
+ assert buffer.number_of_rows_in_circular_buffer == 21
+
+ expected_cut_off_data = deque(
+ [
+ FrameData(
+ numpy.array(
+ [
+ [0, 38, 38, 114, 0.370000056, 38, 76],
+ [8, 39, 39, 117, 0.380000056, 39, 78],
+ [8, 40, 40, 120, 0.390000056, 40, 80],
+ [8, 41, 41, 123, 0.400000056, 41, 82],
+ ]
+ )
+ ),
+ FrameData(
+ numpy.array(
+ [
+ [8, 42, 42, 126, 0.410000056, 42, 84],
+ [8, 43, 43, 129, 0.420000056, 43, 86],
+ [8, 44, 44, 132, 0.430000056, 44, 88],
+ [8, 45, 45, 135, 0.440000056, 45, 90],
+ [8, 46, 46, 138, 0.450000056, 46, 92],
+ [0, 47, 47, 141, 0.460000056, 47, 94],
+ [8, 48, 48, 144, 0.470000056, 48, 96],
+ [8, 49, 49, 147, 0.480000056, 49, 98],
+ [8, 50, 50, 150, 0.490000056, 50, 100],
+ [8, 51, 51, 153, 0.500000056, 51, 102],
+ ]
+ )
+ ),
+ FrameData(
+ numpy.array(
+ [
+ [8, 52, 52, 156, 0.510000056, 52, 104],
+ [8, 53, 53, 159, 0.520000056, 53, 106],
+ [8, 54, 54, 162, 0.530000056, 54, 108],
+ [8, 55, 55, 165, 0.540000056, 55, 110],
+ [0, 56, 56, 168, 0.550000056, 56, 112],
+ [8, 57, 57, 171, 0.560000056, 57, 114],
+ [8, 58, 58, 174, 0.570000056, 58, 116],
+ ]
+ )
+ ),
+ ]
+ )
- m.close()
+ output_frames = [
+ frame_data
+ for frame_data in frames_written_to_file
+ if isinstance(frame_data, FrameData)
+ ]
+    assert len(output_frames) == len(expected_cut_off_data)
+    for expected_frame, output_frame in zip(expected_cut_off_data, output_frames):
+        numpy.testing.assert_array_equal(expected_frame.data, output_frame.data)
+
+
+def test_hdf_buffer_last_n_large_data(tmp_path):
+ filepath = str(tmp_path / "test_file.h5")
+ status_output = []
+ num_received_output = []
+ num_captured_output = []
+ frames_written_to_file = []
+ num_captured_setter_pipeline = NumCapturedSetter(num_captured_output.append)
+ buffer = HDF5Buffer(
+ CaptureMode.LAST_N,
+ filepath,
+ 25000,
+ status_output.append,
+ num_received_output.append,
+ num_captured_setter_pipeline,
+ )
+ buffer.put_data_to_file = frames_written_to_file.append
- # Close capture, thus closing hdf5 file
- await caput(hdf5_test_prefix + ":Capture", 0, wait=True)
- assert await caget(hdf5_test_prefix + ":Capture") == 0
+ large_data = [
+ ReadyData(),
+ StartData([], 0, "Scaled", "Framed", 52),
+ FrameData(numpy.zeros((25000))),
+ FrameData(numpy.zeros((25000))),
+ FrameData(numpy.zeros((25000))),
+ FrameData(numpy.zeros((25000))),
+ FrameData(numpy.zeros((25000))),
+ FrameData(numpy.append(numpy.zeros((15000)), numpy.arange(1, 10001))),
+ EndData(150000, EndReason.OK),
+ ]
- # Confirm file contains data we expect
- hdf_file = h5py.File(tmp_path / test_filename[:-1], "r")
- assert list(hdf_file) == [
- "COUNTER1.OUT.Max",
- "COUNTER1.OUT.Mean",
- "COUNTER1.OUT.Min",
- "COUNTER2.OUT.Mean",
- "COUNTER3.OUT.Value",
- "PCAP.BITS2.Value",
- "PCAP.SAMPLES.Value",
- "PCAP.TS_START.Value",
+ for data in large_data:
+ buffer.handle_data(data)
+
+ assert buffer.number_of_received_rows == 150000
+ assert buffer.number_of_rows_in_circular_buffer == 25000
+
+ expected_output = [
+ StartData([], 0, "Scaled", "Framed", 52),
+ FrameData(numpy.append(numpy.zeros((15000)), numpy.arange(1, 10001))),
+ EndData(150000, EndReason.OK),
]
- assert len(hdf_file["/COUNTER1.OUT.Max"]) == num_capture
+ output_frames = [
+ frame_data
+ for frame_data in frames_written_to_file
+ if isinstance(frame_data, FrameData)
+ ]
+ assert len(output_frames) == 1
+ numpy.testing.assert_array_equal(output_frames[0].data, expected_output[1].data)
def test_hdf_parameter_validate_not_capturing(hdf5_controller: HDF5RecordController):
@@ -490,7 +849,7 @@ async def mock_data(scaled, flush_period):
yield item
# Set up all the mocks
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
return_value="Some/Filepath"
)
hdf5_controller._client.data = mock_data # type: ignore
@@ -510,8 +869,6 @@ async def mock_data(scaled, flush_period):
assert pipeline_mock[0].queue.put_nowait.call_count == 7
pipeline_mock[0].queue.put_nowait.assert_called_with(EndData(5, EndReason.OK))
- mock_stop_pipeline.assert_called_once()
-
@patch("pandablocks_ioc._hdf_ioc.stop_pipeline")
@patch("pandablocks_ioc._hdf_ioc.create_default_pipeline")
@@ -530,7 +887,7 @@ async def mock_data(scaled, flush_period):
yield item
# Set up all the mocks
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
return_value="Some/Filepath"
)
hdf5_controller._client.data = mock_data # type: ignore
@@ -547,12 +904,10 @@ async def mock_data(scaled, flush_period):
hdf5_controller._status_message_record.get()
== "Requested number of frames captured"
)
- # len 12 as ReadyData isn't pushed to pipeline, only Start and Frame data.
- assert pipeline_mock[0].queue.put_nowait.call_count == 12
+ # len 13 for 2 StartData, 10 FrameData and 1 EndData
+ assert pipeline_mock[0].queue.put_nowait.call_count == 13
pipeline_mock[0].queue.put_nowait.assert_called_with(EndData(10, EndReason.OK))
- mock_stop_pipeline.assert_called_once()
-
@patch("pandablocks_ioc._hdf_ioc.stop_pipeline")
@patch("pandablocks_ioc._hdf_ioc.create_default_pipeline")
@@ -599,7 +954,7 @@ async def mock_data(scaled, flush_period):
yield item
# Set up all the mocks
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
return_value="Some/Filepath"
)
hdf5_controller._client.data = mock_data # type: ignore
@@ -622,8 +977,6 @@ async def mock_data(scaled, flush_period):
EndData(1, EndReason.START_DATA_MISMATCH)
)
- mock_stop_pipeline.assert_called_once()
-
@patch("pandablocks_ioc._hdf_ioc.stop_pipeline")
@patch("pandablocks_ioc._hdf_ioc.create_default_pipeline")
@@ -661,7 +1014,7 @@ async def mock_data(scaled, flush_period):
raise CancelledError
# Set up all the mocks
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
return_value="Some/Filepath"
)
hdf5_controller._client.data = mock_data # type: ignore
@@ -675,9 +1028,9 @@ async def mock_data(scaled, flush_period):
assert hdf5_controller._status_message_record.get() == "Capturing disabled"
# len 2 - one StartData, one EndData
assert pipeline_mock[0].queue.put_nowait.call_count == 2
- pipeline_mock[0].queue.put_nowait.assert_called_with(EndData(0, EndReason.OK))
-
- mock_stop_pipeline.assert_called_once()
+ pipeline_mock[0].queue.put_nowait.assert_called_with(
+ EndData(0, EndReason.MANUALLY_STOPPED)
+ )
@patch("pandablocks_ioc._hdf_ioc.stop_pipeline")
@@ -716,7 +1069,7 @@ async def mock_data(scaled, flush_period):
raise Exception("Test exception")
# Set up all the mocks
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
return_value="Some/Filepath"
)
hdf5_controller._client.data = mock_data # type: ignore
@@ -737,8 +1090,6 @@ async def mock_data(scaled, flush_period):
EndData(0, EndReason.UNKNOWN_EXCEPTION)
)
- mock_stop_pipeline.assert_called_once()
-
async def test_capture_on_update(
hdf5_controller: HDF5RecordController,
@@ -781,13 +1132,13 @@ async def test_capture_on_update_cancel_unexpected_task(
task_mock.cancel.assert_called_once()
-def test_hdf_get_filename(
+def test_hdf_get_filepath(
hdf5_controller: HDF5RecordController,
):
- """Test _get_filename works when all records have valid values"""
+ """Test _get_filepath works when all records have valid values"""
- hdf5_controller._file_path_record = MagicMock()
- hdf5_controller._file_path_record.get = MagicMock( # type: ignore
+ hdf5_controller._directory_record = MagicMock()
+ hdf5_controller._directory_record.get = MagicMock( # type: ignore
return_value="/some/path"
)
@@ -796,14 +1147,14 @@ def test_hdf_get_filename(
return_value="some_filename"
)
- assert hdf5_controller._get_filename() == "/some/path/some_filename"
+ assert hdf5_controller._get_filepath() == "/some/path/some_filename"
def test_hdf_capture_validate_valid_filename(
hdf5_controller: HDF5RecordController,
):
"""Test _capture_validate passes when a valid filename is given"""
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
return_value="/valid/file.h5"
)
@@ -821,7 +1172,7 @@ def test_hdf_capture_validate_invalid_filename(
hdf5_controller: HDF5RecordController,
):
"""Test _capture_validate fails when filename cannot be created"""
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
side_effect=ValueError("Mocked value error")
)
@@ -832,7 +1183,7 @@ def test_hdf_capture_validate_exception(
hdf5_controller: HDF5RecordController,
):
"""Test _capture_validate fails due to other exceptions"""
- hdf5_controller._get_filename = MagicMock( # type: ignore
+ hdf5_controller._get_filepath = MagicMock( # type: ignore
side_effect=Exception("Mocked error")
)
diff --git a/tests/test_tables.py b/tests/test_tables.py
index adf105c7..b1fac61f 100644
--- a/tests/test_tables.py
+++ b/tests/test_tables.py
@@ -363,9 +363,9 @@ async def test_table_updater_update_mode_submit_exception_data_error(
assert isinstance(table_updater.client.send, AsyncMock)
table_updater.client.send.side_effect = Exception("Mocked exception")
- table_updater.all_values_dict[
- EpicsName(EPICS_FORMAT_TABLE_NAME)
- ] = InErrorException("Mocked in error exception")
+ table_updater.all_values_dict[EpicsName(EPICS_FORMAT_TABLE_NAME)] = (
+ InErrorException("Mocked in error exception")
+ )
await table_updater.update_mode(TableModeEnum.SUBMIT.value)