From 16814cf9eda7423b3b9b4d53293a5b3a0ac04908 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Tue, 16 Jul 2024 10:32:54 +0100 Subject: [PATCH] Log error on failed component Run (#1286) --- internal/runtime/alloy.go | 7 +++--- .../controller/node_builtin_component.go | 17 +++++-------- .../internal/controller/node_config_import.go | 12 ++++------ .../controller/node_custom_component.go | 12 +++++----- .../runtime/internal/controller/scheduler.go | 24 ++++++++++++++----- .../internal/controller/scheduler_test.go | 12 ++++++---- 6 files changed, 47 insertions(+), 37 deletions(-) diff --git a/internal/runtime/alloy.go b/internal/runtime/alloy.go index 3417e387da..4479ae13c3 100644 --- a/internal/runtime/alloy.go +++ b/internal/runtime/alloy.go @@ -51,6 +51,9 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/internal/controller" "github.com/grafana/alloy/internal/runtime/internal/worker" @@ -58,8 +61,6 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/internal/service" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/atomic" ) // Options holds static options for an Alloy controller. @@ -180,7 +181,7 @@ func newController(o controllerOptions) *Runtime { opts: o, updateQueue: controller.NewQueue(), - sched: controller.NewScheduler(), + sched: controller.NewScheduler(log), modules: o.ModuleRegistry, diff --git a/internal/runtime/internal/controller/node_builtin_component.go b/internal/runtime/internal/controller/node_builtin_component.go index 15898f6d32..43e4893a2f 100644 --- a/internal/runtime/internal/controller/node_builtin_component.go +++ b/internal/runtime/internal/controller/node_builtin_component.go @@ -13,15 +13,15 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/trace" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/logging" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" - "github.com/prometheus/client_golang/prometheus" - "go.opentelemetry.io/otel/trace" ) // ComponentID is a fully-qualified name of a component. Each element in @@ -316,17 +316,12 @@ func (cn *BuiltinComponentNode) Run(ctx context.Context) error { cn.setRunHealth(component.HealthTypeHealthy, "started component") err := cn.managed.Run(ctx) - var exitMsg string - logger := cn.managedOpts.Logger + // Note: logging of this error is handled by the scheduler. if err != nil { - level.Error(logger).Log("msg", "component exited with error", "err", err) - exitMsg = fmt.Sprintf("component shut down with error: %s", err) + cn.setRunHealth(component.HealthTypeExited, fmt.Sprintf("component shut down with error: %s", err)) } else { - level.Info(logger).Log("msg", "component exited") - exitMsg = "component shut down normally" + cn.setRunHealth(component.HealthTypeExited, "component shut down cleanly") } - - cn.setRunHealth(component.HealthTypeExited, exitMsg) return err } diff --git a/internal/runtime/internal/controller/node_config_import.go b/internal/runtime/internal/controller/node_config_import.go index 829ee9bc36..f01795405d 100644 --- a/internal/runtime/internal/controller/node_config_import.go +++ b/internal/runtime/internal/controller/node_config_import.go @@ -14,6 +14,8 @@ import ( "go.uber.org/atomic" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/runner" "github.com/grafana/alloy/internal/runtime/internal/importsource" @@ -22,7 +24,6 @@ import ( "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/parser" "github.com/grafana/alloy/syntax/vm" - "github.com/prometheus/client_golang/prometheus" ) // ImportConfigNode imports declare and import blocks via a managed import source. @@ -365,15 +366,12 @@ func (cn *ImportConfigNode) Run(ctx context.Context) error { err = cn.run(errChan, updateTasks) - var exitMsg string + // Note: logging of this error is handled by the scheduler. if err != nil { - level.Error(cn.logger).Log("msg", "import exited with error", "err", err) - exitMsg = fmt.Sprintf("import shut down with error: %s", err) + cn.setRunHealth(component.HealthTypeExited, fmt.Sprintf("import shut down with error: %s", err)) } else { - level.Info(cn.logger).Log("msg", "import exited") - exitMsg = "import shut down normally" + cn.setRunHealth(component.HealthTypeExited, "import shut down cleanly") } - cn.setRunHealth(component.HealthTypeExited, exitMsg) return err } diff --git a/internal/runtime/internal/controller/node_custom_component.go b/internal/runtime/internal/controller/node_custom_component.go index 6f4340231f..52fff29bc0 100644 --- a/internal/runtime/internal/controller/node_custom_component.go +++ b/internal/runtime/internal/controller/node_custom_component.go @@ -10,8 +10,8 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" ) @@ -211,7 +211,6 @@ func (cn *CustomComponentNode) evaluate(evalScope *vm.Scope) error { func (cn *CustomComponentNode) Run(ctx context.Context) error { cn.mut.RLock() managed := cn.managed - logger := cn.logger cn.mut.RUnlock() if managed == nil { @@ -220,12 +219,13 @@ func (cn *CustomComponentNode) Run(ctx context.Context) error { cn.setRunHealth(component.HealthTypeHealthy, "started custom component") err := managed.Run(ctx) + + // Note: logging of this error is handled by the scheduler. if err != nil { - level.Error(logger).Log("msg", "error running custom component", "id", cn.nodeID, "err", err) + cn.setRunHealth(component.HealthTypeExited, fmt.Sprintf("custom component shut down with error: %s", err)) + } else { + cn.setRunHealth(component.HealthTypeExited, "custom component shut down cleanly") } - - level.Info(logger).Log("msg", "custom component exited") - cn.setRunHealth(component.HealthTypeExited, "custom component shut down") return err } diff --git a/internal/runtime/internal/controller/scheduler.go b/internal/runtime/internal/controller/scheduler.go index 10993aa801..f5e7f127fc 100644 --- a/internal/runtime/internal/controller/scheduler.go +++ b/internal/runtime/internal/controller/scheduler.go @@ -4,6 +4,10 @@ import ( "context" "fmt" "sync" + + "github.com/go-kit/log" + + "github.com/grafana/alloy/internal/runtime/logging/level" ) // RunnableNode is any BlockNode which can also be run. @@ -17,6 +21,7 @@ type Scheduler struct { ctx context.Context cancel context.CancelFunc running sync.WaitGroup + logger log.Logger tasksMut sync.Mutex tasks map[string]*task @@ -26,11 +31,12 @@ type Scheduler struct { // components which are running. // // Call Close to stop the Scheduler and all running components. -func NewScheduler() *Scheduler { +func NewScheduler(logger log.Logger) *Scheduler { ctx, cancel := context.WithCancel(context.Background()) return &Scheduler{ ctx: ctx, cancel: cancel, + logger: logger, tasks: make(map[string]*task), } @@ -85,9 +91,15 @@ func (s *Scheduler) Synchronize(rr []RunnableNode) error { opts := taskOptions{ Context: s.ctx, Runnable: newRunnable, - OnDone: func() { + OnDone: func(err error) { defer s.running.Done() + if err != nil { + level.Error(s.logger).Log("msg", "node exited with error", "node", nodeID, "err", err) + } else { + level.Info(s.logger).Log("msg", "node exited without error", "node", nodeID) + } + s.tasksMut.Lock() defer s.tasksMut.Unlock() delete(s.tasks, nodeID) @@ -121,7 +133,7 @@ type task struct { type taskOptions struct { Context context.Context Runnable RunnableNode - OnDone func() + OnDone func(error) } // newTask creates and starts a new task. @@ -135,9 +147,9 @@ func newTask(opts taskOptions) *task { } go func() { - defer opts.OnDone() - defer close(t.exited) - _ = opts.Runnable.Run(t.ctx) + err := opts.Runnable.Run(t.ctx) + close(t.exited) + opts.OnDone(err) }() return t } diff --git a/internal/runtime/internal/controller/scheduler_test.go b/internal/runtime/internal/controller/scheduler_test.go index b571f4e312..3820e7cab2 100644 --- a/internal/runtime/internal/controller/scheduler_test.go +++ b/internal/runtime/internal/controller/scheduler_test.go @@ -2,17 +2,21 @@ package controller_test import ( "context" + "os" "sync" "testing" + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/runtime/internal/controller" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" - "github.com/stretchr/testify/require" ) func TestScheduler_Synchronize(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) t.Run("Can start new jobs", func(t *testing.T) { var started, finished sync.WaitGroup started.Add(3) @@ -26,7 +30,7 @@ func TestScheduler_Synchronize(t *testing.T) { return nil } - sched := controller.NewScheduler() + sched := controller.NewScheduler(logger) sched.Synchronize([]controller.RunnableNode{ fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}}, fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: runFunc}}, @@ -48,7 +52,7 @@ func TestScheduler_Synchronize(t *testing.T) { return nil } - sched := controller.NewScheduler() + sched := controller.NewScheduler(logger) for i := 0; i < 10; i++ { // If a new runnable is created, runFunc will panic since the WaitGroup @@ -74,7 +78,7 @@ func TestScheduler_Synchronize(t *testing.T) { return nil } - sched := controller.NewScheduler() + sched := controller.NewScheduler(logger) sched.Synchronize([]controller.RunnableNode{ fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},