
Add new :DATASET pv to capture records and pass the name to the client (#118)

Added a new `:DATASET` record to capture records, and a `DATA:DATASETS` table record listing the dataset names.
evalott100 authored Jun 14, 2024
1 parent 8e25d64 commit efc3822
Showing 7 changed files with 297 additions and 43 deletions.
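
As a usage sketch, the new records might be exercised from a client like so. Everything named below is an illustrative assumption, not part of this commit: the "PANDA" prefix, the COUNTER1:OUT field, and PVA access to the softioc records (a CA client such as aioca would work equally well for the put).

    # Hedged sketch -- prefix and field names are invented for illustration.
    from p4p.client.thread import Context

    ctx = Context("pva")
    # Name the dataset this capture record should produce in the HDF5 file.
    ctx.put("PANDA:COUNTER1:OUT:DATASET", "ctr1")
    # DATA:DATASETS is a read-only PVA table of dataset names and types; it is
    # refreshed when a capture starts (see _handle_hdf5_data below).
    print(ctx.get("PANDA:DATA:DATASETS"))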
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ dependencies = [
"click",
"h5py",
"softioc>=4.4.0",
"pandablocks~=0.8.0",
"pandablocks~=0.9.0",
"pvi~=0.7.0",
"typing-extensions;python_version<'3.8'",
] # Add project dependencies here, e.g. ["click", "numpy"]
129 changes: 99 additions & 30 deletions src/pandablocks_ioc/_hdf_ioc.py
@@ -3,10 +3,11 @@
import os
from asyncio import CancelledError
from collections import deque
from dataclasses import dataclass
from enum import Enum
from importlib.util import find_spec
from pathlib import Path
from typing import Callable, Deque, Optional, Union
from typing import Callable, Deque, Dict, Optional, Union

from pandablocks.asyncio import AsyncioClient
from pandablocks.hdf import (
@@ -22,6 +23,7 @@
from softioc.pythonSoftIoc import RecordWrapper

from ._pvi import PviGroup, add_automatic_pvi_info, add_data_capture_pvi_info
from ._tables import ReadOnlyPvaTable
from ._types import ONAM_STR, ZNAM_STR, EpicsName

HDFReceived = Union[ReadyData, StartData, FrameData, EndData]
@@ -70,6 +72,7 @@ def __init__(
status_message_setter: Callable,
number_received_setter: Callable,
number_captured_setter_pipeline: NumCapturedSetter,
dataset_name_cache: Dict[str, Dict[str, str]],
):
# Only one filename - user must stop capture and set new FileName/FilePath
# for new files
@@ -94,6 +97,8 @@ def __init__(
self.number_captured_setter_pipeline = number_captured_setter_pipeline
self.number_captured_setter_pipeline.number_captured_setter(0)

self.dataset_name_cache = dataset_name_cache

if (
self.capture_mode == CaptureMode.LAST_N
and self.number_of_rows_to_capture <= 0
@@ -114,7 +119,9 @@ def put_data_to_file(self, data: HDFReceived):

def start_pipeline(self):
self.pipeline = create_default_pipeline(
iter([self.filepath]), self.number_captured_setter_pipeline
iter([self.filepath]),
self.dataset_name_cache,
self.number_captured_setter_pipeline,
)

def _handle_StartData(self, data: StartData):
@@ -304,10 +311,59 @@ def handle_data(self, data: HDFReceived):
)


@dataclass
class Dataset:
name: str
capture: str


class DatasetNameCache:
def __init__(self, datasets: Dict[str, Dataset], datasets_record_name: EpicsName):
self.datasets = datasets

self._datasets_table_record = ReadOnlyPvaTable(
datasets_record_name, ["Name", "Type"]
)
self._datasets_table_record.set_rows(
["Name", "Type"], [[], []], length=300, default_data_type=str
)

def hdf_writer_names(self):
"""Formats the current dataset names for use in the HDFWriter"""

hdf_names: Dict[str, Dict[str, str]] = {}
for record_name, dataset in self.datasets.items():
if not dataset.name or dataset.capture == "No":
continue

field_name = record_name.replace(":", ".")

hdf_names[field_name] = hdf_name = {}

hdf_name[dataset.capture.split(" ")[-1]] = dataset.name
# Suffix -min and -max if both are present
if "Min Max" in dataset.capture:
hdf_name["Min"] = f"{dataset.name}-min"
hdf_name["Max"] = f"{dataset.name}-max"
return hdf_names
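
To make the mapping concrete, here is the structure hdf_writer_names() would produce for a hypothetical cache. The record and dataset names are invented for illustration; the capture strings ("Min Max Mean", "Value") are standard PandA capture values.

    # Illustrative input: two capture records, one without a dataset name.
    datasets = {
        "COUNTER1:OUT": Dataset(name="ctr1", capture="Min Max Mean"),
        "COUNTER2:OUT": Dataset(name="", capture="Value"),  # skipped: no name
    }
    # Resulting mapping, keyed by PandA field name (":" replaced with "."):
    # {
    #     "COUNTER1.OUT": {
    #         "Mean": "ctr1",      # last word of the capture string
    #         "Min": "ctr1-min",   # added because "Min Max" is present
    #         "Max": "ctr1-max",
    #     }
    # }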

def update_datasets_record(self):
dataset_name_list = [
dataset.name
for dataset in self.datasets.values()
if dataset.name and dataset.capture != "No"
]
self._datasets_table_record.update_row("Name", dataset_name_list)
self._datasets_table_record.update_row(
"Type",
["float64"] * len(dataset_name_list),
)
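
With the same hypothetical cache as in the sketch above, update_datasets_record() would publish Name=["ctr1"] and Type=["float64"] to the DATA:DATASETS table: entries with an empty name or capture set to "No" are omitted, and every dataset is reported as float64.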


class HDF5RecordController:
"""Class to create and control the records that handle HDF5 processing"""

_DATA_PREFIX = "DATA"
DATA_PREFIX = "DATA"

_client: AsyncioClient

@@ -325,19 +381,28 @@ class HDF5RecordController:

_handle_hdf5_data_task: Optional[asyncio.Task] = None

def __init__(self, client: AsyncioClient, record_prefix: str):
def __init__(
self,
client: AsyncioClient,
dataset_cache: Dict[str, Dataset],
record_prefix: str,
):
if find_spec("h5py") is None:
logging.warning("No HDF5 support detected - skipping creating HDF5 records")
return

self._client = client
_datasets_record_name = EpicsName(
HDF5RecordController.DATA_PREFIX + ":DATASETS"
)
self._datasets = DatasetNameCache(dataset_cache, _datasets_record_name)

path_length = os.pathconf("/", "PC_PATH_MAX")
filename_length = os.pathconf("/", "PC_NAME_MAX")

# Create the records, including an uppercase alias for each
# Naming convention and settings (mostly) copied from FSCN2 HDF5 records
directory_record_name = EpicsName(self._DATA_PREFIX + ":HDF_DIRECTORY")
directory_record_name = EpicsName(self.DATA_PREFIX + ":HDF_DIRECTORY")
self._directory_record = builder.longStringOut(
directory_record_name,
length=path_length,
@@ -353,10 +418,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.longStringOut,
)
self._directory_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":HDFDirectory"
record_prefix + ":" + self.DATA_PREFIX + ":HDFDirectory"
)

create_directory_record_name = EpicsName(self._DATA_PREFIX + ":CreateDirectory")
create_directory_record_name = EpicsName(self.DATA_PREFIX + ":CreateDirectory")
self._create_directory_record = builder.longOut(
create_directory_record_name,
initial_value=0,
Expand All @@ -372,7 +437,7 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
record_prefix + ":" + create_directory_record_name.upper()
)

directory_exists_name = EpicsName(self._DATA_PREFIX + ":DirectoryExists")
directory_exists_name = EpicsName(self.DATA_PREFIX + ":DirectoryExists")
self._directory_exists_record = builder.boolIn(
directory_exists_name,
ZNAM="No",
@@ -390,7 +455,7 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
record_prefix + ":" + directory_exists_name.upper()
)

file_name_record_name = EpicsName(self._DATA_PREFIX + ":HDF_FILE_NAME")
file_name_record_name = EpicsName(self.DATA_PREFIX + ":HDF_FILE_NAME")
self._file_name_record = builder.longStringOut(
file_name_record_name,
length=filename_length,
@@ -405,12 +470,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.longStringOut,
)
self._file_name_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":HDFFileName"
record_prefix + ":" + self.DATA_PREFIX + ":HDFFileName"
)

full_file_path_record_name = EpicsName(
self._DATA_PREFIX + ":HDF_FULL_FILE_PATH"
)
full_file_path_record_name = EpicsName(self.DATA_PREFIX + ":HDF_FULL_FILE_PATH")
self._full_file_path_record = builder.longStringIn(
full_file_path_record_name,
length=path_length + 1 + filename_length,
@@ -423,10 +486,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.longStringIn,
)
self._full_file_path_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":HDFFullFilePath"
record_prefix + ":" + self.DATA_PREFIX + ":HDFFullFilePath"
)

num_capture_record_name = EpicsName(self._DATA_PREFIX + ":NUM_CAPTURE")
num_capture_record_name = EpicsName(self.DATA_PREFIX + ":NUM_CAPTURE")
self._num_capture_record = builder.longOut(
num_capture_record_name,
initial_value=0, # Infinite capture
@@ -442,10 +505,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
)
# No validate - users are allowed to change this at any time
self._num_capture_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":NumCapture"
record_prefix + ":" + self.DATA_PREFIX + ":NumCapture"
)

num_captured_record_name = EpicsName(self._DATA_PREFIX + ":NUM_CAPTURED")
num_captured_record_name = EpicsName(self.DATA_PREFIX + ":NUM_CAPTURED")
self._num_captured_record = builder.longIn(
num_captured_record_name,
initial_value=0,
@@ -459,10 +522,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.longIn,
)
self._num_captured_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":NumCaptured"
record_prefix + ":" + self.DATA_PREFIX + ":NumCaptured"
)

num_received_record_name = EpicsName(self._DATA_PREFIX + ":NUM_RECEIVED")
num_received_record_name = EpicsName(self.DATA_PREFIX + ":NUM_RECEIVED")
self._num_received_record = builder.longIn(
num_received_record_name,
initial_value=0,
@@ -476,10 +539,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.longIn,
)
self._num_received_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":NumReceived"
record_prefix + ":" + self.DATA_PREFIX + ":NumReceived"
)

flush_period_record_name = EpicsName(self._DATA_PREFIX + ":FLUSH_PERIOD")
flush_period_record_name = EpicsName(self.DATA_PREFIX + ":FLUSH_PERIOD")
self._flush_period_record = builder.aOut(
flush_period_record_name,
initial_value=1.0,
@@ -493,10 +556,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.aOut,
)
self._flush_period_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":FlushPeriod"
record_prefix + ":" + self.DATA_PREFIX + ":FlushPeriod"
)

capture_control_record_name = EpicsName(self._DATA_PREFIX + ":CAPTURE")
capture_control_record_name = EpicsName(self.DATA_PREFIX + ":CAPTURE")
self._capture_control_record = builder.boolOut(
capture_control_record_name,
ZNAM=ZNAM_STR,
@@ -511,10 +574,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
self._capture_control_record,
)
self._capture_control_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":Capture"
record_prefix + ":" + self.DATA_PREFIX + ":Capture"
)

capture_mode_record_name = EpicsName(self._DATA_PREFIX + ":CAPTURE_MODE")
capture_mode_record_name = EpicsName(self.DATA_PREFIX + ":CAPTURE_MODE")
self._capture_mode_record = builder.mbbOut(
capture_mode_record_name,
*[capture_mode.name for capture_mode in CaptureMode],
@@ -528,10 +591,10 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.mbbOut,
)
self._capture_mode_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":CaptureMode"
record_prefix + ":" + self.DATA_PREFIX + ":CaptureMode"
)

status_message_record_name = EpicsName(self._DATA_PREFIX + ":STATUS")
status_message_record_name = EpicsName(self.DATA_PREFIX + ":STATUS")
self._status_message_record = builder.longStringIn(
status_message_record_name,
initial_value="OK",
@@ -545,7 +608,7 @@ def __init__(self, client: AsyncioClient, record_prefix: str):
builder.stringIn,
)
self._status_message_record.add_alias(
record_prefix + ":" + self._DATA_PREFIX + ":Status"
record_prefix + ":" + self.DATA_PREFIX + ":Status"
)

def _parameter_validate(self, record: RecordWrapper, new_val) -> bool:
@@ -615,10 +678,10 @@ async def _update_directory_path(self, new_val) -> None:
self._directory_exists_record.set(0)

if self._directory_exists_record.get() == 0:
sevr = alarm.MAJOR_ALARM, alrm = alarm.STATE_ALARM
sevr, alrm = alarm.MAJOR_ALARM, alarm.STATE_ALARM
logging.error(status_msg)
else:
sevr = alarm.NO_ALARM, alrm = alarm.NO_ALARM
sevr, alrm = alarm.NO_ALARM, alarm.NO_ALARM
logging.debug(status_msg)

self._status_message_record.set(status_msg, severity=sevr, alarm=alrm)
@@ -647,13 +710,19 @@ async def _handle_hdf5_data(self) -> None:
number_captured_setter_pipeline = NumCapturedSetter(
self._num_captured_record.set
)

# Update `DATA:DATASETS` to match the names of the datasets
# in the HDF5 file
self._datasets.update_datasets_record()

buffer = HDF5Buffer(
capture_mode,
filepath,
num_capture,
self._status_message_record.set,
self._num_received_record.set,
number_captured_setter_pipeline,
self._datasets.hdf_writer_names(),
)
flush_period: float = self._flush_period_record.get()
async for data in self._client.data(
9 changes: 8 additions & 1 deletion src/pandablocks_ioc/_pvi.py
@@ -164,7 +164,7 @@ def add_automatic_pvi_info(
_positions_table_group = Group(
name="PositionsTable", layout=Grid(labelled=True), children=[]
)
_positions_table_headers = ["VALUE", "UNITS", "SCALE", "OFFSET", "CAPTURE"]
_positions_table_headers = ["VALUE", "UNITS", "SCALE", "OFFSET", "DATASET", "CAPTURE"]


# TODO: Replicate this for the BITS table
@@ -174,6 +174,7 @@ def add_positions_table_row(
units_record_name: EpicsName,
scale_record_name: EpicsName,
offset_record_name: EpicsName,
dataset_record_name: EpicsName,
capture_record_name: EpicsName,
) -> None:
"""Add a Row to the Positions table"""
@@ -205,6 +206,12 @@
pv=offset_record_name,
widget=TextWrite(),
),
SignalRW(
name=epics_to_pvi_name(dataset_record_name),
label=dataset_record_name,
pv=dataset_record_name,
widget=TextWrite(),
),
SignalRW(
name=epics_to_pvi_name(capture_record_name),
label=capture_record_name,
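
For illustration, a hypothetical call to the extended row builder (the record names are invented, and the leading value-record parameter is truncated out of the hunk above, so it is passed positionally here):

    add_positions_table_row(
        EpicsName("COUNTER1:OUT"),          # value record (parameter not shown above)
        EpicsName("COUNTER1:OUT:UNITS"),
        EpicsName("COUNTER1:OUT:SCALE"),
        EpicsName("COUNTER1:OUT:OFFSET"),
        EpicsName("COUNTER1:OUT:DATASET"),  # new column added by this commit
        EpicsName("COUNTER1:OUT:CAPTURE"),
    )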
