From 07c2f07b280fe6c89d22f578d85c60371cdb5097 Mon Sep 17 00:00:00 2001 From: venkataanil Date: Fri, 17 Nov 2023 14:17:48 +0530 Subject: [PATCH] Add support for uperf driver Similar to iperf, user can run uperf along with netperf using "--uperf" option benchmark-wrapper is used a reference for parsing 1) user options and creating uperf config file (input to uperf client command) 2) uperf output uperf driver supports only TCP_STREAM, UPD_STREAM, TCP_RR, and UDP_RR tests. For each test in full-run.yaml, Uperf driver will create a uperf profile file inside the client pod and uses it to run the test. Parallelism is implemented using uperf's nproc option uperf server can't be run using "&&" option in the same container inside server pod i.e "netserver && iperf3 -s -p 22865 && uperf -s -v -P 30000 && sleep 10000000" So we are creating a separate containers for netperf, iperf and uperf inside server pod. Add driver details in RR P99 Latency results --- README.md | 7 + cmd/k8s-netperf/k8s-netperf.go | 45 ++++++- containers/Containerfile | 3 + pkg/config/config.go | 1 + pkg/k8s/kubernetes.go | 62 +++++++-- pkg/results/result.go | 4 +- pkg/uperf/uperf.go | 226 +++++++++++++++++++++++++++++++++ 7 files changed, 327 insertions(+), 21 deletions(-) create mode 100644 pkg/uperf/uperf.go diff --git a/README.md b/README.md index 1ff3a027..8ce8c1a6 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ Flags: --debug Enable debug log -h, --help help for k8s-netperf --iperf Use iperf3 as load driver (along with netperf) + --uperf Use uperf as load driver (along with netperf) --json Instead of human-readable output, return JSON to stdout --local Run network performance tests with Server-Pods/Client-Pods on the same Node --metrics Show all system metrics retrieved from prom @@ -124,16 +125,22 @@ $ ./k8s-netperf --tcp-tolerance 1 +-------------------+---------+------------+-------------+--------------+---------+--------------+-----------+----------+---------+--------------------+ | 📊 Stream Results | netperf | TCP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 2661.006667 (Mb/s) | | 📊 Stream Results | iperf3 | TCP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 2483.078229 (Mb/s) | +| 📊 Stream Results | uperf | TCP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 2581.705097 (Mb/s) | | 📊 Stream Results | netperf | TCP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 2702.230000 (Mb/s) | | 📊 Stream Results | iperf3 | TCP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 2523.434069 (Mb/s) | +| 📊 Stream Results | uperf | TCP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 2567.665412 (Mb/s) | | 📊 Stream Results | netperf | TCP_STREAM | 1 | true | false | 8192 | false | 10 | 3 | 2697.276667 (Mb/s) | | 📊 Stream Results | iperf3 | TCP_STREAM | 1 | true | false | 8192 | false | 10 | 3 | 2542.793728 (Mb/s) | +| 📊 Stream Results | uperf | TCP_STREAM | 1 | true | false | 8192 | false | 10 | 3 | 2571.881579 (Mb/s) | | 📊 Stream Results | netperf | TCP_STREAM | 1 | false | false | 8192 | false | 10 | 3 | 2707.076667 (Mb/s) | | 📊 Stream Results | iperf3 | TCP_STREAM | 1 | false | false | 8192 | false | 10 | 3 | 2604.067072 (Mb/s) | +| 📊 Stream Results | uperf | TCP_STREAM | 1 | false | false | 8192 | false | 10 | 3 | 2687.276667 (Mb/s) | | 📊 Stream Results | netperf | UDP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 1143.926667 (Mb/s) | | 📊 Stream Results | iperf3 | UDP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 1202.428288 (Mb/s) | +| 📊 Stream Results | uperf | UDP_STREAM | 1 | true | false | 1024 | false | 10 | 3 | 1242.059988 (Mb/s) | | 📊 Stream Results | netperf | UDP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 1145.066667 (Mb/s) | | 📊 Stream Results | iperf3 | UDP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 1239.580672 (Mb/s) | +| 📊 Stream Results | uperf | UDP_STREAM | 1 | false | false | 1024 | false | 10 | 3 | 1261.840000 (Mb/s) | +-------------------+---------+------------+-------------+--------------+---------+--------------+-----------+----------+---------+--------------------+ +---------------+---------+----------+-------------+--------------+---------+--------------+-----------+----------+---------+---------------------+ | RESULT TYPE | DRIVER | SCENARIO | PARALLELISM | HOST NETWORK | SERVICE | MESSAGE SIZE | SAME NODE | DURATION | SAMPLES | AVG VALUE | diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index 3a0324e2..0768f046 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -20,6 +20,7 @@ import ( "github.com/cloud-bulldozer/k8s-netperf/pkg/netperf" result "github.com/cloud-bulldozer/k8s-netperf/pkg/results" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" + uperf_driver "github.com/cloud-bulldozer/k8s-netperf/pkg/uperf" "github.com/google/uuid" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,6 +37,7 @@ var ( nl bool clean bool iperf3 bool + uperf bool acrossAZ bool full bool debug bool @@ -158,24 +160,36 @@ var rootCmd = &cobra.Command{ if s.HostNetwork { // No need to run hostNetwork through Service. if !nc.Service { - npr := executeWorkload(nc, s, true, false) + npr := executeWorkload(nc, s, true, false, false) sr.Results = append(sr.Results, npr) if iperf3 { - ipr := executeWorkload(nc, s, true, true) + ipr := executeWorkload(nc, s, true, true, false) if len(ipr.Profile) > 1 { sr.Results = append(sr.Results, ipr) } } + if uperf { + upr := executeWorkload(nc, s, true, false, true) + if len(upr.Profile) > 1 { + sr.Results = append(sr.Results, upr) + } + } } } - npr := executeWorkload(nc, s, false, false) + npr := executeWorkload(nc, s, false, false, false) sr.Results = append(sr.Results, npr) if iperf3 { - ipr := executeWorkload(nc, s, false, true) + ipr := executeWorkload(nc, s, false, true, false) if len(ipr.Profile) > 1 { sr.Results = append(sr.Results, ipr) } } + if uperf { + upr := executeWorkload(nc, s, false, false, true) + if len(upr.Profile) > 1 { + sr.Results = append(sr.Results, upr) + } + } } var fTime time.Time @@ -323,7 +337,7 @@ func cleanup(client *kubernetes.Clientset) { } -func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool) result.Data { +func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool, uperf bool) result.Data { serverIP := "" service := false sameNode := true @@ -332,6 +346,8 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe service = true if iperf3 { serverIP = s.IperfService.Spec.ClusterIP + } else if uperf { + serverIP = s.UperfService.Spec.ClusterIP } else { serverIP = s.NetperfService.Spec.ClusterIP } @@ -356,6 +372,12 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe return npr } } + if uperf { + // uperf doesn't support all tests cases + if !uperf_driver.TestSupported(nc.Profile) { + return npr + } + } npr.Config = nc npr.Metric = nc.Metric @@ -383,6 +405,18 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe log.Error(err) os.Exit(1) } + } else if uperf { + npr.Driver = "uperf" + r, err := uperf_driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) + if err != nil { + log.Error(err) + os.Exit(1) + } + nr, err = uperf_driver.ParseResults(&r) + if err != nil { + log.Error(err) + os.Exit(1) + } } else { npr.Driver = "netperf" r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) @@ -435,6 +469,7 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe func main() { rootCmd.Flags().StringVar(&cfgfile, "config", "netperf.yml", "K8s netperf Configuration File") rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver (along with netperf)") + rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver (along with netperf)") rootCmd.Flags().BoolVar(&clean, "clean", true, "Clean-up resources created by k8s-netperf") rootCmd.Flags().BoolVar(&json, "json", false, "Instead of human-readable output, return JSON to stdout") rootCmd.Flags().BoolVar(&nl, "local", false, "Run network performance tests with Server-Pods/Client-Pods on the same Node") diff --git a/containers/Containerfile b/containers/Containerfile index 7d3ba93d..26cee777 100644 --- a/containers/Containerfile +++ b/containers/Containerfile @@ -3,7 +3,10 @@ ARG RHEL_VERSION FROM registry.access.redhat.com/${RHEL_VERSION}:latest COPY appstream.repo /etc/yum.repos.d/centos8-appstream.repo + COPY netperf.diff /tmp/netperf.diff +RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm && dnf clean all +RUN dnf install -y uperf && dnf clean all RUN dnf install -y --nodocs make automake --enablerepo=centos9 --allowerasing && \ dnf install -y --nodocs gcc git bc lksctp-tools-devel texinfo --enablerepo=* diff --git a/pkg/config/config.go b/pkg/config/config.go index 6f43abcd..3486f0ce 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -41,6 +41,7 @@ type PerfScenarios struct { ServerHost apiv1.PodList NetperfService *apiv1.Service IperfService *apiv1.Service + UperfService *apiv1.Service RestConfig rest.Config ClientSet *kubernetes.Clientset } diff --git a/pkg/k8s/kubernetes.go b/pkg/k8s/kubernetes.go index 0e778233..85ea37b8 100644 --- a/pkg/k8s/kubernetes.go +++ b/pkg/k8s/kubernetes.go @@ -16,6 +16,7 @@ import ( ) // DeploymentParams describes the deployment +// Server pod can run multiple containers, each command in Commands will represent a container command type DeploymentParams struct { HostNetwork bool Name string @@ -23,7 +24,7 @@ type DeploymentParams struct { Replicas int32 Image string Labels map[string]string - Command []string + Commands [][]string PodAffinity apiv1.PodAffinity PodAntiAffinity apiv1.PodAntiAffinity NodeAffinity apiv1.NodeAffinity @@ -47,12 +48,18 @@ const NetperfServerCtlPort = 12865 // IperfServerCtlPort control port for the service const IperfServerCtlPort = 22865 +// UperferverCtlPort control port for the service +const UperfServerCtlPort = 30000 + // NetperfServerDataPort data port for the service const NetperfServerDataPort = 42424 // IperfServerDataPort data port for the service const IperfServerDataPort = 43433 +// UperfServerDataPort data port for the service +const UperfServerDataPort = 30001 + // Labels we will apply to k8s assets. const serverRole = "server" const clientRole = "client-local" @@ -136,7 +143,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Replicas: 1, Image: "quay.io/cloud-bulldozer/netperf:latest", Labels: map[string]string{"role": clientRole}, - Command: []string{"/bin/bash", "-c", "sleep 10000000"}, + Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}}, Port: NetperfServerCtlPort, } if z != "" && numNodes > 1 { @@ -180,6 +187,19 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { return fmt.Errorf("😥 Unable to create iperf service") } + // Create uperf service + uperfSVC := ServiceParams{ + Name: "uperf-service", + Namespace: "netperf", + Labels: map[string]string{"role": serverRole}, + CtlPort: UperfServerCtlPort, + DataPort: UperfServerDataPort, + } + s.UperfService, err = CreateService(uperfSVC, client) + if err != nil { + return fmt.Errorf("😥 Unable to create uperf service") + } + // Create netperf service netperfSVC := ServiceParams{ Name: "netperf-service", @@ -198,7 +218,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Replicas: 1, Image: "quay.io/cloud-bulldozer/netperf:latest", Labels: map[string]string{"role": clientAcrossRole}, - Command: []string{"/bin/bash", "-c", "sleep 10000000"}, + Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}}, Port: NetperfServerCtlPort, } cdpAcross.PodAntiAffinity = apiv1.PodAntiAffinity{ @@ -212,7 +232,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { HostNetwork: true, Image: "quay.io/cloud-bulldozer/netperf:latest", Labels: map[string]string{"role": hostNetClientRole}, - Command: []string{"/bin/bash", "-c", "sleep 10000000"}, + Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}}, Port: NetperfServerCtlPort, } if z != "" { @@ -247,6 +267,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { return err } } + + // Use separate containers for servers + dpCommands := [][]string{{"/bin/bash", "-c", "netserver && sleep 10000000"}, + {"/bin/bash", "-c", fmt.Sprintf("iperf3 -s -p %d && sleep 10000000", IperfServerCtlPort)}, + {"/bin/bash", "-c", fmt.Sprintf("uperf -s -v -P %d && sleep 10000000", UperfServerCtlPort)}} + sdpHost := DeploymentParams{ Name: "server-host", Namespace: "netperf", @@ -254,7 +280,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { HostNetwork: true, Image: "quay.io/cloud-bulldozer/netperf:latest", Labels: map[string]string{"role": hostNetServerRole}, - Command: []string{"/bin/bash", "-c", fmt.Sprintf("netserver && iperf3 -s -p %d && sleep 10000000", IperfServerCtlPort)}, + Commands: dpCommands, Port: NetperfServerCtlPort, } // Start netperf server @@ -264,7 +290,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Replicas: 1, Image: "quay.io/cloud-bulldozer/netperf:latest", Labels: map[string]string{"role": serverRole}, - Command: []string{"/bin/bash", "-c", fmt.Sprintf("netserver && iperf3 -s -p %d && sleep 10000000", IperfServerCtlPort)}, + Commands: dpCommands, Port: NetperfServerCtlPort, } if s.NodeLocal { @@ -451,6 +477,21 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv } log.Infof("🚀 Starting Deployment for: %s in namespace: %s", dp.Name, dp.Namespace) dc := client.AppsV1().Deployments(dp.Namespace) + + // Add containers to deployment + var cmdContainers []apiv1.Container + for i := 0; i < len(dp.Commands); i++ { + // each container should have a unique name + containerName := fmt.Sprintf("%s-%d", dp.Name, i) + cmdContainers = append(cmdContainers, + apiv1.Container{ + Name: containerName, + Image: dp.Image, + Command: dp.Commands[i], + ImagePullPolicy: apiv1.PullAlways, + }) + } + deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: dp.Name, @@ -467,14 +508,7 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv Spec: apiv1.PodSpec{ ServiceAccountName: sa, HostNetwork: dp.HostNetwork, - Containers: []apiv1.Container{ - { - Name: dp.Name, - Image: dp.Image, - Command: dp.Command, - ImagePullPolicy: apiv1.PullAlways, - }, - }, + Containers: cmdContainers, Affinity: &apiv1.Affinity{ NodeAffinity: &dp.NodeAffinity, PodAffinity: &dp.PodAffinity, diff --git a/pkg/results/result.go b/pkg/results/result.go index 772ba080..69fe64c0 100644 --- a/pkg/results/result.go +++ b/pkg/results/result.go @@ -281,11 +281,11 @@ func ShowRRResult(s ScenarioResults) { func ShowLatencyResult(s ScenarioResults) { if checkResults(s, "RR") { logging.Debug("Rendering RR P99 Latency results") - table := initTable([]string{"Result Type", "Scenario", "Parallelism", "Host Network", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg 99%tile value"}) + table := initTable([]string{"Result Type", "Driver", "Scenario", "Parallelism", "Host Network", "Service", "Message Size", "Same node", "Duration", "Samples", "Avg 99%tile value"}) for _, r := range s.Results { if strings.Contains(r.Profile, "RR") { p99, _ := Average(r.LatencySummary) - table.Append([]string{"RR Latency Results", r.Profile, strconv.Itoa(r.Parallelism), strconv.FormatBool(r.HostNetwork), strconv.FormatBool(r.Service), strconv.Itoa(r.MessageSize), strconv.FormatBool(r.SameNode), strconv.Itoa(r.Duration), strconv.Itoa(r.Samples), fmt.Sprintf("%f (%s)", p99, "usec")}) + table.Append([]string{"RR Latency Results", r.Driver, r.Profile, strconv.Itoa(r.Parallelism), strconv.FormatBool(r.HostNetwork), strconv.FormatBool(r.Service), strconv.Itoa(r.MessageSize), strconv.FormatBool(r.SameNode), strconv.Itoa(r.Duration), strconv.Itoa(r.Samples), fmt.Sprintf("%f (%s)", p99, "usec")}) } } table.Render() diff --git a/pkg/uperf/uperf.go b/pkg/uperf/uperf.go new file mode 100644 index 00000000..762bf579 --- /dev/null +++ b/pkg/uperf/uperf.go @@ -0,0 +1,226 @@ +package uperf + +import ( + "bytes" + "context" + "fmt" + "regexp" + "strconv" + "strings" + + apiv1 "k8s.io/api/core/v1" + + "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" + "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" + "github.com/montanaflynn/stats" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" +) + +type Result struct { + Data struct { + TCPRetransmit struct { + Count float64 `json:"retransmits"` + } `json:"sum_sent"` + TCPStream struct { + Rate float32 `json:"bits_per_second"` + } `json:"sum_received"` + UDPStream struct { + Rate float64 `json:"bits_per_second"` + LossPercent float64 `json:"lost_percent"` + } `json:"sum"` + } `json:"end"` +} + +const workload = "uperf" + +// ServerDataPort data port for the service +const ServerDataPort = 30001 + +// ServerCtlPort control port for the service +const ServerCtlPort = 30000 + +// TestSupported Determine if the test is supproted for driver +func TestSupported(test string) bool { + return !strings.Contains(test, "TCP_CRR") +} + +// uperf needs "rr" or "stream" profiles which are config files passed to uperf command through -m option +// We need to create these profiles based on the test using provided configuration +func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Config, pod apiv1.Pod, serverIP string) (string, error) { + var stdout, stderr bytes.Buffer + + var fileContent string + var filePath string + + protocol := "tcp" + if strings.Contains(nc.Profile, "UDP") { + protocol = "udp" + } + + if strings.Contains(nc.Profile, "STREAM") { + fileContent = fmt.Sprintf(` + + + + + + + + + + + + `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize) + filePath = fmt.Sprintf("/tmp/uperf-stream-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism) + } else { + fileContent = fmt.Sprintf(` + + + + + + + + + + + + + + `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize) + filePath = fmt.Sprintf("/tmp/uperf-rr-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism) + } + + var cmd []string + uperfCmd := "echo '" + fileContent + "' > " + filePath + cmd = []string{"bash", "-c", uperfCmd} + + //Empty buffer + stdout = bytes.Buffer{} + + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return filePath, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return filePath, err + } + + log.Debug(strings.TrimSpace(stdout.String())) + return filePath, nil +} + +// Run will invoke uperf in a client container +func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { + var stdout, stderr bytes.Buffer + var exec remotecommand.Executor + + pod := client.Items[0] + log.Debugf("🔥 Client (%s,%s) starting uperf against server : %s", pod.Name, pod.Status.PodIP, serverIP) + config.Show(nc, workload) + + filePath, err := createUperfProfile(c, rc, nc, pod, serverIP) + if err != nil { + return stdout, err + } + + //Empty buffer + stdout = bytes.Buffer{} + stderr = bytes.Buffer{} + + cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(ServerCtlPort)} + log.Debug(cmd) + + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err = remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return stdout, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return stdout, err + } + + return stdout, nil +} + +// ParseResults accepts the stdout from the execution of the benchmark. +// It will return a Sample struct or error +func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { + sample := sample.Sample{} + sample.Driver = workload + sample.Metric = "Mb/s" + + transactions := regexp.MustCompile(`timestamp_ms:(.*) name:Txn2 nr_bytes:(.*) nr_ops:(.*)\r`).FindAllStringSubmatch(stdout.String(), -1) + + var prevTimestamp, normLtcy float64 + var prevBytes, prevOps, normOps float64 + var byteSummary, latSummary, opSummary []float64 + + for _, transaction := range transactions { + + timestamp, _ := strconv.ParseFloat(transaction[1], 64) + bytes, _ := strconv.ParseFloat(transaction[2], 64) + ops, _ := strconv.ParseFloat(transaction[3], 64) + + normOps = ops - prevOps + if normOps != 0 && prevTimestamp != 0.0 { + normLtcy = ((timestamp - prevTimestamp) / float64(normOps)) * 1000 + byteSummary = append(byteSummary, bytes-prevBytes) + latSummary = append(latSummary, float64(normLtcy)) + opSummary = append(opSummary, normOps) + } + prevTimestamp, prevBytes, prevOps = timestamp, bytes, ops + + } + averageByte, _ := stats.Mean(byteSummary) + averageOps, _ := stats.Mean(opSummary) + sample.Throughput = float64(averageByte*8) / 1000000 + sample.Latency99ptile, _ = stats.Percentile(latSummary, 99) + log.Debugf("Storing uperf sample throughput: %f Mbps, P99 Latency %f, Average ops: %f ", sample.Throughput, sample.Latency99ptile, averageOps) + + return sample, nil + +}