diff --git a/src/pygama/pargen/energy_cal.py b/src/pygama/pargen/energy_cal.py
index b0a347e56..e3009c1e4 100644
--- a/src/pygama/pargen/energy_cal.py
+++ b/src/pygama/pargen/energy_cal.py
@@ -23,7 +23,9 @@
 import pygama.math.binned_fitting as pgb
 import pygama.math.distributions as pgf
 import pygama.math.histogram as pgh
+from pygama.math.histogram import get_i_local_extrema, get_i_local_maxima
 import pygama.math.utils as pgu
+from pygama.math.least_squares import fit_simple_scaling
 from pygama.pargen.utils import convert_to_minuit, return_nans
 
 log = logging.getLogger(__name__)
@@ -71,7 +73,7 @@ def __init__(
     ):
         self.energy_param = energy_param
 
-        if deg < 0:
+        if deg < -1:
             log.error(f"hpge_E_cal warning: invalid deg = {deg}")
             return
         self.deg = int(deg)
@@ -82,14 +84,20 @@ def __init__(
         if guess_kev <= 0:
             log.error(f"hpge_E_cal warning: invalid guess_kev = {guess_kev}")
 
-        if deg > 0:
-            self.pars = np.zeros(self.deg + 1, dtype=float)
+        if deg == -1:
+            self.pars = np.zeros(2, dtype=float)
+            self.pars[0] = guess_kev
+            self.fixed = {1: 1}
+        elif deg == 0:
+            self.pars = np.zeros(2, dtype=float)
             self.pars[1] = guess_kev
+            self.fixed = {0: 0}
         else:
-            self.pars = np.zeros(2, dtype=float)
+            self.pars = np.zeros(self.deg + 1, dtype=float)
             self.pars[1] = guess_kev
+            self.fixed = fixed
         self.results = {}
-        self.fixed = fixed
+
         self.uncal_is_int = uncal_is_int
         self.plot_options = plot_options
 
@@ -385,12 +393,12 @@ def hpge_get_energy_peaks(
         for i, (li, ei) in enumerate(zip(self.peak_locs, self.peaks_kev)):
             log.info(f"\t{i}".ljust(4) + str(ei).ljust(9) + f"| {li:g}".ljust(5))
 
-    def hpge_fit_energy_peak_tops(
+    def hpge_cal_energy_peak_tops(
         self,
         e_uncal,
     ):
         """
-        Fits gaussians to tops of peaks
+        Calibrates using energy peak top fits
         """
 
     def hpge_fit_energy_peaks(
@@ -445,7 +453,7 @@ def hpge_fit_energy_peaks(
         peak_pars_lines = [i[0] for i in peak_pars]
         peaks_mask = np.array(
-            [True if peak in peak_pars_lines else False for peak in peaks_kev],
+            [True if peak in peaks_kev else False for peak in peak_pars_lines],
             dtype=bool,
         )
         peak_pars = peak_pars[peaks_mask]
 
@@ -564,7 +572,7 @@ def hpge_fit_energy_peaks(
                 bounds = get_hpge_energy_bounds(func_i, x0)
 
                 pars_i, errs_i, cov_i = pgb.fit_binned(
-                    func_i.pdf,
+                    func_i.get_pdf,
                     hist,
                     bins,
                     var=var,
@@ -580,7 +588,7 @@ def hpge_fit_energy_peaks(
                     hist,
                     bins,
                     None,
-                    func_i.pdf,
+                    func_i.get_pdf,
                     pars_i,
                     method="Pearson",
                     scale_bins=False,
@@ -722,7 +730,7 @@ def hpge_fit_energy_peaks(
         elif peak_param == "mode":
             mus = [
-                func_i.get_mode(func_i, pars_i, cov=cov_i)
+                func_i.get_mode(pars_i, cov=cov_i)
                 for func_i, pars_i, cov_i in zip(pk_funcs, pk_pars, pk_covs)
             ]
             mus, mu_vars = zip(*mus)
 
@@ -745,7 +753,7 @@ def hpge_fit_energy_peaks(
         # Now fit the E scale
         try:
             pars, errs, cov = hpge_fit_energy_scale(
-                mus, mu_vars, fitted_peaks_kev, deg=len(self.pars), fixed=self.fixed
+                mus, mu_vars, fitted_peaks_kev, deg=self.deg, fixed=self.fixed
             )
 
             results_dict["pk_cal_pars"] = pars
@@ -758,10 +766,11 @@ def hpge_fit_energy_peaks(
                 mu_vars,
                 fitted_peaks_kev,
                 pars,
-                deg=len(self.pars),
+                deg=self.deg,
                 fixed=self.fixed,
             )
             self.pars = np.array(pars)
+
         except ValueError:
             log.error("Failed to fit enough peaks to get accurate calibration")
 
@@ -968,7 +977,7 @@ def full_calibration(
         n_events=None,
     ):
         log.debug(f"Find peaks and compute calibration curve for {self.energy_param}")
-        log.debug(f"Guess is {self.guess_kev:.3f}")
+        log.debug(f"Guess is {self.pars[1]:.3f}")
 
         self.hpge_find_energy_peaks(e_uncal)
         self.hpge_get_energy_peaks(e_uncal)
@@ -1016,7 +1025,10 @@ def full_calibration(
         )
 
         if self.pars is None:
-            self.pars = np.full(self.deg + 1, np.nan)
+            if self.deg < 1:
+                self.pars = np.full(2, np.nan)
+            else:
+                self.pars = np.full(self.deg + 1, np.nan)
             log.error(f"Calibration failed completely for {self.energy_param}")
             return
 
@@ -1061,7 +1073,7 @@ def calibrate_prominent_peak(
         n_events=None,
     ):
         log.debug(f"Find peaks and compute calibration curve for {self.energy_param}")
-        log.debug(f"Guess is {self.guess_kev:.3f}")
+        log.debug(f"Guess is {self.pars[1]:.3f}")
         if self.deg != 0:
             log.error("deg must be 0 for calibrate_prominent_peak")
             return
@@ -1517,10 +1529,10 @@ def unbinned_staged_energy_fit(
     if lock_guess is False:
         if len(x0) == len(x1):
             cs, _ = pgb.goodness_of_fit(
-                gof_hist, gof_bins, None, func.pdf, x0, method="Pearson"
+                gof_hist, gof_bins, None, func.get_pdf, x0, method="Pearson"
             )
             cs2, _ = pgb.goodness_of_fit(
-                gof_hist, gof_bins, None, func.pdf, x1, method="Pearson"
+                gof_hist, gof_bins, None, func.get_pdf, x1, method="Pearson"
             )
             if cs >= cs2:
                 x0 = x1
@@ -1620,7 +1632,7 @@ def unbinned_staged_energy_fit(
                 gof_hist,
                 gof_bins,
                 gof_var,
-                func.pdf,
+                func.get_pdf,
                 m.values,
                 method="Pearson",
                 scale_bins=True,
@@ -1650,7 +1662,7 @@ def unbinned_staged_energy_fit(
                 gof_hist,
                 gof_bins,
                 gof_var,
-                func.pdf,
+                func.get_pdf,
                 m2.values,
                 method="Pearson",
                 scale_bins=True,
@@ -1663,8 +1675,8 @@ def unbinned_staged_energy_fit(
         frac_errors2 = np.sum(np.abs(np.array(m2.errors)[mask] / np.array(m2.values)[mask]))
 
         if display > 1:
-            m_fit = func.pdf(bin_cs, *m.values) * np.diff(bin_cs)[0]
-            m2_fit = func.pdf(bin_cs, *m2.values) * np.diff(bin_cs)[0]
+            m_fit = func.get_pdf(bin_cs, *m.values) * np.diff(bin_cs)[0]
+            m2_fit = func.get_pdf(bin_cs, *m2.values) * np.diff(bin_cs)[0]
             plt.figure()
             plt.step(bin_cs, hist, label="hist")
             plt.plot(bin_cs, func(bin_cs, *x0)[1], label="Guess")
@@ -1688,7 +1700,7 @@ def unbinned_staged_energy_fit(
                 gof_hist,
                 gof_bins,
                 gof_var,
-                func.pdf,
+                func.get_pdf,
                 m.values,
                 method="Pearson",
                 scale_bins=True,
@@ -1740,18 +1752,30 @@ def unbinned_staged_energy_fit(
         debug_string = f'dropping tail tail val : {fit[0]["htail"]} tail err : {fit[1]["htail"]} '
         debug_string += f"p_val no tail: : {p_val_no_tail} p_val with tail: {p_val}"
         log.debug(debug_string)
+
+        if display > 0:
+            m_fit = pgf.gauss_on_step.get_pdf(bin_cs, *fit_no_tail[0])
+            m_fit_tail = pgf.hpge_peak.get_pdf(bin_cs, *fit[0])
+            plt.figure()
+            plt.step(bin_cs, hist, where="mid", label="hist")
+            plt.plot(
+                bin_cs,
+                m_fit * np.diff(bin_cs)[0],
+                label=f"Drop tail: {p_val_no_tail}",
+            )
+            plt.plot(
+                bin_cs,
+                m_fit_tail * np.diff(bin_cs)[0],
+                label=f"With tail: {p_val}",
+            )
+            plt.plot(
+                bin_cs,
+                pgf.hpge_peak.pdf_ext(bin_cs, *fit[0])[1] * np.diff(bin_cs)[0],
+                label=f"With tail (pdf_ext): {p_val}",
+            )
+            plt.legend()
+            plt.show()
         fit = fit_no_tail
-        if display > 0:
-            m_fit = pgf.gauss_on_step.pdf(bin_cs, *fit[0])
-            plt.figure()
-            plt.step(bin_cs, hist, where="mid", label="hist")
-            plt.plot(
-                bin_cs,
-                m_fit * np.diff(bin_cs)[0],
-                label=f"Drop tail: {p_val_no_tail}",
-            )
-            plt.legend()
-            plt.show()
 
     return fit
 
 
@@ -1785,9 +1809,9 @@ def hpge_fit_energy_scale(mus, mu_vars, energies_kev, deg=0, fixed=None):
         covariance matrix for the best fit parameters.
""" if deg == 0: - scale, scale_cov = pgu.fit_simple_scaling(energies_kev, mus, var=mu_vars) - pars = np.array([scale, 0]) - cov = np.array([[scale_cov, 0], [0, 0]]) + scale, scale_cov = fit_simple_scaling(energies_kev, mus, var=mu_vars) + pars = np.array([0,scale]) + cov = np.array([[0, 0], [0, scale_cov]]) errs = np.diag(np.sqrt(cov)) else: poly_pars = ( @@ -1849,10 +1873,10 @@ def hpge_fit_energy_cal_func( covariance matrix for the best fit parameters. """ if deg == 0: - e_vars = mu_vars / energy_scale_pars[0] ** 2 - scale, scale_cov = pgu.fit_simple_scaling(mus, energies_kev, var=e_vars) - pars = np.array([scale, 0]) - cov = np.array([[scale_cov, 0], [0, 0]]) + e_vars = mu_vars / energy_scale_pars[1] ** 2 + scale, scale_cov = fit_simple_scaling(mus, energies_kev, var=e_vars) + pars = np.array([0,scale]) + cov = np.array([[0, 0], [0, scale_cov]]) errs = np.diag(np.sqrt(cov)) else: d_mu_d_es = np.zeros(len(mus)) @@ -1953,13 +1977,13 @@ def poly_match(xx, yy, deg=-1, rtol=1e-5, atol=1e-8, fixed=None): # simple shift if deg == -1: - pars_i = np.array([1, (np.sum(yy_i) - np.sum(xx_i)) / len(yy_i)]) - polxx = xx_i + pars_i[1] + pars_i = np.array([(np.sum(yy_i) - np.sum(xx_i)) / len(yy_i), 1]) + polxx = xx_i + pars_i[0] # simple scaling elif deg == 0: - pars_i = np.array([np.sum(yy_i * xx_i) / np.sum(xx_i * xx_i), 0]) - polxx = pars_i[0] * xx_i + pars_i = np.array([0, np.sum(yy_i * xx_i) / np.sum(xx_i * xx_i)]) + polxx = pars_i[1] * xx_i # generic poly of degree >= 1 else: @@ -2040,7 +2064,7 @@ def poly_match(xx, yy, deg=-1, rtol=1e-5, atol=1e-8, fixed=None): return pars, best_ixtup, best_iytup - +# move these to dataflow def get_peak_labels( labels: list[str], pars: list[float] ) -> tuple(list[float], list[float]): @@ -2582,73 +2606,3 @@ def bin_survival_fraction( sf = 100 * (counts_pass + 10 ** (-6)) / (counts_pass + counts_fail + 10 ** (-6)) return {"bins": pgh.get_bin_centers(bins_pass), "sf": sf} - -# does this belong here, could move to math? -def get_i_local_extrema(data, delta): - """Get lists of indices of the local maxima and minima of data - - The "local" extrema are those maxima / minima that have heights / depths of - at least delta. 
-    Converted from MATLAB script at: http://billauer.co.il/peakdet.html
-
-    Parameters
-    ----------
-    data : array-like
-        the array of data within which extrema will be found
-    delta : scalar
-        the absolute level by which data must vary (in one direction) about an
-        extremum in order for it to be tagged
-
-    Returns
-    -------
-    imaxes, imins : 2-tuple ( array, array )
-        A 2-tuple containing arrays of variable length that hold the indices of
-        the identified local maxima (first tuple element) and minima (second
-        tuple element)
-    """
-
-    # prepare output
-    imaxes, imins = [], []
-
-    # sanity checks
-    data = np.asarray(data)
-    if not np.isscalar(delta):
-        log.error("get_i_local_extrema: Input argument delta must be a scalar")
-        return np.array(imaxes), np.array(imins)
-    if delta <= 0:
-        log.error(f"get_i_local_extrema: delta ({delta}) must be positive")
-        return np.array(imaxes), np.array(imins)
-
-    # now loop over data
-    imax, imin = 0, 0
-    find_max = True
-    for i in range(len(data)):
-        if data[i] > data[imax]:
-            imax = i
-        if data[i] < data[imin]:
-            imin = i
-
-        if find_max:
-            # if the sample is less than the current max by more than delta,
-            # declare the previous one a maximum, then set this as the new "min"
-            if data[i] < data[imax] - delta:
-                imaxes.append(imax)
-                imin = i
-                find_max = False
-        else:
-            # if the sample is more than the current min by more than delta,
-            # declare the previous one a minimum, then set this as the new "max"
-            if data[i] > data[imin] + delta:
-                imins.append(imin)
-                imax = i
-                find_max = True
-
-    return np.array(imaxes), np.array(imins)
-
-
-def get_i_local_maxima(data, delta):
-    return get_i_local_extrema(data, delta)[0]
-
-
-def get_i_local_minima(data, delta):
-    return get_i_local_extrema(data, delta)[1]
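
Reviewer note 1 (not part of the diff): the changes above move every calibration parameter array to a constant-term-first ordering, so pars[0] is the keV offset and pars[1] the keV/ADC gain, with the fixed dict pinning the slope to 1 for deg = -1 and the offset to 0 for deg = 0, as the __init__ and poly_match hunks suggest. Below is a minimal sketch of how such an array would be applied, assuming numpy's low-to-high polynomial convention; the gain and energies are made-up illustration values, not anything from the diff.

import numpy as np

# Hypothetical values for illustration only: pars runs from the constant term
# upward, which matches numpy.polynomial.polynomial.polyval's coefficient order.
pars = np.array([0.0, 0.2374])           # [offset in keV, gain in keV/ADC], deg = 0
e_uncal = np.array([1000.0, 11000.0])    # uncalibrated energies (ADC)
e_cal = np.polynomial.polynomial.polyval(e_uncal, pars)

# The fixed dict appears to map a parameter index to the value it is held at:
fixed_shift = {1: 1}   # deg = -1: slope pinned to 1, only the offset floats
fixed_gain = {0: 0}    # deg =  0: offset pinned to 0, only the gain floats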
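
Reviewer note 2 (not part of the diff): fit_simple_scaling is now imported from pygama.math.least_squares instead of pygama.math.utils. The deg == 0 branches call it as fit_simple_scaling(x, y, var=...) and consume the result as a best-fit scale plus its variance. The stand-in below has the same call shape and assumes the standard weighted least-squares solution for y ~ scale * x; it is a sketch to make the branch's intent concrete, not the library implementation, and the peak positions are invented.

import numpy as np


def simple_scaling_sketch(x, y, var=1.0):
    """Stand-in for a fit of y ~ scale * x with variances on y.

    Returns the weighted least-squares scale and its variance, mirroring how the
    deg == 0 branches in the diff use fit_simple_scaling's return value.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    var = np.broadcast_to(np.asarray(var, dtype=float), x.shape)
    scale_var = 1.0 / np.sum(x * x / var)     # variance of the scale estimator
    scale = scale_var * np.sum(x * y / var)   # weighted least-squares solution
    return scale, scale_var


# Illustrative numbers only: literature peak energies (keV) vs. fitted ADC positions.
peaks_kev = [238.632, 1460.822, 2614.511]
mus = [1325.0, 8110.0, 14520.0]
scale, scale_var = simple_scaling_sketch(peaks_kev, mus, var=[0.4, 0.2, 0.3])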
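
Reviewer note 3 (not part of the diff): the peak-finding helpers deleted at the bottom of the file are not lost; the new import at the top pulls get_i_local_extrema and get_i_local_maxima from pygama.math.histogram. Assuming the relocated functions keep the (data, delta) signature and the (imaxes, imins) return of the deleted code, call sites only need the new import, roughly as sketched here.

import numpy as np

from pygama.math.histogram import get_i_local_extrema, get_i_local_maxima

data = np.array([0.0, 2.0, 0.0, 3.0, 1.0, 4.0, 0.0])
imaxes, imins = get_i_local_extrema(data, 1)  # indices of maxima and minima
imaxes_only = get_i_local_maxima(data, 1)     # maxima-only convenience call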