diff --git a/Makefile b/Makefile
index 426c1e10..081816f6 100644
--- a/Makefile
+++ b/Makefile
@@ -17,6 +17,7 @@ RHEL_VERSION = ubi9
CONTAINER ?= podman
CONTAINER_BUILD ?= podman build --force-rm
CONTAINER_NS ?= quay.io/cloud-bulldozer/netperf
+SOURCES := $(shell find . -type f -name "*.go")
# k8s-netperf version
GIT_COMMIT = $(shell git rev-parse HEAD)
@@ -49,7 +50,7 @@ gha-push:
@echo "Pushing Container Images & manifest"
$(CONTAINER) manifest push $(CONTAINER_NS)-manifest:latest $(CONTAINER_NS)
-clean: $(BIN_PATH)
+clean:
rm -rf bin/$(ARCH)
$(BIN_PATH): $(SOURCES)
diff --git a/README.md b/README.md
index 8ce8c1a6..46c3ab5a 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,7 @@ Running Networking Performance Tests against K8s
| netperf | TCP_CRR | Working | No|
## Setup
+
```shell
$ git clone http://github.com/cloud-bulldozer/k8s-netperf
$ cd k8s-netperf
@@ -37,7 +38,7 @@ $ kubectl create ns netperf
$ kubectl create sa -n netperf netperf
```
-If you run with `-all`, you will need to allow `hostNetwork` for the netperf sa.
+If you run with `--all`, you will need to allow `hostNetwork` for the netperf sa.
Example
```shell
@@ -55,21 +56,25 @@ Usage:
k8s-netperf [flags]
Flags:
- --across Place the client and server across availability zones
- --all Run all tests scenarios - hostNet and podNetwork (if possible)
- --clean Clean-up resources created by k8s-netperf (default true)
--config string K8s netperf Configuration File (default "netperf.yml")
- --debug Enable debug log
- -h, --help help for k8s-netperf
- --iperf Use iperf3 as load driver (along with netperf)
- --uperf Use uperf as load driver (along with netperf)
+ --netperf Use netperf as load driver (default true)
+ --iperf Use iperf3 as load driver
+ --uperf Use uperf as load driver
+ --clean Clean-up resources created by k8s-netperf (default true)
--json Instead of human-readable output, return JSON to stdout
--local Run network performance tests with Server-Pods/Client-Pods on the same Node
- --metrics Show all system metrics retrieved from prom
+ --across Place the client and server across availability zones
+ --all Run all tests scenarios - hostNet and podNetwork (if possible)
+ --debug Enable debug log
--prom string Prometheus URL
+ --uuid string User provided UUID
--search string OpenSearch URL, if you have auth, pass in the format of https://user:pass@url:port
+ --metrics Show all system metrics retrieved from prom
--tcp-tolerance float Allowed %diff from hostNetwork to podNetwork, anything above tolerance will result in k8s-netperf exiting 1. (default 10)
- --uuid string User provided UUID
+ --version k8s-netperf version
+ -h, --help help for k8s-netperf
+
+
```
- `--across` will force the client to be across availability zones from the server
@@ -79,12 +84,14 @@ Flags:
- When using `--prom` with a non-OpenShift cluster, it will be necessary to pass the Prometheus URL.
- `--metrics` will enable displaying prometheus captured metrics to stdout. By default they will be written to a csv file.
- `--iperf` will enable the iperf3 load driver for any stream test (TCP_STREAM, UDP_STREAM). iperf3 doesn't have an RR or CRR test-type.
+- `--uperf` will enable the uperf load driver for any stream test (TCP_STREAM, UDP_STREAM). uperf doesn't have a CRR test-type.
> *Note: With OpenShift, we attempt to discover the OpenShift route. If that route is not reachable, it might be required to `port-forward` the service and pass that via the `--prom` option.*
### Config file
#### Config File v2
The v2 config file will be executed in the order the tests are presented in the config file.
+
```yml
tests :
- TCPStream: # Place-holder of a test name
diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go
index 5c83a3a7..c6038f8f 100644
--- a/cmd/k8s-netperf/k8s-netperf.go
+++ b/cmd/k8s-netperf/k8s-netperf.go
@@ -14,14 +14,12 @@ import (
cmdVersion "github.com/cloud-bulldozer/go-commons/version"
"github.com/cloud-bulldozer/k8s-netperf/pkg/archive"
"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
- "github.com/cloud-bulldozer/k8s-netperf/pkg/iperf"
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/drivers"
"github.com/cloud-bulldozer/k8s-netperf/pkg/k8s"
log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
"github.com/cloud-bulldozer/k8s-netperf/pkg/metrics"
- "github.com/cloud-bulldozer/k8s-netperf/pkg/netperf"
result "github.com/cloud-bulldozer/k8s-netperf/pkg/results"
"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
- uperf_driver "github.com/cloud-bulldozer/k8s-netperf/pkg/uperf"
"github.com/google/uuid"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -37,6 +35,7 @@ var (
cfgfile string
nl bool
clean bool
+ netperf bool
iperf3 bool
uperf bool
acrossAZ bool
@@ -54,7 +53,12 @@ var (
var rootCmd = &cobra.Command{
Use: "k8s-netperf",
Short: "A tool to run network performance tests in Kubernetes cluster",
+ PreRun: func(cmd *cobra.Command, args []string) {
+ log.Infof("Starting k8s-netperf (%s@%s)", cmdVersion.Version, cmdVersion.GitCommit)
+ },
Run: func(cmd *cobra.Command, args []string) {
+ var acrossAZ, hostNetwork bool
+ var sr result.ScenarioResults
if version {
fmt.Println("Version:", cmdVersion.Version)
fmt.Println("Git Commit:", cmdVersion.GitCommit)
@@ -63,7 +67,9 @@ var rootCmd = &cobra.Command{
fmt.Println("OS/Arch:", cmdVersion.OsArch)
os.Exit(0)
}
-
+ if !(uperf || netperf || iperf3) {
+		log.Fatalf("😠 At least one driver needs to be enabled")
+ }
uid := ""
if len(id) > 0 {
uid = id
@@ -84,8 +90,7 @@ var rootCmd = &cobra.Command{
if err != nil {
cf, err := config.ParseV2Conf(cfgfile)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
cfg = cf
}
@@ -95,13 +100,11 @@ var rootCmd = &cobra.Command{
&clientcmd.ConfigOverrides{})
rconfig, err := kconfig.ClientConfig()
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
client, err := kubernetes.NewForConfig(rconfig)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
if clean {
cleanup(client)
@@ -117,8 +120,7 @@ var rootCmd = &cobra.Command{
// Get node count
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="})
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
if !s.NodeLocal && len(nodes.Items) < 2 {
log.Error("Node count too low to run pod to pod across nodes.")
@@ -141,17 +143,14 @@ var rootCmd = &cobra.Command{
// Build the SUT (Deployments)
err = k8s.BuildSUT(client, &s)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
- var sr result.ScenarioResults
sr.Version = cmdVersion.Version
sr.GitCommit = cmdVersion.GitCommit
// If the client and server needs to be across zones
lz, zones, _ := k8s.GetZone(client)
nodesInZone := zones[lz]
- var acrossAZ bool
if nodesInZone > 1 {
acrossAZ = false
} else {
@@ -167,36 +166,24 @@ var rootCmd = &cobra.Command{
}
nc.Metric = metric
nc.AcrossAZ = acrossAZ
-
- if s.HostNetwork {
- // No need to run hostNetwork through Service.
- if !nc.Service {
- npr := executeWorkload(nc, s, true, false, false)
- sr.Results = append(sr.Results, npr)
- if iperf3 {
- ipr := executeWorkload(nc, s, true, true, false)
- if len(ipr.Profile) > 1 {
- sr.Results = append(sr.Results, ipr)
- }
- }
- if uperf {
- upr := executeWorkload(nc, s, true, false, true)
- if len(upr.Profile) > 1 {
- sr.Results = append(sr.Results, upr)
- }
- }
- }
+ // No need to run hostNetwork through Service.
+ if s.HostNetwork && !nc.Service {
+ hostNetwork = true
+ } else {
+ hostNetwork = false
+ }
+ if netperf {
+ npr := executeWorkload(nc, s, hostNetwork, true, false, false)
+ sr.Results = append(sr.Results, npr)
}
- npr := executeWorkload(nc, s, false, false, false)
- sr.Results = append(sr.Results, npr)
if iperf3 {
- ipr := executeWorkload(nc, s, false, true, false)
+ ipr := executeWorkload(nc, s, hostNetwork, false, true, false)
if len(ipr.Profile) > 1 {
sr.Results = append(sr.Results, ipr)
}
}
if uperf {
- upr := executeWorkload(nc, s, false, false, true)
+ upr := executeWorkload(nc, s, hostNetwork, false, false, true)
if len(upr.Profile) > 1 {
sr.Results = append(sr.Results, upr)
}
@@ -223,7 +210,7 @@ var rootCmd = &cobra.Command{
if err == nil {
sr.Metadata.ClusterMetadata = metadata
} else {
- log.Error(" issue getting common metadata using go-commons")
+ log.Error("Issue getting common metadata using go-commons")
}
}
@@ -244,13 +231,11 @@ var rootCmd = &cobra.Command{
if len(searchURL) > 1 {
jdocs, err := archive.BuildDocs(sr, uid)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
esClient, err := archive.Connect(searchURL, index, true)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
log.Infof("Indexing [%d] documents in %s with UUID %s", len(jdocs), index, uid)
resp, err := (*esClient).Index(jdocs, indexers.IndexingOpts{})
@@ -278,20 +263,17 @@ var rootCmd = &cobra.Command{
}
err = archive.WriteCSVResult(sr)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
if pavail {
err = archive.WritePromCSVResult(sr)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
}
err = archive.WriteSpecificCSV(sr)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
// Initially we are just checking against TCP_STREAM results.
retCode := 0
@@ -320,8 +302,7 @@ func cleanup(client *kubernetes.Clientset) {
log.Info("Cleaning resources created by k8s-netperf")
svcList, err := k8s.GetServices(client, namespace)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
for svc := range svcList.Items {
err = k8s.DestroyService(client, svcList.Items[svc])
@@ -331,8 +312,7 @@ func cleanup(client *kubernetes.Clientset) {
}
dpList, err := k8s.GetDeployments(client, namespace)
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
for dp := range dpList.Items {
err = k8s.DestroyDeployment(client, dpList.Items[dp])
@@ -341,20 +321,18 @@ func cleanup(client *kubernetes.Clientset) {
}
_, err := k8s.WaitForDelete(client, dpList.Items[dp])
if err != nil {
- log.Error(err)
- os.Exit(1)
+ log.Fatal(err)
}
}
}
-func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool, uperf bool) result.Data {
+// executeWorkload executes the workload and returns the result data.
+func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, netperf, iperf3, uperf bool) result.Data {
serverIP := ""
- service := false
- sameNode := true
Client := s.Client
+ var driver drivers.Driver
if nc.Service {
- service = true
if iperf3 {
serverIP = s.IperfService.Spec.ClusterIP
} else if uperf {
@@ -371,29 +349,15 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
}
if !s.NodeLocal {
Client = s.ClientAcross
- sameNode = false
}
if hostNet {
Client = s.ClientHost
}
npr := result.Data{}
- if iperf3 {
- // iperf doesn't support all tests cases
- if !iperf.TestSupported(nc.Profile) {
- return npr
- }
- }
- if uperf {
- // uperf doesn't support all tests cases
- if !uperf_driver.TestSupported(nc.Profile) {
- return npr
- }
- }
-
npr.Config = nc
npr.Metric = nc.Metric
- npr.Service = service
- npr.SameNode = sameNode
+ npr.Service = nc.Service
+ npr.SameNode = s.NodeLocal
npr.HostNetwork = hostNet
if s.AcrossAZ {
npr.AcrossAZ = true
@@ -401,67 +365,56 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
npr.AcrossAZ = nc.AcrossAZ
}
npr.StartTime = time.Now()
- log.Debugf("Executing workloads. hostNetwork is %t, service is %t", hostNet, service)
+ log.Debugf("Executing workloads. hostNetwork is %t, service is %t", hostNet, nc.Service)
for i := 0; i < nc.Samples; i++ {
nr := sample.Sample{}
+ if netperf {
+ driver = drivers.NewDriver("netperf")
+ npr.Driver = "netperf"
+ }
if iperf3 {
+ driver = drivers.NewDriver("iperf3")
npr.Driver = "iperf3"
- r, err := iperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
- if err != nil {
- log.Error(err)
- os.Exit(1)
- }
- nr, err = iperf.ParseResults(&r)
- if err != nil {
- log.Error(err)
- os.Exit(1)
- }
- } else if uperf {
+ }
+ if uperf {
+ driver = drivers.NewDriver("uperf")
npr.Driver = "uperf"
- r, err := uperf_driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
- if err != nil {
- log.Error(err)
- os.Exit(1)
- }
- nr, err = uperf_driver.ParseResults(&r)
- if err != nil {
- log.Error(err)
- os.Exit(1)
- }
- } else {
- npr.Driver = "netperf"
- r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
- if err != nil {
- log.Error(err)
- os.Exit(1)
- }
- nr, err = netperf.ParseResults(&r)
- if err != nil {
- log.Error(err)
- try := 0
- success := false
- // Retry the current test.
- for try < retry {
- log.Warn("Rerunning test.")
- r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
- if err != nil {
- log.Error(err)
- continue
- }
- nr, err = netperf.ParseResults(&r)
- if err != nil {
- log.Error(err)
- try++
- } else {
- success = true
- break
- }
+ }
+ // Check if test is supported
+ if !driver.IsTestSupported(nc.Profile) {
+ log.Warnf("Test %s is not supported with driver %s. Skipping.", nc.Profile, npr.Driver)
+ return npr
+ }
+ r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
+ if err != nil {
+ log.Fatal(err)
+ }
+ nr, err = driver.ParseResults(&r)
+ if err != nil {
+ log.Error(err)
+ try := 0
+ success := false
+ // Retry the current test.
+ for try < retry {
+ log.Warn("Rerunning test.")
+ r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
+ if err != nil {
+				log.Error(err)
+				try++
+				continue
}
- if !success {
- log.Error("test was unsuccessful after retry.")
- os.Exit(1)
+ nr, err = driver.ParseResults(&r)
+ if err != nil {
+ log.Error(err)
+ try++
+ } else {
+ success = true
+ break
}
}
+ if !success {
+ log.Fatal("test was unsuccessful after retry.")
+ }
}
npr.LossSummary = append(npr.LossSummary, float64(nr.LossPercent))
npr.RetransmitSummary = append(npr.RetransmitSummary, nr.Retransmits)
@@ -479,8 +431,9 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
func main() {
rootCmd.Flags().StringVar(&cfgfile, "config", "netperf.yml", "K8s netperf Configuration File")
- rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver (along with netperf)")
- rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver (along with netperf)")
+ rootCmd.Flags().BoolVar(&netperf, "netperf", true, "Use netperf as load driver")
+ rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver")
+ rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver")
rootCmd.Flags().BoolVar(&clean, "clean", true, "Clean-up resources created by k8s-netperf")
rootCmd.Flags().BoolVar(&json, "json", false, "Instead of human-readable output, return JSON to stdout")
rootCmd.Flags().BoolVar(&nl, "local", false, "Run network performance tests with Server-Pods/Client-Pods on the same Node")
@@ -493,10 +446,8 @@ func main() {
rootCmd.Flags().BoolVar(&showMetrics, "metrics", false, "Show all system metrics retrieved from prom")
rootCmd.Flags().Float64Var(&tcpt, "tcp-tolerance", 10, "Allowed %diff from hostNetwork to podNetwork, anything above tolerance will result in k8s-netperf exiting 1.")
rootCmd.Flags().BoolVar(&version, "version", false, "k8s-netperf version")
-
+ rootCmd.Flags().SortFlags = false
if err := rootCmd.Execute(); err != nil {
- fmt.Println(err)
- os.Exit(1)
+ log.Fatal(err)
}
-
}
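
Reviewer note: after this change, the per-driver branches in `executeWorkload` collapse into the same four calls on the `drivers.Driver` interface. Below is a minimal sketch of that flow, assuming the `pkg/drivers` package introduced in this PR; `runOnce` is an illustrative helper, not a function in this patch, and would sit alongside the existing `cmd/k8s-netperf` package (which already provides `main`).

```go
package main

import (
	"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
	"github.com/cloud-bulldozer/k8s-netperf/pkg/drivers"
	log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
	"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
)

// runOnce shows the driver-agnostic sequence executeWorkload now follows for a
// single sample: pick a driver, check the profile, run it, parse the output.
// (The real code may target s.ClientAcross or s.ClientHost instead of s.Client.)
func runOnce(driverName string, nc config.Config, s config.PerfScenarios, serverIP string) (sample.Sample, error) {
	driver := drivers.NewDriver(driverName) // "netperf", "iperf3" or "uperf"
	if !driver.IsTestSupported(nc.Profile) {
		log.Warnf("Test %s is not supported with driver %s. Skipping.", nc.Profile, driverName)
		return sample.Sample{}, nil
	}
	stdout, err := driver.Run(s.ClientSet, s.RestConfig, nc, s.Client, serverIP)
	if err != nil {
		return sample.Sample{}, err
	}
	return driver.ParseResults(&stdout)
}
```
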
diff --git a/pkg/drivers/driver.go b/pkg/drivers/driver.go
new file mode 100644
index 00000000..cd265c76
--- /dev/null
+++ b/pkg/drivers/driver.go
@@ -0,0 +1,49 @@
+package drivers
+
+import (
+ "bytes"
+
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/config"
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
+ apiv1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+)
+
+type Driver interface {
+ IsTestSupported(string) bool
+ Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error)
+ ParseResults(stdout *bytes.Buffer) (sample.Sample, error)
+}
+
+type netperf struct {
+ driverName string
+}
+
+type iperf3 struct {
+ driverName string
+}
+
+type uperf struct {
+ driverName string
+}
+
+// NewDriver creates a new driver based on the provided driver name.
+//
+// It takes a string parameter `driverName` and returns a Driver.
+func NewDriver(driverName string) Driver {
+ switch driverName {
+ case "iperf3":
+ return &iperf3{
+ driverName: driverName,
+ }
+ case "uperf":
+ return &uperf{
+ driverName: driverName,
+ }
+ default:
+ return &netperf{
+ driverName: driverName,
+ }
+ }
+}
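
For context on extensibility: a new load generator only has to satisfy the three-method `Driver` interface above and get a case in `NewDriver`. A sketch of such an addition, assuming the types in this file; `sleepDriver` is hypothetical and not part of this PR.

```go
package drivers

import (
	"bytes"

	"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
	"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// sleepDriver is a hypothetical example driver used only to illustrate the
// Driver contract; it does not exist in this PR.
type sleepDriver struct {
	driverName string
}

// IsTestSupported reports whether the given profile can run under this driver.
func (s *sleepDriver) IsTestSupported(test string) bool { return true }

// Run would normally exec the benchmark inside the client pod; here it only
// records the server it would have targeted.
func (s *sleepDriver) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
	var stdout bytes.Buffer
	stdout.WriteString(serverIP)
	return stdout, nil
}

// ParseResults converts the captured stdout into a Sample.
func (s *sleepDriver) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
	return sample.Sample{Driver: s.driverName}, nil
}
```

Wiring it up would then only require an extra `case` in `NewDriver` and a matching CLI flag.
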
diff --git a/pkg/iperf/iperf.go b/pkg/drivers/iperf.go
similarity index 81%
rename from pkg/iperf/iperf.go
rename to pkg/drivers/iperf.go
index 9eb997f6..0ab4b40d 100644
--- a/pkg/iperf/iperf.go
+++ b/pkg/drivers/iperf.go
@@ -1,4 +1,4 @@
-package iperf
+package drivers
import (
"bytes"
@@ -12,6 +12,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s"
log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
"github.com/google/uuid"
@@ -21,7 +22,15 @@ import (
"k8s.io/client-go/tools/remotecommand"
)
-type Result struct {
+var Iperf iperf3
+
+func init() {
+ Iperf = iperf3{
+		driverName: "iperf3",
+ }
+}
+
+type IperfResult struct {
Data struct {
TCPRetransmit struct {
Count float64 `json:"retransmits"`
@@ -36,27 +45,19 @@ type Result struct {
} `json:"end"`
}
-const workload = "iperf3"
-
-// ServerDataPort data port for the service
-const ServerDataPort = 43433
-
-// ServerCtlPort control port for the service
-const ServerCtlPort = 22865
-
-// TestSupported Determine if the test is supproted for driver
-func TestSupported(test string) bool {
+// IsTestSupported determines whether the test is supported by the driver
+func (i *iperf3) IsTestSupported(test string) bool {
return strings.Contains(test, "STREAM")
}
// Run will invoke iperf3 in a client container
-func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
+func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
var stdout, stderr bytes.Buffer
id := uuid.New()
file := fmt.Sprintf("/tmp/iperf-%s", id.String())
pod := client.Items[0]
log.Debugf("🔥 Client (%s,%s) starting iperf3 against server : %s", pod.Name, pod.Status.PodIP, serverIP)
- config.Show(nc, workload)
+ config.Show(nc, i.driverName)
tcp := true
if !strings.Contains(nc.Profile, "STREAM") {
return bytes.Buffer{}, fmt.Errorf("unable to run iperf3 with non-stream tests")
@@ -71,7 +72,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
serverIP, "-J", "-t",
fmt.Sprint(nc.Duration),
"-l", fmt.Sprint(nc.MessageSize),
- "-p", fmt.Sprint(ServerCtlPort),
+ "-p", fmt.Sprint(k8s.IperfServerCtlPort),
fmt.Sprintf("--logfile=%s", file),
}
} else {
@@ -79,7 +80,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
serverIP, "-t",
fmt.Sprint(nc.Duration), "-u", "-J",
"-l", fmt.Sprint(nc.MessageSize),
- "-p", fmt.Sprint(ServerCtlPort),
+ "-p", fmt.Sprint(k8s.IperfServerCtlPort),
"-b", "0",
fmt.Sprintf("--logfile=%s", file),
}
@@ -90,7 +91,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
serverIP, "-t",
fmt.Sprint(nc.Duration),
"-l", fmt.Sprint(nc.MessageSize),
- "-p", fmt.Sprint(ServerCtlPort),
+ "-p", fmt.Sprint(k8s.IperfServerCtlPort),
fmt.Sprintf("--logfile=%s", file),
}
} else {
@@ -98,7 +99,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
serverIP, "-t",
fmt.Sprint(nc.Duration), "-u",
"-l", fmt.Sprint(nc.MessageSize),
- "-p", fmt.Sprint(ServerCtlPort),
+ "-p", fmt.Sprint(k8s.IperfServerCtlPort),
"-b", "0",
fmt.Sprintf("--logfile=%s", file),
}
@@ -171,15 +172,14 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
// ParseResults accepts the stdout from the execution of the benchmark.
// It will return a Sample struct or error
-func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
+func (i *iperf3) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
sample := sample.Sample{}
- sample.Driver = workload
- result := Result{}
+ sample.Driver = i.driverName
+ result := IperfResult{}
sample.Metric = "Mb/s"
- error := json.NewDecoder(stdout).Decode(&result)
- if error != nil {
- log.Error("Issue while decoding")
- log.Error(error)
+ err := json.NewDecoder(stdout).Decode(&result)
+ if err != nil {
+ log.Errorf("Issue while decoding: %v", err)
}
if result.Data.TCPStream.Rate > 0 {
sample.Throughput = float64(result.Data.TCPStream.Rate) / 1000000
@@ -190,7 +190,7 @@ func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
sample.LossPercent = result.Data.UDPStream.LossPercent
}
- log.Debugf("Storing %s sample throughput: %f", sample.Driver, sample.Throughput)
+ log.Debugf("Storing %s sample throughput: %f", sample.Driver, sample.Throughput)
return sample, nil
}
diff --git a/pkg/netperf/netperf.go b/pkg/drivers/netperf.go
similarity index 86%
rename from pkg/netperf/netperf.go
rename to pkg/drivers/netperf.go
index abae8aea..f0ad8144 100644
--- a/pkg/netperf/netperf.go
+++ b/pkg/drivers/netperf.go
@@ -1,4 +1,4 @@
-package netperf
+package drivers
import (
"bytes"
@@ -11,6 +11,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s"
log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
"k8s.io/client-go/kubernetes"
@@ -19,21 +20,24 @@ import (
"k8s.io/client-go/tools/remotecommand"
)
-const workload = "netperf"
+var Netperf netperf
-// ServerDataPort data port for the service
-const ServerDataPort = 42424
+func init() {
+ Netperf = netperf{
+ driverName: "netperf",
+ }
+}
// omniOptions are netperf specific options that we will pass to the netperf client.
const omniOptions = "rt_latency,p99_latency,throughput,throughput_units,remote_recv_calls,local_send_calls,local_transport_retrans"
// Run will use the k8s client to run the netperf binary in the container image
// it will return a bytes.Buffer of the stdout.
-func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
+func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
var stdout, stderr bytes.Buffer
pod := client.Items[0]
log.Debugf("🔥 Client (%s,%s) starting netperf against server : %s", pod.Name, pod.Status.PodIP, serverIP)
- config.Show(nc, workload)
+ config.Show(nc, n.driverName)
var cmd []string
if nc.Service {
cmd = []string{"bash", "super-netperf", "1", "-H",
@@ -43,7 +47,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
"--",
"-k", fmt.Sprint(omniOptions),
"-m", fmt.Sprint(nc.MessageSize),
- "-P", fmt.Sprint(ServerDataPort),
+ "-P", fmt.Sprint(k8s.NetperfServerDataPort),
"-R", "1"}
} else {
cmd = []string{"bash", "super-netperf", strconv.Itoa(nc.Parallelism), "-H",
@@ -89,9 +93,9 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
// ParseResults accepts the stdout from the execution of the benchmark. It also needs
// It will return a Sample struct or error
-func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
+func (n *netperf) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
sample := sample.Sample{}
- sample.Driver = workload
+ sample.Driver = n.driverName
send := 0.0
recv := 0.0
if len(strings.Split(stdout.String(), "\n")) < 5 {
@@ -138,3 +142,8 @@ func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
}
return sample, nil
}
+
+// IsTestSupported Determine if the test is supported for driver
+func (n *netperf) IsTestSupported(test string) bool {
+ return true
+}
diff --git a/pkg/uperf/uperf.go b/pkg/drivers/uperf.go
similarity index 88%
rename from pkg/uperf/uperf.go
rename to pkg/drivers/uperf.go
index 762bf579..3bd9321d 100644
--- a/pkg/uperf/uperf.go
+++ b/pkg/drivers/uperf.go
@@ -1,4 +1,4 @@
-package uperf
+package drivers
import (
"bytes"
@@ -11,6 +11,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
+ "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s"
log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
"github.com/montanaflynn/stats"
@@ -35,16 +36,16 @@ type Result struct {
} `json:"end"`
}
-const workload = "uperf"
+var Uperf uperf
-// ServerDataPort data port for the service
-const ServerDataPort = 30001
-
-// ServerCtlPort control port for the service
-const ServerCtlPort = 30000
+func init() {
+ Uperf = uperf{
+ driverName: "uperf",
+ }
+}
-// TestSupported Determine if the test is supproted for driver
-func TestSupported(test string) bool {
+// IsTestSupported determines whether the test is supported by the driver
+func (u *uperf) IsTestSupported(test string) bool {
return !strings.Contains(test, "TCP_CRR")
}
@@ -74,7 +75,7 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi
- `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize)
+ `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, k8s.UperfServerCtlPort+1, nc.Duration, nc.MessageSize)
filePath = fmt.Sprintf("/tmp/uperf-stream-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism)
} else {
fileContent = fmt.Sprintf(`
@@ -91,7 +92,7 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi
- `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize)
+ `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, k8s.UperfServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize)
filePath = fmt.Sprintf("/tmp/uperf-rr-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism)
}
@@ -135,13 +136,14 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi
}
// Run will invoke uperf in a client container
-func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
+
+func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) {
var stdout, stderr bytes.Buffer
var exec remotecommand.Executor
pod := client.Items[0]
log.Debugf("🔥 Client (%s,%s) starting uperf against server : %s", pod.Name, pod.Status.PodIP, serverIP)
- config.Show(nc, workload)
+ config.Show(nc, u.driverName)
filePath, err := createUperfProfile(c, rc, nc, pod, serverIP)
if err != nil {
@@ -152,7 +154,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
stdout = bytes.Buffer{}
stderr = bytes.Buffer{}
- cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(ServerCtlPort)}
+ cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(k8s.UperfServerCtlPort)}
log.Debug(cmd)
req := c.CoreV1().RESTClient().
@@ -188,9 +190,9 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1
// ParseResults accepts the stdout from the execution of the benchmark.
// It will return a Sample struct or error
-func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
+func (u *uperf) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) {
sample := sample.Sample{}
- sample.Driver = workload
+ sample.Driver = u.driverName
sample.Metric = "Mb/s"
transactions := regexp.MustCompile(`timestamp_ms:(.*) name:Txn2 nr_bytes:(.*) nr_ops:(.*)\r`).FindAllStringSubmatch(stdout.String(), -1)
diff --git a/pkg/logging/log.go b/pkg/logging/log.go
index b46f3f32..c3964847 100644
--- a/pkg/logging/log.go
+++ b/pkg/logging/log.go
@@ -79,3 +79,13 @@ func Warn(args ...interface{}) {
func Warnf(format string, args ...interface{}) {
defaultLog.Warnf(format, args...)
}
+
+// Fatal - Fatal Message
+func Fatal(args ...interface{}) {
+ defaultLog.Fatal(args...)
+}
+
+// Fatalf - Fatal Message
+func Fatalf(format string, args ...interface{}) {
+ defaultLog.Fatalf(format, args...)
+}
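
One behavioral note on these new wrappers: the many `log.Error(err); os.Exit(1)` call sites replaced above are only equivalent to `log.Fatal(err)` if the wrapped logger's `Fatal`/`Fatalf` (logrus-style) writes the message and then exits with status 1. A minimal sketch of the new call-site pattern, under that assumption:

```go
package main

import (
	"errors"

	log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
)

func main() {
	err := errors.New("example failure")
	if err != nil {
		// Replaces the old two-step pattern of log.Error(err) followed by
		// os.Exit(1); the wrapper logs the error and terminates the process.
		log.Fatal(err)
	}
}
```
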