forked from nyaruka/courier
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspool.go
139 lines (114 loc) · 3.63 KB
/
spool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package courier
import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"path"
"path/filepath"
"strings"
"time"
)
// FlusherFunc defines our interface for flushers, they are handed a filename and byte blob and are expected
// to try to flush that to the db, returning an error if the db is still down
type FlusherFunc func(filename string, contents []byte) error
// RegisterFlusher creates a new walker which we will use to flush files from the passed in directory
func RegisterFlusher(directory string, flusherFunc FlusherFunc) {
registeredFlushers = append(registeredFlushers, &flusherRegistration{directory, flusherFunc})
}
// WriteToSpool writes the passed in object to the passed in subdir
func WriteToSpool(spoolDir string, subdir string, contents any) error {
contentBytes, err := json.MarshalIndent(contents, "", " ")
if err != nil {
return err
}
filename := path.Join(spoolDir, subdir, fmt.Sprintf("%d.json", time.Now().UnixNano()))
return os.WriteFile(filename, contentBytes, 0640)
}
// starts our spool flusher, which every 30 seconds tries to write our pending msgs and statuses
func startSpoolFlushers(s Server) {
// create our actual flushers
flushers = make([]*flusher, len(registeredFlushers))
for i, reg := range registeredFlushers {
flushers[i] = newSpoolFlusher(s, reg.directory, reg.flusher)
}
s.WaitGroup().Add(1)
go func() {
defer s.WaitGroup().Done()
log := slog.With("comp", "spool")
log.Info("spool started", "state", "started")
// runs until stopped, checking every 30 seconds if there is anything to flush from our spool
for {
select {
// our server is shutting down, exit
case <-s.StopChan():
log.Info("spool stopped", "state", "stopped")
return
// every 30 seconds we check to see if there are any files to spool
case <-time.After(30 * time.Second):
for _, flusher := range flushers {
filepath.Walk(flusher.directory, flusher.walker)
}
}
}
}()
}
// EnsureSpoolDirPresent checks that the passed in spool directory is present and writable
func EnsureSpoolDirPresent(spoolDir string, subdir string) (err error) {
msgsDir := path.Join(spoolDir, subdir)
if _, err = os.Stat(msgsDir); os.IsNotExist(err) {
err = os.MkdirAll(msgsDir, 0770)
}
return err
}
// creates a new spool flusher
func newSpoolFlusher(s Server, dir string, flusherFunc FlusherFunc) *flusher {
return &flusher{func(filename string, info os.FileInfo, err error) error {
if filename == dir {
return nil
}
// we've been stopped, exit
if s.Stopped() {
return errors.New("spool flush process stopped")
}
// we don't care about subdirectories
if info.IsDir() {
return filepath.SkipDir
}
// ignore non-json files
if !strings.HasSuffix(filename, ".json") {
return nil
}
log := slog.With("comp", "spool", "filename", filename)
// otherwise, read our msg json
contents, err := os.ReadFile(filename)
if err != nil {
log.Error("reading spool file", "error", err)
return nil
}
err = flusherFunc(filename, contents)
if err != nil {
log.Error("flushing spool file", "error", err)
return err
}
log.Info("flushed")
// we flushed, remove our file if it is still present
if _, e := os.Stat(filename); e == nil {
err = os.Remove(filename)
}
return err
}, dir}
}
// simple struct that represents our walking function and the directory that gets walked
type flusher struct {
walker filepath.WalkFunc
directory string
}
var flushers []*flusher
// simple struct to keep track of who has registered to flush and for what directories
type flusherRegistration struct {
directory string
flusher FlusherFunc
}
var registeredFlushers []*flusherRegistration