Skip to content

Commit

Permalink
more bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ggmarshall committed Mar 8, 2024
1 parent 6995728 commit 5b08197
Showing 1 changed file with 70 additions and 116 deletions.
186 changes: 70 additions & 116 deletions src/pygama/pargen/energy_cal.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import pygama.math.binned_fitting as pgb
import pygama.math.distributions as pgf
import pygama.math.histogram as pgh
from pygama.math.histogram import get_i_local_extrema, get_i_local_maxima
import pygama.math.utils as pgu
from pygama.math.least_squares import fit_simple_scaling
from pygama.pargen.utils import convert_to_minuit, return_nans

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,7 +73,7 @@ def __init__(
):
self.energy_param = energy_param

if deg < 0:
if deg < -1:
log.error(f"hpge_E_cal warning: invalid deg = {deg}")
return
self.deg = int(deg)
Expand All @@ -82,14 +84,20 @@ def __init__(

if guess_kev <= 0:
log.error(f"hpge_E_cal warning: invalid guess_kev = {guess_kev}")
if deg > 0:
self.pars = np.zeros(self.deg + 1, dtype=float)
if deg ==-1:
self.pars = np.zeros(2, dtype=float)
self.pars[0] = guess_kev
self.fixed = {1:1}
elif deg == 0:
self.pars = np.zeros(2, dtype=float)
self.pars[1] = guess_kev
self.fixed = {0:0}
else:
self.pars = np.zeros(2, dtype=float)
self.pars = np.zeros(self.deg + 1, dtype=float)
self.pars[1] = guess_kev
self.fixed = fixed
self.results = {}
self.fixed = fixed


self.uncal_is_int = uncal_is_int
self.plot_options = plot_options
Expand Down Expand Up @@ -385,12 +393,12 @@ def hpge_get_energy_peaks(
for i, (li, ei) in enumerate(zip(self.peak_locs, self.peaks_kev)):
log.info(f"\t{i}".ljust(4) + str(ei).ljust(9) + f"| {li:g}".ljust(5))

def hpge_fit_energy_peak_tops(
def hpge_cal_energy_peak_tops(
self,
e_uncal,
):
"""
Fits gaussians to tops of peaks
Calibrates using energy peak top fits
"""

def hpge_fit_energy_peaks(
Expand Down Expand Up @@ -445,7 +453,7 @@ def hpge_fit_energy_peaks(

peak_pars_lines = [i[0] for i in peak_pars]
peaks_mask = np.array(
[True if peak in peak_pars_lines else False for peak in peaks_kev],
[True if peak in peaks_kev else False for peak in peak_pars_lines],
dtype=bool,
)
peak_pars = peak_pars[peaks_mask]
Expand Down Expand Up @@ -564,7 +572,7 @@ def hpge_fit_energy_peaks(
bounds = get_hpge_energy_bounds(func_i, x0)

pars_i, errs_i, cov_i = pgb.fit_binned(
func_i.pdf,
func_i.get_pdf,
hist,
bins,
var=var,
Expand All @@ -580,7 +588,7 @@ def hpge_fit_energy_peaks(
hist,
bins,
None,
func_i.pdf,
func_i.get_pdf,
pars_i,
method="Pearson",
scale_bins=False,
Expand Down Expand Up @@ -722,7 +730,7 @@ def hpge_fit_energy_peaks(

elif peak_param == "mode":
mus = [
func_i.get_mode(func_i, pars_i, cov=cov_i)
func_i.get_mode(pars_i, cov=cov_i)
for func_i, pars_i, cov_i in zip(pk_funcs, pk_pars, pk_covs)
]
mus, mu_vars = zip(*mus)
Expand All @@ -745,7 +753,7 @@ def hpge_fit_energy_peaks(
# Now fit the E scale
try:
pars, errs, cov = hpge_fit_energy_scale(
mus, mu_vars, fitted_peaks_kev, deg=len(self.pars), fixed=self.fixed
mus, mu_vars, fitted_peaks_kev, deg=self.deg, fixed=self.fixed
)

results_dict["pk_cal_pars"] = pars
Expand All @@ -758,10 +766,11 @@ def hpge_fit_energy_peaks(
mu_vars,
fitted_peaks_kev,
pars,
deg=len(self.pars),
deg=self.deg,
fixed=self.fixed,
)
self.pars = np.array(pars)

except ValueError:
log.error("Failed to fit enough peaks to get accurate calibration")

Expand Down Expand Up @@ -968,7 +977,7 @@ def full_calibration(
n_events=None,
):
log.debug(f"Find peaks and compute calibration curve for {self.energy_param}")
log.debug(f"Guess is {self.guess_kev:.3f}")
log.debug(f"Guess is {self.pars[1]:.3f}")
self.hpge_find_energy_peaks(e_uncal)
self.hpge_get_energy_peaks(e_uncal)

Expand Down Expand Up @@ -1016,7 +1025,10 @@ def full_calibration(
)

if self.pars is None:
self.pars = np.full(self.deg + 1, np.nan)
if self.deg <1:
self.pars = np.full(2, np.nan)
else:
self.pars = np.full(self.deg+1, np.nan)

log.error(f"Calibration failed completely for {self.energy_param}")
return
Expand Down Expand Up @@ -1061,7 +1073,7 @@ def calibrate_prominent_peak(
n_events=None,
):
log.debug(f"Find peaks and compute calibration curve for {self.energy_param}")
log.debug(f"Guess is {self.guess_kev:.3f}")
log.debug(f"Guess is {self.pars[1]:.3f}")
if self.deg != 0:
log.error("deg must be 0 for calibrate_prominent_peak")
return
Expand Down Expand Up @@ -1517,10 +1529,10 @@ def unbinned_staged_energy_fit(
if lock_guess is False:
if len(x0) == len(x1):
cs, _ = pgb.goodness_of_fit(
gof_hist, gof_bins, None, func.pdf, x0, method="Pearson"
gof_hist, gof_bins, None, func.get_pdf, x0, method="Pearson"
)
cs2, _ = pgb.goodness_of_fit(
gof_hist, gof_bins, None, func.pdf, x1, method="Pearson"
gof_hist, gof_bins, None, func.get_pdf, x1, method="Pearson"
)
if cs >= cs2:
x0 = x1
Expand Down Expand Up @@ -1620,7 +1632,7 @@ def unbinned_staged_energy_fit(
gof_hist,
gof_bins,
gof_var,
func.pdf,
func.get_pdf,
m.values,
method="Pearson",
scale_bins=True,
Expand Down Expand Up @@ -1650,7 +1662,7 @@ def unbinned_staged_energy_fit(
gof_hist,
gof_bins,
gof_var,
func.pdf,
func.get_pdf,
m2.values,
method="Pearson",
scale_bins=True,
Expand All @@ -1663,8 +1675,8 @@ def unbinned_staged_energy_fit(
frac_errors2 = np.sum(np.abs(np.array(m2.errors)[mask] / np.array(m2.values)[mask]))

if display > 1:
m_fit = func.pdf(bin_cs, *m.values) * np.diff(bin_cs)[0]
m2_fit = func.pdf(bin_cs, *m2.values) * np.diff(bin_cs)[0]
m_fit = func.get_pdf(bin_cs, *m.values) * np.diff(bin_cs)[0]
m2_fit = func.get_pdf(bin_cs, *m2.values) * np.diff(bin_cs)[0]
plt.figure()
plt.step(bin_cs, hist, label="hist")
plt.plot(bin_cs, func(bin_cs, *x0)[1], label="Guess")
Expand All @@ -1688,7 +1700,7 @@ def unbinned_staged_energy_fit(
gof_hist,
gof_bins,
gof_var,
func.pdf,
func.get_pdf,
m.values,
method="Pearson",
scale_bins=True,
Expand Down Expand Up @@ -1740,18 +1752,30 @@ def unbinned_staged_energy_fit(
debug_string = f'dropping tail tail val : {fit[0]["htail"]} tail err : {fit[1]["htail"]} '
debug_string += f"p_val no tail: : {p_val_no_tail} p_val with tail: {p_val}"
log.debug(debug_string)

#if display > 0:
m_fit = pgf.gauss_on_step.get_pdf(bin_cs, *fit_no_tail[0])
m_fit_tail = pgf.hpge_peak.get_pdf(bin_cs, *fit[0])
plt.figure()
plt.step(bin_cs, hist, where="mid", label="hist")
plt.plot(
bin_cs,
m_fit * np.diff(bin_cs)[0],
label=f"Drop tail: {p_val_no_tail}",
)
plt.plot(
bin_cs,
m_fit_tail * np.diff(bin_cs)[0],
label=f"Drop tail: {p_val}",
)
plt.plot(
bin_cs,
pgf.hpge_peak.pdf_ext(bin_cs, *fit[0])[1] * np.diff(bin_cs)[0],
label=f"Drop tail: {p_val}",
)
plt.legend()
plt.show()
fit = fit_no_tail
if display > 0:
m_fit = pgf.gauss_on_step.pdf(bin_cs, *fit[0])
plt.figure()
plt.step(bin_cs, hist, where="mid", label="hist")
plt.plot(
bin_cs,
m_fit * np.diff(bin_cs)[0],
label=f"Drop tail: {p_val_no_tail}",
)
plt.legend()
plt.show()
return fit


Expand Down Expand Up @@ -1785,9 +1809,9 @@ def hpge_fit_energy_scale(mus, mu_vars, energies_kev, deg=0, fixed=None):
covariance matrix for the best fit parameters.
"""
if deg == 0:
scale, scale_cov = pgu.fit_simple_scaling(energies_kev, mus, var=mu_vars)
pars = np.array([scale, 0])
cov = np.array([[scale_cov, 0], [0, 0]])
scale, scale_cov = fit_simple_scaling(energies_kev, mus, var=mu_vars)
pars = np.array([0,scale])
cov = np.array([[0, 0], [0, scale_cov]])
errs = np.diag(np.sqrt(cov))
else:
poly_pars = (
Expand Down Expand Up @@ -1849,10 +1873,10 @@ def hpge_fit_energy_cal_func(
covariance matrix for the best fit parameters.
"""
if deg == 0:
e_vars = mu_vars / energy_scale_pars[0] ** 2
scale, scale_cov = pgu.fit_simple_scaling(mus, energies_kev, var=e_vars)
pars = np.array([scale, 0])
cov = np.array([[scale_cov, 0], [0, 0]])
e_vars = mu_vars / energy_scale_pars[1] ** 2
scale, scale_cov = fit_simple_scaling(mus, energies_kev, var=e_vars)
pars = np.array([0,scale])
cov = np.array([[0, 0], [0, scale_cov]])
errs = np.diag(np.sqrt(cov))
else:
d_mu_d_es = np.zeros(len(mus))
Expand Down Expand Up @@ -1953,13 +1977,13 @@ def poly_match(xx, yy, deg=-1, rtol=1e-5, atol=1e-8, fixed=None):

# simple shift
if deg == -1:
pars_i = np.array([1, (np.sum(yy_i) - np.sum(xx_i)) / len(yy_i)])
polxx = xx_i + pars_i[1]
pars_i = np.array([(np.sum(yy_i) - np.sum(xx_i)) / len(yy_i), 1])
polxx = xx_i + pars_i[0]

# simple scaling
elif deg == 0:
pars_i = np.array([np.sum(yy_i * xx_i) / np.sum(xx_i * xx_i), 0])
polxx = pars_i[0] * xx_i
pars_i = np.array([0, np.sum(yy_i * xx_i) / np.sum(xx_i * xx_i)])
polxx = pars_i[1] * xx_i

# generic poly of degree >= 1
else:
Expand Down Expand Up @@ -2040,7 +2064,7 @@ def poly_match(xx, yy, deg=-1, rtol=1e-5, atol=1e-8, fixed=None):

return pars, best_ixtup, best_iytup


# move these to dataflow
def get_peak_labels(
labels: list[str], pars: list[float]
) -> tuple(list[float], list[float]):
Expand Down Expand Up @@ -2582,73 +2606,3 @@ def bin_survival_fraction(
sf = 100 * (counts_pass + 10 ** (-6)) / (counts_pass + counts_fail + 10 ** (-6))
return {"bins": pgh.get_bin_centers(bins_pass), "sf": sf}


# does this belong here, could move to math?
def get_i_local_extrema(data, delta):
"""Get lists of indices of the local maxima and minima of data
The "local" extrema are those maxima / minima that have heights / depths of
at least delta.
Converted from MATLAB script at: http://billauer.co.il/peakdet.html
Parameters
----------
data : array-like
the array of data within which extrema will be found
delta : scalar
the absolute level by which data must vary (in one direction) about an
extremum in order for it to be tagged
Returns
-------
imaxes, imins : 2-tuple ( array, array )
A 2-tuple containing arrays of variable length that hold the indices of
the identified local maxima (first tuple element) and minima (second
tuple element)
"""

# prepare output
imaxes, imins = [], []

# sanity checks
data = np.asarray(data)
if not np.isscalar(delta):
log.error("get_i_local_extrema: Input argument delta must be a scalar")
return np.array(imaxes), np.array(imins)
if delta <= 0:
log.error(f"get_i_local_extrema: delta ({delta}) must be positive")
return np.array(imaxes), np.array(imins)

# now loop over data
imax, imin = 0, 0
find_max = True
for i in range(len(data)):
if data[i] > data[imax]:
imax = i
if data[i] < data[imin]:
imin = i

if find_max:
# if the sample is less than the current max by more than delta,
# declare the previous one a maximum, then set this as the new "min"
if data[i] < data[imax] - delta:
imaxes.append(imax)
imin = i
find_max = False
else:
# if the sample is more than the current min by more than delta,
# declare the previous one a minimum, then set this as the new "max"
if data[i] > data[imin] + delta:
imins.append(imin)
imax = i
find_max = True

return np.array(imaxes), np.array(imins)


def get_i_local_maxima(data, delta):
return get_i_local_extrema(data, delta)[0]


def get_i_local_minima(data, delta):
return get_i_local_extrema(data, delta)[1]

0 comments on commit 5b08197

Please sign in to comment.