-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimer_service.py
93 lines (77 loc) · 2.46 KB
/
timer_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
""" DefaultTimer module.
Generated by YAKINDU Statechart Tools code generator.
"""
import threading
import time
class TimerService:
"""Default timer service for timed statecharts.
"""
def __init__(self):
self.is_default_timer = True
self.timer_queue = {}
def set_timer(self, callback, event_id, interval, periodic):
"""Schedule the time event.
Checks if the given time event is already in queue and creates a
timer based on the `periodic`parameter (Single or Repeat).
Note: statemachine class converts time from sec to milliseconds,
so do vice versa.
"""
timer_id = (callback, event_id)
if timer_id in self.timer_queue:
self.unset_timer(callback, event_id)
m_interval = float(interval)/1000.0
if periodic is False:
self.timer_queue[timer_id] = SingleTimer(m_interval, callback, event_id)
else:
self.timer_queue[timer_id] = RepeatedTimer(m_interval, callback, event_id)
def unset_timer(self, callback, event_id):
"""Cancel a certain event identified bei `callback` and `event_id`.
"""
timer_id = (callback, event_id)
with threading.RLock():
if timer_id in self.timer_queue:
event_timer = self.timer_queue[timer_id]
if isinstance(event_timer, RepeatedTimer):
event_timer.stop()
else:
event_timer.single_timer.cancel()
del event_timer
def cancel(self):
"""Cancel all events that are currently running.
"""
with threading.RLock():
for (callback, event_id) in self.timer_queue:
self.unset_timer(callback, event_id)
class SingleTimer:
"""Call `function` after `period` seconds.
"""
def __init__(self, period, callback, event_id):
self.callback = callback
self.event_id = event_id
self.single_timer = threading.Timer(period, self.callback.time_elapsed, [self.event_id])
self.single_timer.start()
class RepeatedTimer:
"""Repeat `callback` every `interval` seconds.
"""
def __init__(self, interval, callback, event_id):
self.interval = interval
self.callback = callback
self.event_id = event_id
self.start = time.time()
self.event = threading.Event()
self.thread = threading.Thread(target=self._target)
self.thread.start()
def _target(self):
while not self.event.wait(self._time):
self.callback.time_elapsed(self.event_id)
@property
def _time(self):
return self.interval - ((time.time() - self.start) % self.interval)
def stop(self):
"""Stops the repeated timer.
"""
self.event.set()
try:
self.thread.join()
except RuntimeError:
print('Can not join thread')