Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][FIX] irfft: ramp correction and apodization fix for asymmetric interferograms #641

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 49 additions & 42 deletions orangecontrib/spectroscopy/irfft.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ def find_zpd(ifg, peak_search):
else:
raise NotImplementedError

def wing_size(ifg, zpd):
"""Calculate negative and positive wing size (including zpd point)"""
wing_n = zpd + 1
wing_p = ifg.shape[-1] - zpd
return wing_n, wing_p

def apodize(ifg, zpd, apod_func):
"""
Perform apodization of asymmetric interferogram using selected apodization
Expand All @@ -73,36 +79,26 @@ def apodize(ifg, zpd, apod_func):
ifg_apod (np.array): apodized interferogram(s)
"""

# Calculate negative and positive wing size
# correcting zpd from 0-based index
ifg_N = ifg.shape[-1]
wing_n = zpd + 1
wing_p = ifg_N - (zpd + 1)

if apod_func == ApodFunc.BOXCAR:
# Boxcar apodization AKA as-collected
return ifg

elif apod_func == ApodFunc.BLACKMAN_HARRIS_3:
# Calculate negative and positive wing size
# correcting zpd from 0-based index
ifg_N = ifg.shape[-1]
wing_n, wing_p = wing_size(ifg, zpd)
# Use larger wing to define apodization window width
M = max(wing_n, wing_p)

# Generate apodization function
if apod_func == ApodFunc.BLACKMAN_HARRIS_3:
# Blackman-Harris (3-term)
# Reference: W. Herres and J. Gronholz, Bruker
# "Understanding FT-IR Data Processing"
A0 = 0.42323
A1 = 0.49755
A2 = 0.07922
A3 = 0.0
n_n = np.arange(wing_n)
n_p = np.arange(wing_p)
Bs_n = A0\
+ A1 * np.cos(np.pi*n_n/wing_n)\
+ A2 * np.cos(np.pi*2*n_n/wing_n)\
+ A3 * np.cos(np.pi*3*n_n/wing_n)
Bs_p = A0\
+ A1 * np.cos(np.pi*n_p/wing_p)\
+ A2 * np.cos(np.pi*2*n_p/wing_p)\
+ A3 * np.cos(np.pi*3*n_p/wing_p)
Bs = np.hstack((Bs_n[::-1], Bs_p))

elif apod_func == ApodFunc.BLACKMAN_HARRIS_4:
# Blackman-Harris (4-term)
# Reference: W. Herres and J. Gronholz, Bruker
Expand All @@ -111,31 +107,28 @@ def apodize(ifg, zpd, apod_func):
A1 = 0.48829
A2 = 0.14128
A3 = 0.01168
n_n = np.arange(wing_n)
n_p = np.arange(wing_p)
Bs_n = A0\
+ A1 * np.cos(np.pi*n_n/wing_n)\
+ A2 * np.cos(np.pi*2*n_n/wing_n)\
+ A3 * np.cos(np.pi*3*n_n/wing_n)
Bs_p = A0\
+ A1 * np.cos(np.pi*n_p/wing_p)\
+ A2 * np.cos(np.pi*2*n_p/wing_p)\
+ A3 * np.cos(np.pi*3*n_p/wing_p)
Bs = np.hstack((Bs_n[::-1], Bs_p))

elif apod_func == ApodFunc.BLACKMAN_NUTTALL:
# Blackman-Nuttall (Eric Peach)
# TODO I think this has silent problems with asymmetric interferograms
delta = np.min([wing_n, wing_p])

# Create Blackman Nuttall Window according to the formula given by Wolfram.
xs = np.arange(ifg_N)
Bs = np.zeros(ifg_N)
Bs = 0.3635819\
- 0.4891775 * np.cos(2*np.pi*xs/(2*delta - 1))\
+ 0.1365995 * np.cos(4*np.pi*xs/(2*delta - 1))\
- 0.0106411 * np.cos(6*np.pi*xs/(2*delta - 1))
# Blackman-Nuttall
# Reference: A. Nuttall, "Some windows with very good sidelobe behavior," IEEE
# Transactions on Acoustics, Speech, and Signal Processing, 1981.
A0 = 0.3635819
A1 = 0.4891775
A2 = 0.1365995
A3 = 0.0106411
else:
raise ValueError(f"Invalid apodization function {apod_func}")

n = np.arange(1-M, M, 1)
Bs = A0 \
+ A1 * np.cos(np.pi*n/(M-1)) \
+ A2 * np.cos(np.pi*2*n/(M-1)) \
+ A3 * np.cos(np.pi*3*n/(M-1))

# Select appropriate subsection of apodization function
if wing_n > wing_p:
Bs = Bs[:ifg_N]
else:
Bs = Bs[2 * M - 1 - ifg_N:]
# Apodize the sampled Interferogram
try:
ifg_apod = ifg * Bs
Expand All @@ -144,6 +137,18 @@ def apodize(ifg, zpd, apod_func):

return ifg_apod

def ramp(ifg, zpd):
wing_n, wing_p = wing_size(ifg, zpd)
# Use smaller wing to define symmetric subset size
M = min(wing_n, wing_p)
ramp = np.linspace(0, 1, 2 * M - 1)
if wing_n < wing_p:
ifg[..., :ramp.shape[0]] *= ramp
else:
ifg[..., -ramp.shape[0]:] *= ramp[::-1]

return ifg

def _zero_fill_size(ifg_N, zff):
# Calculate desired array size
Nzff = ifg_N * zff
Expand Down Expand Up @@ -227,6 +232,7 @@ def __call__(self, ifg, zpd=None, phase=None):
# Note that L is now the zpd index
Ixs = ifg[self.zpd - L : self.zpd + L].copy()
ifg = apodize(ifg, self.zpd, self.apod_func)
ifg = ramp(ifg, self.zpd)
ifg = zero_fill(ifg, self.zff)
ifg = np.hstack((ifg[self.zpd:], ifg[0:self.zpd]))
Nzff = ifg.shape[0]
Expand Down Expand Up @@ -312,6 +318,7 @@ def __call__(self, ifg, zpd=None, phase=None):
# Note that L is now the zpd index
Ixs = ifg[:, self.zpd - L : self.zpd + L].copy()
ifg = apodize(ifg, self.zpd, self.apod_func)
ifg = ramp(ifg, self.zpd)
ifg = zero_fill(ifg, self.zff)
ifg = np.hstack((ifg[:, self.zpd:], ifg[:, 0:self.zpd]))
Nzff = ifg.shape[1]
Expand Down
47 changes: 40 additions & 7 deletions orangecontrib/spectroscopy/tests/test_irfft.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from orangecontrib.spectroscopy.irfft import (IRFFT, zero_fill, PhaseCorrection,
find_zpd, PeakSearch, ApodFunc,
MultiIRFFT,
)
MultiIRFFT, apodize, ramp,
)

dx = 1.0 / 15797.337544 / 2.0

Expand All @@ -19,6 +19,14 @@ def setUp(self):
self.ifg_single = Orange.data.Table("IFG_single.dpt")
self.ifg_seq_ref = Orange.data.Table("agilent/background_agg256.seq")
self.sc_dat_ref = Orange.data.Table("agilent/background_agg256.dat")
self.ifgs = {
'even_sym': self.ifg_single.X[0],
'odd_sym': self.ifg_single.X[0][1:],
'even_asym': self.ifg_seq_ref.X[0][1:],
'odd_asym': self.ifg_seq_ref.X[0],
'even_asym_reverse': self.ifg_seq_ref.X[0][1:][::-1],
'odd_asym_reverse': self.ifg_seq_ref.X[0][::-1],
}

def test_zero_fill(self):
N = 1975
Expand All @@ -32,11 +40,18 @@ def test_zero_fill(self):
# Final array should be >= N * zff
assert N_zf >= N * zff

def test_simple_fft(self):
def test_simple_fft_even(self):
"""Even array, symmetric ifg"""
data = self.ifg_single.X[0]
fft = IRFFT(dx=dx)
fft(data)

def test_simple_fft_odd(self):
"""Odd array, asymmetric ifg"""
data = self.ifg_seq_ref.X[0]
fft = IRFFT(dx=dx)
fft(data)

def test_stored_phase_zpd(self):
data = self.ifg_single.X[0]
fft = IRFFT(dx=dx)
Expand Down Expand Up @@ -96,8 +111,8 @@ def test_agilent_fft_ab(self):
# Calculate absorbance from ssc and rsc
ab = np.log10(rsc / ssc)
# Compare to agilent absorbance
# NB 4 mAbs error
np.testing.assert_allclose(ab[limits[0]:limits[1]], dat, atol=0.004)
# NB 0.4 mAbs max error
np.testing.assert_allclose(ab[limits[0]:limits[1]], dat, rtol=2.5e-04)

def test_multi(self):
dx_ag = (1 / 1.57980039e+04 / 2) * 4
Expand Down Expand Up @@ -133,5 +148,23 @@ def test_multi_ab(self):
# Calculate absorbance from ssc and rsc
ab = np.log10(rsc / ssc)
# Compare to agilent absorbance
# NB 4 mAbs error
np.testing.assert_allclose(ab[:, limits[0]:limits[1]], dat, atol=0.004)
# NB 0.4 mAbs max error
np.testing.assert_allclose(ab[:, limits[0]:limits[1]], dat, rtol=2.5e-04)

def test_apodization(self):
for apod_func in ApodFunc:
for k, ifg in self.ifgs.items():
with self.subTest(apod_func=apod_func.name, ifg=k):
zpd = find_zpd(ifg, PeakSearch.ABSOLUTE)
out = apodize(ifg, zpd, apod_func)
# Apodization should not change value at zpd
self.assertAlmostEqual(ifg[zpd], out[zpd])

def test_ramp(self):
ifg = self.ifgs['odd_asym']
zpd = find_zpd(ifg, PeakSearch.ABSOLUTE)
ramp_fwd = ramp(ifg, zpd)
ifg_rev = self.ifgs['odd_asym_reverse']
zpd_rev = find_zpd(ifg_rev, PeakSearch.ABSOLUTE)
ramp_rev = ramp(ifg_rev, zpd_rev)
np.testing.assert_array_equal(ramp_fwd, ramp_rev[::-1])