-
-
Notifications
You must be signed in to change notification settings - Fork 309
/
subscriber.go
132 lines (111 loc) · 3.38 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package mercure
import (
"fmt"
"net/url"
"regexp"
"go.uber.org/zap/zapcore"
)
// Subscriber represents a client subscribed to a list of topics on a remote or on the current hub.
//
// Instances are created with NewSubscriber and configured with SetTopics,
// which fills SubscribedTopics, AllowedPrivateTopics and EscapedTopics.
type Subscriber struct {
// ID is reported as the "Subscriber" of every subscription and logged as "id".
ID string
// EscapedID is the last path segment of the subscription IDs built by getSubscriptions.
// NOTE(review): presumably the URL-escaped form of ID; it is not set in this file — confirm with the caller.
EscapedID string
// Claims, when non-nil and carrying a non-nil Mercure payload, provides the payload attached to each subscription.
Claims *claims
// EscapedTopics holds the query-escaped form of each entry of SubscribedTopics, at the same index (see SetTopics).
EscapedTopics []string
// RequestLastEventID is logged as "last_event_id".
// NOTE(review): presumably the Last-Event-ID supplied by the client — confirm.
RequestLastEventID string
// RemoteAddr is logged as "remote_addr" when non-empty.
RemoteAddr string
// SubscribedTopics lists the topic selectors the subscriber subscribed to.
SubscribedTopics []string
// SubscribedTopicRegexps is not referenced by the methods of this file.
// NOTE(review): presumably compiled forms of SubscribedTopics — confirm it is still needed.
SubscribedTopicRegexps []*regexp.Regexp
// AllowedPrivateTopics lists the topic selectors that grant access to private updates.
AllowedPrivateTopics []string
// AllowedPrivateRegexps is not referenced by the methods of this file.
// NOTE(review): presumably compiled forms of AllowedPrivateTopics — confirm it is still needed.
AllowedPrivateRegexps []*regexp.Regexp
// logger is injected by NewSubscriber.
logger Logger
// topicSelectorStore matches topics against topic selectors (used by MatchTopics).
topicSelectorStore *TopicSelectorStore
}
// NewSubscriber returns a Subscriber wired to the given logger and topic
// selector store. Every other field is left at its zero value; call SetTopics
// to register the topic selectors.
func NewSubscriber(logger Logger, topicSelectorStore *TopicSelectorStore) *Subscriber {
	s := new(Subscriber)
	s.logger = logger
	s.topicSelectorStore = topicSelectorStore

	return s
}
// SetTopics registers the topic selectors the subscriber subscribes to and the
// topic selectors it may access for private updates.
//
// It also stores the query-escaped form of every subscribed topic in
// EscapedTopics, index-aligned with SubscribedTopics. The given slices are
// stored as-is, not copied.
func (s *Subscriber) SetTopics(subscribedTopics, allowedPrivateTopics []string) {
	escaped := escapeTopics(subscribedTopics)

	s.SubscribedTopics, s.AllowedPrivateTopics = subscribedTopics, allowedPrivateTopics
	s.EscapedTopics = escaped
}
// escapeTopics returns a new slice holding url.QueryEscape of each topic, in
// the same order. The result is never nil, even for a nil or empty input.
func escapeTopics(topics []string) []string {
	escaped := make([]string, len(topics))
	for i, topic := range topics {
		escaped[i] = url.QueryEscape(topic)
	}

	return escaped
}
// MatchTopics reports whether the subscriber may receive an update published
// on the given topics.
//
// It returns true when at least one topic matches one of s.SubscribedTopics
// and, if private is true, at least one topic (not necessarily the same one)
// matches one of s.AllowedPrivateTopics. Public updates need no authorization.
//
//nolint:gocognit
func (s *Subscriber) MatchTopics(topics []string, private bool) bool {
	subscribed := false
	authorized := !private

	// Stop scanning topics as soon as both conditions are satisfied.
	for i := 0; i < len(topics) && !(subscribed && authorized); i++ {
		topic := topics[i]

		// Each inner loop runs only while its flag is still false and ends
		// on the first matching selector.
		for j := 0; !subscribed && j < len(s.SubscribedTopics); j++ {
			subscribed = s.topicSelectorStore.match(topic, s.SubscribedTopics[j])
		}

		for j := 0; !authorized && j < len(s.AllowedPrivateTopics); j++ {
			authorized = s.topicSelectorStore.match(topic, s.AllowedPrivateTopics[j])
		}
	}

	return subscribed && authorized
}
// Match reports whether the subscriber can receive the update u. It is a
// convenience wrapper that forwards u's topics and privacy flag to MatchTopics.
func (s *Subscriber) Match(u *Update) bool {
	topics, private := u.Topics, u.Private

	return s.MatchTopics(topics, private)
}
// getSubscriptions returns the list of subscriptions associated to this subscriber.
//
// One subscription is built per entry of s.SubscribedTopics. When topic is
// non-empty, only the entries strictly equal to topic are returned, and only
// if the subscriber matches topic as a public topic (see MatchTopics);
// otherwise the result is nil. context and active are copied verbatim into
// every returned subscription.
func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subscription {
	// The match check does not depend on the loop variable: evaluate it once
	// instead of once per subscribed topic.
	if topic != "" && !s.MatchTopics([]string{topic}, false) {
		return nil
	}

	var subscriptions []subscription //nolint:prealloc

	for k, t := range s.SubscribedTopics {
		if topic != "" && t != topic {
			continue
		}

		// EscapedTopics is normally filled by SetTopics. Fall back to escaping
		// on the fly when the exported SubscribedTopics field was populated
		// directly, rather than panicking on an out-of-range index.
		var escapedTopic string
		if k < len(s.EscapedTopics) {
			escapedTopic = s.EscapedTopics[k]
		} else {
			escapedTopic = url.QueryEscape(t)
		}

		sub := subscription{
			Context:    context,
			ID:         "/.well-known/mercure/subscriptions/" + escapedTopic + "/" + s.EscapedID,
			Type:       "Subscription",
			Subscriber: s.ID,
			Topic:      t,
			Active:     active,
		}
		if s.Claims != nil && s.Claims.Mercure.Payload != nil {
			sub.Payload = s.Claims.Mercure.Payload
		}

		subscriptions = append(subscriptions, sub)
	}

	return subscriptions
}
// MarshalLogObject writes the subscriber's loggable fields to enc, so that a
// Subscriber can be logged as a structured zap object.
//
// "id" and "last_event_id" are always written; "remote_addr" only when set.
// Note the key names of the arrays: "topic_selectors" carries
// AllowedPrivateTopics and "topics" carries SubscribedTopics; each is written
// only when the slice is non-nil. An encoder failure is returned wrapped.
func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	enc.AddString("id", s.ID)
	enc.AddString("last_event_id", s.RequestLastEventID)

	if addr := s.RemoteAddr; addr != "" {
		enc.AddString("remote_addr", addr)
	}

	// Array fields, in the order they are emitted.
	arrays := [...]struct {
		key    string
		values []string
	}{
		{key: "topic_selectors", values: s.AllowedPrivateTopics},
		{key: "topics", values: s.SubscribedTopics},
	}

	for _, field := range arrays {
		if field.values == nil {
			continue
		}

		if err := enc.AddArray(field.key, stringArray(field.values)); err != nil {
			return fmt.Errorf("log error: %w", err)
		}
	}

	return nil
}