diff --git a/README.md b/README.md
index 1ff3a027..8ce8c1a6 100644
--- a/README.md
+++ b/README.md
@@ -62,6 +62,7 @@ Flags:
--debug Enable debug log
-h, --help help for k8s-netperf
--iperf Use iperf3 as load driver (along with netperf)
+ --uperf Use uperf as load driver (along with netperf)
--json Instead of human-readable output, return JSON to stdout
--local Run network performance tests with Server-Pods/Client-Pods on the same Node
--metrics Show all system metrics retrieved from prom
@@ -124,16 +125,22 @@ $ ./k8s-netperf --tcp-tolerance 1
+-------------------+---------+------------+-------------+--------------+---------+--------------+-----------+----------+---------+--------------------+
| 📊 Stream Results | netperf | TCP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 2661.006667 (Mb/s) |
| 📊 Stream Results | iperf3 | TCP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 2483.078229 (Mb/s) |
+| 📊 Stream Results | uperf | TCP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 2581.705097 (Mb/s) |
| 📊 Stream Results | netperf | TCP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 2702.230000 (Mb/s) |
| 📊 Stream Results | iperf3 | TCP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 2523.434069 (Mb/s) |
+| 📊 Stream Results | uperf | TCP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 2567.665412 (Mb/s) |
| 📊 Stream Results | netperf | TCP_STREAM | 1 | true | false | 8192 | false | 10 | 3 | 2697.276667 (Mb/s) |
| 📊 Stream Results | iperf3 | TCP_STREAM | 1 | true | false | 8192 | false | 10 | 3 | 2542.793728 (Mb/s) |
+| 📊 Stream Results | uperf | TCP_STREAM | 1 | true | false | 8192 | false | 10 | 3 | 2571.881579 (Mb/s) |
| 📊 Stream Results | netperf | TCP_STREAM | 1 | false | false | 8192 | false | 10 | 3 | 2707.076667 (Mb/s) |
| 📊 Stream Results | iperf3 | TCP_STREAM | 1 | false | false | 8192 | false | 10 | 3 | 2604.067072 (Mb/s) |
+| 📊 Stream Results | uperf | TCP_STREAM | 1 | false | false | 8192 | false | 10 | 3 | 2687.276667 (Mb/s) |
| 📊 Stream Results | netperf | UDP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 1143.926667 (Mb/s) |
| 📊 Stream Results | iperf3 | UDP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 1202.428288 (Mb/s) |
+| 📊 Stream Results | uperf | UDP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 1242.059988 (Mb/s) |
| 📊 Stream Results | netperf | UDP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 1145.066667 (Mb/s) |
| 📊 Stream Results | iperf3 | UDP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 1239.580672 (Mb/s) |
+| 📊 Stream Results | uperf | UDP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 1261.840000 (Mb/s) |
+-------------------+---------+------------+-------------+--------------+---------+--------------+-----------+----------+---------+--------------------+
+---------------+---------+----------+-------------+--------------+---------+--------------+-----------+----------+---------+---------------------+
| RESULT TYPE | DRIVER | SCENARIO | PARALLELISM | HOST NETWORK | SERVICE | MESSAGE SIZE | SAME NODE | DURATION | SAMPLES | AVG VALUE |
diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go
index 3a0324e2..0768f046 100644
--- a/cmd/k8s-netperf/k8s-netperf.go
+++ b/cmd/k8s-netperf/k8s-netperf.go
@@ -20,6 +20,7 @@ import (
"github.com/cloud-bulldozer/k8s-netperf/pkg/netperf"
result "github.com/cloud-bulldozer/k8s-netperf/pkg/results"
"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
+ uperf_driver "github.com/cloud-bulldozer/k8s-netperf/pkg/uperf"
"github.com/google/uuid"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -36,6 +37,7 @@ var (
nl bool
clean bool
iperf3 bool
+ uperf bool
acrossAZ bool
full bool
debug bool
@@ -158,24 +160,36 @@ var rootCmd = &cobra.Command{
if s.HostNetwork {
// No need to run hostNetwork through Service.
if !nc.Service {
- npr := executeWorkload(nc, s, true, false)
+ npr := executeWorkload(nc, s, true, false, false)
sr.Results = append(sr.Results, npr)
if iperf3 {
- ipr := executeWorkload(nc, s, true, true)
+ ipr := executeWorkload(nc, s, true, true, false)
if len(ipr.Profile) > 1 {
sr.Results = append(sr.Results, ipr)
}
}
+ if uperf {
+ upr := executeWorkload(nc, s, true, false, true)
+ if len(upr.Profile) > 1 {
+ sr.Results = append(sr.Results, upr)
+ }
+ }
}
}
- npr := executeWorkload(nc, s, false, false)
+ npr := executeWorkload(nc, s, false, false, false)
sr.Results = append(sr.Results, npr)
if iperf3 {
- ipr := executeWorkload(nc, s, false, true)
+ ipr := executeWorkload(nc, s, false, true, false)
if len(ipr.Profile) > 1 {
sr.Results = append(sr.Results, ipr)
}
}
+ if uperf {
+ upr := executeWorkload(nc, s, false, false, true)
+ if len(upr.Profile) > 1 {
+ sr.Results = append(sr.Results, upr)
+ }
+ }
}
var fTime time.Time
@@ -323,7 +337,7 @@ func cleanup(client *kubernetes.Clientset) {
}
-func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool) result.Data {
+func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool, uperf bool) result.Data {
serverIP := ""
service := false
sameNode := true
@@ -332,6 +346,8 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
service = true
if iperf3 {
serverIP = s.IperfService.Spec.ClusterIP
+ } else if uperf {
+ serverIP = s.UperfService.Spec.ClusterIP
} else {
serverIP = s.NetperfService.Spec.ClusterIP
}
@@ -356,6 +372,12 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
return npr
}
}
+ if uperf {
+ // uperf doesn't support all tests cases
+ if !uperf_driver.TestSupported(nc.Profile) {
+ return npr
+ }
+ }
npr.Config = nc
npr.Metric = nc.Metric
@@ -383,6 +405,18 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
log.Error(err)
os.Exit(1)
}
+ } else if uperf {
+ npr.Driver = "uperf"
+ r, err := uperf_driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
+ if err != nil {
+ log.Error(err)
+ os.Exit(1)
+ }
+ nr, err = uperf_driver.ParseResults(&r)
+ if err != nil {
+ log.Error(err)
+ os.Exit(1)
+ }
} else {
npr.Driver = "netperf"
r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
@@ -435,6 +469,7 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
func main() {
rootCmd.Flags().StringVar(&cfgfile, "config", "netperf.yml", "K8s netperf Configuration File")
rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver (along with netperf)")
+ rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver (along with netperf)")
rootCmd.Flags().BoolVar(&clean, "clean", true, "Clean-up resources created by k8s-netperf")
rootCmd.Flags().BoolVar(&json, "json", false, "Instead of human-readable output, return JSON to stdout")
rootCmd.Flags().BoolVar(&nl, "local", false, "Run network performance tests with Server-Pods/Client-Pods on the same Node")
diff --git a/containers/Containerfile b/containers/Containerfile
index 7d3ba93d..26cee777 100644
--- a/containers/Containerfile
+++ b/containers/Containerfile
@@ -3,7 +3,10 @@ ARG RHEL_VERSION
FROM registry.access.redhat.com/${RHEL_VERSION}:latest
COPY appstream.repo /etc/yum.repos.d/centos8-appstream.repo
+
COPY netperf.diff /tmp/netperf.diff
+RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm && dnf clean all
+RUN dnf install -y uperf && dnf clean all
RUN dnf install -y --nodocs make automake --enablerepo=centos9 --allowerasing && \
dnf install -y --nodocs gcc git bc lksctp-tools-devel texinfo --enablerepo=*
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 6f43abcd..3486f0ce 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -41,6 +41,7 @@ type PerfScenarios struct {
ServerHost apiv1.PodList
NetperfService *apiv1.Service
IperfService *apiv1.Service
+ UperfService *apiv1.Service
RestConfig rest.Config
ClientSet *kubernetes.Clientset
}
diff --git a/pkg/k8s/kubernetes.go b/pkg/k8s/kubernetes.go
index 0e778233..0eede01c 100644
--- a/pkg/k8s/kubernetes.go
+++ b/pkg/k8s/kubernetes.go
@@ -16,6 +16,7 @@ import (
)
// DeploymentParams describes the deployment
+// Server pod can run multiple containers, each command in Commands will represent a container command
type DeploymentParams struct {
HostNetwork bool
Name string
@@ -23,7 +24,7 @@ type DeploymentParams struct {
Replicas int32
Image string
Labels map[string]string
- Command []string
+ Commands [][]string
PodAffinity apiv1.PodAffinity
PodAntiAffinity apiv1.PodAntiAffinity
NodeAffinity apiv1.NodeAffinity
@@ -47,12 +48,18 @@ const NetperfServerCtlPort = 12865
// IperfServerCtlPort control port for the service
const IperfServerCtlPort = 22865
+// UperferverCtlPort control port for the service
+const UperfServerCtlPort = 30000
+
// NetperfServerDataPort data port for the service
const NetperfServerDataPort = 42424
// IperfServerDataPort data port for the service
const IperfServerDataPort = 43433
+// UperfServerDataPort data port for the service
+const UperfServerDataPort = 30001
+
// Labels we will apply to k8s assets.
const serverRole = "server"
const clientRole = "client-local"
@@ -136,7 +143,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Replicas: 1,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Labels: map[string]string{"role": clientRole},
- Command: []string{"/bin/bash", "-c", "sleep 10000000"},
+ Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}},
Port: NetperfServerCtlPort,
}
if z != "" && numNodes > 1 {
@@ -180,6 +187,19 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
return fmt.Errorf("😥 Unable to create iperf service")
}
+ // Create uperf service
+ uperfSVC := ServiceParams{
+ Name: "uperf-service",
+ Namespace: "netperf",
+ Labels: map[string]string{"role": serverRole},
+ CtlPort: UperfServerCtlPort,
+ DataPort: UperfServerDataPort,
+ }
+ s.UperfService, err = CreateService(uperfSVC, client)
+ if err != nil {
+ return fmt.Errorf("😥 Unable to create uperf service")
+ }
+
// Create netperf service
netperfSVC := ServiceParams{
Name: "netperf-service",
@@ -198,7 +218,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Replicas: 1,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Labels: map[string]string{"role": clientAcrossRole},
- Command: []string{"/bin/bash", "-c", "sleep 10000000"},
+ Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}},
Port: NetperfServerCtlPort,
}
cdpAcross.PodAntiAffinity = apiv1.PodAntiAffinity{
@@ -212,7 +232,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
HostNetwork: true,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Labels: map[string]string{"role": hostNetClientRole},
- Command: []string{"/bin/bash", "-c", "sleep 10000000"},
+ Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}},
Port: NetperfServerCtlPort,
}
if z != "" {
@@ -247,6 +267,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
return err
}
}
+
+ // Use separate containers for servers
+ dpCommands := [][]string{{"/bin/bash", "-c", fmt.Sprintf("netserver && sleep 10000000")},
+ {"/bin/bash", "-c", fmt.Sprintf("iperf3 -s -p %d && sleep 10000000", IperfServerCtlPort)},
+ {"/bin/bash", "-c", fmt.Sprintf("uperf -s -v -P %d && sleep 10000000", UperfServerCtlPort)}}
+
sdpHost := DeploymentParams{
Name: "server-host",
Namespace: "netperf",
@@ -254,7 +280,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
HostNetwork: true,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Labels: map[string]string{"role": hostNetServerRole},
- Command: []string{"/bin/bash", "-c", fmt.Sprintf("netserver && iperf3 -s -p %d && sleep 10000000", IperfServerCtlPort)},
+ Commands: dpCommands,
Port: NetperfServerCtlPort,
}
// Start netperf server
@@ -264,7 +290,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Replicas: 1,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Labels: map[string]string{"role": serverRole},
- Command: []string{"/bin/bash", "-c", fmt.Sprintf("netserver && iperf3 -s -p %d && sleep 10000000", IperfServerCtlPort)},
+ Commands: dpCommands,
Port: NetperfServerCtlPort,
}
if s.NodeLocal {
@@ -451,6 +477,21 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv
}
log.Infof("🚀 Starting Deployment for: %s in namespace: %s", dp.Name, dp.Namespace)
dc := client.AppsV1().Deployments(dp.Namespace)
+
+ // Add containers to deployment
+ var cmdContainers []apiv1.Container
+ for i := 0; i < len(dp.Commands); i++ {
+ // each container should have a unique name
+ containerName := fmt.Sprintf("%s-%d", dp.Name, i)
+ cmdContainers = append(cmdContainers,
+ apiv1.Container{
+ Name: containerName,
+ Image: dp.Image,
+ Command: dp.Commands[i],
+ ImagePullPolicy: apiv1.PullAlways,
+ })
+ }
+
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: dp.Name,
@@ -467,14 +508,7 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv
Spec: apiv1.PodSpec{
ServiceAccountName: sa,
HostNetwork: dp.HostNetwork,
- Containers: []apiv1.Container{
- {
- Name: dp.Name,
- Image: dp.Image,
- Command: dp.Command,
- ImagePullPolicy: apiv1.PullAlways,
- },
- },
+ Containers: cmdContainers,
Affinity: &apiv1.Affinity{
NodeAffinity: &dp.NodeAffinity,
PodAffinity: &dp.PodAffinity,
diff --git a/pkg/results/result.go b/pkg/results/result.go
index 772ba080..69fe64c0 100644
--- a/pkg/results/result.go
+++ b/pkg/results/result.go
@@ -281,11 +281,11 @@ func ShowRRResult(s ScenarioResults) {
func ShowLatencyResult(s ScenarioResults) {
if checkResults(s, "RR") {
logging.Debug("Rendering RR P99 Latency results")
- table := initTable([]string{"Result Type", "Scenario", "Parallelism", "Host Network", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg 99%tile value"})
+ table := initTable([]string{"Result Type", "Driver", "Scenario", "Parallelism", "Host Network", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg 99%tile value"})
for _, r := range s.Results {
if strings.Contains(r.Profile, "RR") {
p99, _ := Average(r.LatencySummary)
- table.Append([]string{"RR Latency Results", r.Profile, strconv.Itoa(r.Parallelism), strconv.FormatBool(r.HostNetwork), strconv.FormatBool(r.Service), strconv.Itoa(r.MessageSize), strconv.FormatBool(r.SameNode), strconv.Itoa(r.Duration), strconv.Itoa(r.Samples), fmt.Sprintf("%f (%s)", p99, "usec")})
+ table.Append([]string{"RR Latency Results", r.Driver, r.Profile, strconv.Itoa(r.Parallelism), strconv.FormatBool(r.HostNetwork), strconv.FormatBool(r.Service), strconv.Itoa(r.MessageSize), strconv.FormatBool(r.SameNode), strconv.Itoa(r.Duration), strconv.Itoa(r.Samples), fmt.Sprintf("%f (%s)", p99, "usec")})
}
}
table.Render()
diff --git a/pkg/uperf/uperf.go b/pkg/uperf/uperf.go
new file mode 100644
index 00000000..4c2e8988
--- /dev/null
+++ b/pkg/uperf/uperf.go
@@ -0,0 +1,226 @@
+package uperf
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "regexp"
+ "strconv"
+ "strings"
+
+ apiv1 "k8s.io/api/core/v1"
+
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/config"
+ log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
+ "github.com/montanaflynn/stats"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/remotecommand"
+)
+
+type Result struct {
+ Data struct {
+ TCPRetransmit struct {
+ Count float64 `json:"retransmits"`
+ } `json:"sum_sent"`
+ TCPStream struct {
+ Rate float32 `json:"bits_per_second"`
+ } `json:"sum_received"`
+ UDPStream struct {
+ Rate float64 `json:"bits_per_second"`
+ LossPercent float64 `json:"lost_percent"`
+ } `json:"sum"`
+ } `json:"end"`
+}
+
+const workload = "uperf"
+
+// ServerDataPort data port for the service
+const ServerDataPort = 30001
+
+// ServerCtlPort control port for the service
+const ServerCtlPort = 30000
+
+// TestSupported Determine if the test is supproted for driver
+func TestSupported(test string) bool {
+ return !strings.Contains(test, "TCP_CRR")
+}
+
+// uperf needs "rr" or "stream" profiles which are config files passed to uperf command through -m option
+// We need to create these profiles based on the test using provided configuration
+func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Config, pod apiv1.Pod, serverIP string) (string, error) {
+ var stdout, stderr bytes.Buffer
+
+ var fileContent string
+ var filePath string
+
+ protocol := "tcp"
+ if strings.Contains(nc.Profile, "UDP") {
+ protocol = "udp"
+ }
+
+ if strings.Contains(nc.Profile, "STREAM") {
+ fileContent = fmt.Sprintf(`
+
+
+
+
+
+
+
+
+
+
+
+ `, protocol, nc.MessageSize, nc.MessageSize, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize)
+ filePath = fmt.Sprintf("/tmp/uperf-stream-%s-%d-%d-1", protocol, nc.MessageSize, nc.MessageSize)
+ } else {
+ fileContent = fmt.Sprintf(`
+
+
+
+
+
+
+
+
+
+
+
+
+
+ `, protocol, nc.MessageSize, nc.MessageSize, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize)
+ filePath = fmt.Sprintf("/tmp/uperf-rr-%s-%d-%d-1", protocol, nc.MessageSize, nc.MessageSize)
+ }
+
+ var cmd []string
+ uperfCmd := "echo '" + fileContent + "' > " + filePath
+ cmd = []string{"bash", "-c", uperfCmd}
+
+ //Empty buffer
+ stdout = bytes.Buffer{}
+
+ req := c.CoreV1().RESTClient().
+ Post().
+ Namespace(pod.Namespace).
+ Resource("pods").
+ Name(pod.Name).
+ SubResource("exec").
+ VersionedParams(&apiv1.PodExecOptions{
+ Container: pod.Spec.Containers[0].Name,
+ Command: cmd,
+ Stdin: false,
+ Stdout: true,
+ Stderr: true,
+ TTY: true,
+ }, scheme.ParameterCodec)
+ exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL())
+ if err != nil {
+ return filePath, err
+ }
+ // Connect this process' std{in,out,err} to the remote shell process.
+ err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
+ Stdin: nil,
+ Stdout: &stdout,
+ Stderr: &stderr,
+ })
+ if err != nil {
+ return filePath, err
+ }
+
+ log.Debug(strings.TrimSpace(stdout.String()))
+ return filePath, nil
+}
+
+// Run will invoke uperf in a client container
+func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
+ var stdout, stderr bytes.Buffer
+ var exec remotecommand.Executor
+
+ pod := client.Items[0]
+ log.Debugf("🔥 Client (%s,%s) starting uperf against server : %s", pod.Name, pod.Status.PodIP, serverIP)
+ config.Show(nc, workload)
+
+ filePath, err := createUperfProfile(c, rc, nc, pod, serverIP)
+ if err != nil {
+ return stdout, err
+ }
+
+ //Empty buffer
+ stdout = bytes.Buffer{}
+ stderr = bytes.Buffer{}
+
+ cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(ServerCtlPort)}
+ log.Debug(cmd)
+
+ req := c.CoreV1().RESTClient().
+ Post().
+ Namespace(pod.Namespace).
+ Resource("pods").
+ Name(pod.Name).
+ SubResource("exec").
+ VersionedParams(&apiv1.PodExecOptions{
+ Container: pod.Spec.Containers[0].Name,
+ Command: cmd,
+ Stdin: false,
+ Stdout: true,
+ Stderr: true,
+ TTY: true,
+ }, scheme.ParameterCodec)
+ exec, err = remotecommand.NewSPDYExecutor(&rc, "POST", req.URL())
+ if err != nil {
+ return stdout, err
+ }
+ // Connect this process' std{in,out,err} to the remote shell process.
+ err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
+ Stdin: nil,
+ Stdout: &stdout,
+ Stderr: &stderr,
+ })
+ if err != nil {
+ return stdout, err
+ }
+
+ return stdout, nil
+}
+
+// ParseResults accepts the stdout from the execution of the benchmark.
+// It will return a Sample struct or error
+func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
+ sample := sample.Sample{}
+ sample.Driver = workload
+ sample.Metric = "Mb/s"
+
+ transactions := regexp.MustCompile(`timestamp_ms:(.*) name:Txn2 nr_bytes:(.*) nr_ops:(.*)\r`).FindAllStringSubmatch(stdout.String(), -1)
+
+ var prevTimestamp, normLtcy float64
+ var prevBytes, prevOps, normOps float64
+ var byteSummary, latSummary, opSummary []float64
+
+ for _, transaction := range transactions {
+
+ timestamp, _ := strconv.ParseFloat(transaction[1], 64)
+ bytes, _ := strconv.ParseFloat(transaction[2], 64)
+ ops, _ := strconv.ParseFloat(transaction[3], 64)
+
+ normOps = ops - prevOps
+ if normOps != 0 && prevTimestamp != 0.0 {
+ normLtcy = ((timestamp - prevTimestamp) / float64(normOps)) * 1000
+ byteSummary = append(byteSummary, bytes-prevBytes)
+ latSummary = append(latSummary, float64(normLtcy))
+ opSummary = append(opSummary, normOps)
+ }
+ prevTimestamp, prevBytes, prevOps = timestamp, bytes, ops
+
+ }
+ averageByte, _ := stats.Mean(byteSummary)
+ averageOps, _ := stats.Mean(opSummary)
+ sample.Throughput = float64(averageByte*8) / 1000000
+ sample.Latency99ptile, _ = stats.Percentile(latSummary, 99)
+ log.Debugf("Storing %s sample throughput: %f Mbps, P99 Latency %f, Average ops: %f ", sample.Driver, sample.Throughput, sample.Latency99ptile, averageOps)
+
+ return sample, nil
+
+}