Skip to content

Commit

Permalink
feat: add removal of alloc
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-karan committed Sep 3, 2022
1 parent 531303a commit b7afb83
Show file tree
Hide file tree
Showing 15 changed files with 499 additions and 80 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ jobs:
with:
go-version: 1.18

- name: Login to GitHub Container Registry
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
Expand Down
34 changes: 34 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,37 @@ archives:
- README.md
- LICENSE
- config.sample.toml

dockers:
- # ID of the image, needed if you want to filter by it later on (e.g. on custom publishers).
id: nomad-vector-logger

# GOOS of the built binaries/packages that should be used.
goos: linux

# GOARCH of the built binaries/packages that should be used.
goarch: amd64

# IDs to filter the binaries/packages.
ids:
- nomad-vector-logger

# Templates of the Docker image names.
image_templates:
- "ghcr.io/mr-karan/nomad-vector-logger:{{ .Tag }}"
- "ghcr.io/mr-karan/nomad-vector-logger:latest"

skip_push: false
dockerfile: Dockerfile
use: docker
# Template of the docker build flags.
build_flag_templates:
- "--pull"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--platform=linux/amd64"

extra_files:
- config.sample.toml
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM ubuntu:22.04
WORKDIR /app
COPY nomad-vector-logger.bin .
COPY config.sample.toml .
CMD ["./nomad-vector-logger.bin"]
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ fresh: build run

# Run golangci-lint inside a container against the repository root.
# $(CURDIR) is GNU Make's built-in absolute path of the current working
# directory. The previous `$(pwd)` was expanded by Make as an (undefined) Make
# variable, not as a shell command, so the bind mount source ended up empty.
.PHONY: lint
lint:
	docker run --rm -v $(CURDIR):/app -w /app golangci/golangci-lint:latest golangci-lint run -v

.PHONY: dev-suite
dev-suite:
vector -c examples/vector.toml
nomad agent -bind 0.0.0.0 -dev
.PHONY: dev
dev:
./run.sh
94 changes: 80 additions & 14 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@ import (
)

// Opts holds the runtime options of the app. Each field is declared exactly
// once; see the corresponding keys in config.sample.toml for the values.
type Opts struct {
	// refreshInterval is the interval at which the list of allocations is updated.
	refreshInterval time.Duration
	// removeAllocInterval is the interval handed to the cleanup worker, which
	// removes allocs that are no longer running.
	removeAllocInterval time.Duration
	// nomadDataDir is the Nomad data directory where alloc logs are stored.
	nomadDataDir string
	// vectorConfigDir is the path where the generated vector config is written.
	vectorConfigDir string
	// extraTemplatesDir is the directory of extra templates to render.
	extraTemplatesDir string
}

// App is the global container that holds
// objects of various routines that run on boot.
type App struct {
nomadClient *api.Client
log logf.Logger
opts Opts
nodeID string
sync.RWMutex
log logf.Logger
opts Opts
nomadClient *api.Client
nodeID string // Self NodeID where this program is running.
allocs map[string]*api.Allocation // Map of Alloc ID and Allocation object running in the cluster.
expiredAllocs []string
}

type AllocMeta struct {
Expand All @@ -56,6 +60,13 @@ func (app *App) Start(ctx context.Context) {
app.UpdateAllocs(ctx, app.opts.refreshInterval)
}()

// Start a background worker for removing expired allocs.
wg.Add(1)
go func() {
defer wg.Done()
app.CleanupAllocs(ctx, app.opts.removeAllocInterval)
}()

// Wait for all routines to finish.
wg.Wait()
}
Expand All @@ -72,29 +83,59 @@ func (app *App) UpdateAllocs(ctx context.Context, refreshInterval time.Duration)
select {
case <-ticker:
// Fetch the list of allocs running on this node.
allocs, err := app.fetchAllocs()
runningAllocs, err := app.fetchRunningAllocs()
if err != nil {
app.log.Error("error fetching allocs", "error", err)
continue
}

// Compare the running allocs with the one that's already present in the map.
app.RLock()

// If this is the first run of the program, `app.allocs` will be empty.
if len(app.allocs) > 0 {
for _, a := range runningAllocs {
// If an alloc is present in the running list but missing in our map, that means we should add it to our map.
if _, ok := app.allocs[a.ID]; !ok {
app.log.Info("adding new alloc to map", "id", a.ID, "namespace", a.Namespace, "job", a.Job.Name, "group", a.TaskGroup)
app.allocs[a.ID] = a
}
}
} else {
// This ideally only happens once when the program boots up.
app.allocs = runningAllocs
}

// Now check if the allocs inside our map are missing any running allocs.
for _, r := range app.allocs {
// This means that the alloc id we have in our map isn't running anymore, so enqueue it for deletion.
if _, ok := runningAllocs[r.ID]; !ok {
app.log.Info("alloc not found as running, enqueuing for deletion", "id", r.ID, "namespace", r.Namespace, "job", r.Job.Name, "group", r.TaskGroup)
app.expiredAllocs = append(app.expiredAllocs, r.ID)
}
}

// Making a copy of map so we don't have to hold the lock for longer.
presentAllocs := app.allocs
app.RUnlock()

// Generate a config once all allocs are added to the map.
err = app.generateConfig(allocs)
err = app.generateConfig(presentAllocs)
if err != nil {
app.log.Error("error generating config", "error", err)
continue
}

case <-ctx.Done():
app.log.Warn("context cancellation received, quitting update services worker")
app.log.Warn("context cancellation received, quitting update worker")
return
}
}
}

// fetchAllocs fetches all the current allocations in the cluster.
// fetchRunningAllocs fetches all the current allocations in the cluster.
// It ignores allocs that aren't running on the current node.
func (app *App) fetchAllocs() (map[string]*api.Allocation, error) {
func (app *App) fetchRunningAllocs() (map[string]*api.Allocation, error) {

allocs := make(map[string]*api.Allocation, 0)

Expand Down Expand Up @@ -166,7 +207,7 @@ func (app *App) generateConfig(allocs map[string]*api.Allocation) error {
}

// Load the vector config template.
tpl, err := template.ParseFS(vectorTmpl, "vector.tmpl")
tpl, err := template.ParseFS(vectorTmpl, "vector.toml.tmpl")
if err != nil {
return fmt.Errorf("unable to parse template: %v", err)
}
Expand Down Expand Up @@ -251,3 +292,28 @@ func (app *App) generateConfig(allocs map[string]*api.Allocation) error {

return nil
}

// CleanupAllocs periodically removes the allocs which are marked for deletion
// from the map. These are old allocs that aren't running anymore but which need
// to be removed from the config only after a delay, to ensure Vector has
// finished pushing all logs to the upstream sink.
//
// On every tick of removeAllocInterval, each ID queued in app.expiredAllocs is
// deleted from app.allocs and the queue is then reset. The call blocks until
// ctx is cancelled. removeAllocInterval must be positive, because
// time.NewTicker panics otherwise.
func (app *App) CleanupAllocs(ctx context.Context, removeAllocInterval time.Duration) {
	ticker := time.NewTicker(removeAllocInterval)
	// Release the ticker once the worker exits.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			app.Lock()
			for _, id := range app.expiredAllocs {
				app.log.Info("cleaning up alloc", "id", id)
				delete(app.allocs, id)
			}
			// Reset the queue. Without this the slice grows forever and every
			// tick re-logs and re-deletes IDs that are already gone.
			app.expiredAllocs = nil
			app.Unlock()

		case <-ctx.Done():
			app.log.Warn("context cancellation received, quitting cleanup worker")
			return
		}
	}
}
10 changes: 4 additions & 6 deletions config.sample.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
[app]
log_level = "debug" # `debug` for verbose logs. `info` otherwise.
env = "dev" # dev|prod.
refresh_interval = "10s" # Interval at which `index.json` gets stored in `data_dir`.
nomad_data_dir = "/opt/nomad/data/alloc" # Nomad data directory where allocs are stored.
refresh_interval = "10s" # Interval at which list of allocations is updated.
remove_alloc_interval = "30s" # When an alloc completes or stops, it isn't removed from Vector's config immediately. Removal is delayed by this duration so that Vector can finish collecting the alloc's remaining logs.
nomad_data_dir = "/opt/nomad/data/alloc" # Nomad data directory where alloc logs are stored.
vector_config_dir = "examples/vector/" # Path to the generated vector config file.
extra_templates_dir = "static/" # Extra templates that can be given. They will be rendered in `$vector_config_dir`. You can use variables mentioned in vector.tmpl if required.

[stream]
max_reconnect_attempts = 5 # Maximum reconnection attempts with Nomad Events API. After this limit is breached, program exits.
extra_templates_dir = "static/" # Directory of extra templates. They are rendered into the directory set by `vector_config_dir` and may use the variables available in vector.toml.tmpl.
2 changes: 1 addition & 1 deletion examples/Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
.PHONY: vector
vector:
vector --config=base.toml --config-dir=vector/ --watch-config vector/
vector --watch-config --config=base.toml --config-dir=vector/
68 changes: 68 additions & 0 deletions examples/vector/nomad.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
[sources.source_nomad_alloc_a0717c55-b2a2-8c90-1a39-b875a95d5a1a_server]
type = "file"
include = [ "/opt/nomad/data/alloc/a0717c55-b2a2-8c90-1a39-b875a95d5a1a/alloc/logs/server*" ]
line_delimiter = "\n"
read_from = "beginning"
[transforms.transform_nomad_alloc_a0717c55-b2a2-8c90-1a39-b875a95d5a1a_server]
type = "remap"
inputs = ["source_nomad_alloc_a0717c55-b2a2-8c90-1a39-b875a95d5a1a_server"]
source = '''
# Store Nomad metadata.
.nomad.namespace = "default"
.nomad.node_name = "pop-os"
.nomad.job_name = "http"
.nomad.group_name = "app"
.nomad.task_name = "server"
.nomad.alloc_id = "a0717c55-b2a2-8c90-1a39-b875a95d5a1a"
'''
[sources.source_nomad_alloc_8056b311-d9d9-28b3-1a60-1a7e71420e46_server]
type = "file"
include = [ "/opt/nomad/data/alloc/8056b311-d9d9-28b3-1a60-1a7e71420e46/alloc/logs/server*" ]
line_delimiter = "\n"
read_from = "beginning"
[transforms.transform_nomad_alloc_8056b311-d9d9-28b3-1a60-1a7e71420e46_server]
type = "remap"
inputs = ["source_nomad_alloc_8056b311-d9d9-28b3-1a60-1a7e71420e46_server"]
source = '''
# Store Nomad metadata.
.nomad.namespace = "default"
.nomad.node_name = "pop-os"
.nomad.job_name = "http"
.nomad.group_name = "app"
.nomad.task_name = "server"
.nomad.alloc_id = "8056b311-d9d9-28b3-1a60-1a7e71420e46"
'''
[sources.source_nomad_alloc_5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35_server]
type = "file"
include = [ "/opt/nomad/data/alloc/5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35/alloc/logs/server*" ]
line_delimiter = "\n"
read_from = "beginning"
[transforms.transform_nomad_alloc_5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35_server]
type = "remap"
inputs = ["source_nomad_alloc_5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35_server"]
source = '''
# Store Nomad metadata.
.nomad.namespace = "default"
.nomad.node_name = "pop-os"
.nomad.job_name = "http"
.nomad.group_name = "app"
.nomad.task_name = "server"
.nomad.alloc_id = "5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35"
'''
[sources.source_nomad_alloc_1b139a2a-ada0-0b7f-b0ef-7390ffc55e56_server]
type = "file"
include = [ "/opt/nomad/data/alloc/1b139a2a-ada0-0b7f-b0ef-7390ffc55e56/alloc/logs/server*" ]
line_delimiter = "\n"
read_from = "beginning"
[transforms.transform_nomad_alloc_1b139a2a-ada0-0b7f-b0ef-7390ffc55e56_server]
type = "remap"
inputs = ["source_nomad_alloc_1b139a2a-ada0-0b7f-b0ef-7390ffc55e56_server"]
source = '''
# Store Nomad metadata.
.nomad.namespace = "default"
.nomad.node_name = "pop-os"
.nomad.job_name = "http"
.nomad.group_name = "app"
.nomad.task_name = "server"
.nomad.alloc_id = "1b139a2a-ada0-0b7f-b0ef-7390ffc55e56"
'''
7 changes: 7 additions & 0 deletions examples/vector/sink.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[sinks.stdout_nomad]
type = "console"
inputs = ["transform_nomad_alloc*"]
target = "stdout"

[sinks.stdout_nomad.encoding]
codec = "json"
21 changes: 10 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
module github.com/mr-karan/nomad-vector-logger

go 1.18
go 1.19

require (
github.com/hashicorp/nomad/api v0.0.0-20211103234928-04cab9dbecd3
github.com/knadh/koanf v1.4.1
github.com/mr-karan/nomad-events-sink v0.2.0
github.com/hashicorp/nomad/api v0.0.0-20220902193006-d33f1eac719c
github.com/knadh/koanf v1.4.3
github.com/spf13/pflag v1.0.5
github.com/zerodha/logf v0.5.5
)

require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/hashicorp/cronexpr v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/cronexpr v1.1.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pelletier/go-toml v1.7.0 // indirect
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
)
Loading

0 comments on commit b7afb83

Please sign in to comment.