diff --git a/alloc.go b/alloc.go
new file mode 100644
index 0000000..eab54df
--- /dev/null
+++ b/alloc.go
@@ -0,0 +1,171 @@
+package main
+
+import (
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+	"path/filepath"
+	"text/template"
+
+	"github.com/hashicorp/nomad/api"
+)
+
+// fetchRunningAllocs fetches all the current allocations in the cluster.
+// It ignores allocs that aren't running on the current node.
+func (app *App) fetchRunningAllocs() (map[string]*api.Allocation, error) {
+	allocs := make(map[string]*api.Allocation, 0)
+
+	// Only fetch the allocations running on this node.
+	params := map[string]string{}
+	params["filter"] = fmt.Sprintf("NodeID==\"%s\"", app.nodeID)
+
+	// Prepare params for listing allocs.
+	query := &api.QueryOptions{
+		Params:    params,
+		Namespace: "*",
+	}
+
+	// Query the list of allocs.
+	currentAllocs, meta, err := app.nomadClient.Allocations().List(query)
+	if err != nil {
+		return nil, err
+	}
+	app.log.Debug("fetched existing allocs", "count", len(currentAllocs), "took", meta.RequestTime)
+
+	// For each alloc, check if it's running and get the underlying alloc info.
+	for _, allocStub := range currentAllocs {
+		if allocStub.ClientStatus != "running" {
+			app.log.Debug("ignoring alloc since it's not running", "name", allocStub.Name, "status", allocStub.ClientStatus)
+			continue
+		}
+		// Skip the allocations which aren't running on this node.
+		if allocStub.NodeID != app.nodeID {
+			app.log.Debug("skipping alloc because it doesn't run on this node", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
+			continue
+		} else {
+			app.log.Debug("alloc belongs to the current node", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
+		}
+
+		prefix := path.Join(app.opts.nomadDataDir, allocStub.ID)
+		app.log.Debug("checking if alloc log dir exists", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
+		_, err := os.Stat(prefix)
+		if errors.Is(err, os.ErrNotExist) {
+			app.log.Debug("log dir doesn't exist", "dir", prefix, "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
+			// Skip the allocation if it has been GC'ed from the host but the API still returned it.
+			// This is unlikely to happen.
+			continue
+		} else if err != nil {
+			app.log.Error("error checking if alloc dir exists on host", "error", err)
+			continue
+		}
+
+		if alloc, _, err := app.nomadClient.Allocations().Info(allocStub.ID, &api.QueryOptions{Namespace: allocStub.Namespace}); err != nil {
+			app.log.Error("unable to fetch alloc info", "error", err)
+			continue
+		} else {
+			allocs[alloc.ID] = alloc
+		}
+	}
+
+	// Return the map of allocs.
+	return allocs, nil
+}
+
+// generateConfig generates a vector config file by iterating over a
+// map of allocations in the cluster and adding some extra metadata about each alloc.
+// It writes the config file to disk, which vector _live_ watches and reloads
+// whenever it changes.
+func (app *App) generateConfig(allocs map[string]*api.Allocation) error {
+	// Create a config dir to store templates.
+	if err := os.MkdirAll(app.opts.vectorConfigDir, os.ModePerm); err != nil {
+		return fmt.Errorf("error creating dir %s: %v", app.opts.vectorConfigDir, err)
+	}
+
+	// Load the vector config template.
+	tpl, err := template.ParseFS(vectorTmpl, "vector.toml.tmpl")
+	if err != nil {
+		return fmt.Errorf("unable to parse template: %v", err)
+	}
+
+	// Special case to handle when there are no allocs.
+	// If this is the case, the user-supplied config, which relies on sources/transforms
+	// as the input for the sink, can fail because the generated `nomad.toml` will be empty.
+	// To avoid this, remove all files inside the existing config dir and exit the function.
+	if len(allocs) == 0 {
+		app.log.Info("no current alloc is running, cleaning up config dir", "vector_dir", app.opts.vectorConfigDir)
+		dir, err := ioutil.ReadDir(app.opts.vectorConfigDir)
+		if err != nil {
+			return fmt.Errorf("error reading vector config dir: %v", err)
+		}
+		for _, d := range dir {
+			if err := os.RemoveAll(path.Join(app.opts.vectorConfigDir, d.Name())); err != nil {
+				return fmt.Errorf("error cleaning up config dir: %v", err)
+			}
+		}
+		return nil
+	}
+
+	data := make([]AllocMeta, 0)
+
+	// Iterate over the allocs in the map.
+	for _, alloc := range allocs {
+		// Add metadata for each task in the alloc.
+		for task := range alloc.TaskResources {
+			// Add the task to the data.
+			data = append(data, AllocMeta{
+				Key:       fmt.Sprintf("nomad_alloc_%s_%s", alloc.ID, task),
+				ID:        alloc.ID,
+				LogDir:    filepath.Join(fmt.Sprintf("%s/%s", app.opts.nomadDataDir, alloc.ID), "alloc/logs/"+task+"*"),
+				Namespace: alloc.Namespace,
+				Group:     alloc.TaskGroup,
+				Node:      alloc.NodeName,
+				Task:      task,
+				Job:       alloc.JobID,
+			})
+		}
+	}
+
+	app.log.Info("generating config with total tasks", "count", len(data))
+	file, err := os.Create(filepath.Join(app.opts.vectorConfigDir, "nomad.toml"))
+	if err != nil {
+		return fmt.Errorf("error creating vector config file: %v", err)
+	}
+	defer file.Close()
+
+	if err := tpl.Execute(file, data); err != nil {
+		return fmt.Errorf("error executing template: %v", err)
+	}
+
+	// Load all user-provided templates.
+	if app.opts.extraTemplatesDir != "" {
+		// Loop over all files in the templates dir.
+		files, err := ioutil.ReadDir(app.opts.extraTemplatesDir)
+		if err != nil {
+			return fmt.Errorf("error reading extra templates dir: %v", err)
+		}
+
+		// For each file, render the template and store it in the vector config dir.
+		for _, file := range files {
+			// Load the vector config template.
+			t, err := template.ParseFiles(filepath.Join(app.opts.extraTemplatesDir, file.Name()))
+			if err != nil {
+				return fmt.Errorf("unable to parse template: %v", err)
+			}
+
+			// Create the underlying file.
+			f, err := os.Create(filepath.Join(app.opts.vectorConfigDir, file.Name()))
+			if err != nil {
+				return fmt.Errorf("error creating extra template file: %v", err)
+			}
+			defer f.Close()
+
+			if err := t.Execute(f, data); err != nil {
+				return fmt.Errorf("error executing extra template: %v", err)
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/app.go b/app.go
index 0b195f1..ccc2b7f 100644
--- a/app.go
+++ b/app.go
@@ -2,17 +2,9 @@ package main
 
 import (
 	"context"
-	"fmt"
-	"html/template"
-	"io/ioutil"
-	"os"
-	"path"
-	"path/filepath"
 	"sync"
 	"time"
 
-	"errors"
-
 	"github.com/hashicorp/nomad/api"
 	"github.com/zerodha/logf"
 )
@@ -35,6 +27,7 @@ type App struct {
 	nodeID        string                     // Self NodeID where this program is running.
 	allocs        map[string]*api.Allocation // Map of Alloc ID and Allocation object running in the cluster.
 	expiredAllocs []string
+	configUpdated chan bool
 }
 
 type AllocMeta struct {
@@ -67,6 +60,13 @@ func (app *App) Start(ctx context.Context) {
 		app.CleanupAllocs(ctx, app.opts.removeAllocInterval)
 	}()
 
+	// Start a background worker for updating config.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		app.ConfigUpdater(ctx)
+	}()
+
 	// Wait for all routines to finish.
 	wg.Wait()
 }
@@ -89,43 +89,47 @@ func (app *App) UpdateAllocs(ctx context.Context, refreshInterval time.Duration)
 				continue
 			}
 
-			// Compare the running allocs with the one that's already present in the map.
+			// Grab the current map of allocs so we don't have to hold the lock for long.
 			app.RLock()
+			presentAllocs := app.allocs
+			app.RUnlock()
 
-			// If this is the first run of the program, `app.allocs` will be empty.
-			if len(app.allocs) > 0 {
+			updateCnt := 0
+			if len(presentAllocs) > 0 {
 				for _, a := range runningAllocs {
 					// If an alloc is present in the running list but missing in our map, that means we should add it to our map.
-					if _, ok := app.allocs[a.ID]; !ok {
+					if _, ok := presentAllocs[a.ID]; !ok {
 						app.log.Info("adding new alloc to map", "id", a.ID, "namespace", a.Namespace, "job", a.Job.Name, "group", a.TaskGroup)
+						app.Lock()
 						app.allocs[a.ID] = a
+						app.Unlock()
+						updateCnt++
 					}
 				}
 			} else {
+				// If this is the first run of the program, `allocs` will be empty.
 				// This ideally only happens once when the program boots up.
+				app.Lock()
 				app.allocs = runningAllocs
+				app.Unlock()
+				app.configUpdated <- true
+				break
 			}
 
-			// Now check if the allocs inside our map are missing any running allocs.
-			for _, r := range app.allocs {
+			// Only generate config if there were additions.
+			if updateCnt > 0 {
+				app.configUpdated <- true
+			}
+
+			// Now check if the present allocs include allocs which aren't running anymore.
+			for _, r := range presentAllocs {
 				// This means that the alloc id we have in our map isn't running anymore, so enqueue it for deletion.
 				if _, ok := runningAllocs[r.ID]; !ok {
-					app.log.Info("alloc not found as running, enqueuing for deletion", "id", r.ID, "namespace", r.Namespace, "job", r.Job.Name, "group", r.TaskGroup)
+					app.log.Info("enqueueing non-running alloc for deletion", "id", r.ID, "namespace", r.Namespace, "job", r.Job.Name, "group", r.TaskGroup)
 					app.expiredAllocs = append(app.expiredAllocs, r.ID)
 				}
 			}
 
-			// Making a copy of map so we don't have to hold the lock for longer.
-			presentAllocs := app.allocs
-			app.RUnlock()
-
-			// Generate a config once all allocs are added to the map.
-			err = app.generateConfig(presentAllocs)
-			if err != nil {
-				app.log.Error("error generating config", "error", err)
-				continue
-			}
-
 		case <-ctx.Done():
 			app.log.Warn("context cancellation received, quitting update worker")
 			return
@@ -133,166 +137,6 @@ func (app *App) UpdateAllocs(ctx context.Context, refreshInterval time.Duration)
 	}
 }
 
-// fetchRunningAllocs fetches all the current allocations in the cluster.
-// It ignores the alloc which aren't running on the current node.
-func (app *App) fetchRunningAllocs() (map[string]*api.Allocation, error) {
-
-	allocs := make(map[string]*api.Allocation, 0)
-
-	// Only fetch the allocations running on this noe.
-	params := map[string]string{}
-	params["filter"] = fmt.Sprintf("NodeID==\"%s\"", app.nodeID)
-
-	// Prepare params for listing alloc.
-	query := &api.QueryOptions{
-		Params:    params,
-		Namespace: "*",
-	}
-
-	// Query list of allocs.
-	currentAllocs, meta, err := app.nomadClient.Allocations().List(query)
-	if err != nil {
-		return nil, err
-	}
-	app.log.Debug("fetched existing allocs", "count", len(currentAllocs), "took", meta.RequestTime)
-
-	// For each alloc, check if it's running and get the underlying alloc info.
-	for _, allocStub := range currentAllocs {
-		if allocStub.ClientStatus != "running" {
-			app.log.Debug("ignoring alloc since it's not running", "name", allocStub.Name, "status", allocStub.ClientStatus)
-			continue
-		}
-		// Skip the allocations which aren't running on this node.
-		if allocStub.NodeID != app.nodeID {
-			app.log.Debug("skipping alloc because it doesn't run on this node", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
-			continue
-		} else {
-			app.log.Debug("alloc belongs to the current node", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
-		}
-
-		prefix := path.Join(app.opts.nomadDataDir, allocStub.ID)
-		app.log.Debug("checking if alloc log dir exists", "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
-		_, err := os.Stat(prefix)
-		if errors.Is(err, os.ErrNotExist) {
-			app.log.Debug("log dir doesn't exist", "dir", prefix, "name", allocStub.Name, "alloc_node", allocStub.NodeID, "node", app.nodeID)
-			// Skip the allocation if it has been GC'ed from host but still the API returned.
-			// Unlikely case to happen.
-			continue
-		} else if err != nil {
-			app.log.Error("error checking if alloc dir exists on host", "error", err)
-			continue
-		}
-
-		if alloc, _, err := app.nomadClient.Allocations().Info(allocStub.ID, &api.QueryOptions{Namespace: allocStub.Namespace}); err != nil {
-			app.log.Error("unable to fetch alloc info", "error", err)
-			continue
-		} else {
-			allocs[alloc.ID] = alloc
-		}
-	}
-
-	// Return map of allocs.
-	return allocs, nil
-}
-
-// generateConfig generates a vector config file by iterating on a
-// map of allocations in the cluster and adding some extra metadata about the alloc.
-// It creates a config file on the disk which vector is _live_ watching and reloading
-// whenever it changes.
-func (app *App) generateConfig(allocs map[string]*api.Allocation) error {
-
-	// Create a config dir to store templates.
-	if err := os.MkdirAll(app.opts.vectorConfigDir, os.ModePerm); err != nil {
-		return fmt.Errorf("error creating dir %s: %v", app.opts.vectorConfigDir, err)
-	}
-
-	// Load the vector config template.
-	tpl, err := template.ParseFS(vectorTmpl, "vector.toml.tmpl")
-	if err != nil {
-		return fmt.Errorf("unable to parse template: %v", err)
-	}
-
-	// Special case to handle where there are no allocs.
-	// If this is the case, the user supplie config which relies on sources/transforms
-	// as the input for the sink can fail as the generated `nomad.toml` will be empty.
-	// To avoid this, remove all files inside the existing config dir and exit the function.
-	if len(allocs) == 0 {
-		app.log.Info("no current alloc is running, cleaning up config dir", "vector_dir", app.opts.vectorConfigDir)
-		dir, err := ioutil.ReadDir(app.opts.vectorConfigDir)
-		if err != nil {
-			return fmt.Errorf("error reading vector config dir")
-		}
-		for _, d := range dir {
-			if err := os.RemoveAll(path.Join([]string{app.opts.vectorConfigDir, d.Name()}...)); err != nil {
-				return fmt.Errorf("error cleaning up config dir")
-			}
-		}
-		return nil
-	}
-
-	data := make([]AllocMeta, 0)
-
-	// Iterate on allocs in the map.
-	for _, alloc := range allocs {
-		// Add metadata for each task in the alloc.
-		for task := range alloc.TaskResources {
-			// Add task to the data.
-			data = append(data, AllocMeta{
-				Key:       fmt.Sprintf("nomad_alloc_%s_%s", alloc.ID, task),
-				ID:        alloc.ID,
-				LogDir:    filepath.Join(fmt.Sprintf("%s/%s", app.opts.nomadDataDir, alloc.ID), "alloc/logs/"+task+"*"),
-				Namespace: alloc.Namespace,
-				Group:     alloc.TaskGroup,
-				Node:      alloc.NodeName,
-				Task:      task,
-				Job:       alloc.JobID,
-			})
-		}
-	}
-
-	app.log.Info("generating config with total tasks", "count", len(data))
-	file, err := os.Create(filepath.Join(app.opts.vectorConfigDir, "nomad.toml"))
-	if err != nil {
-		return fmt.Errorf("error creating vector config file: %v", err)
-	}
-	defer file.Close()
-
-	if err := tpl.Execute(file, data); err != nil {
-		return fmt.Errorf("error executing template: %v", err)
-	}
-
-	// Load all user provided templates.
-	if app.opts.extraTemplatesDir != "" {
-		// Loop over all files mentioned in the templates dir.
-		files, err := ioutil.ReadDir(app.opts.extraTemplatesDir)
-		if err != nil {
-			return fmt.Errorf("error opening extra template file: %v", err)
-		}
-
-		// For all files, template it out and store in vector config dir.
-		for _, file := range files {
-			// Load the vector config template.
-			t, err := template.ParseFiles(filepath.Join(app.opts.extraTemplatesDir, file.Name()))
-			if err != nil {
-				return fmt.Errorf("unable to parse template: %v", err)
-			}
-
-			// Create the underlying file.
-			f, err := os.Create(filepath.Join(app.opts.vectorConfigDir, file.Name()))
-			if err != nil {
-				return fmt.Errorf("error creating extra template file: %v", err)
-			}
-			defer f.Close()
-
-			if err := t.Execute(f, data); err != nil {
-				return fmt.Errorf("error executing extra template: %v", err)
-			}
-		}
-	}
-
-	return nil
-}
-
 // CleanupAllocs removes the alloc from the map which are marked for deletion.
 // These could be old allocs that aren't running anymore but which need to be removed from
 // the config after a delay to ensure Vector has finished pushing all logs to upstream sink.
@@ -304,16 +148,45 @@ func (app *App) CleanupAllocs(ctx context.Context, removeAllocInterval time.Dura
 	for {
 		select {
 		case <-ticker:
+			deleteCnt := 0
 			app.Lock()
 			for _, id := range app.expiredAllocs {
 				app.log.Info("cleaning up alloc", "id", id)
 				delete(app.allocs, id)
+				deleteCnt++
 			}
+			// Reset the expired allocs to nil.
+			app.expiredAllocs = nil
 			app.Unlock()
 
+			// Only generate config if there were deletions.
+			if deleteCnt > 0 {
+				app.configUpdated <- true
+			}
+
 		case <-ctx.Done():
 			app.log.Warn("context cancellation received, quitting cleanup worker")
 			return
 		}
 	}
 }
+
+// ConfigUpdater regenerates the vector config whenever an update is signalled on the configUpdated channel.
+func (app *App) ConfigUpdater(ctx context.Context) {
+	for {
+		select {
+		case <-app.configUpdated:
+			app.RLock()
+			allocs := app.allocs
+			app.RUnlock()
+			err := app.generateConfig(allocs)
+			if err != nil {
+				app.log.Error("error generating config", "error", err)
+			}
+
+		case <-ctx.Done():
+			app.log.Warn("context cancellation received, quitting config worker")
+			return
+		}
+	}
+}
diff --git a/examples/vector/nomad.toml b/examples/vector/nomad.toml
index 0871125..ea99bd1 100644
--- a/examples/vector/nomad.toml
+++ b/examples/vector/nomad.toml
@@ -1,68 +1,17 @@
-[sources.source_nomad_alloc_a0717c55-b2a2-8c90-1a39-b875a95d5a1a_server]
+[sources.source_nomad_alloc_7687b251-2861-0f4d-02d0-2496fa7d9e9a_proxy]
 type = "file"
-include = [ "/opt/nomad/data/alloc/a0717c55-b2a2-8c90-1a39-b875a95d5a1a/alloc/logs/server*" ]
+include = [ "/opt/nomad/data/alloc/7687b251-2861-0f4d-02d0-2496fa7d9e9a/alloc/logs/proxy*" ]
 line_delimiter = "\n"
 read_from = "beginning"
-[transforms.transform_nomad_alloc_a0717c55-b2a2-8c90-1a39-b875a95d5a1a_server]
+[transforms.transform_nomad_alloc_7687b251-2861-0f4d-02d0-2496fa7d9e9a_proxy]
 type = "remap"
-inputs = ["source_nomad_alloc_a0717c55-b2a2-8c90-1a39-b875a95d5a1a_server"]
+inputs = ["source_nomad_alloc_7687b251-2861-0f4d-02d0-2496fa7d9e9a_proxy"]
 source = '''
 # Store Nomad metadata.
 .nomad.namespace = "default"
 .nomad.node_name = "pop-os"
-.nomad.job_name = "http"
-.nomad.group_name = "app"
-.nomad.task_name = "server"
-.nomad.alloc_id = "a0717c55-b2a2-8c90-1a39-b875a95d5a1a"
-'''
-[sources.source_nomad_alloc_8056b311-d9d9-28b3-1a60-1a7e71420e46_server]
-type = "file"
-include = [ "/opt/nomad/data/alloc/8056b311-d9d9-28b3-1a60-1a7e71420e46/alloc/logs/server*" ]
-line_delimiter = "\n"
-read_from = "beginning"
-[transforms.transform_nomad_alloc_8056b311-d9d9-28b3-1a60-1a7e71420e46_server]
-type = "remap"
-inputs = ["source_nomad_alloc_8056b311-d9d9-28b3-1a60-1a7e71420e46_server"]
-source = '''
-# Store Nomad metadata.
-.nomad.namespace = "default"
-.nomad.node_name = "pop-os"
-.nomad.job_name = "http"
-.nomad.group_name = "app"
-.nomad.task_name = "server"
-.nomad.alloc_id = "8056b311-d9d9-28b3-1a60-1a7e71420e46"
-'''
-[sources.source_nomad_alloc_5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35_server]
-type = "file"
-include = [ "/opt/nomad/data/alloc/5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35/alloc/logs/server*" ]
-line_delimiter = "\n"
-read_from = "beginning"
-[transforms.transform_nomad_alloc_5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35_server]
-type = "remap"
-inputs = ["source_nomad_alloc_5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35_server"]
-source = '''
-# Store Nomad metadata.
-.nomad.namespace = "default"
-.nomad.node_name = "pop-os"
-.nomad.job_name = "http"
-.nomad.group_name = "app"
-.nomad.task_name = "server"
-.nomad.alloc_id = "5fc6ee1a-fcfa-9a34-a70b-44f5ba627f35"
-'''
-[sources.source_nomad_alloc_1b139a2a-ada0-0b7f-b0ef-7390ffc55e56_server]
-type = "file"
-include = [ "/opt/nomad/data/alloc/1b139a2a-ada0-0b7f-b0ef-7390ffc55e56/alloc/logs/server*" ]
-line_delimiter = "\n"
-read_from = "beginning"
-[transforms.transform_nomad_alloc_1b139a2a-ada0-0b7f-b0ef-7390ffc55e56_server]
-type = "remap"
-inputs = ["source_nomad_alloc_1b139a2a-ada0-0b7f-b0ef-7390ffc55e56_server"]
-source = '''
-# Store Nomad metadata.
-.nomad.namespace = "default"
-.nomad.node_name = "pop-os"
-.nomad.job_name = "http"
-.nomad.group_name = "app"
-.nomad.task_name = "server"
-.nomad.alloc_id = "1b139a2a-ada0-0b7f-b0ef-7390ffc55e56"
+.nomad.job_name = "nginx"
+.nomad.group_name = "nginx"
+.nomad.task_name = "proxy"
+.nomad.alloc_id = "7687b251-2861-0f4d-02d0-2496fa7d9e9a"
 '''
diff --git a/main.go b/main.go
index d504daa..8b45d1d 100644
--- a/main.go
+++ b/main.go
@@ -7,6 +7,8 @@ import (
 	"os"
 	"os/signal"
 	"syscall"
+
+	"github.com/hashicorp/nomad/api"
 )
 
 var (
@@ -29,8 +31,11 @@ func main() {
 
 	// Initialise a new instance of app.
 	app := App{
-		log:  initLogger(ko),
-		opts: initOpts(ko),
+		log:           initLogger(ko),
+		opts:          initOpts(ko),
+		configUpdated: make(chan bool, 1000),
+		allocs:        make(map[string]*api.Allocation, 0),
+		expiredAllocs: make([]string, 0),
 	}
 
 	// Initialise nomad events stream.