Skip to content

Commit

Permalink
Do not stop REST API before outputs are flushed
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Jan 24, 2025
1 parent 8354419 commit 45f5a03
Showing 1 changed file with 49 additions and 49 deletions.
98 changes: 49 additions & 49 deletions internal/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,55 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
runAbort(err)
})
samples := make(chan metrics.SampleContainer, test.derivedConfig.MetricSamplesBufferSize.Int64)
// Spin up the REST API server, if not disabled.
if c.gs.Flags.Address != "" { //nolint:nestif
initBar.Modify(pb.WithConstProgress(0, "Init API server"))

// We cannot use backgroundProcesses here, since we need the REST API to
// be down before we can close the samples channel above and finish the
// processing the metrics pipeline.
apiWG := &sync.WaitGroup{}
apiWG.Add(2)
defer apiWG.Wait()

srvCtx, srvCancel := context.WithCancel(globalCtx)
defer srvCancel()

srv := api.GetServer(
runCtx,
c.gs.Flags.Address, c.gs.Flags.ProfilingEnabled,
testRunState,
samples,
metricsEngine,
execScheduler,
)
go func() {
defer apiWG.Done()
logger.Debugf("Starting the REST API server on %s", c.gs.Flags.Address)
if c.gs.Flags.ProfilingEnabled {
logger.Debugf("Profiling exposed on http://%s/debug/pprof/", c.gs.Flags.Address)
}
if aerr := srv.ListenAndServe(); aerr != nil && !errors.Is(aerr, http.ErrServerClosed) {
// Only exit k6 if the user has explicitly set the REST API address
if cmd.Flags().Lookup("address").Changed {
logger.WithError(aerr).Error("Error from API server")
c.gs.OSExit(int(exitcodes.CannotStartRESTAPI))
} else {
logger.WithError(aerr).Warn("Error from API server")
}
}
}()
go func() {
defer apiWG.Done()
<-srvCtx.Done()
shutdCtx, shutdCancel := context.WithTimeout(globalCtx, 1*time.Second)
defer shutdCancel()
if aerr := srv.Shutdown(shutdCtx); aerr != nil {
logger.WithError(aerr).Debug("REST API server did not shut down correctly")
}
}()
}

waitOutputsFlushed, stopOutputs, err := outputManager.Start(samples)
if err != nil {
return err
Expand Down Expand Up @@ -291,55 +340,6 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
logger.Debug("Metrics and traces processing finished!")
}()

// Spin up the REST API server, if not disabled.
if c.gs.Flags.Address != "" { //nolint:nestif
initBar.Modify(pb.WithConstProgress(0, "Init API server"))

// We cannot use backgroundProcesses here, since we need the REST API to
// be down before we can close the samples channel above and finish the
// processing the metrics pipeline.
apiWG := &sync.WaitGroup{}
apiWG.Add(2)
defer apiWG.Wait()

srvCtx, srvCancel := context.WithCancel(globalCtx)
defer srvCancel()

srv := api.GetServer(
runCtx,
c.gs.Flags.Address, c.gs.Flags.ProfilingEnabled,
testRunState,
samples,
metricsEngine,
execScheduler,
)
go func() {
defer apiWG.Done()
logger.Debugf("Starting the REST API server on %s", c.gs.Flags.Address)
if c.gs.Flags.ProfilingEnabled {
logger.Debugf("Profiling exposed on http://%s/debug/pprof/", c.gs.Flags.Address)
}
if aerr := srv.ListenAndServe(); aerr != nil && !errors.Is(aerr, http.ErrServerClosed) {
// Only exit k6 if the user has explicitly set the REST API address
if cmd.Flags().Lookup("address").Changed {
logger.WithError(aerr).Error("Error from API server")
c.gs.OSExit(int(exitcodes.CannotStartRESTAPI))
} else {
logger.WithError(aerr).Warn("Error from API server")
}
}
}()
go func() {
defer apiWG.Done()
<-srvCtx.Done()
shutdCtx, shutdCancel := context.WithTimeout(globalCtx, 1*time.Second)
defer shutdCancel()
if aerr := srv.Shutdown(shutdCtx); aerr != nil {
logger.WithError(aerr).Debug("REST API server did not shut down correctly")
}
}()
}

printExecutionDescription(
c.gs, "local", args[0], "", conf, executionState.ExecutionTuple, executionPlan, outputs,
)
Expand Down

0 comments on commit 45f5a03

Please sign in to comment.