-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path20080227a.py
240 lines (225 loc) · 7.86 KB
/
20080227a.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
"""
Given N free energies, get an NxN rate matrix.
Updated 20111111.
"""
from StringIO import StringIO
import math
import numpy as np
import scipy
import Form
import FormOut
import iterutils
from MatrixUtil import ndot
#TODO maybe this should return a rate matrix object instead of a report
def get_form():
"""
@return: the body of a form
"""
# define the default energy string
default_energies = [2, 4, 6, 8]
default_energy_string = '\n'.join(str(E) for E in default_energies)
# define the form objects
form_objects = [
Form.MultiLine('energies', 'ordered energies',
default_energy_string)]
return form_objects
def get_form_out():
return FormOut.Report()
def R_to_distn(R):
"""
@param R: rate matrix
@return: stationary distribution
"""
n = len(R)
Wl, Vl = scipy.linalg.eig(R, left=True, right=False)
val_vec_pairs = [(abs(Wl[i]), Vl[:,i]) for i in range(n)]
dummy, pi_eigenvector = min(val_vec_pairs)
total = np.sum(pi_eigenvector)
pi_arr = np.array([v/total for v in pi_eigenvector])
return pi_arr
def get_numerical_identicality(R, t):
"""
@param R: rate matrix
@param t: time
@return: probability that two t-separated observations are identical
"""
pi_arr = R_to_distn(R)
T = scipy.linalg.expm(R*t)
return np.dot(pi_arr, np.diag(T))
def get_identicality_params(R):
"""
This returns the parameters for an identicality function.
If the rate matrix has n states
then the identicality function is
f(t) = a1*exp(b1*t) + a2*exp(b2*t) + ... + a{n-1}*exp(b{n-1}*t) + c
@param R: time reversible rate matrix
@return: a array, b array, c
"""
n = len(R)
pi_arr = R_to_distn(R)
# symmetrize
lam = np.diag(np.sqrt(pi_arr))
rlam = np.diag(np.reciprocal(np.sqrt(pi_arr)))
S = ndot(lam, R, rlam)
print 'S should be symmetric:'
print S
print S - S.T
# eigendecompose the symmetric matrix
W, V = scipy.linalg.eigh(S)
w_v_pairs = [(W[i], V[:,i]) for i in range(n)]
# get the exponential coefficients
eps = 1e-12
identicality_coeffs = [
np.dot(pi_arr, v*v) for w, v in w_v_pairs if abs(w) > eps]
# get the exponential rate constants
identicality_rates = [
w for w in W if abs(w) > eps]
# get the one dimensional constant
identicality_const = np.inner(pi_arr, pi_arr)
# return the identicality parameters
return (identicality_coeffs, identicality_rates, identicality_const)
def get_symbolic_identicality(coeffs, rates, c, t):
"""
@param coeffs: exponential coefficients
@param rates: exponential rate constants
@param c: a numerical constant
@param t: elapsed time
"""
return c + sum(coeff * math.exp(r*t) for coeff, r in zip(coeffs, rates))
def get_identicality_derivative(coeffs, rates, t):
return sum(r * coeff * math.exp(r*t) for coeff, r in zip(coeffs, rates))
def get_response_content(fs):
# read the energies from the form data
energies = []
for line in iterutils.stripped_lines(fs.energies.splitlines()):
try:
energy = float(line)
except ValueError as e:
raise ValueError('invalid energy: %s' % line)
energies.append(energy)
n = len(energies)
if n > 100:
raise ValueError('too many energies')
# compute the rate matrix
R = np.zeros((n, n))
for row in range(n):
for col in range(n):
rate = math.exp(-(energies[col] - energies[row]))
R[row, col] = rate
for i, r in enumerate(R):
R[i, i] = -np.sum(r) + 1
# get the transition matrix at large finite time
large_t = 1000.0
T = scipy.linalg.expm(R*large_t)
# eigendecompose
Wr, Vr = scipy.linalg.eig(R, left=False, right=True)
Wl, Vl = scipy.linalg.eig(R, left=True, right=False)
# get left eigenvector associated with stationary distribution
val_vec_pairs = [(abs(Wl[i]), Vl[:,i]) for i in range(n)]
dummy, pi_eigenvector = min(val_vec_pairs)
# get the stationary distribution itself
total = np.sum(pi_eigenvector)
pi_arr = np.array([v/total for v in pi_eigenvector])
# get the square root stationary vector and diagonal matrix
sqrt_pi_arr = np.sqrt(pi_arr)
lam = np.diag(sqrt_pi_arr)
# get reciprocal arrays
recip_sqrt_pi_arr = np.reciprocal(sqrt_pi_arr)
recip_lam = np.reciprocal(lam)
# print things
np.set_printoptions(linewidth=300)
out = StringIO()
print >> out, 'rate matrix:'
print >> out, R
print >> out
print >> out, 'rate matrix row sums:'
print >> out, np.sum(R, axis=1)
print >> out
print >> out, 'eigenvalues:'
print >> out, Wr
print >> out
print >> out, 'corresponding orthonormal right eigenvectors (columns):'
print >> out, Vr
print >> out
print >> out, 'eigenvalues:'
print >> out, Wl
print >> out
print >> out, 'corresponding orthonormal left eigenvectors (columns):'
print >> out, Vl
print >> out
print >> out, 'L2 normalized eigenvector associated with stationary distn:'
print >> out, pi_eigenvector
print >> out
print >> out, 'L1 renormalized vector (the stationary distribution):'
print >> out, pi_arr
print >> out
print >> out
# eigendecompose the transition matrix
Wr, Vr = scipy.linalg.eig(T, left=False, right=True)
Wl, Vl = scipy.linalg.eig(T, left=True, right=False)
print >> out, 'transition matrix for t=%f:' % large_t
print >> out, T
print >> out
print >> out, 'transition matrix row sums:'
print >> out, np.sum(T, axis=1)
print >> out
print >> out, 'eigenvalues:'
print >> out, Wr
print >> out
print >> out, 'corresponding orthonormal right eigenvectors (columns):'
print >> out, Vr
print >> out
print >> out, 'eigenvalues:'
print >> out, Wl
print >> out
print >> out, 'corresponding orthonormal left eigenvectors (columns):'
print >> out, Vl
print >> out
print >> out, 'incorrect reconstitution of the transition matrix:'
print >> out, ndot(Vr, np.diag(Wr), Vl.T)
print >> out
print >> out
# Use the known properties of reversibility to symmetrize the matrix.
t = 3
coeffs, rates, c = get_identicality_params(R)
print >> out, 'brute identicality computation for t=%f:' % t
print >> out, get_numerical_identicality(R, t)
print >> out
print >> out, 'sophisticated identicality computation for t=%f:' % t
print >> out, get_symbolic_identicality(coeffs, rates, c, t)
print >> out
print >> out
# Try another couple rate matrices.
e2 = math.exp(2)
en2 = math.exp(-2)
rate_matrices = [
np.array([[-2.0, 2.0], [2.0, -2.0]]),
np.array([[-1.0, 1.0], [3.0, -3.0]]),
np.array([[-1, 1, 0], [1, -2, 1], [0, 1, -1]]),
#np.array([[-4.0, 4.0, 0], [1, -2, 1], [0, 4, -4]])]
#np.array([[-1, 1, 0], [7, -14, 7], [0, 1, -1]])]
np.array([[-en2, en2, 0], [e2, -2*e2, e2], [0, en2, -en2]])]
t = 3.0
for R in rate_matrices:
coeffs, rates, c = get_identicality_params(R)
print >> out, 'test rate matrix:'
print >> out, R
print >> out
print >> out, 'eigenvalues:'
print >> out, scipy.linalg.eigvals(R)
print >> out
print >> out, 'stationary distribution:'
print >> out, R_to_distn(R)
print >> out
print >> out, 'brute identicality computation for t=%f:' % t
print >> out, get_numerical_identicality(R, t)
print >> out
print >> out, 'sophisticated identicality computation for t=%f:' % t
print >> out, get_symbolic_identicality(coeffs, rates, c, t)
print >> out
print >> out, 'identicality derivative for t=%f:' % t
print >> out, get_identicality_derivative(coeffs, rates, t)
print >> out
print >> out
# return the message
return out.getvalue().rstrip()