-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrency_test.go
84 lines (74 loc) · 1.67 KB
/
concurrency_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package consistentio
import (
"bytes"
"fmt"
"sort"
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/require"
)
// TestConcurrentConsistentIO exercises the argument validation of
// NewConcurrentConsistentIO: it expects an error for a nil argument, for a
// ConsistentIO whose o field is nil, and for one whose o.Writers is nil, and
// expects success for an untouched ConsistentIO built with one writer.
func TestConcurrentConsistentIO(t *testing.T) {
	// Nil argument.
	_, err := NewConcurrentConsistentIO(nil)
	require.NotNil(t, err)

	key, buf := "k1", bytes.NewBuffer(nil)

	// Valid instance whose o field is wiped after construction.
	{
		withoutO, err := NewConsistentIO(AddWriter(key, buf))
		require.Nil(t, err)
		withoutO.o = nil

		_, err = NewConcurrentConsistentIO(withoutO)
		require.NotNil(t, err)
	}

	// Valid instance whose o.Writers is wiped after construction.
	{
		withoutWriters, err := NewConsistentIO(AddWriter(key, buf))
		require.Nil(t, err)
		withoutWriters.o.Writers = nil

		_, err = NewConcurrentConsistentIO(withoutWriters)
		require.NotNil(t, err)
	}

	// Untouched instance: wrapping it must succeed.
	{
		intact, err := NewConsistentIO(AddWriter(key, buf))
		require.Nil(t, err)

		_, err = NewConcurrentConsistentIO(intact)
		require.Nil(t, err)
	}
}
// TestConcurrentConsistentIOWrite registers n writers keyed "0".."n-1", then
// issues one concurrent Write per key (payload equal to the key) and checks
// that:
//   - every completion event carries no error,
//   - every completion event echoes the request ID returned by Write,
//   - the buffer registered under each key holds a parseable integer, and the
//     set of those integers is exactly 0..n-1.
//
// The worker goroutines only record what they observed. All require.*
// assertions run on the test goroutine after the workers finish, because
// require calls t.FailNow, which the testing package only permits on the
// goroutine running the test function.
func TestConcurrentConsistentIOWrite(t *testing.T) {
	var (
		opts []opt
		ks   []int
		n    = 1000
		ws   = make(map[string]*bytes.Buffer, n)
	)
	for i := 0; i < n; i++ {
		k, w := fmt.Sprintf("%d", i), bytes.NewBuffer(nil)
		ws[k] = w
		ks = append(ks, i)
		opts = append(opts, AddWriter(k, w))
	}

	cio, err := NewConsistentIO(opts...)
	require.Nil(t, err)
	require.Equal(t, n, len(cio.writers))

	ccio, err := NewConcurrentConsistentIO(cio)
	require.Nil(t, err)

	// outcome is what a single worker observed. Fields are interface{} so
	// they can be handed to require.* unchanged, exactly as if the assertion
	// had been made in place.
	type outcome struct {
		wantID  interface{} // request ID returned by Write
		gotID   interface{} // request ID carried by the completion event
		err     interface{} // error carried by the completion event
		written string      // contents of the key's buffer once the write completed
	}
	// One slot per worker: each goroutine writes only its own index, so no
	// lock is needed; wg.Wait() orders those writes before the reads below.
	outcomes := make([]outcome, n)

	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			k := fmt.Sprintf("%d", i)
			reqID, ch := ccio.Write(k, []byte(k))
			e := <-ch
			outcomes[i] = outcome{
				wantID:  reqID,
				gotID:   e.reqID,
				err:     e.err,
				written: ws[k].String(),
			}
		}(i)
	}
	wg.Wait()

	nums := make([]int, 0, n)
	for i, o := range outcomes {
		require.Nil(t, o.err, "write for key %d", i)
		require.Equal(t, o.wantID, o.gotID, "write for key %d", i)
		num, err := strconv.Atoi(o.written)
		require.Nil(t, err, "write for key %d", i)
		nums = append(nums, num)
	}

	// Compare as a set: only the collection of written values matters.
	sort.Ints(nums)
	require.Equal(t, ks, nums)
}