Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NaN handling in Record.adc, and other fixes #481

Merged
merged 13 commits into from
Apr 19, 2024
8 changes: 4 additions & 4 deletions tests/test_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -1053,19 +1053,20 @@ def test_physical_conversion(self):
adc_gain = [1.0, 1234.567, 765.4321]
baseline = [10, 20, -30]
d_signal = np.repeat(np.arange(-100, 100), 3).reshape(-1, 3)
d_signal[5:10, :] = [-32768, -2048, -128]
e_d_signal = list(d_signal.transpose())
fmt = ["16", "16", "16"]
fmt = ["16", "212", "80"]

# Test adding or subtracting a small offset (0.01 ADU) to check
# that we correctly round to the nearest integer
for offset in (0, -0.01, 0.01):
p_signal = (d_signal + offset - baseline) / adc_gain
p_signal[5:10, :] = np.nan
e_p_signal = list(p_signal.transpose())

# Test converting p_signal to d_signal

record = wfdb.Record(
n_sig=n_sig,
p_signal=p_signal.copy(),
adc_gain=adc_gain,
baseline=baseline,
Expand All @@ -1081,7 +1082,6 @@ def test_physical_conversion(self):
# Test converting e_p_signal to e_d_signal

record = wfdb.Record(
n_sig=n_sig,
e_p_signal=[s.copy() for s in e_p_signal],
adc_gain=adc_gain,
baseline=baseline,
Expand All @@ -1108,7 +1108,7 @@ def test_physical_conversion(self):
p_signal=p_signal,
adc_gain=adc_gain,
baseline=baseline,
fmt=["16", "16", "16"],
fmt=fmt,
write_dir=self.temp_path,
)
record = wfdb.rdrecord(
Expand Down
84 changes: 38 additions & 46 deletions wfdb/io/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,68 +532,60 @@ def adc(self, expanded=False, inplace=False):
# To do: choose the minimum return res needed
intdtype = "int64"

# Convert a physical (1D or 2D) signal array to digital. Note that
# the input array is modified!
def adc_inplace(p_signal, adc_gain, baseline, d_nan):
nanlocs = np.isnan(p_signal)
np.multiply(p_signal, adc_gain, p_signal)
np.add(p_signal, baseline, p_signal)
np.round(p_signal, 0, p_signal)
np.copyto(p_signal, d_nan, where=nanlocs)
d_signal = p_signal.astype(intdtype, copy=False)
return d_signal

# Do inplace conversion and set relevant variables.
if inplace:
if expanded:
for ch in range(self.n_sig):
# NAN locations for the channel
ch_nanlocs = np.isnan(self.e_p_signal[ch])
np.multiply(
self.e_p_signal[ch],
for ch, ch_p_signal in enumerate(self.e_p_signal):
ch_d_signal = adc_inplace(
ch_p_signal,
self.adc_gain[ch],
self.e_p_signal[ch],
)
np.add(
self.e_p_signal[ch],
self.baseline[ch],
self.e_p_signal[ch],
)
np.round(self.e_p_signal[ch], 0, self.e_p_signal[ch])
self.e_p_signal[ch] = self.e_p_signal[ch].astype(
intdtype, copy=False
d_nans[ch],
)
self.e_p_signal[ch][ch_nanlocs] = d_nans[ch]
self.e_p_signal[ch] = ch_d_signal
self.e_d_signal = self.e_p_signal
self.e_p_signal = None
else:
nanlocs = np.isnan(self.p_signal)
np.multiply(self.p_signal, self.adc_gain, self.p_signal)
np.add(self.p_signal, self.baseline, self.p_signal)
np.round(self.p_signal, 0, self.p_signal)
self.p_signal = self.p_signal.astype(intdtype, copy=False)
self.d_signal = self.p_signal
self.d_signal = adc_inplace(
self.p_signal,
self.adc_gain,
self.baseline,
d_nans,
)
self.p_signal = None

# Return the variable
else:
if expanded:
d_signal = []
for ch in range(self.n_sig):
# NAN locations for the channel
ch_nanlocs = np.isnan(self.e_p_signal[ch])
ch_d_signal = self.e_p_signal[ch].copy()
np.multiply(ch_d_signal, self.adc_gain[ch], ch_d_signal)
np.add(ch_d_signal, self.baseline[ch], ch_d_signal)
np.round(ch_d_signal, 0, ch_d_signal)
ch_d_signal = ch_d_signal.astype(intdtype, copy=False)
ch_d_signal[ch_nanlocs] = d_nans[ch]
d_signal.append(ch_d_signal)
e_d_signal = []
for ch, ch_p_signal in enumerate(self.e_p_signal):
ch_d_signal = adc_inplace(
ch_p_signal.copy(),
self.adc_gain[ch],
self.baseline[ch],
d_nans[ch],
)
e_d_signal.append(ch_d_signal)
return e_d_signal

else:
nanlocs = np.isnan(self.p_signal)
# Cannot cast dtype to int now because gain is float.
d_signal = self.p_signal.copy()
np.multiply(d_signal, self.adc_gain, d_signal)
np.add(d_signal, self.baseline, d_signal)
np.round(d_signal, 0, d_signal)
d_signal = d_signal.astype(intdtype, copy=False)

if nanlocs.any():
for ch in range(d_signal.shape[1]):
if nanlocs[:, ch].any():
d_signal[nanlocs[:, ch], ch] = d_nans[ch]

return d_signal
return adc_inplace(
self.p_signal.copy(),
self.adc_gain,
self.baseline,
d_nans,
)

def dac(self, expanded=False, return_res=64, inplace=False):
"""
Expand Down
Loading