Skip to content

Commit

Permalink
Adding virt in the drivers.
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Talerico aka rook <joe.talerico@gmail.com>
  • Loading branch information
jtaleric committed Sep 4, 2024
1 parent 00dd2a3 commit b0d40db
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 178 deletions.
49 changes: 39 additions & 10 deletions cmd/k8s-netperf/k8s-netperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,31 +207,57 @@ var rootCmd = &cobra.Command{
var pr result.Data
for _, driver := range requestedDrivers {
if s.HostNetwork && !nc.Service {
pr = executeWorkload(nc, s, true, driver)
pr = executeWorkload(nc, s, true, driver, false)
if len(pr.Profile) > 1 {
sr.Results = append(sr.Results, pr)
}
}
pr = executeWorkload(nc, s, false, driver)
pr = executeWorkload(nc, s, false, driver, false)
if len(pr.Profile) > 1 {
sr.Results = append(sr.Results, pr)
}
}
}
} else {
log.Info("Connecting via ssh to the VMI")
err = k8s.SSHConnect(&s)
client, err := k8s.SSHConnect(&s)
if err != nil {
log.Fatal(err)
}
s.SSHClient = client
for _, nc := range s.Configs {
// Determine the metric for the test
metric := string("OP/s")
if strings.Contains(nc.Profile, "STREAM") {
metric = "Mb/s"
}
nc.Metric = metric
nc.AcrossAZ = acrossAZ
// No need to run hostNetwork through Service.
var pr result.Data
for _, driver := range requestedDrivers {
if s.HostNetwork && !nc.Service {
pr = executeWorkload(nc, s, true, driver, true)
if len(pr.Profile) > 1 {
sr.Results = append(sr.Results, pr)
}
}
pr = executeWorkload(nc, s, false, driver, true)
if len(pr.Profile) > 1 {
sr.Results = append(sr.Results, pr)
}
}
}
}

if pavail {
for i, npr := range sr.Results {
sr.Results[i].ClientMetrics, _ = metrics.QueryNodeCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ServerMetrics, _ = metrics.QueryNodeCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ClientPodCPU, _ = metrics.TopPodCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ServerPodCPU, _ = metrics.TopPodCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime)
if len(npr.ClientNodeInfo.Hostname) > 0 && len(npr.ServerNodeInfo.Hostname) > 0 {
sr.Results[i].ClientMetrics, _ = metrics.QueryNodeCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ServerMetrics, _ = metrics.QueryNodeCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ClientPodCPU, _ = metrics.TopPodCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime)
sr.Results[i].ServerPodCPU, _ = metrics.TopPodCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime)
}
}
}

Expand Down Expand Up @@ -332,7 +358,10 @@ func cleanup(client *kubernetes.Clientset) {
}

// executeWorkload executes the workload and returns the result data.
func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, driverName string) result.Data {
func executeWorkload(nc config.Config,
s config.PerfScenarios,
hostNet bool,
driverName string, virt bool) result.Data {
serverIP := ""
Client := s.Client
var driver drivers.Driver
Expand Down Expand Up @@ -387,7 +416,7 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, dri
log.Warnf("Test %s is not supported with driver %s. Skipping.", nc.Profile, npr.Driver)
return npr
}
r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP, &s)
if err != nil {
log.Fatal(err)
}
Expand All @@ -399,7 +428,7 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, dri
// Retry the current test.
for try < retry {
log.Warn("Rerunning test.")
r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP, &s)
if err != nil {
log.Error(err)
continue
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"regexp"

kubevirtv1 "github.com/cloud-bulldozer/k8s-netperf/pkg/kubevirt/client-go/clientset/versioned/typed/core/v1"
"github.com/melbahja/goph"
apiv1 "k8s.io/api/core/v1"

log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
Expand Down Expand Up @@ -50,6 +51,7 @@ type PerfScenarios struct {
ClientSet *kubernetes.Clientset
KClient *kubevirtv1.KubevirtV1Client
DClient *dynamic.DynamicClient
SSHClient *goph.Client
}

// Tests we will support in k8s-netperf
Expand Down
2 changes: 1 addition & 1 deletion pkg/drivers/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type Driver interface {
IsTestSupported(string) bool
Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error)
Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string, perf *config.PerfScenarios) (bytes.Buffer, error)
ParseResults(stdout *bytes.Buffer) (sample.Sample, error)
}

Expand Down
173 changes: 117 additions & 56 deletions pkg/drivers/iperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"encoding/json"

Expand Down Expand Up @@ -51,7 +52,11 @@ func (i *iperf3) IsTestSupported(test string) bool {
}

// Run will invoke iperf3 in a client container
func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
func (i *iperf3) Run(c *kubernetes.Clientset,
rc rest.Config,
nc config.Config,
client apiv1.PodList,
serverIP string, perf *config.PerfScenarios) (bytes.Buffer, error) {
var stdout, stderr bytes.Buffer
id := uuid.New()
file := fmt.Sprintf("/tmp/iperf-%s", id.String())
Expand Down Expand Up @@ -85,68 +90,124 @@ func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config,
}
}
log.Debug(cmd)
req := c.CoreV1().RESTClient().
Post().
Namespace(pod.Namespace).
Resource("pods").
Name(pod.Name).
SubResource("exec").
VersionedParams(&apiv1.PodExecOptions{
Container: pod.Spec.Containers[0].Name,
Command: cmd,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL())
if err != nil {
return stdout, err
}
// Connect this process' std{in,out,err} to the remote shell process.
err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
})
if err != nil {
return stdout, err
if !perf.VM {
req := c.CoreV1().RESTClient().
Post().
Namespace(pod.Namespace).
Resource("pods").
Name(pod.Name).
SubResource("exec").
VersionedParams(&apiv1.PodExecOptions{
Container: pod.Spec.Containers[0].Name,
Command: cmd,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL())
if err != nil {
return stdout, err
}
// Connect this process' std{in,out,err} to the remote shell process.
err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
})
if err != nil {
return stdout, err
}
} else {
retry := 3
present := false
sshclient, err := k8s.SSHConnect(perf)
if err != nil {
return stdout, err
}
for i := 0; i <= retry; i++ {
log.Infof("⏰ Waiting for iperf3 to be present on VM")
_, err = sshclient.Run("until iperf3 -h; do sleep 30; done")
if err != nil {
time.Sleep(10 * time.Second)
continue
} else {
present = true
break
}
}
if !present {
sshclient.Close()
return stdout, fmt.Errorf("iperf3 binary is not present on the VM")
}
var stdout []byte
ran := false
for i := 0; i <= retry; i++ {
stdout, err = sshclient.Run(strings.Join(cmd[:], " "))
if err != nil {
log.Debugf("Failed running command %s", err)
log.Debugf("⏰ Retrying iperf3 command -- cloud-init still finishing up")
time.Sleep(60 * time.Second)
continue
} else {
ran = true
break
}
}
sshclient.Close()
if !ran {
return *bytes.NewBuffer(stdout), fmt.Errorf("Unable to run iperf3")
}
}

//Empty buffer
stdout = bytes.Buffer{}
stderr = bytes.Buffer{}

req = c.CoreV1().RESTClient().
Post().
Namespace(pod.Namespace).
Resource("pods").
Name(pod.Name).
SubResource("exec").
VersionedParams(&apiv1.PodExecOptions{
Container: pod.Spec.Containers[0].Name,
Command: []string{"cat", file},
Stdin: false,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)
exec, err = remotecommand.NewSPDYExecutor(&rc, "POST", req.URL())
if err != nil {
return stdout, err
}
// Connect this process' std{in,out,err} to the remote shell process.
err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
})
if err != nil {
return stdout, err
if !perf.VM {
req := c.CoreV1().RESTClient().
Post().
Namespace(pod.Namespace).
Resource("pods").
Name(pod.Name).
SubResource("exec").
VersionedParams(&apiv1.PodExecOptions{
Container: pod.Spec.Containers[0].Name,
Command: []string{"cat", file},
Stdin: false,
Stdout: true,
Stderr: true,
TTY: true,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL())
if err != nil {
return stdout, err
}
// Connect this process' std{in,out,err} to the remote shell process.
err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
})
if err != nil {
return stdout, err
}
log.Debug(strings.TrimSpace(stdout.String()))
return stdout, nil
} else {
sshclient, err := k8s.SSHConnect(perf)
if err != nil {
return stdout, err
}
stdout, err := sshclient.Run(fmt.Sprintf("cat %s", file))
if err != nil {
sshclient.Close()
return *bytes.NewBuffer(stdout), err
}
log.Debug(strings.TrimSpace(bytes.NewBuffer(stdout).String()))
sshclient.Close()
return *bytes.NewBuffer(stdout), nil
}

log.Debug(strings.TrimSpace(stdout.String()))
return stdout, nil
}

// ParseResults accepts the stdout from the execution of the benchmark.
Expand Down
Loading

0 comments on commit b0d40db

Please sign in to comment.