diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 079f8e0ab8..11de9236a9 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -74,6 +74,9 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched // // Schedule() completely overrides the set of previously running components. // Components which have been removed since the last call to Schedule will be stopped. +// +// updateConsumers is called after the components have been paused and before the new components are started. +// It is expected to install the new set of consumers on the wrapping consumer that's assigned to the Alloy component. func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() @@ -109,7 +112,7 @@ func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h ote updateConsumers() // 4. Start the new components - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) + level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cs.schedComponents)) cs.schedComponents = cs.startComponents(ctx, h, cc...) cs.host = h //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? 
diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 477809c960..a7c50a8d24 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + "go.uber.org/atomic" + + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" @@ -59,6 +62,65 @@ func TestScheduler(t *testing.T) { require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) + t.Run("Pause callbacks are called", func(t *testing.T) { + var ( + pauseCalls = &atomic.Int32{} + resumeCalls = &atomic.Int32{} + l = util.TestLogger(t) + cs = scheduler.NewWithPauseCallbacks( + l, + func() { pauseCalls.Inc() }, + func() { resumeCalls.Inc() }, + ) + h = scheduler.NewHost(l) + ) + ctx, cancel := context.WithCancel(context.Background()) + + // Run our scheduler in the background. + go func() { + err := cs.Run(ctx) + require.NoError(t, err) + }() + + toInt := func(a *atomic.Int32) int { return int(a.Load()) } + + // The Run function starts the components. They should be paused and then resumed. + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run") + assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + // Schedule our component, which should notify the started and stopped + // trigger once it starts and stops respectively. 
+ component, started, stopped := newTriggerComponent() + cs.Schedule(ctx, func() {}, h, component) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule") + assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + // Wait for the component to start, and then unschedule all components, which + // should cause our running component to terminate. + require.NoError(t, started.Wait(5*time.Second), "component did not start") + cs.Schedule(ctx, func() {}, h) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") + + // Stop the scheduler + cancel() + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should not be called on shutdown") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + }) + t.Run("Running components get stopped on shutdown", func(t *testing.T) { var ( l = util.TestLogger(t)