-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmpmc.go
222 lines (192 loc) · 5.62 KB
/
mpmc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package mpmc
import (
"context"
"sync"
)
type FastMpmc[T any] struct {
bufMinCap int
buffer *[]T
bufferPool *sync.Pool
// chan waiting for notification before emptying the buffer
popAllCondChan chan struct{}
bufferMu *sync.Mutex
}
// NewFastMpmc creates a new MPMC queue with the specified minimum buffer capacity.
func NewFastMpmc[T any](bufMinCap int) *FastMpmc[T] {
var buffer = make([]T, 0, bufMinCap)
return &FastMpmc[T]{
bufMinCap: bufMinCap,
buffer: &buffer,
bufferMu: &sync.Mutex{},
popAllCondChan: make(chan struct{}, 1),
bufferPool: &sync.Pool{
New: func() interface{} {
var buffer = make([]T, 0, bufMinCap)
return &buffer
},
},
}
}
// enableSignal give a signal indicating that the buffer is not empty
func (b *FastMpmc[T]) enableSignal() {
select {
case b.popAllCondChan <- struct{}{}:
default:
}
}
// Push adds one or more elements to the queue.
func (b *FastMpmc[T]) Push(v ...T) {
b.PushSlice(v)
}
// PushSlice adds a slice of elements to the queue.
func (b *FastMpmc[T]) PushSlice(values []T) {
if len(values) == 0 {
return
}
b.bufferMu.Lock()
*b.buffer = append(*b.buffer, values...)
if len(*b.buffer) == len(values) {
b.enableSignal()
}
b.bufferMu.Unlock()
}
// popAll removes and returns all elements from the queue.
//
// If the queue is empty, it returns nil, this happens only with a low probability when SwapBuffer is called.
//
// The caller should wait the signal before calling this function.
func (b *FastMpmc[T]) popAll() *[]T {
var newBuffer = b.bufferPool.Get().(*[]T)
var ret *[]T
b.bufferMu.Lock()
defer b.bufferMu.Unlock()
// low probability
if len(*b.buffer) == 0 {
b.bufferPool.Put(newBuffer)
return nil
}
ret = b.buffer
b.buffer = newBuffer
return ret
}
// swapWhenNotEmpty swaps the current buffer with the provided new buffer only if the current buffer is not empty.
// If the swap is performed, it returns the old buffer. If the current buffer is empty, it returns nil.
//
// The caller should wait the signal before calling this function.
func (b *FastMpmc[T]) swapWhenNotEmpty(newBuffer *[]T) *[]T {
var ret *[]T
b.bufferMu.Lock()
defer b.bufferMu.Unlock()
if len(*b.buffer) == 0 {
// low probability
return nil
}
ret = b.buffer
b.buffer = newBuffer
// case 1: non-empty buffer -> non-empty buffer
if len(*newBuffer) != 0 {
b.enableSignal()
}
// case 2: non-empty buffer -> empty buffer, do nothing
return ret
}
// WaitSwapBuffer waits for a signal indicating that the buffer is not empty,
// then swaps the current buffer with the provided new buffer and returns the old buffer.
// This method blocks until the buffer is not empty and the swap is performed.
//
// Note: The function will directly replace the old buffer with the new buffer without clearing the new buffer's elements.
func (b *FastMpmc[T]) WaitSwapBuffer(newBuffer *[]T) *[]T {
for range b.popAllCondChan {
if elements := b.swapWhenNotEmpty(newBuffer); elements != nil {
return elements
}
}
panic("unreachable")
}
// WaitSwapBufferContext like WaitSwapBuffer but with a context.
func (b *FastMpmc[T]) WaitSwapBufferContext(ctx context.Context, newBuffer *[]T) (*[]T, bool) {
for {
select {
case <-b.popAllCondChan:
if elements := b.swapWhenNotEmpty(newBuffer); elements != nil {
return elements, true
}
case <-ctx.Done():
return nil, false
}
}
}
// SwapBuffer swaps the current buffer with a new buffer and returns the old buffer even if the current buffer is empty.
//
// Note: The function will directly replace the old buffer with the new buffer without clearing the new buffer's elements.
func (b *FastMpmc[T]) SwapBuffer(newBuffer *[]T) *[]T {
var ret *[]T
b.bufferMu.Lock()
defer b.bufferMu.Unlock()
ret = b.buffer
b.buffer = newBuffer
if len(*ret) == 0 && len(*newBuffer) != 0 {
// case 1: empty buffer -> non-empty buffer
b.enableSignal()
} else if len(*ret) != 0 && len(*newBuffer) == 0 {
// case 2: non-empty buffer -> empty buffer
select {
case <-b.popAllCondChan:
default:
// Uh-oh, there's a poor kid who can't read the buffer.
}
}
// case 3: empty buffer -> empty buffer, do nothing
// case 4: non-empty buffer -> non-empty buffer, do nothing
return ret
}
// RecycleBuffer returns the given buffer to the buffer pool for reuse.
// This helps to reduce memory allocations by reusing previously allocated buffers.
func (b *FastMpmc[T]) RecycleBuffer(buffer *[]T) {
*buffer = (*buffer)[:0]
b.bufferPool.Put(buffer)
}
// WaitPopAll waits for elements to be available and then removes and returns all elements from the queue.
// After calling this function, you can call RecycleBuffer to recycle the buffer.
func (b *FastMpmc[T]) WaitPopAll() *[]T {
for range b.popAllCondChan {
if elements := b.popAll(); elements != nil {
return elements
}
}
panic("unreachable")
}
// WaitPopAllContext like WaitPopAll but with a context.
func (b *FastMpmc[T]) WaitPopAllContext(ctx context.Context) (*[]T, bool) {
for {
select {
case <-b.popAllCondChan:
if elements := b.popAll(); elements != nil {
return elements, true
}
case <-ctx.Done():
return nil, false
}
}
}
// TryPopAll tries to remove and return all elements from the queue without blocking. Returns the elements and a boolean indicating success.
func (b *FastMpmc[T]) TryPopAll() (*[]T, bool) {
select {
case <-b.popAllCondChan:
if elements := b.popAll(); elements != nil {
return elements, true
}
return nil, false
default:
return nil, false
}
}
func (b *FastMpmc[T]) Len() int {
return len(*b.buffer)
}
func (b *FastMpmc[T]) Cap() int {
return cap(*b.buffer)
}
func (b *FastMpmc[T]) IsEmpty() bool {
return len(*b.buffer) == 0
}