Skip to content

Commit

Permalink
Merge pull request #443 from SamuelBorden/data_trim
Browse files Browse the repository at this point in the history
Created a temporary table for buffer_processor to write to file during build_raw
  • Loading branch information
jasondet authored Jan 20, 2023
2 parents 59882c9 + 685c6ba commit 0fd86aa
Show file tree
Hide file tree
Showing 5 changed files with 368 additions and 57 deletions.
91 changes: 55 additions & 36 deletions src/pygama/raw/buffer_processor/buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@
import pygama.lgdo as lgdo
from pygama.dsp.errors import ProcessingChainError
from pygama.dsp.processing_chain import build_processing_chain as bpc
from pygama.lgdo import Array, ArrayOfEqualSizedArrays
from pygama.lgdo import Array, ArrayOfEqualSizedArrays, Table

if TYPE_CHECKING:
from pygama.raw.raw_buffer import RawBuffer

log = logging.getLogger(__name__)


def buffer_processor(rb: RawBuffer) -> None:
def buffer_processor(rb: RawBuffer) -> Table:
r"""
Takes in a :class:`.RawBuffer`, performs any processes specified in the :class:`.RawBuffer`'s ``proc_spec``
attribute. Currently implemented attributes:
attribute, and returns a :class:`pygama.lgdo.Table` with the processed buffer. This `tmp_table` shares columns with the :class:`.RawBuffer`'s lgdo,
so no data is copied. Currently implemented attributes:
- "window" (list): [in_name, start_index, stop_index, out_name]
Windows objects with a name specified by the first argument passed in the ``proc_spec``, the window start and stop indices are the next two
Expand All @@ -42,7 +43,9 @@ def buffer_processor(rb: RawBuffer) -> None:
Notes
-----
The original "waveforms" column in the table is deleted if requested! All updates to the rb.lgdo are done in place
The original "waveforms" column in the table is not written to file if request! All updates are done on the
`tmp_table`, which shares the fields with `rb.lgdo` and are done in place. The `tmp_table` is necessary so that
the `rb.lgdo` keeps arrays needed by the table in the buffer.
An example ``proc_spec`` in an :mod:`pygama.raw.build_raw` ``out_spec`` is below
.. code-block:: json
Expand Down Expand Up @@ -89,37 +92,39 @@ def buffer_processor(rb: RawBuffer) -> None:
"""
# Check that there is a valid object to process
if isinstance(rb.lgdo, lgdo.Table) or isinstance(rb.lgdo, lgdo.Struct):
# Create the temporary table that will be written to file
rb_table_size = rb.lgdo.size
tmp_table = Table(size=rb_table_size)
tmp_table.join(other_table=rb.lgdo)

# This is needed if there is a "*" key expansion for decoders in build_raw
if isinstance(rb.lgdo, lgdo.Scalar):
log.info("Cannot process rb.lgdo of type Scalar")
return None
# In the worst case, just return an unprocessed rb.lgdo
else:
log.info(f"Cannot process buffer with an lgdo of type {type(rb.lgdo)}")
tmp_table = rb.lgdo
return tmp_table

# Perform windowing, if requested
if "window" in rb.proc_spec.keys():
process_window(rb)
process_window(rb, tmp_table)

# Read in and perform the DSP routine
if "dsp_config" in rb.proc_spec.keys():
process_dsp(rb)
process_dsp(rb, tmp_table)

# Cast as requested dtype before writing to the table
if "dtype_conv" in rb.proc_spec.keys():
process_dtype_conv(rb)
process_dtype_conv(rb, tmp_table)

# Drop any requested columns from the table
if "drop" in rb.proc_spec.keys():
for drop_keys in rb.proc_spec["drop"]:
try:
rb.lgdo.pop(drop_keys)
rb.lgdo.update_datatype()
except KeyError:
log.info(f"Cannot remove field {drop_keys} from rb.lgdo")
return None
process_drop(rb, tmp_table)

return None
return tmp_table


def process_window(rb: RawBuffer) -> None:
def process_window(rb: RawBuffer, tmp_table: Table) -> None:
r"""
Windows arrays of equal sized arrays according to specifications
given in the rb.proc_spec "window" key.
Expand All @@ -135,6 +140,8 @@ def process_window(rb: RawBuffer) -> None:
----------
rb
A :class:`.RawBuffer` to be processed
tmp_table
A :class:`pygama.lgdo.Table` that shares columns with the `rb.lgdo`
Notes
-----
Expand All @@ -160,7 +167,7 @@ def process_window(rb: RawBuffer) -> None:
)

# Window the waveform values
array_of_arrays = rb.lgdo[window_in_name].values
array_of_arrays = tmp_table[window_in_name].values
windowed_array_of_arrays = window_array_of_arrays(
array_of_arrays, window_start_idx, window_end_idx
)
Expand All @@ -170,30 +177,30 @@ def process_window(rb: RawBuffer) -> None:
t0=t0s, dt=rb.lgdo[window_in_name].dt, values=windowed_array_of_arrays
)

# add this wf_table to the original table
rb.lgdo.add_field(window_out_name, wf_table, use_obj_size=True)
# add this wf_table to the temporary table
tmp_table.add_field(window_out_name, wf_table, use_obj_size=True)

# otherwise, it's (hopefully) just an array of equal sized arrays
else:
array_of_arrays = rb.lgdo[window_in_name]
array_of_arrays = tmp_table[window_in_name]
windowed_array_of_arrays = window_array_of_arrays(
array_of_arrays, window_start_idx, window_end_idx
)
rb.lgdo.add_field(
tmp_table.add_field(
window_out_name, windowed_array_of_arrays, use_obj_size=True
)

return None

# otherwise, rb.lgdo is some other type and we only process it if the rb.out_name is the same as window_in_name
elif rb.out_name == window_in_name:
array_of_arrays = rb.lgdo
array_of_arrays = tmp_table
windowed_array_of_arrays = window_array_of_arrays(
array_of_arrays, window_start_idx, window_end_idx
)

rb.out_name = window_out_name
rb.lgdo = windowed_array_of_arrays
tmp_table = windowed_array_of_arrays

return None

Expand Down Expand Up @@ -233,11 +240,18 @@ def process_windowed_t0(t0s: Array, dts: Array, start_index: int) -> Array:
return copy_t0s


def process_dsp(rb: RawBuffer) -> None:
def process_dsp(rb: RawBuffer, tmp_table: Table) -> None:
r"""
Run a provided DSP config from rb.proc_spec using build_processing_chain, and add specified outputs to the
rb.lgdo.
Parameters
----------
rb
A :class:`.RawBuffer` that contains a `proc_spec` and an `lgdo` attribute
tmp_table
A :class:`pygama.lgdo.Table` that is temporarily created to be written to the raw file
Notes
-----
rb.lgdo is assumed to be an lgdo.Table so that multiple DSP processor outputs can be written to it
Expand All @@ -261,19 +275,15 @@ def process_dsp(rb: RawBuffer) -> None:
# If the processor returns a waveform, create a new waveform table and add it to the original lgdo table
for proc in dsp_out.keys():
# # Check what DSP routine the processors output is from, and manipulate accordingly
# for dsp_proc in dsp_dict["processors"].keys():
# if proc in dsp_proc:
# # In the case of presumming, change the dts
# if dsp_dict["processors"][dsp_proc]["function"] == "presum":
# new_obj = process_presum(rb, dsp_out[proc], dsp_dict, proc)
rb.lgdo.add_field(proc, dsp_out[proc], use_obj_size=True)
tmp_table.add_field(proc, dsp_out[proc], use_obj_size=True)

return None


def process_dtype_conv(rb: RawBuffer) -> None:
def process_dtype_conv(rb: RawBuffer, tmp_table: Table) -> None:
"""
Change the types of fields in an rb.lgdo according to the values specified in the ``proc_spec``'s ``dtype_conv`` list
Change the types of fields in an rb.lgdo according to the values specified in the ``proc_spec``'s ``dtype_conv`` list.
It operates in place on `tmp_table`.
Notes
-----
Expand All @@ -283,7 +293,7 @@ def process_dtype_conv(rb: RawBuffer) -> None:
for return_name in type_list.keys():
# Take care of nested tables with a for loop
path = return_name.split("/")
return_value = rb.lgdo
return_value = tmp_table
for key in path:
try:
return_value = return_value[key]
Expand All @@ -299,3 +309,12 @@ def process_dtype_conv(rb: RawBuffer) -> None:
raise TypeError(f"Cannot recast an object of type {type(return_value)}")

return None


def process_drop(rb: RawBuffer, tmp_table: Table) -> None:
for drop_keys in rb.proc_spec["drop"]:
try:
tmp_table.remove_column(drop_keys, delete=False)
except KeyError:
log.info(f"Cannot remove field {drop_keys} from rb.lgdo")
return None
14 changes: 7 additions & 7 deletions src/pygama/raw/buffer_processor/lh5_buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def lh5_buffer_processor(
if isinstance(out_spec, dict):
RawBufferLibrary(json_dict=out_spec)

# Write everything in the raw file to the new file, process appropriately
# Write everything in the raw file to the new file, check for proc_spec under either the group name, out_name, or the name
for tb in lh5_tables:
lgdo_obj, _ = raw_store.read_object(f"{tb}", lh5_file)

Expand All @@ -131,9 +131,9 @@ def lh5_buffer_processor(
out_name=out_name,
proc_spec=out_spec[decoder_name][group_name]["proc_spec"],
)
buffer_processor(rb)
tmp_table = buffer_processor(rb)
# Update the lgdo_obj to be written to the processed file
lgdo_obj = rb.lgdo
lgdo_obj = tmp_table
else:
pass

Expand All @@ -147,9 +147,9 @@ def lh5_buffer_processor(
out_name=out_name,
proc_spec=out_spec[decoder_name][out_name]["proc_spec"],
)
buffer_processor(rb)
tmp_table = buffer_processor(rb)
# Update the lgdo_obj to be written to the processed file
lgdo_obj = rb.lgdo
lgdo_obj = tmp_table
else:
pass

Expand Down Expand Up @@ -183,9 +183,9 @@ def lh5_buffer_processor(
list(out_spec[decoder_name].keys())[0]
]["proc_spec"],
)
buffer_processor(rb)
tmp_table = buffer_processor(rb)
# Update the lgdo_obj to be written to the processed file
lgdo_obj = rb.lgdo
lgdo_obj = tmp_table
else:
pass
else:
Expand Down
6 changes: 4 additions & 2 deletions src/pygama/raw/raw_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,14 @@ def write_to_lh5_and_clear(
# If a proc_spec if present for this RawBuffer, process that data and then write to the file!
if rb.proc_spec is not None:
# Perform the processing as requested in the `proc_spec` from `out_spec` in build_raw
buffer_processor(rb)
lgdo_to_write = buffer_processor(rb)
else:
lgdo_to_write = rb.lgdo

# write if requested...
if filename != "":
lh5_store.write_object(
rb.lgdo,
lgdo_to_write,
rb.out_name,
filename,
group=group,
Expand Down
Loading

0 comments on commit 0fd86aa

Please sign in to comment.