-
Notifications
You must be signed in to change notification settings - Fork 0
/
publish_options.go
238 lines (209 loc) · 8.64 KB
/
publish_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package clarimq
import (
"fmt"
"time"
)
const (
undefinedPublisher string = "undefined_publisher"
)
type (
// PublishingCache is an interface for a cache of messages that
// could not be published due to a missing broker connection.
PublishingCache interface {
// Put adds a publishing to the cache.
Put(p Publishing) error
// PopAll gets all publishing's from the cache and removes them.
PopAll() ([]Publishing, error)
// Len returns the number of publishing in the cache.
Len() int
// Flush removes all publishing's from the cache.
Flush() error
}
// PublisherOptions are the options for a publisher.
PublisherOptions struct {
// PublisherName is the name of the publisher.
PublisherName string
// PublishingCache is the publishing cache.
PublishingCache PublishingCache
// PublishingOptions are the options for publishing messages.
PublishingOptions *PublishOptions
}
// PublisherOption is an option for a Publisher.
PublisherOption func(*PublisherOptions)
// PublishOptions are used to control how data is published.
PublishOptions struct {
// Application or exchange specific fields,
// the headers exchange will inspect this field.
Headers Table
// Message timestamp.
Timestamp time.Time
// Exchange name.
Exchange string
// MIME content type.
ContentType string
// Expiration time in ms that a message will expire from a queue.
// See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers
Expiration string
// MIME content encoding.
ContentEncoding string
// Correlation identifier.
CorrelationID string
// Address to reply to (ex: RPC).
ReplyTo string
// Message identifier.
MessageID string
// Message type name.
Type string
// Creating user id - default: "guest".
UserID string
// creating application id.
AppID string
// Mandatory fails to publish if there are no queues
// bound to the routing key.
Mandatory bool
// Message priority level from 1 to 5 (0 == no priority).
Priority Priority
// Transient (0 or 1) or Persistent (2).
DeliveryMode DeliveryMode
}
)
func defaultPublisherOptions() *PublisherOptions {
return &PublisherOptions{
PublisherName: newDefaultPublisherName(),
PublishingCache: nil,
PublishingOptions: defaultPublishOptions(),
}
}
func newDefaultPublisherName() string {
return fmt.Sprintf("%s_%s", undefinedPublisher, newRandomString())
}
func defaultPublishOptions() *PublishOptions {
return &PublishOptions{
Headers: make(Table),
Exchange: "",
ContentType: "",
Expiration: "",
ContentEncoding: "",
CorrelationID: "",
ReplyTo: "",
MessageID: "",
Type: "",
UserID: "",
AppID: "",
Mandatory: false,
Priority: NoPriority,
DeliveryMode: TransientDelivery,
}
}
// WithCustomPublishOptions sets the publish options.
//
// It can be used to set all publisher options at once.
func WithCustomPublishOptions(options *PublisherOptions) PublisherOption {
return func(opt *PublisherOptions) {
if options != nil {
if options.PublishingOptions != nil {
opt.PublishingOptions.AppID = options.PublishingOptions.AppID
opt.PublishingOptions.ContentEncoding = options.PublishingOptions.ContentEncoding
opt.PublishingOptions.ContentType = options.PublishingOptions.ContentType
opt.PublishingOptions.CorrelationID = options.PublishingOptions.CorrelationID
opt.PublishingOptions.DeliveryMode = options.PublishingOptions.DeliveryMode
opt.PublishingOptions.Exchange = options.PublishingOptions.Exchange
opt.PublishingOptions.Expiration = options.PublishingOptions.Expiration
opt.PublishingOptions.Mandatory = options.PublishingOptions.Mandatory
opt.PublishingOptions.MessageID = options.PublishingOptions.MessageID
opt.PublishingOptions.Priority = options.PublishingOptions.Priority
opt.PublishingOptions.ReplyTo = options.PublishingOptions.ReplyTo
opt.PublishingOptions.Timestamp = options.PublishingOptions.Timestamp
opt.PublishingOptions.Type = options.PublishingOptions.Type
opt.PublishingOptions.UserID = options.PublishingOptions.UserID
if options.PublishingOptions.Headers != nil {
opt.PublishingOptions.Headers = options.PublishingOptions.Headers
}
}
if options.PublishingCache != nil {
opt.PublishingCache = options.PublishingCache
}
}
}
}
// WithPublishOptionExchange sets the exchange to publish to.
func WithPublishOptionExchange(exchange string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.Exchange = exchange }
}
// WithPublishOptionMandatory sets whether the publishing is mandatory, which means when a queue is not
// bound to the routing key a message will be sent back on the returns channel for you to handle.
//
// Default: false.
func WithPublishOptionMandatory(mandatory bool) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.Mandatory = mandatory }
}
// WithPublishOptionContentType sets the content type, i.e. "application/json".
func WithPublishOptionContentType(contentType string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.ContentType = contentType }
}
// WithPublishOptionDeliveryMode sets the message delivery mode. Transient messages will
// not be restored to durable queues, persistent messages will be restored to
// durable queues and lost on non-durable queues during broker restart. By default publishing's
// are transient.
func WithPublishOptionDeliveryMode(deliveryMode DeliveryMode) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.DeliveryMode = deliveryMode }
}
// WithPublishOptionExpiration sets the expiry/TTL of a message. As per RabbitMq spec, it must be a.
// string value in milliseconds.
func WithPublishOptionExpiration(expiration string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.Expiration = expiration }
}
// WithPublishOptionHeaders sets message header values, i.e. "msg-id".
func WithPublishOptionHeaders(headers Table) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.Headers = headers }
}
// WithPublishOptionContentEncoding sets the content encoding, i.e. "utf-8".
func WithPublishOptionContentEncoding(contentEncoding string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.ContentEncoding = contentEncoding }
}
// WithPublishOptionPriority sets the content priority from 0 to 9.
func WithPublishOptionPriority(priority Priority) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.Priority = priority }
}
// WithPublishOptionTracing sets the content correlation identifier.
func WithPublishOptionTracing(correlationID string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.CorrelationID = correlationID }
}
// WithPublishOptionReplyTo sets the reply to field.
func WithPublishOptionReplyTo(replyTo string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.ReplyTo = replyTo }
}
// WithPublishOptionMessageID sets the message identifier.
func WithPublishOptionMessageID(messageID string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.MessageID = messageID }
}
// WithPublishOptionTimestamp sets the timestamp for the message.
func WithPublishOptionTimestamp(timestamp time.Time) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.Timestamp = timestamp }
}
// WithPublishOptionType sets the message type name.
func WithPublishOptionType(messageType string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.Type = messageType }
}
// WithPublishOptionUserID sets the user id e.g. "user".
func WithPublishOptionUserID(userID string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.UserID = userID }
}
// WithPublishOptionAppID sets the application id.
func WithPublishOptionAppID(appID string) PublisherOption {
return func(options *PublisherOptions) { options.PublishingOptions.AppID = appID }
}
// WithPublisherOptionPublishingCache enables the publishing cache.
//
// An implementation of the PublishingCache interface must be provided.
func WithPublisherOptionPublishingCache(cache PublishingCache) PublisherOption {
return func(options *PublisherOptions) { options.PublishingCache = cache }
}
// WithPublisherOptionPublisherName sets the name of the publisher.
//
// If unset a random name will be given.
func WithPublisherOptionPublisherName(publisherName string) PublisherOption {
return func(options *PublisherOptions) {
options.PublisherName = publisherName
}
}