Skip to content

Commit

Permalink
Allow expanded physical signal in calc_adc_params (#512)
Browse files Browse the repository at this point in the history
This PR updates `calc_adc_params` to allow an expanded physical signal
to be passed. Previously only a non-expanded signal was allowed. The
function will now calculate the `adc_gain` and `baseline` for
`p_signal` or `e_p_signal`.

I check for the signal that is not `None` and then compute the gain
and baseline. I moved the calculation itself to a new function
`calc_adc_gain_baseline` so we could call it for either signal type.
  • Loading branch information
Benjamin Moody committed Nov 25, 2024
2 parents 11adca8 + 1b610c8 commit 86028c4
Showing 1 changed file with 132 additions and 81 deletions.
213 changes: 132 additions & 81 deletions wfdb/io/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,21 +699,25 @@ def dac(self, expanded=False, return_res=64, inplace=False):

return p_signal

def calc_adc_params(self):
def calc_adc_gain_baseline(self, ch, minvals, maxvals):
"""
Compute appropriate adc_gain and baseline parameters for adc
conversion, given the physical signal and the fmts.
Compute adc_gain and baseline parameters for a given channel.
Parameters
----------
N/A
ch: int
The channel that the adc_gain and baseline are being computed for.
minvals: list
The minimum values for each channel.
maxvals: list
The maximum values for each channel.
Returns
-------
adc_gains : list
List of calculated `adc_gain` values for each channel.
baselines : list
List of calculated `baseline` values for each channel.
adc_gain : float
Calculated `adc_gain` value for a given channel.
baseline : int
Calculated `baseline` value for a given channel.
Notes
-----
Expand All @@ -729,85 +733,132 @@ def calc_adc_params(self):
for calculated float `adc_gain` values.
"""
adc_gains = []
baselines = []
# Get the minimum and maximum (valid) storage values
dmin, dmax = _digi_bounds(self.fmt[ch])
# add 1 because the lowest value is used to store nans
dmin = dmin + 1

pmin = minvals[ch]
pmax = maxvals[ch]

# Figure out digital samples used to store physical samples

# If the entire signal is NAN, gain/baseline won't be used
if pmin == np.nan:
adc_gain = 1
baseline = 1
# If the signal is just one value, store one digital value.
elif pmin == pmax:
if pmin == 0:
adc_gain = 1
baseline = 1
else:
# All digital values are +1 or -1. Keep adc_gain > 0
adc_gain = abs(1 / pmin)
baseline = 0
# Regular varied signal case.
else:
# The equation is: p = (d - b) / g

# Approximately, pmax maps to dmax, and pmin maps to
# dmin. Gradient will be equal to, or close to
# delta(d) / delta(p), since intercept baseline has
# to be an integer.

# Constraint: baseline must be between +/- 2**31
adc_gain = (dmax - dmin) / (pmax - pmin)
baseline = dmin - adc_gain * pmin

# Make adjustments for baseline to be an integer
# This up/down round logic of baseline is to ensure
# there is no overshoot of dmax. Now pmax will map
# to dmax or dmax-1 which is also fine.
if pmin > 0:
baseline = int(np.ceil(baseline))
else:
baseline = int(np.floor(baseline))

# After baseline is set, adjust gain correspondingly.Set
# the gain to map pmin to dmin, and p==0 to baseline.
# In the case where pmin == 0 and dmin == baseline,
# adc_gain is already correct. Avoid dividing by 0.
if dmin != baseline:
adc_gain = (dmin - baseline) / pmin

# Remap signal if baseline exceeds boundaries.
# This may happen if pmax < 0
if baseline > MAX_I32:
# pmin maps to dmin, baseline maps to 2**31 - 1
# pmax will map to a lower value than before
adc_gain = (MAX_I32) - dmin / abs(pmin)
baseline = MAX_I32
# This may happen if pmin > 0
elif baseline < MIN_I32:
# pmax maps to dmax, baseline maps to -2**31 + 1
adc_gain = (dmax - MIN_I32) / pmax
baseline = MIN_I32

return adc_gain, baseline

def calc_adc_params(self):
"""
Compute appropriate adc_gain and baseline parameters for adc
conversion, given the physical signal and the fmts.
Parameters
----------
N/A
if np.where(np.isinf(self.p_signal))[0].size:
raise ValueError("Signal contains inf. Cannot perform adc.")
Returns
-------
adc_gains : list
List of calculated `adc_gain` values for each channel.
baselines : list
List of calculated `baseline` values for each channel
# min and max ignoring nans, unless whole channel is NAN.
# Should suppress warning message.
minvals = np.nanmin(self.p_signal, axis=0)
maxvals = np.nanmax(self.p_signal, axis=0)
"""
adc_gains = []
baselines = []

for ch in range(np.shape(self.p_signal)[1]):
# Get the minimum and maximum (valid) storage values
dmin, dmax = _digi_bounds(self.fmt[ch])
# add 1 because the lowest value is used to store nans
dmin = dmin + 1
if self.p_signal is not None:
if np.where(np.isinf(self.p_signal))[0].size:
raise ValueError("Signal contains inf. Cannot perform adc.")

pmin = minvals[ch]
pmax = maxvals[ch]
# min and max ignoring nans, unless whole channel is NAN.
# Should suppress warning message.
minvals = np.nanmin(self.p_signal, axis=0)
maxvals = np.nanmax(self.p_signal, axis=0)

# Figure out digital samples used to store physical samples
for ch in range(np.shape(self.p_signal)[1]):
adc_gain, baseline = self.calc_adc_gain_baseline(
ch, minvals, maxvals
)
adc_gains.append(adc_gain)
baselines.append(baseline)

elif self.e_p_signal is not None:
minvals = []
maxvals = []
for ch in self.e_p_signal:
minvals.append(np.nanmin(ch))
maxvals.append(np.nanmax(ch))

if any(x == math.inf for x in minvals) or any(
x == math.inf for x in maxvals
):
raise ValueError("Signal contains inf. Cannot perform adc.")

for ch, _ in enumerate(self.e_p_signal):
adc_gain, baseline = self.calc_adc_gain_baseline(
ch, minvals, maxvals
)
adc_gains.append(adc_gain)
baselines.append(baseline)

# If the entire signal is NAN, gain/baseline won't be used
if pmin == np.nan:
adc_gain = 1
baseline = 1
# If the signal is just one value, store one digital value.
elif pmin == pmax:
if pmin == 0:
adc_gain = 1
baseline = 1
else:
# All digital values are +1 or -1. Keep adc_gain > 0
adc_gain = abs(1 / pmin)
baseline = 0
# Regular varied signal case.
else:
# The equation is: p = (d - b) / g

# Approximately, pmax maps to dmax, and pmin maps to
# dmin. Gradient will be equal to, or close to
# delta(d) / delta(p), since intercept baseline has
# to be an integer.

# Constraint: baseline must be between +/- 2**31
adc_gain = (dmax - dmin) / (pmax - pmin)
baseline = dmin - adc_gain * pmin

# Make adjustments for baseline to be an integer
# This up/down round logic of baseline is to ensure
# there is no overshoot of dmax. Now pmax will map
# to dmax or dmax-1 which is also fine.
if pmin > 0:
baseline = int(np.ceil(baseline))
else:
baseline = int(np.floor(baseline))

# After baseline is set, adjust gain correspondingly.Set
# the gain to map pmin to dmin, and p==0 to baseline.
# In the case where pmin == 0 and dmin == baseline,
# adc_gain is already correct. Avoid dividing by 0.
if dmin != baseline:
adc_gain = (dmin - baseline) / pmin

# Remap signal if baseline exceeds boundaries.
# This may happen if pmax < 0
if baseline > MAX_I32:
# pmin maps to dmin, baseline maps to 2**31 - 1
# pmax will map to a lower value than before
adc_gain = (MAX_I32) - dmin / abs(pmin)
baseline = MAX_I32
# This may happen if pmin > 0
elif baseline < MIN_I32:
# pmax maps to dmax, baseline maps to -2**31 + 1
adc_gain = (dmax - MIN_I32) / pmax
baseline = MIN_I32

adc_gains.append(adc_gain)
baselines.append(baseline)
else:
raise Exception(
"Must supply p_signal or e_p_signal to calc_adc_params"
)

return (adc_gains, baselines)

Expand Down

0 comments on commit 86028c4

Please sign in to comment.