Skip to content

Commit

Permalink
Add benchmark workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Nov 27, 2024
1 parent 73130b2 commit 53c57f2
Show file tree
Hide file tree
Showing 8 changed files with 461 additions and 63 deletions.
54 changes: 54 additions & 0 deletions .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
name: Benchmark

on:
pull_request:
paths:
- pkg/**/*
- cmd/**/*
- test/**/*
- hack/**/*
- kustomize/**/*
- go.mod
- .github/workflows/benchmark.yaml
- '!hack/releases-helm-chart.sh'
push:
paths:
- pkg/**/*
- cmd/**/*
- test/**/*
- hack/**/*
- kustomize/**/*
- go.mod
- .github/workflows/benchmark.yaml
- '!hack/releases-helm-chart.sh'

env:
CGO_ENABLED: "0"
GO_VERSION: "1.23.0"

jobs:
benchmark:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}

- name: Test Benchmark Hack
shell: bash
run: |
./hack/e2e-test.sh e2e/kwokctl/benchmark-hack
- name: Test Benchmark
shell: bash
run: |
./hack/e2e-test.sh e2e/kwokctl/benchmark
- name: Upload logs
uses: actions/upload-artifact@v4
if: failure()
with:
name: kwok-logs-benchmark
path: ${{ github.workspace }}/logs
6 changes: 0 additions & 6 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,6 @@ jobs:
fi
./hack/e2e-test.sh kwokctl/kwokctl_${{ matrix.kwokctl-runtime }}
- name: Test Benchmark
if: ${{ matrix.os == 'ubuntu-latest' && matrix.kwokctl-runtime == 'binary' }}
shell: bash
run: |
./hack/e2e-test.sh e2e/kwokctl/benchmark
- name: Test Auto Detect
if: ${{ matrix.kwokctl-runtime == 'binary' }}
shell: bash
Expand Down
3 changes: 3 additions & 0 deletions pkg/kwok/server/profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func (s *Server) InstallProfilingHandler(enableProfilingLogHandler bool, enableC

// Setup pprof handlers.
s.restfulCont.Handle(pprofBasePath, http.HandlerFunc(pprof.Index))
s.restfulCont.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
s.restfulCont.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
s.restfulCont.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
if enableContentionProfiling {
runtime.SetBlockProfileRate(1)
}
Expand Down
164 changes: 107 additions & 57 deletions test/e2e/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,101 +17,150 @@ limitations under the License.
package e2e

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"testing"
"time"

"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"
)

func waitResource(ctx context.Context, t *testing.T, kwokctlPath, name, resource, reason string, want, gap, tolerance int) error {
var prev int
for {
if ctx.Err() != nil {
return ctx.Err()
}
cmd := exec.Command(kwokctlPath, "--name", name, "kubectl", "get", "--no-headers", resource) // #nosec G204
output, err := cmd.Output()
func waitResource(ctx context.Context, t *testing.T, kwokctlPath, name, resource, reason string, want, gap, tolerance int, startFunc func() error) error {
watchCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
}()
cmd := exec.CommandContext(watchCtx, kwokctlPath, "--name", name, "kubectl", "get", "--no-headers", "--watch", resource)
cmd.Stderr = os.Stderr
pr, err := cmd.StdoutPipe()
if err != nil {
return err
}

err = cmd.Start()
if err != nil {
return err
}

time.Sleep(1 * time.Second)
if startFunc != nil {
err = startFunc()
if err != nil {
return err
}
raw := string(output)
got := strings.Count(raw, reason)
}

uniq := map[string]int{}
prev := 0
got := 0
var latestTime time.Time
reader := bufio.NewReader(pr)
for {
line, _, err := reader.ReadLine()
if err != nil {
if err != io.EOF {
return err
}
cancel()
watchCtx, cancel = context.WithCancel(ctx)
cmd = exec.CommandContext(watchCtx, kwokctlPath, "--name", name, "kubectl", "get", "--no-headers", "--watch", resource)
cmd.Stderr = os.Stderr
pr, err = cmd.StdoutPipe()
if err != nil {
return err
}

err = cmd.Start()
if err != nil {
return err
}

reader = bufio.NewReader(pr)
t.Logf("EOF: %s %d => %d, %v\n", resource, got, want, latestTime)
continue
}

key := string(line[:bytes.IndexByte(line, byte(' '))])

_, ok := uniq[key]
if !ok {
uniq[key] = 0
}

if uniq[key] == 0 {
if bytes.Contains(line, []byte(reason)) {
uniq[key] = 1
got++
}
}

if got == want {
t.Logf("%s %d, %v\n", resource, got, latestTime)
return nil
}
all := strings.Count(raw, "\n")
t.Logf("%s %d/%d => %d\n", resource, got, all, want)

if time.Since(latestTime) < time.Second {
continue
}

latestTime = time.Now()

all := len(uniq)

t.Logf("%s %d/%d => %d, %v\n", resource, got, all, want, latestTime)
if prev != 0 && got == prev {
return fmt.Errorf("resource %s not changed", resource)
}
prev = got
if gap != 0 && got != 0 && (all-got) > gap {
if tolerance > 0 {
t.Logf("Error %s gap too large, actual: %d, expected: %d, retrying...\n", resource, all-got, gap)
tolerance--
} else {
t.Logf("Error %s gap too large, actual: %d, expected: %d\n", resource, all-got, gap)
return fmt.Errorf("gap too large for resource %s", resource)
}
//if tolerance > 0 {
t.Logf("Error %s gap too large, actual: %d, expected: %d, retrying...\n", resource, all-got, gap)
// tolerance--
//} else {
// t.Logf("Error %s gap too large, actual: %d, expected: %d\n", resource, all-got, gap)
// return fmt.Errorf("gap too large for resource %s", resource)
//}
}
time.Sleep(1 * time.Second)
}
}

func scaleCreatePod(ctx context.Context, t *testing.T, kwokctlPath string, name string, size int) error {
cmd := exec.CommandContext(ctx, kwokctlPath, "--name", name, "kubectl", "get", "node", "-o", "jsonpath={.items.*.metadata.name}") // #nosec G204
out, err := cmd.Output()
if err != nil {
return fmt.Errorf("failed to run command: %w", err)
}
nodeName := ""
nodes := strings.Split(string(out), " ")
for _, node := range nodes {
if strings.Contains(node, "fake-") {
nodeName = node
break
}
}
if nodeName == "" {
return fmt.Errorf("no fake- node found")
}

nodeName := "fake-node-000000"
scaleCmd := exec.CommandContext(ctx, kwokctlPath, "--name", name, "scale", "pod", "fake-pod", "--replicas", strconv.Itoa(size), "--param", fmt.Sprintf(".nodeName=%q", nodeName)) // #nosec G204
if err := scaleCmd.Start(); err != nil {
return fmt.Errorf("failed to start scale command: %w", err)
}
scaleCmd.Stdout = os.Stderr
scaleCmd.Stderr = os.Stderr

if err := waitResource(ctx, t, kwokctlPath, name, "Pod", "Running", size, 5, 1); err != nil {
if err := waitResource(ctx, t, kwokctlPath, name, "Pod", "Running", size, 5, 5, scaleCmd.Start); err != nil {
return fmt.Errorf("failed to wait for resource: %w", err)
}

return nil
}

func scaleDeletePod(ctx context.Context, t *testing.T, kwokctlPath string, name string, size int) error {
scaleCmd := exec.CommandContext(ctx, kwokctlPath, "--name", name, "scale", "pod", "fake-pod", "--replicas", strconv.Itoa(size)) // #nosec G204
if err := scaleCmd.Start(); err != nil {
return fmt.Errorf("failed to start scale command: %w", err)
}
scaleCmd := exec.CommandContext(ctx, kwokctlPath, "--name", name, "scale", "pod", "fake-pod", "--replicas", strconv.Itoa(0)) // #nosec G204
scaleCmd.Stdout = os.Stderr
scaleCmd.Stderr = os.Stderr

if err := waitResource(ctx, t, kwokctlPath, name, "Pod", "fake-pod-", size, 0, 0); err != nil {
if err := waitResource(ctx, t, kwokctlPath, name, "Pod", "Terminating", size, 0, 0, scaleCmd.Start); err != nil {
return fmt.Errorf("failed to wait for resource: %w", err)
}
return nil
}

func scaleCreateNode(ctx context.Context, t *testing.T, kwokctlPath string, name string, size int) error {
scaleCmd := exec.CommandContext(ctx, kwokctlPath, "--name", name, "scale", "node", "fake-node", "--replicas", strconv.Itoa(size)) // #nosec G204
if err := scaleCmd.Start(); err != nil {
return fmt.Errorf("failed to start scale command: %w", err)
}
scaleCmd.Stdout = os.Stderr
scaleCmd.Stderr = os.Stderr

if err := waitResource(ctx, t, kwokctlPath, name, "Node", "Ready", size, 10, 5); err != nil {
if err := waitResource(ctx, t, kwokctlPath, name, "Node", "Ready", size, 10, 30, scaleCmd.Start); err != nil {
return fmt.Errorf("failed to wait for resource: %w", err)
}
return nil
Expand All @@ -120,30 +169,31 @@ func scaleCreateNode(ctx context.Context, t *testing.T, kwokctlPath string, name
func CaseBenchmark(kwokctlPath, clusterName string) *features.FeatureBuilder {
return features.New("Benchmark").
Assess("Create nodes", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
ctx0, cancel := context.WithTimeout(ctx, 120*time.Second)
ctx0, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

err := scaleCreateNode(ctx0, t, kwokctlPath, clusterName, 2000)
err := scaleCreateNode(ctx0, t, kwokctlPath, clusterName, 5000)
if err != nil {
t.Fatal(err)
}

return ctx
}).
Assess("Create pods", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
ctx0, cancel := context.WithTimeout(ctx, 240*time.Second)
ctx0, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

err := scaleCreatePod(ctx0, t, kwokctlPath, clusterName, 5000)
err := scaleCreatePod(ctx0, t, kwokctlPath, clusterName, 10000)
if err != nil {
t.Fatal(err)
}
return ctx
}).
Assess("Delete pods", func(ctx context.Context, t *testing.T, c *envconf.Config) context.Context {
ctx0, cancel := context.WithTimeout(ctx, 240*time.Second)
ctx0, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

err := scaleDeletePod(ctx0, t, kwokctlPath, clusterName, 0)
err := scaleDeletePod(ctx0, t, kwokctlPath, clusterName, 10000)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 53c57f2

Please sign in to comment.