-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess_control.py
53 lines (39 loc) · 1.39 KB
/
process_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import time
from multiprocessing import Process, Queue
from gestures_recognition_demo import GestureEngine
from sensor_fusion import FusionEngine
from speech_recognition_demo import SpeechEngine
def start_gesture_recognition(queue: Queue):
    """Process entry point for gesture recognition.

    Constructs a ``GestureEngine`` bound to *queue* and invokes its
    ``start_prediction()`` method.

    Args:
        queue: Queue handed to the engine's constructor.
    """
    GestureEngine(queue=queue).start_prediction()
def start_speech_engine(queue: Queue):
    """Process entry point for speech recognition.

    Constructs a ``SpeechEngine`` bound to *queue* and invokes its
    ``start_recognition()`` method.

    Args:
        queue: Queue handed to the engine's constructor.
    """
    SpeechEngine(queue=queue).start_recognition()
def start_fusion_engine(queue: Queue):
    """Process entry point for sensor fusion.

    Constructs a ``FusionEngine`` with *queue* passed as its ``_queue``
    keyword argument.

    Args:
        queue: Queue handed to the engine's constructor.
    """
    # NOTE(review): no method is called on the instance here; presumably the
    # constructor itself runs the engine -- confirm against FusionEngine.
    FusionEngine(_queue=queue)
class ProcessManager:
    """Launch the fusion, gesture and speech engines as child processes.

    Every engine receives the same ``multiprocessing.Queue``. Each process
    that is created is recorded in ``self.procs`` in launch order.
    """

    # Seconds to sleep after starting each engine before moving on.
    # NOTE(review): presumably gives an engine time to initialise -- confirm.
    STARTUP_DELAY_SECONDS = 5

    def __init__(self):
        # Child processes created by start_engines(), in launch order.
        self.procs = []

    def start_engines(self):
        """Start every engine, then block until all engine processes exit.

        A ``KeyboardInterrupt`` (Ctrl-C) raised while waiting terminates
        every engine process that is still alive, not only the one that was
        being joined, so this method cannot end up blocked on an engine
        that was never asked to stop.
        """
        com_queue = Queue()
        print("Starting Engines...")
        # Remove an entry from this list to run without that engine
        # (e.g. fusion + gesture only, or fusion + speech only).
        engines = [start_fusion_engine, start_gesture_recognition, start_speech_engine]
        for engine in engines:
            proc = Process(target=engine, args=(com_queue,))
            # Recorded before start() so the process is tracked even if
            # anything after this point is interrupted.
            self.procs.append(proc)
            proc.start()
            time.sleep(self.STARTUP_DELAY_SECONDS)
        print("Waiting for Engines...")
        self._wait_for_engines()
        print("Stopping Engines...")

    def _wait_for_engines(self):
        """Join all recorded processes; on Ctrl-C terminate them all first."""
        try:
            for proc in self.procs:
                proc.join()
        except KeyboardInterrupt:
            # Ask every remaining engine to stop before joining again;
            # joining without terminating could block indefinitely.
            for proc in self.procs:
                if proc.is_alive():
                    proc.terminate()
            for proc in self.procs:
                proc.join()

    def stop_engines(self):
        """Block until every recorded engine process has exited.

        This only joins the processes; it does not terminate them.
        """
        for proc in self.procs:
            proc.join()