Skip to content

Commit

Permalink
test and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Jan 2, 2025
1 parent f1da2d7 commit f5312dd
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
5 changes: 4 additions & 1 deletion internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched
//
// Schedule() completely overrides the set of previously running components.
// Components which have been removed since the last call to Schedule will be stopped.
//
// updateConsumers is called after the components are paused and before starting the new components.
// It is expected that this function will set the new set of consumers to the wrapping consumer that's assigned to the Alloy component.
func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) {
cs.schedMut.Lock()
defer cs.schedMut.Unlock()
Expand Down Expand Up @@ -109,7 +112,7 @@ func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h ote
updateConsumers()

// 4. Start the new components
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents))
level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cs.schedComponents))
cs.schedComponents = cs.startComponents(ctx, h, cc...)
cs.host = h
//TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers?
Expand Down
62 changes: 62 additions & 0 deletions internal/component/otelcol/internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"testing"
"time"

"go.uber.org/atomic"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"

Expand Down Expand Up @@ -59,6 +62,65 @@ func TestScheduler(t *testing.T) {
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})

t.Run("Pause callbacks are called", func(t *testing.T) {
var (
pauseCalls = &atomic.Int32{}
resumeCalls = &atomic.Int32{}
l = util.TestLogger(t)
cs = scheduler.NewWithPauseCallbacks(
l,
func() { pauseCalls.Inc() },
func() { resumeCalls.Inc() },
)
h = scheduler.NewHost(l)
)
ctx, cancel := context.WithCancel(context.Background())

// Run our scheduler in the background.
go func() {
err := cs.Run(ctx)
require.NoError(t, err)
}()

toInt := func(a *atomic.Int32) int { return int(a.Load()) }

// The Run function starts the components. They should be paused and then resumed.
require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run")
assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(ctx, func() {}, h, component)

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule")
assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(ctx, func() {}, h)

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule")
assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")

// Stop the scheduler
cancel()

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown")
assert.Equal(t, 3, toInt(resumeCalls), "resume callback should not be called on shutdown")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")
})

t.Run("Running components get stopped on shutdown", func(t *testing.T) {
var (
l = util.TestLogger(t)
Expand Down

0 comments on commit f5312dd

Please sign in to comment.