// syncer_loki.go: a log syncer that buffers entries and periodically pushes them to Loki over HTTP.

package rklogger
import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
)
// isValidLabelName reports whether name can be used as a Loki label name.
// A valid name is non-empty, consists only of ASCII letters, ASCII digits,
// '_' and ':', and does not start with a digit.
func isValidLabelName(name string) bool {
	if name == "" {
		return false
	}

	for pos, r := range name {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z':
			// letters are allowed anywhere
		case r == '_', r == ':':
			// separators are allowed anywhere
		case r >= '0' && r <= '9' && pos > 0:
			// digits are allowed everywhere except the first position
		default:
			return false
		}
	}

	return true
}
// LokiSyncerOption is a functional option that mutates a LokiSyncer.
// NewLokiSyncer applies the supplied options, in order, on top of its defaults.
type LokiSyncerOption func(syncer *LokiSyncer)
// WithLokiAddr sets the address of the Loki server.
// An empty addr is ignored, leaving the current address in place.
func WithLokiAddr(addr string) LokiSyncerOption {
	return func(s *LokiSyncer) {
		if addr == "" {
			return
		}
		s.addr = addr
	}
}
// WithLokiPath sets the URL path that log batches are posted to.
// An empty path is ignored, leaving the current path in place.
func WithLokiPath(in string) LokiSyncerOption {
	return func(s *LokiSyncer) {
		if in == "" {
			return
		}
		s.path = in
	}
}
// WithLokiUsername sets the user name stored on the syncer.
// The value is assigned unconditionally, so an empty name clears it.
func WithLokiUsername(name string) LokiSyncerOption {
	apply := func(s *LokiSyncer) {
		s.username = name
	}
	return apply
}
// WithLokiPassword sets the password stored on the syncer.
// The value is assigned unconditionally, so an empty password clears it.
func WithLokiPassword(pass string) LokiSyncerOption {
	apply := func(s *LokiSyncer) {
		s.password = pass
	}
	return apply
}
// WithLokiClientTls sets the TLS configuration stored on the syncer.
// The value is assigned unconditionally; passing nil clears it.
func WithLokiClientTls(conf *tls.Config) LokiSyncerOption {
	apply := func(s *LokiSyncer) {
		s.tlsConfig = conf
	}
	return apply
}
// WithLokiLabel adds a label to the syncer's label set.
// The pair is silently ignored when key or value is empty, or when key does
// not satisfy isValidLabelName.
func WithLokiLabel(key, value string) LokiSyncerOption {
	return func(s *LokiSyncer) {
		if key == "" || value == "" || !isValidLabelName(key) {
			return
		}
		s.labels.Set(key, value)
	}
}
// WithLokiMaxBatchWaitMs sets the maximum time a batch may wait before it is
// flushed. Despite the name, the argument is a time.Duration; durations
// shorter than one millisecond (including zero and negative ones) are ignored.
func WithLokiMaxBatchWaitMs(in time.Duration) LokiSyncerOption {
	return func(s *LokiSyncer) {
		if in < time.Millisecond {
			return
		}
		s.maxBatchWaitMs = in
	}
}
// WithLokiMaxBatchSize sets the maximum batch size stored on the syncer.
// Zero and negative sizes are ignored.
func WithLokiMaxBatchSize(batchSize int) LokiSyncerOption {
	return func(s *LokiSyncer) {
		if batchSize <= 0 {
			return
		}
		s.maxBatchSize = batchSize
	}
}
// NewLokiSyncer builds a LokiSyncer from defaults overridden by opts.
//
// Defaults: address "localhost:3100", path "/loki/api/v1/push", a 3 second
// batch wait and a batch size of 1000. The returned syncer does not send
// anything on its own until Bootstrap is called.
func NewLokiSyncer(opts ...LokiSyncerOption) *LokiSyncer {
	s := &LokiSyncer{
		addr:           "localhost:3100",
		path:           "/loki/api/v1/push",
		maxBatchSize:   1000,
		maxBatchWaitMs: 3000 * time.Millisecond,
		labels:         newAtomicMap(),
		buffer:         newAtomicSlice(),
		quitChannel:    make(chan struct{}),
	}

	for _, apply := range opts {
		apply(s)
	}

	// Set after the options run, so this label always wins over a
	// caller-supplied "rk_logger" label.
	s.labels.Set("rk_logger", "v1")

	// Derive the HTTP client and the basic-auth header from the final settings.
	s.initHttpClient()
	s.initBasicAuth()

	// Account for the single goroutine Bootstrap starts; it calls Done on exit.
	s.waitGroup.Add(1)

	return s
}
// initHttpClient normalizes the Loki address and creates the HTTP client.
//
// Any "http://" or "https://" prefix already present on the configured
// address is removed first. The scheme is then chosen from the TLS settings
// alone: "https://" (with a transport using the TLS config) when a TLS config
// was supplied, "http://" otherwise.
func (syncer *LokiSyncer) initHttpClient() {
	// strings.TrimPrefix returns the trimmed string rather than modifying its
	// argument, so the result has to be kept. Discarding it turned an address
	// such as "http://host:3100" into "http://http://host:3100".
	host := strings.TrimPrefix(syncer.addr, "http://")
	host = strings.TrimPrefix(host, "https://")

	syncer.httpClient = &http.Client{}

	if syncer.tlsConfig != nil {
		syncer.httpClient.Transport = &http.Transport{
			TLSClientConfig: syncer.tlsConfig,
		}
		syncer.addr = "https://" + host
		return
	}

	syncer.addr = "http://" + host
}
// initBasicAuth precomputes the HTTP basic-auth header value.
// The header is only built when both username and password are non-empty;
// otherwise basicAuthHeader is left untouched.
func (syncer *LokiSyncer) initBasicAuth() {
	if syncer.username == "" || syncer.password == "" {
		return
	}

	credentials := []byte(syncer.username + ":" + syncer.password)
	syncer.basicAuthHeader = "Basic " + base64.StdEncoding.EncodeToString(credentials)
}
// LokiSyncer buffers log entries and sends them to a Loki server in batches.
//
// NOTE(review): every field is unexported, so encoding/json will not read or
// write them regardless of the struct tags below; confirm whether the yaml
// library in use behaves the same way.
type LokiSyncer struct {
// addr is the Loki server address; initHttpClient prepends the URL scheme.
addr string `yaml:"addr" json:"addr"`
// path is the URL path appended to addr when posting a batch.
path string `yaml:"path" json:"path"`
// username and password are the basic-auth credentials.
username string `yaml:"username" json:"username"`
password string `yaml:"-" json:"-"`
// basicAuthHeader is the precomputed Authorization header value; empty when
// no complete credentials were supplied.
basicAuthHeader string `yaml:"-" json:"-"`
// tlsConfig, when non-nil, is used by the HTTP client and selects https.
tlsConfig *tls.Config `yaml:"-" json:"-"`
// maxBatchWaitMs is the flush interval. Despite the name it is a
// time.Duration, not a millisecond count.
maxBatchWaitMs time.Duration `yaml:"maxBatchWaitMs" json:"maxBatchWaitMs"`
// maxBatchSize is the number of buffered entries that triggers a flush.
maxBatchSize int `yaml:"maxBatchSize" json:"maxBatchSize"`
// labels are attached to every stream that is sent.
labels *atomicMap `yaml:"-" json:"-"`
// buffer holds entries written but not yet sent.
buffer *atomicSlice `yaml:"-" json:"-"`
// quitChannel is closed by Interrupt to stop the background goroutine.
quitChannel chan struct{} `yaml:"-" json:"-"`
// waitGroup lets Interrupt wait for the background goroutine to finish.
waitGroup sync.WaitGroup `yaml:"-" json:"-"`
// httpClient performs the push requests; created by initHttpClient.
httpClient *http.Client `yaml:"-" json:"-"`
}
// send drains the buffer and posts its contents to Loki as one request.
//
// It returns immediately when the buffer is empty. Failures are logged rather
// than returned; because the buffer is emptied before the request is made,
// entries of a failed batch are not retried.
func (syncer *LokiSyncer) send() {
	values := syncer.buffer.snapshotAndClear()
	if len(values) == 0 {
		return
	}

	payload := syncer.newLokiStreamList(values)

	// NewRequest fails on a malformed URL; without this check the nil request
	// would be dereferenced below.
	req, err := http.NewRequest(http.MethodPost, syncer.addr+syncer.path, bytes.NewReader(payload))
	if err != nil {
		log.Printf("Failed to create an HTTP request: %s\n", err)
		return
	}

	req.Header.Set("Content-Type", "application/json")
	if len(syncer.basicAuthHeader) > 0 {
		req.Header.Add("Authorization", syncer.basicAuthHeader)
	}

	resp, err := syncer.httpClient.Do(req)
	if err != nil {
		log.Printf("Failed to send an HTTP request: %s\n", err)
		return
	}
	// The body must be closed on every path, otherwise the underlying
	// connection is leaked.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusNoContent {
		log.Printf("Unexpected HTTP status code: %d\n", resp.StatusCode)
	}
}
// ************* Bootstrap & Interrupt *************
// Bootstrap starts the background goroutine that flushes the buffer to Loki.
//
// A flush happens when maxBatchWaitMs has elapsed since the previous flush,
// or when a once-per-second check finds at least maxBatchSize buffered
// entries. The goroutine stops as soon as quitChannel is closed, performs one
// final flush and then signals the wait group. The context argument is unused.
func (syncer *LokiSyncer) Bootstrap(context.Context) {
	go func() {
		flushTimer := time.NewTimer(syncer.maxBatchWaitMs)
		// A ticker replaces the former `default:` + time.Sleep(1s) polling so
		// the select blocks on all events at once and a quit request is not
		// delayed by a sleep in progress.
		sizeCheck := time.NewTicker(time.Second)

		defer func() {
			flushTimer.Stop()
			sizeCheck.Stop()
			syncer.send()
			syncer.waitGroup.Done()
		}()

		for {
			select {
			case <-syncer.quitChannel:
				return
			case <-flushTimer.C:
				syncer.send()
				flushTimer.Reset(syncer.maxBatchWaitMs)
			case <-sizeCheck.C:
				if syncer.buffer.len() < syncer.maxBatchSize {
					continue
				}
				syncer.send()
				// The timer may still be running or may have fired already;
				// stop and drain it so Reset cannot leave a stale tick behind
				// (required on Go versions before 1.23).
				if !flushTimer.Stop() {
					select {
					case <-flushTimer.C:
					default:
					}
				}
				flushTimer.Reset(syncer.maxBatchWaitMs)
			}
		}
	}()
}
// Interrupt stops the background goroutine and waits for it to finish.
//
// Calling Interrupt again after it has returned is a no-op instead of a
// "close of closed channel" panic. Concurrent calls are not synchronized
// against each other. NewLokiSyncer adds one to the wait group up front, so
// Interrupt blocks until a goroutine started by Bootstrap has exited. The
// context argument is unused.
func (syncer *LokiSyncer) Interrupt(context.Context) {
	select {
	case <-syncer.quitChannel:
		// Nothing is ever sent on quitChannel in this file, so a successful
		// receive means it was already closed by an earlier call.
	default:
		close(syncer.quitChannel)
	}

	syncer.waitGroup.Wait()
}
// ************* Model *************
// AddLabel adds or replaces a label that is attached to every stream sent
// after the call.
//
// The same rules as WithLokiLabel apply: the pair is silently ignored when
// key or value is empty or when key does not satisfy isValidLabelName, so an
// invalid label name cannot end up in the pushed payload.
func (syncer *LokiSyncer) AddLabel(key, value string) {
	if len(key) == 0 || len(value) == 0 || !isValidLabelName(key) {
		return
	}

	syncer.labels.Set(key, value)
}
// newLokiStreamList encodes values as the JSON body of a Loki push request.
// Every value becomes its own stream whose label set is a copy of the syncer
// labels overlaid with the value's own labels (the value's labels win).
func (syncer *LokiSyncer) newLokiStreamList(values []*lokiValue) []byte {
	streams := make([]*lokiStream, 0, len(values))

	for _, entry := range values {
		merged := syncer.labels.Copy()
		for name, value := range entry.Labels {
			merged[name] = value
		}

		streams = append(streams, &lokiStream{
			Stream: merged,
			Values: [][]string{entry.Values},
		})
	}

	// The payload is built only from strings, string maps and string slices,
	// which json.Marshal encodes without error, so the error is dropped.
	payload, _ := json.Marshal(&lokiStreamList{Streams: streams})
	return payload
}
// lokiValue is one buffered log entry waiting to be sent.
// Both fields carry the `json:"-"` tag, so the struct is never marshalled
// directly; newLokiStreamList copies its contents into a lokiStream instead.
// Refer https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
type lokiValue struct {
// Values is the entry itself; Write fills it with the timestamp in
// nanoseconds (as a decimal string) followed by the log line.
Values []string `json:"-"`
// Labels are per-entry labels merged over the syncer-wide labels.
Labels map[string]string `json:"-"`
}
// lokiStream is one element of the "streams" array in a push request body.
// Refer https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
type lokiStream struct {
// Stream is the label set identifying the stream.
Stream map[string]string `json:"stream"`
// Values holds the stream's entries, one inner slice per entry.
Values [][]string `json:"values"`
}
// lokiStreamList is the top-level JSON object of a push request body.
// Refer https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
type lokiStreamList struct {
// Streams lists every stream included in the request.
Streams []*lokiStream `json:"streams"`
}
// ************* Implementation of zapcore.WriteSyncer *************
// Write buffers p as one log entry stamped with the current time.
// Nothing is sent here; the entry is picked up by the next flush. p is copied
// into a string, so the caller may reuse the slice afterwards. Write always
// reports len(p) bytes written and a nil error.
func (syncer *LokiSyncer) Write(p []byte) (n int, err error) {
	timestamp := fmt.Sprintf("%d", time.Now().UnixNano())
	entry := &lokiValue{
		Values: []string{timestamp, string(p)},
	}

	syncer.buffer.add(entry)

	return len(p), nil
}
// Sync flushes the buffer by sending all pending entries to Loki right away.
// It is not a no-op. It always returns nil because send logs its failures
// instead of returning them.
func (syncer *LokiSyncer) Sync() error {
syncer.send()
return nil
}
// newAtomicSlice returns an empty atomicSlice.
func newAtomicSlice() *atomicSlice {
	return &atomicSlice{buf: []*lokiValue{}}
}

// atomicSlice is a list of log entries whose operations are serialized by a
// mutex.
type atomicSlice struct {
	buf   []*lokiValue
	mutex sync.Mutex
}

// add appends item to the list; a nil item is ignored.
func (s *atomicSlice) add(item *lokiValue) {
	if item != nil {
		s.mutex.Lock()
		defer s.mutex.Unlock()

		s.buf = append(s.buf, item)
	}
}

// snapshotAndClear returns the current entries in a fresh slice and leaves the
// list empty. The result is never nil.
func (s *atomicSlice) snapshotAndClear() []*lokiValue {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	snapshot := make([]*lokiValue, len(s.buf))
	copy(snapshot, s.buf)
	s.buf = []*lokiValue{}

	return snapshot
}

// len returns the number of entries currently held.
func (s *atomicSlice) len() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	count := len(s.buf)
	return count
}
// newAtomicMap returns an empty atomicMap.
func newAtomicMap() *atomicMap {
	return &atomicMap{m: map[string]string{}}
}

// atomicMap is a string-to-string map whose operations are serialized by a
// mutex.
type atomicMap struct {
	mutex sync.Mutex
	m     map[string]string
}

// Set stores v under k, replacing any previous value.
func (am *atomicMap) Set(k, v string) {
	am.mutex.Lock()
	defer am.mutex.Unlock()

	am.m[k] = v
}

// Get returns the value stored under k, or "" when k is absent.
func (am *atomicMap) Get(k string) string {
	am.mutex.Lock()
	defer am.mutex.Unlock()

	value := am.m[k]
	return value
}

// Copy returns an independent copy of the map; changing the copy does not
// affect the atomicMap.
func (am *atomicMap) Copy() map[string]string {
	am.mutex.Lock()
	defer am.mutex.Unlock()

	dup := make(map[string]string, len(am.m))
	for key, value := range am.m {
		dup[key] = value
	}

	return dup
}

// Delete removes k; deleting an absent key does nothing.
func (am *atomicMap) Delete(k string) {
	am.mutex.Lock()
	defer am.mutex.Unlock()

	delete(am.m, k)
}