diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 95dba9133..e5e817249 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -12,6 +12,9 @@ concurrency: group: ${{ github.workflow }}-${{ github.ref }} cancel-in-progress: true +env: + TQDM_MININTERVAL: 10 + jobs: build-and-test: diff --git a/src/pygama/evt/__init__.py b/src/pygama/evt/__init__.py index 8257a98e3..80b544455 100644 --- a/src/pygama/evt/__init__.py +++ b/src/pygama/evt/__init__.py @@ -2,7 +2,8 @@ Utilities for grouping hit data into events. """ +from .build_evt import build_evt from .build_tcm import build_tcm from .tcm import generate_tcm_cols -__all__ = ["build_tcm", "generate_tcm_cols"] +__all__ = ["build_tcm", "generate_tcm_cols", "build_evt"] diff --git a/src/pygama/evt/aggregators.py b/src/pygama/evt/aggregators.py new file mode 100644 index 000000000..993c0ffe6 --- /dev/null +++ b/src/pygama/evt/aggregators.py @@ -0,0 +1,656 @@ +""" +This module provides aggregators to build the `evt` tier. +""" + +from __future__ import annotations + +import awkward as ak +import numpy as np +from lgdo import Array, ArrayOfEqualSizedArrays, VectorOfVectors, lh5 +from lgdo.lh5 import LH5Store +from numpy.typing import NDArray + +from . import utils + + +def evaluate_to_first_or_last( + idx: NDArray, + ids: NDArray, + f_hit: str, + f_dsp: str, + chns: list, + chns_rm: list, + expr: str, + exprl: list, + qry: str | NDArray, + nrows: int, + sorter: tuple, + var_ph: dict = None, + defv: bool | int | float = np.nan, + is_first: bool = True, + tcm_id_table_pattern: str = "ch{}", + evt_group: str = "evt", + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> Array: + """Aggregates across channels by returning the expression of the channel + with value of `sorter`. + + Parameters + ---------- + idx + `tcm` index array. + ids + `tcm` id array. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + chns + list of channels to be aggregated. + chns_rm + list of channels to be skipped from evaluation and set to default value. + expr + expression string to be evaluated. + exprl + list of `dsp/hit/evt` parameter tuples in expression ``(tier, field)``. + qry + query expression to mask aggregation. + nrows + length of output array. + sorter + tuple of field in `hit/dsp/evt` tier to evaluate ``(tier, field)``. + var_ph + dictionary of `evt` and additional parameters and their values. + defv + default value. + is_first + defines if sorted by smallest or largest value of `sorter` + tcm_id_table_pattern + pattern to format `tcm` id values to table name in higher tiers. Must have one + placeholder which is the `tcm` id. + dsp_group + LH5 root group in `dsp` file. + hit_group + LH5 root group in `hit` file. + evt_group + LH5 root group in `evt` file. 
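+
+    The core selection logic is a running-extremum update per channel. A
+    minimal sketch with toy numpy arrays (hypothetical values; the real
+    function reads ``res`` and ``t0`` from LH5 files):
+
+    .. code-block:: python
+
+        import numpy as np
+
+        out = np.full(3, np.nan)   # aggregated expression values per event
+        outt = np.full(3, np.inf)  # running minimum of the sorter (is_first=True)
+
+        # one pass of the channel loop: this channel has hits in events 0 and 2
+        idx_ch = np.array([0, 2])
+        res = np.array([10.0, 20.0])     # expression evaluated at this channel
+        t0 = np.array([5.0, 1.0])        # sorter values at this channel
+        limarr = np.array([True, True])  # query mask
+
+        sel = (t0 < outt[idx_ch]) & limarr
+        out[idx_ch] = np.where(sel, res, out[idx_ch])
+        outt[idx_ch] = np.where(sel, t0, outt[idx_ch])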
+ """ + + # define dimension of output array + out = np.full(nrows, defv, dtype=type(defv)) + outt = np.zeros(len(out)) + + store = LH5Store() + + for ch in chns: + # get index list for this channel to be loaded + idx_ch = idx[ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] + + # evaluate at channel + res = utils.get_data_at_channel( + ch=ch, + ids=ids, + idx=idx, + expr=expr, + exprl=exprl, + var_ph=var_ph, + is_evaluated=ch not in chns_rm, + f_hit=f_hit, + f_dsp=f_dsp, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + # get mask from query + limarr = utils.get_mask_from_query( + qry=qry, + length=len(res), + ch=ch, + idx_ch=idx_ch, + f_hit=f_hit, + f_dsp=f_dsp, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + # find if sorter is in hit or dsp + t0 = store.read( + f"{ch}/{sorter[0]}/{sorter[1]}", + f_hit if f"{hit_group}" == sorter[0] else f_dsp, + idx=idx_ch, + )[0].view_as("np") + + if t0.ndim > 1: + raise ValueError(f"sorter '{sorter[0]}/{sorter[1]}' must be a 1D array") + + if is_first: + if ch == chns[0]: + outt[:] = np.inf + + out[idx_ch] = np.where((t0 < outt) & (limarr), res, out[idx_ch]) + outt[idx_ch] = np.where((t0 < outt) & (limarr), t0, outt[idx_ch]) + + else: + out[idx_ch] = np.where((t0 > outt) & (limarr), res, out[idx_ch]) + outt[idx_ch] = np.where((t0 > outt) & (limarr), t0, outt[idx_ch]) + + return Array(nda=out) + + +def evaluate_to_scalar( + mode: str, + idx: NDArray, + ids: NDArray, + f_hit: str, + f_dsp: str, + chns: list, + chns_rm: list, + expr: str, + exprl: list, + qry: str | NDArray, + nrows: int, + var_ph: dict = None, + defv: bool | int | float = np.nan, + tcm_id_table_pattern: str = "ch{}", + evt_group: str = "evt", + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> Array: + """Aggregates by summation across channels. + + Parameters + ---------- + mode + aggregation mode. + idx + `tcm` index array. + ids + `tcm` id array. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + chns + list of channels to be aggregated. + chns_rm + list of channels to be skipped from evaluation and set to default value. + expr + expression string to be evaluated. + exprl + list of `dsp/hit/evt` parameter tuples in expression ``(tier, field)``. + qry + query expression to mask aggregation. + nrows + length of output array + var_ph + dictionary of `evt` and additional parameters and their values. + defv + default value. + tcm_id_table_pattern + pattern to format `tcm` id values to table name in higher tiers. Must have one + placeholder which is the `tcm` id. + dsp_group + LH5 root group in `dsp` file. + hit_group + LH5 root group in `hit` file. + evt_group + LH5 root group in `evt` file. 
+ """ + + # define dimension of output array + out = np.full(nrows, defv, dtype=type(defv)) + + for ch in chns: + # get index list for this channel to be loaded + idx_ch = idx[ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] + + res = utils.get_data_at_channel( + ch=ch, + ids=ids, + idx=idx, + expr=expr, + exprl=exprl, + var_ph=var_ph, + is_evaluated=ch not in chns_rm, + f_hit=f_hit, + f_dsp=f_dsp, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + # get mask from query + limarr = utils.get_mask_from_query( + qry=qry, + length=len(res), + ch=ch, + idx_ch=idx_ch, + f_hit=f_hit, + f_dsp=f_dsp, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + # switch through modes + if "sum" == mode: + if res.dtype == bool: + res = res.astype(int) + out[idx_ch] = np.where(limarr, res + out[idx_ch], out[idx_ch]) + if "any" == mode: + if res.dtype != bool: + res = res.astype(bool) + out[idx_ch] = out[idx_ch] | (res & limarr) + if "all" == mode: + if res.dtype != bool: + res = res.astype(bool) + out[idx_ch] = out[idx_ch] & res & limarr + + return Array(nda=out) + + +def evaluate_at_channel( + idx: NDArray, + ids: NDArray, + f_hit: str, + f_dsp: str, + chns_rm: list, + expr: str, + exprl: list, + ch_comp: Array, + var_ph: dict = None, + defv: bool | int | float = np.nan, + tcm_id_table_pattern: str = "ch{}", + evt_group: str = "evt", + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> Array: + """Aggregates by evaluating the expression at a given channel. + + Parameters + ---------- + idx + `tcm` index array. + ids + `tcm` id array. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + chns_rm + list of channels to be skipped from evaluation and set to default value. + expr + expression string to be evaluated. + exprl + list of `dsp/hit/evt` parameter tuples in expression ``(tier, field)``. + ch_comp + array of rawids at which the expression is evaluated. + var_ph + dictionary of `evt` and additional parameters and their values. + defv + default value. + tcm_id_table_pattern + pattern to format `tcm` id values to table name in higher tiers. Must have one + placeholder which is the `tcm` id. + dsp_group + LH5 root group in `dsp` file. + hit_group + LH5 root group in `hit` file. + evt_group + LH5 root group in `evt` file. 
+ """ + + out = np.full(len(ch_comp.nda), defv, dtype=type(defv)) + + for ch in np.unique(ch_comp.nda.astype(int)): + # skip default value + if utils.get_table_name_by_pattern(tcm_id_table_pattern, ch) not in lh5.ls( + f_hit + ): + continue + idx_ch = idx[ids == ch] + res = utils.get_data_at_channel( + ch=utils.get_table_name_by_pattern(tcm_id_table_pattern, ch), + ids=ids, + idx=idx, + expr=expr, + exprl=exprl, + var_ph=var_ph, + is_evaluated=utils.get_table_name_by_pattern(tcm_id_table_pattern, ch) + not in chns_rm, + f_hit=f_hit, + f_dsp=f_dsp, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + out[idx_ch] = np.where(ch == ch_comp.nda[idx_ch], res, out[idx_ch]) + + return Array(nda=out) + + +def evaluate_at_channel_vov( + idx: NDArray, + ids: NDArray, + f_hit: str, + f_dsp: str, + expr: str, + exprl: list, + ch_comp: VectorOfVectors, + chns_rm: list, + var_ph: dict = None, + defv: bool | int | float = np.nan, + tcm_id_table_pattern: str = "ch{}", + evt_group: str = "evt", + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> VectorOfVectors: + """Same as :func:`evaluate_at_channel` but evaluates expression at non + flat channels :class:`.VectorOfVectors`. + + Parameters + ---------- + idx + `tcm` index array. + ids + `tcm` id array. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + expr + expression string to be evaluated. + exprl + list of `dsp/hit/evt` parameter tuples in expression ``(tier, field)``. + ch_comp + array of "rawid"s at which the expression is evaluated. + chns_rm + list of channels to be skipped from evaluation and set to default value. + var_ph + dictionary of `evt` and additional parameters and their values. + defv + default value. + tcm_id_table_pattern + pattern to format `tcm` id values to table name in higher tiers. Must have one + placeholder which is the `tcm` id. + dsp_group + LH5 root group in `dsp` file. + hit_group + LH5 root group in `hit` file. + evt_group + LH5 root group in `evt` file. 
+ """ + + # blow up vov to aoesa + out = ak.Array([[] for _ in range(len(ch_comp))]) + + chns = np.unique(ch_comp.flattened_data.nda).astype(int) + ch_comp = ch_comp.view_as("ak") + + type_name = None + for ch in chns: + idx_ch = idx[ids == ch] + res = utils.get_data_at_channel( + ch=utils.get_table_name_by_pattern(tcm_id_table_pattern, ch), + ids=ids, + idx=idx, + expr=expr, + exprl=exprl, + var_ph=var_ph, + is_evaluated=utils.get_table_name_by_pattern(tcm_id_table_pattern, ch) + not in chns_rm, + f_hit=f_hit, + f_dsp=f_dsp, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + # see in which events the current channel is present + mask = ak.to_numpy(ak.any(ch_comp == ch, axis=-1), allow_missing=False) + cv = np.full(len(ch_comp), np.nan) + cv[idx_ch] = res + cv[~mask] = np.nan + cv = ak.drop_none(ak.nan_to_none(ak.Array(cv)[:, None])) + + out = ak.concatenate((out, cv), axis=-1) + + if ch == chns[0]: + type_name = res.dtype + + return VectorOfVectors(ak.values_astype(out, type_name)) + + +def evaluate_to_aoesa( + idx: NDArray, + ids: NDArray, + f_hit: str, + f_dsp: str, + chns: list, + chns_rm: list, + expr: str, + exprl: list, + qry: str | NDArray, + nrows: int, + var_ph: dict = None, + defv: bool | int | float = np.nan, + missv=np.nan, + tcm_id_table_pattern: str = "ch{}", + evt_group: str = "evt", + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> ArrayOfEqualSizedArrays: + """Aggregates by returning an :class:`.ArrayOfEqualSizedArrays` of evaluated + expressions of channels that fulfill a query expression. + + Parameters + ---------- + idx + `tcm` index array. + ids + `tcm` id array. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + chns + list of channels to be aggregated. + chns_rm + list of channels to be skipped from evaluation and set to default value. + expr + expression string to be evaluated. + exprl + list of `dsp/hit/evt` parameter tuples in expression ``(tier, field)``. + qry + query expression to mask aggregation. + nrows + length of output :class:`.VectorOfVectors`. + ch_comp + array of "rawid"s at which the expression is evaluated. + var_ph + dictionary of `evt` and additional parameters and their values. + defv + default value. + missv + missing value. + sorter + sorts the entries in the vector according to sorter expression. + tcm_id_table_pattern + pattern to format `tcm` id values to table name in higher tiers. Must have one + placeholder which is the `tcm` id. + dsp_group + LH5 root group in `dsp` file. + hit_group + LH5 root group in `hit` file. + evt_group + LH5 root group in `evt` file. 
+ """ + # define dimension of output array + out = np.full((nrows, len(chns)), missv) + + i = 0 + for ch in chns: + idx_ch = idx[ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] + res = utils.get_data_at_channel( + ch=ch, + ids=ids, + idx=idx, + expr=expr, + exprl=exprl, + var_ph=var_ph, + is_evaluated=ch not in chns_rm, + f_hit=f_hit, + f_dsp=f_dsp, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + # get mask from query + limarr = utils.get_mask_from_query( + qry=qry, + length=len(res), + ch=ch, + idx_ch=idx_ch, + f_hit=f_hit, + f_dsp=f_dsp, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + out[idx_ch, i] = np.where(limarr, res, out[idx_ch, i]) + + i += 1 + + return ArrayOfEqualSizedArrays(nda=out) + + +def evaluate_to_vector( + idx: NDArray, + ids: NDArray, + f_hit: str, + f_dsp: str, + chns: list, + chns_rm: list, + expr: str, + exprl: list, + qry: str | NDArray, + nrows: int, + var_ph: dict = None, + defv: bool | int | float = np.nan, + sorter: str = None, + tcm_id_table_pattern: str = "ch{}", + evt_group: str = "evt", + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> VectorOfVectors: + """Aggregates by returning a :class:`.VectorOfVector` of evaluated + expressions of channels that fulfill a query expression. + + Parameters + ---------- + idx + `tcm` index array. + ids + `tcm` id array. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + chns + list of channels to be aggregated. + chns_rm + list of channels to be skipped from evaluation and set to default value. + expr + expression string to be evaluated. + exprl + list of `dsp/hit/evt` parameter tuples in expression ``(tier, field)``. + qry + query expression to mask aggregation. + nrows + length of output :class:`.VectorOfVectors`. + ch_comp + array of "rawids" at which the expression is evaluated. + var_ph + dictionary of `evt` and additional parameters and their values. + defv + default value. + sorter + sorts the entries in the vector according to sorter expression. + ``ascend_by:`` results in an vector ordered ascending, + ``decend_by:`` sorts descending. + tcm_id_table_pattern + pattern to format `tcm` id values to table name in higher tiers. Must have one + placeholder which is the `tcm` id. + dsp_group + LH5 root group in `dsp` file. + hit_group + LH5 root group in `hit` file. + evt_group + LH5 root group in `evt` file. 
+ """ + out = evaluate_to_aoesa( + idx=idx, + ids=ids, + f_hit=f_hit, + f_dsp=f_dsp, + chns=chns, + chns_rm=chns_rm, + expr=expr, + exprl=exprl, + qry=qry, + nrows=nrows, + var_ph=var_ph, + defv=defv, + missv=np.nan, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ).view_as("np") + + # if a sorter is given sort accordingly + if sorter is not None: + md, fld = sorter.split(":") + s_val = evaluate_to_aoesa( + idx=idx, + ids=ids, + f_hit=f_hit, + f_dsp=f_dsp, + chns=chns, + chns_rm=chns_rm, + expr=fld, + exprl=[tuple(fld.split("."))], + qry=None, + nrows=nrows, + missv=np.nan, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ).view_as("np") + if "ascend_by" == md: + out = out[np.arange(len(out))[:, None], np.argsort(s_val)] + + elif "descend_by" == md: + out = out[np.arange(len(out))[:, None], np.argsort(-s_val)] + else: + raise ValueError( + "sorter values can only have 'ascend_by' or 'descend_by' prefixes" + ) + + return VectorOfVectors( + ak.values_astype(ak.drop_none(ak.nan_to_none(ak.Array(out))), type(defv)) + ) diff --git a/src/pygama/evt/build_evt.py b/src/pygama/evt/build_evt.py new file mode 100644 index 000000000..66489c38c --- /dev/null +++ b/src/pygama/evt/build_evt.py @@ -0,0 +1,574 @@ +""" +This module implements routines to build the `evt` tier. +""" + +from __future__ import annotations + +import itertools +import json +import logging +import re +from importlib import import_module + +import awkward as ak +import numpy as np +from lgdo import Array, ArrayOfEqualSizedArrays, Table, VectorOfVectors, lh5 +from lgdo.lh5 import LH5Store + +from . import aggregators, utils + +log = logging.getLogger(__name__) + + +def build_evt( + f_tcm: str, + f_dsp: str, + f_hit: str, + f_evt: str, + evt_config: str | dict, + wo_mode: str = "write_safe", + evt_group: str = "evt", + tcm_group: str = "hardware_tcm_1", + dsp_group: str = "dsp", + hit_group: str = "hit", + tcm_id_table_pattern: str = "ch{}", +) -> None: + """Transform data from the `hit` and `dsp` levels which a channel sorted to a + event sorted data format. + + Parameters + ---------- + f_tcm + input LH5 file of the `tcm` level. + f_dsp + input LH5 file of the `dsp` level. + f_hit + input LH5 file of the `hit` level. + f_evt + name of the output file. + evt_config + name of configuration file or dictionary defining event fields. Channel + lists can be defined by importing a metadata module. + + - ``operations`` defines the fields ``name=key``, where ``channels`` + specifies the channels used to for this field (either a string or a + list of strings), + - ``aggregation_mode`` defines how the channels should be combined (see + :func:`evaluate_expression`). + - ``expression`` defnies the mathematical/special function to apply + (see :func:`evaluate_expression`), + - ``query`` defines an expression to mask the aggregation. + - ``parameters`` defines any other parameter used in expression. + + For example: + + .. 
+
+            {
+              "channels": {
+                "geds_on": ["ch1084803", "ch1084804", "ch1121600"],
+                "spms_on": ["ch1057600", "ch1059201", "ch1062405"],
+                "muon": "ch1027202"
+              },
+              "operations": {
+                "energy_id": {
+                  "channels": "geds_on",
+                  "aggregation_mode": "gather",
+                  "query": "hit.cuspEmax_ctc_cal > 25",
+                  "expression": "tcm.array_id",
+                  "sort": "ascend_by:dsp.tp_0_est"
+                },
+                "energy": {
+                  "aggregation_mode": "keep_at_ch:evt.energy_id",
+                  "expression": "hit.cuspEmax_ctc_cal > 25"
+                },
+                "is_muon_rejected": {
+                  "channels": "muon",
+                  "aggregation_mode": "any",
+                  "expression": "dsp.wf_max>a",
+                  "parameters": {"a": 15100},
+                  "initial": false
+                },
+                "multiplicity": {
+                  "channels": ["geds_on", "geds_no_psd", "geds_ac"],
+                  "aggregation_mode": "sum",
+                  "expression": "hit.cuspEmax_ctc_cal > a",
+                  "parameters": {"a": 25},
+                  "initial": 0
+                },
+                "t0": {
+                  "aggregation_mode": "keep_at_ch:evt.energy_id",
+                  "expression": "dsp.tp_0_est"
+                },
+                "lar_energy": {
+                  "channels": "spms_on",
+                  "aggregation_mode": "function",
+                  "expression": ".modules.spm.get_energy(0.5, evt.t0, 48000, 1000, 5000)"
+                }
+              }
+            }
+
+    wo_mode
+        writing mode.
+    evt_group
+        LH5 root group name of `evt` tier.
+    tcm_group
+        LH5 root group in `tcm` file.
+    dsp_group
+        LH5 root group in `dsp` file.
+    hit_group
+        LH5 root group in `hit` file.
+    tcm_id_table_pattern
+        pattern to format `tcm` id values to table name in higher tiers. Must
+        have one placeholder which is the `tcm` id.
+    """
+
+    store = LH5Store()
+    tbl_cfg = evt_config
+    if not isinstance(tbl_cfg, (str, dict)):
+        raise TypeError()
+    if isinstance(tbl_cfg, str):
+        with open(tbl_cfg) as f:
+            tbl_cfg = json.load(f)
+
+    if "channels" not in tbl_cfg.keys():
+        raise ValueError("channel field needs to be specified in the config")
+    if "operations" not in tbl_cfg.keys():
+        raise ValueError("operations field needs to be specified in the config")
+
+    # check tcm_id_table_pattern validity
+    pattern_check = re.findall(r"{([^}]*?)}", tcm_id_table_pattern)
+    if len(pattern_check) != 1:
+        raise ValueError(
+            f"tcm_id_table_pattern must have exactly one placeholder. {tcm_id_table_pattern} is invalid."
+        )
+    elif "{" in pattern_check[0] or "}" in pattern_check[0]:
+        raise ValueError(
+            f"tcm_id_table_pattern {tcm_id_table_pattern} has an invalid placeholder."
+        )
+
+    if (
+        utils.get_table_name_by_pattern(
+            tcm_id_table_pattern,
+            utils.get_tcm_id_by_pattern(tcm_id_table_pattern, lh5.ls(f_hit)[0]),
+        )
+        != lh5.ls(f_hit)[0]
+    ):
+        raise ValueError(
+            f"tcm_id_table_pattern {tcm_id_table_pattern} does not match keys in data!"
+        )
+
+    # create channel list according to config
+    # This can be either read from the meta data
+    # or a list of channel names
+    log.debug("Creating channel dictionary")
+
+    chns = {}
+
+    for k, v in tbl_cfg["channels"].items():
+        if isinstance(v, dict):
+            # it is a meta module.
module_name must exist + if "module" not in v.keys(): + raise ValueError( + "Need module_name to load channel via a meta data module" + ) + + attr = {} + # the time_key argument is set to the time key of the DSP file + # in case it is not provided by the config + if "time_key" not in v.keys(): + attr["time_key"] = re.search(r"\d{8}T\d{6}Z", f_dsp).group(0) + + # if "None" do None + elif "None" == v["time_key"]: + attr["time_key"] = None + + # load module + p, m = v["module"].rsplit(".", 1) + met = getattr(import_module(p, package=__package__), m) + chns[k] = met(v | attr) + + elif isinstance(v, str): + chns[k] = [v] + + elif isinstance(v, list): + chns[k] = [e for e in v] + + nrows = store.read_n_rows(f"/{tcm_group}/cumulative_length", f_tcm) + + table = Table(size=nrows) + + for k, v in tbl_cfg["operations"].items(): + log.debug("Processing field " + k) + + # if mode not defined in operation, it can only be an operation on the evt level. + if "aggregation_mode" not in v.keys(): + var = {} + if "parameters" in v.keys(): + var = var | v["parameters"] + res = table.eval(v["expression"].replace(f"{evt_group}.", ""), var) + + # add attribute if present + if "lgdo_attrs" in v.keys(): + res.attrs |= v["lgdo_attrs"] + + table.add_field(k, res) + + # Else we build the event entry + else: + if "channels" not in v.keys(): + chns_e = [] + elif isinstance(v["channels"], str): + chns_e = chns[v["channels"]] + elif isinstance(v["channels"], list): + chns_e = list( + itertools.chain.from_iterable([chns[e] for e in v["channels"]]) + ) + chns_rm = [] + if "exclude_channels" in v.keys(): + if isinstance(v["exclude_channels"], str): + chns_rm = chns[v["exclude_channels"]] + elif isinstance(v["exclude_channels"], list): + chns_rm = list( + itertools.chain.from_iterable( + [chns[e] for e in v["exclude_channels"]] + ) + ) + + pars, qry, defaultv, srter = None, None, np.nan, None + if "parameters" in v.keys(): + pars = v["parameters"] + if "query" in v.keys(): + qry = v["query"] + if "initial" in v.keys(): + defaultv = v["initial"] + if isinstance(defaultv, str) and ( + defaultv in ["np.nan", "np.inf", "-np.inf"] + ): + defaultv = eval(defaultv) + if "sort" in v.keys(): + srter = v["sort"] + + obj = evaluate_expression( + f_tcm=f_tcm, + f_hit=f_hit, + f_dsp=f_dsp, + chns=chns_e, + chns_rm=chns_rm, + mode=v["aggregation_mode"], + expr=v["expression"], + nrows=nrows, + table=table, + para=pars, + qry=qry, + defv=defaultv, + sorter=srter, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + tcm_group=tcm_group, + ) + + # add attribute if present + if "lgdo_attrs" in v.keys(): + obj.attrs |= v["lgdo_attrs"] + + table.add_field(k, obj) + + # write output fields into f_evt + if "outputs" in tbl_cfg.keys(): + if len(tbl_cfg["outputs"]) < 1: + log.warning("No output fields specified, no file will be written.") + else: + clms_to_remove = [e for e in table.keys() if e not in tbl_cfg["outputs"]] + for fld in clms_to_remove: + table.remove_field(fld, True) + store.write( + obj=table, name=f"/{evt_group}/", lh5_file=f_evt, wo_mode=wo_mode + ) + else: + log.warning("No output fields specified, no file will be written.") + + key = re.search(r"\d{8}T\d{6}Z", f_hit).group(0) + log.info( + f"Applied {len(tbl_cfg['operations'])} operations to key {key} and saved {len(tbl_cfg['outputs'])} evt fields across {len(chns)} channel groups" + ) + + +def evaluate_expression( + f_tcm: str, + f_hit: str, + f_dsp: str, + chns: list, + chns_rm: list, + mode: str, + expr: str, + 
nrows: int,
+    table: Table = None,
+    para: dict = None,
+    qry: str = None,
+    defv: bool | int | float = np.nan,
+    sorter: str = None,
+    tcm_id_table_pattern: str = "ch{}",
+    evt_group: str = "evt",
+    hit_group: str = "hit",
+    dsp_group: str = "dsp",
+    tcm_group: str = "tcm",
+) -> Array | ArrayOfEqualSizedArrays | VectorOfVectors:
+    """Evaluates the expression defined by the user across all channels
+    according to the mode.
+
+    Parameters
+    ----------
+    f_tcm
+        path to `tcm` tier file.
+    f_hit
+        path to `hit` tier file.
+    f_dsp
+        path to `dsp` tier file.
+    chns
+        list of channel names across which expression gets evaluated (form:
+        ``ch``).
+    chns_rm
+        list of channels which get set to default value during evaluation. In
+        function mode they are removed entirely (form: ``ch``).
+    mode
+        The mode determines how the event entry is calculated across channels.
+        Options are:
+
+        - ``first_at:sorter``: aggregates across channels by returning the
+          expression of the channel with smallest value of sorter.
+        - ``last_at:sorter``: aggregates across channels by returning the
+          expression of the channel with largest value of sorter.
+        - ``sum``: aggregates by summation.
+        - ``any``: aggregates by logical or.
+        - ``all``: aggregates by logical and.
+        - ``keep_at_ch:ch_field``: aggregates according to passed ch_field.
+        - ``keep_at_idx:tcm_idx_field``: aggregates according to passed tcm
+          index field.
+        - ``gather``: channels are not combined, but the result is saved as a
+          :class:`.VectorOfVectors`.
+
+    expr
+        the expression. That can be any mathematical equation/comparison. If
+        `mode` is ``function``, the expression needs to be a special processing
+        function defined in modules (e.g. :func:`.modules.spm.get_energy`). In
+        the expression, parameters from either the `hit`, `dsp` or `evt` tier
+        (from operations performed before this one! Dictionary operations order
+        matters), or from the ``parameters`` field can be used.
+    qry
+        a query that can mask the aggregation.
+    nrows
+        number of rows to be processed.
+    table
+        table of `evt` tier data.
+    para
+        dictionary of parameters defined in the ``parameters`` field in the
+        configuration dictionary.
+    defv
+        default value of evaluation.
+    sorter
+        can be used to sort vector outputs according to sorter expression (see
+        :func:`evaluate_to_vector`).
+    tcm_id_table_pattern
+        pattern to format `tcm` id values to table name in higher tiers. Must
+        have one placeholder which is the `tcm` id.
+    evt_group
+        LH5 root group name of `evt` tier.
+    tcm_group
+        LH5 root group in `tcm` file.
+    dsp_group
+        LH5 root group in `dsp` file.
+    hit_group
+        LH5 root group in `hit` file.
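+
+    For instance, a summation aggregate with a free parameter could be
+    evaluated like this (a sketch; file paths, channels and field names are
+    hypothetical):
+
+    .. code-block:: python
+
+        multiplicity = evaluate_expression(
+            f_tcm="tier/tcm.lh5",
+            f_hit="tier/hit.lh5",
+            f_dsp="tier/dsp.lh5",
+            chns=["ch1084803", "ch1084804"],
+            chns_rm=[],
+            mode="sum",
+            expr="hit.cuspEmax_ctc_cal > a",
+            nrows=100,
+            para={"a": 25},
+            defv=0,
+        )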
+ """ + + store = LH5Store() + + # find parameters in evt file or in parameters + exprl = re.findall( + rf"({evt_group}|{hit_group}|{dsp_group}).([a-zA-Z_$][\w$]*)", expr + ) + var_ph = {} + if table: + var_ph = var_ph | { + e: table[e].view_as("ak") + for e in table.keys() + if isinstance(table[e], (Array, ArrayOfEqualSizedArrays, VectorOfVectors)) + } + if para: + var_ph = var_ph | para + + if mode == "function": + # evaluate expression + func, params = expr.split("(") + params = ( + params.replace(f"{dsp_group}.", f"{dsp_group}_") + .replace(f"{hit_group}.", f"{hit_group}_") + .replace(f"{evt_group}.", "") + ) + params = [ + f_hit, + f_dsp, + f_tcm, + hit_group, + dsp_group, + tcm_group, + tcm_id_table_pattern, + [x for x in chns if x not in chns_rm], + ] + [utils.num_and_pars(e, var_ph) for e in params[:-1].split(",")] + + # load function dynamically + p, m = func.rsplit(".", 1) + met = getattr(import_module(p, package=__package__), m) + return met(*params) + + else: + # check if query is either on channel basis or evt basis (and not a mix) + qry_mask = qry + if qry is not None: + if f"{evt_group}." in qry and ( + f"{hit_group}." in qry or f"{dsp_group}." in qry + ): + raise ValueError( + f"Query can't be a mix of {evt_group} tier and lower tiers." + ) + + # if it is an evt query we can evaluate it directly here + if table and f"{evt_group}." in qry: + qry_mask = eval(qry.replace(f"{evt_group}.", ""), table) + + # load TCM data to define an event + ids = store.read(f"/{tcm_group}/array_id", f_tcm)[0].view_as("np") + idx = store.read(f"/{tcm_group}/array_idx", f_tcm)[0].view_as("np") + + # switch through modes + if table and (("keep_at_ch:" == mode[:11]) or ("keep_at_idx:" == mode[:12])): + if "keep_at_ch:" == mode[:11]: + ch_comp = table[mode[11:].replace(f"{evt_group}.", "")] + else: + ch_comp = table[mode[12:].replace(f"{evt_group}.", "")] + if isinstance(ch_comp, Array): + ch_comp = Array(nda=ids[ch_comp.view_as("np")]) + elif isinstance(ch_comp, VectorOfVectors): + ch_comp = ch_comp.view_as("ak") + ch_comp = VectorOfVectors( + array=ak.unflatten( + ids[ak.flatten(ch_comp)], ak.count(ch_comp, axis=-1) + ) + ) + else: + raise NotImplementedError( + type(ch_comp) + + " not supported (only Array and VectorOfVectors are supported)" + ) + + if isinstance(ch_comp, Array): + return aggregators.evaluate_at_channel( + idx=idx, + ids=ids, + f_hit=f_hit, + f_dsp=f_dsp, + chns_rm=chns_rm, + expr=expr, + exprl=exprl, + ch_comp=ch_comp, + var_ph=var_ph, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + elif isinstance(ch_comp, VectorOfVectors): + return aggregators.evaluate_at_channel_vov( + idx=idx, + ids=ids, + f_hit=f_hit, + f_dsp=f_dsp, + expr=expr, + exprl=exprl, + ch_comp=ch_comp, + chns_rm=chns_rm, + var_ph=var_ph, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + else: + raise NotImplementedError( + type(ch_comp) + + " not supported (only Array and VectorOfVectors are supported)" + ) + elif "first_at:" in mode or "last_at:" in mode: + sorter = tuple( + re.findall( + rf"({evt_group}|{hit_group}|{dsp_group}).([a-zA-Z_$][\w$]*)", + mode.split("first_at:")[-1], + )[0] + ) + return aggregators.evaluate_to_first_or_last( + idx=idx, + ids=ids, + f_hit=f_hit, + f_dsp=f_dsp, + chns=chns, + chns_rm=chns_rm, + expr=expr, + exprl=exprl, + qry=qry_mask, + nrows=nrows, + sorter=sorter, + var_ph=var_ph, + defv=defv, + is_first=True if 
"first_at:" in mode else False, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + elif mode in ["sum", "any", "all"]: + return aggregators.evaluate_to_scalar( + mode=mode, + idx=idx, + ids=ids, + f_hit=f_hit, + f_dsp=f_dsp, + chns=chns, + chns_rm=chns_rm, + expr=expr, + exprl=exprl, + qry=qry_mask, + nrows=nrows, + var_ph=var_ph, + defv=defv, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + elif "gather" == mode: + return aggregators.evaluate_to_vector( + idx=idx, + ids=ids, + f_hit=f_hit, + f_dsp=f_dsp, + chns=chns, + chns_rm=chns_rm, + expr=expr, + exprl=exprl, + qry=qry_mask, + nrows=nrows, + var_ph=var_ph, + defv=defv, + sorter=sorter, + tcm_id_table_pattern=tcm_id_table_pattern, + evt_group=evt_group, + hit_group=hit_group, + dsp_group=dsp_group, + ) + else: + raise ValueError(mode + " not a valid mode") diff --git a/src/pygama/evt/modules/__init__.py b/src/pygama/evt/modules/__init__.py new file mode 100644 index 000000000..bd80462f8 --- /dev/null +++ b/src/pygama/evt/modules/__init__.py @@ -0,0 +1,21 @@ +""" +Contains submodules for evt processing +""" + +from .spm import ( + get_energy, + get_energy_dplms, + get_etc, + get_majority, + get_majority_dplms, + get_time_shift, +) + +__all__ = [ + "get_energy", + "get_majority", + "get_energy_dplms", + "get_majority_dplms", + "get_etc", + "get_time_shift", +] diff --git a/src/pygama/evt/modules/legend.py b/src/pygama/evt/modules/legend.py new file mode 100644 index 000000000..2ee2d7e8e --- /dev/null +++ b/src/pygama/evt/modules/legend.py @@ -0,0 +1,35 @@ +""" +Module provides LEGEND internal functions +""" +from importlib import import_module + +from lgdo.lh5 import utils + + +def metadata(params: dict) -> list: + # only import legend meta data when needed. 
+    # LEGEND collaborators can use the meta keyword,
+    # while everything else still works for users without access
+    # to the LEGEND meta data
+    lm = import_module("legendmeta")
+    lmeta = lm.LegendMetadata(path=utils.expand_path(params["meta_path"]))
+    chmap = lmeta.channelmap(params["time_key"])
+
+    tmp = [
+        f"ch{e}"
+        for e in chmap.map("daq.rawid")
+        if chmap.map("daq.rawid")[e]["system"] == params["system"]
+    ]
+
+    if "selectors" in params.keys():
+        for k in params["selectors"].keys():
+            s = ""
+            for e in k.split("."):
+                s += f"['{e}']"
+
+            tmp = [
+                e
+                for e in tmp
+                if eval("dotter" + s, {"dotter": chmap.map("daq.rawid")[int(e[2:])]})
+                == params["selectors"][k]
+            ]
+    return tmp
diff --git a/src/pygama/evt/modules/spm.py b/src/pygama/evt/modules/spm.py
new file mode 100644
index 000000000..2dc5a4290
--- /dev/null
+++ b/src/pygama/evt/modules/spm.py
@@ -0,0 +1,525 @@
+"""
+Module for special event level routines for SiPMs.
+
+Functions must take as the first 8 args, in order:
+
+- path to the hit file
+- path to the dsp file
+- path to the tcm file
+- hit LH5 root group
+- dsp LH5 root group
+- tcm LH5 root group
+- pattern to format tcm id values to table names in higher tiers
+- list of channels processed
+
+Additional parameters are free to the user and need to be defined in the JSON.
+"""
+
+import awkward as ak
+import numpy as np
+from lgdo import Array, VectorOfVectors
+from lgdo.lh5 import LH5Store
+
+from pygama.evt import utils
+
+
+# get SiPM trigger time, allowing different trigger time formats
+def cast_trigger(
+    trgr,
+    tdefault: float,
+    length: int = None,
+) -> ak.Array:
+    if isinstance(trgr, Array):
+        return ak.fill_none(ak.nan_to_none(trgr.view_as("ak")), tdefault)
+
+    elif isinstance(trgr, (VectorOfVectors)):
+        return ak.fill_none(
+            ak.min(ak.fill_none(trgr.view_as("ak"), tdefault), axis=-1), tdefault
+        )
+
+    elif isinstance(trgr, ak.Array):
+        if trgr.ndim == 1:
+            return ak.fill_none(trgr, tdefault)
+        elif trgr.ndim == 2:
+            return ak.fill_none(ak.min(ak.fill_none(trgr, tdefault), axis=-1), tdefault)
+        else:
+            raise ValueError(f"Too many dimensions: {trgr.ndim}")
+    elif isinstance(trgr, (float, int)) and isinstance(length, int):
+        return ak.Array([trgr] * length)
+    else:
+        raise ValueError(f"Can't deal with t0 of type {type(trgr)}")
+
+
+# get SiPM coincidence window mask
+def get_spm_mask(
+    lim: float, trgr: ak.Array, tmin: float, tmax: float, pe: ak.Array, times: ak.Array
+) -> ak.Array:
+    if trgr.ndim != 1:
+        raise ValueError("trigger array must be 1-dimensional!")
+    if (len(trgr) != len(pe)) or (len(trgr) != len(times)):
+        raise ValueError(
+            f"All arrays must have same dimension across first axis len(pe)={len(pe)}, len(times)={len(times)}, len(trgr)={len(trgr)}"
+        )
+
+    tmi = trgr - tmin
+    tma = trgr + tmax
+
+    mask = (
+        ((times * 16.0) < tma[:, None]) & ((times * 16.0) > tmi[:, None]) & (pe > lim)
+    )
+    return mask
+
+
+# get LAr indices according to mask per event over all channels
+# mode 0 -> return pulse indices
+# mode 1 -> return tcm indices
+# mode 2 -> return rawids
+# mode 3 -> return tcm_idx
+def get_masked_tcm_idx(
+    f_hit,
+    f_dsp,
+    f_tcm,
+    hit_group,
+    dsp_group,
+    tcm_group,
+    tcm_id_table_pattern,
+    chs,
+    lim,
+    trgr,
+    tdefault,
+    tmin,
+    tmax,
+    mode=0,
+) -> VectorOfVectors:
+    # load TCM data to define an event
+    store = LH5Store()
+    ids = store.read(f"/{tcm_group}/array_id", f_tcm)[0].view_as("np")
+    idx = store.read(f"/{tcm_group}/array_idx", f_tcm)[0].view_as("np")
+
+    arr_lst = []
+
+    if isinstance(trgr, (float, int)):
+        tge = cast_trigger(trgr, tdefault, length=np.max(idx) + 1)
+    else:
+        tge = cast_trigger(trgr, tdefault, length=None)
+
+    for ch in chs:
+        idx_ch = idx[ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)]
+
+        pe = store.read(f"{ch}/{hit_group}/energy_in_pe", f_hit, idx=idx_ch)[0].view_as(
+            "np"
+        )
+        tmp = np.full((np.max(idx) + 1, len(pe[0])), np.nan)
+        tmp[idx_ch] = pe
+        pe = ak.drop_none(ak.nan_to_none(ak.Array(tmp)))
+
+        # times are in sample units
+        times = store.read(f"{ch}/{hit_group}/trigger_pos", f_hit, idx=idx_ch)[
+            0
+        ].view_as("np")
+        tmp = np.full((np.max(idx) + 1,
len(times[0])), np.nan) + tmp[idx_ch] = times + times = ak.drop_none(ak.nan_to_none(ak.Array(tmp))) + + mask = get_spm_mask(lim, tge, tmin, tmax, pe, times) + + if mode == 0: + out_idx = ak.local_index(mask)[mask] + + elif mode == 1: + out_idx = np.full((np.max(idx) + 1), np.nan) + out_idx[idx_ch] = np.where( + ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch) + )[0] + out_idx = ak.drop_none(ak.nan_to_none(ak.Array(out_idx)[:, None])) + out_idx = out_idx[mask[mask] - 1] + + elif mode == 2: + out_idx = ak.Array( + [utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] * len(mask) + ) + out_idx = out_idx[:, None][mask[mask] - 1] + + elif mode == 3: + out_idx = np.full((np.max(idx) + 1), np.nan) + out_idx[idx_ch] = idx_ch + out_idx = ak.drop_none(ak.nan_to_none(ak.Array(out_idx)[:, None])) + out_idx = out_idx[mask[mask] - 1] + + else: + raise ValueError("Unknown mode") + + arr_lst.append(out_idx) + + return VectorOfVectors(array=ak.concatenate(arr_lst, axis=-1)) + + +def get_spm_ene_or_maj( + f_hit, + f_tcm, + hit_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, + mode, +): + if mode not in ["energy_hc", "energy_dplms", "majority_hc", "majority_dplms"]: + raise ValueError("Unknown mode") + + # load TCM data to define an event + store = LH5Store() + ids = store.read(f"/{tcm_group}/array_id", f_tcm)[0].view_as("np") + idx = store.read(f"/{tcm_group}/array_idx", f_tcm)[0].view_as("np") + out = np.zeros(np.max(idx) + 1) + + if isinstance(trgr, (float, int)): + tge = cast_trigger(trgr, tdefault, length=np.max(idx) + 1) + else: + tge = cast_trigger(trgr, tdefault, length=None) + + for ch in chs: + idx_ch = idx[ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] + + if mode in ["energy_dplms", "majority_dplms"]: + pe = ak.drop_none( + ak.nan_to_none( + store.read( + f"{ch}/{hit_group}/energy_in_pe_dplms", f_hit, idx=idx_ch + )[0].view_as("ak") + ) + ) + + # times are in sample units + times = ak.drop_none( + ak.nan_to_none( + store.read( + f"{ch}/{hit_group}/trigger_pos_dplms", f_hit, idx=idx_ch + )[0].view_as("ak") + ) + ) + + else: + pe = ak.drop_none( + ak.nan_to_none( + store.read(f"{ch}/{hit_group}/energy_in_pe", f_hit, idx=idx_ch)[ + 0 + ].view_as("ak") + ) + ) + + # times are in sample units + times = ak.drop_none( + ak.nan_to_none( + store.read(f"{ch}/{hit_group}/trigger_pos", f_hit, idx=idx_ch)[ + 0 + ].view_as("ak") + ) + ) + + mask = get_spm_mask(lim, tge[idx_ch], tmin, tmax, pe, times) + pe = pe[mask] + + if mode in ["energy_hc", "energy_dplms"]: + out[idx_ch] = out[idx_ch] + ak.to_numpy(ak.nansum(pe, axis=-1)) + + else: + out[idx_ch] = out[idx_ch] + ak.to_numpy( + ak.where(ak.nansum(pe, axis=-1) > lim, 1, 0) + ) + + return Array(nda=out) + + +# get LAr energy per event over all channels +def get_energy( + f_hit, + f_dsp, + f_tcm, + hit_group, + dsp_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, +) -> Array: + return get_spm_ene_or_maj( + f_hit, + f_tcm, + hit_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, + "energy_hc", + ) + + +# get LAr majority per event over all channels +def get_majority( + f_hit, + f_dsp, + f_tcm, + hit_group, + dsp_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, +) -> Array: + return get_spm_ene_or_maj( + f_hit, + f_tcm, + hit_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, + "majority_hc", + ) + + +# get LAr 
energy per event over all channels +def get_energy_dplms( + f_hit, + f_dsp, + f_tcm, + hit_group, + dsp_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, +) -> Array: + return get_spm_ene_or_maj( + f_hit, + f_tcm, + hit_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, + "energy_dplms", + ) + + +# get LAr majority per event over all channels +def get_majority_dplms( + f_hit, + f_dsp, + f_tcm, + hit_group, + dsp_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, +) -> Array: + return get_spm_ene_or_maj( + f_hit, + f_tcm, + hit_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, + "majority_dplms", + ) + + +# Calculate the ETC in different trailing modes: +# trail = 0: Singlet window = [tge,tge+swin] +# trail = 1: Singlet window = [t_first_lar_pulse, t_first_lar_pulse+ swin] +# trail = 2: Like trail = 1, but t_first_lar_pulse <= tge is ensured +# min_first_pls_ene sets the minimum energy of the first pulse (only used in trail > 0) +# max_per_channel, maximum number of pes a channel is allowed to have, if above it gets excluded +def get_etc( + f_hit, + f_dsp, + f_tcm, + hit_group, + dsp_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, + swin, + trail, + min_first_pls_ene, + max_per_channel, +) -> Array: + # load TCM data to define an event + store = LH5Store() + ids = store.read(f"/{tcm_group}/array_id", f_tcm)[0].view_as("np") + idx = store.read(f"/{tcm_group}/array_idx", f_tcm)[0].view_as("np") + pe_lst = [] + time_lst = [] + + if isinstance(trgr, (float, int)): + tge = cast_trigger(trgr, tdefault, length=np.max(idx) + 1) + else: + tge = cast_trigger(trgr, tdefault, length=None) + + for ch in chs: + idx_ch = idx[ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] + + pe = store.read(f"{ch}/{hit_group}/energy_in_pe", f_hit, idx=idx_ch)[0].view_as( + "np" + ) + tmp = np.full((np.max(idx) + 1, len(pe[0])), np.nan) + tmp[idx_ch] = pe + pe = ak.drop_none(ak.nan_to_none(ak.Array(tmp))) + + # times are in sample units + times = store.read(f"{ch}/{hit_group}/trigger_pos", f_hit, idx=idx_ch)[ + 0 + ].view_as("np") + tmp = np.full((np.max(idx) + 1, len(times[0])), np.nan) + tmp[idx_ch] = times + times = ak.drop_none(ak.nan_to_none(ak.Array(tmp))) + + mask = get_spm_mask(lim, tge, tmin, tmax, pe, times) + + pe = pe[mask] + + # max pe mask + max_pe_mask = ak.nansum(pe, axis=-1) < max_per_channel + pe = ak.drop_none( + ak.nan_to_none(ak.where(max_pe_mask, pe, ak.Array([[np.nan]]))) + ) + pe_lst.append(pe) + + times = times[mask] * 16 + times = ak.drop_none( + ak.nan_to_none(ak.where(max_pe_mask, times, ak.Array([[np.nan]]))) + ) + time_lst.append(times) + + pe_all = ak.concatenate(pe_lst, axis=-1) + time_all = ak.concatenate(time_lst, axis=-1) + + if trail > 0: + t1d = ak.min(time_all[pe_all > min_first_pls_ene], axis=-1) + + if trail == 2: + t1d = ak.where(t1d > tge, tge, t1d) + + mask_total = time_all > t1d + mask_singlet = (time_all > t1d) & (time_all < t1d + swin) + + else: + mask_total = time_all > tge + mask_singlet = (time_all > tge) & (time_all < tge + swin) + + pe_singlet = ak.to_numpy( + ak.fill_none(ak.nansum(pe_all[mask_singlet], axis=-1), 0), allow_missing=False + ) + pe_total = ak.to_numpy( + ak.fill_none(ak.nansum(pe_all[mask_total], axis=-1), 0), allow_missing=False + ) + etc = np.divide( + pe_singlet, pe_total, out=np.full_like(pe_total, np.nan), where=pe_total 
!= 0 + ) + + return Array(nda=etc) + + +# returns relative time shift of the first LAr pulse relative to the Ge trigger +def get_time_shift( + f_hit, + f_dsp, + f_tcm, + hit_group, + dsp_group, + tcm_group, + tcm_id_table_pattern, + chs, + lim, + trgr, + tdefault, + tmin, + tmax, +) -> Array: + store = LH5Store() + # load TCM data to define an event + ids = store.read(f"/{tcm_group}/array_id", f_tcm)[0].view_as("np") + idx = store.read(f"/{tcm_group}/array_idx", f_tcm)[0].view_as("np") + time_all = ak.Array([[] for x in range(np.max(idx) + 1)]) + + if isinstance(trgr, (float, int)): + tge = cast_trigger(trgr, tdefault, length=np.max(idx) + 1) + else: + tge = cast_trigger(trgr, tdefault, length=None) + + for ch in chs: + idx_ch = idx[ids == utils.get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] + + pe = store.read(f"{ch}/{hit_group}/energy_in_pe", f_hit, idx=idx_ch)[0].view_as( + "np" + ) + tmp = np.full((np.max(idx) + 1, len(pe[0])), np.nan) + tmp[idx_ch] = pe + pe = ak.drop_none(ak.nan_to_none(ak.Array(tmp))) + + # times are in sample units + times = store.read(f"{ch}/{hit_group}/trigger_pos", f_hit, idx=idx_ch)[ + 0 + ].view_as("np") + tmp = np.full((np.max(idx) + 1, len(times[0])), np.nan) + tmp[idx_ch] = times + times = ak.drop_none(ak.nan_to_none(ak.Array(tmp))) + + mask = get_spm_mask(lim, tge, tmin, tmax, pe, times) + + # apply mask and convert sample units to ns + times = times[mask] * 16 + + time_all = ak.concatenate((time_all, times), axis=-1) + + out = ak.min(time_all, axis=-1) + + # Convert to 1D numpy array + out = ak.to_numpy(ak.fill_none(out, np.inf), allow_missing=False) + tge = ak.to_numpy(tge, allow_missing=False) + + return Array(out - tge) diff --git a/src/pygama/evt/utils.py b/src/pygama/evt/utils.py new file mode 100644 index 000000000..175cd868a --- /dev/null +++ b/src/pygama/evt/utils.py @@ -0,0 +1,282 @@ +""" +This module provides utilities to build the `evt` tier. +""" + +from __future__ import annotations + +import re + +import awkward as ak +import numpy as np +from lgdo.lh5 import LH5Store +from numpy.typing import NDArray + + +def get_tcm_id_by_pattern(tcm_id_table_pattern: str, ch: str) -> int: + pre = tcm_id_table_pattern.split("{")[0] + post = tcm_id_table_pattern.split("}")[1] + return int(ch.strip(pre).strip(post)) + + +def get_table_name_by_pattern(tcm_id_table_pattern: str, ch_id: int) -> str: + # check tcm_id_table_pattern validity + pattern_check = re.findall(r"{([^}]*?)}", tcm_id_table_pattern)[0] + if pattern_check == "" or ":" == pattern_check[0]: + return tcm_id_table_pattern.format(ch_id) + else: + raise NotImplementedError( + "Only empty placeholders with format specifications are currently implemented" + ) + + +def num_and_pars(value: str, par_dic: dict): + # function tries to convert a string to a int, float, bool + # or returns the value if value is a key in par_dic + if value in par_dic.keys(): + return par_dic[value] + try: + value = int(value) + except ValueError: + try: + value = float(value) + except ValueError: + try: + value = bool(value) + except ValueError: + pass + return value + + +def find_parameters( + f_hit: str, + f_dsp: str, + ch: str, + idx_ch: NDArray, + exprl: list, + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> dict: + """Wraps :func:`load_vars_to_nda` to return parameters from `hit` and `dsp` + tiers. + + Parameters + ---------- + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + ch + "rawid" in the tiers. + idx_ch + index array of entries to be read from files. 
+ exprl + list of tuples ``(tier, field)`` to be found in the `hit/dsp` tiers. + dsp_group + LH5 root group in dsp file. + hit_group + LH5 root group in hit file. + """ + + # find fields in either dsp, hit + dsp_flds = [e[1] for e in exprl if e[0] == dsp_group] + hit_flds = [e[1] for e in exprl if e[0] == hit_group] + + store = LH5Store() + hit_dict, dsp_dict = {}, {} + if len(hit_flds) > 0: + hit_ak = store.read( + f"{ch.replace('/','')}/{hit_group}/", f_hit, field_mask=hit_flds, idx=idx_ch + )[0].view_as("ak") + hit_dict = dict( + zip([f"{hit_group}_" + e for e in ak.fields(hit_ak)], ak.unzip(hit_ak)) + ) + if len(dsp_flds) > 0: + dsp_ak = store.read( + f"{ch.replace('/','')}/{dsp_group}/", f_dsp, field_mask=dsp_flds, idx=idx_ch + )[0].view_as("ak") + dsp_dict = dict( + zip([f"{dsp_group}_" + e for e in ak.fields(dsp_ak)], ak.unzip(dsp_ak)) + ) + + return hit_dict | dsp_dict + + +def get_data_at_channel( + ch: str, + ids: NDArray, + idx: NDArray, + expr: str, + exprl: list, + var_ph: dict, + is_evaluated: bool, + f_hit: str, + f_dsp: str, + defv, + tcm_id_table_pattern: str = "ch{}", + evt_group: str = "evt", + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> np.ndarray: + """Evaluates an expression and returns the result. + + Parameters + ---------- + ch + "rawid" of channel to be evaluated. + idx + `tcm` index array. + ids + `tcm` id array. + expr + expression to be evaluated. + exprl + list of parameter-tuples ``(root_group, field)`` found in the expression. + var_ph + dict of additional parameters that are not channel dependent. + is_evaluated + if false, the expression does not get evaluated but an array of default + values is returned. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + defv + default value. + tcm_id_table_pattern + Pattern to format tcm id values to table name in higher tiers. Must have one + placeholder which is the tcm id. + dsp_group + LH5 root group in dsp file. + hit_group + LH5 root group in hit file. + evt_group + LH5 root group in evt file. + """ + + # get index list for this channel to be loaded + idx_ch = idx[ids == get_tcm_id_by_pattern(tcm_id_table_pattern, ch)] + outsize = len(idx_ch) + + if not is_evaluated: + res = np.full(outsize, defv, dtype=type(defv)) + elif "tcm.array_id" == expr: + res = np.full( + outsize, get_tcm_id_by_pattern(tcm_id_table_pattern, ch), dtype=int + ) + elif "tcm.index" == expr: + res = np.where(ids == get_tcm_id_by_pattern(tcm_id_table_pattern, ch))[0] + else: + var = find_parameters( + f_hit=f_hit, + f_dsp=f_dsp, + ch=ch, + idx_ch=idx_ch, + exprl=exprl, + hit_group=hit_group, + dsp_group=dsp_group, + ) + + if var_ph is not None: + var = var | var_ph + + # evaluate expression + # move tier+dots in expression to underscores (e.g. evt.foo -> evt_foo) + res = eval( + expr.replace(f"{dsp_group}.", f"{dsp_group}_") + .replace(f"{hit_group}.", f"{hit_group}_") + .replace(f"{evt_group}.", ""), + var, + ) + + # in case the expression evaluates to a single value blow it up + if (not hasattr(res, "__len__")) or (isinstance(res, str)): + return np.full(outsize, res) + + # the resulting arrays need to be 1D from the operation, + # this can only change once we support larger than two dimensional LGDOs + # ak.to_numpy() raises error if array not regular + res = ak.to_numpy(res, allow_missing=False) + + # in this method only 1D values are allowed + if res.ndim > 1: + raise ValueError( + f"expression '{expr}' must return 1D array. 
If you are using VectorOfVectors or ArrayOfEqualSizedArrays, use awkward reduction functions to reduce the dimension" + ) + + return res + + +def get_mask_from_query( + qry: str | NDArray, + length: int, + ch: str, + idx_ch: NDArray, + f_hit: str, + f_dsp: str, + hit_group: str = "hit", + dsp_group: str = "dsp", +) -> np.ndarray: + """Evaluates a query expression and returns a mask accordingly. + + Parameters + ---------- + qry + query expression. + length + length of the return mask. + ch + "rawid" of channel to be evaluated. + idx_ch + channel indices to be read. + f_hit + path to `hit` tier file. + f_dsp + path to `dsp` tier file. + hit_group + LH5 root group in hit file. + dsp_group + LH5 root group in dsp file. + """ + + # get sub evt based query condition if needed + if isinstance(qry, str): + qry_lst = re.findall(r"(hit|dsp).([a-zA-Z_$][\w$]*)", qry) + qry_var = find_parameters( + f_hit=f_hit, + f_dsp=f_dsp, + ch=ch, + idx_ch=idx_ch, + exprl=qry_lst, + hit_group=hit_group, + dsp_group=dsp_group, + ) + limarr = eval( + qry.replace(f"{dsp_group}.", f"{dsp_group}_").replace( + f"{hit_group}.", f"{hit_group}_" + ), + qry_var, + ) + + # in case the expression evaluates to a single value blow it up + if (not hasattr(limarr, "__len__")) or (isinstance(limarr, str)): + return np.full(len(idx_ch), limarr) + + limarr = ak.to_numpy(limarr, allow_missing=False) + if limarr.ndim > 1: + raise ValueError( + f"query '{qry}' must return 1D array. If you are using VectorOfVectors or ArrayOfEqualSizedArrays, use awkward reduction functions to reduce the dimension" + ) + + # or forward the array + elif isinstance(qry, np.ndarray): + limarr = qry + + # if no condition, it must be true + else: + limarr = np.ones(length).astype(bool) + + # explicit cast to bool + if limarr.dtype != bool: + limarr = limarr.astype(bool) + + return limarr diff --git a/src/pygama/hit/build_hit.py b/src/pygama/hit/build_hit.py index 2b3e8ef5f..2a6d6a066 100644 --- a/src/pygama/hit/build_hit.py +++ b/src/pygama/hit/build_hit.py @@ -48,14 +48,14 @@ def build_hit( .. 
code-block:: json { - "outputs": ["calE", "AoE"], - "operations": { - "calE": { - "expression": "sqrt(a + b * trapEmax**2)", - "parameters": {"a": "1.23", "b": "42.69"}, - }, - "AoE": {"expression": "A_max/calE"}, - } + "outputs": ["calE", "AoE"], + "operations": { + "calE": { + "expression": "sqrt(a + b * trapEmax**2)", + "parameters": {"a": "1.23", "b": "42.69"}, + }, + "AoE": {"expression": "A_max/calE"}, + } } The ``outputs`` array lists columns that will be effectively written in diff --git a/src/pygama/pargen/energy_optimisation.py b/src/pygama/pargen/energy_optimisation.py index 1c34901d9..ecad4bbd7 100644 --- a/src/pygama/pargen/energy_optimisation.py +++ b/src/pygama/pargen/energy_optimisation.py @@ -950,17 +950,15 @@ def event_selection( initial_idxs = np.where(initial_mask)[0] guess_keV = 2620 / np.nanpercentile(rough_energy, 99) - Euc_min = threshold / guess_keV + Euc_min = threshold / guess_keV * 0.6 Euc_max = 2620 / guess_keV * 1.1 - dEuc = 5 / guess_keV + dEuc = 1 # / guess_keV hist, bins, var = pgh.get_hist(rough_energy, range=(Euc_min, Euc_max), dx=dEuc) detected_peaks_locs, detected_peaks_keV, roughpars = pgc.hpge_find_E_peaks( hist, bins, var, - np.array( - [238.632, 583.191, 727.330, 860.564, 1592.5, 1620.5, 2103.53, 2614.553] - ), + np.array([238.632, 583.191, 727.330, 860.564, 1620.5, 2103.53, 2614.553]), ) log.debug(f"detected {detected_peaks_keV} keV peaks at {detected_peaks_locs}") diff --git a/src/pygama/skm/__init__.py b/src/pygama/skm/__init__.py new file mode 100644 index 000000000..7b9ae88d2 --- /dev/null +++ b/src/pygama/skm/__init__.py @@ -0,0 +1,7 @@ +""" +Utilities for grouping hit data into events. +""" + +from .build_skm import build_skm + +__all__ = ["build_skm"] diff --git a/src/pygama/skm/build_skm.py b/src/pygama/skm/build_skm.py new file mode 100644 index 000000000..a92619b83 --- /dev/null +++ b/src/pygama/skm/build_skm.py @@ -0,0 +1,236 @@ +""" +This module implements routines to build the `skm` tier, consisting of skimmed +data from lower tiers. +""" + +from __future__ import annotations + +import json +import logging +import os + +import awkward as ak +import numpy as np +from lgdo import Array, Table, lh5 +from lgdo.lh5 import LH5Store + +from pygama.evt import utils + +log = logging.getLogger(__name__) + + +def build_skm( + f_evt: str, + f_hit: str, + f_dsp: str, + f_tcm: str, + f_skm: str, + skm_conf: dict | str, + wo_mode="w", + skm_group: str = "skm", + evt_group: str = "evt", + tcm_group: str = "hardware_tcm_1", + dsp_group: str = "dsp", + hit_group: str = "hit", + tcm_id_table_pattern: str = "ch{}", +) -> None: + """Builds a skimmed file from a (set) of `evt/hit/dsp` tier file(s). + + Parameters + ---------- + f_evt + path of `evt` file. + f_hit + path of `hit` file. + f_dsp + path of `dsp` file. + f_tcm + path of `tcm` file. + f_skm + name of the `skm` output file. + skm_conf + name of configuration file or dictionary defining `skm` fields. + + - ``multiplicity`` defines up to which row length + :class:`.VectorOfVector` fields should be kept. + - ``postfixes`` list of postfixes must be list of + ``len(multiplicity)``. If not given, numbers from 0 to + ``multiplicity -1`` are used + - ``operations`` are forwarded from lower tiers and clipped/padded + according to ``missing_value`` if needed. If the forwarded field + is not an evt tier, ``tcm_idx`` must be passed that specifies the + value to pick across channels. + + For example: + + .. 
code-block:: json + + { + "multiplicity": 2, + "postfixes":["", "aux"], + "operations": { + "timestamp":{ + "forward_field": "evt.timestamp" + }, + "multiplicity":{ + "forward_field": "evt.multiplicity" + }, + "energy":{ + "forward_field": "hit.cuspEmax_ctc_cal", + "missing_value": "np.nan", + "tcm_idx": "evt.energy_idx" + }, + "energy_id":{ + "forward_field": "tcm.array_id", + "missing_value": 0, + "tcm_idx": "evt.energy_idx" + } + } + } + + wo_mode + writing mode. + + - ``write_safe`` or ``w``: only proceed with writing if the file does + not already exists. + - ``append`` or ``a``: append to file. + - ``overwrite`` or ``o``: replaces existing file. + + skm_group + `skm` LH5 root group name. + evt_group + `evt` LH5 root group name. + hit_group + `hit` LH5 root group name. + dsp_group + `dsp` LH5 root group name. + tcm_group + `tcm` LH5 root group name. + tcm_id_table_pattern + pattern to format `tcm` id values to table name in higher tiers. Must have one + placeholder which is the `tcm` id. + """ + f_dict = {evt_group: f_evt, hit_group: f_hit, dsp_group: f_dsp, tcm_group: f_tcm} + log = logging.getLogger(__name__) + log.debug(f"I am skimming {len(f_evt) if isinstance(f_evt,list) else 1} files") + + tbl_cfg = skm_conf + if not isinstance(tbl_cfg, (str, dict)): + raise TypeError() + if isinstance(tbl_cfg, str): + with open(tbl_cfg) as f: + tbl_cfg = json.load(f) + + # Check if multiplicity is given + if "multiplicity" not in tbl_cfg.keys(): + raise ValueError("multiplicity field missing") + + multi = int(tbl_cfg["multiplicity"]) + store = LH5Store() + # df = pd.DataFrame() + table = Table() + if "operations" in tbl_cfg.keys(): + for op in tbl_cfg["operations"].keys(): + miss_val = np.nan + if "missing_value" in tbl_cfg["operations"][op].keys(): + miss_val = tbl_cfg["operations"][op]["missing_value"] + if isinstance(miss_val, str) and ( + miss_val in ["np.nan", "np.inf", "-np.inf"] + ): + miss_val = eval(miss_val) + + fw_fld = tbl_cfg["operations"][op]["forward_field"].split(".") + + # load object if from evt tier + if fw_fld[0] == evt_group: + obj = store.read(f"/{fw_fld[0]}/{fw_fld[1]}", f_dict[fw_fld[0]])[ + 0 + ].view_as("ak") + + # else collect data from lower tier via tcm_idx + else: + if "tcm_idx" not in tbl_cfg["operations"][op].keys(): + raise ValueError( + f"{op} is an sub evt level operation. 
+                tcm_idx_fld = tbl_cfg["operations"][op]["tcm_idx"].split(".")
+                tcm_idx = store.read(
+                    f"/{tcm_idx_fld[0]}/{tcm_idx_fld[1]}", f_dict[tcm_idx_fld[0]]
+                )[0].view_as("ak")[:, :multi]
+
+                obj = ak.Array([[] for _ in range(len(tcm_idx))])
+
+                # load TCM data to define an event
+                ids = store.read(f"/{tcm_group}/array_id", f_tcm)[0].view_as("ak")
+                ids = ak.unflatten(ids[ak.flatten(tcm_idx)], ak.count(tcm_idx, axis=-1))
+
+                idx = store.read(f"/{tcm_group}/array_idx", f_tcm)[0].view_as("ak")
+                idx = ak.unflatten(idx[ak.flatten(tcm_idx)], ak.count(tcm_idx, axis=-1))
+
+                if "tcm.array_id" == tbl_cfg["operations"][op]["forward_field"]:
+                    obj = ids
+                elif "tcm.array_idx" == tbl_cfg["operations"][op]["forward_field"]:
+                    obj = idx
+
+                else:
+                    chns = np.unique(
+                        ak.to_numpy(ak.flatten(ids), allow_missing=False)
+                    ).astype(int)
+
+                    # get the data
+                    for ch in chns:
+                        ch_idx = idx[ids == ch]
+                        ct_idx = ak.count(ch_idx, axis=-1)
+                        fl_idx = ak.to_numpy(ak.flatten(ch_idx), allow_missing=False)
+
+                        tbl_name = utils.get_table_name_by_pattern(
+                            tcm_id_table_pattern, ch
+                        )
+                        if (
+                            f"{tbl_name}/{fw_fld[0]}/{fw_fld[1]}"
+                            not in lh5.ls(f_dict[fw_fld[0]], f"{tbl_name}/{fw_fld[0]}/")
+                        ):
+                            och = Array(nda=np.full(len(fl_idx), miss_val))
+                        else:
+                            och, _ = store.read(
+                                f"{tbl_name}/{fw_fld[0]}/{fw_fld[1]}",
+                                f_dict[fw_fld[0]],
+                                idx=fl_idx,
+                            )
+                        if not isinstance(och, Array):
+                            raise ValueError(
+                                f"{type(och)} not supported. Forward only Array fields"
+                            )
+                        och = och.view_as("ak")
+                        och = ak.unflatten(och, ct_idx)
+                        obj = ak.concatenate((obj, och), axis=-1)
+
+            # pad, clip and numpyfy
+            if obj.ndim > 1:
+                obj = ak.pad_none(obj, multi, clip=True)
+            obj = ak.to_numpy(ak.fill_none(obj, miss_val))
+
+            if obj.ndim > 1:
+                if "postfixes" in tbl_cfg.keys():
+                    nms = [f"{op}{x}" for x in tbl_cfg["postfixes"]]
+                else:
+                    nms = [f"{op}_{x}" for x in range(multi)]
+
+                for i in range(len(nms)):
+                    # add attribute if present
+                    ob = Array(nda=obj[:, i])
+                    if "lgdo_attrs" in tbl_cfg["operations"][op].keys():
+                        ob.attrs |= tbl_cfg["operations"][op]["lgdo_attrs"]
+                    table.add_field(nms[i], ob, True)
+            else:
+                obj = Array(nda=obj)
+                if "lgdo_attrs" in tbl_cfg["operations"][op].keys():
+                    obj.attrs |= tbl_cfg["operations"][op]["lgdo_attrs"]
+                table.add_field(op, obj, True)
+
+    # write the skimmed table out
+    if wo_mode not in ["w", "write_safe", "o", "overwrite", "a", "append"]:
+        raise ValueError(f"wo_mode {wo_mode} not valid.")
+
+    log.debug("saving skm file")
+    if (wo_mode in ["w", "write_safe"]) and os.path.exists(f_skm):
+        raise FileExistsError(f"write_safe mode: {f_skm} exists.")
+
+    wo = wo_mode if wo_mode not in ["o", "overwrite"] else "of"
+    store.write(obj=table, name=f"/{skm_group}/", lh5_file=f_skm, wo_mode=wo)
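A minimal usage sketch of the new function, following the signature and docstring above; all file paths are hypothetical and the config is a stripped-down variant of the docstring example:

.. code-block:: python

    from pygama.skm import build_skm

    # skm_conf may be a dict or a path to a JSON file
    skm_conf = {
        "multiplicity": 2,
        "operations": {
            "timestamp": {"forward_field": "evt.timestamp"},
            "energy": {
                "forward_field": "hit.cuspEmax_ctc_cal",
                "missing_value": "np.nan",
                "tcm_idx": "evt.energy_idx",
            },
        },
    }

    build_skm(
        f_evt="run0-tier_evt.lh5",
        f_hit="run0-tier_hit.lh5",
        f_dsp="run0-tier_dsp.lh5",
        f_tcm="run0-tier_tcm.lh5",
        f_skm="run0-tier_skm.lh5",
        skm_conf=skm_conf,
        wo_mode="o",
    )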
"channels": "geds_on", + "aggregation_mode": "first_at:dsp.tp_0_est", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", + "initial": "np.nan" + }, + "energy_id": { + "channels": "geds_on", + "aggregation_mode": "first_at:dsp.tp_0_est", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id", + "initial": 0 + }, + "energy_idx": { + "channels": "geds_on", + "aggregation_mode": "first_at:dsp.tp_0_est", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.index", + "initial": 999999999999 + }, + "energy_any_above1MeV": { + "channels": "geds_on", + "aggregation_mode": "any", + "expression": "hit.cuspEmax_ctc_cal>1000", + "initial": false + }, + "energy_all_above1MeV": { + "channels": "geds_on", + "aggregation_mode": "all", + "expression": "hit.cuspEmax_ctc_cal>1000", + "initial": false + }, + "energy_aux": { + "channels": "geds_on", + "aggregation_mode": "last_at:dsp.tp_0_est", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", + "initial": "np.nan" + }, + "energy_sum": { + "channels": "geds_on", + "aggregation_mode": "sum", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", + "initial": 0.0 + }, + "is_usable_aoe": { + "aggregation_mode": "keep_at_ch:evt.energy_id", + "expression": "True", + "initial": false + }, + "aoe": { + "aggregation_mode": "keep_at_ch:evt.energy_id", + "expression": "hit.AoE_Classifier", + "initial": "np.nan" + }, + "is_aoe_rejected": { + "aggregation_mode": "keep_at_ch:evt.energy_id", + "expression": "~(hit.AoE_Double_Sided_Cut)", + "initial": false + } + } +} diff --git a/tests/evt/configs/module-test-evt-config.json b/tests/evt/configs/module-test-evt-config.json new file mode 100644 index 000000000..0daa94658 --- /dev/null +++ b/tests/evt/configs/module-test-evt-config.json @@ -0,0 +1,72 @@ +{ + "channels": { + "spms_on": ["ch1057600", "ch1059201", "ch1062405"], + "geds_on": ["ch1084803", "ch1084804", "ch1121600"] + }, + "outputs": [ + "energy_first", + "energy_first_id", + "t0", + "lar_energy", + "lar_multiplicity", + "is_lar_rejected", + "lar_classifier", + "lar_energy_dplms", + "lar_multiplicity_dplms", + "lar_time_shift" + ], + "operations": { + "energy_first": { + "channels": "geds_on", + "aggregation_mode": "first_at:dsp.tp_0_est", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", + "initial": "np.nan" + }, + "energy_first_id": { + "channels": "geds_on", + "aggregation_mode": "first_at:dsp.tp_0_est", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id", + "initial": 0 + }, + "t0": { + "aggregation_mode": "keep_at_ch:evt.energy_first_id", + "expression": "dsp.tp_0_est", + "initial": 0.0 + }, + "lar_energy": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": "pygama.evt.modules.spm.get_energy(0.5,evt.t0,48000,1000,5000)" + }, + "lar_multiplicity": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_majority(0.5,evt.t0,48000,1000,5000)" + }, + "is_lar_rejected": { + "expression": "(evt.lar_energy >4) | (evt.lar_multiplicity > 4) " + }, + "lar_classifier": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_etc(0.5,evt.t0,48000,100,6000,80,1,0,50)" + }, + "lar_energy_dplms": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_energy_dplms(0.5,evt.t0,48000,1000,5000)" + }, + "lar_multiplicity_dplms": { + "channels": "spms_on", + "aggregation_mode": "function", + 
"expression": ".modules.spm.get_majority_dplms(0.5,evt.t0,48000,1000,5000)" + }, + "lar_time_shift": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_time_shift(0.5,evt.t0,48000,1000,5000)" + } + } +} diff --git a/tests/evt/configs/module-test-t0-vov-evt-config.json b/tests/evt/configs/module-test-t0-vov-evt-config.json new file mode 100644 index 000000000..cda042337 --- /dev/null +++ b/tests/evt/configs/module-test-t0-vov-evt-config.json @@ -0,0 +1,82 @@ +{ + "channels": { + "spms_on": ["ch1057600", "ch1059201", "ch1062405"], + "geds_on": ["ch1084803", "ch1084804", "ch1121600"] + }, + "outputs": [ + "energy", + "energy_id", + "t0", + "lar_energy", + "lar_multiplicity", + "is_lar_rejected", + "lar_classifier", + "lar_energy_dplms", + "lar_multiplicity_dplms", + "lar_time_shift", + "lar_tcm_index", + "lar_pulse_index" + ], + "operations": { + "energy": { + "channels": "geds_on", + "aggregation_mode": "gather", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal" + }, + "energy_id": { + "channels": "geds_on", + "aggregation_mode": "gather", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id" + }, + "t0": { + "aggregation_mode": "keep_at_ch:evt.energy_id", + "expression": "dsp.tp_0_est", + "initial": 0.0 + }, + "lar_energy": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_energy(0.5,evt.t0,48000,1000,5000)" + }, + "lar_multiplicity": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_majority(0.5,evt.t0,48000,1000,5000)" + }, + "is_lar_rejected": { + "expression": "(evt.lar_energy >4) | (evt.lar_multiplicity > 4) " + }, + "lar_classifier": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_etc(0.5,evt.t0,48000,100,6000,80,1,0,50)" + }, + "lar_energy_dplms": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_energy_dplms(0.5,evt.t0,48000,1000,5000)" + }, + "lar_multiplicity_dplms": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_majority_dplms(0.5,evt.t0,48000,1000,5000)" + }, + "lar_time_shift": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_time_shift(0.5,evt.t0,48000,1000,5000)" + }, + "lar_tcm_index": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_masked_tcm_idx(0.5,evt.t0,48000,1000,5000,1)" + }, + "lar_pulse_index": { + "channels": "spms_on", + "aggregation_mode": "function", + "expression": ".modules.spm.get_masked_tcm_idx(0.5,evt.t0,48000,1000,5000,0)" + } + } +} diff --git a/tests/evt/configs/query-test-evt-config.json b/tests/evt/configs/query-test-evt-config.json new file mode 100644 index 000000000..901d2d6c1 --- /dev/null +++ b/tests/evt/configs/query-test-evt-config.json @@ -0,0 +1,102 @@ +{ + "channels": { + "geds_on": ["ch1084803", "ch1084804", "ch1121600"] + }, + "outputs": [ + "multiplicity", + "test_sum", + "test_first", + "test_first2", + "test_last", + "test_last2", + "test_any", + "test_any2", + "test_all", + "test_all2", + "test_vov", + "test_vov2" + ], + "operations": { + "multiplicity": { + "channels": "geds_on", + "aggregation_mode": "sum", + "expression": "hit.cuspEmax_ctc_cal > a", + "parameters": { "a": 25 }, + "initial": 0 + }, + "test_sum": { + "channels": "geds_on", + "aggregation_mode": "sum", + "query": "evt.multiplicity == 1", + "expression": "True", + 
"initial": false + }, + "test_first": { + "channels": "geds_on", + "aggregation_mode": "first_at:dsp.tp_0_est", + "query": "evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_first2": { + "channels": "geds_on", + "aggregation_mode": "first_at:dsp.tp_0_est", + "expression": "True", + "initial": false + }, + "test_last": { + "channels": "geds_on", + "aggregation_mode": "last_at:dsp.tp_0_est", + "query": "evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_last2": { + "channels": "geds_on", + "aggregation_mode": "last_at:dsp.tp_0_est", + "expression": "True", + "initial": false + }, + "test_any": { + "channels": "geds_on", + "aggregation_mode": "any", + "query": "evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_any2": { + "channels": "geds_on", + "aggregation_mode": "any", + "query": "hit.cuspEmax_ctc_cal >25", + "expression": "True", + "initial": false + }, + "test_all": { + "channels": "geds_on", + "aggregation_mode": "all", + "query": "evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_all2": { + "channels": "geds_on", + "aggregation_mode": "all", + "query": "hit.cuspEmax_ctc_cal >25", + "expression": "True", + "initial": false + }, + "test_vov": { + "channels": "geds_on", + "aggregation_mode": "gather", + "query": "evt.multiplicity == 1", + "expression": "True", + "initial": false + }, + "test_vov2": { + "channels": "geds_on", + "aggregation_mode": "gather", + "expression": "True", + "initial": false + } + } +} diff --git a/tests/evt/configs/vov-test-evt-config.json b/tests/evt/configs/vov-test-evt-config.json new file mode 100644 index 000000000..31334101e --- /dev/null +++ b/tests/evt/configs/vov-test-evt-config.json @@ -0,0 +1,85 @@ +{ + "channels": { + "geds_on": ["ch1084803", "ch1084804", "ch1121600"], + "ts_master": "ch1084803" + }, + "outputs": [ + "timestamp", + "energy", + "energy_sum", + "energy_id", + "energy_idx", + "aoe", + "aoe_idx", + "multiplicity", + "is_saturated", + "energy_times_aoe", + "energy_times_multiplicity", + "multiplicity_squared" + ], + "operations": { + "timestamp": { + "channels": "ts_master", + "aggregation_mode": "sum", + "expression": "dsp.timestamp", + "initial": 0.0 + }, + "energy": { + "channels": "geds_on", + "aggregation_mode": "gather", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal" + }, + "energy_sum": { + "channels": "geds_on", + "aggregation_mode": "sum", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "hit.cuspEmax_ctc_cal", + "initial": 0.0 + }, + "energy_idx": { + "channels": "geds_on", + "aggregation_mode": "gather", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.index", + "sort": "ascend_by:dsp.tp_0_est", + "initial": 0 + }, + "energy_id": { + "channels": "geds_on", + "aggregation_mode": "gather", + "query": "hit.cuspEmax_ctc_cal>25", + "expression": "tcm.array_id", + "sort": "ascend_by:dsp.tp_0_est", + "initial": 0 + }, + "aoe": { + "aggregation_mode": "keep_at_ch:evt.energy_id", + "expression": "hit.AoE_Classifier" + }, + "aoe_idx": { + "aggregation_mode": "keep_at_idx:evt.energy_idx", + "expression": "hit.AoE_Classifier" + }, + "multiplicity": { + "channels": "geds_on", + "aggregation_mode": "sum", + "expression": "hit.cuspEmax_ctc_cal > a", + "parameters": { "a": 25 }, + "initial": 0 + }, + "is_saturated": { + "aggregation_mode": "keep_at_ch:evt.energy_id", + "expression": "hit.is_saturated" + }, + "energy_times_aoe": { + "expression": "evt.energy*evt.aoe" + }, + 
"energy_times_multiplicity": { + "expression": "evt.energy*evt.multiplicity" + }, + "multiplicity_squared": { + "expression": "evt.multiplicity*evt.multiplicity" + } + } +} diff --git a/tests/evt/test_build_evt.py b/tests/evt/test_build_evt.py new file mode 100644 index 000000000..0f193074c --- /dev/null +++ b/tests/evt/test_build_evt.py @@ -0,0 +1,313 @@ +import os +from pathlib import Path + +import awkward as ak +import numpy as np +import pytest +from lgdo import Array, VectorOfVectors, lh5 +from lgdo.lh5 import LH5Store + +from pygama.evt import build_evt + +config_dir = Path(__file__).parent / "configs" +store = LH5Store() + + +def test_basics(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + build_evt( + f_tcm=lgnd_test_data.get_path(tcm_path), + f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + f_evt=outfile, + evt_config=f"{config_dir}/basic-evt-config.json", + wo_mode="o", + evt_group="evt", + hit_group="hit", + dsp_group="dsp", + tcm_group="hardware_tcm_1", + ) + assert "statement" in store.read("/evt/multiplicity", outfile)[0].getattrs().keys() + assert ( + store.read("/evt/multiplicity", outfile)[0].getattrs()["statement"] + == "0bb decay is real" + ) + assert os.path.exists(outfile) + assert len(lh5.ls(outfile, "/evt/")) == 11 + nda = { + e: store.read(f"/evt/{e}", outfile)[0].view_as("np") + for e in ["energy", "energy_aux", "energy_sum", "multiplicity"] + } + assert ( + nda["energy"][nda["multiplicity"] == 1] + == nda["energy_aux"][nda["multiplicity"] == 1] + ).all() + assert ( + nda["energy"][nda["multiplicity"] == 1] + == nda["energy_sum"][nda["multiplicity"] == 1] + ).all() + assert ( + nda["energy_aux"][nda["multiplicity"] == 1] + == nda["energy_sum"][nda["multiplicity"] == 1] + ).all() + + eid = store.read("/evt/energy_id", outfile)[0].view_as("np") + eidx = store.read("/evt/energy_idx", outfile)[0].view_as("np") + eidx = eidx[eidx != 999999999999] + + ids = store.read("hardware_tcm_1/array_id", lgnd_test_data.get_path(tcm_path))[ + 0 + ].view_as("np") + ids = ids[eidx] + assert ak.all(ids == eid[eid != 0]) + + +def test_lar_module(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + build_evt( + f_tcm=lgnd_test_data.get_path(tcm_path), + f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + f_evt=outfile, + evt_config=f"{config_dir}/module-test-evt-config.json", + wo_mode="o", + evt_group="evt", + hit_group="hit", + dsp_group="dsp", + tcm_group="hardware_tcm_1", + ) + + assert os.path.exists(outfile) + assert len(lh5.ls(outfile, "/evt/")) == 10 + nda = { + e: store.read(f"/evt/{e}", outfile)[0].view_as("np") + for e in ["lar_multiplicity", "lar_multiplicity_dplms", "t0", "lar_time_shift"] + } + assert np.max(nda["lar_multiplicity"]) <= 3 + assert np.max(nda["lar_multiplicity_dplms"]) <= 3 + assert ((nda["lar_time_shift"] + nda["t0"]) >= 0).all() + + +def test_lar_t0_vov_module(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + 
tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + build_evt( + f_tcm=lgnd_test_data.get_path(tcm_path), + f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + f_evt=outfile, + evt_config=f"{config_dir}/module-test-t0-vov-evt-config.json", + wo_mode="o", + evt_group="evt", + hit_group="hit", + dsp_group="dsp", + tcm_group="hardware_tcm_1", + ) + + assert os.path.exists(outfile) + assert len(lh5.ls(outfile, "/evt/")) == 12 + nda = { + e: store.read(f"/evt/{e}", outfile)[0].view_as("np") + for e in ["lar_multiplicity", "lar_multiplicity_dplms", "lar_time_shift"] + } + assert np.max(nda["lar_multiplicity"]) <= 3 + assert np.max(nda["lar_multiplicity_dplms"]) <= 3 + + ch_idx = store.read("/evt/lar_tcm_index", outfile)[0].view_as("ak") + pls_idx = store.read("/evt/lar_pulse_index", outfile)[0].view_as("ak") + assert ak.count(ch_idx) == ak.count(pls_idx) + assert ak.all(ak.count(ch_idx, axis=-1) == ak.count(pls_idx, axis=-1)) + + +def test_vov(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + build_evt( + f_tcm=lgnd_test_data.get_path(tcm_path), + f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + f_evt=outfile, + evt_config=f"{config_dir}/vov-test-evt-config.json", + wo_mode="o", + evt_group="evt", + hit_group="hit", + dsp_group="dsp", + tcm_group="hardware_tcm_1", + ) + + assert os.path.exists(outfile) + assert len(lh5.ls(outfile, "/evt/")) == 12 + vov_ene, _ = store.read("/evt/energy", outfile) + vov_aoe, _ = store.read("/evt/aoe", outfile) + arr_ac, _ = store.read("/evt/multiplicity", outfile) + vov_aoeene, _ = store.read("/evt/energy_times_aoe", outfile) + vov_eneac, _ = store.read("/evt/energy_times_multiplicity", outfile) + arr_ac2, _ = store.read("/evt/multiplicity_squared", outfile) + assert isinstance(vov_ene, VectorOfVectors) + assert isinstance(vov_aoe, VectorOfVectors) + assert isinstance(arr_ac, Array) + assert isinstance(vov_aoeene, VectorOfVectors) + assert isinstance(vov_eneac, VectorOfVectors) + assert isinstance(arr_ac2, Array) + assert (np.diff(vov_ene.cumulative_length.nda, prepend=[0]) == arr_ac.nda).all() + + vov_eid = store.read("/evt/energy_id", outfile)[0].view_as("ak") + vov_eidx = store.read("/evt/energy_idx", outfile)[0].view_as("ak") + vov_aoe_idx = store.read("/evt/aoe_idx", outfile)[0].view_as("ak") + + ids = store.read("hardware_tcm_1/array_id", lgnd_test_data.get_path(tcm_path))[ + 0 + ].view_as("ak") + ids = ak.unflatten(ids[ak.flatten(vov_eidx)], ak.count(vov_eidx, axis=-1)) + assert ak.all(ids == vov_eid) + + arr_ene = store.read("/evt/energy_sum", outfile)[0].view_as("ak") + assert ak.all(arr_ene == ak.nansum(vov_ene.view_as("ak"), axis=-1)) + assert ak.all(vov_aoe.view_as("ak") == vov_aoe_idx) + + +def test_graceful_crashing(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + f_tcm = lgnd_test_data.get_path(tcm_path) + f_dsp = 
+    f_dsp = lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp"))
+    f_hit = lgnd_test_data.get_path(tcm_path.replace("tcm", "hit"))
+    f_config = f"{config_dir}/basic-evt-config.json"
+
+    with pytest.raises(KeyError):
+        build_evt(f_dsp, f_tcm, f_hit, outfile, f_config)
+
+    with pytest.raises(KeyError):
+        build_evt(f_tcm, f_hit, f_dsp, outfile, f_config)
+
+    with pytest.raises(TypeError):
+        build_evt(f_tcm, f_dsp, f_hit, outfile, None)
+
+    conf = {"operations": {}}
+    with pytest.raises(ValueError):
+        build_evt(f_tcm, f_dsp, f_hit, outfile, conf)
+
+    conf = {"channels": {"geds_on": ["ch1084803", "ch1084804", "ch1121600"]}}
+    with pytest.raises(ValueError):
+        build_evt(f_tcm, f_dsp, f_hit, outfile, conf)
+
+    conf = {
+        "channels": {"geds_on": ["ch1084803", "ch1084804", "ch1121600"]},
+        "outputs": ["foo"],
+        "operations": {
+            "foo": {
+                "channels": "geds_on",
+                "aggregation_mode": "banana",
+                "expression": "hit.cuspEmax_ctc_cal > a",
+                "parameters": {"a": 25},
+                "initial": 0,
+            }
+        },
+    }
+    with pytest.raises(ValueError):
+        build_evt(f_tcm, f_dsp, f_hit, outfile, conf)
+
+
+def test_query(lgnd_test_data, tmptestdir):
+    outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5"
+    tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5"
+    if os.path.exists(outfile):
+        os.remove(outfile)
+    build_evt(
+        f_tcm=lgnd_test_data.get_path(tcm_path),
+        f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")),
+        f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")),
+        f_evt=outfile,
+        evt_config=f"{config_dir}/query-test-evt-config.json",
+        wo_mode="o",
+        evt_group="evt",
+        hit_group="hit",
+        dsp_group="dsp",
+        tcm_group="hardware_tcm_1",
+    )
+    assert len(lh5.ls(outfile, "/evt/")) == 12
+
+
+def test_vector_sort(lgnd_test_data, tmptestdir):
+    outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5"
+    tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5"
+    if os.path.exists(outfile):
+        os.remove(outfile)
+    f_tcm = lgnd_test_data.get_path(tcm_path)
+    f_dsp = lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp"))
+    f_hit = lgnd_test_data.get_path(tcm_path.replace("tcm", "hit"))
+
+    conf = {
+        "channels": {"geds_on": ["ch1084803", "ch1084804", "ch1121600"]},
+        "outputs": ["ascend_id", "t0_ascend", "descend_id", "t0_descend"],
+        "operations": {
+            "ascend_id": {
+                "channels": "geds_on",
+                "aggregation_mode": "gather",
+                "query": "hit.cuspEmax_ctc_cal>25",
+                "expression": "tcm.array_id",
+                "sort": "ascend_by:dsp.tp_0_est",
+            },
+            "t0_ascend": {
+                "aggregation_mode": "keep_at_ch:evt.ascend_id",
+                "expression": "dsp.tp_0_est",
+            },
+            "descend_id": {
+                "channels": "geds_on",
+                "aggregation_mode": "gather",
+                "query": "hit.cuspEmax_ctc_cal>25",
+                "expression": "tcm.array_id",
+                "sort": "descend_by:dsp.tp_0_est",
+            },
+            "t0_descend": {
+                "aggregation_mode": "keep_at_ch:evt.descend_id",
+                "expression": "dsp.tp_0_est",
+            },
+        },
+    }
+    build_evt(f_tcm, f_dsp, f_hit, outfile, conf)
+
+    assert os.path.exists(outfile)
+    assert len(lh5.ls(outfile, "/evt/")) == 4
+    vov_t0, _ = store.read("/evt/t0_ascend", outfile)
+    nda_t0 = vov_t0.to_aoesa().view_as("np")
+    assert ((np.diff(nda_t0) >= 0) | (np.isnan(np.diff(nda_t0)))).all()
+    vov_t0, _ = store.read("/evt/t0_descend", outfile)
+    nda_t0 = vov_t0.to_aoesa().view_as("np")
+    assert ((np.diff(nda_t0) <= 0) | (np.isnan(np.diff(nda_t0)))).all()
f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + f_tcm = lgnd_test_data.get_path(tcm_path) + f_dsp = lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")) + f_hit = lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")) + f_config = f"{config_dir}/basic-evt-config.json" + + with pytest.raises(ValueError): + build_evt(f_tcm, f_dsp, f_hit, outfile, f_config, tcm_id_table_pattern="ch{{}}") + with pytest.raises(ValueError): + build_evt(f_tcm, f_dsp, f_hit, outfile, f_config, tcm_id_table_pattern="ch{}{}") + with pytest.raises(NotImplementedError): + build_evt( + f_tcm, f_dsp, f_hit, outfile, f_config, tcm_id_table_pattern="ch{tcm_id}" + ) + with pytest.raises(ValueError): + build_evt( + f_tcm, f_dsp, f_hit, outfile, f_config, tcm_id_table_pattern="apple{}banana" + ) diff --git a/tests/skm/configs/basic-skm-config.json b/tests/skm/configs/basic-skm-config.json new file mode 100644 index 000000000..8037b21bf --- /dev/null +++ b/tests/skm/configs/basic-skm-config.json @@ -0,0 +1,25 @@ +{ + "multiplicity": 3, + "operations": { + "timestamp": { + "forward_field": "evt.timestamp", + "lgdo_attrs": { "info": "pk was here" } + }, + "energy_sum": { + "forward_field": "evt.energy_sum" + }, + "multiplicity": { + "forward_field": "evt.multiplicity" + }, + "energy": { + "forward_field": "hit.cuspEmax_ctc_cal", + "missing_value": "np.nan", + "tcm_idx": "evt.energy_idx" + }, + "energy_id": { + "forward_field": "tcm.array_id", + "missing_value": 0, + "tcm_idx": "evt.energy_idx" + } + } +} diff --git a/tests/skm/test_build_skm.py b/tests/skm/test_build_skm.py new file mode 100644 index 000000000..b23137ec6 --- /dev/null +++ b/tests/skm/test_build_skm.py @@ -0,0 +1,113 @@ +import os +from pathlib import Path + +import awkward as ak +import numpy as np +from lgdo.lh5 import LH5Store + +from pygama.evt import build_evt +from pygama.skm import build_skm + +config_dir = Path(__file__).parent / "configs" +evt_config_dir = Path(__file__).parent.parent / "evt" / "configs" +store = LH5Store() + + +def test_basics(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + build_evt( + f_tcm=lgnd_test_data.get_path(tcm_path), + f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + f_evt=outfile, + evt_config=f"{evt_config_dir}/vov-test-evt-config.json", + wo_mode="o", + evt_group="evt", + hit_group="hit", + dsp_group="dsp", + tcm_group="hardware_tcm_1", + ) + + skm_conf = f"{config_dir}/basic-skm-config.json" + skm_out = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_skm.lh5" + build_skm( + outfile, + lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + lgnd_test_data.get_path(tcm_path), + skm_out, + skm_conf, + wo_mode="o", + ) + + assert os.path.exists(skm_out) + df = store.read("/skm/", skm_out)[0].view_as("pd") + assert "timestamp" in df.keys() + assert "energy_0" in df.keys() + assert "energy_1" in df.keys() + assert "energy_2" in df.keys() + assert "energy_id_0" in df.keys() + assert "energy_id_1" in df.keys() + assert "energy_id_2" in df.keys() + assert "multiplicity" in df.keys() 
+ assert "energy_sum" in df.keys() + assert (df.multiplicity.to_numpy() <= 3).all() + assert ( + np.nan_to_num(df.energy_0.to_numpy()) + + np.nan_to_num(df.energy_1.to_numpy()) + + np.nan_to_num(df.energy_2.to_numpy()) + == df.energy_sum.to_numpy() + ).all() + + vov_eid = ak.to_numpy( + ak.fill_none( + ak.pad_none( + store.read("/evt/energy_id", outfile)[0].view_as("ak"), 3, clip=True + ), + 0, + ), + allow_missing=False, + ) + assert (vov_eid[:, 0] == df.energy_id_0.to_numpy()).all() + assert (vov_eid[:, 1] == df.energy_id_1.to_numpy()).all() + assert (vov_eid[:, 2] == df.energy_id_2.to_numpy()).all() + + +def test_attribute_passing(lgnd_test_data, tmptestdir): + outfile = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_evt.lh5" + tcm_path = "lh5/prod-ref-l200/generated/tier/tcm/phy/p03/r001/l200-p03-r001-phy-20230322T160139Z-tier_tcm.lh5" + if os.path.exists(outfile): + os.remove(outfile) + build_evt( + f_tcm=lgnd_test_data.get_path(tcm_path), + f_dsp=lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + f_hit=lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + f_evt=outfile, + evt_config=f"{evt_config_dir}/vov-test-evt-config.json", + wo_mode="o", + evt_group="evt", + hit_group="hit", + dsp_group="dsp", + tcm_group="hardware_tcm_1", + ) + + skm_conf = f"{config_dir}/basic-skm-config.json" + + skm_out = f"{tmptestdir}/l200-p03-r001-phy-20230322T160139Z-tier_skm.lh5" + + build_skm( + outfile, + lgnd_test_data.get_path(tcm_path.replace("tcm", "hit")), + lgnd_test_data.get_path(tcm_path.replace("tcm", "dsp")), + lgnd_test_data.get_path(tcm_path), + skm_out, + skm_conf, + wo_mode="o", + ) + + assert os.path.exists(skm_out) + assert "info" in store.read("/skm/timestamp", skm_out)[0].getattrs().keys() + assert store.read("/skm/timestamp", skm_out)[0].getattrs()["info"] == "pk was here"