From 9d1bc90ec5812c821a1ae1169e5dd2094b61b805 Mon Sep 17 00:00:00 2001 From: Raul Sevilla Date: Fri, 16 Feb 2024 10:59:21 +0100 Subject: [PATCH] Refactor drivers code Signed-off-by: Raul Sevilla --- cmd/k8s-netperf/k8s-netperf.go | 182 +++++++++++----------------- pkg/drivers/driver.go | 49 ++++++++ pkg/{iperf => drivers}/iperf.go | 43 +++---- pkg/{netperf => drivers}/netperf.go | 27 +++-- pkg/{uperf => drivers}/uperf.go | 34 +++--- pkg/logging/log.go | 10 ++ 6 files changed, 190 insertions(+), 155 deletions(-) create mode 100644 pkg/drivers/driver.go rename pkg/{iperf => drivers}/iperf.go (85%) rename pkg/{netperf => drivers}/netperf.go (86%) rename pkg/{uperf => drivers}/uperf.go (88%) diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index 5c83a3a7..e198fdb1 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -14,14 +14,12 @@ import ( cmdVersion "github.com/cloud-bulldozer/go-commons/version" "github.com/cloud-bulldozer/k8s-netperf/pkg/archive" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" - "github.com/cloud-bulldozer/k8s-netperf/pkg/iperf" + "github.com/cloud-bulldozer/k8s-netperf/pkg/drivers" "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/metrics" - "github.com/cloud-bulldozer/k8s-netperf/pkg/netperf" result "github.com/cloud-bulldozer/k8s-netperf/pkg/results" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" - uperf_driver "github.com/cloud-bulldozer/k8s-netperf/pkg/uperf" "github.com/google/uuid" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +35,7 @@ var ( cfgfile string nl bool clean bool + netperf bool iperf3 bool uperf bool acrossAZ bool @@ -84,8 +83,7 @@ var rootCmd = &cobra.Command{ if err != nil { cf, err := config.ParseV2Conf(cfgfile) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } cfg = cf } @@ -95,13 +93,11 @@ var rootCmd = &cobra.Command{ &clientcmd.ConfigOverrides{}) rconfig, err := kconfig.ClientConfig() if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } client, err := kubernetes.NewForConfig(rconfig) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } if clean { cleanup(client) @@ -117,8 +113,7 @@ var rootCmd = &cobra.Command{ // Get node count nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="}) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } if !s.NodeLocal && len(nodes.Items) < 2 { log.Error("Node count too low to run pod to pod across nodes.") @@ -141,8 +136,7 @@ var rootCmd = &cobra.Command{ // Build the SUT (Deployments) err = k8s.BuildSUT(client, &s) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } var sr result.ScenarioResults @@ -171,32 +165,36 @@ var rootCmd = &cobra.Command{ if s.HostNetwork { // No need to run hostNetwork through Service. if !nc.Service { - npr := executeWorkload(nc, s, true, false, false) - sr.Results = append(sr.Results, npr) + if netperf { + npr := executeWorkload(nc, s, false, true, false, false) + sr.Results = append(sr.Results, npr) + } if iperf3 { - ipr := executeWorkload(nc, s, true, true, false) + ipr := executeWorkload(nc, s, true, true, true, false) if len(ipr.Profile) > 1 { sr.Results = append(sr.Results, ipr) } } if uperf { - upr := executeWorkload(nc, s, true, false, true) + upr := executeWorkload(nc, s, true, false, true, true) if len(upr.Profile) > 1 { sr.Results = append(sr.Results, upr) } } } } - npr := executeWorkload(nc, s, false, false, false) - sr.Results = append(sr.Results, npr) + if netperf { + npr := executeWorkload(nc, s, true, false, false, false) + sr.Results = append(sr.Results, npr) + } if iperf3 { - ipr := executeWorkload(nc, s, false, true, false) + ipr := executeWorkload(nc, s, false, false, true, false) if len(ipr.Profile) > 1 { sr.Results = append(sr.Results, ipr) } } if uperf { - upr := executeWorkload(nc, s, false, false, true) + upr := executeWorkload(nc, s, false, false, false, true) if len(upr.Profile) > 1 { sr.Results = append(sr.Results, upr) } @@ -244,13 +242,11 @@ var rootCmd = &cobra.Command{ if len(searchURL) > 1 { jdocs, err := archive.BuildDocs(sr, uid) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } esClient, err := archive.Connect(searchURL, index, true) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } log.Infof("Indexing [%d] documents in %s with UUID %s", len(jdocs), index, uid) resp, err := (*esClient).Index(jdocs, indexers.IndexingOpts{}) @@ -278,20 +274,17 @@ var rootCmd = &cobra.Command{ } err = archive.WriteCSVResult(sr) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } if pavail { err = archive.WritePromCSVResult(sr) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } } err = archive.WriteSpecificCSV(sr) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } // Initially we are just checking against TCP_STREAM results. retCode := 0 @@ -320,8 +313,7 @@ func cleanup(client *kubernetes.Clientset) { log.Info("Cleaning resources created by k8s-netperf") svcList, err := k8s.GetServices(client, namespace) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } for svc := range svcList.Items { err = k8s.DestroyService(client, svcList.Items[svc]) @@ -331,8 +323,7 @@ func cleanup(client *kubernetes.Clientset) { } dpList, err := k8s.GetDeployments(client, namespace) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } for dp := range dpList.Items { err = k8s.DestroyDeployment(client, dpList.Items[dp]) @@ -341,18 +332,18 @@ func cleanup(client *kubernetes.Clientset) { } _, err := k8s.WaitForDelete(client, dpList.Items[dp]) if err != nil { - log.Error(err) - os.Exit(1) + log.Fatal(err) } } } -func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool, uperf bool) result.Data { +func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, netperfDriver, iperf3, uperf bool) result.Data { serverIP := "" service := false sameNode := true Client := s.Client + var driver drivers.Driver if nc.Service { service = true if iperf3 { @@ -377,19 +368,6 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe Client = s.ClientHost } npr := result.Data{} - if iperf3 { - // iperf doesn't support all tests cases - if !iperf.TestSupported(nc.Profile) { - return npr - } - } - if uperf { - // uperf doesn't support all tests cases - if !uperf_driver.TestSupported(nc.Profile) { - return npr - } - } - npr.Config = nc npr.Metric = nc.Metric npr.Service = service @@ -404,64 +382,52 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe log.Debugf("Executing workloads. hostNetwork is %t, service is %t", hostNet, service) for i := 0; i < nc.Samples; i++ { nr := sample.Sample{} + if netperfDriver { + driver = drivers.NewDriver("netperf") + npr.Driver = "netperf" + } if iperf3 { + driver = drivers.NewDriver("iperf3") npr.Driver = "iperf3" - r, err := iperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err = iperf.ParseResults(&r) - if err != nil { - log.Error(err) - os.Exit(1) - } - } else if uperf { + } + if uperf { + driver = drivers.NewDriver("uperf") npr.Driver = "uperf" - r, err := uperf_driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err = uperf_driver.ParseResults(&r) - if err != nil { - log.Error(err) - os.Exit(1) - } - } else { - npr.Driver = "netperf" - r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - os.Exit(1) - } - nr, err = netperf.ParseResults(&r) - if err != nil { - log.Error(err) - try := 0 - success := false - // Retry the current test. - for try < retry { - log.Warn("Rerunning test.") - r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) - if err != nil { - log.Error(err) - continue - } - nr, err = netperf.ParseResults(&r) - if err != nil { - log.Error(err) - try++ - } else { - success = true - break - } + } + // Check if test is supported + if driver.IsTestSupported(nc.Profile) { + log.Warnf("Test %s is not supported with driver %s. Skipping.", nc.Profile, npr.Driver) + return npr + } + r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) + if err != nil { + log.Fatal(err) + } + nr, err = driver.ParseResults(&r) + if err != nil { + log.Error(err) + try := 0 + success := false + // Retry the current test. + for try < retry { + log.Warn("Rerunning test.") + r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) + if err != nil { + log.Error(err) + continue } - if !success { - log.Error("test was unsuccessful after retry.") - os.Exit(1) + nr, err = driver.ParseResults(&r) + if err != nil { + log.Error(err) + try++ + } else { + success = true + break } } + if !success { + log.Fatal("test was unsuccessful after retry.") + } } npr.LossSummary = append(npr.LossSummary, float64(nr.LossPercent)) npr.RetransmitSummary = append(npr.RetransmitSummary, nr.Retransmits) @@ -479,8 +445,9 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe func main() { rootCmd.Flags().StringVar(&cfgfile, "config", "netperf.yml", "K8s netperf Configuration File") - rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver (along with netperf)") - rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver (along with netperf)") + rootCmd.Flags().BoolVar(&netperf, "netperf", true, "Use netperf as load driver") + rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver") + rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver") rootCmd.Flags().BoolVar(&clean, "clean", true, "Clean-up resources created by k8s-netperf") rootCmd.Flags().BoolVar(&json, "json", false, "Instead of human-readable output, return JSON to stdout") rootCmd.Flags().BoolVar(&nl, "local", false, "Run network performance tests with Server-Pods/Client-Pods on the same Node") @@ -493,10 +460,7 @@ func main() { rootCmd.Flags().BoolVar(&showMetrics, "metrics", false, "Show all system metrics retrieved from prom") rootCmd.Flags().Float64Var(&tcpt, "tcp-tolerance", 10, "Allowed %diff from hostNetwork to podNetwork, anything above tolerance will result in k8s-netperf exiting 1.") rootCmd.Flags().BoolVar(&version, "version", false, "k8s-netperf version") - if err := rootCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) + log.Fatal(err) } - } diff --git a/pkg/drivers/driver.go b/pkg/drivers/driver.go new file mode 100644 index 00000000..cd265c76 --- /dev/null +++ b/pkg/drivers/driver.go @@ -0,0 +1,49 @@ +package drivers + +import ( + "bytes" + + "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" + apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type Driver interface { + IsTestSupported(string) bool + Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) + ParseResults(stdout *bytes.Buffer) (sample.Sample, error) +} + +type netperf struct { + driverName string +} + +type iperf3 struct { + driverName string +} + +type uperf struct { + driverName string +} + +// NewDriver creates a new driver based on the provided driver name. +// +// It takes a string parameter `driverName` and returns a Driver. +func NewDriver(driverName string) Driver { + switch driverName { + case "iperf3": + return &iperf3{ + driverName: driverName, + } + case "uperf": + return &uperf{ + driverName: driverName, + } + default: + return &netperf{ + driverName: driverName, + } + } +} diff --git a/pkg/iperf/iperf.go b/pkg/drivers/iperf.go similarity index 85% rename from pkg/iperf/iperf.go rename to pkg/drivers/iperf.go index 9eb997f6..5abf4e20 100644 --- a/pkg/iperf/iperf.go +++ b/pkg/drivers/iperf.go @@ -1,4 +1,4 @@ -package iperf +package drivers import ( "bytes" @@ -12,6 +12,7 @@ import ( apiv1 "k8s.io/api/core/v1" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" "github.com/google/uuid" @@ -21,7 +22,15 @@ import ( "k8s.io/client-go/tools/remotecommand" ) -type Result struct { +var Iperf iperf3 + +func init() { + Iperf = iperf3{ + driverName: "iperf", + } +} + +type IperfResult struct { Data struct { TCPRetransmit struct { Count float64 `json:"retransmits"` @@ -36,27 +45,19 @@ type Result struct { } `json:"end"` } -const workload = "iperf3" - -// ServerDataPort data port for the service -const ServerDataPort = 43433 - -// ServerCtlPort control port for the service -const ServerCtlPort = 22865 - -// TestSupported Determine if the test is supproted for driver -func TestSupported(test string) bool { +// IsTestSupported Determine if the test is supported for driver +func (i *iperf3) IsTestSupported(test string) bool { return strings.Contains(test, "STREAM") } // Run will invoke iperf3 in a client container -func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { +func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer id := uuid.New() file := fmt.Sprintf("/tmp/iperf-%s", id.String()) pod := client.Items[0] log.Debugf("🔥 Client (%s,%s) starting iperf3 against server : %s", pod.Name, pod.Status.PodIP, serverIP) - config.Show(nc, workload) + config.Show(nc, i.driverName) tcp := true if !strings.Contains(nc.Profile, "STREAM") { return bytes.Buffer{}, fmt.Errorf("unable to run iperf3 with non-stream tests") @@ -71,7 +72,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-J", "-t", fmt.Sprint(nc.Duration), "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), fmt.Sprintf("--logfile=%s", file), } } else { @@ -79,7 +80,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-t", fmt.Sprint(nc.Duration), "-u", "-J", "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), "-b", "0", fmt.Sprintf("--logfile=%s", file), } @@ -90,7 +91,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-t", fmt.Sprint(nc.Duration), "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), fmt.Sprintf("--logfile=%s", file), } } else { @@ -98,7 +99,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 serverIP, "-t", fmt.Sprint(nc.Duration), "-u", "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(ServerCtlPort), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), "-b", "0", fmt.Sprintf("--logfile=%s", file), } @@ -171,10 +172,10 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 // ParseResults accepts the stdout from the execution of the benchmark. // It will return a Sample struct or error -func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { +func (i *iperf3) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { sample := sample.Sample{} - sample.Driver = workload - result := Result{} + sample.Driver = i.driverName + result := IperfResult{} sample.Metric = "Mb/s" error := json.NewDecoder(stdout).Decode(&result) if error != nil { diff --git a/pkg/netperf/netperf.go b/pkg/drivers/netperf.go similarity index 86% rename from pkg/netperf/netperf.go rename to pkg/drivers/netperf.go index abae8aea..f0ad8144 100644 --- a/pkg/netperf/netperf.go +++ b/pkg/drivers/netperf.go @@ -1,4 +1,4 @@ -package netperf +package drivers import ( "bytes" @@ -11,6 +11,7 @@ import ( apiv1 "k8s.io/api/core/v1" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" "k8s.io/client-go/kubernetes" @@ -19,21 +20,24 @@ import ( "k8s.io/client-go/tools/remotecommand" ) -const workload = "netperf" +var Netperf netperf -// ServerDataPort data port for the service -const ServerDataPort = 42424 +func init() { + Netperf = netperf{ + driverName: "netperf", + } +} // omniOptions are netperf specific options that we will pass to the netperf client. const omniOptions = "rt_latency,p99_latency,throughput,throughput_units,remote_recv_calls,local_send_calls,local_transport_retrans" // Run will use the k8s client to run the netperf binary in the container image // it will return a bytes.Buffer of the stdout. -func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { +func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer pod := client.Items[0] log.Debugf("🔥 Client (%s,%s) starting netperf against server : %s", pod.Name, pod.Status.PodIP, serverIP) - config.Show(nc, workload) + config.Show(nc, n.driverName) var cmd []string if nc.Service { cmd = []string{"bash", "super-netperf", "1", "-H", @@ -43,7 +47,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 "--", "-k", fmt.Sprint(omniOptions), "-m", fmt.Sprint(nc.MessageSize), - "-P", fmt.Sprint(ServerDataPort), + "-P", fmt.Sprint(k8s.NetperfServerDataPort), "-R", "1"} } else { cmd = []string{"bash", "super-netperf", strconv.Itoa(nc.Parallelism), "-H", @@ -89,9 +93,9 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 // ParseResults accepts the stdout from the execution of the benchmark. It also needs // It will return a Sample struct or error -func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { +func (n *netperf) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { sample := sample.Sample{} - sample.Driver = workload + sample.Driver = n.driverName send := 0.0 recv := 0.0 if len(strings.Split(stdout.String(), "\n")) < 5 { @@ -138,3 +142,8 @@ func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { } return sample, nil } + +// IsTestSupported Determine if the test is supported for driver +func (n *netperf) IsTestSupported(test string) bool { + return true +} diff --git a/pkg/uperf/uperf.go b/pkg/drivers/uperf.go similarity index 88% rename from pkg/uperf/uperf.go rename to pkg/drivers/uperf.go index 762bf579..3bd9321d 100644 --- a/pkg/uperf/uperf.go +++ b/pkg/drivers/uperf.go @@ -1,4 +1,4 @@ -package uperf +package drivers import ( "bytes" @@ -11,6 +11,7 @@ import ( apiv1 "k8s.io/api/core/v1" "github.com/cloud-bulldozer/k8s-netperf/pkg/config" + "github.com/cloud-bulldozer/k8s-netperf/pkg/k8s" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/sample" "github.com/montanaflynn/stats" @@ -35,16 +36,16 @@ type Result struct { } `json:"end"` } -const workload = "uperf" +var Uperf uperf -// ServerDataPort data port for the service -const ServerDataPort = 30001 - -// ServerCtlPort control port for the service -const ServerCtlPort = 30000 +func init() { + Uperf = uperf{ + driverName: "uperf", + } +} -// TestSupported Determine if the test is supproted for driver -func TestSupported(test string) bool { +// TestSupported Determine if the test is supported for driver +func (u *uperf) IsTestSupported(test string) bool { return !strings.Contains(test, "TCP_CRR") } @@ -74,7 +75,7 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi - `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize) + `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, k8s.UperfServerCtlPort+1, nc.Duration, nc.MessageSize) filePath = fmt.Sprintf("/tmp/uperf-stream-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism) } else { fileContent = fmt.Sprintf(` @@ -91,7 +92,7 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi - `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, ServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize) + `, protocol, nc.MessageSize, nc.Parallelism, nc.Parallelism, serverIP, protocol, k8s.UperfServerCtlPort+1, nc.Duration, nc.MessageSize, nc.MessageSize) filePath = fmt.Sprintf("/tmp/uperf-rr-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism) } @@ -135,13 +136,14 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi } // Run will invoke uperf in a client container -func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { + +func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer var exec remotecommand.Executor pod := client.Items[0] log.Debugf("🔥 Client (%s,%s) starting uperf against server : %s", pod.Name, pod.Status.PodIP, serverIP) - config.Show(nc, workload) + config.Show(nc, u.driverName) filePath, err := createUperfProfile(c, rc, nc, pod, serverIP) if err != nil { @@ -152,7 +154,7 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 stdout = bytes.Buffer{} stderr = bytes.Buffer{} - cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(ServerCtlPort)} + cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(k8s.UperfServerCtlPort)} log.Debug(cmd) req := c.CoreV1().RESTClient(). @@ -188,9 +190,9 @@ func Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1 // ParseResults accepts the stdout from the execution of the benchmark. // It will return a Sample struct or error -func ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { +func (u *uperf) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { sample := sample.Sample{} - sample.Driver = workload + sample.Driver = u.driverName sample.Metric = "Mb/s" transactions := regexp.MustCompile(`timestamp_ms:(.*) name:Txn2 nr_bytes:(.*) nr_ops:(.*)\r`).FindAllStringSubmatch(stdout.String(), -1) diff --git a/pkg/logging/log.go b/pkg/logging/log.go index b46f3f32..c3964847 100644 --- a/pkg/logging/log.go +++ b/pkg/logging/log.go @@ -79,3 +79,13 @@ func Warn(args ...interface{}) { func Warnf(format string, args ...interface{}) { defaultLog.Warnf(format, args...) } + +// Fatal - Fatal Message +func Fatal(args ...interface{}) { + defaultLog.Fatal(args...) +} + +// Fatalf - Fatal Message +func Fatalf(format string, args ...interface{}) { + defaultLog.Fatalf(format, args...) +}