-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkerpool.go
106 lines (85 loc) · 1.65 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package workerpool
import (
"sync"
"github.com/google/uuid"
)
// WorkerPool connects a Worker's Seed side to its Job side through a buffered
// channel of T: Seed is handed the channel, and each item received from it is
// passed to Job by one of several goroutines started by Do.
//
// The struct embeds a sync.WaitGroup, so it should be handled through a
// pointer (as returned by NewWorkerPool) rather than copied.
type WorkerPool[T any] struct {
	// id is the identifier generated when the pool is constructed.
	id string
	// numWorker is the configured number of job goroutines; NumWorker
	// substitutes a default when it is not positive.
	numWorker int
	// bufSize is the configured channel capacity; BufSize substitutes a
	// default when it is not positive.
	bufSize int
	// onPanic is called with the value recovered by recoverPanic.
	onPanic func(any)
	// worker provides the Seed and Job callbacks.
	worker Worker[T]
	// buf carries items from Seed to the Job goroutines.
	buf chan T
	// running is set for the duration of Do. It is a plain bool with no
	// synchronization around it.
	running bool
	// wg counts the seeding goroutine plus every job goroutine of a run.
	wg sync.WaitGroup
}
// defaultOnPanic is the panic handler installed when none is configured: it
// re-raises the recovered value unchanged, so the panic is not swallowed.
func defaultOnPanic(issue any) {
	panic(issue)
}
// NewWorkerPool builds a pool around worker and applies options in order.
//
// The pool starts with a freshly generated UUID as its ID and defaultOnPanic
// as its panic handler. Nil options are skipped, and if an option leaves the
// panic handler nil the default handler is restored, so a recovered panic is
// never replaced by a nil-function call. The item channel is allocated after
// the options have run so that it honors the configured buffer size.
func NewWorkerPool[T any](worker Worker[T], options ...WorkerPoolOption[T]) *WorkerPool[T] {
	pool := &WorkerPool[T]{
		id:      uuid.NewString(),
		worker:  worker,
		onPanic: defaultOnPanic,
	}
	for _, opt := range options {
		if opt == nil {
			// Tolerate nil entries instead of panicking in the constructor.
			continue
		}
		opt(pool)
	}
	if pool.onPanic == nil {
		pool.onPanic = defaultOnPanic
	}
	pool.buf = make(chan T, pool.BufSize())
	return pool
}
// ID returns the identifier that was generated for the pool at construction.
//
// The receiver is a pointer so that calling ID does not copy the pool (and
// the sync.WaitGroup inside it).
func (pool *WorkerPool[T]) ID() string {
	return pool.id
}
// NumWorker returns the number of job goroutines the pool uses: the
// configured value, or 10 when the configured value is zero or negative.
//
// The receiver is a pointer so that calling NumWorker does not copy the pool
// (and the sync.WaitGroup inside it).
func (pool *WorkerPool[T]) NumWorker() int {
	const defaultNumWorker = 10
	if pool.numWorker <= 0 {
		return defaultNumWorker
	}
	return pool.numWorker
}
// BufSize returns the capacity used for the pool's item channel: the
// configured value, or 1000 when the configured value is zero or negative.
//
// The receiver is a pointer so that calling BufSize does not copy the pool
// (and the sync.WaitGroup inside it).
func (pool *WorkerPool[T]) BufSize() int {
	const defaultBufSize = 1000
	if pool.bufSize <= 0 {
		return defaultBufSize
	}
	return pool.bufSize
}
// recoverPanic recovers an in-flight panic and forwards the recovered value
// to the pool's panic handler. It does nothing when there is no panic.
//
// It must be deferred directly (`defer pool.recoverPanic()`): recover only
// has an effect when called by the deferred function itself, so wrapping this
// call inside another deferred closure would silently disable it.
//
// If no handler is set (for example on a zero-value pool), the original value
// is re-raised rather than being masked by a nil-function call.
func (pool *WorkerPool[T]) recoverPanic() {
	r := recover()
	if r == nil {
		return
	}
	if pool.onPanic == nil {
		panic(r)
	}
	pool.onPanic(r)
}
// doSeed runs the worker's Seed callback against the pool's channel.
//
// Deferred calls run in reverse order, so when Seed returns or panics the
// wait group is released first, then any panic is handed to recoverPanic, and
// finally the channel is closed so that receivers ranging over it terminate.
func (pool *WorkerPool[T]) doSeed() {
	items := pool.buf

	defer close(items)
	defer pool.recoverPanic()
	defer pool.wg.Done()

	pool.worker.Seed(items)
}
// doJob is the body of one job goroutine: it receives items from the pool's
// channel until the channel is closed and passes each one to the worker's Job
// callback.
//
// Panics are recovered per item rather than per goroutine. If the panic
// handler returns, the goroutine keeps receiving; otherwise every swallowed
// panic would permanently remove a consumer, and once all of them were gone
// Seed could block forever on a full channel. If the handler re-panics, the
// panic still propagates out of this goroutine as before.
//
// The wait group is released last, after any panic handler has run.
func (pool *WorkerPool[T]) doJob() {
	defer pool.wg.Done()
	for data := range pool.buf {
		func() {
			// Deferred directly so that recover inside recoverPanic is effective.
			defer pool.recoverPanic()
			pool.worker.Job(data)
		}()
	}
}
// Do runs the pool once and blocks until the run is complete: it starts
// NumWorker job goroutines and one seeding goroutine, then waits for all of
// them to finish.
//
// Do returns immediately if the running flag is already set. That flag is a
// plain bool, so it only guards against re-entry from code that is otherwise
// serialized; callers must not invoke Do (or DoAsync) concurrently on the
// same pool.
//
// Each run gets a fresh channel. The seeding goroutine closes the channel
// when it finishes, so reusing it would make a second run send on, and then
// close, an already-closed channel.
func (pool *WorkerPool[T]) Do() {
	if pool.running {
		return
	}
	pool.running = true
	defer func() { pool.running = false }()

	// Assigned before any goroutine is started, so they all observe it.
	pool.buf = make(chan T, pool.BufSize())

	workers := pool.NumWorker()
	pool.wg.Add(workers + 1) // +1 for the seeding goroutine
	for i := 0; i < workers; i++ {
		go pool.doJob()
	}
	go pool.doSeed()
	pool.wg.Wait()
}
// DoAsync starts Do on a new goroutine and returns without waiting for it.
// A panic raised on that goroutine is passed to the pool's panic handler via
// recoverPanic. No completion signal is returned to the caller.
func (pool *WorkerPool[T]) DoAsync() {
	go func() {
		defer pool.recoverPanic()
		pool.Do()
	}()
}