-
-
Notifications
You must be signed in to change notification settings - Fork 841
/
Copy pathconcurrency.go
136 lines (117 loc) · 3.5 KB
/
concurrency.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package lo
import (
"context"
"sync"
"time"
)
type synchronize struct {
locker sync.Locker
}
func (s *synchronize) Do(cb func()) {
s.locker.Lock()
Try0(cb)
s.locker.Unlock()
}
// Synchronize wraps the underlying callback in a mutex. It receives an optional mutex.
func Synchronize(opt ...sync.Locker) *synchronize {
if len(opt) > 1 {
panic("unexpected arguments")
} else if len(opt) == 0 {
opt = append(opt, &sync.Mutex{})
}
return &synchronize{
locker: opt[0],
}
}
// Async executes a function in a goroutine and returns the result in a channel.
func Async[A any](f func() A) <-chan A {
ch := make(chan A, 1)
go func() {
ch <- f()
}()
return ch
}
// Async0 executes a function in a goroutine and returns a channel set once the function finishes.
func Async0(f func()) <-chan struct{} {
ch := make(chan struct{}, 1)
go func() {
f()
ch <- struct{}{}
}()
return ch
}
// Async1 is an alias to Async.
func Async1[A any](f func() A) <-chan A {
return Async(f)
}
// Async2 has the same behavior as Async, but returns the 2 results as a tuple inside the channel.
func Async2[A, B any](f func() (A, B)) <-chan Tuple2[A, B] {
ch := make(chan Tuple2[A, B], 1)
go func() {
ch <- T2(f())
}()
return ch
}
// Async3 has the same behavior as Async, but returns the 3 results as a tuple inside the channel.
func Async3[A, B, C any](f func() (A, B, C)) <-chan Tuple3[A, B, C] {
ch := make(chan Tuple3[A, B, C], 1)
go func() {
ch <- T3(f())
}()
return ch
}
// Async4 has the same behavior as Async, but returns the 4 results as a tuple inside the channel.
func Async4[A, B, C, D any](f func() (A, B, C, D)) <-chan Tuple4[A, B, C, D] {
ch := make(chan Tuple4[A, B, C, D], 1)
go func() {
ch <- T4(f())
}()
return ch
}
// Async5 has the same behavior as Async, but returns the 5 results as a tuple inside the channel.
func Async5[A, B, C, D, E any](f func() (A, B, C, D, E)) <-chan Tuple5[A, B, C, D, E] {
ch := make(chan Tuple5[A, B, C, D, E], 1)
go func() {
ch <- T5(f())
}()
return ch
}
// Async6 has the same behavior as Async, but returns the 6 results as a tuple inside the channel.
func Async6[A, B, C, D, E, F any](f func() (A, B, C, D, E, F)) <-chan Tuple6[A, B, C, D, E, F] {
ch := make(chan Tuple6[A, B, C, D, E, F], 1)
go func() {
ch <- T6(f())
}()
return ch
}
// WaitFor runs periodically until a condition is validated.
func WaitFor(condition func(i int) bool, timeout time.Duration, heartbeatDelay time.Duration) (totalIterations int, elapsed time.Duration, conditionFound bool) {
conditionWithContext := func(_ context.Context, currentIteration int) bool {
return condition(currentIteration)
}
return WaitForWithContext(context.Background(), conditionWithContext, timeout, heartbeatDelay)
}
// WaitForWithContext runs periodically until a condition is validated or context is canceled.
func WaitForWithContext(ctx context.Context, condition func(ctx context.Context, currentIteration int) bool, timeout time.Duration, heartbeatDelay time.Duration) (totalIterations int, elapsed time.Duration, conditionFound bool) {
start := time.Now()
if ctx.Err() != nil {
return totalIterations, time.Since(start), false
}
ctx, cleanCtx := context.WithTimeout(ctx, timeout)
ticker := time.NewTicker(heartbeatDelay)
defer func() {
cleanCtx()
ticker.Stop()
}()
for {
select {
case <-ctx.Done():
return totalIterations, time.Since(start), false
case <-ticker.C:
totalIterations++
if condition(ctx, totalIterations-1) {
return totalIterations, time.Since(start), true
}
}
}
}