-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.go
119 lines (99 loc) · 2.06 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package haro
import (
"context"
"sync"
"time"
)
// Subscriber is a callback that receives a published payload together with
// the context that was handed to Publish. A non-nil return value is treated as
// a failed delivery by callSubscriber, which reports it to the subscriber's
// onError hook and, when retries are configured, calls the subscriber again.
type Subscriber[Payload any] func(context.Context, Payload) error
// Topic is a typed publish/subscribe endpoint for values of type Payload.
type Topic[Payload any] interface {
// Publish hands the payload, along with its context, to the topic for
// delivery to the registered subscribers.
Publish(context.Context, Payload) error
// Subscribe registers sub to receive published payloads. Each configurator
// is applied to the configuration created for that subscriber.
Subscribe(sub Subscriber[Payload], configurators ...SubscriberConfigFunc)
}
// payloadPair is the message sent over a topic's stream channel: one published
// payload plus the context the publisher supplied with it.
type payloadPair[Payload any] struct {
// ctx is passed unchanged as the first argument of every subscriber call.
ctx context.Context
// payload is the published value.
payload Payload
}
// topic is the Topic implementation returned by DeclareTopic.
type topic[Payload any] struct {
// mtx is held by Subscribe while it appends to subs.
mtx sync.Mutex
// subs holds the registered subscribers in registration order.
subs []subscriberPair[Payload]
// stream carries messages from Publish to the goroutine started by run.
stream chan payloadPair[Payload]
}
// subscriberPair couples a subscriber callback with the configuration that was
// built for it when it was registered.
type subscriberPair[Payload any] struct {
// subscriber is the callback invoked for each published message.
subscriber Subscriber[Payload]
// cfg holds the retry and hook settings read by callSubscriber.
cfg *subscriberConfig
}
// DeclareTopic creates a topic for Payload values with no subscribers and an
// unbuffered message channel, starts its dispatch loop via run, and returns it
// as a Topic.
func DeclareTopic[Payload any]() Topic[Payload] {
	var (
		registry = []subscriberPair[Payload]{}
		queue    = make(chan payloadPair[Payload])
	)
	created := &topic[Payload]{mtx: sync.Mutex{}, subs: registry, stream: queue}
	created.run()
	return created
}
// run starts the topic's dispatch goroutine. For every message received from
// t.stream it calls each registered subscriber, in registration order, through
// callSubscriber. Subscribers are invoked sequentially on this one goroutine,
// so a slow subscriber (or its retry delay) holds up later messages.
//
// The goroutine runs until t.stream is closed; nothing in this file closes it.
func (t *topic[Payload]) run() {
	go func() {
		for msg := range t.stream {
			// Subscribe appends to t.subs under t.mtx, so the slice header
			// must be read under the same lock. The lock is released before
			// the callbacks run so that a subscriber may itself call
			// Subscribe without deadlocking. Sharing the backing array is
			// safe because existing elements are never modified, only
			// appended after.
			t.mtx.Lock()
			current := t.subs
			t.mtx.Unlock()

			for _, entry := range current {
				callSubscriber(entry.cfg, entry.subscriber, msg)
			}
		}
	}()
}
// callSubscriber delivers one message to one subscriber, applying the
// subscriber's retry settings and hooks.
//
// cfg.retry is the total number of attempts; values of zero or less mean a
// single attempt. After every failed attempt cfg.onError, if set, receives the
// error. After the first successful attempt cfg.onSuccess, if set, is called
// and delivery stops. Between failed attempts the calling goroutine sleeps for
// cfg.retryDelay; there is no sleep after the last attempt because nothing is
// left to retry.
func callSubscriber[Payload any](
	cfg *subscriberConfig,
	sub Subscriber[Payload],
	p payloadPair[Payload],
) {
	attempts := cfg.retry
	if attempts < 1 {
		attempts = 1
	}

	for i := 0; i < attempts; i++ {
		err := sub(p.ctx, p.payload)
		if err == nil {
			if cfg.onSuccess != nil {
				cfg.onSuccess()
			}
			return
		}

		if cfg.onError != nil {
			cfg.onError(err)
		}
		// Only wait when another attempt will actually follow.
		if i < attempts-1 {
			time.Sleep(cfg.retryDelay)
		}
	}
}
// Publish hands a, together with ctx, to the topic's dispatch loop. The send
// on the unbuffered stream channel happens on a freshly started goroutine, so
// Publish itself never blocks and always returns nil. ctx is only forwarded
// to the subscribers; it is not consulted here.
func (t *topic[Payload]) Publish(ctx context.Context, a Payload) error {
	msg := payloadPair[Payload]{ctx: ctx, payload: a}
	go func() { t.stream <- msg }()
	return nil
}
// Subscribe registers sub to be called for messages dispatched after this
// call. A fresh subscriberConfig is created for the subscriber and every
// non-nil configurator is applied to it in the order given; nil configurators
// are skipped.
//
// The configurators run before the topic's mutex is taken, and the mutex is
// released with defer, so a configurator that panics or calls back into the
// topic cannot leave the topic locked.
func (t *topic[Payload]) Subscribe(sub Subscriber[Payload], configurators ...SubscriberConfigFunc) {
	cfg := new(subscriberConfig)
	for _, configure := range configurators {
		if configure == nil {
			continue
		}
		configure(cfg)
	}

	t.mtx.Lock()
	defer t.mtx.Unlock()
	// append allocates when t.subs is nil, so no explicit initialisation is needed.
	t.subs = append(t.subs, subscriberPair[Payload]{
		subscriber: sub,
		cfg:        cfg,
	})
}