Skip to content

Commit

Permalink
Populate __ENV.K6_CLOUDRUN_TEST_RUN_ID on local executions of k6 clou…
Browse files Browse the repository at this point in the history
…d run (#4092)

* Create test run before starting output.Cloud

* Create the cloud test run earlier

* Cloud local execution creates the test run before delegating

* Move the cloud test run creation into the test configuration

* Keep the compatibility w/old behavior in Cloud output

* Remove the Cloud-specific handling code of the cloud local execution

* Apply suggestions from code review

Co-authored-by: Oleg Bespalov <oleg.bespalov@grafana.com>

---------

Co-authored-by: Oleg Bespalov <oleg.bespalov@grafana.com>
  • Loading branch information
joanlopez and olegbespalov authored Dec 24, 2024
1 parent 419ea09 commit 3b15ea0
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 15 deletions.
8 changes: 4 additions & 4 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ type Config struct {
StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"`
APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"`

// PushRefID represents the test run id.
// Note: It is a legacy name used by the backend, the code in k6 open-source
// references it as test run id.
// Currently, a renaming is not planned.
// PushRefID is the identifier used by k6 Cloud to correlate all the things that
// belong to the same test run/execution. Currently, it is equivalent to the test run id.
// But, in the future, or in future solutions (e.g. Synthetic Monitoring), there might be
// no test run id, and we may still need an identifier to correlate all the things.
PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"`

// Defines the max allowed number of time series in a single batch.
Expand Down
12 changes: 12 additions & 0 deletions cmd/cloud_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ func (c *cmdCloudRun) preRun(cmd *cobra.Command, args []string) error {

func (c *cmdCloudRun) run(cmd *cobra.Command, args []string) error {
if c.localExecution {
c.runCmd.loadConfiguredTest = func(*cobra.Command, []string) (*loadedAndConfiguredTest, execution.Controller, error) {
test, err := loadAndConfigureLocalTest(c.runCmd.gs, cmd, args, getCloudRunLocalExecutionConfig)
if err != nil {
return nil, nil, fmt.Errorf("could not load and configure the test: %w", err)
}

if err := createCloudTest(c.runCmd.gs, test); err != nil {
return nil, nil, fmt.Errorf("could not create the cloud test run: %w", err)
}

return test, local.NewController(), nil
}
return c.runCmd.run(cmd, args)
}

Expand Down
178 changes: 178 additions & 0 deletions cmd/outputs_cloud.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package cmd

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/cloudapi"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/metrics"
)

const (
defaultTestName = "k6 test"
testRunIDKey = "K6_CLOUDRUN_TEST_RUN_ID"
)

// createCloudTest performs some test and Cloud configuration validations and if everything
// looks good, then it creates a test run in the k6 Cloud, using the Cloud API, meant to be used
// for streaming test results.
//
// This method is also responsible for filling the test run id on the test environment, so it can be used later,
// and to populate the Cloud configuration back in case the Cloud API returned some overrides,
// as expected by the Cloud output.
//
//nolint:funlen
func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error {
// Otherwise, we continue normally with the creation of the test run in the k6 Cloud backend services.
conf, warn, err := cloudapi.GetConsolidatedConfig(
test.derivedConfig.Collectors[builtinOutputCloud.String()],
gs.Env,
"", // Historically used for -o cloud=..., no longer used (deprecated).
test.derivedConfig.Options.Cloud,
test.derivedConfig.Options.External,
)
if err != nil {
return err
}

if warn != "" {
gs.Logger.Warn(warn)
}

// If not, we continue with some validations and the creation of the test run.
if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil {
return err
}

if !conf.Name.Valid || conf.Name.String == "" {
scriptPath := test.source.URL.String()
if scriptPath == "" {
// Script from stdin without a name, likely from stdin
return errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name")
}

conf.Name = null.StringFrom(filepath.Base(scriptPath))
}
if conf.Name.String == "-" {
conf.Name = null.StringFrom(defaultTestName)
}

thresholds := make(map[string][]string)
for name, t := range test.derivedConfig.Thresholds {
for _, threshold := range t.Thresholds {
thresholds[name] = append(thresholds[name], threshold.Source)
}
}

et, err := lib.NewExecutionTuple(
test.derivedConfig.Options.ExecutionSegment,
test.derivedConfig.Options.ExecutionSegmentSequence,
)
if err != nil {
return err
}
executionPlan := test.derivedConfig.Options.Scenarios.GetFullExecutionRequirements(et)

duration, testEnds := lib.GetEndOffset(executionPlan)
if !testEnds {
return errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud")
}

if conf.MetricPushConcurrency.Int64 < 1 {
return fmt.Errorf("metrics push concurrency must be a positive number but is %d",
conf.MetricPushConcurrency.Int64)
}

if conf.MaxTimeSeriesInBatch.Int64 < 1 {
return fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d",
conf.MaxTimeSeriesInBatch.Int64)
}

var testArchive *lib.Archive
if !test.derivedConfig.NoArchiveUpload.Bool {
testArchive = test.initRunner.MakeArchive()
}

testRun := &cloudapi.TestRun{
Name: conf.Name.String,
ProjectID: conf.ProjectID.Int64,
VUsMax: int64(lib.GetMaxPossibleVUs(executionPlan)), //nolint:gosec
Thresholds: thresholds,
Duration: int64(duration / time.Second),
Archive: testArchive,
}

logger := gs.Logger.WithFields(logrus.Fields{"output": builtinOutputCloud.String()})

apiClient := cloudapi.NewClient(
logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration())

response, err := apiClient.CreateTestRun(testRun)
if err != nil {
return err
}

// We store the test run id in the environment, so it can be used later.
test.preInitState.RuntimeOptions.Env[testRunIDKey] = response.ReferenceID

// If the Cloud API returned configuration overrides, we apply them to the current configuration.
// Then, we serialize the overridden configuration back, so it can be used by the Cloud output.
if response.ConfigOverride != nil {
logger.WithFields(logrus.Fields{"override": response.ConfigOverride}).Debug("overriding config options")

raw, err := cloudConfToRawMessage(conf.Apply(*response.ConfigOverride))
if err != nil {
return fmt.Errorf("could not serialize overridden cloud configuration: %w", err)
}

if test.derivedConfig.Collectors == nil {
test.derivedConfig.Collectors = make(map[string]json.RawMessage)
}
test.derivedConfig.Collectors[builtinOutputCloud.String()] = raw
}

return nil
}

// validateRequiredSystemTags checks if all required tags are present.
func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error {
var missingRequiredTags []string
requiredTags := metrics.SystemTagSet(metrics.TagName |
metrics.TagMethod |
metrics.TagStatus |
metrics.TagError |
metrics.TagCheck |
metrics.TagGroup)
for _, tag := range metrics.SystemTagValues() {
if requiredTags.Has(tag) && !scriptTags.Has(tag) {
missingRequiredTags = append(missingRequiredTags, tag.String())
}
}
if len(missingRequiredTags) > 0 {
return fmt.Errorf(
"the cloud output needs the following system tags enabled: %s",
strings.Join(missingRequiredTags, ", "),
)
}
return nil
}

func cloudConfToRawMessage(conf cloudapi.Config) (json.RawMessage, error) {
var buff bytes.Buffer
enc := json.NewEncoder(&buff)
if err := enc.Encode(conf); err != nil {
return nil, err
}
return buff.Bytes(), nil
}
40 changes: 35 additions & 5 deletions cmd/tests/cmd_cloud_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
"io"
"net/http"
"path/filepath"
"strconv"
"testing"

"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib/fsext"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.k6.io/k6/cloudapi"

"github.com/stretchr/testify/assert"
"go.k6.io/k6/cloudapi"
"go.k6.io/k6/cmd"
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib/fsext"
)

func TestK6CloudRun(t *testing.T) {
Expand Down Expand Up @@ -169,6 +169,36 @@ export default function() {};`
assert.Contains(t, stdout, "execution: local")
assert.Contains(t, stdout, "output: cloud (https://some.other.url/foo/tests/org/1337?bar=baz)")
})

t.Run("the script can read the test run id to the environment", func(t *testing.T) {
t.Parallel()

script := `
export const options = {
cloud: {
name: 'Hello k6 Cloud!',
projectID: 123456,
},
};
export default function() {
` + "console.log(`The test run id is ${__ENV.K6_CLOUDRUN_TEST_RUN_ID}`);" + `
};`

ts := makeTestState(t, script, []string{"--local-execution", "--log-output=stdout"}, 0)

const testRunID = 1337
srv := getCloudTestEndChecker(t, testRunID, nil, cloudapi.RunStatusFinished, cloudapi.ResultStatusPassed)
ts.Env["K6_CLOUD_HOST"] = srv.URL

cmd.ExecuteWithGlobalState(ts.GlobalState)

stdout := ts.Stdout.String()
t.Log(stdout)
assert.Contains(t, stdout, "execution: local")
assert.Contains(t, stdout, "output: cloud (https://app.k6.io/runs/1337)")
assert.Contains(t, stdout, "The test run id is "+strconv.Itoa(testRunID))
})
}

func makeTestState(tb testing.TB, script string, cliFlags []string, expExitCode exitcodes.ExitCode) *GlobalTestState {
Expand Down
3 changes: 1 addition & 2 deletions lib/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ type Options struct {
// iteration is shorter than the specified value.
MinIterationDuration types.NullDuration `json:"minIterationDuration" envconfig:"K6_MIN_ITERATION_DURATION"`

// Cloud is the config for the cloud
// formally known as ext.loadimpact
// Cloud is the configuration for the k6 Cloud, formerly known as ext.loadimpact.
Cloud json.RawMessage `json:"cloud,omitempty"`

// These values are for third party collectors' benefit.
Expand Down
15 changes: 11 additions & 4 deletions output/cloud/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/cloudapi"
"go.k6.io/k6/errext"
"go.k6.io/k6/lib"
Expand All @@ -17,11 +19,12 @@ import (
"go.k6.io/k6/output"
cloudv2 "go.k6.io/k6/output/cloud/expv2"
"go.k6.io/k6/usage"
"gopkg.in/guregu/null.v3"
)

// TestName is the default k6 Cloud test name
const TestName = "k6 test"
const (
defaultTestName = "k6 test"
testRunIDKey = "K6_CLOUDRUN_TEST_RUN_ID"
)

// versionedOutput represents an output implementing
// metrics samples aggregation and flushing to the
Expand Down Expand Up @@ -119,7 +122,7 @@ func newOutput(params output.Params) (*Output, error) {
conf.Name = null.StringFrom(filepath.Base(scriptPath))
}
if conf.Name.String == "-" {
conf.Name = null.StringFrom(TestName)
conf.Name = null.StringFrom(defaultTestName)
}

duration, testEnds := lib.GetEndOffset(params.ExecutionPlan)
Expand Down Expand Up @@ -147,6 +150,7 @@ func newOutput(params output.Params) (*Output, error) {
duration: int64(duration / time.Second),
logger: logger,
usage: params.Usage,
testRunID: params.RuntimeOptions.Env[testRunIDKey],
}, nil
}

Expand Down Expand Up @@ -178,6 +182,9 @@ func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error {
func (out *Output) Start() error {
if out.config.PushRefID.Valid {
out.testRunID = out.config.PushRefID.String
}

if out.testRunID != "" {
out.logger.WithField("testRunId", out.testRunID).Debug("Directly pushing metrics without init")
return out.startVersionedOutput()
}
Expand Down

0 comments on commit 3b15ea0

Please sign in to comment.