diff --git a/api/server_test.go b/api/server_test.go index e6c412d7d91..dd7fc06c814 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -13,7 +13,7 @@ import ( "go.k6.io/k6/api/common" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -71,7 +71,7 @@ func TestWithEngine(t *testing.T) { Runner: &minirunner.MiniRunner{}, } - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 6dc06f74854..460fb0e4635 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" @@ -50,7 +50,7 @@ func TestGetGroups(t *testing.T) { assert.NoError(t, err) testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go index 131f16278f7..5612d5f3523 100644 --- a/api/v1/metric_routes_test.go +++ b/api/v1/metric_routes_test.go @@ -11,7 +11,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" @@ -23,7 +23,7 @@ func TestGetMetrics(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -82,7 +82,7 @@ func TestGetMetric(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 8829755c901..4b9e06b150e 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -16,7 +16,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/types" @@ -136,7 +136,7 @@ func TestSetupData(t *testing.T) { TeardownTimeout: types.NullDurationFrom(5 * time.Second), }, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/api/v1/status_routes.go b/api/v1/status_routes.go index f9d444dae16..dc18a822660 100644 --- a/api/v1/status_routes.go +++ b/api/v1/status_routes.go @@ -7,7 +7,7 @@ import ( "net/http" "go.k6.io/k6/api/common" - "go.k6.io/k6/lib" + "go.k6.io/k6/execution" "go.k6.io/k6/lib/executor" ) @@ -24,9 +24,7 @@ func handleGetStatus(rw http.ResponseWriter, r *http.Request) { _, _ = rw.Write(data) } -func getFirstExternallyControlledExecutor( - execScheduler lib.ExecutionScheduler, -) (*executor.ExternallyControlled, error) { +func getFirstExternallyControlledExecutor(execScheduler *execution.Scheduler) (*executor.ExternallyControlled, error) { executors := execScheduler.GetExecutors() for _, s := range executors { if mex, ok := s.(*executor.ExternallyControlled); ok { diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 03ce1f2d4f5..bb31003ed2b 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -14,7 +14,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" ) @@ -23,7 +23,7 @@ func TestGetStatus(t *testing.T) { t.Parallel() testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -110,7 +110,7 @@ func TestPatchStatus(t *testing.T) { require.NoError(t, err) testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{}) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) diff --git a/cmd/integration_tests/eventloop/eventloop_test.go b/cmd/integration_tests/eventloop/eventloop_test.go index 06b492418f4..d24407e8746 100644 --- a/cmd/integration_tests/eventloop/eventloop_test.go +++ b/cmd/integration_tests/eventloop/eventloop_test.go @@ -10,7 +10,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "go.k6.io/k6/cmd/integration_tests/testmodules/events" - "go.k6.io/k6/core/local" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/js/modules" "go.k6.io/k6/lib" @@ -57,7 +57,7 @@ func eventLoopTest(t *testing.T, script []byte, testHandle func(context.Context, RunTags: piState.Registry.RootTagSet().WithTagsFromMap(newOpts.RunTags), } - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) samples := make(chan metrics.SampleContainer, newOpts.MetricSamplesBufferSize.Int64) diff --git a/cmd/run.go b/cmd/run.go index af5f173a309..b51ec887d3f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -19,9 +19,9 @@ import ( "go.k6.io/k6/api" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/execution" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" "go.k6.io/k6/lib/consts" @@ -71,7 +71,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { logger := testRunState.Logger // Create a local execution scheduler wrapping the runner. logger.Debug("Initializing the execution scheduler...") - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) if err != nil { return err } @@ -329,7 +329,7 @@ a commandline interface for interacting with it.`, return runCmd } -func reportUsage(execScheduler *local.ExecutionScheduler) error { +func reportUsage(execScheduler *execution.Scheduler) error { execState := execScheduler.GetState() executorConfigs := execScheduler.GetExecutorConfigs() diff --git a/core/engine.go b/core/engine.go index 5e31a567a4f..aa37d4bf4d1 100644 --- a/core/engine.go +++ b/core/engine.go @@ -10,6 +10,7 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" "go.k6.io/k6/metrics/engine" @@ -33,7 +34,7 @@ type Engine struct { // TODO: completely remove the engine and use all of these separately, in a // much more composable and testable manner - ExecutionScheduler lib.ExecutionScheduler + ExecutionScheduler *execution.Scheduler MetricsEngine *engine.MetricsEngine OutputManager *output.Manager @@ -53,7 +54,7 @@ type Engine struct { } // NewEngine instantiates a new Engine, without doing any heavy initialization. -func NewEngine(testState *lib.TestRunState, ex lib.ExecutionScheduler, outputs []output.Output) (*Engine, error) { +func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []output.Output) (*Engine, error) { if ex == nil { return nil, errors.New("missing ExecutionScheduler instance") } diff --git a/core/engine_test.go b/core/engine_test.go index 86c09617997..2513be3b794 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/executor" @@ -82,7 +82,7 @@ func newTestEngineWithTestPreInitState( //nolint:golint testRunState := getTestRunState(t, piState, newOpts, runner) - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) engine, err := NewEngine(testRunState, execScheduler, outputs) @@ -927,7 +927,7 @@ func TestVuInitException(t *testing.T) { testState := getTestRunState(t, piState, opts, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := NewEngine(testState, execScheduler, nil) require.NoError(t, err) @@ -1315,7 +1315,7 @@ func TestActiveVUsCount(t *testing.T) { require.NoError(t, err) testState := getTestRunState(t, piState, opts, runner) - execScheduler, err := local.NewExecutionScheduler(testState) + execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) engine, err := NewEngine(testState, execScheduler, []output.Output{mockOutput}) require.NoError(t, err) diff --git a/core/local/util_test.go b/core/local/util_test.go deleted file mode 100644 index 55426902c19..00000000000 --- a/core/local/util_test.go +++ /dev/null @@ -1,266 +0,0 @@ -package local - -//TODO: translate this test to the new paradigm -/* -func TestProcessStages(t *testing.T) { - type checkpoint struct { - D time.Duration - Keep bool - VUs null.Int - } - testdata := map[string]struct { - Start int64 - Stages []lib.Stage - Checkpoints []checkpoint - }{ - "none": { - 0, - []lib.Stage{}, - []checkpoint{ - {0 * time.Second, false, null.NewInt(0, false)}, - {10 * time.Second, false, null.NewInt(0, false)}, - {24 * time.Hour, false, null.NewInt(0, false)}, - }, - }, - "one": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {10 * time.Second, true, null.NewInt(0, false)}, - {11 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "one/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - {1 * time.Second, true, null.NewInt(5, false)}, - {10 * time.Second, true, null.NewInt(5, false)}, - {11 * time.Second, false, null.NewInt(5, false)}, - }, - }, - "one/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second), Target: null.IntFrom(100)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(10)}, - {2 * time.Second, true, null.IntFrom(20)}, - {3 * time.Second, true, null.IntFrom(30)}, - {4 * time.Second, true, null.IntFrom(40)}, - {5 * time.Second, true, null.IntFrom(50)}, - {6 * time.Second, true, null.IntFrom(60)}, - {7 * time.Second, true, null.IntFrom(70)}, - {8 * time.Second, true, null.IntFrom(80)}, - {9 * time.Second, true, null.IntFrom(90)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, false, null.IntFrom(100)}, - }, - }, - "one/targeted/start": { - 50, - []lib.Stage{ - {Duration: types.NullDurationFrom(10 * time.Second), Target: null.IntFrom(100)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(50)}, - {1 * time.Second, true, null.IntFrom(55)}, - {2 * time.Second, true, null.IntFrom(60)}, - {3 * time.Second, true, null.IntFrom(65)}, - {4 * time.Second, true, null.IntFrom(70)}, - {5 * time.Second, true, null.IntFrom(75)}, - {6 * time.Second, true, null.IntFrom(80)}, - {7 * time.Second, true, null.IntFrom(85)}, - {8 * time.Second, true, null.IntFrom(90)}, - {9 * time.Second, true, null.IntFrom(95)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, false, null.IntFrom(100)}, - }, - }, - "two": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {11 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "two/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - {1 * time.Second, true, null.NewInt(5, false)}, - {11 * time.Second, false, null.NewInt(5, false)}, - }, - }, - "two/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(100)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(0)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(20)}, - {2 * time.Second, true, null.IntFrom(40)}, - {3 * time.Second, true, null.IntFrom(60)}, - {4 * time.Second, true, null.IntFrom(80)}, - {5 * time.Second, true, null.IntFrom(100)}, - {6 * time.Second, true, null.IntFrom(80)}, - {7 * time.Second, true, null.IntFrom(60)}, - {8 * time.Second, true, null.IntFrom(40)}, - {9 * time.Second, true, null.IntFrom(20)}, - {10 * time.Second, true, null.IntFrom(0)}, - {11 * time.Second, false, null.IntFrom(0)}, - }, - }, - "three": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(10 * time.Second)}, - {Duration: types.NullDurationFrom(15 * time.Second)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Second, true, null.NewInt(0, false)}, - {15 * time.Second, true, null.NewInt(0, false)}, - {30 * time.Second, true, null.NewInt(0, false)}, - {31 * time.Second, false, null.NewInt(0, false)}, - }, - }, - "three/targeted": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(50)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(100)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(0)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - {1 * time.Second, true, null.IntFrom(10)}, - {2 * time.Second, true, null.IntFrom(20)}, - {3 * time.Second, true, null.IntFrom(30)}, - {4 * time.Second, true, null.IntFrom(40)}, - {5 * time.Second, true, null.IntFrom(50)}, - {6 * time.Second, true, null.IntFrom(60)}, - {7 * time.Second, true, null.IntFrom(70)}, - {8 * time.Second, true, null.IntFrom(80)}, - {9 * time.Second, true, null.IntFrom(90)}, - {10 * time.Second, true, null.IntFrom(100)}, - {11 * time.Second, true, null.IntFrom(80)}, - {12 * time.Second, true, null.IntFrom(60)}, - {13 * time.Second, true, null.IntFrom(40)}, - {14 * time.Second, true, null.IntFrom(20)}, - {15 * time.Second, true, null.IntFrom(0)}, - {16 * time.Second, false, null.IntFrom(0)}, - }, - }, - "mix": { - 0, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(20)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - {Duration: types.NullDurationFrom(2 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(20)}, - {Duration: types.NullDurationFrom(2 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - }, - []checkpoint{ - {0 * time.Second, true, null.IntFrom(0)}, - - {1 * time.Second, true, null.IntFrom(4)}, - {2 * time.Second, true, null.IntFrom(8)}, - {3 * time.Second, true, null.IntFrom(12)}, - {4 * time.Second, true, null.IntFrom(16)}, - {5 * time.Second, true, null.IntFrom(20)}, - - {6 * time.Second, true, null.IntFrom(18)}, - {7 * time.Second, true, null.IntFrom(16)}, - {8 * time.Second, true, null.IntFrom(14)}, - {9 * time.Second, true, null.IntFrom(12)}, - {10 * time.Second, true, null.IntFrom(10)}, - - {11 * time.Second, true, null.IntFrom(10)}, - {12 * time.Second, true, null.IntFrom(10)}, - - {13 * time.Second, true, null.IntFrom(12)}, - {14 * time.Second, true, null.IntFrom(14)}, - {15 * time.Second, true, null.IntFrom(16)}, - {16 * time.Second, true, null.IntFrom(18)}, - {17 * time.Second, true, null.IntFrom(20)}, - - {18 * time.Second, true, null.IntFrom(20)}, - {19 * time.Second, true, null.IntFrom(20)}, - - {20 * time.Second, true, null.IntFrom(18)}, - {21 * time.Second, true, null.IntFrom(16)}, - {22 * time.Second, true, null.IntFrom(14)}, - {23 * time.Second, true, null.IntFrom(12)}, - {24 * time.Second, true, null.IntFrom(10)}, - }, - }, - "mix/start": { - 5, - []lib.Stage{ - {Duration: types.NullDurationFrom(5 * time.Second)}, - {Duration: types.NullDurationFrom(5 * time.Second), Target: null.IntFrom(10)}, - }, - []checkpoint{ - {0 * time.Second, true, null.NewInt(5, false)}, - - {1 * time.Second, true, null.NewInt(5, false)}, - {2 * time.Second, true, null.NewInt(5, false)}, - {3 * time.Second, true, null.NewInt(5, false)}, - {4 * time.Second, true, null.NewInt(5, false)}, - {5 * time.Second, true, null.NewInt(5, false)}, - - {6 * time.Second, true, null.NewInt(6, true)}, - {7 * time.Second, true, null.NewInt(7, true)}, - {8 * time.Second, true, null.NewInt(8, true)}, - {9 * time.Second, true, null.NewInt(9, true)}, - {10 * time.Second, true, null.NewInt(10, true)}, - }, - }, - "infinite": { - 0, - []lib.Stage{{}}, - []checkpoint{ - {0 * time.Second, true, null.NewInt(0, false)}, - {1 * time.Minute, true, null.NewInt(0, false)}, - {1 * time.Hour, true, null.NewInt(0, false)}, - {24 * time.Hour, true, null.NewInt(0, false)}, - {365 * 24 * time.Hour, true, null.NewInt(0, false)}, - }, - }, - } - for name, data := range testdata { - t.Run(name, func(t *testing.T) { - for _, ckp := range data.Checkpoints { - t.Run(ckp.D.String(), func(t *testing.T) { - vus, keepRunning := ProcessStages(data.Start, data.Stages, ckp.D) - assert.Equal(t, ckp.VUs, vus) - assert.Equal(t, ckp.Keep, keepRunning) - }) - } - }) - } -} -*/ diff --git a/execution/pkg.go b/execution/pkg.go new file mode 100644 index 00000000000..5cf88bf880e --- /dev/null +++ b/execution/pkg.go @@ -0,0 +1,8 @@ +// Package execution contains most of the components that schedule, execute and +// control individual k6 tests. +package execution + +// TODO: move ExecutionSegment and ESS here + +// TODO: move execotors interfaces here and implementations in a sub-folder +// TODO: move the execution state here diff --git a/core/local/local.go b/execution/scheduler.go similarity index 82% rename from core/local/local.go rename to execution/scheduler.go index 61c9b1e59a1..a2e7214330e 100644 --- a/core/local/local.go +++ b/execution/scheduler.go @@ -1,4 +1,4 @@ -package local +package execution import ( "context" @@ -16,8 +16,10 @@ import ( "go.k6.io/k6/ui/pb" ) -// ExecutionScheduler is the local implementation of lib.ExecutionScheduler -type ExecutionScheduler struct { +// A Scheduler is in charge of most of the test execution - initializing VUs and +// executors, running setup() and teardown(), and actually starting the +// executors for the different scenarios at the appropriate times. +type Scheduler struct { initProgress *pb.ProgressBar executorConfigs []lib.ExecutorConfig // sorted by (startTime, ID) executors []lib.Executor // sorted by (startTime, ID), excludes executors with no work @@ -31,14 +33,11 @@ type ExecutionScheduler struct { stopVUsEmission, vusEmissionStopped chan struct{} } -// Check to see if we implement the lib.ExecutionScheduler interface -var _ lib.ExecutionScheduler = &ExecutionScheduler{} - -// NewExecutionScheduler creates and returns a new local lib.ExecutionScheduler -// instance, without initializing it beyond the bare minimum. Specifically, it -// creates the needed executor instances and a lot of state placeholders, but it -// doesn't initialize the executors and it doesn't initialize or run VUs. -func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { +// NewScheduler creates and returns a new Scheduler instance, without +// initializing it beyond the bare minimum. Specifically, it creates the needed +// executor instances and a lot of state placeholders, but it doesn't initialize +// the executors and it doesn't initialize or run VUs. +func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) { options := trs.Options et, err := lib.NewExecutionTuple(options.ExecutionSegment, options.ExecutionSegmentSequence) if err != nil { @@ -78,7 +77,7 @@ func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { } } - return &ExecutionScheduler{ + return &Scheduler{ initProgress: pb.New(pb.WithConstLeft("Init")), executors: executors, executorConfigs: executorConfigs, @@ -93,48 +92,48 @@ func NewExecutionScheduler(trs *lib.TestRunState) (*ExecutionScheduler, error) { } // GetRunner returns the wrapped lib.Runner instance. -func (e *ExecutionScheduler) GetRunner() lib.Runner { // TODO: remove +func (e *Scheduler) GetRunner() lib.Runner { // TODO: remove return e.state.Test.Runner } -// GetState returns a pointer to the execution state struct for the local -// execution scheduler. It's guaranteed to be initialized and present, though -// see the documentation in lib/execution.go for caveats about its usage. The -// most important one is that none of the methods beyond the pause-related ones +// GetState returns a pointer to the execution state struct for the execution +// scheduler. It's guaranteed to be initialized and present, though see the +// documentation in lib/execution.go for caveats about its usage. The most +// important one is that none of the methods beyond the pause-related ones // should be used for synchronization. -func (e *ExecutionScheduler) GetState() *lib.ExecutionState { +func (e *Scheduler) GetState() *lib.ExecutionState { return e.state } // GetExecutors returns the slice of configured executor instances which // have work, sorted by their (startTime, name) in an ascending order. -func (e *ExecutionScheduler) GetExecutors() []lib.Executor { +func (e *Scheduler) GetExecutors() []lib.Executor { return e.executors } // GetExecutorConfigs returns the slice of all executor configs, sorted by // their (startTime, name) in an ascending order. -func (e *ExecutionScheduler) GetExecutorConfigs() []lib.ExecutorConfig { +func (e *Scheduler) GetExecutorConfigs() []lib.ExecutorConfig { return e.executorConfigs } // GetInitProgressBar returns the progress bar associated with the Init // function. After the Init is done, it is "hijacked" to display real-time // execution statistics as a text bar. -func (e *ExecutionScheduler) GetInitProgressBar() *pb.ProgressBar { +func (e *Scheduler) GetInitProgressBar() *pb.ProgressBar { return e.initProgress } // GetExecutionPlan is a helper method so users of the local execution scheduler // don't have to calculate the execution plan again. -func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep { +func (e *Scheduler) GetExecutionPlan() []lib.ExecutionStep { return e.executionPlan } // initVU is a helper method that's used to both initialize the planned VUs // in the Init() method, and also passed to executors so they can initialize // any unplanned VUs themselves. -func (e *ExecutionScheduler) initVU( +func (e *Scheduler) initVU( ctx context.Context, samplesOut chan<- metrics.SampleContainer, logger logrus.FieldLogger, ) (lib.InitializedVU, error) { // Get the VU IDs here, so that the VUs are (mostly) ordered by their @@ -151,7 +150,7 @@ func (e *ExecutionScheduler) initVU( // getRunStats is a helper function that can be used as the execution // scheduler's progressbar substitute (i.e. hijack). -func (e *ExecutionScheduler) getRunStats() string { +func (e *Scheduler) getRunStats() string { status := "running" if e.state.IsPaused() { status = "paused" @@ -169,7 +168,7 @@ func (e *ExecutionScheduler) getRunStats() string { ) } -func (e *ExecutionScheduler) initVUsConcurrently( +func (e *Scheduler) initVUsConcurrently( ctx context.Context, samplesOut chan<- metrics.SampleContainer, count uint64, concurrency int, logger logrus.FieldLogger, ) chan error { @@ -206,7 +205,7 @@ func (e *ExecutionScheduler) initVUsConcurrently( return doneInits } -func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) { +func (e *Scheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- metrics.SampleContainer) { e.state.Test.Logger.Debug("Starting emission of VUs and VUsMax metrics...") tags := e.state.Test.RunTags @@ -259,7 +258,7 @@ func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- me // Init concurrently initializes all of the planned VUs and then sequentially // initializes all of the configured executors. -func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { +func (e *Scheduler) Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) (err error) { e.emitVUsAndVUsMax(ctx, samplesOut) defer func() { if err != nil { @@ -268,7 +267,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics } }() - logger := e.state.Test.Logger.WithField("phase", "local-execution-scheduler-init") + logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-init") vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan) logger.WithFields(logrus.Fields{ "neededVUs": vusToInitialize, @@ -342,7 +341,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- metrics // executor, each time in a new goroutine. It is responsible for waiting out the // configured startTime for the specific executor and then running its Run() // method. -func (e *ExecutionScheduler) runExecutor( +func (e *Scheduler) runExecutor( runCtx context.Context, runResults chan<- error, engineOut chan<- metrics.SampleContainer, executor lib.Executor, ) { executorConfig := executor.GetConfig() @@ -389,18 +388,18 @@ func (e *ExecutionScheduler) runExecutor( runResults <- err } -// Run the ExecutionScheduler, funneling all generated metric samples through the supplied +// Run the Scheduler, funneling all generated metric samples through the supplied // out channel. // //nolint:funlen -func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error { +func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metrics.SampleContainer) error { defer func() { close(e.stopVUsEmission) <-e.vusEmissionStopped }() executorsCount := len(e.executors) - logger := e.state.Test.Logger.WithField("phase", "local-execution-scheduler-run") + logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run") e.initProgress.Modify(pb.WithConstLeft("Run")) var interrupted bool defer func() { @@ -487,10 +486,26 @@ func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut ch return firstErr } -// SetPaused pauses a test, if called with true. And if called with false, tries -// to start/resume it. See the lib.ExecutionScheduler interface documentation of -// the methods for the various caveats about its usage. -func (e *ExecutionScheduler) SetPaused(pause bool) error { +// SetPaused pauses the test, or start/resumes it. To check if a test is paused, +// use GetState().IsPaused(). +// +// Currently, any executor, so any test, can be started in a paused state. This +// will cause k6 to initialize all needed VUs, but it won't actually start the +// test. Later, the test can be started for real by resuming/unpausing it from +// the REST API. +// +// After a test is actually started, it may become impossible to pause it again. +// That is signaled by having SetPaused(true) return an error. The likely cause +// is that some of the executors for the test don't support pausing after the +// test has been started. +// +// IMPORTANT: Currently only the externally controlled executor can be paused +// and resumed multiple times in the middle of the test execution! Even then, +// "pausing" is a bit misleading, since k6 won't pause in the middle of the +// currently executing iterations. It will allow the currently in-progress +// iterations to finish, and it just won't start any new ones nor will it +// increment the value returned by GetCurrentTestRunDuration(). +func (e *Scheduler) SetPaused(pause bool) error { if !e.state.HasStarted() && e.state.IsPaused() { if pause { return fmt.Errorf("execution is already paused") diff --git a/core/local/k6execution_test.go b/execution/scheduler_exec_test.go similarity index 95% rename from core/local/k6execution_test.go rename to execution/scheduler_exec_test.go index c97fd15ab32..570d20e00d3 100644 --- a/core/local/k6execution_test.go +++ b/execution/scheduler_exec_test.go @@ -1,4 +1,4 @@ -package local +package execution import ( "encoding/json" @@ -18,6 +18,8 @@ import ( "go.k6.io/k6/metrics" ) +// TODO: rewrite and/or move these as integration tests to reduce boilerplate +// and improve reliability? func TestExecutionInfoVUSharing(t *testing.T) { t.Parallel() script := []byte(` @@ -81,7 +83,7 @@ func TestExecutionInfoVUSharing(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() type vuStat struct { @@ -194,7 +196,7 @@ func TestExecutionInfoScenarioIter(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) @@ -276,7 +278,7 @@ func TestSharedIterationsStable(t *testing.T) { ) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) @@ -409,7 +411,7 @@ func TestExecutionInfoAll(t *testing.T) { }, nil) require.NoError(t, err) - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() errCh := make(chan error, 1) diff --git a/core/local/local_test.go b/execution/scheduler_test.go similarity index 92% rename from core/local/local_test.go rename to execution/scheduler_test.go index dcb0ddf4a72..761725fbdd4 100644 --- a/core/local/local_test.go +++ b/execution/scheduler_test.go @@ -1,4 +1,4 @@ -package local +package execution import ( "context" @@ -56,9 +56,9 @@ func getTestRunState( } } -func newTestExecutionScheduler( +func newTestScheduler( t *testing.T, runner lib.Runner, logger *logrus.Logger, opts lib.Options, -) (ctx context.Context, cancel func(), execScheduler *ExecutionScheduler, samples chan metrics.SampleContainer) { +) (ctx context.Context, cancel func(), execScheduler *Scheduler, samples chan metrics.SampleContainer) { if runner == nil { runner = &minirunner.MiniRunner{} } @@ -73,7 +73,7 @@ func newTestExecutionScheduler( testRunState.Logger = logger } - execScheduler, err = NewExecutionScheduler(testRunState) + execScheduler, err = NewScheduler(testRunState) require.NoError(t, err) samples = make(chan metrics.SampleContainer, newOpts.MetricSamplesBufferSize.Int64) @@ -92,9 +92,9 @@ func newTestExecutionScheduler( return ctx, cancel, execScheduler, samples } -func TestExecutionSchedulerRun(t *testing.T) { +func TestSchedulerRun(t *testing.T) { t.Parallel() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, nil, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, nil, nil, lib.Options{}) defer cancel() err := make(chan error, 1) @@ -102,7 +102,7 @@ func TestExecutionSchedulerRun(t *testing.T) { assert.NoError(t, <-err) } -func TestExecutionSchedulerRunNonDefault(t *testing.T) { +func TestSchedulerRunNonDefault(t *testing.T) { t.Parallel() testCases := []struct { @@ -136,7 +136,7 @@ func TestExecutionSchedulerRunNonDefault(t *testing.T) { testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -165,7 +165,7 @@ func TestExecutionSchedulerRunNonDefault(t *testing.T) { } } -func TestExecutionSchedulerRunEnv(t *testing.T) { +func TestSchedulerRunEnv(t *testing.T) { t.Parallel() scriptTemplate := ` @@ -253,7 +253,7 @@ func TestExecutionSchedulerRunEnv(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -280,7 +280,7 @@ func TestExecutionSchedulerRunEnv(t *testing.T) { } } -func TestExecutionSchedulerSystemTags(t *testing.T) { +func TestSchedulerSystemTags(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) sr := tb.Replacer.Replace @@ -322,7 +322,7 @@ func TestExecutionSchedulerSystemTags(t *testing.T) { }))) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -372,7 +372,7 @@ func TestExecutionSchedulerSystemTags(t *testing.T) { } } -func TestExecutionSchedulerRunCustomTags(t *testing.T) { +func TestSchedulerRunCustomTags(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) sr := tb.Replacer.Replace @@ -453,7 +453,7 @@ func TestExecutionSchedulerRunCustomTags(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -496,7 +496,7 @@ func TestExecutionSchedulerRunCustomTags(t *testing.T) { // Ensure that custom executor settings are unique per executor and // that there's no "crossover"/"pollution" between executors. // Also test that custom tags are properly set on checks and groups metrics. -func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { +func TestSchedulerRunCustomConfigNoCrossover(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) @@ -617,7 +617,7 @@ func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -686,7 +686,7 @@ func TestExecutionSchedulerRunCustomConfigNoCrossover(t *testing.T) { require.Equal(t, 8, gotSampleTags, "received wrong amount of samples with expected tags") } -func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { +func TestSchedulerSetupTeardownRun(t *testing.T) { t.Parallel() t.Run("Normal", func(t *testing.T) { t.Parallel() @@ -702,7 +702,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{}) err := make(chan error, 1) go func() { err <- execScheduler.Run(ctx, ctx, samples) }() @@ -718,7 +718,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return nil, errors.New("setup error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{}) defer cancel() assert.EqualError(t, execScheduler.Run(ctx, ctx, samples), "setup error") }) @@ -732,7 +732,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ NoSetup: null.BoolFrom(true), VUs: null.IntFrom(1), Iterations: null.IntFrom(1), @@ -751,7 +751,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }) @@ -769,7 +769,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { return errors.New("teardown error") }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ NoTeardown: null.BoolFrom(true), VUs: null.IntFrom(1), Iterations: null.IntFrom(1), @@ -779,7 +779,7 @@ func TestExecutionSchedulerSetupTeardownRun(t *testing.T) { }) } -func TestExecutionSchedulerStages(t *testing.T) { +func TestSchedulerStages(t *testing.T) { t.Parallel() testdata := map[string]struct { Duration time.Duration @@ -815,7 +815,7 @@ func TestExecutionSchedulerStages(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(1), Stages: data.Stages, }) @@ -826,7 +826,7 @@ func TestExecutionSchedulerStages(t *testing.T) { } } -func TestExecutionSchedulerEndTime(t *testing.T) { +func TestSchedulerEndTime(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -834,7 +834,7 @@ func TestExecutionSchedulerEndTime(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, nil, lib.Options{ + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, nil, lib.Options{ VUs: null.IntFrom(10), Duration: types.NullDurationFrom(1 * time.Second), }) @@ -851,7 +851,7 @@ func TestExecutionSchedulerEndTime(t *testing.T) { assert.True(t, runTime < 10*time.Second, "took more than 10 seconds") } -func TestExecutionSchedulerRuntimeErrors(t *testing.T) { +func TestSchedulerRuntimeErrors(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -864,7 +864,7 @@ func TestExecutionSchedulerRuntimeErrors(t *testing.T) { }, } logger, hook := logtest.NewNullLogger() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() endTime, isFinal := lib.GetEndOffset(execScheduler.GetExecutionPlan()) @@ -883,7 +883,7 @@ func TestExecutionSchedulerRuntimeErrors(t *testing.T) { } } -func TestExecutionSchedulerEndErrors(t *testing.T) { +func TestSchedulerEndErrors(t *testing.T) { t.Parallel() exec := executor.NewConstantVUsConfig("we_need_hard_stop") @@ -901,7 +901,7 @@ func TestExecutionSchedulerEndErrors(t *testing.T) { }, } logger, hook := logtest.NewNullLogger() - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, lib.Options{}) defer cancel() endTime, isFinal := lib.GetEndOffset(execScheduler.GetExecutionPlan()) @@ -917,7 +917,7 @@ func TestExecutionSchedulerEndErrors(t *testing.T) { assert.Empty(t, hook.Entries) } -func TestExecutionSchedulerEndIterations(t *testing.T) { +func TestSchedulerEndIterations(t *testing.T) { t.Parallel() registry := metrics.NewRegistry() metric := registry.MustNewMetric("test_metric", metrics.Counter) @@ -954,7 +954,7 @@ func TestExecutionSchedulerEndIterations(t *testing.T) { defer cancel() testRunState := getTestRunState(t, getTestPreInitState(t), runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) samples := make(chan metrics.SampleContainer, 300) @@ -972,7 +972,7 @@ func TestExecutionSchedulerEndIterations(t *testing.T) { } } -func TestExecutionSchedulerIsRunning(t *testing.T) { +func TestSchedulerIsRunning(t *testing.T) { t.Parallel() runner := &minirunner.MiniRunner{ Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error { @@ -980,7 +980,7 @@ func TestExecutionSchedulerIsRunning(t *testing.T) { return nil }, } - ctx, cancel, execScheduler, _ := newTestExecutionScheduler(t, runner, nil, lib.Options{}) + ctx, cancel, execScheduler, _ := newTestScheduler(t, runner, nil, lib.Options{}) state := execScheduler.GetState() err := make(chan error) @@ -995,7 +995,7 @@ func TestExecutionSchedulerIsRunning(t *testing.T) { assert.NoError(t, <-err) } -// TestDNSResolver checks the DNS resolution behavior at the ExecutionScheduler level. +// TestDNSResolver checks the DNS resolution behavior at the Scheduler level. func TestDNSResolver(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) @@ -1072,7 +1072,7 @@ func TestDNSResolver(t *testing.T) { mr := mockresolver.New(nil, net.LookupIP) runner.ActualResolver = mr.LookupIPAll - ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, tc.opts) + ctx, cancel, execScheduler, samples := newTestScheduler(t, runner, logger, tc.opts) defer cancel() mr.Set("myhost", sr("HTTPBIN_IP")) @@ -1160,7 +1160,7 @@ func TestRealTimeAndSetupTeardownMetrics(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, options, runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -1295,7 +1295,7 @@ func TestSetPaused(t *testing.T) { t.Run("second pause is an error", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} @@ -1308,7 +1308,7 @@ func TestSetPaused(t *testing.T) { t.Run("unpause at the start is an error", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} err = sched.SetPaused(false) @@ -1319,7 +1319,7 @@ func TestSetPaused(t *testing.T) { t.Run("second unpause is an error", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) sched.executors = []lib.Executor{pausableExecutor{err: nil}} require.NoError(t, sched.SetPaused(true)) @@ -1332,7 +1332,7 @@ func TestSetPaused(t *testing.T) { t.Run("an error on pausing is propagated", func(t *testing.T) { t.Parallel() testRunState := getTestRunState(t, getTestPreInitState(t), lib.Options{}, &minirunner.MiniRunner{}) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) expectedErr := errors.New("testing pausable executor error") sched.executors = []lib.Executor{pausableExecutor{err: expectedErr}} @@ -1351,7 +1351,7 @@ func TestSetPaused(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, getTestPreInitState(t), options, runner) - sched, err := NewExecutionScheduler(testRunState) + sched, err := NewScheduler(testRunState) require.NoError(t, err) err = sched.SetPaused(true) require.Error(t, err) @@ -1359,7 +1359,7 @@ func TestSetPaused(t *testing.T) { }) } -func TestNewExecutionSchedulerHasWork(t *testing.T) { +func TestNewSchedulerHasWork(t *testing.T) { t.Parallel() script := []byte(` import http from 'k6/http'; @@ -1406,7 +1406,7 @@ func TestNewExecutionSchedulerHasWork(t *testing.T) { require.NoError(t, err) testRunState := getTestRunState(t, piState, runner.GetOptions(), runner) - execScheduler, err := NewExecutionScheduler(testRunState) + execScheduler, err := NewScheduler(testRunState) require.NoError(t, err) assert.Len(t, execScheduler.executors, 2) diff --git a/js/runner_test.go b/js/runner_test.go index 50f267feb6e..e487fe95121 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -34,8 +34,8 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/core" - "go.k6.io/k6/core/local" "go.k6.io/k6/errext" + "go.k6.io/k6/execution" "go.k6.io/k6/js/modules/k6" k6http "go.k6.io/k6/js/modules/k6/http" k6metrics "go.k6.io/k6/js/modules/k6/metrics" @@ -384,7 +384,7 @@ func TestDataIsolation(t *testing.T) { RunTags: runner.preInitState.Registry.RootTagSet().WithTagsFromMap(options.RunTags), } - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) mockOutput := mockoutput.New() @@ -2670,7 +2670,7 @@ func TestExecutionInfo(t *testing.T) { Runner: r, } - execScheduler, err := local.NewExecutionScheduler(testRunState) + execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) ctx = lib.WithExecutionState(ctx, execScheduler.GetState()) diff --git a/lib/execution.go b/lib/execution.go index 2d21eae6864..65c66c10348 100644 --- a/lib/execution.go +++ b/lib/execution.go @@ -9,62 +9,8 @@ import ( "time" "github.com/sirupsen/logrus" - - "go.k6.io/k6/metrics" ) -// An ExecutionScheduler is in charge of initializing executors and using them -// to initialize and schedule VUs created by a wrapped Runner. It decouples how -// a swarm of VUs is controlled from the details of how or even where they're -// scheduled. -// -// The core/local execution scheduler schedules VUs on the local machine, but -// the same interface may be implemented to control a test running on a cluster -// or in the cloud. -// -// TODO: flesh out the interface after actually having more than one -// implementation... -type ExecutionScheduler interface { - // Returns the wrapped runner. May return nil if not applicable, eg. - // if we're remote controlling a test running on another machine. - GetRunner() Runner - - // Return the ExecutionState instance from which different statistics for the - // current state of the runner could be retrieved. - GetState() *ExecutionState - - // Return the instances of the configured executors - GetExecutors() []Executor - - // Init initializes all executors, including all of their needed VUs. - Init(ctx context.Context, samplesOut chan<- metrics.SampleContainer) error - - // Run the ExecutionScheduler, funneling the generated metric samples - // through the supplied out channel. - Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) error - - // Pause a test, or start/resume it. To check if a test is paused, use - // GetState().IsPaused(). - // - // Currently, any executor, so any test, can be started in a paused state. - // This will cause k6 to initialize all needed VUs, but it won't actually - // start the test. Later, the test can be started for real by - // resuming/unpausing it from the REST API. - // - // After a test is actually started, it may become impossible to pause it - // again. That is denoted by having SetPaused(true) return an error. The - // likely cause is that some of the executors for the test don't support - // pausing after the test has been started. - // - // IMPORTANT: Currently only the externally controlled executor can be - // paused and resumed multiple times in the middle of the test execution! - // Even then, "pausing" is a bit misleading, since k6 won't pause in the - // middle of the currently executing iterations. It will allow the currently - // in progress iterations to finish, and it just won't start any new ones - // nor will it increment the value returned by GetCurrentTestRunDuration(). - SetPaused(paused bool) error -} - // MaxTimeToWaitForPlannedVU specifies the maximum allowable time for an executor // to wait for a planned VU to be retrieved from the ExecutionState.PlannedVUs // buffer. If it's exceeded, k6 will emit a warning log message, since it either @@ -82,9 +28,12 @@ const MaxTimeToWaitForPlannedVU = 400 * time.Millisecond // MaxTimeToWaitForPlannedVU before we actually return an error. const MaxRetriesGetPlannedVU = 5 -// ExecutionStatus is similar to RunStatus, but more fine grained and concerns -// only local execution. -//go:generate enumer -type=ExecutionStatus -trimprefix ExecutionStatus -output execution_status_gen.go +// ExecutionStatus is used to mark the possible states of a test run at any +// given time in its execution, from its start to its finish. +// +// execution_status_gen.go +// +//go:generate enumer -type=ExecutionStatus -trimprefix ExecutionStatus -output type ExecutionStatus uint32 // Possible execution status values @@ -103,13 +52,13 @@ const ( ) // ExecutionState contains a few different things: -// - Some convenience items, that are needed by all executors, like the +// - Some convenience items, that are needed by all executors, like the // execution segment and the unique VU ID generator. By keeping those here, // we can just pass the ExecutionState to the different executors, instead of // individually passing them each item. -// - Mutable counters that different executors modify and other parts of +// - Mutable counters that different executors modify and other parts of // k6 can read, e.g. for the vus and vus_max metrics k6 emits every second. -// - Pausing controls and statistics. +// - Pausing controls and statistics. // // The counters and timestamps here are primarily meant to be used for // information extraction and avoidance of ID collisions. Using many of the @@ -498,9 +447,10 @@ func (es *ExecutionState) Resume() error { // // And, since tests won't be paused most of the time, it's // probably better to check for that like this: -// if executionState.IsPaused() { -// <-executionState.ResumeNotify() -// } +// +// if executionState.IsPaused() { +// <-executionState.ResumeNotify() +// } func (es *ExecutionState) ResumeNotify() <-chan struct{} { es.pauseStateLock.RLock() defer es.pauseStateLock.RUnlock()