forked from mr-karan/nomad-vector-logger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalloc.go
171 lines (149 loc) · 5.82 KB
/
alloc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package main
import (
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"path/filepath"
	"sort"
	"text/template"

	"github.com/hashicorp/nomad/api"
)
// fetchRunningAllocs returns the allocations running on this node, keyed by
// alloc ID.
//
// It lists allocs in all namespaces (Namespace "*") with a NodeID filter query
// param, then keeps only the stubs that:
//   - have ClientStatus "running",
//   - have NodeID equal to app.nodeID (re-checked locally),
//   - have an existing directory <app.opts.nomadDataDir>/<alloc ID> on this host.
//
// Each kept stub is expanded into a full *api.Allocation via
// Allocations().Info. Per-alloc failures (a Stat error other than not-exist,
// or an Info error) are logged and that alloc is skipped; only an error from
// the initial List call is returned.
func (app *App) fetchRunningAllocs() (map[string]*api.Allocation, error) {
	// Only fetch the allocations running on this node, across all namespaces.
	query := &api.QueryOptions{
		Params:    map[string]string{"filter": fmt.Sprintf("NodeID==\"%s\"", app.nodeID)},
		Namespace: "*",
	}

	currentAllocs, meta, err := app.nomadClient.Allocations().List(query)
	if err != nil {
		return nil, err
	}
	app.log.Debug("fetched existing allocs", "count", len(currentAllocs), "took", meta.RequestTime)

	// len(currentAllocs) is an upper bound on the number of allocs kept.
	allocs := make(map[string]*api.Allocation, len(currentAllocs))
	for _, allocStub := range currentAllocs {
		if allocStub.ClientStatus != "running" {
			app.log.Debug("ignoring alloc since it's not running", "name", allocStub.Name, "status", allocStub.ClientStatus)
			continue
		}

		// Defensive re-check of the NodeID filter sent with the List query.
		if allocStub.NodeID != app.nodeID {
			app.log.Debug("skipping alloc because it doesn't run on this node", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
			continue
		}
		app.log.Debug("alloc belongs to the current node", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)

		prefix := path.Join(app.opts.nomadDataDir, allocStub.ID)
		app.log.Debug("checking if alloc log dir exists", "dir", prefix, "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
		_, statErr := os.Stat(prefix)
		switch {
		case errors.Is(statErr, os.ErrNotExist):
			// The alloc may have been GC'ed from the host while the API still
			// returns it (unlikely); there is nothing on disk to collect.
			app.log.Debug("log dir doesn't exist", "dir", prefix, "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
			continue
		case statErr != nil:
			app.log.Error("error checking if alloc dir exists on host", "dir", prefix, "alloc_id", allocStub.ID, "name", allocStub.Name, "error", statErr)
			continue
		}

		alloc, _, err := app.nomadClient.Allocations().Info(allocStub.ID, &api.QueryOptions{Namespace: allocStub.Namespace})
		if err != nil {
			app.log.Error("unable to fetch alloc info", "alloc_id", allocStub.ID, "name", allocStub.Name, "namespace", allocStub.Namespace, "error", err)
			continue
		}
		allocs[alloc.ID] = alloc
	}

	return allocs, nil
}
// generateConfig writes vector configuration for the given allocations into
// app.opts.vectorConfigDir, a directory vector is _live_ watching and
// reloading whenever it changes.
//
// It builds one AllocMeta per task of every alloc, sorted by Key so the
// rendered order is stable for the same allocs. It then executes the
// "vector.toml.tmpl" template from vectorTmpl with that slice into nomad.toml.
// If app.opts.extraTemplatesDir is set, each non-directory entry in it is
// parsed as a text/template, executed with the same data, and written into the
// config dir under the same file name.
//
// When allocs is empty, every entry inside the config dir is removed instead
// and nothing is rendered. User-supplied config that relies on the generated
// sources/transforms as sink inputs would otherwise fail against an empty
// nomad.toml.
//
// Errors from creating the dir, parsing/executing templates, reading dirs, and
// creating/writing/closing files are returned, wrapped with %w.
func (app *App) generateConfig(allocs map[string]*api.Allocation) error {
	configDir := app.opts.vectorConfigDir

	// Create a config dir to store templates.
	if err := os.MkdirAll(configDir, os.ModePerm); err != nil {
		return fmt.Errorf("error creating dir %s: %w", configDir, err)
	}

	// Load the vector config template.
	tpl, err := template.ParseFS(vectorTmpl, "vector.toml.tmpl")
	if err != nil {
		return fmt.Errorf("unable to parse template: %w", err)
	}

	if len(allocs) == 0 {
		app.log.Info("no current alloc is running, cleaning up config dir", "vector_dir", configDir)
		return cleanVectorConfigDir(configDir)
	}

	data := allocMetaForTasks(app.opts.nomadDataDir, allocs)
	app.log.Info("generating config with total tasks", "count", len(data))

	if err := renderTemplateToFile(tpl, filepath.Join(configDir, "nomad.toml"), data); err != nil {
		return fmt.Errorf("error writing vector config file: %w", err)
	}

	if app.opts.extraTemplatesDir == "" {
		return nil
	}

	// Render every user-provided template into the vector config dir.
	entries, err := ioutil.ReadDir(app.opts.extraTemplatesDir)
	if err != nil {
		return fmt.Errorf("error reading extra templates dir %s: %w", app.opts.extraTemplatesDir, err)
	}
	for _, entry := range entries {
		if entry.IsDir() {
			// ParseFiles cannot read a directory; skip it rather than abort
			// the whole config generation.
			app.log.Debug("skipping directory in extra templates dir", "name", entry.Name())
			continue
		}
		src := filepath.Join(app.opts.extraTemplatesDir, entry.Name())
		t, err := template.ParseFiles(src)
		if err != nil {
			return fmt.Errorf("unable to parse template %s: %w", src, err)
		}
		if err := renderTemplateToFile(t, filepath.Join(configDir, entry.Name()), data); err != nil {
			return fmt.Errorf("error writing extra template file: %w", err)
		}
	}

	return nil
}

// cleanVectorConfigDir removes every entry (files and subdirectories) directly
// inside dir, leaving dir itself in place.
func cleanVectorConfigDir(dir string) error {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return fmt.Errorf("error reading vector config dir: %w", err)
	}
	for _, entry := range entries {
		if err := os.RemoveAll(filepath.Join(dir, entry.Name())); err != nil {
			return fmt.Errorf("error cleaning up config dir: %w", err)
		}
	}
	return nil
}

// allocMetaForTasks builds one AllocMeta per task (keys of TaskResources) of
// every alloc, sorted by Key.
func allocMetaForTasks(nomadDataDir string, allocs map[string]*api.Allocation) []AllocMeta {
	data := make([]AllocMeta, 0, len(allocs))
	for _, alloc := range allocs {
		for task := range alloc.TaskResources {
			data = append(data, AllocMeta{
				Key: fmt.Sprintf("nomad_alloc_%s_%s", alloc.ID, task),
				ID:  alloc.ID,
				// <data dir>/<alloc ID>/alloc/logs/<task>* — presumably consumed
				// as a file glob by the vector template; confirm in vector.toml.tmpl.
				LogDir:    filepath.Join(nomadDataDir, alloc.ID, "alloc", "logs", task+"*"),
				Namespace: alloc.Namespace,
				Group:     alloc.TaskGroup,
				Node:      alloc.NodeName,
				Task:      task,
				Job:       alloc.JobID,
			})
		}
	}
	// Map iteration order is random; sort so rendering the same allocs twice
	// produces the same entry order (and vector is not reloaded for nothing).
	sort.Slice(data, func(i, j int) bool { return data[i].Key < data[j].Key })
	return data
}

// renderTemplateToFile executes t with data into dst, creating or truncating
// it. The file is closed before returning, so no descriptor outlives the call
// (important when called in a loop) and an error from Close is reported
// instead of dropped.
func renderTemplateToFile(t *template.Template, dst string, data interface{}) (err error) {
	f, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("creating %s: %w", dst, err)
	}
	defer func() {
		if closeErr := f.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("closing %s: %w", dst, closeErr)
		}
	}()
	if execErr := t.Execute(f, data); execErr != nil {
		return fmt.Errorf("executing template into %s: %w", dst, execErr)
	}
	return nil
}