-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstat_model_v3.py
171 lines (129 loc) · 5.49 KB
/
stat_model_v3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
from functools import reduce
from astropy.io import fits
from scipy.optimize import curve_fit
from scipy.special import wofz
from lmfit.models import GaussianModel, VoigtModel, LinearModel, ConstantModel
from sklearn.metrics import mean_squared_error
matplotlib.style.use("ggplot")
%matplotlib qt
# Import data
DATA_DIR = 'DATA/'
WAVE_TOP = 8410 # Angstrom
WAVE_BOTTOM = 8790
WAVERANGE = np.linspace(WAVE_TOP, WAVE_BOTTOM, 1000)
star_data = fits.getdata(DATA_DIR + 'data_stars.fits', 1, memmap=False)
spectra = pd.read_csv(DATA_DIR + 'spectra_fits.csv').iloc[:, :-1]
#________________ Define functions ____________________________________________#
def cauchy(x):
profile = (1 / np.pi) * (1 / (1 + x**2))
return profile
def gauss(x):
profile = (1 / np.sqrt(2 * np.pi)) * np.e**(-(1 / 2) * x**2)
return profile
def spectrum_c(wavelength, C1, C2, C3, C4, C5):
""" Lorentzian (Cauchy). """
f = C1 + C2 * (wavelength - waveref) + C3 * cauchy((wavelength - C4) / C5)
return f
def spectrum_g(wavelength, C1, C2, C3, C4, C5):
""" Gaussian."""
f = C1 + C2 * (wavelength - waveref) + C3 * gauss((wavelength - C4) / C5)
return f
def get_line_info(line_number, all=False):
left_line_1 = 8482.5
right_line_1 = 8502.5
left_line_2 = 8527
right_line_2 = 8547
left_line_3 = 8650
right_line_3 = 8670
# Get in aray for easier handling
line_cut_refs = [(left_line_1, right_line_1),
(left_line_2, right_line_2),
(left_line_3, right_line_3)]
if all == True:
return line_cut_refs
elif (all == False and line_number == 1):
return line_cut_refs[0]
elif (all == False and line_number == 2):
return line_cut_refs[1]
elif (all == False and line_number == 3):
return line_cut_refs[2]
def get_masks_lines(waverange):
line_cut_refs = get_line_info(None, True)
# Define continuum masks
mask_line_1 = ((waverange > line_cut_refs[0][0]) &
(waverange < line_cut_refs[0][1]))
mask_line_2 = ((waverange > line_cut_refs[1][0]) &
(waverange < line_cut_refs[1][1]))
mask_line_3 = ((waverange > line_cut_refs[2][0]) &
(waverange < line_cut_refs[2][1]))
return [mask_line_1, mask_line_2, mask_line_3]
def get_continuum(waverange_, flux_):
""" Enter the line info as a list/array with touples containing
the left and right wavelength values from which the line will be masked. """
# Import line data and pertinent corrections to NaN values
flux = flux_.fillna(0)
# Mask to remove 0 from the spectra
mask_nozero = np.array(flux != 0)
waverange = waverange_[mask_nozero]
flux = flux[mask_nozero]
mask_lines = get_masks_lines(waverange)
# Combine masks into one to avoid value overlap: this masks the continuum
mask_lines_final = np.array([any(tup) for tup in zip(mask_lines[0],
mask_lines[1],
mask_lines[2])])
return waverange[~mask_lines_final], flux[~mask_lines_final]
def get_filtered_line(waverange_, flux_, line_number):
""" Returns the selected line without the other two for proper fitting. """
# Import line data and pertinent corrections to NaN values
flux = flux_.fillna(0)
# Mask to remove 0 from the spectra
mask_nozero = np.array(flux != 0)
waverange = waverange_[mask_nozero]
flux = flux[mask_nozero]
mask_line = get_masks_lines(waverange)[line_number - 1]
# mask_lines = get_masks_lines(waverange)
# del mask_lines[line_number - 1]
#
# mask_lines_final = np.array([any(tup) for tup in zip(mask_lines[0],
# mask_lines[1])])
return waverange[mask_line], flux[mask_line]
i = 123
line_boundaries = get_line_info(1)
ref_wave = sum(line_boundaries) / 2
waveref = ref_wave
contwave, contflux = get_continuum(WAVERANGE, spectra.iloc[i])
# 1. Flux of the reference wavelenght (interval half)
# searchsorted finds the index where the value would be place in the sorted array
theta1_g = spectra.iloc[i][np.searchsorted(WAVERANGE, ref_wave, side='right')]
# 2. Continuum level
theta2_g = np.polyfit(contwave, contflux, deg=1)[0]
# 3. Strength of the absorption line
theta3_g = theta3_g = np.min(spectra.iloc[i][(
WAVERANGE > line_boundaries[0]) & (WAVERANGE < line_boundaries[1])])
# 4. Wavelength of the peak
mask_peak = ((WAVERANGE > line_boundaries[0]) &
(WAVERANGE < line_boundaries[1]))
theta4_g = WAVERANGE[spectra.iloc[i] == np.min(spectra.iloc[i][mask_peak])][0]
# 5. Line width
theta5_g = line_boundaries[1] - line_boundaries[0]
guess = ([theta1_g, theta2_g, theta3_g, theta4_g, theta5_g])
line_w, line_f = get_filtered_line(WAVERANGE, spectra.iloc[i], 1)
#________________ Defnie the lmfit params _____________________________________#
# Build models as Voigt and constant
model = VoigtModel() + ConstantModel()
params = model.make_params(ref_wave=theta1_g,
cont_level=theta2_g,
flux_max=theta3_g,
peak_wave=theta4_g,
width_peak=theta5_g)
params['center'].min = 8490
params['center'].max = 8494
params['fwhm'].max = 0.5
result = model.fit(line_f, params, x=line_w)
fig, ax = plt.subplots(1, figsize=[15, 10])
ax.plot(WAVERANGE, spectra.iloc[i])
ax.plot(line_w, result.best_fit, c='k')