Move threshold processing from core.Engine to MetricsEngine
na-- committed Jan 27, 2023
1 parent daa866a commit 391c063
Showing 6 changed files with 153 additions and 124 deletions.
2 changes: 0 additions & 2 deletions cmd/run.go
@@ -273,8 +273,6 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
if engine.IsTainted() {
if err == nil {
err = errors.New("some thresholds have failed")
} else {
logger.Error("some thresholds have failed") // log this, even if there was already a previous error
}
err = errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed), errext.AbortedByThresholdsAfterTestEnd,
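With the threshold goroutine gone from the Engine, cmd/run.go no longer logs the failure separately; it only makes sure the returned error carries the right exit code and abort reason. A minimal sketch of that layering, assuming from their names that the *IfNone helpers are no-ops when an exit code or abort reason is already attached (the wrapper function itself is hypothetical, not part of the commit):

```go
package sketch

import (
	"errors"

	"go.k6.io/k6/errext"
	"go.k6.io/k6/errext/exitcodes"
)

// finalizeTaintError is a hypothetical helper mirroring the hunk above: it
// guarantees a non-nil error when thresholds are tainted and then attaches an
// exit code and abort reason, assumed to be applied only if none exist yet.
func finalizeTaintError(err error) error {
	if err == nil {
		err = errors.New("some thresholds have failed")
	}
	return errext.WithAbortReasonIfNone(
		errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed),
		errext.AbortedByThresholdsAfterTestEnd,
	)
}
```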
3 changes: 2 additions & 1 deletion cmd/tests/cmd_run_test.go
@@ -610,7 +610,8 @@ func TestAbortedByThreshold(t *testing.T) {
)
cmd.ExecuteWithGlobalState(ts.GlobalState)

assert.True(t, testutils.LogContains(ts.LoggerHook.Drain(), logrus.ErrorLevel, `test run aborted by failed thresholds`))
expErr := "thresholds on metrics 'iterations' were breached; at least one has abortOnFail enabled, stopping test prematurely"
assert.True(t, testutils.LogContains(ts.LoggerHook.Drain(), logrus.ErrorLevel, expErr))
stdOut := ts.Stdout.String()
t.Log(stdOut)
assert.Contains(t, stdOut, `✗ iterations`)
149 changes: 53 additions & 96 deletions core/engine.go
@@ -3,6 +3,7 @@ package core
import (
"context"
"errors"
"strings"
"sync"
"time"

@@ -17,10 +18,7 @@ import (
"go.k6.io/k6/output"
)

const (
collectRate = 50 * time.Millisecond
thresholdsRate = 2 * time.Second
)
const collectRate = 50 * time.Millisecond

// The Engine is the beating heart of k6.
type Engine struct {
@@ -47,10 +45,6 @@ type Engine struct {
stopChan chan struct{}

Samples chan metrics.SampleContainer

// Are thresholds tainted?
thresholdsTaintedLock sync.Mutex
thresholdsTainted bool
}

// NewEngine instantiates a new Engine, without doing any heavy initialization.
@@ -75,7 +69,7 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o
e.MetricsEngine = me

if !(testState.RuntimeOptions.NoSummary.Bool && testState.RuntimeOptions.NoThresholds.Bool) {
e.ingester = me.GetIngester()
e.ingester = me.CreateIngester()
outputs = append(outputs, e.ingester)
}

@@ -112,7 +106,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
}

// TODO: move all of this in a separate struct? see main TODO above
runSubCtx, runSubCancel := context.WithCancel(runCtx)
runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, e.logger)

execRunResult := make(chan error)
engineRunResult := make(chan error)
@@ -123,7 +117,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
e.logger.WithError(err).Debug("Execution scheduler terminated")

select {
case <-runSubCtx.Done():
case <-runCtx.Done():
// do nothing, the test run was aborted somehow
default:
execRunResult <- err // we finished normally, so send the result
@@ -142,7 +136,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
}

waitFn := e.startBackgroundProcesses(
globalCtx, runCtx, execRunResult, engineRunResult, runSubCancel, processMetricsAfterRun,
globalCtx, runCtx, execRunResult, engineRunResult, runSubAbort, processMetricsAfterRun,
)
return runFn, waitFn, nil
}
@@ -158,20 +152,19 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
// process is about to exit.
func (e *Engine) startBackgroundProcesses(
globalCtx, runCtx context.Context, execRunResult, engineRunResult chan error,
runSubCancel func(), processMetricsAfterRun chan struct{},
runSubAbort func(error), processMetricsAfterRun chan struct{},
) (wait func()) {
processes := new(sync.WaitGroup)

// Siphon and handle all produced metric samples
processes.Add(1)
go func() {
defer processes.Done()
e.processMetrics(globalCtx, processMetricsAfterRun)
e.processMetrics(globalCtx, processMetricsAfterRun, runSubAbort)
}()

// Update the test run status when the test finishes
processes.Add(1)
thresholdAbortChan := make(chan struct{})
go func() {
defer processes.Done()
var err error
@@ -186,58 +179,32 @@ func (e *Engine) startBackgroundProcesses(
} else {
e.logger.Debug("run: execution scheduler finished nominally")
}
// do nothing, return the same err value we got from the Run()
// ExecutionScheduler result, we just set the run_status based on it
case <-runCtx.Done():
e.logger.Debug("run: context expired; exiting...")
err = errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort),
errext.AbortedByUser,
)
}
}()

// Listen for stop calls from the REST API
processes.Add(1)
go func() {
defer processes.Done()
select {
case <-e.stopChan:
runSubCancel()
e.logger.Debug("run: stopped by user via REST API; exiting...")
err = errext.WithAbortReasonIfNone(
err := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI),
errext.AbortedByUser,
)
case <-thresholdAbortChan:
e.logger.Debug("run: stopped by thresholds; exiting...")
runSubCancel()
err = errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(errors.New("test run aborted by failed thresholds"), exitcodes.ThresholdsHaveFailed),
errext.AbortedByThreshold,
)
runSubAbort(err)
case <-runCtx.Done():
// do nothing
}
}()

// Run thresholds, if not disabled.
if !e.runtimeOptions.NoThresholds.Bool {
processes.Add(1)
go func() {
defer processes.Done()
defer e.logger.Debug("Engine: Thresholds terminated")
ticker := time.NewTicker(thresholdsRate)
defer ticker.Stop()

for {
select {
case <-ticker.C:
thresholdsTainted, shouldAbort := e.MetricsEngine.EvaluateThresholds(true)
e.thresholdsTaintedLock.Lock()
e.thresholdsTainted = thresholdsTainted
e.thresholdsTaintedLock.Unlock()
if shouldAbort {
close(thresholdAbortChan)
return
}
case <-runCtx.Done():
return
}
}
}()
}

return processes.Wait
}

@@ -248,29 +215,16 @@ func (e *Engine) startBackgroundProcesses(
// The `processMetricsAfterRun` channel argument is used by the caller to signal
// that the test run is finished, no more metric samples will be produced, and that
// the metric samples remaining in the pipeline should be processed.
func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRun chan struct{}) {
func (e *Engine) processMetrics(
globalCtx context.Context, processMetricsAfterRun chan struct{}, runSubAbort func(error),
) {
sampleContainers := []metrics.SampleContainer{}

defer func() {
// Process any remaining metrics in the pipeline, by this point Run()
// has already finished and nothing else should be producing metrics.
e.logger.Debug("Metrics processing winding down...")

close(e.Samples)
for sc := range e.Samples {
sampleContainers = append(sampleContainers, sc)
}
e.OutputManager.AddMetricSamples(sampleContainers)

if !e.runtimeOptions.NoThresholds.Bool {
// Process the thresholds one final time
thresholdsTainted, _ := e.MetricsEngine.EvaluateThresholds(false)
e.thresholdsTaintedLock.Lock()
e.thresholdsTainted = thresholdsTainted
e.thresholdsTaintedLock.Unlock()
}
e.logger.Debug("Metrics processing finished!")
}()
// Run thresholds, if not disabled.
var finalizeThresholds func() (breached []string)
if !e.runtimeOptions.NoThresholds.Bool {
finalizeThresholds = e.MetricsEngine.StartThresholdCalculations(runSubAbort)
}

ticker := time.NewTicker(collectRate)
defer ticker.Stop()
@@ -285,44 +239,47 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu
sampleContainers = make([]metrics.SampleContainer, 0, cap(sampleContainers))
}
}

finalize := func() {
// Process any remaining metrics in the pipeline, by this point Run()
// has already finished and nothing else should be producing metrics.
e.logger.Debug("Metrics processing winding down...")

close(e.Samples)
for sc := range e.Samples {
sampleContainers = append(sampleContainers, sc)
}
processSamples()

if finalizeThresholds != nil {
// Ensure the ingester flushes any buffered metrics
_ = e.ingester.Stop()
breached := finalizeThresholds()
e.logger.Debugf("Engine: thresholds done, breached: '%s'", strings.Join(breached, ", "))
}
e.logger.Debug("Metrics processing finished!")
}

for {
select {
case <-ticker.C:
processSamples()
case <-processMetricsAfterRun:
getCachedMetrics:
for {
select {
case sc := <-e.Samples:
sampleContainers = append(sampleContainers, sc)
default:
break getCachedMetrics
}
}
e.logger.Debug("Processing metrics and thresholds after the test run has ended...")
processSamples()
if !e.runtimeOptions.NoThresholds.Bool {
// Ensure the ingester flushes any buffered metrics
_ = e.ingester.Stop()
thresholdsTainted, _ := e.MetricsEngine.EvaluateThresholds(false)
e.thresholdsTaintedLock.Lock()
e.thresholdsTainted = thresholdsTainted
e.thresholdsTaintedLock.Unlock()
}
finalize()
processMetricsAfterRun <- struct{}{}

return
case sc := <-e.Samples:
sampleContainers = append(sampleContainers, sc)
case <-globalCtx.Done():
finalize()
return
}
}
}

func (e *Engine) IsTainted() bool {
e.thresholdsTaintedLock.Lock()
defer e.thresholdsTaintedLock.Unlock()
return e.thresholdsTainted
return e.MetricsEngine.GetMetricsWithBreachedThresholdsCount() > 0
}

// Stop closes a signal channel, forcing a running Engine to return
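The MetricsEngine side of this change is in the file whose diff did not load below, but its new surface can be read off the call sites above: CreateIngester() replaces GetIngester(), GetMetricsWithBreachedThresholdsCount() now backs IsTainted(), and StartThresholdCalculations(abort) returns a finalize function that reports the breached thresholds. What follows is only a rough standalone sketch of that pattern under those assumptions; the evaluate callback, the 2-second interval (taken from the removed thresholdsRate constant), and the plain error passed to abortRun are stand-ins, not the real implementation:

```go
package sketch

import (
	"fmt"
	"strings"
	"time"
)

// startThresholdCalculations sketches the contract core/engine.go now relies
// on. evaluate(ignoreEmptySinks) is assumed to return the names of metrics
// with breached thresholds plus whether any breached threshold has
// abortOnFail set; abortRun stands in for the runSubAbort func that
// execution.NewTestRunContext hands back.
func startThresholdCalculations(
	evaluate func(ignoreEmptySinks bool) (breached []string, shouldAbort bool),
	abortRun func(error),
) (finalize func() []string) {
	stop := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		ticker := time.NewTicker(2 * time.Second) // assumed; matches the removed thresholdsRate
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				breached, shouldAbort := evaluate(true)
				if shouldAbort {
					// The real code presumably also attaches an exit code and
					// the AbortedByThreshold reason before aborting the run.
					abortRun(fmt.Errorf(
						"thresholds on metrics '%s' were breached; "+
							"at least one has abortOnFail enabled, stopping test prematurely",
						strings.Join(breached, ", ")))
				}
			case <-stop:
				return
			}
		}
	}()

	// This is what processMetrics() calls after the ingester is stopped: end
	// the periodic loop and do one final evaluation over all samples.
	return func() []string {
		close(stop)
		<-done
		breached, _ := evaluate(false)
		return breached
	}
}
```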
23 changes: 16 additions & 7 deletions core/engine_test.go
@@ -43,8 +43,10 @@ type testStruct struct {

func getTestPreInitState(tb testing.TB) *lib.TestPreInitState {
reg := metrics.NewRegistry()
logger := testutils.NewLogger(tb)
logger.SetLevel(logrus.DebugLevel)
return &lib.TestPreInitState{
Logger: testutils.NewLogger(tb),
Logger: logger,
RuntimeOptions: lib.RuntimeOptions{},
Registry: reg,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg),
@@ -413,7 +415,7 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
assert.Fail(t, "Test should have completed within 10 seconds")
}
test.wait()
assert.True(t, test.engine.thresholdsTainted)
assert.True(t, test.engine.IsTainted())
}

func TestEngineAbortedByThresholds(t *testing.T) {
Expand All @@ -434,7 +436,8 @@ func TestEngineAbortedByThresholds(t *testing.T) {

thresholds := map[string]metrics.Thresholds{metric.Name: ths}

done := make(chan struct{})
doneIter := make(chan struct{})
doneRun := make(chan struct{})
runner := &minirunner.MiniRunner{
Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
out <- metrics.Sample{
@@ -446,7 +449,7 @@ func TestEngineAbortedByThresholds(t *testing.T) {
Value: 1.25,
}
<-ctx.Done()
close(done)
close(doneIter)
return nil
},
}
@@ -455,12 +458,18 @@ func TestEngineAbortedByThresholds(t *testing.T) {
defer test.wait()

go func() {
require.ErrorContains(t, test.run(), "aborted by failed thresholds")
defer close(doneRun)
err := test.run()
t.Logf("test run done with err '%s'", err)
assert.ErrorContains(t, err, "thresholds on metrics 'my_metric' were breached")
}()

select {
case <-done:
return
case <-doneIter:
case <-time.After(10 * time.Second):
assert.Fail(t, "Iteration should have completed within 10 seconds")
}
select {
case <-doneRun:
case <-time.After(10 * time.Second):
assert.Fail(t, "Test should have completed within 10 seconds")
}
9 changes: 2 additions & 7 deletions execution/scheduler.go
@@ -446,13 +446,8 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
logger.Debug("Start all executors...")
e.state.SetExecutionStatus(lib.ExecutionStatusRunning)

// We are using this context to allow lib.Executor implementations to cancel
// this context effectively stopping all executions.
//
// This is for addressing test.abort().
execCtx, _ := NewTestRunContext(runSubCtx, logger)
for _, exec := range e.executors {
go e.runExecutor(execCtx, runResults, engineOut, exec)
go e.runExecutor(runSubCtx, runResults, engineOut, exec)
}

// Wait for all executors to finish
@@ -478,7 +473,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr
return err
}
}
if err := GetCancelReasonIfTestAborted(execCtx); err != nil && errext.IsInterruptError(err) {
if err := GetCancelReasonIfTestAborted(runSubCtx); err != nil {
interrupted = true
return err
}
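The Scheduler hunks above stop wrapping their own abort context because the Engine now creates one in Init() via execution.NewTestRunContext and passes its abort callback down to the threshold code. Below is a self-contained re-creation of that pattern, with invented names and without the logging the real execution package does, to illustrate how the abort reason later surfaces through GetCancelReasonIfTestAborted-style lookups:

```go
package sketch

import (
	"context"
	"sync"
)

type abortKey struct{}

type abortReason struct {
	mu  sync.Mutex
	err error
}

// newTestRunContext mirrors the shape of execution.NewTestRunContext as used
// in core/engine.go: a derived context plus an abort(err) func that records
// why the run was cancelled before cancelling it.
func newTestRunContext(parent context.Context) (context.Context, func(error)) {
	reason := &abortReason{}
	ctx, cancel := context.WithCancel(context.WithValue(parent, abortKey{}, reason))
	abort := func(err error) {
		reason.mu.Lock()
		if reason.err == nil {
			reason.err = err // keep only the first abort reason
		}
		reason.mu.Unlock()
		cancel()
	}
	return ctx, abort
}

// cancelReasonIfAborted mirrors how execution.GetCancelReasonIfTestAborted is
// used in Scheduler.Run after this change: a nil result means a normal finish.
func cancelReasonIfAborted(ctx context.Context) error {
	reason, ok := ctx.Value(abortKey{}).(*abortReason)
	if !ok {
		return nil
	}
	reason.mu.Lock()
	defer reason.mu.Unlock()
	return reason.err
}
```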
(The diff for the sixth changed file did not load on this page.)
