Merge pull request #11 from jasondet/main
Bug fix: prepare enough space for tracelists and improved error messages
gipert authored Sep 16, 2023
2 parents d9c7f08 + daedbb9 commit a7becb6
Showing 10 changed files with 58 additions and 47 deletions.
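In outline, the fix moves buffer-safety bookkeeping out of FCEventDecoder.decode_packet (which previously grew fill_safety reactively as larger events arrived) and up to stream-open time: each DataDecoder now advertises the largest number of buffer rows a single packet can fill via get_max_rows_in_packet(), data_streamer.open_stream() stores that value as rb.fill_safety, and a user-supplied buffer_size smaller than it is rejected with a ValueError. FCEventDecoder reports nadcs from the FCIO config and also uses it as the tracelist length_guess, which is the "prepare enough space for tracelists" part of the fix.
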
4 changes: 0 additions & 4 deletions src/daq2lh5/compass/compass_header_decoder.py
@@ -5,7 +5,6 @@
import lgdo

from ..data_decoder import DataDecoder
from ..raw_buffer import RawBuffer
from .compass_config_parser import compass_config_to_struct

log = logging.getLogger(__name__)
@@ -73,6 +72,3 @@ def make_lgdo(self, key: int = None, size: int = None) -> lgdo.Struct:
"self.config still None, need to decode header before calling make_lgdo"
)
return self.config # self.config is already an lgdo, namely it is a struct

def buffer_is_full(self, rb: RawBuffer) -> bool:
return rb.loc > 0
7 changes: 0 additions & 7 deletions src/daq2lh5/data_decoder.py
@@ -9,8 +9,6 @@
import numpy as np
from lgdo import LH5Store

from .raw_buffer import RawBuffer

LGDO = Union[lgdo.Scalar, lgdo.Struct, lgdo.Array, lgdo.VectorOfVectors]


@@ -273,8 +271,3 @@ def get_max_rows_in_packet(self) -> int:
buffers.
"""
return 1

def buffer_is_full(self, rb: RawBuffer) -> bool:
"""Returns whether the buffer is too full to read in another packet."""

return len(rb.lgdo) - rb.loc < self.get_max_rows_in_packet()
6 changes: 6 additions & 0 deletions src/daq2lh5/data_streamer.py
@@ -187,6 +187,12 @@ def open_stream(
# use the first available key
key = rb.key_list[0] if len(rb.key_list) > 0 else None
rb.lgdo = decoder.make_lgdo(key=key, size=buffer_size)
rb.fill_safety = decoder.get_max_rows_in_packet()
if buffer_size < rb.fill_safety:
raise ValueError(
f"{dec_name} requires a buffer of at least length"
f"{rb.fill_safety} but buffer size is only {buffer_size}"
)

# make sure there were no entries in rb_lib that weren't among the
# decoders. If so, just emit a warning and continue.
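
As a usage note, the new guard makes an undersized buffer fail fast when the stream is opened instead of overflowing during decoding. A minimal sketch of the user-facing behavior, assuming build_raw is imported from the daq2lh5 package as the tests below do; the input path is hypothetical, and the new test_invalid_user_buffer_size test exercises the same thing against real test data:

import pytest

from daq2lh5 import build_raw  # assumed top-level import, as used in tests/test_build_raw.py

# buffer_size=5 is assumed to be smaller than the decoder's
# get_max_rows_in_packet() for this (hypothetical) FlashCam stream, so
# open_stream() raises the new ValueError before any packet is decoded.
with pytest.raises(ValueError):
    build_raw(in_stream="some-flashcam-run.fcio", buffer_size=5, overwrite=True)
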
4 changes: 0 additions & 4 deletions src/daq2lh5/fc/fc_config_decoder.py
@@ -7,7 +7,6 @@
import numpy as np

from ..data_decoder import DataDecoder
from ..raw_buffer import RawBuffer

log = logging.getLogger(__name__)

@@ -60,6 +59,3 @@ def decode_config(self, fcio: fcutils.fcio) -> lgdo.Struct:

def make_lgdo(self, key: int = None, size: int = None) -> lgdo.Struct:
return self.config

def buffer_is_full(self, rb: RawBuffer) -> bool:
return rb.loc > 0
27 changes: 12 additions & 15 deletions src/daq2lh5/fc/fc_event_decoder.py
@@ -97,14 +97,6 @@ def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.skipped_channels = {}
self.fc_config = None
self.max_numtraces = 1

def get_key_lists(self) -> range:
return [range(self.fc_config["nadcs"].value)]

def get_decoded_values(self, channel: int = None) -> dict[str, dict[str, Any]]:
# FC uses the same values for all channels
return self.decoded_values

def set_file_config(self, fc_config: lgdo.Struct) -> None:
"""Access ``FCIOConfig`` members once when each file is opened.
@@ -116,6 +108,17 @@ def set_file_config(self, fc_config: lgdo.Struct) -> None:
"""
self.fc_config = fc_config
self.decoded_values["waveform"]["wf_len"] = self.fc_config["nsamples"].value
self.decoded_values["tracelist"]["length_guess"] = self.fc_config["nadcs"].value

def get_key_lists(self) -> range:
return [range(self.fc_config["nadcs"].value)]

def get_decoded_values(self, channel: int = None) -> dict[str, dict[str, Any]]:
# FC uses the same values for all channels
return self.decoded_values

def get_max_rows_in_packet(self) -> int:
return self.fc_config["nadcs"].value

def decode_packet(
self,
@@ -143,12 +146,6 @@ def decode_packet(
n_bytes
(estimated) number of bytes in the packet that was just decoded.
"""
if fcio.numtraces > self.max_numtraces:
self.max_numtraces = fcio.numtraces
# The buffer might be storing all channels' data, so set the
# fill_safety to the max number of traces we've seen so far.
for rb in evt_rbkd.values():
rb.fill_safety = self.max_numtraces
any_full = False

# a list of channels is read out simultaneously for each event
@@ -214,4 +211,4 @@ def decode_packet(
evt_rbkd[iwf].loc += 1
any_full |= evt_rbkd[iwf].is_full()

return any_full
return bool(any_full)
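
The same hook applies to decoders beyond FlashCam: any decoder whose packets can write several buffer rows should override get_max_rows_in_packet() so that open_stream() can set fill_safety and validate the user's buffer size. A minimal sketch with a hypothetical decoder class (not part of this package):

from daq2lh5.data_decoder import DataDecoder


class MultiRowDecoder(DataDecoder):  # hypothetical example class
    """Sketch of a decoder whose packets may fill up to 16 buffer rows."""

    def get_max_rows_in_packet(self) -> int:
        # open_stream() copies this value into rb.fill_safety and raises
        # ValueError if the requested buffer_size is smaller than it.
        return 16
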
3 changes: 2 additions & 1 deletion src/daq2lh5/fc/fc_status_decoder.py
@@ -2,7 +2,8 @@

import fcutils

from ..data_decoder import DataDecoder, RawBuffer
from ..data_decoder import DataDecoder
from ..raw_buffer import RawBuffer


class FCStatusDecoder(DataDecoder):
5 changes: 1 addition & 4 deletions src/daq2lh5/orca/orca_header_decoder.py
@@ -6,7 +6,7 @@

import lgdo

from ..raw_buffer import RawBuffer, RawBufferLibrary
from ..raw_buffer import RawBufferLibrary
from .orca_base import OrcaDecoder
from .orca_header import OrcaHeader
from .orca_packet import OrcaPacket
@@ -26,9 +26,6 @@ def __init__(self, header: OrcaHeader = None, **kwargs) -> None:
def make_lgdo(self, key: int = None, size: int = None) -> lgdo.Scalar:
return lgdo.Scalar(value="")

def buffer_is_full(self, rb: RawBuffer) -> bool:
return rb.loc > 0

def decode_packet(
self, packet: OrcaPacket, packet_id: int, rbl: RawBufferLibrary = None
) -> bool:
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -30,7 +30,7 @@ def pytest_sessionfinish(session, exitstatus):
@pytest.fixture(scope="session")
def lgnd_test_data():
ldata = LegendTestData()
ldata.checkout("c089a59")
ldata.checkout("ecb370e")
return ldata


8 changes: 5 additions & 3 deletions tests/fc/test_fc_event_decoder.py
@@ -11,13 +11,15 @@ def event_rbkd(fcio_obj, fcio_config):
decoder = FCEventDecoder()
decoder.set_file_config(fcio_config)

# get just one record (because size=1, see below) and check if it's a (sparse)event
# get just one record and check if it's a (sparse) event
assert fcio_obj.get_record() == 3 or fcio_obj.get_record() == 6

# build raw buffer for each channel in the FC trace list
rbkd = {}
for i in fcio_obj.tracelist:
rbkd[i] = RawBuffer(lgdo=decoder.make_lgdo(size=1))
nadcs = decoder.get_max_rows_in_packet()
rbkd[i] = RawBuffer(lgdo=decoder.make_lgdo(size=nadcs))
rbkd[i].fill_safety = nadcs

# decode packet into the lgdo's and check if the buffer is full
assert decoder.decode_packet(fcio=fcio_obj, evt_rbkd=rbkd, packet_id=69) is True
@@ -74,7 +76,7 @@ def test_values(event_rbkd, fcio_obj):
loc = event_rbkd[ch].loc - 1
tbl = event_rbkd[ch].lgdo

assert event_rbkd[ch].fill_safety == fc.numtraces
assert event_rbkd[ch].fill_safety >= fc.numtraces

assert tbl["packet_id"].nda[loc] == 69
assert tbl["eventnumber"].nda[loc] == fc.eventnumber
39 changes: 31 additions & 8 deletions tests/test_build_raw.py
@@ -27,8 +27,15 @@ def test_build_raw_fc(lgnd_test_data, tmptestdir):
overwrite=True,
)

out_file = lgnd_test_data.get_path("fcio/L200-comm-20211130-phy-spms.lh5")
assert lgnd_test_data.get_path("fcio/L200-comm-20211130-phy-spms.lh5") != ""

with pytest.raises(FileExistsError):
build_raw(
in_stream=lgnd_test_data.get_path("fcio/L200-comm-20211130-phy-spms.fcio")
)
os.remove(out_file)

out_file = f"{tmptestdir}/L200-comm-20211130-phy-spms.lh5"

build_raw(
@@ -37,7 +44,30 @@ def test_build_raw_fc(lgnd_test_data, tmptestdir):
overwrite=True,
)

assert os.path.exists(f"{tmptestdir}/L200-comm-20211130-phy-spms.lh5")
assert os.path.exists(out_file)


def test_build_raw_fc_ghissue10(lgnd_test_data, tmptestdir):
out_file = f"{tmptestdir}/l200-p06-r007-cal-20230725T202227Z.lh5"
build_raw(
in_stream=lgnd_test_data.get_path(
"fcio/l200-p06-r007-cal-20230725T202227Z.fcio"
),
out_spec=out_file,
buffer_size=123,
overwrite=True,
)

assert os.path.exists(out_file)


def test_invalid_user_buffer_size(lgnd_test_data, tmptestdir):
with pytest.raises(ValueError):
build_raw(
in_stream=lgnd_test_data.get_path("fcio/L200-comm-20211130-phy-spms.fcio"),
buffer_size=5,
overwrite=True,
)


def test_build_raw_fc_out_spec(lgnd_test_data, tmptestdir):
@@ -216,13 +246,6 @@ def test_build_raw_compass_out_spec_no_config(lgnd_test_data, tmptestdir):
assert (lh5_obj["channel"].nda == [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]).all()


def test_build_raw_overwrite(lgnd_test_data):
with pytest.raises(FileExistsError):
build_raw(
in_stream=lgnd_test_data.get_path("fcio/L200-comm-20211130-phy-spms.fcio")
)


def test_build_raw_orca_sis3316(lgnd_test_data, tmptestdir):
out_file = f"{tmptestdir}/coherent-run1141-bkg.lh5"
out_spec = {