Skip to content

Commit

Permalink
Merge branch 'kubernetes:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
hakuna-matatah authored Apr 11, 2024
2 parents 41673b0 + 5f61547 commit e16f655
Show file tree
Hide file tree
Showing 75 changed files with 2,083 additions and 259 deletions.
2 changes: 1 addition & 1 deletion clusterloader2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/stretchr/testify v1.9.0
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/oauth2 v0.0.0-20210323180902-22b0adad7558
golang.org/x/sync v0.4.0
golang.org/x/sync v0.7.0
golang.org/x/time v0.5.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.22.15
Expand Down
4 changes: 2 additions & 2 deletions clusterloader2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1189,8 +1189,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
46 changes: 29 additions & 17 deletions clusterloader2/pkg/measurement/common/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (
)

const (
execName = "Exec"
defaultTimeoutString = "1h"
execName = "Exec"
defaultTimeout = 1 * time.Hour
defaultBackoffDelay = 1 * time.Second
)

func init() {
Expand All @@ -47,11 +48,7 @@ func createExecMeasurement() measurement.Measurement {
type execMeasurement struct{}

func (e *execMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
timeoutStr, err := util.GetStringOrDefault(config.Params, "timeout", defaultTimeoutString)
if err != nil {
return nil, err
}
timeout, err := time.ParseDuration(timeoutStr)
timeout, err := util.GetDurationOrDefault(config.Params, "timeout", defaultTimeout)
if err != nil {
return nil, err
}
Expand All @@ -62,22 +59,37 @@ func (e *execMeasurement) Execute(config *measurement.Config) ([]measurement.Sum
if len(command) == 0 {
return nil, fmt.Errorf("command is a required argument. Got empty slice instead")
}

retries, err := util.GetIntOrDefault(config.Params, "retries", 1)
if err != nil || retries < 1 {
return nil, fmt.Errorf("error getting retries, retries: %v, err: %v", retries, err)
}
backoffDelay, err := util.GetDurationOrDefault(config.Params, "backoffDelay", defaultBackoffDelay)
if err != nil {
return nil, err
}
// Make a copy of command, to avoid overriding a slice we don't own.
command = append([]string{}, command...)
for i := range command {
command[i] = os.ExpandEnv(command[i])
}
klog.V(2).Infof("Running %v with timeout %v", command, timeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
out, err := cmd.CombinedOutput()
klog.V(2).Infof("Exec command output: %v", string(out))
if err != nil {
return nil, fmt.Errorf("command %v failed: %v", command, err)
var lastErr error
for i := 0; i < retries; i++ {
klog.V(2).Infof("Running %v with timeout %v, attempt %v", command, timeout, i+1)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := exec.CommandContext(ctx, command[0], command[1:]...)
out, err := cmd.CombinedOutput()
klog.V(2).Infof("Exec command output: %v", string(out))
if err == nil {
klog.V(2).Infof("Command %v succeeded in attempt %v", command, i+1)
return nil, nil
}
klog.V(2).Infof("Command %v failed in attempt %v: %v", command, i+1, err)
lastErr = err
time.Sleep(backoffDelay)
}
return nil, nil
// All attempts failed.
return nil, fmt.Errorf("command %v failed: %v", command, lastErr)
}

func (e *execMeasurement) Dispose() {}
Expand Down
179 changes: 179 additions & 0 deletions clusterloader2/pkg/measurement/common/neg_latency_measurement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
"fmt"
"math"
"strconv"
"time"

"github.com/prometheus/common/model"
"k8s.io/klog/v2"
"k8s.io/perf-tests/clusterloader2/pkg/errors"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)

const (
// negLatencyMeasurementName is the name under which the measurement is
// registered and the name given to the produced summary.
negLatencyMeasurementName = "NegLatency"
// negInitLatencyQuery is a PromQL template; its single %v verb receives the
// Prometheus-formatted range duration of the query window.
negInitLatencyQuery = `sum(sum_over_time(neg_controller_neg_initialization_duration_seconds_bucket[%v])) by (le)`
// negOpLatencyQuery is a PromQL template; its verbs receive, in order, the
// value of the "operation" label and the range duration of the query window.
negOpLatencyQuery = `sum(sum_over_time(neg_controller_neg_operation_duration_seconds_bucket{operation="%v"}[%v])) by (le)`
// Keys of the result map, one per gathered histogram.
negInitializationMetricName = "neg_initialization"
negAttachOpMetricName = "neg_attach_endpoint"
negDetachOpMetricName = "neg_detach_endpoint"
// Values substituted for the "operation" label in negOpLatencyQuery.
negAttachOpKey = "Attach"
negDetachOpKey = "Detach"
// negInfBucketSLO is the "le" label whose value is used as the total number
// of events when validating an SLO.
negInfBucketSLO = "+Inf"

// negQueryInterval is the length of the first query window used when
// gathering metrics.
negQueryInterval = 10 * time.Minute

// Default SLO thresholds. A *BucketSLO value is formatted to a string and
// looked up as an "le" bucket label; a *PercentileSLO value is the minimum
// percentage of all events that must fall within that bucket.
// NOTE(review): bucket bounds are presumably seconds, per the
// "duration_seconds" metric names - confirm against the NEG controller.
defaultNegInitBucketSLO float64 = 16
defaultNegInitPercentileSLO float64 = 95
defaultNegAttachBucketSLO float64 = 64
defaultNegAttachPercentileSLO float64 = 95
defaultNegDetachBucketSLO float64 = 32
defaultNegDetachPercentileSLO float64 = 95
)

// init registers the NegLatency measurement, backed by a Prometheus
// measurement wrapping negLatencyGatherer. A registration failure is fatal.
func init() {
	err := measurement.Register(negLatencyMeasurementName, func() measurement.Measurement {
		return CreatePrometheusMeasurement(&negLatencyGatherer{})
	})
	if err != nil {
		klog.Fatalf("Cannot register %s: %v", negLatencyMeasurementName, err)
	}
}

// negLatencyGatherer gathers NEG latency histograms from Prometheus and
// validates them against the configured SLOs. It carries no state.
type negLatencyGatherer struct{}

// negLatencyMetricMap maps a metric name to its histogram, which in turn maps
// an "le" bucket label to the (rounded) value reported for that bucket.
type negLatencyMetricMap map[string]map[string]int

// Gather collects the NEG latency histograms for [startTime, endTime], emits
// them as a single pretty-printed JSON summary and validates them against the
// SLOs taken from config. The summary is returned even when an SLO is
// violated; in that case the violation is reported through the error.
func (g *negLatencyGatherer) Gather(executor QueryExecutor, startTime, endTime time.Time, config *measurement.Config) ([]measurement.Summary, error) {
	metrics := g.gatherMetrics(executor, startTime, endTime)

	prettyJSON, err := util.PrettyPrintJSON(metrics)
	if err != nil {
		return nil, err
	}

	summary := measurement.CreateSummary(negLatencyMeasurementName, "json", prettyJSON)
	sloErr := validateNegSLOs(metrics, config)
	return []measurement.Summary{summary}, sloErr
}

// validateNegSLOs checks the initialization, attach and detach histograms, in
// that order, against their SLOs and returns the first violation found, or nil
// when every check passes.
func validateNegSLOs(result negLatencyMetricMap, config *measurement.Config) error {
	checks := []struct {
		metricName        string
		bucketParam       string
		percentileParam   string
		defaultBucket     float64
		defaultPercentile float64
	}{
		{negInitializationMetricName, "negInitBucketSLO", "negInitPercentileSLO", defaultNegInitBucketSLO, defaultNegInitPercentileSLO},
		{negAttachOpMetricName, "negAttachBucketSLO", "negAttachPercentileSLO", defaultNegAttachBucketSLO, defaultNegAttachPercentileSLO},
		{negDetachOpMetricName, "negDetachBucketSLO", "negDetachPercentileSLO", defaultNegDetachBucketSLO, defaultNegDetachPercentileSLO},
	}
	for _, c := range checks {
		if err := validateNegSLO(result, c.metricName, c.bucketParam, c.percentileParam, c.defaultBucket, c.defaultPercentile, config); err != nil {
			return err
		}
	}
	return nil
}

// validateNegSLO verifies that, for the histogram stored under metricName, the
// share of events counted in the SLO bucket is at least the SLO percentile.
//
// The bucket and percentile are read from config.Params under bucketName and
// percentileName; a lookup error or a zero value falls back to the supplied
// default. A missing histogram, or one whose "+Inf" bucket is zero, passes.
// A violation is reported as a metric violation error.
func validateNegSLO(result negLatencyMetricMap, metricName, bucketName, percentileName string, defaultBucketSLO, defaultPercentileSLO float64, config *measurement.Config) error {
	// paramOrDefault reads a float parameter, substituting (and logging) the
	// fallback when the parameter cannot be read or is zero.
	paramOrDefault := func(key string, fallback float64) float64 {
		value, err := util.GetFloat64OrDefault(config.Params, key, fallback)
		if err == nil && value != 0 {
			return value
		}
		klog.V(2).Infof("Using default value for %s: %f, because %s param is invalid: %v", key, fallback, key, err)
		return fallback
	}

	// The bucket is looked up by its "le" label, hence the string form.
	bucketSLO := strconv.FormatFloat(paramOrDefault(bucketName, defaultBucketSLO), 'g', -1, 64)
	percentileSLO := paramOrDefault(percentileName, defaultPercentileSLO)

	buckets, found := result[metricName]
	if !found {
		return nil
	}
	totalEvents := buckets[negInfBucketSLO]
	if totalEvents == 0 {
		return nil
	}

	perc := (float64(buckets[bucketSLO]) / float64(totalEvents)) * 100
	if perc >= percentileSLO {
		return nil
	}
	msg := fmt.Sprintf("Updates for %ss latency is within %d%%, expected %d%%, buckets:\n%v",
		bucketSLO,
		int(math.Floor(perc)),
		int(math.Floor(percentileSLO)),
		buckets,
	)
	return errors.NewMetricViolationError(metricName, msg)
}

// gatherMetrics queries Prometheus for the NEG initialization, attach and
// detach latency histograms observed between startTime and endTime.
//
// The time range is split into consecutive windows of at most
// negQueryInterval. One query is issued per window, evaluated at the window's
// end with a range equal to the window's length. Per-bucket values from all
// windows are added together, so the result reflects the whole range rather
// than only the last window queried.
//
// Query failures are logged and the affected window is skipped (best effort).
// The returned map is keyed by metric name, then by the "le" bucket label; a
// metric for which no sample was returned has no entry.
func (g *negLatencyGatherer) gatherMetrics(executor QueryExecutor, startTime, endTime time.Time) negLatencyMetricMap {
	result := make(negLatencyMetricMap)
	gatherAndProcessSamples := func(metricName, queryTpl string, queryParams ...interface{}) {
		var samples []*model.Sample
		prevQueryTime := startTime
		currQueryTime := startTime.Add(negQueryInterval)

		for {
			// Clamp the final window so it never reaches past endTime.
			if currQueryTime.After(endTime) {
				currQueryTime = endTime
			}
			interval := currQueryTime.Sub(prevQueryTime)
			promDuration := measurementutil.ToPrometheusTime(interval)
			query := fmt.Sprintf(queryTpl, append(queryParams, promDuration)...)
			newSamples, err := executor.Query(query, currQueryTime)
			if err != nil {
				klog.V(2).Infof("Got error querying Prometheus: %v", err)
			} else {
				samples = append(samples, newSamples...)
			}
			// Equal compares instants; == would also compare location and
			// monotonic clock data and could miss a window ending exactly at
			// endTime, causing an extra zero-length query.
			if currQueryTime.Equal(endTime) {
				break
			}
			prevQueryTime = currQueryTime
			// Advance by the same interval that sized the first window.
			currQueryTime = currQueryTime.Add(negQueryInterval)
		}

		for _, sample := range samples {
			bucket := string(sample.Metric["le"])
			value := int(math.Round(float64(sample.Value)))
			buckets, ok := result[metricName]
			if !ok {
				buckets = make(map[string]int)
				result[metricName] = buckets
			}
			// Each window contributes one sample per bucket; sum them so
			// earlier windows are not discarded.
			buckets[bucket] += value
		}
	}

	gatherAndProcessSamples(negInitializationMetricName, negInitLatencyQuery)
	gatherAndProcessSamples(negAttachOpMetricName, negOpLatencyQuery, negAttachOpKey)
	gatherAndProcessSamples(negDetachOpMetricName, negOpLatencyQuery, negDetachOpKey)
	return result
}

// Configure is a no-op: the gatherer ignores the config here and always
// returns nil.
func (g *negLatencyGatherer) Configure(_ *measurement.Config) error {
return nil
}

// IsEnabled reports the gatherer as enabled regardless of the config.
func (g *negLatencyGatherer) IsEnabled(_ *measurement.Config) bool {
return true
}

// String returns the measurement name, "NegLatency".
func (*negLatencyGatherer) String() string {
return negLatencyMeasurementName
}
Loading

0 comments on commit e16f655

Please sign in to comment.