From c99337eabc7621573cab41ddc74d22a90a56e559 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Sevilla?= Date: Thu, 22 Feb 2024 12:51:59 +0100 Subject: [PATCH] Refactor drivers code (#131) * Refactor drivers code Signed-off-by: Raul Sevilla * Update docs Signed-off-by: Raul Sevilla * Fix hostnet arguments Signed-off-by: Raul Sevilla * Fixing netperf flags Signed-off-by: Raul Sevilla * Fix hostNetwork code Signed-off-by: Raul Sevilla * Remove binary from clean target Signed-off-by: Raul Sevilla * Require one driver at least Signed-off-by: Raul Sevilla * Inform about k8s-netperf version Signed-off-by: Raul Sevilla --------- Signed-off-by: Raul Sevilla --- Makefile | 3 +- README.md | 27 ++-- cmd/k8s-netperf/k8s-netperf.go | 221 +++++++++++----------------- pkg/drivers/driver.go | 49 ++++++ pkg/{iperf => drivers}/iperf.go | 52 +++---- pkg/{netperf => drivers}/netperf.go | 27 ++-- pkg/{uperf => drivers}/uperf.go | 34 +++-- pkg/logging/log.go | 10 ++ 8 files changed, 226 insertions(+), 197 deletions(-) create mode 100644 pkg/drivers/driver.go rename pkg/{iperf => drivers}/iperf.go (81%) rename pkg/{netperf => drivers}/netperf.go (86%) rename pkg/{uperf => drivers}/uperf.go (88%) diff --git a/Makefile b/Makefile index 426c1e10..081816f6 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,7 @@ RHEL_VERSION = ubi9 CONTAINER ?= podman CONTAINER_BUILD ?= podman build --force-rm CONTAINER_NS ?= quay.io/cloud-bulldozer/netperf +SOURCES := $(shell find . -type f -name "*.go") # k8s-netperf version GIT_COMMIT = $(shell git rev-parse HEAD) @@ -49,7 +50,7 @@ gha-push: @echo "Pushing Container Images & manifest" $(CONTAINER) manifest push $(CONTAINER_NS)-manifest:latest $(CONTAINER_NS) -clean: $(BIN_PATH) +clean: rm -rf bin/$(ARCH) $(BIN_PATH): $(SOURCES) diff --git a/README.md b/README.md index 8ce8c1a6..46c3ab5a 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Running Networking Performance Tests against K8s | netperf | TCP_CRR | Working | No| ## Setup + ```shell $ git clone http://github.com/cloud-bulldozer/k8s-netperf $ cd k8s-netperf @@ -37,7 +38,7 @@ $ kubectl create ns netperf $ kubectl create sa -n netperf netperf ``` -If you run with `-all`, you will need to allow `hostNetwork` for the netperf sa. +If you run with `--all`, you will need to allow `hostNetwork` for the netperf sa. Example ```shell @@ -55,21 +56,25 @@ Usage: k8s-netperf [flags] Flags: - --across Place the client and server across availability zones - --all Run all tests scenarios - hostNet and podNetwork (if possible) - --clean Clean-up resources created by k8s-netperf (default true) --config string K8s netperf Configuration File (default "netperf.yml") - --debug Enable debug log - -h, --help help for k8s-netperf - --iperf Use iperf3 as load driver (along with netperf) - --uperf Use uperf as load driver (along with netperf) + --netperf Use netperf as load driver (default true) + --iperf Use iperf3 as load driver + --uperf Use uperf as load driver + --clean Clean-up resources created by k8s-netperf (default true) --json Instead of human-readable output, return JSON to stdout --local Run network performance tests with Server-Pods/Client-Pods on the same Node - --metrics Show all system metrics retrieved from prom + --across Place the client and server across availability zones + --all Run all tests scenarios - hostNet and podNetwork (if possible) + --debug Enable debug log --prom string Prometheus URL + --uuid string User provided UUID --search string OpenSearch URL, if you have auth, pass in the format of https://user:pass@url:port + --metrics Show all system metrics retrieved from prom --tcp-tolerance float Allowed %diff from hostNetwork to podNetwork, anything above tolerance will result in k8s-netperf exiting 1. (default 10) - --uuid string User provided UUID + --version k8s-netperf version + -h, --help help for k8s-netperf + + ``` - `--across` will force the client to be across availability zones from the server @@ -79,12 +84,14 @@ Flags: - When using `--prom` with a non-openshift clsuter, it will be necessary to pass the prometheus URL. - `--metrics` will enable displaying prometheus captured metrics to stdout. By default they will be written to a csv file. - `--iperf` will enable the iperf3 load driver for any stream test (TCP_STREAM, UDP_STREAM). iperf3 doesn't have a RR or CRR test-type. +- `--uperf` will enable the uperf load driver for any stream test (TCP_STREAM, UDP_STREAM). uperf doesn't have CRR test-type. > *Note: With OpenShift, we attempt to discover the OpenShift route. If that route is not reachable, it might be required to `port-forward` the service and pass that via the `--prom` option.* ### Config file #### Config File v2 The v2 config file will be executed in the order the tests are presented in the config file. + ```yml tests : - TCPStream: # Place-holder of a test name diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index 5c83a3a7..c6038f8f 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -14,14 +14,12 @@ import ( cmdVersion "github.com/cloud-bulldozer/go-commons/version" "github.com/cloud-bulldozer/k8s-netperf/pkg/archive" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" - "github.com/cloud-bulldozer/k8s-netperf/pkg/iperf" + "github.com/cloud-bulldozer/k8s-netperf/pkg/drivers" "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/metrics" - "github.com/cloud-bulldozer/k8s-netperf/pkg/netperf" result "github.com/cloud-bulldozer/k8s-netperf/pkg/results" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" - uperf_driver "github.com/cloud-bulldozer/k8s-netperf/pkg/uperf" "github.com/google/uuid" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +35,7 @@ var ( cfgfile string nl bool clean bool + netperf bool iperf3 bool uperf bool acrossAZ bool @@ -54,7 +53,12 @@ var ( var rootCmd = &cobra.Command{ Use: "k8s-netperf", Short: "A tool to run network performance tests in Kubernetes cluster", + PreRun: func(cmd *cobra.Command, args []string) { + log.Infof("Starting k8s-netperf (%s@%s)", cmdVersion.Version, cmdVersion.GitCommit) + }, Run: func(cmd *cobra.Command, args []string) { + var acrossAZ, hostNetwork bool + var sr result.ScenarioResults if version { fmt.Println("Version:", cmdVersion.Version) fmt.Println("Git Commit:", cmdVersion.GitCommit) @@ -63,7 +67,9 @@ var rootCmd = &cobra.Command{ fmt.Println("OS/Arch:", cmdVersion.OsArch) os.Exit(0) } - + if !(uperf || netperf || iperf3) { + log.Fatalf("😭 At least one driver needs to be enabled") + } uid := "" if len(id) > 0 { uid = id @@ -84,8 +90,7 @@ var rootCmd = &cobra.Command{ if err != nil { cf, err := config.ParseV2Conf(cfgfile) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } cfg = cf } @@ -95,13 +100,11 @@ var rootCmd = &cobra.Command{ &clientcmd.ConfigOverrides{}) rconfig, err := kconfig.ClientConfig() if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } client, err := kubernetes.NewForConfig(rconfig) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } if clean { cleanup(client) @@ -117,8 +120,7 @@ var rootCmd = &cobra.Command{ // Get node count nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="}) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } if !s.NodeLocal && len(nodes.Items) < 2 { log.Error("Node count too low to run pod to pod across nodes.") @@ -141,17 +143,14 @@ var rootCmd = &cobra.Command{ // Build the SUT (Deployments) err = k8s.BuildSUT(client, &s) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } - var sr result.ScenarioResults sr.Version = cmdVersion.Version sr.GitCommit = cmdVersion.GitCommit // If the client and server needs to be across zones lz, zones, _ := k8s.GetZone(client) nodesInZone := zones[lz] - var acrossAZ bool if nodesInZone > 1 { acrossAZ = false } else { @@ -167,36 +166,24 @@ var rootCmd = &cobra.Command{ } nc.Metric = metric nc.AcrossAZ = acrossAZ - - if s.HostNetwork { - // No need to run hostNetwork through Service. - if !nc.Service { - npr := executeWorkload(nc, s, true, false, false) - sr.Results = append(sr.Results, npr) - if iperf3 { - ipr := executeWorkload(nc, s, true, true, false) - if len(ipr.Profile) > 1 { - sr.Results = append(sr.Results, ipr) - } - } - if uperf { - upr := executeWorkload(nc, s, true, false, true) - if len(upr.Profile) > 1 { - sr.Results = append(sr.Results, upr) - } - } - } + // No need to run hostNetwork through Service. + if s.HostNetwork && !nc.Service { + hostNetwork = true + } else { + hostNetwork = false + } + if netperf { + npr := executeWorkload(nc, s, hostNetwork, true, false, false) + sr.Results = append(sr.Results, npr) } - npr := executeWorkload(nc, s, false, false, false) - sr.Results = append(sr.Results, npr) if iperf3 { - ipr := executeWorkload(nc, s, false, true, false) + ipr := executeWorkload(nc, s, hostNetwork, false, true, false) if len(ipr.Profile) > 1 { sr.Results = append(sr.Results, ipr) } } if uperf { - upr := executeWorkload(nc, s, false, false, true) + upr := executeWorkload(nc, s, hostNetwork, false, false, true) if len(upr.Profile) > 1 { sr.Results = append(sr.Results, upr) } @@ -223,7 +210,7 @@ var rootCmd = &cobra.Command{ if err == nil { sr.Metadata.ClusterMetadata = metadata } else { - log.Error(" issue getting common metadata using go-commons") + log.Error("Issue getting common metadata using go-commons") } } @@ -244,13 +231,11 @@ var rootCmd = &cobra.Command{ if len(searchURL) > 1 { jdocs, err := archive.BuildDocs(sr, uid) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } esClient, err := archive.Connect(searchURL, index, true) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } log.Infof("Indexing [%d] documents in %s with UUID %s", len(jdocs), index, uid) resp, err := (*esClient).Index(jdocs, indexers.IndexingOpts{}) @@ -278,20 +263,17 @@ var rootCmd = &cobra.Command{ } err = archive.WriteCSVResult(sr) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } if pavail { err = archive.WritePromCSVResult(sr) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } } err = archive.WriteSpecificCSV(sr) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } // Initially we are just checking against TCP_STREAM results. retCode := 0 @@ -320,8 +302,7 @@ func cleanup(client *kubernetes.Clientset) { log.Info("Cleaning resources created by k8s-netperf") svcList, err := k8s.GetServices(client, namespace) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } for svc := range svcList.Items { err = k8s.DestroyService(client, svcList.Items[svc]) @@ -331,8 +312,7 @@ func cleanup(client *kubernetes.Clientset) { } dpList, err := k8s.GetDeployments(client, namespace) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } for dp := range dpList.Items { err = k8s.DestroyDeployment(client, dpList.Items[dp]) @@ -341,20 +321,18 @@ func cleanup(client *kubernetes.Clientset) { } _, err := k8s.WaitForDelete(client, dpList.Items[dp]) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } } } -func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool, uperf bool) result.Data { +// executeWorkload executes the workload and returns the result data. +func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, netperf, iperf3, uperf bool) result.Data { serverIP := "" - service := false - sameNode := true Client := s.Client + var driver drivers.Driver if nc.Service { - service = true if iperf3 { serverIP = s.IperfService.Spec.ClusterIP } else if uperf { @@ -371,29 +349,15 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe } if !s.NodeLocal { Client = s.ClientAcross - sameNode = false } if hostNet { Client = s.ClientHost } npr := result.Data{} - if iperf3 { - // iperf doesn't support all tests cases - if !iperf.TestSupported(nc.Profile) { - return npr - } - } - if uperf { - // uperf doesn't support all tests cases - if !uperf_driver.TestSupported(nc.Profile) { - return npr - } - } - npr.Config = nc npr.Metric = nc.Metric - npr.Service = service - npr.SameNode = sameNode + npr.Service = nc.Service + npr.SameNode = s.NodeLocal npr.HostNetwork = hostNet if s.AcrossAZ { npr.AcrossAZ = true @@ -401,67 +365,55 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe npr.AcrossAZ = nc.AcrossAZ } npr.StartTime = time.Now() - log.Debugf("Executing workloads. hostNetwork is %t, service is %t", hostNet, service) + log.Debugf("Executing workloads. hostNetwork is %t, service is %t", hostNet, nc.Service) for i := 0; i < nc.Samples; i++ { nr := sample.Sample{} + if netperf { + driver = drivers.NewDriver("netperf") + npr.Driver = "netperf" + } if iperf3 { + driver = drivers.NewDriver("iperf3") npr.Driver = "iperf3" - r, err := iperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err = iperf.ParseResults(&r) - if err != nil { - log.Error(err) - os.Exit(1) - } - } else if uperf { + } + if uperf { + driver = drivers.NewDriver("uperf") npr.Driver = "uperf" - r, err := uperf_driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err = uperf_driver.ParseResults(&r) - if err != nil { - log.Error(err) - os.Exit(1) - } - } else { - npr.Driver = "netperf" - r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err = netperf.ParseResults(&r) - if err != nil { - log.Error(err) - try := 0 - success := false - // Retry the current test. - for try < retry { - log.Warn("Rerunning test.") - r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - continue - } - nr, err = netperf.ParseResults(&r) - if err != nil { - log.Error(err) - try++ - } else { - success = true - break - } + } + // Check if test is supported + if !driver.IsTestSupported(nc.Profile) { + log.Warnf("Test %s is not supported with driver %s. Skipping.", nc.Profile, npr.Driver) + return npr + } + r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) + if err != nil { + log.Fatal(err) + } + nr, err = driver.ParseResults(&r) + if err != nil { + log.Error(err) + try := 0 + success := false + // Retry the current test. + for try < retry { + log.Warn("Rerunning test.") + r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) + if err != nil { + log.Error(err) + continue } - if !success { - log.Error("test was unsuccessful after retry.") - os.Exit(1) + nr, err = driver.ParseResults(&r) + if err != nil { + log.Error(err) + try++ + } else { + success = true + break } } + if !success { + log.Fatal("test was unsuccessful after retry.") + } } npr.LossSummary = append(npr.LossSummary, float64(nr.LossPercent)) npr.RetransmitSummary = append(npr.RetransmitSummary, nr.Retransmits) @@ -479,8 +431,9 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe func main() { rootCmd.Flags().StringVar(&cfgfile, "config", "netperf.yml", "K8s netperf Configuration File") - rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver (along with netperf)") - rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver (along with netperf)") + rootCmd.Flags().BoolVar(&netperf, "netperf", true, "Use netperf as load driver") + rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver") + rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver") rootCmd.Flags().BoolVar(&clean, "clean", true, "Clean-up resources created by k8s-netperf") rootCmd.Flags().BoolVar(&json, "json", false, "Instead of human-readable output, return JSON to stdout") rootCmd.Flags().BoolVar(&nl, "local", false, "Run network performance tests with Server-Pods/Client-Pods on the same Node") @@ -493,10 +446,8 @@ func main() { rootCmd.Flags().BoolVar(&showMetrics, "metrics", false, "Show all system metrics retrieved from prom") rootCmd.Flags().Float64Var(&tcpt, "tcp-tolerance", 10, "Allowed %diff from hostNetwork to podNetwork, anything above tolerance will result in k8s-netperf exiting 1.") rootCmd.Flags().BoolVar(&version, "version", false, "k8s-netperf version") - + rootCmd.Flags().SortFlags = false if err := rootCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) + log.Fatal(err) } - } diff --git a/pkg/drivers/driver.go b/pkg/drivers/driver.go new file mode 100644 index 00000000..cd265c76 --- /dev/null +++ b/pkg/drivers/driver.go @@ -0,0 +1,49 @@ +package drivers + +import ( + "bytes" + + "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type Driver interface { + IsTestSupported(string) bool + Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) + ParseResults(stdout *bytes.Buffer) (sample.Sample, error) +} + +type netperf struct { + driverName string +} + +type iperf3 struct { + driverName string +} + +type uperf struct { + driverName string +} + +// NewDriver creates a new driver based on the provided driver name. +// +// It takes a string parameter `driverName` and returns a Driver. +func NewDriver(driverName string) Driver { + switch driverName { + case "iperf3": + return &iperf3{ + driverName: driverName, + } + case "uperf": + return &uperf{ + driverName: driverName, + } + default: + return &netperf{ + driverName: driverName, + } + } +} diff --git a/pkg/iperf/iperf.go b/pkg/drivers/iperf.go similarity index 81% rename from pkg/iperf/iperf.go rename to pkg/drivers/iperf.go index 9eb997f6..0ab4b40d 100644 --- a/pkg/iperf/iperf.go +++ b/pkg/drivers/iperf.go @@ -1,4 +1,4 @@ -package iperf +package drivers import ( "bytes" @@ -12,6 +12,7 @@ import ( apiv1 "k8s.io/api/core/v1" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" "github.com/google/uuid" @@ -21,7 +22,15 @@ import ( "k8s.io/client-go/tools/remotecommand" ) -type Result struct { +var Iperf iperf3 + +func init() { + Iperf = iperf3{ + driverName: "iperf", + } +} + +type IperfResult struct { Data struct { TCPRetransmit struct { Count float64 `json:"retransmits"` @@ -36,27 +45,19 @@ type Result struct { } `json:"end"` } -const workload = "iperf3" - -// ServerDataPort data port for the service -const ServerDataPort = 43433 - -// ServerCtlPort control port for the service -const ServerCtlPort = 22865 - -// TestSupported Determine if the test is supproted for driver -func TestSupported(test string) bool { +// IsTestSupported Determine if the test is supported for driver +func (i *iperf3) IsTestSupported(test string) bool { return strings.Contains(test, "STREAM") } // Run will invoke iperf3 in a client container -func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { +func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer id := uuid.New() file := fmt.Sprintf("/tmp/iperf-%s", id.String()) pod := client.Items[0] log.Debugf("🔥 Client (%s,%s) starting iperf3 against server : %s", pod.Name, pod.Status.PodIP, serverIP) - config.Show(nc, workload) + config.Show(nc, i.driverName) tcp := true if !strings.Contains(nc.Profile, "STREAM") { return bytes.Buffer{}, fmt.Errorf("unable to run iperf3 with non-stream tests") @@ -71,7 +72,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-J", "-t", fmt.Sprint(nc.Duration), "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), fmt.Sprintf("--logfile=%s", file), } } else { @@ -79,7 +80,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-t", fmt.Sprint(nc.Duration), "-u", "-J", "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), "-b", "0", fmt.Sprintf("--logfile=%s", file), } @@ -90,7 +91,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-t", fmt.Sprint(nc.Duration), "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), fmt.Sprintf("--logfile=%s", file), } } else { @@ -98,7 +99,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-t", fmt.Sprint(nc.Duration), "-u", "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), "-b", "0", fmt.Sprintf("--logfile=%s", file), } @@ -171,15 +172,14 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 // ParseResults accepts the stdout from the execution of the benchmark. // It will return a Sample struct or error -func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { +func (i *iperf3) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { sample := sample.Sample{} - sample.Driver = workload - result := Result{} + sample.Driver = i.driverName + result := IperfResult{} sample.Metric = "Mb/s" - error := json.NewDecoder(stdout).Decode(&result) - if error != nil { - log.Error("Issue while decoding") - log.Error(error) + err := json.NewDecoder(stdout).Decode(&result) + if err != nil { + log.Errorf("Issue while decoding: %v", err) } if result.Data.TCPStream.Rate > 0 { sample.Throughput = float64(result.Data.TCPStream.Rate) / 1000000 @@ -190,7 +190,7 @@ func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { sample.LossPercent = result.Data.UDPStream.LossPercent } - log.Debugf("Storing %s sample throughput: %f", sample.Driver, sample.Throughput) + log.Debugf("Storing %s sample throughput: %f", sample.Driver, sample.Throughput) return sample, nil } diff --git a/pkg/netperf/netperf.go b/pkg/drivers/netperf.go similarity index 86% rename from pkg/netperf/netperf.go rename to pkg/drivers/netperf.go index abae8aea..f0ad8144 100644 --- a/pkg/netperf/netperf.go +++ b/pkg/drivers/netperf.go @@ -1,4 +1,4 @@ -package netperf +package drivers import ( "bytes" @@ -11,6 +11,7 @@ import ( apiv1 "k8s.io/api/core/v1" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" "k8s.io/client-go/kubernetes" @@ -19,21 +20,24 @@ import ( "k8s.io/client-go/tools/remotecommand" ) -const workload = "netperf" +var Netperf netperf -// ServerDataPort data port for the service -const ServerDataPort = 42424 +func init() { + Netperf = netperf{ + driverName: "netperf", + } +} // omniOptions are netperf specific options that we will pass to the netperf client. const omniOptions = "rt_latency,p99_latency,throughput,throughput_units,remote_recv_calls,local_send_calls,local_transport_retrans" // Run will use the k8s client to run the netperf binary in the container image // it will return a bytes.Buffer of the stdout. -func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { +func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer pod := client.Items[0] log.Debugf("🔥 Client (%s,%s) starting netperf against server : %s", pod.Name, pod.Status.PodIP, serverIP) - config.Show(nc, workload) + config.Show(nc, n.driverName) var cmd []string if nc.Service { cmd = []string{"bash", "super-netperf", "1", "-H", @@ -43,7 +47,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 "--", "-k", fmt.Sprint(omniOptions), "-m", fmt.Sprint(nc.MessageSize), - "-P", fmt.Sprint(ServerDataPort), + "-P", fmt.Sprint(k8s.NetperfServerDataPort), "-R", "1"} } else { cmd = []string{"bash", "super-netperf", strconv.Itoa(nc.Parallelism), "-H", @@ -89,9 +93,9 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 // ParseResults accepts the stdout from the execution of the benchmark. It also needs // It will return a Sample struct or error -func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { +func (n *netperf) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { sample := sample.Sample{} - sample.Driver = workload + sample.Driver = n.driverName send := 0.0 recv := 0.0 if len(strings.Split(stdout.String(), "\n")) < 5 { @@ -138,3 +142,8 @@ func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { } return sample, nil } + +// IsTestSupported Determine if the test is supported for driver +func (n *netperf) IsTestSupported(test string) bool { + return true +} diff --git a/pkg/uperf/uperf.go b/pkg/drivers/uperf.go similarity index 88% rename from pkg/uperf/uperf.go rename to pkg/drivers/uperf.go index 762bf579..3bd9321d 100644 --- a/pkg/uperf/uperf.go +++ b/pkg/drivers/uperf.go @@ -1,4 +1,4 @@ -package uperf +package drivers import ( "bytes" @@ -11,6 +11,7 @@ import ( apiv1 "k8s.io/api/core/v1" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" "github.com/montanaflynn/stats" @@ -35,16 +36,16 @@ type Result struct { } `json:"end"` } -const workload = "uperf" +var Uperf uperf -// ServerDataPort data port for the service -const ServerDataPort = 30001 - -// ServerCtlPort control port for the service -const ServerCtlPort = 30000 +func init() { + Uperf = uperf{ + driverName: "uperf", + } +} -// TestSupported Determine if the test is supproted for driver -func TestSupported(test string) bool { +// TestSupported Determine if the test is supported for driver +func (u *uperf) IsTestSupported(test string) bool { return !strings.Contains(test, "TCP_CRR") } @@ -74,7 +75,7 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi - `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize) + `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, k8s.UperfServerCtlPort+1, nc.Duration, nc.MessageSize) filePath = fmt.Sprintf("/tmp/uperf-stream-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism) } else { fileContent = fmt.Sprintf(` @@ -91,7 +92,7 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi - `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize) + `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, k8s.UperfServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize) filePath = fmt.Sprintf("/tmp/uperf-rr-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism) } @@ -135,13 +136,14 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi } // Run will invoke uperf in a client container -func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { + +func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer var exec remotecommand.Executor pod := client.Items[0] log.Debugf("🔥 Client (%s,%s) starting uperf against server : %s", pod.Name, pod.Status.PodIP, serverIP) - config.Show(nc, workload) + config.Show(nc, u.driverName) filePath, err := createUperfProfile(c, rc, nc, pod, serverIP) if err != nil { @@ -152,7 +154,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 stdout = bytes.Buffer{} stderr = bytes.Buffer{} - cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(ServerCtlPort)} + cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(k8s.UperfServerCtlPort)} log.Debug(cmd) req := c.CoreV1().RESTClient(). @@ -188,9 +190,9 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 // ParseResults accepts the stdout from the execution of the benchmark. // It will return a Sample struct or error -func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { +func (u *uperf) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { sample := sample.Sample{} - sample.Driver = workload + sample.Driver = u.driverName sample.Metric = "Mb/s" transactions := regexp.MustCompile(`timestamp_ms:(.*) name:Txn2 nr_bytes:(.*) nr_ops:(.*)\r`).FindAllStringSubmatch(stdout.String(), -1) diff --git a/pkg/logging/log.go b/pkg/logging/log.go index b46f3f32..c3964847 100644 --- a/pkg/logging/log.go +++ b/pkg/logging/log.go @@ -79,3 +79,13 @@ func Warn(args ...interface{}) { func Warnf(format string, args ...interface{}) { defaultLog.Warnf(format, args...) } + +// Fatal - Fatal Message +func Fatal(args ...interface{}) { + defaultLog.Fatal(args...) +} + +// Fatalf - Fatal Message +func Fatalf(format string, args ...interface{}) { + defaultLog.Fatalf(format, args...) +}