-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathParamAP.py
901 lines (786 loc) · 52.4 KB
/
ParamAP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
#!/usr/bin/env python3
'''
Copyright 2021 The Regents of the University of Colorado
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Author: Christian Rickert <christian.rickert@cuanschutz.edu>
Title: ParamAP (parametrization of sinoatrial myocyte action potentials)
DOI: https://doi.org/10.5281/zenodo.823742
URL: https://github.com/christianrickert/ParamAP/
'''
# imports
# runtime
import fnmatch
import functools
import gc
import math
import os
import sys
# numpy
import numpy as np
# scipy
import scipy.signal as sp_sig
import scipy.spatial as sp_spat
import scipy.stats as sp_stat
# matplotlib
import matplotlib.backends.backend_pdf as mpbp
import matplotlib.pyplot as mpp
# variables
APEXT = 0.5 # margin of ap extension (%)
FFRAME = 0.5 # time interval for filtering (ms)
POLYNOM = 2 # polynomial order used for filtering
# functions
def askboolean(dlabel="custom boolean", dval=True):
"""Returns a boolean provided by the user."""
if dval: # True
dstr = "Y/n"
else: # False
dstr = "y/N"
while True:
uchoice = input(dlabel + " [" + dstr + "]: ") or dstr
if uchoice in ["Y/n", "Y", "y", "Yes", "yes"]:
print("True\n")
return True # break
elif uchoice in ["y/N", "N", "n", "No", "no"]:
print("False\n")
return False # break
else:
continue
def askext(dlabel="custom extension", dext='atf'):
"""Returns a file extention provided by the user."""
while True:
uext = str(input("Enter " + dlabel + " [" + dext + "]: ")).lower() or dext
if uext not in ["dat", "log", "pdf"] and len(uext) == 3:
print(uext + "\n")
return uext # break
else:
print("Invalid file extension!\n")
continue
def askunit(dlabel="custom unit", daxis='', dunit=''):
"""Returns a unit provided by the user."""
while True:
uunit = input("Enter " + dlabel + " [" + dunit + "]: ") or dunit
if daxis in ["x", "X"]:
if uunit in ["ms", "s"]:
print(uunit + "\n")
return uunit # break
else:
print("Invalid unit for X-axis!\n")
continue
elif daxis in ["y", "Y"]:
if uunit in ["mV", "V"]:
print(uunit + "\n")
return uunit # break
else:
print("Invalid unit for Y-axis!\n")
continue
def askvalue(dlabel="custom value", dval=1.0, dunit="", dtype="float"):
"""Returns a value provided by the user."""
while True:
try:
uval = float(input("Enter " + dlabel + " [" + str(dval) + "]" + dunit + ": ") or dval)
break
except ValueError:
print("Non-numerical input!\n")
continue
if dtype == "float": # default
pass
elif dtype == "int":
uval = int(round(uval))
print(str(uval) + "\n")
return uval
def getfiles(path='/home/user/', pattern='*'):
"""Returns all files in path matching the pattern."""
abspath = os.path.abspath(path)
for fileobject in os.listdir(abspath):
filename = os.path.join(abspath, fileobject)
if os.path.isfile(filename) and fnmatch.fnmatchcase(fileobject, pattern):
yield os.path.join(abspath, filename)
def getneighbors(origin_i=np.empty(0), vicinity=np.empty(0), origin_x=np.empty(0), origin_y=np.empty(0), hwidth=float("inf"), fheight=0.0, limit=None, within=float("inf"), bads=False):
"""Returns all nearest-neighbors in ascending (i.e. increasing distance) order."""
neighbors = np.zeros(0)
badorigins = np.zeros(0)
vicinity_kdt = sp_spat.KDTree(list(zip(vicinity, np.zeros(vicinity.size)))) # KDTree for the nearest neighbors search
for origin in origin_i:
neighbor_left, neighbor_right = False, False
for position in vicinity_kdt.query([origin, 0.0], k=limit, distance_upper_bound=within)[1]: # return nearest neighbors in ascending order
if not neighbor_left or not neighbor_right:
neighbor = vicinity[position]
if (abs(origin_x[origin]-origin_x[neighbor]) <= hwidth) and (abs(origin_y[origin]-origin_y[neighbor]) >= fheight): # relative criteria for minima left and right of maximum
if not neighbor_left and (neighbor < origin): # criteria for minimum left of maximum only
neighbors = np.append(neighbors, neighbor)
neighbor_left = True
elif not neighbor_right and (neighbor > origin): # criteria for minimum right of maximum only
neighbors = np.append(neighbors, neighbor)
neighbor_right = True
else: # odd origins with missing neighbors
badorigins = np.append(badorigins, np.argwhere(origin == origin_i))
neighbors = np.sort(np.unique(neighbors)) # unique elements only
if neighbors.size <= 1: # missing neighbor
if neighbor_left:
neighbors = np.append(neighbors, 0.0) # append neighbor_right
if neighbor_right:
neighbors = np.insert(neighbors, 0, 0.0) # insert neighbor_left
badorigins = np.sort(np.unique(badorigins))
return (neighbors.astype(int), badorigins.astype(int)) if bads else (neighbors.astype(int))
def getrunavg(xdata=np.empty(0), xinterval=FFRAME, porder=POLYNOM):
"""Returns the running average count based on a given time interval."""
tmprun = int(round(xinterval/(xdata[1]-xdata[0])))
while tmprun <= porder: # prevents filtering
tmprun += 1
return (tmprun) if tmprun % 2 else (tmprun + 1) # odd number
def getpartitions(pstart=0, pstop=100, pint=5, pmin=10):
"""Returns a partition list in percent to segment an interval."""
plist = []
for part_l in list(range(int(pstart), int(pstop)+int(pint), int(pint))):
for part_r in list(range(int(pstart), int(pstop)+int(pint), int(pint))):
if part_r > part_l and part_r-part_l >= int(pmin): # no duplication or empty partitions, minimum size
plist.append([part_l, part_r]) # return statement removes the outmost list
return plist
def getbestlinearfit(xaxis=np.empty(0), yaxis=np.empty(0), xmin=0.0, xmax=1.0, pstart=0, pstop=100, pint=1, pmin=10):
"""Returns the best linear fit from segments of an interval."""
bst_r = 0 # regression coefficient
seg_i = np.argwhere((xaxis >= xmin) & (xaxis <= xmax)).ravel() # analyzing partial segment only
seg_t = xaxis[seg_i[-1]]-xaxis[seg_i[0]] # full interval from partial segment
seg_m, seg_n, seg_r = 0.0, 0.0, 0.0
for partition in getpartitions(pstart, pstop, pint, pmin):
seg_i = np.argwhere((xaxis >= (avgfmin_x[0]+(seg_t*partition[0]/100))) & (xaxis <= (avgfmin_x[0]+(seg_t*partition[1]/100)))).ravel() # 'ravel()' required for 'sp_stat.linregress()'
seg_x = xaxis[seg_i]
seg_y = yaxis[seg_i]
seg_m, seg_n, seg_r = sp_stat.linregress(seg_x, seg_y)[0:3] # tuple unpacking and linear regression of partial ap segment
if math.pow(seg_r, 2.0) >= math.pow(bst_r, 2.0):
bst_m, bst_n, bst_r = seg_m, seg_n, seg_r
bst_i, bst_x, bst_y = seg_i, seg_x, seg_y
# print(partition[0], " - ", partition[1], " : ", str(partition[1]-partition[0]), " ~ ", str(math.pow(bst_r, 2.0))) # shows progress, but is slow!
return (bst_i, bst_x, bst_y, bst_m, bst_n, bst_r)
def mpp_setup(title='Plot title', xlabel='Time [ ]', ylabel='Voltage [ ]'):
"""Provides a title and axes labels to a Matplotlib plot."""
mpp.title(title)
mpp.xlabel(xlabel)
mpp.ylabel(ylabel)
def readfile(inputfile='name'):
""" read an atf file into a numpy array """
defux = ["ms", "s"]
defuy = ["mV", "V"]
inpunits = False
try:
with open(inputfile, 'r') as in_data:
full_record = [] # python list
for _ in range(0, 2):
full_record.append(in_data.readline().strip().split()) # first and second record
full_record.append([in_data.readline().strip() for line in range(0, int(full_record[1][0]))]) # optional record
full_record.append([in_data.readline().strip().split('\t')]) # title record
full_record.append(np.genfromtxt(in_data, dtype='float', delimiter='\t', unpack=True)) # data record, use: 'np.loadtxt()' for limited memory usage, but lower speed
except FileNotFoundError:
print("Error! File not found.")
read_result = None
raise
else:
inpuxy = full_record[3][0]
inpux = str(inpuxy[0]).split()[-1][1:-2]
inpuy = str(inpuxy[1]).split()[-1][1:-2]
if inpux in defux and inpuy in defuy:
inpunits = True
inp_xy = full_record[4]
read_result = inp_xy, inpunits, inpux, inpuy
return read_result
def toggle_mpp_imode(activate=True):
"""Toggle matplotlib interactive mode on or off."""
inter_active = mpp.isinteractive()
if activate and not inter_active:
mpp.ion() # turn interactive mode on
elif not activate and inter_active:
mpp.ioff() # turn interactive mode off
# main routine
AUTHOR = "Copyright 2021 The Regents of the University of Colorado"
SEPARBOLD = 79*'='
SEPARNORM = 79*'-'
SOFTWARE = "ParamAP"
VERSION = "version 1.3," # (2021-05-29)
WORKDIR = SOFTWARE # working directory for parameterization
print('{0:^79}'.format(SEPARBOLD) + os.linesep)
GREETER = '{0:<{w0}}{1:<{w1}}{2:<{w2}}'.format(SOFTWARE, VERSION, AUTHOR, w0=len(SOFTWARE)+1, w1=len(VERSION)+1, w2=len(AUTHOR)+1)
INTERMEDIATELINE = '{0:}'.format("University of Colorado, Anschutz Medical Campus")
DISCLAIMER = "ParamAP is distributed in the hope that it will be useful, but it comes without\nany guarantee or warranty. This program is free software; you can redistribute\nit and/or modify it under the terms of the GNU General Public License:"
URL = "https://www.gnu.org/licenses/gpl-2.0.en.html"
print('{0:^79}'.format(GREETER) + os.linesep)
print('{0:^79}'.format(INTERMEDIATELINE) + os.linesep)
print('{0:^79}'.format(DISCLAIMER) + os.linesep)
print('{0:^79}'.format(URL) + os.linesep)
print('{0:^79}'.format(SEPARBOLD) + os.linesep)
# customize use case
AUTORUN = askboolean("Use automatic mode?", False)
SERIES = askboolean("Run time series analysis?", False)
APMODE = askboolean("Analyze action potentials?", True)
print('{0:^79}'.format(SEPARNORM))
# set up working directory
WORKPATH = os.path.abspath(WORKDIR)
if not os.path.exists(WORKPATH):
os.mkdir(WORKPATH)
print("FOLDER:\t" + WORKPATH + "\n")
FILE = 0 # file
EXTENSION = askext(dlabel="custom file type", dext='atf') # file extension used to filter files in working directory
if SERIES:
AVG_FRAME = askvalue(dlabel="analysis frame time", dval=5000.0, dunit=' ms') # time interval for series analysis (ms)
ATFFILES = getfiles(path=WORKDIR, pattern=("*." + EXTENSION))
for ATFFILE in ATFFILES: # iterate through files
name = os.path.splitext(os.path.split(ATFFILE)[1])[0]
print('{0:^79}'.format(SEPARNORM))
print("FILE:\t" + str(name) + os.linesep)
AP_AMP = 50.0 # minimum acceptable ap amplitude (mV)
AP_HWD = 250.0 # maximum acceptable ap half width (ms)
AP_MAX = 50.0 # maximum acceptable ap value (mV)
AP_MIN = -10.0 # minimum acceptable ap value (mV)
MDP_MAX = -50.0 # maximum acceptable mdp value (mV)
MDP_MIN = -90.0 # minimum acceptable mdp value (mV)
WM_DER = 1.0 # window multiplier for derivative filtering
WM_MAX = 4.0 # window multiplier for maximum detection
WM_MIN = 16.0 # window multiplier for minimum detection
# read file raw data
sys.stdout.write(">> READING... ")
sys.stdout.flush()
RAW_XY, UNITS, UNIT_X, UNIT_Y = readfile(ATFFILE)
if not UNITS: # missing or incomplete units from header
print("\n")
UNIT_X = askunit(dlabel="X-axis unit", daxis="X", dunit=UNIT_X)
UNIT_Y = askunit(dlabel="Y-axis unit", daxis="Y", dunit=UNIT_Y)
sys.stdout.write(1*"\t")
TOMS = 1000.0 if UNIT_X == "s" else 1.0
RAW_XY[0] *= TOMS # full X-axis, UNIT_X = "ms"
raw_x = RAW_XY[0] # partial X-axis for time series analysis
TOMV = 1000.0 if UNIT_Y == "V" else 1.0
RAW_XY[1] *= TOMV # full Y-axis, UNIT_Y = "mV"
raw_y = RAW_XY[1] # partial Y-axis for time series analysis
runavg = getrunavg(RAW_XY[0]) # used for filtering and peak detection
ipg_t = RAW_XY[0][1]-RAW_XY[0][0] # time increment for interpolation grid
if not APMODE: # avoid noise artifacts in beat detection mode
runavg = 10.0*runavg+1
WM_MAX *= 1.5
WM_MIN = WM_MAX
avg_start = RAW_XY[0][0] # interval start for averaging
avg_stop = RAW_XY[0][-1] # interval stop for averaging
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
while True: # repeat data analysis for current file
STARTPDF = True # overwrite existing file
SEGMENT = 0.0
while True: # time series analysis
try:
# create raw data plot
sys.stdout.write(">> PLOTTING... ")
sys.stdout.flush()
mpp_setup(title="Raw data: " + name, xlabel='Time (ms)', ylabel='Voltage (mV)')
mpp.plot(raw_x, raw_y, '0.75') # raw data (grey line)
if STARTPDF:
pdf_file = mpbp.PdfPages(os.path.join(WORKDIR, name + ".pdf"), keep_empty=False) # multi-pdf file
STARTPDF = False # append existing file
mpp.tight_layout() # avoid label overlaps
if SEGMENT == 0.0:
mpp.savefig(pdf_file, format='pdf', dpi=600) # save before .show()!
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
if not AUTORUN:
toggle_mpp_imode(activate=True)
mpp.show()
# set parameters for averaging
sys.stdout.write(">> SETTING... ")
sys.stdout.flush()
if not AUTORUN:
print("\n")
if SEGMENT == 0.0: # initialize values
avg_start = askvalue(dlabel="analysis start time", dval=avg_start, dunit=' ms')
avg_stop = askvalue(dlabel="analysis stop time", dval=avg_stop, dunit=' ms')
AP_MAX = askvalue(dlabel="upper limit for maxima", dval=AP_MAX, dunit=' mV')
AP_MIN = askvalue(dlabel="lower limit for maxima", dval=AP_MIN, dunit=' mV')
MDP_MAX = askvalue(dlabel="upper limit for minima", dval=MDP_MAX, dunit=' mV')
MDP_MIN = askvalue(dlabel="lower limit for minima", dval=MDP_MIN, dunit=' mV')
if APMODE:
AP_HWD = askvalue(dlabel="maximum peak half width", dval=AP_HWD, dunit=' ms')
AP_AMP = askvalue(dlabel="minimum peak amplitude", dval=AP_AMP, dunit=' mV')
runavg = askvalue(dlabel="running average window size", dval=runavg, dunit='', dtype='int')
WM_DER = askvalue(dlabel="window multiplier for derivative", dval=WM_DER, dunit='')
WM_MAX = askvalue(dlabel="window multiplier for maxima", dval=WM_MAX, dunit='')
WM_MIN = askvalue(dlabel="window multiplier for minima", dval=WM_MIN, dunit='')
mpp.clf() # clear canvas
if SEGMENT == 0.0: # set first frame
tmp_start = avg_start + (SEGMENT*AVG_FRAME if SERIES else 0.0)
tmp_stop = (tmp_start + AVG_FRAME) if SERIES else avg_stop
raw_i = np.argwhere((RAW_XY[0] >= tmp_start) & (RAW_XY[0] <= tmp_stop)).ravel()
raw_x = RAW_XY[0][raw_i[0]:raw_i[-1]+1]
raw_y = RAW_XY[1][raw_i[0]:raw_i[-1]+1]
sys.stdout.write(("" if AUTORUN else 1*"\t") + 8*"\t" + " [OK]\n")
sys.stdout.flush()
# filter noise of raw data with Savitzky-Golay
sys.stdout.write(">> FILTERING... ")
sys.stdout.flush()
rawf_y = sp_sig.savgol_filter(raw_y, runavg, POLYNOM, mode='nearest')
sys.stdout.write(7*"\t" + " [OK]\n")
sys.stdout.flush()
# detect extrema in filtered raw data
sys.stdout.write(">> SEARCHING... ")
sys.stdout.flush()
if AUTORUN: # use unrestricted dataset (slower)
# detect maxima in filtered raw data
tmpavg = int(round(WM_MAX*runavg)) if int(round(WM_MAX*runavg)) % 2 else int(round(WM_MAX*runavg))+1
rawfmax_iii = np.asarray(sp_sig.argrelmax(rawf_y, order=tmpavg)).ravel() # unfiltered maxima
rawfmax_x = raw_x[rawfmax_iii]
rawfmax_y = rawf_y[rawfmax_iii]
# detect minima in filtered raw data
tmpavg = int(round(WM_MIN*runavg)) if int(round(WM_MIN*runavg)) % 2 else int(round(WM_MIN*runavg))+1
rawfmin_iii = np.asarray(sp_sig.argrelmin(rawf_y, order=tmpavg)).ravel() # unfiltered minima
rawfmin_x = raw_x[rawfmin_iii]
rawfmin_y = rawf_y[rawfmin_iii]
sys.stdout.write(7*"\t" + " [OK]\n")
sys.stdout.flush()
else: # use restricted dataset (faster)
# detect maxima in filtered raw data
tmpmax_x = raw_x[np.intersect1d(np.argwhere(rawf_y >= AP_MIN), np.argwhere(rawf_y <= AP_MAX))]
tmpmax_y = rawf_y[np.intersect1d(np.argwhere(rawf_y >= AP_MIN), np.argwhere(rawf_y <= AP_MAX))]
tmpavg = int(round(WM_MAX*runavg)) if int(round(WM_MAX*runavg)) % 2 else int(round(WM_MAX*runavg))+1
rawfmax_iii = np.asarray(sp_sig.argrelmax(tmpmax_y, order=tmpavg)).ravel() # unfiltered maxima
rawfmax_ii = np.asarray(np.where(np.in1d(raw_x.ravel(), np.intersect1d(raw_x, tmpmax_x[rawfmax_iii]).ravel()).reshape(raw_x.shape))).ravel() # back to full dataset
rawfmax_x = raw_x[rawfmax_ii]
rawfmax_y = rawf_y[rawfmax_ii]
# detect minima in filtered raw data
tmpmin_x = raw_x[np.intersect1d(np.argwhere(rawf_y >= MDP_MIN), np.argwhere(rawf_y <= MDP_MAX))]
tmpmin_y = rawf_y[np.intersect1d(np.argwhere(rawf_y >= MDP_MIN), np.argwhere(rawf_y <= MDP_MAX))]
tmpavg = int(round(WM_MIN*runavg)) if int(round(WM_MIN*runavg)) % 2 else int(round(WM_MIN*runavg))+1
rawfmin_iii = np.asarray(sp_sig.argrelmin(tmpmin_y, order=tmpavg)).ravel() # unfiltered minima
rawfmin_ii = np.asarray(np.where(np.in1d(raw_x.ravel(), np.intersect1d(raw_x, tmpmin_x[rawfmin_iii]).ravel()).reshape(raw_x.shape))).ravel()
rawfmin_x = raw_x[rawfmin_ii]
rawfmin_y = rawf_y[rawfmin_ii]
sys.stdout.write(7*"\t" + " [OK]\n")
sys.stdout.flush()
# analyze and reduce extrema in filtered raw data
sys.stdout.write(">> REDUCING... ")
sys.stdout.flush()
rawfmax_m = np.mean(rawfmax_y) # rough estimate due to assignment errors
rawfmin_m = np.mean(rawfmin_y)
rawfmaxmin_m = (rawfmax_m + rawfmin_m) / 2.0 # center between unreduced maxima and minima within limits (may differ from average of AVGMAX and AVGMIN)
if AUTORUN: # estimate range for reduction of extrema
# reduce maxima from unrestricted dataset
rawfmax_ii = np.argwhere(rawfmax_y >= rawfmaxmin_m).ravel() # use center to discriminate between maxima and minima
rawfmax_x = rawfmax_x[rawfmax_ii]
rawfmax_y = rawfmax_y[rawfmax_ii]
rawfmax_std = np.std(rawfmax_y, ddof=1) # standard deviation from the (estimated) arithmetic mean
AP_MAX = np.mean(rawfmax_y) + 4.0 * rawfmax_std # 99% confidence interval
AP_MIN = np.mean(rawfmax_y) - 4.0 * rawfmax_std
rawfmax_ii = functools.reduce(np.intersect1d, (rawfmax_iii, np.argwhere(rawf_y >= AP_MIN), np.argwhere(rawf_y <= AP_MAX)))
rawfmax_x = raw_x[rawfmax_ii]
rawfmax_y = rawf_y[rawfmax_ii]
# reduce minima from unrestricted dataset
rawfmin_ii = np.argwhere(rawfmin_y <= rawfmaxmin_m)
rawfmin_x = rawfmin_x[rawfmin_ii].ravel()
rawfmin_y = rawfmin_y[rawfmin_ii].ravel()
rawfmin_std = np.std(rawfmin_y, ddof=1)
MDP_MAX = np.mean(rawfmin_y) + 4.0 * rawfmin_std
MDP_MIN = np.mean(rawfmin_y) - 4.0 * rawfmin_std
rawfmin_ii = functools.reduce(np.intersect1d, (rawfmin_iii, np.argwhere(rawf_y >= MDP_MIN), np.argwhere(rawf_y <= MDP_MAX)))
rawfmin_x = raw_x[rawfmin_ii]
rawfmin_y = rawf_y[rawfmin_ii]
if APMODE: # check extrema for consistency - reduce maxima
badmax_ii = np.zeros(0)
badmin_ii = np.zeros(0)
rawfmin_i, badmax_ii = getneighbors(rawfmax_ii, rawfmin_ii, raw_x, rawf_y, AP_HWD, AP_AMP, bads=True)
rawfmax_i = np.delete(rawfmax_ii, badmax_ii)
rawfmin_i = rawfmin_i.astype(int) # casting required for indexing
# check extrema for boundary violations - reduce maxima and minima
while True: # rough check, assignment happens later
if rawfmax_i[0] < rawfmin_i[0]: # starts with a maximum
rawfmax_i = rawfmax_i[1:]
continue
elif rawfmin_i[1] < rawfmax_i[0]: # starts with two minima
rawfmin_i = rawfmin_i[1:]
continue
elif rawfmax_i[-1] > rawfmin_i[-1]: # ends with a maximum
rawfmax_i = rawfmax_i[0:-1]
continue
elif rawfmin_i[-2] > rawfmax_i[-1]: # ends with two minima
rawfmin_i = rawfmin_i[0:-1]
continue
else:
break
rawfmax_x = raw_x[rawfmax_i] # filtered and extracted maxima
rawfmax_y = rawf_y[rawfmax_i]
# assign minima to corresponding maxima - reduce minima
minmaxmin = np.asarray([3*[0] for i in range(rawfmax_i.size)]) # [[min_left_index, max_index, min_right_index], ...]
rawfmin_kdt = sp_spat.KDTree(list(zip(rawfmin_i, np.zeros(rawfmin_i.size))))
i = 0 # index
for max_i in rawfmax_i:
MIN_LEFT, MIN_RIGHT = False, False
minmaxmin[i][1] = max_i
for order_i in rawfmin_kdt.query([max_i, 0.0], k=None)[1]:
min_i = rawfmin_i[order_i]
if not MIN_LEFT and (min_i < max_i):
minmaxmin[i][0] = min_i
MIN_LEFT = True
elif not MIN_RIGHT and (min_i > max_i):
minmaxmin[i][2] = min_i
MIN_RIGHT = True
i += 1
rawfmin_i = np.unique(minmaxmin[:, [0, 2]].ravel())
rawfmin_x = raw_x[rawfmin_i] # filtered and extracted minima
rawfmin_y = rawf_y[rawfmin_i]
# find largest distance between left minima and maxima
ipg_hwl, ipg_tmp = 0.0, 0.0
for min_l, max_c in minmaxmin[:, [0, 1]]:
ipg_tmp = raw_x[max_c] - raw_x[min_l]
if ipg_tmp > ipg_hwl:
ipg_hwl = ipg_tmp
# find largest distance between right minima and maxima
ipg_hwr, ipg_tmp = 0.0, 0.0
for max_c, min_r in minmaxmin[:, [1, 2]]:
ipg_tmp = raw_x[min_r] - raw_x[max_c]
if ipg_tmp > ipg_hwr:
ipg_hwr = ipg_tmp
else: # beating rate
rawfmax_x = raw_x[rawfmax_ii] # pre-filtered maxima
rawfmax_y = rawf_y[rawfmax_ii]
rawfmin_x = raw_x[rawfmin_ii] # pre-filtered minima
rawfmin_y = rawf_y[rawfmin_ii]
rawfmax_m = np.mean(rawfmax_y) # refined estimate due to exlusion (ap_mode)
rawfmin_m = np.mean(rawfmin_y)
if rawfmax_y.size == 0: # no APs detected
raise Warning
else: # two or more APs
frate = 60000.0*(rawfmax_y.size/(rawfmax_x[-1]-rawfmax_x[0])) if rawfmax_y.size > 1 else float('nan') # AP firing rate (FR) [1/min]
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
# create extrema plot
sys.stdout.write(">> PLOTTING... ")
sys.stdout.flush()
mpp_setup(title="Extrema: " + name, xlabel='Time (ms)', ylabel='Voltage (mV)')
mpp.plot([raw_x[0], raw_x[-1]], [0.0, 0.0], '0.85') # X-Axis (grey line)
mpp.plot([raw_x[0], raw_x[-1]], [rawfmaxmin_m, rawfmaxmin_m], 'k--') # center between unfiltered maxima and unfiltered minima, i.e. not between AVGMAX and AVGMIN (black dashed line)
mpp.plot(raw_x, raw_y, '0.50', raw_x, rawf_y, 'r') # raw data and averaged data (grey, red line)
mpp.plot([raw_x[0], raw_x[-1]], [AP_MAX, AP_MAX], 'b') # upper limit for maxima (blue dotted line)
mpp.plot([raw_x[0], raw_x[-1]], [AP_MIN, AP_MIN], 'b:') # lower limit for maxima (blue dotted line)
mpp.plot([rawfmax_x, rawfmax_x], [AP_MIN, AP_MAX], 'b') # accepted maxima (blue line)
mpp.plot([raw_x[0], raw_x[-1]], [MDP_MIN, MDP_MIN], 'g') # lower limit for minima (green line)
mpp.plot([raw_x[0], raw_x[-1]], [MDP_MAX, MDP_MAX], 'g:') # upper limit for minima (green dotted line)
mpp.plot([rawfmin_x, rawfmin_x], [MDP_MIN, MDP_MAX], 'g') # accepted minima (green line)
mpp.plot([rawfmax_x[0], rawfmax_x[-1]], [rawfmax_m, rawfmax_m], 'k') # average of maxima, time interval used for firing rate count (black line)
mpp.plot([rawfmin_x[0], rawfmin_x[-1]], [rawfmin_m, rawfmin_m], 'k') # average of minima (black line)
mpp.plot(raw_x[rawfmax_ii], rawf_y[rawfmax_ii], 'bo') # maxima (blue dots)
mpp.plot(raw_x[rawfmin_ii], rawf_y[rawfmin_ii], 'go') # minima (green dots)
mpp.figtext(0.12, 0.91, "{0:<s} {1:<.4G}".format("AVGMAX (mV):", rawfmax_m), ha='left', va='center')
mpp.figtext(0.12, 0.88, "{0:<s} {1:<.4G}".format("FR (AP/min):", frate), ha='left', va='center')
mpp.figtext(0.12, 0.85, "{0:<s} {1:<.4G}".format("AVGMIN (mV):", rawfmin_m), ha='left', va='center')
mpp.tight_layout()
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
sys.stdout.write(">> SAVING... ")
sys.stdout.flush()
mpp.savefig(pdf_file, format='pdf', dpi=600)
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
if not AUTORUN:
toggle_mpp_imode(activate=False)
mpp.show()
mpp.clf()
if APMODE:
# slice raw data segments by minima and align by maxima
sys.stdout.write(">> AVERAGING... ")
sys.stdout.flush()
# align ap segments by maxima, extend and average ap segments
ipg_max = float((1.0+APEXT)*ipg_hwr)
ipg_min = -float((1.0+APEXT)*ipg_hwl)
avg_x = np.arange(ipg_min, ipg_max, ipg_t, dtype='float64') # interpolation grid
avgxsize = avg_x.size
avg_y = np.zeros(avgxsize, dtype='float64') # ap average array
mpp.subplot2grid((4, 1), (0, 0), rowspan=3) # upper subplot
TIMESTAMP = "[" + str(round(tmp_start, 2)) + "ms-" + str(round(tmp_stop, 2)) + "ms]"
mpp_setup(title='Analysis: ' + name + ' ' + TIMESTAMP, xlabel='Time (ms)', ylabel='Voltage (mV)')
mpp.plot([avg_x[0], avg_x[-1]], [0.0, 0.0], '0.85') # X-axis
N = 0 # current maximum
for min_l, max_c, min_r in minmaxmin: # slicing of ap segments, extend ap parts if possible
minext_l = int(min_l - APEXT*(max_c - min_l)) # use int for index slicing
minext_r = int(min_r + APEXT*(min_r - max_c))
# prepare ap SEGMENT
tmp_x = np.asarray(raw_x[:] - raw_x[max_c]) # align by maximum
tmp_y = np.interp(avg_x, tmp_x, raw_y[:])
# average ap segments
if N == 0: # first average
avg_y = np.copy(tmp_y)
else: # all other averages
i = 0 # array index
NW = (1.0/(N+1.0)) # new data weight
pw = (N/(N+1.0)) # previous data weight
for y in np.nditer(avg_y, op_flags=['readwrite']):
y[...] = pw*y + NW*tmp_y[i] # integrate raw data into averaged data
i += 1
N += 1
mpp.plot(avg_x, tmp_y, '0.75') # plot aligned raw data segments
sys.stdout.write("\t\t\t\t\t\t\t [OK]\n")
sys.stdout.flush()
# analyze AP parameters with given criteria
sys.stdout.write(">> ANALYZING... ")
sys.stdout.flush()
# filter noise of averaged data with Savitzky-Golay
avgf_y = sp_sig.savgol_filter(avg_y, runavg, POLYNOM, mode='nearest')
# detect "Peak potential: Maximum potential of AP" (PP) (mV)
avgfmax_i = np.argwhere(avg_x == 0.0) # data point for maximum centered
if not avgfmax_i.size > 0: # data point for maximum left or right of center
tmpavg = int(round(WM_MAX*runavg)) if int(round(WM_MAX*runavg)) % 2 else int(round(WM_MAX*runavg))+1
avgfmax_ii = np.asarray(sp_sig.argrelmax(avgf_y, order=tmpavg)).ravel() # find all maxima
avgfmax_i = avgfmax_ii[np.argmin(np.abs(avg_x[avgfmax_ii] - 0.0))] # return the maximum closest to X = 0.0
avgfmax_x = avg_x[avgfmax_i]
avgfmax_y = avgf_y[avgfmax_i]
pp_y = float(avgfmax_y)
pp = pp_y
# detect and reduce (several) minima in filtered average data,
tmpavg = int(round(WM_MIN*runavg)) if int(round(WM_MIN*runavg)) % 2 else int(round(WM_MIN*runavg))+1
avgfmin_ii = np.asarray(sp_sig.argrelmin(avgf_y, order=tmpavg)).ravel() # find all minima
avgfmin_i = getneighbors(np.asarray([avgfmax_i]), avgfmin_ii, avg_x, avgf_y, AP_HWD, AP_AMP)
avgfmin_x = avg_x[avgfmin_i]
avgfmin_y = avgf_y[avgfmin_i]
# determine "Maximum diastolic potential 1: Minimum potential preceding PP" (MDP1) (mV)
mdp1_i = avgfmin_i[0]
mdp1_x = avg_x[mdp1_i]
mdp1_y = avgf_y[mdp1_i]
mdp1 = mdp1_y
# determine "Maximum diastolic potential 2: Minimum potential following PP" (MDP2) (mV)
mdp2_i = avgfmin_i[-1]
mdp2_x = avg_x[mdp2_i]
mdp2_y = avgf_y[mdp2_i]
mdp2 = mdp2_y
# determine "Cycle length: Time interval MDP1-MDP2" (CL) (ms)
cl = float(mdp2_x - mdp1_x)
# determine "Action potential amplitude: Potential difference of PP minus MDP2" (APA) (mV)
apa = pp - mdp2
# determine "AP duration 30: Time interval at 30% of maximum repolarization" (APD30) (ms)
apd30_l = (pp - 0.30*apa) # threshold value
apd30_i = functools.reduce(np.intersect1d, (np.argwhere(avgf_y > apd30_l), np.argwhere(avg_x >= mdp1_x), np.argwhere(avg_x <= mdp2_x)))
apd30_x = (avg_x[apd30_i[0]-1], avg_x[apd30_i[-1]+1]) # equal to or smaller than apd30_l
apd30_y = (avgf_y[apd30_i[0]-1], avgf_y[apd30_i[-1]+1])
apd30 = float(apd30_x[-1] - apd30_x[0])
# determine "AP duration 50: Time interval at 50% of maximum repolarization" (APD50) (ms)
apd50_l = (pp - 0.50*apa) # threshold value
apd50_i = functools.reduce(np.intersect1d, (np.argwhere(avgf_y > apd50_l), np.argwhere(avg_x >= mdp1_x), np.argwhere(avg_x <= mdp2_x)))
apd50_x = (avg_x[apd50_i[0]-1], avg_x[apd50_i[-1]+1]) # equal to or smaller than apd50_l
apd50_y = (avgf_y[apd50_i[0]-1], avgf_y[apd50_i[-1]+1])
apd50 = float(apd50_x[-1] - apd50_x[0])
# determine "AP duration 90: Time interval at 90% of maximum repolarization" (APD90) (ms)
apd90_l = pp - 0.90*apa
apd90_i = functools.reduce(np.intersect1d, (np.argwhere(avgf_y > apd90_l), np.argwhere(avg_x >= mdp1_x), np.argwhere(avg_x <= mdp2_x)))
apd90_x = (avg_x[apd90_i[0]-1], avg_x[apd90_i[-1]+1]) # equal to or smaller than apd90_l
apd90_y = (avgf_y[apd90_i[0]-1], avgf_y[apd90_i[-1]+1])
apd90 = float(apd90_x[-1] - apd90_x[0])
# calculate derivative of averaged data (mV/ms)
avgfg_y = np.ediff1d(avgf_y) # dY/1, differences between values
avgfg_y = np.insert(avgfg_y, 0, avgfg_y[0]) # preserve array size
avgfg_y = avgfg_y / ipg_t # dY/dX, differences per increment
# filter derivative of averaged data
tmpavg = int(round(WM_DER*runavg)) if int(round(WM_DER*runavg)) % 2 else int(round(WM_DER*runavg))+1
avgfgf_y = sp_sig.savgol_filter(avgfg_y, tmpavg, POLYNOM, mode='nearest')
# determine "Maximum upstroke velocity: Maximum of derivative between MDP1 and PP" (MUV) (mV/ms)
tmpavg = int(round(WM_MAX*runavg)) if int(round(WM_MAX*runavg)) % 2 else int(round(WM_MAX*runavg))+1
avgfgfmax_ii = functools.reduce(np.intersect1d, (sp_sig.argrelmax(avgfgf_y, order=tmpavg), np.argwhere(avg_x >= mdp1_x), np.argwhere(avg_x <= avgfmax_x)))
avgfgfmax_i = getneighbors(np.asarray([avgfmax_i]), avgfgfmax_ii, avg_x, avgfgf_y)[0] # avoid errors from large ap part extensions
avgfgfmax_x = avg_x[avgfgfmax_i]
avgfgfmax_y = avgfgf_y[avgfgfmax_i]
muv = float(avgfgfmax_y)
# determine "Maximum repolarization rate: Minimum of derivative between PP and MDP2" (MRR) (mV/ms)
tmpavg = int(round(WM_MIN*runavg)) if int(round(WM_MIN*runavg)) % 2 else int(round(WM_MIN*runavg))+1
avgfgfmin_ii = functools.reduce(np.intersect1d, (sp_sig.argrelmin(avgfgf_y, order=tmpavg), np.argwhere(avg_x >= avgfmax_x), np.argwhere(avg_x <= mdp2_x)))
avgfgfmin_i = getneighbors(np.asarray([apd90_i[-1]+1]), avgfgfmin_ii, avg_x, avgfgf_y)[0] # mrr or trr
# determine "Transient repolarization rate: Second minimum of derivative between PP and MDP2 after PP, if distinct from MRR" (TRR) (mV/ms)
avgfgfmin_i = np.append(avgfgfmin_i, getneighbors(np.asarray([avgfgfmax_i]), avgfgfmin_ii, avg_x, avgfgf_y)[1]) # trr only
avgfgfmin_x = avg_x[avgfgfmin_i]
avgfgfmin_y = avgfgf_y[avgfgfmin_i]
mrr = float(avgfgf_y[avgfgfmin_i][0])
if avgfgfmin_i[0] == avgfgfmin_i[1]: # no trr
WITH_TRR = False
trr = float("nan")
else:
WITH_TRR = True
trr = float(avgfgf_y[avgfgfmin_i][1]) # True
# approximate diastolic duration in filtered derivative
da_i, da_x, da_y, da_m, da_n, da_r = getbestlinearfit(avg_x, avgfgf_y, mdp1_x, apd90_x[0], 10, 90, 1, 40) # get a baseline for the derivative before exceeding the threshold
# determine "Threshold potential: Potential separating DD and APD." (THR) (mV)
thr_i = functools.reduce(np.intersect1d, (np.argwhere(avgfgf_y >= ((da_m*avg_x + da_n) + 0.5)), np.argwhere(avg_x >= avg_x[da_i[-1]]), np.argwhere(avg_x <= apd50_x[0])))[0].astype(int) # determine baseline-corrected threshold level
thr_x = avg_x[thr_i]
thr_y = avgf_y[thr_i]
thr = float(thr_y)
# determine "Late AP duration"
lapd_l = thr
lapd_i = functools.reduce(np.intersect1d, (np.argwhere(avgf_y > lapd_l), np.argwhere(avg_x >= avgfmax_x), np.argwhere(avg_x <= mdp2_x)))
lapd_x = (0.5*(avg_x[lapd_i[-1]]+avg_x[lapd_i[-1]+1])) # equal to or smaller than lapd_l
lapd_y = (0.5*(avgf_y[lapd_i[-1]]+avgf_y[lapd_i[-1]+1]))
lapd = float(mdp2_x - lapd_x)
# determine "Early diastolic duration: Time from MDP1 to end of linear fit for DDR" (EDD) (ms)
edd_i, edd_x, edd_y, edd_m, edd_n, edd_r = getbestlinearfit(avg_x, avgf_y, mdp1_x, thr_x, 10, 50, 1, 20) # fit EDD within the threshold level determined earlier
edd = float(edd_x[-1]-mdp1_x)
# determine "Diastolic depolarization rate: Potential change rate at end of EDD" (DDR) (mV/ms)
ddr = float(edd_m) # or: np.mean(avgfgf_y[edd_i])
# determine "Diastolic duration: EDD plus LDD" (DD) (ms)
dd = float(thr_x - mdp1_x)
# determine "Late diastolic duration: Time from end of linear fit for DDR to THR" (LDD) (ms)
ldd = float(thr_x - edd_x[-1])
# determine "Action potential duration: Time between THR and MDP2" (APD) (ms)
apd = float(mdp2_x - thr_x)
sys.stdout.write("\t\t\t\t\t\t\t [OK]\n")
sys.stdout.flush()
# create analysis plot
sys.stdout.write(">> PLOTTING... ") # the X-axis and the individual segments are already plotted during averaging
sys.stdout.flush()
mpp.plot([mdp1_x, thr_x], [mdp1_y, mdp1_y], 'k-.') # DD (black dashed/dotted line)
mpp.plot([thr_x, mdp2_x], [mdp2_y, mdp2_y], 'k') # APD (black line)
mpp.plot([lapd_x, mdp2_x], [lapd_y, lapd_y], 'r-.') # LAPD (black line)
mpp.plot([apd50_x[0], apd50_x[1]], [apd50_y[1], apd50_y[1]], 'k') # APD50 (black line)
mpp.plot([apd90_x[0], apd90_x[1]], [apd90_y[1], apd90_y[1]], 'k') # APD90 (black line)
mpp.plot([mdp1_x, mdp1_x], [mdp1_y, 0.0], 'k:') # MDP1 indicator (black dotted line)
mpp.plot([mdp2_x, mdp2_x], [mdp2_y, 0.0], 'k:') # MDP2 indicator (black dotted line)
mpp.plot([avgfgfmax_x, avgfgfmax_x], [mdp2_y, avgf_y[avgfgfmax_i]], 'k:') # MUV indicator (black dotted line)
mpp.plot([avgfgfmin_x[0], avgfgfmin_x[0]], [mdp2_y, avgf_y[avgfgfmin_i[0]]], 'k:') # MRR indicator (black dotted line)
mpp.plot([edd_x[-1], edd_x[-1]], [mdp2_y, 0.0], 'k:') # EDD/LDD separator (black dashed line)
mpp.plot([thr_x, thr_x], [thr_y, 0.0], 'k:') # DD/APD upper separator (black dotted line)
mpp.plot([thr_x, thr_x], [mdp2_y, thr_y], 'k:') # DD/APD lower separator (black dotted line)
mpp.plot(avg_x, avg_y, 'k', avg_x, avgf_y, 'r') # averaged data and filtered averaged data (black, red lines)
mpp.plot(avg_x[edd_i], avgf_y[edd_i], 'g') # best linear fit segment for DDR (green line)
mpp.plot(avg_x, (edd_m*avg_x + edd_n), 'k--') # DDR (black dashed line)
mpp.plot([edd_x[-1]], [edd_y[-1]], 'ko') # EDD-LDD separator (black dot)
mpp.plot(lapd_x, lapd_y, 'ro') # LAPD (black dot)
mpp.plot([apd50_x[1]], [apd50_y[1]], 'ko') # APD50 (black dots)
mpp.plot(apd90_x[1], apd90_y[1], 'ko') # APD90 (black dots)
mpp.plot(thr_x, avgf_y[thr_i], 'ro') # THR (red dot)
mpp.plot(avgfgfmax_x, avgf_y[avgfgfmax_i], 'wo') # MUV (white dot)
mpp.plot(avgfgfmin_x[0], avgf_y[avgfgfmin_i[0]], 'wo') # MRR (white dot)
if WITH_TRR:
mpp.plot([avgfgfmin_x[1], avgfgfmin_x[1]], [mdp2_y, avgf_y[avgfgfmin_i[1]]], 'k:') # trr indicator (black dotted line)
mpp.plot(avgfgfmin_x[1], avgf_y[avgfgfmin_i[1]], 'wo') # trr (white dot)
mpp.plot(avgfmax_x, pp_y, 'bo') # PP (blue dot)
mpp.plot(avgfmin_x, avgfmin_y, 'go') # MDP1, MDP2 (green dots)
mpp.figtext(0.12, 0.91, "{0:<s} {1:<.4G}".format("APs (#):", rawfmax_y.size), ha='left', va='center')
mpp.figtext(0.12, 0.88, "{0:<s} {1:<.4G}".format("FR (AP/min):", frate), ha='left', va='center')
mpp.figtext(0.12, 0.85, "{0:<s} {1:<.4G}".format("CL (ms):", cl), ha='left', va='center')
mpp.figtext(0.12, 0.82, "{0:<s} {1:<.4G}".format("DD (ms):", dd), ha='left', va='center')
mpp.figtext(0.12, 0.79, "{0:<s} {1:<.4G}".format("EDD (ms):", edd), ha='left', va='center')
mpp.figtext(0.12, 0.76, "{0:<s} {1:<.4G}".format("LDD (ms):", ldd), ha='left', va='center')
mpp.figtext(0.12, 0.73, "{0:<s} {1:<.4G}".format("APD (ms):", apd), ha='left', va='center')
mpp.figtext(0.12, 0.70, "{0:<s} {1:<.4G}".format("LAPD (ms):", lapd), ha='left', va='center')
mpp.figtext(0.12, 0.67, "{0:<s} {1:<.4G}".format("APD50 (ms):", apd50), ha='left', va='center')
mpp.figtext(0.12, 0.64, "{0:<s} {1:<.4G}".format("APD90 (ms):", apd90), ha='left', va='center')
mpp.figtext(0.12, 0.61, "{0:<s} {1:<.4G}".format("MDP1 (mV):", mdp1), ha='left', va='center')
mpp.figtext(0.12, 0.58, "{0:<s} {1:<.4G}".format("MDP2 (mV):", mdp2), ha='left', va='center')
mpp.figtext(0.12, 0.55, "{0:<s} {1:<.4G}".format("THR (mV):", thr), ha='left', va='center')
mpp.figtext(0.12, 0.52, "{0:<s} {1:<.4G}".format("PP (mV):", pp), ha='left', va='center')
mpp.figtext(0.12, 0.49, "{0:<s} {1:<.4G}".format("APA (mV):", apa), ha='left', va='center')
mpp.figtext(0.12, 0.46, "{0:<s} {1:<.4G}".format("DDR (mV/ms):", ddr), ha='left', va='center')
mpp.figtext(0.12, 0.43, "{0:<s} {1:<.4G}".format("MUV (mV/ms):", muv), ha='left', va='center')
mpp.figtext(0.12, 0.40, "{0:<s} {1:<.4G}".format("TRR (mV/ms):", trr), ha='left', va='center')
mpp.figtext(0.12, 0.37, "{0:<s} {1:<.4G}".format("MRR (mV/ms):", mrr), ha='left', va='center')
mpp.subplot2grid((4, 1), (3, 0)) # lower subplot
mpp_setup(title="", xlabel='Time (ms)', ylabel='(mV/ms)')
mpp.plot([avg_x[0], avg_x[-1]], [0.0, 0.0], '0.85') # x axis
mpp.plot([avgfgfmin_x[0], avgfgfmin_x[0]], [avgfgfmin_y[0], avgfgfmax_y], 'k:') # MRR indicator (black dotted line)
if WITH_TRR:
mpp.plot([avgfgfmin_x[1], avgfgfmin_x[1]], [avgfgfmin_y[1], avgfgfmax_y], 'k:') # TRR indicator (black dotted line)
mpp.plot([thr_x, thr_x], [avgfgf_y[thr_i], avgfgfmax_y], 'k:') # THR indicator (black dotted line)
mpp.plot(avg_x, avgfg_y, 'c', avg_x, avgfgf_y, 'm') # derivative and filtered derivative
mpp.plot(avg_x[da_i], avgfgf_y[da_i], 'g') # best linear fit SEGMENT for THR (green line)
mpp.plot(avg_x, (da_m*avg_x + da_n), 'k--') # best linear fit for THR (black dashed line)
mpp.plot(thr_x, avgfgf_y[thr_i], 'ro') # THR (red dot)
mpp.plot(avgfgfmax_x, avgfgfmax_y, 'bo') # derivative maximum (blue dot)
mpp.plot(avgfgfmin_x, avgfgfmin_y, 'go') # derivative minima (green dots)
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
# data summary
sys.stdout.write(">> SAVING... ")
sys.stdout.flush()
avg_file = os.path.join(WORKDIR, name + "_" + TIMESTAMP + "_avg.dat")
UHEADER = "" +\
"Analysis start time: " + 4*"\t" + str(tmp_start) + " ms\n" + \
"Analysis stop time:" + 4*"\t" + str(tmp_stop) + " ms\n" + \
"Upper limit for maxima:" + 3*"\t" + str(AP_MAX) + " mV\n" + \
"Lower limit for maxima:" + 3*"\t" + str(AP_MIN) + " mV\n" + \
"Upper limit for minima:" + 3*"\t" + str(MDP_MAX) + " mV\n" + \
"Lower limit for minima:" + 3*"\t" + str(MDP_MIN) + " mV\n" + \
"Maximum peak half width:" + 3*"\t" + str(AP_HWD) + " ms\n" + \
"Minimum peak amplitude:" + 3*"\t" + str(AP_AMP) + " mV\n" + \
"Running average window size:" + 2*"\t" + str(runavg) + "\n" + \
"Window multiplier for derivative:" + "\t" + str(WM_DER) + "\n" + \
"Window multiplier for maxima:" + 2*"\t" + str(WM_MAX) + "\n" + \
"Window multiplier for minima:" + 2*"\t" + str(WM_MIN) + "\n" + \
"Time (ms)" + "\t" + "Averaged signal (mV)" + "\t" + "Filtered average (mV)"
np.savetxt(avg_file, np.column_stack((avg_x, avg_y, avgf_y)), fmt='%e', delimiter='\t', header=UHEADER)
mpp.tight_layout()
mpp.savefig(pdf_file, format='pdf', dpi=600)
sum_file = os.path.join(WORKDIR, "ParamAP.log")
NEWFILE = not bool(os.path.exists(sum_file))
with open(sum_file, 'a') as targetfile: # append file
if NEWFILE: # write header
targetfile.write("{0:s}\t{1:s}\t{2:s}\t{3:s}\t{4:s}\t{5:s}\t{6:s}\t{7:s}\t{8:s}\t{9:s}\t{10:s}\t{11:s}\t{12:s}\t{13:s}\t{14:s}\t{15:s}\t{16:s}\t{17:s}\t{18:s}\t{19:s}\t{20:s}\t{21:s}".format(
"File ( )", "Start (ms)", "Stop (ms)", "APs (#)", "FR (AP/min)", "CL (ms)", "DD (ms)", "EDD (ms)", "LDD (ms)", "APD (ms)", "LAPD (ms)", "APD50 (ms)", "APD90 (ms)", "MDP1 (mV)", "MDP2 (mV)", "THR (mV)", "PP (mV)", "APA (mV)", "DDR (mV/ms)", "MUV (mV/ms)", "TRR (mV/ms)", "MRR (mV/ms)") + "\n")
targetfile.write("{0:s}\t{1:4G}\t{2:4G}\t{3:4G}\t{4:4G}\t{5:4G}\t{6:4G}\t{7:4G}\t{8:4G}\t{9:4G}\t{10:4G}\t{11:4G}\t{12:4G}\t{13:4G}\t{14:4G}\t{15:4G}\t{16:4G}\t{17:4G}\t{18:4G}\t{19:4G}\t{20:4G}\t{21:4G}".format(
name, tmp_start, tmp_stop, rawfmax_y.size, frate, cl, dd, edd, ldd, apd, lapd, apd50, apd90, mdp1, mdp2, thr, pp, apa, ddr, muv, trr, mrr) + "\n")
targetfile.flush()
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
if not AUTORUN:
mpp.show()
except IndexError as ierr: # check running average and window multiplier
sys.stdout.write("\n" + 9*"\t" + " [ER]")
print("\r ## Run failed. Detection of extrema or threshold failed.")
except PermissionError as perr: # file already opened or storage read-only
sys.stdout.write("\n" + 9*"\t" + " [ER]")
print("\r ## Run failed. File access denied by system.")
except Warning as werr: # increase averaging window time
sys.stdout.write("\n" + 9*"\t" + " [ER]")
print("\r ## Run failed. Identification of action potentials failed.")
except Exception as uerr: # unknown
sys.stdout.write("\n" + 9*"\t" + " [UN]")
print("\r ## Run failed. Error was: {0}".format(uerr) + ".")
except KeyboardInterrupt as kerr: # user canceled this file
sys.stdout.write("\n" + 9*"\t" + " [KO]")
print("\r ## Run skipped. Canceled by user.")
if SERIES: # check for next frame
if tmp_stop + AVG_FRAME <= avg_stop:
SEGMENT += 1.0
tmp_start = avg_start + SEGMENT*AVG_FRAME # prepare next frame for preview
tmp_stop = tmp_start + AVG_FRAME
raw_i = np.argwhere((RAW_XY[0] >= tmp_start) & (RAW_XY[0] <= tmp_stop)).ravel()
raw_x = RAW_XY[0][raw_i[0]:raw_i[-1]+1]
raw_y = RAW_XY[1][raw_i[0]:raw_i[-1]+1]
print()
print("RUN:\t" + str(int(SEGMENT + 1)) + "/" + str(math.floor((avg_stop-avg_start)/AVG_FRAME)))
print()
else: # not enough data left in file
break
else: # no time series analysis
break
if not AUTORUN: # check for next file
print()
NEXTFILE = askboolean("Continue with next file?", True)
if NEXTFILE:
break
else: # re-run current file
raw_x = RAW_XY[0] # recover original rawdata
raw_y = RAW_XY[1]
continue
else: # autorun
break
# housekeeping after each file
FILE += 1
sys.stdout.write(">> CLEANING... ")
sys.stdout.flush()
pdf_file.close() # close multi-pdf file and remove if empty
mpp.clf() # clear canvas
gc.collect() # start garbage collection to prevent memory fragmentation
sys.stdout.write(8*"\t" + " [OK]\n")
sys.stdout.flush()
# print summary
print('{0:^79}'.format(SEPARBOLD))
SUMMARY = "End of run: " + str(FILE) + str(" files" if FILE != 1 else " file") + " processed."
print('{0:^79}'.format(SUMMARY))
print('{0:^79}'.format(SEPARBOLD) + os.linesep)
WAIT = input("Press ENTER to end this program.")