-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlive_emg.py
70 lines (52 loc) · 1.63 KB
/
live_emg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from matplotlib import pyplot as plt
from collections import deque
from threading import Lock, Thread
import myo
import numpy as np
class EmgCollector(myo.DeviceListener):
"""
Collects EMG data in a queue with *n* maximum number of elements.
"""
def __init__(self, n):
self.n = n
self.lock = Lock()
self.emg_data_queue = deque(maxlen=n)
def get_emg_data(self):
with self.lock:
return list(self.emg_data_queue)
# myo.DeviceListener
def on_connected(self, event):
event.device.stream_emg(True)
def on_emg(self, event):
with self.lock:
self.emg_data_queue.append((event.timestamp, event.emg))
class Plot(object):
def __init__(self, listener):
self.n = listener.n
self.listener = listener
self.fig = plt.figure()
self.axes = [self.fig.add_subplot('81' + str(i)) for i in range(1, 9)]
[(ax.set_ylim([-100, 100])) for ax in self.axes]
self.graphs = [ax.plot(np.arange(self.n), np.zeros(self.n))[0] for ax in self.axes]
plt.ion()
def update_plot(self):
emg_data = self.listener.get_emg_data()
emg_data = np.array([x[1] for x in emg_data]).T
for g, data in zip(self.graphs, emg_data):
if len(data) < self.n:
# Fill the left side with zeroes.
data = np.concatenate([np.zeros(self.n - len(data)), data])
g.set_ydata(data)
plt.draw()
def main(self):
while True:
self.update_plot()
plt.pause(1.0 / 30)
def main():
myo.init(sdk_path='myo-sdk-win-0.9.0')
hub = myo.Hub()
listener = EmgCollector(512)
with hub.run_in_background(listener.on_event):
Plot(listener).main()
if __name__ == '__main__':
main()