From 4077da3160004fe2114692f28910a4fa7d27c8e7 Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Wed, 7 Dec 2022 16:43:50 +0200 Subject: [PATCH] Add new AbortReason type, use old RunStatus type only for k6 cloud code RunStatus is very specific to the k6 cloud and doesn't map perfectly to how k6 OSS sees the causes of prematurely stopped tests. So, this commit restricts the usage of RunStatus only to the cloudapi/ package and to the `k6 cloud` command handling in `cmd/cloud.go`. It does that by introducing a new type, `errext.AbortReason`, and a way to attach this type to test run errors. This allows us a cleaner error propagation to the outputs, and every output can map these generic values to its own internal ones however it wants. It is not necessary for that mapping to be exactly 1:1 and, indeed, the `errext.AbortReason`:`cloudapi.RunStatus` mapping in cloudapi/ is not 1:1. --- api/v1/setup_teardown_routes_test.go | 2 +- api/v1/status_routes_test.go | 2 +- cmd/cloud.go | 1 + cmd/integration_test.go | 6 +-- cmd/run.go | 10 +++-- core/engine.go | 34 ++++++--------- core/engine_test.go | 4 +- errext/abort_reason.go | 54 ++++++++++++++++++++++++ errext/exception.go | 1 + errext/interrupt_error.go | 11 ++++- js/runner.go | 15 ++++--- js/runner_test.go | 2 +- js/timeout_error.go | 13 ++++-- output/cloud/output.go | 62 +++++++++++++++++++++------- output/cloud/output_test.go | 2 - output/manager.go | 27 ++++++------ output/types.go | 14 +++++-- 17 files changed, 181 insertions(+), 79 deletions(-) create mode 100644 errext/abort_reason.go diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index fa61aeecb14..8829755c901 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -142,7 +142,7 @@ func TestSetupData(t *testing.T) { require.NoError(t, err) require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs() + defer engine.OutputManager.StopOutputs(nil) globalCtx, globalCancel := context.WithCancel(context.Background()) runCtx, runCancel := context.WithCancel(globalCtx) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 685a2c25737..03ce1f2d4f5 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -116,7 +116,7 @@ func TestPatchStatus(t *testing.T) { require.NoError(t, err) require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs() + defer engine.OutputManager.StopOutputs(nil) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) run, wait, err := engine.Init(ctx, ctx) diff --git a/cmd/cloud.go b/cmd/cloud.go index 0a352c47bce..65a29fedef6 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -62,6 +62,7 @@ func (c *cmdCloud) preRun(cmd *cobra.Command, args []string) error { } // TODO: split apart some more +// //nolint:funlen,gocognit,cyclop func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { printBanner(c.gs) diff --git a/cmd/integration_test.go b/cmd/integration_test.go index cbd1c86a218..88b43badd0f 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -1401,7 +1401,7 @@ func TestMinIterationDuration(t *testing.T) { import { Counter } from 'k6/metrics'; export let options = { - minIterationDuration: '5s', + minIterationDuration: '7s', setupTimeout: '2s', teardownTimeout: '2s', thresholds: { @@ -1421,9 +1421,9 @@ func TestMinIterationDuration(t *testing.T) { start := time.Now() newRootCommand(ts.globalState).execute() elapsed := time.Since(start) - assert.Greater(t, elapsed, 5*time.Second, "expected more time to have passed because of minIterationDuration") + assert.Greater(t, elapsed, 7*time.Second, "expected more time to have passed because of minIterationDuration") assert.Less( - t, elapsed, 10*time.Second, + t, elapsed, 14*time.Second, "expected less time to have passed because minIterationDuration should not affect setup() and teardown() ", ) diff --git a/cmd/run.go b/cmd/run.go index 67f7456c15d..b754b7555c6 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -36,7 +36,7 @@ type cmdRun struct { // TODO: split apart some more // //nolint:funlen,gocognit,gocyclo,cyclop -func (c *cmdRun) run(cmd *cobra.Command, args []string) error { +func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { printBanner(c.gs) test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig) @@ -158,7 +158,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error { if err != nil { return err } - defer engine.OutputManager.StopOutputs() + defer func() { + engine.OutputManager.StopOutputs(err) + }() printExecutionDescription( c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs, @@ -273,7 +275,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error { } else { logger.Error("some thresholds have failed") // log this, even if there was already a previous error } - err = errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed), errext.AbortedByThresholdsAfterTestEnd, + ) } return err } diff --git a/core/engine.go b/core/engine.go index 7afcf1660cf..5e31a567a4f 100644 --- a/core/engine.go +++ b/core/engine.go @@ -8,7 +8,6 @@ import ( "github.com/sirupsen/logrus" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/lib" @@ -108,7 +107,6 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait // TODO: if we ever need metrics processing in the init context, we can move // this below the other components... or even start them concurrently? if err := e.ExecutionScheduler.Init(runCtx, e.Samples); err != nil { - e.setRunStatusFromError(err) return nil, nil, err } @@ -148,18 +146,6 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait return runFn, waitFn, nil } -func (e *Engine) setRunStatusFromError(err error) { - var serr errext.Exception - switch { - case errors.As(err, &serr): - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedScriptError) - case errext.IsInterruptError(err): - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedUser) - default: - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedSystem) - } -} - // This starts a bunch of goroutines to process metrics, thresholds, and set the // test run status when it ends. It returns a function that can be used after // the provided context is called, to wait for the complete winding down of all @@ -196,27 +182,31 @@ func (e *Engine) startBackgroundProcesses( case err = <-execRunResult: if err != nil { e.logger.WithError(err).Debug("run: execution scheduler returned an error") - e.setRunStatusFromError(err) } else { e.logger.Debug("run: execution scheduler finished nominally") - e.OutputManager.SetRunStatus(cloudapi.RunStatusFinished) } // do nothing, return the same err value we got from the Run() // ExecutionScheduler result, we just set the run_status based on it case <-runCtx.Done(): e.logger.Debug("run: context expired; exiting...") - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedUser) - err = errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort), + errext.AbortedByUser, + ) case <-e.stopChan: runSubCancel() e.logger.Debug("run: stopped by user via REST API; exiting...") - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedUser) - err = errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI), + errext.AbortedByUser, + ) case <-thresholdAbortChan: e.logger.Debug("run: stopped by thresholds; exiting...") runSubCancel() - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedThreshold) - err = errext.WithExitCodeIfNone(errors.New("test run aborted by failed thresholds"), exitcodes.ThresholdsHaveFailed) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(errors.New("test run aborted by failed thresholds"), exitcodes.ThresholdsHaveFailed), + errext.AbortedByThreshold, + ) } }() diff --git a/core/engine_test.go b/core/engine_test.go index fb22d048df3..d06962dd242 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -109,7 +109,7 @@ func newTestEngineWithTestPreInitState( //nolint:golint test.runCancel() globalCancel() waitFn() - engine.OutputManager.StopOutputs() + engine.OutputManager.StopOutputs(nil) }, piState: piState, } @@ -1333,7 +1333,7 @@ func TestActiveVUsCount(t *testing.T) { require.NoError(t, err) cancel() waitFn() - engine.OutputManager.StopOutputs() + engine.OutputManager.StopOutputs(nil) require.False(t, engine.IsTainted()) } diff --git a/errext/abort_reason.go b/errext/abort_reason.go new file mode 100644 index 00000000000..a9452f92be7 --- /dev/null +++ b/errext/abort_reason.go @@ -0,0 +1,54 @@ +package errext + +import "errors" + +// AbortReason is used to signal to outputs what type of error caused the test +// run to be stopped prematurely. +type AbortReason uint8 + +// These are the various reasons why a test might have been aborted prematurely. +const ( + AbortedByUser AbortReason = iota + 1 + AbortedByThreshold + AbortedByThresholdsAfterTestEnd // TODO: rename? + AbortedByScriptError + AbortedByScriptAbort + AbortedByTimeout +) + +// HasAbortReason is a wrapper around an error with an attached abort reason. +type HasAbortReason interface { + error + AbortReason() AbortReason +} + +// WithAbortReasonIfNone can attach an abort reason to the given error, if it +// doesn't have one already. It won't do anything if the error already had an +// abort reason attached. Similarly, if there is no error (i.e. the given error +// is nil), it also won't do anything and will return nil. +func WithAbortReasonIfNone(err error, abortReason AbortReason) error { + if err == nil { + return nil // No error, do nothing + } + var arerr HasAbortReason + if errors.As(err, &arerr) { + // The given error already has an abort reason, do nothing + return err + } + return withAbortReason{err, abortReason} +} + +type withAbortReason struct { + error + abortReason AbortReason +} + +func (ar withAbortReason) Unwrap() error { + return ar.error +} + +func (ar withAbortReason) AbortReason() AbortReason { + return ar.abortReason +} + +var _ HasAbortReason = withAbortReason{} diff --git a/errext/exception.go b/errext/exception.go index 77d33f4afeb..cce7260de04 100644 --- a/errext/exception.go +++ b/errext/exception.go @@ -5,5 +5,6 @@ package errext // a stack trace that lead to them. type Exception interface { error + HasAbortReason StackTrace() string } diff --git a/errext/interrupt_error.go b/errext/interrupt_error.go index 637622daed2..77a6e649163 100644 --- a/errext/interrupt_error.go +++ b/errext/interrupt_error.go @@ -11,7 +11,10 @@ type InterruptError struct { Reason string } -var _ HasExitCode = &InterruptError{} +var _ interface { + HasExitCode + HasAbortReason +} = &InterruptError{} // Error returns the reason of the interruption. func (i *InterruptError) Error() string { @@ -23,6 +26,12 @@ func (i *InterruptError) ExitCode() exitcodes.ExitCode { return exitcodes.ScriptAborted } +// AbortReason is used to signal that an InterruptError is caused by the +// test.abort() functin in k6/execution. +func (i *InterruptError) AbortReason() AbortReason { + return AbortedByScriptAbort +} + // AbortTest is the reason emitted when a test script calls test.abort() const AbortTest = "test aborted" diff --git a/js/runner.go b/js/runner.go index a8bd4a0a9a9..edb1d43c648 100644 --- a/js/runner.go +++ b/js/runner.go @@ -845,11 +845,12 @@ type scriptException struct { inner *goja.Exception } -var ( - _ errext.Exception = &scriptException{} - _ errext.HasExitCode = &scriptException{} - _ errext.HasHint = &scriptException{} -) +var _ interface { + errext.Exception + errext.HasExitCode + errext.HasHint + errext.HasAbortReason +} = &scriptException{} func (s *scriptException) Error() string { // this calls String instead of error so that by default if it's printed to print the stacktrace @@ -868,6 +869,10 @@ func (s *scriptException) Hint() string { return "script exception" } +func (s *scriptException) AbortReason() errext.AbortReason { + return errext.AbortedByScriptError +} + func (s *scriptException) ExitCode() exitcodes.ExitCode { return exitcodes.ScriptException } diff --git a/js/runner_test.go b/js/runner_test.go index 8645faddfc9..d3b5ee744d7 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -391,7 +391,7 @@ func TestDataIsolation(t *testing.T) { engine, err := core.NewEngine(testRunState, execScheduler, []output.Output{mockOutput}) require.NoError(t, err) require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs() + defer engine.OutputManager.StopOutputs(nil) ctx, cancel := context.WithCancel(context.Background()) run, wait, err := engine.Init(ctx, ctx) diff --git a/js/timeout_error.go b/js/timeout_error.go index 72cc1d7aa79..e887b0922b2 100644 --- a/js/timeout_error.go +++ b/js/timeout_error.go @@ -15,10 +15,11 @@ type timeoutError struct { d time.Duration } -var ( - _ errext.HasExitCode = timeoutError{} - _ errext.HasHint = timeoutError{} -) +var _ interface { + errext.HasExitCode + errext.HasHint + errext.HasAbortReason +} = timeoutError{} // newTimeoutError returns a new timeout error, reporting that a timeout has // happened at the given place and given duration. @@ -44,6 +45,10 @@ func (t timeoutError) Hint() string { return hint } +func (t timeoutError) AbortReason() errext.AbortReason { + return errext.AbortedByTimeout +} + // ExitCode returns the coresponding exit code value to the place. func (t timeoutError) ExitCode() exitcodes.ExitCode { // TODO: add handleSummary() diff --git a/output/cloud/output.go b/output/cloud/output.go index 0a73c5f16e5..2b8e3d43716 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -14,6 +14,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/cloudapi" + "go.k6.io/k6/errext" "go.k6.io/k6/output" "go.k6.io/k6/lib" @@ -36,8 +37,6 @@ type Output struct { thresholds map[string][]*metrics.Threshold client *MetricsClient - runStatus cloudapi.RunStatus - bufferMutex sync.Mutex bufferHTTPTrails []*httpext.Trail bufferSamples []*Sample @@ -65,7 +64,7 @@ type Output struct { // Verify that Output implements the wanted interfaces var _ interface { - output.WithRunStatusUpdates + output.WithStopWithTestError output.WithThresholds output.WithTestRunStop } = &Output{} @@ -259,9 +258,18 @@ func (out *Output) startBackgroundProcesses() { }() } -// Stop gracefully stops all metric emission from the output and when all metric -// samples are emitted, it sends an API to the cloud to finish the test run. +// Stop gracefully stops all metric emission from the output: when all metric +// samples are emitted, it makes a cloud API call to finish the test run. +// +// Deprecated: use StopWithTestError() instead. func (out *Output) Stop() error { + return out.StopWithTestError(nil) +} + +// StopWithTestError gracefully stops all metric emission from the output: when +// all metric samples are emitted, it makes a cloud API call to finish the test +// run. If testErr was specified, it extracts the RunStatus from it. +func (out *Output) StopWithTestError(testErr error) error { out.logger.Debug("Stopping the cloud output...") close(out.stopAggregation) out.aggregationDone.Wait() // could be a no-op, if we have never started the aggregation @@ -269,7 +277,7 @@ func (out *Output) Stop() error { close(out.stopOutput) out.outputDone.Wait() out.logger.Debug("Metric emission stopped, calling cloud API...") - err := out.testFinished() + err := out.testFinished(testErr) if err != nil { out.logger.WithFields(logrus.Fields{"error": err}).Warn("Failed to send test finished to the cloud") } else { @@ -283,9 +291,37 @@ func (out *Output) Description() string { return fmt.Sprintf("cloud (%s)", cloudapi.URLForResults(out.referenceID, out.config)) } -// SetRunStatus receives the latest run status from the Engine. -func (out *Output) SetRunStatus(status cloudapi.RunStatus) { - out.runStatus = status +// getRunStatus determines the run status of the test based on the error. +func (out *Output) getRunStatus(testErr error) cloudapi.RunStatus { + if testErr == nil { + return cloudapi.RunStatusFinished + } + + var err errext.HasAbortReason + if errors.As(testErr, &err) { + abortReason := err.AbortReason() + switch abortReason { + case errext.AbortedByUser: + return cloudapi.RunStatusAbortedUser + case errext.AbortedByThreshold: + return cloudapi.RunStatusAbortedThreshold + case errext.AbortedByScriptError: + return cloudapi.RunStatusAbortedScriptError + case errext.AbortedByScriptAbort: + return cloudapi.RunStatusAbortedUser // TODO: have a better value than this? + case errext.AbortedByTimeout: + return cloudapi.RunStatusAbortedLimit + case errext.AbortedByThresholdsAfterTestEnd: + // The test finished normally, it wasn't prematurely aborted. Such + // failures are tracked by the restult_status, not the run_status + // (so called "tainted" in some places of the API here). + return cloudapi.RunStatusFinished + } + } + + // By default, the catch-all error is "aborted by system", but let's log that + out.logger.WithError(testErr).Debug("unknown test error classified as 'aborted by system'") + return cloudapi.RunStatusAbortedSystem } // SetThresholds receives the thresholds before the output is Start()-ed. @@ -617,7 +653,7 @@ func (out *Output) pushMetrics() { }).Debug("Pushing metrics to cloud finished") } -func (out *Output) testFinished() error { +func (out *Output) testFinished(testErr error) error { if out.referenceID == "" || out.config.PushRefID.Valid { return nil } @@ -634,11 +670,7 @@ func (out *Output) testFinished() error { } } - runStatus := cloudapi.RunStatusFinished - if out.runStatus != cloudapi.RunStatusQueued { - runStatus = out.runStatus - } - + runStatus := out.getRunStatus(testErr) out.logger.WithFields(logrus.Fields{ "ref": out.referenceID, "tainted": testTainted, diff --git a/output/cloud/output_test.go b/output/cloud/output_test.go index 6cd04612219..7b16e03a1f7 100644 --- a/output/cloud/output_test.go +++ b/output/cloud/output_test.go @@ -506,7 +506,6 @@ func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) { require.NoError(t, out.Stop()) - require.Equal(t, cloudapi.RunStatusQueued, out.runStatus) select { case <-out.stopSendingMetrics: // all is fine @@ -598,7 +597,6 @@ func TestCloudOutputAggregationPeriodZeroNoBlock(t *testing.T) { tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), getSampleChecker(t, expSamples)) require.NoError(t, out.Stop()) - require.Equal(t, cloudapi.RunStatusQueued, out.runStatus) } func TestCloudOutputPushRefID(t *testing.T) { diff --git a/output/manager.go b/output/manager.go index 1ab3ba29ca5..4d041dd346c 100644 --- a/output/manager.go +++ b/output/manager.go @@ -2,7 +2,6 @@ package output import ( "github.com/sirupsen/logrus" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/metrics" ) @@ -35,7 +34,7 @@ func (om *Manager) StartOutputs() error { } if err := out.Start(); err != nil { - om.stopOutputs(i) + om.stopOutputs(err, i) return err } } @@ -43,25 +42,23 @@ func (om *Manager) StartOutputs() error { } // StopOutputs stops all configured outputs. -func (om *Manager) StopOutputs() { - om.stopOutputs(len(om.outputs)) +func (om *Manager) StopOutputs(testErr error) { + om.stopOutputs(testErr, len(om.outputs)) } -func (om *Manager) stopOutputs(upToID int) { +func (om *Manager) stopOutputs(testErr error, upToID int) { om.logger.Debugf("Stopping %d outputs...", upToID) for i := 0; i < upToID; i++ { - if err := om.outputs[i].Stop(); err != nil { - om.logger.WithError(err).Errorf("Stopping output %d failed", i) + out := om.outputs[i] + var err error + if sout, ok := out.(WithStopWithTestError); ok { + err = sout.StopWithTestError(testErr) + } else { + err = out.Stop() } - } -} -// SetRunStatus checks which outputs implement the WithRunStatusUpdates -// interface and sets the provided RunStatus to them. -func (om *Manager) SetRunStatus(status cloudapi.RunStatus) { - for _, out := range om.outputs { - if statUpdOut, ok := out.(WithRunStatusUpdates); ok { - statUpdOut.SetRunStatus(status) + if err != nil { + om.logger.WithError(err).Errorf("Stopping output %d failed", i) } } } diff --git a/output/types.go b/output/types.go index 92a2648d1d0..646bacc8589 100644 --- a/output/types.go +++ b/output/types.go @@ -11,7 +11,6 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/afero" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" ) @@ -76,10 +75,17 @@ type WithTestRunStop interface { SetTestRunStopCallback(func(error)) } -// WithRunStatusUpdates means the output can receive test run status updates. -type WithRunStatusUpdates interface { +// WithStopWithTestError allows output to receive the error value that the test +// finished with. It could be nil, if the test finished nominally. +// +// If this interface is implemented by the output, StopWithError() will be +// called instead of Stop(). +// +// TODO: refactor the main interface to use this method instead of Stop()? Or +// something else along the lines of https://github.com/grafana/k6/issues/2430 ? +type WithStopWithTestError interface { Output - SetRunStatus(latestStatus cloudapi.RunStatus) + StopWithTestError(testRunErr error) error // nil testRunErr means error-free test run } // WithBuiltinMetrics means the output can receive the builtin metrics.