-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsemaphore.go
108 lines (87 loc) · 2.4 KB
/
semaphore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package timequeue
// semaphore bundles a channel of internalSignal values with the signal most
// recently taken from it, a filter of signals whose actions may run, the
// per-signal actions themselves, and per-signal broadcast targets.
type semaphore struct {
// buffered is true when ch was created with a capacity greater than zero.
buffered bool
// signal holds the value last received from ch by peek or
// processAvailableSignal; addr exposes a pointer to it.
signal internalSignal
// ch carries the signals delivered through send.
ch chan internalSignal
// processedSignals is the set of signals for which doAction is allowed to
// run a registered action; it is replaced wholesale by processedSignal.
processedSignals map[internalSignal]bool
// actions maps a signal to the callback installed with register.
actions map[internalSignal]func()
// broadcasts maps a signal to the semaphores that broadcast forwards it to.
broadcasts map[internalSignal][]*semaphore
}
// newSemaphore returns a semaphore whose channel has capacity buf and whose
// lookup maps are allocated and empty.
//
// A buf of zero or less produces an unbuffered channel and leaves the
// buffered flag false. Negative values are clamped to zero because make
// panics at run time when asked for a negative channel size.
func newSemaphore(buf int) *semaphore {
	if buf < 0 {
		buf = 0
	}
	return &semaphore{
		buffered:         buf > 0,
		ch:               make(chan internalSignal, buf),
		processedSignals: make(map[internalSignal]bool),
		actions:          make(map[internalSignal]func()),
		broadcasts:       make(map[internalSignal][]*semaphore),
	}
}
// channel exposes the semaphore's underlying channel as receive-only.
func (sema *semaphore) channel() <-chan internalSignal {
	var recvOnly <-chan internalSignal = sema.ch
	return recvOnly
}
// addr returns a pointer to the semaphore's stored signal, so the holder can
// read or overwrite that field directly.
func (sema *semaphore) addr() *internalSignal {
	current := &sema.signal
	return current
}
// peek receives from the channel until a value equal to expected arrives or
// the channel is closed. Every received value is stored in the signal field
// and passed through doAction before it is compared with expected.
func (sema *semaphore) peek(expected internalSignal) {
	// Capture the channel once, matching the single evaluation a range
	// statement performs on its operand.
	source := sema.ch
	for {
		received, open := <-source
		if !open {
			return
		}
		sema.signal = received
		sema.doAction()
		// Compare the stored field, not the local copy: the action has just
		// run and the field is what the caller observes afterwards.
		if sema.signal == expected {
			return
		}
	}
}
// view returns the current value of the stored signal without touching the
// channel.
func (sema *semaphore) view() internalSignal {
	current := sema.signal
	return current
}
// doAction runs the action registered for the stored signal, but only when
// that signal is currently in the processedSignals set. Signals outside the
// set, or without a registered action, are ignored.
func (sema *semaphore) doAction() {
	if !sema.processedSignals[sema.signal] {
		return
	}
	if run, registered := sema.actions[sema.signal]; registered {
		run()
	}
}
// processAvailableSignal takes one value from the channel and runs doAction
// for it, but only when the semaphore is buffered and len reports at least
// one queued value. Otherwise it returns without receiving.
func (sema *semaphore) processAvailableSignal() {
	if !sema.buffered || len(sema.ch) == 0 {
		return
	}
	sema.signal = <-sema.ch
	sema.doAction()
}
// send writes signal to the semaphore's channel with a plain channel send and
// returns the receiver so calls can be chained.
func (sema *semaphore) send(signal internalSignal) *semaphore {
	out := sema.ch
	out <- signal
	return sema
}
// processedSignal replaces the set of signals whose actions doAction may run
// with exactly the given signals; calling it with no arguments empties the
// set. It returns the receiver so calls can be chained.
func (sema *semaphore) processedSignal(signals ...internalSignal) *semaphore {
	allowed := sema.processedSignals

	// Drop every previous entry in place so the same map keeps being used.
	for stale := range allowed {
		delete(allowed, stale)
	}

	for i := range signals {
		allowed[signals[i]] = true
	}
	return sema
}
// register stores action as the callback for signal, overwriting any callback
// already stored under that signal, and returns the receiver for chaining.
func (sema *semaphore) register(signal internalSignal, action func()) *semaphore {
	callbacks := sema.actions
	callbacks[signal] = action
	return sema
}
// deRegister removes the callback stored for signal, if there is one, and
// returns the receiver for chaining.
func (sema *semaphore) deRegister(signal internalSignal) *semaphore {
	callbacks := sema.actions
	delete(callbacks, signal)
	return sema
}
// expect reports whether the stored signal equals signal.
func (sema *semaphore) expect(signal internalSignal) bool {
	matches := sema.signal == signal
	return matches
}
// broadcastTo adds target to the list of semaphores that receive signal when
// broadcast is called. A target already listed for that signal is not added
// again. It returns the receiver so calls can be chained.
func (sema *semaphore) broadcastTo(target *semaphore, signal internalSignal) *semaphore {
	// A missing key yields a nil slice, which ranges zero times and can be
	// appended to, so no separate presence check is needed.
	subscribers := sema.broadcasts[signal]
	for i := range subscribers {
		if subscribers[i] == target {
			return sema
		}
	}
	sema.broadcasts[signal] = append(subscribers, target)
	return sema
}
// broadcast forwards signal to every semaphore added for it with broadcastTo,
// in the order they were added, and returns the receiver for chaining. Each
// delivery goes through send, which is a plain channel send.
func (sema *semaphore) broadcast(signal internalSignal) *semaphore {
	// Ranging over the nil slice of an unknown signal is a no-op.
	for _, receiver := range sema.broadcasts[signal] {
		receiver.send(signal)
	}
	return sema
}