-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathassignment3_joris.py
404 lines (320 loc) · 12.9 KB
/
assignment3_joris.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
#!/usr/bin/env python
"""
Author: Joris van Steenbrugge
Description: Train a Hidden Markov Model and sample possible sequences
based on the probabilities.
"""
from __future__ import division
from sys import argv
from random import random
from Bio import SeqIO
import numpy
# Background amino acid probabilities
pa = { 'A': 0.074, 'C': 0.025, 'D': 0.054, 'E': 0.054, 'F': 0.047, 'G': 0.074,
'H': 0.026, 'I': 0.068, 'L': 0.099, 'K': 0.058, 'M': 0.025, 'N': 0.045,
'P': 0.039, 'Q': 0.034, 'R': 0.052, 'S': 0.057, 'T': 0.051, 'V': 0.073,
'W': 0.013, 'Y': 0.034 }
class HMM():
"""HMM object to store an HMM model
This object is designed to keep track of all HMM states, emissions, and
transitions. It may be used in your implementation, but may also be ignored,
and replaced by a data structure of choice
"""
# Emission probabilities for the match and insert states
e_m = []; e_i = pa;
# Transition probabilities from/to matches, inserts and deletions
t_mm = []; t_mi = []; t_md = [];
t_im = []; t_ii = []
t_dm = []; t_dd = [];
def __init__(self, match_states, insert_states):
"""Initialize HMM object with number of match states
nmatches: int, number of match states
"""
self.match_states_idx = match_states
self.insert_states_idx = insert_states
self.nmatches = len(match_states)
print("There are {} match states".format(self.nmatches))
self.e_m = [dict(pa) for _ in range(0, self.nmatches)]
for i in range(0, self.nmatches):
for j in pa.keys():
self.e_m[i][j] = 0.0
self.e_i = pa;
self.t_mm = [0.0] * (self.nmatches + 1)
self.t_mi = [0.0] * (self.nmatches + 1)
self.t_im = [1.0] * (self.nmatches + 1)
self.t_ii = [0.0] * (self.nmatches + 1)
self.t_md = [0.0] * (self.nmatches + 1)
self.t_dm = [1.0] * (self.nmatches + 1)
self.t_dd = [0.0] * (self.nmatches + 1)
def get_probabilities(self, sequences):
"""Wrapper to calculate the transmissionand emission probabilities
Keyword Arguments:
sequences -- list of strings, aligned amino acid sequences
"""
for pos, match_idx in enumerate(self.match_states_idx):
for amino in pa.keys():
self.e_m[pos][amino] = self._emissions_match_state(amino,
sequences, match_idx)
self._transmission_probabilities(sequences)
def print_probabilities(self):
"""Prints the different transmission probabilities to the console.
"""
print("Transmission:")
print("mm {}".format(tuple(self.t_mm)))
print("mi {}".format(tuple(self.t_mi)))
print("md {}".format(tuple(self.t_md)))
print("im {}".format(tuple(self.t_im)))
print("ii {}".format(tuple(self.t_ii)))
print("dm {}".format(tuple(self.t_dm)))
print("dd {}".format(tuple(self.t_dd)))
def get_emission_matrix(self):
"""Yields an emission probability matrix for a position specific profile.
Probabilities are rounded to 3 decimals.
"""
#Header line with all amino acids
aminos = self.e_m[0].keys()
print("\t{}".format("\t".join(aminos)))
#for each match state take the emission state
for idx, match_state in enumerate(self.e_m):
#Join the emission state dictionary to a tab separated string
# containing rounded probability scores.
values = "\t".join(["%.3f" % round(x[1],3) \
for x in match_state.items()])
yield("{}\t{}".format(idx+1, values))
def _emissions_match_state(self, amino, sequences, match_idx):
"""Returns the emission probabilities in each match state.
This function is intended to be called from within the
class object privately.-=p
.
Keyword Arguments:
amino -- char, the one letter amino acid to count the
emission probability for.
sequences -- list of strings, containint the aligned aa
sequences
match_idx -- int, th position of the current match state
"""
total = [seq[match_idx] for seq in sequences if seq[match_idx] != "-"]
total = len(total)
count = 0
for seq in sequences:
if seq[match_idx] == amino:
count += 1
return count / total
def _pos_prob(self, sequences, pos):
"""Returns four transmission probabilities at the start or end position
Keyword Arguments:
sequences -- list of strings, aligned amino acid sequences
pos -- int, should be either 0 for start or -1 for end position
As the start and end positions have no predecessors and successors,
respectively , the transmission probabilities are calculated here
by calculating the chance for a match/deletion/insertion at the
position itself.
"""
#Get all sequences at the specific position
pos_seq = [seq[pos] for seq in sequences]
length = len(pos_seq)
#Either a match or insert state
pos_state = self._get_state(0)
mm = 0.0
mi = 0.0
md = 0.0
im = 0.0
c = pos_seq.count("-")
# If the position is a match state the number of - indicates the number
# of deletions.
if pos_state == 'match':
md = (c / length)
mm = ((length - c) / length)
#If the position is a insert state the number of "-" indicates the
# number of m -> positions.
else: # state == "insert"
mm = (c / length)
mi = ((length - c) / length)
return mm, mi, md, im
def _get_state(self, idx):
"""Returns what state a position is in
Keyword Arguments:
idx -- int, the position
"""
if idx in self.match_states_idx:
return 'match'
else:
return 'insert'
def _transmission_probabilities(self, sequences):
"""Calculates the transmission probabilities.
Keyword Arguments:
sequences -- list of strings, aligned amino acid sequences
Calculates probabilities for various transmissions (mm,md, mi) and
stores them in class contained lists.
Transitions that are not calculated: im, dd, ii due to time
restrictions.
"""
self.t_mm[0], self.t_mi[0], \
self.t_md[0], self.t_im[0] = self._pos_prob(sequences, 0)
self.t_mm[-1], self.t_mi[-1], \
self.t_md[-1], self.t_im[-1] = self._pos_prob(sequences, -1)
for i, M in enumerate(self.match_states_idx):
next_state = self._get_state(M + 1)
current_seq = [seq[M] for seq in sequences]
try: # The last match state is possibly the last state
next_seq = [seq[M + 1] for seq in sequences]
length = len(next_seq)
if next_state == "match":
c = next_seq.count("-")
mm = ((length - c) / length)
md = (c / length)
self.t_mm[i + 1] = mm
self.t_md[i + 1] = md
self.t_mi[i + 1] = 0.0
else: #next_state == "insert"
c = next_seq.count("-")
mi = ((length - c) / length)
mm = (c / length)
self.t_mi[i + 1] = mi
self.t_md[i + 1] = 0.0
self.t_mm[i + 1] = mm
except IndexError:
pass
def sample(events):
"""Return a key from dict based on the probabilities
events: dict of {key: probability}, sum of probabilities
should be 1.0.
"""
k = events.keys()
cum = [0 for i in k]
cum[0] = events[k[0]]
for i in range(1,len(events)):
cum[i] = cum[i-1] + events[k[i]]
# Should not be necessary, but for safety
cum[len(cum)-1] = 1.0
r = random()
pick = ''
i = 0
while (pick == '') and (i < len(cum)):
if r < cum[i]:
pick = k[i]
i = i + 1
return pick
def get_states(sequences):
"""Returns the indices of match states.
Keyword Arguments:
sequences -- list of strings, aligned amino acid sequences.
This is implemented in a way that the length of the indices list is
the number of match states.
"""
match_states = []
insert_states = []
half_length = len(sequences) / 2.0
for i in range(len(sequences[0])):
has_amino = 0
for seq in sequences:
if seq[i] in pa:
has_amino += 1
if has_amino >= half_length:
match_states.append(i)
else:
insert_states.append(i)
return match_states, insert_states
def calc_traverse_path(a, p):
"""Returns a randomly picked value in a based on probabilities in p
Keyword Arguments:
a -- list, containing values to be considered
p -- list, containing probabilites for each considered value
"""
return numpy.random.choice(a = a, p = p)
def traverse(hmm):
"""Returns a randomly selected sequence based on probabilities in the HMM
Keyword Arguments:
hmm -- hmm class object
iteratively randomly selects a transition based on probabilities.
In match states an amino acid is selected based on the match state
emission probability list (contained in the hmm class).
In insert states an amino acid is selected based on the background
distribution emissions.
"""
sequence = ""
state = "match"
for idx in range(len(hmm.t_mm)):
if state == "match":
if idx != 0:
sequence += sample(hmm.e_m[idx - 1])
mm = hmm.t_mm[ idx ]
md = hmm.t_md[ idx ]
mi = hmm.t_mi[ idx ]
a = ("mm", "md", "mi")
p = ( mm, md, mi )
elif state == "insert":
if idx != 0:
sequence += sample(hmm.e_i)
im = hmm.t_im[ idx ]
ii = hmm.t_ii[ idx ]
a = ("im", "ii")
p = ( im, ii )
elif state == "delete":
dd = hmm.t_dd[ idx ]
dm = hmm.t_dm[ idx ]
a = ("dd", "dm")
p = ( dd, dm )
choice = calc_traverse_path(a, p)
if choice == "mm":
state = "match"
elif choice == "md":
state = "delete"
elif choice == "mi":
state = "insert"
elif choice == "im":
state = "match"
elif choice == "ii":
state = "insert"
elif choice == "dd":
state = "delete"
elif choice == "dm":
state = "match"
return sequence
def parse_fasta(file_name):
"""Returns a list with sequences from a fasta file
It still feels a bit redundant to
Keyword Arguments:
file_name -- string, file path of the fasta file
"""
sequences = []
seq = []
with open(file_name) as in_file:
for line in in_file:
if line.startswith(">"): #entry start
if len(seq) == 0:
pass
else:#entry is present
sequences.append("".join(seq))
seq = []
else:
seq.append(line.strip())
sequences.append("".join(seq))
return sequences
#return [str(entry.seq) for entry in SeqIO.parse(file_name, "fasta")]
def find_HMM(infile):
"""Wrapper function to train the HMM.
Keyword Arguments:
infile -- filepath to a fasta file containin aligned amino acid
sequences.
"""
sequences = parse_fasta(infile)
match_states, insert_states = get_states(sequences)
hmm = HMM(match_states, insert_states)
hmm.get_probabilities(sequences)
return hmm
if __name__ == "__main__":
#Q1
hmm = find_HMM(infile = "/home/joris/hmm_data/test.fasta")
#Q2
hmm.print_probabilities()
#Q3
print("\n".join(list(hmm.get_emission_matrix())))
#Q4
for i in range(10):
print(traverse(hmm))
# #Q6
hmm = find_HMM(infile = "/home/joris/hmm_data/test_large.fasta")
hmm.print_probabilities()
print("\n".join(hmm.get_emission_matrix()))
print(traverse(hmm))