From c9808ad43edc1dc9fc52f61b6c1e099b31d3b963 Mon Sep 17 00:00:00 2001 From: Brian Gow Date: Tue, 29 Oct 2024 10:07:22 -0400 Subject: [PATCH 1/3] allow expanded physical signal in calc_adc_params --- wfdb/io/_signal.py | 201 +++++++++++++++++++++++++++------------------ 1 file changed, 122 insertions(+), 79 deletions(-) diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index e3df065e..db878bc7 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -699,21 +699,25 @@ def dac(self, expanded=False, return_res=64, inplace=False): return p_signal - def calc_adc_params(self): + def calc_adc_gain_baseline(self, ch, minvals, maxvals): """ - Compute appropriate adc_gain and baseline parameters for adc - conversion, given the physical signal and the fmts. + Compute adc_gain and baseline parameters for a given channel. Parameters ---------- - N/A + ch: int + The channel that the adc_gain and baseline are being computed for. + minvals: list + The minimum values for each channel. + maxvals: list + The maximum values for each channel. Returns ------- - adc_gains : list - List of calculated `adc_gain` values for each channel. - baselines : list - List of calculated `baseline` values for each channel. + adc_gain : float + Calculated `adc_gain` value for a given channel. + baseline : int + Calculated `baseline` value for a given channel. Notes ----- @@ -728,86 +732,125 @@ def calc_adc_params(self): int32 `baseline` values, but does not consider over/underflow for calculated float `adc_gain` values. + """ + # Get the minimum and maximum (valid) storage values + dmin, dmax = _digi_bounds(self.fmt[ch]) + # add 1 because the lowest value is used to store nans + dmin = dmin + 1 + + pmin = minvals[ch] + pmax = maxvals[ch] + + # Figure out digital samples used to store physical samples + + # If the entire signal is NAN, gain/baseline won't be used + if pmin == np.nan: + adc_gain = 1 + baseline = 1 + # If the signal is just one value, store one digital value. + elif pmin == pmax: + if pmin == 0: + adc_gain = 1 + baseline = 1 + else: + # All digital values are +1 or -1. Keep adc_gain > 0 + adc_gain = abs(1 / pmin) + baseline = 0 + # Regular varied signal case. + else: + # The equation is: p = (d - b) / g + + # Approximately, pmax maps to dmax, and pmin maps to + # dmin. Gradient will be equal to, or close to + # delta(d) / delta(p), since intercept baseline has + # to be an integer. + + # Constraint: baseline must be between +/- 2**31 + adc_gain = (dmax - dmin) / (pmax - pmin) + baseline = dmin - adc_gain * pmin + + # Make adjustments for baseline to be an integer + # This up/down round logic of baseline is to ensure + # there is no overshoot of dmax. Now pmax will map + # to dmax or dmax-1 which is also fine. + if pmin > 0: + baseline = int(np.ceil(baseline)) + else: + baseline = int(np.floor(baseline)) + + # After baseline is set, adjust gain correspondingly.Set + # the gain to map pmin to dmin, and p==0 to baseline. + # In the case where pmin == 0 and dmin == baseline, + # adc_gain is already correct. Avoid dividing by 0. + if dmin != baseline: + adc_gain = (dmin - baseline) / pmin + + # Remap signal if baseline exceeds boundaries. + # This may happen if pmax < 0 + if baseline > MAX_I32: + # pmin maps to dmin, baseline maps to 2**31 - 1 + # pmax will map to a lower value than before + adc_gain = (MAX_I32) - dmin / abs(pmin) + baseline = MAX_I32 + # This may happen if pmin > 0 + elif baseline < MIN_I32: + # pmax maps to dmax, baseline maps to -2**31 + 1 + adc_gain = (dmax - MIN_I32) / pmax + baseline = MIN_I32 + + return adc_gain, baseline + + def calc_adc_params(self): + """ + Compute appropriate adc_gain and baseline parameters for adc + conversion, given the physical signal and the fmts. + + Parameters + ---------- + N/A + + Returns + ------- + adc_gains : list + List of calculated `adc_gain` values for each channel. + baselines : list + List of calculated `baseline` values for each channel + """ adc_gains = [] baselines = [] - if np.where(np.isinf(self.p_signal))[0].size: - raise ValueError("Signal contains inf. Cannot perform adc.") + if self.p_signal is not None: + if np.where(np.isinf(self.p_signal))[0].size: + raise ValueError("Signal contains inf. Cannot perform adc.") - # min and max ignoring nans, unless whole channel is NAN. - # Should suppress warning message. - minvals = np.nanmin(self.p_signal, axis=0) - maxvals = np.nanmax(self.p_signal, axis=0) + # min and max ignoring nans, unless whole channel is NAN. + # Should suppress warning message. + minvals = np.nanmin(self.p_signal, axis=0) + maxvals = np.nanmax(self.p_signal, axis=0) - for ch in range(np.shape(self.p_signal)[1]): - # Get the minimum and maximum (valid) storage values - dmin, dmax = _digi_bounds(self.fmt[ch]) - # add 1 because the lowest value is used to store nans - dmin = dmin + 1 + for ch in range(np.shape(self.p_signal)[1]): + adc_gain, baseline = self.calc_adc_gain_baseline(ch, minvals, maxvals) + adc_gains.append(adc_gain) + baselines.append(baseline) - pmin = minvals[ch] - pmax = maxvals[ch] + elif self.e_p_signal is not None: + minvals = [] + maxvals = [] + for ch in self.e_p_signal: + minvals.append(min(x for x in ch if not math.isnan(x))) + maxvals.append(max(x for x in ch if not math.isnan(x))) - # Figure out digital samples used to store physical samples + if any(x == math.inf for x in minvals) or any(x == math.inf for x in maxvals): + raise ValueError("Signal contains inf. Cannot perform adc.") - # If the entire signal is NAN, gain/baseline won't be used - if pmin == np.nan: - adc_gain = 1 - baseline = 1 - # If the signal is just one value, store one digital value. - elif pmin == pmax: - if pmin == 0: - adc_gain = 1 - baseline = 1 - else: - # All digital values are +1 or -1. Keep adc_gain > 0 - adc_gain = abs(1 / pmin) - baseline = 0 - # Regular varied signal case. - else: - # The equation is: p = (d - b) / g - - # Approximately, pmax maps to dmax, and pmin maps to - # dmin. Gradient will be equal to, or close to - # delta(d) / delta(p), since intercept baseline has - # to be an integer. - - # Constraint: baseline must be between +/- 2**31 - adc_gain = (dmax - dmin) / (pmax - pmin) - baseline = dmin - adc_gain * pmin - - # Make adjustments for baseline to be an integer - # This up/down round logic of baseline is to ensure - # there is no overshoot of dmax. Now pmax will map - # to dmax or dmax-1 which is also fine. - if pmin > 0: - baseline = int(np.ceil(baseline)) - else: - baseline = int(np.floor(baseline)) - - # After baseline is set, adjust gain correspondingly.Set - # the gain to map pmin to dmin, and p==0 to baseline. - # In the case where pmin == 0 and dmin == baseline, - # adc_gain is already correct. Avoid dividing by 0. - if dmin != baseline: - adc_gain = (dmin - baseline) / pmin - - # Remap signal if baseline exceeds boundaries. - # This may happen if pmax < 0 - if baseline > MAX_I32: - # pmin maps to dmin, baseline maps to 2**31 - 1 - # pmax will map to a lower value than before - adc_gain = (MAX_I32) - dmin / abs(pmin) - baseline = MAX_I32 - # This may happen if pmin > 0 - elif baseline < MIN_I32: - # pmax maps to dmax, baseline maps to -2**31 + 1 - adc_gain = (dmax - MIN_I32) / pmax - baseline = MIN_I32 - - adc_gains.append(adc_gain) - baselines.append(baseline) + for ch, _ in enumerate(self.e_p_signal): + adc_gain, baseline = self.calc_adc_gain_baseline(ch, minvals, maxvals) + adc_gains.append(adc_gain) + baselines.append(baseline) + + else: + raise Exception('Must supply p_signal or e_p_signal to calc_adc_params') return (adc_gains, baselines) From 06154409cbba43fa15e7627fe4b03da04dc774d1 Mon Sep 17 00:00:00 2001 From: Brian Gow Date: Tue, 29 Oct 2024 10:52:45 -0400 Subject: [PATCH 2/3] formatted with black package --- wfdb/io/_signal.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index db878bc7..e600dd48 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -830,7 +830,9 @@ def calc_adc_params(self): maxvals = np.nanmax(self.p_signal, axis=0) for ch in range(np.shape(self.p_signal)[1]): - adc_gain, baseline = self.calc_adc_gain_baseline(ch, minvals, maxvals) + adc_gain, baseline = self.calc_adc_gain_baseline( + ch, minvals, maxvals + ) adc_gains.append(adc_gain) baselines.append(baseline) @@ -841,16 +843,22 @@ def calc_adc_params(self): minvals.append(min(x for x in ch if not math.isnan(x))) maxvals.append(max(x for x in ch if not math.isnan(x))) - if any(x == math.inf for x in minvals) or any(x == math.inf for x in maxvals): + if any(x == math.inf for x in minvals) or any( + x == math.inf for x in maxvals + ): raise ValueError("Signal contains inf. Cannot perform adc.") for ch, _ in enumerate(self.e_p_signal): - adc_gain, baseline = self.calc_adc_gain_baseline(ch, minvals, maxvals) + adc_gain, baseline = self.calc_adc_gain_baseline( + ch, minvals, maxvals + ) adc_gains.append(adc_gain) baselines.append(baseline) else: - raise Exception('Must supply p_signal or e_p_signal to calc_adc_params') + raise Exception( + "Must supply p_signal or e_p_signal to calc_adc_params" + ) return (adc_gains, baselines) From 1b610c82d0937bd316b2ff253c9f48a77b0ac8c0 Mon Sep 17 00:00:00 2001 From: Brian Gow Date: Mon, 4 Nov 2024 14:05:03 -0500 Subject: [PATCH 3/3] update min max call --- wfdb/io/_signal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wfdb/io/_signal.py b/wfdb/io/_signal.py index e600dd48..5e70397f 100644 --- a/wfdb/io/_signal.py +++ b/wfdb/io/_signal.py @@ -840,8 +840,8 @@ def calc_adc_params(self): minvals = [] maxvals = [] for ch in self.e_p_signal: - minvals.append(min(x for x in ch if not math.isnan(x))) - maxvals.append(max(x for x in ch if not math.isnan(x))) + minvals.append(np.nanmin(ch)) + maxvals.append(np.nanmax(ch)) if any(x == math.inf for x in minvals) or any( x == math.inf for x in maxvals