Skip to content

Commit

Permalink
Refactor drivers code
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Sevilla <[email protected]>
  • Loading branch information
rsevilla87 committed Feb 16, 2024
1 parent 842a364 commit 9d1bc90
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 155 deletions.
182 changes: 73 additions & 109 deletions cmd/k8s-netperf/k8s-netperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ import (
cmdVersion "github.com/cloud-bulldozer/go-commons/version"
"github.com/cloud-bulldozer/k8s-netperf/pkg/archive"
"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
"github.com/cloud-bulldozer/k8s-netperf/pkg/iperf"
"github.com/cloud-bulldozer/k8s-netperf/pkg/drivers"
"github.com/cloud-bulldozer/k8s-netperf/pkg/k8s"
log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging"
"github.com/cloud-bulldozer/k8s-netperf/pkg/metrics"
"github.com/cloud-bulldozer/k8s-netperf/pkg/netperf"
result "github.com/cloud-bulldozer/k8s-netperf/pkg/results"
"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
uperf_driver "github.com/cloud-bulldozer/k8s-netperf/pkg/uperf"
"github.com/google/uuid"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -37,6 +35,7 @@ var (
cfgfile string
nl bool
clean bool
netperf bool
iperf3 bool
uperf bool
acrossAZ bool
Expand Down Expand Up @@ -84,8 +83,7 @@ var rootCmd = &cobra.Command{
if err != nil {
cf, err := config.ParseV2Conf(cfgfile)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
cfg = cf
}
Expand All @@ -95,13 +93,11 @@ var rootCmd = &cobra.Command{
&clientcmd.ConfigOverrides{})
rconfig, err := kconfig.ClientConfig()
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
client, err := kubernetes.NewForConfig(rconfig)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
if clean {
cleanup(client)
Expand All @@ -117,8 +113,7 @@ var rootCmd = &cobra.Command{
// Get node count
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="})
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
if !s.NodeLocal && len(nodes.Items) < 2 {
log.Error("Node count too low to run pod to pod across nodes.")
Expand All @@ -141,8 +136,7 @@ var rootCmd = &cobra.Command{
// Build the SUT (Deployments)
err = k8s.BuildSUT(client, &s)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}

var sr result.ScenarioResults
Expand Down Expand Up @@ -171,32 +165,36 @@ var rootCmd = &cobra.Command{
if s.HostNetwork {
// No need to run hostNetwork through Service.
if !nc.Service {
npr := executeWorkload(nc, s, true, false, false)
sr.Results = append(sr.Results, npr)
if netperf {
npr := executeWorkload(nc, s, false, true, false, false)
sr.Results = append(sr.Results, npr)
}
if iperf3 {
ipr := executeWorkload(nc, s, true, true, false)
ipr := executeWorkload(nc, s, true, true, true, false)
if len(ipr.Profile) > 1 {
sr.Results = append(sr.Results, ipr)
}
}
if uperf {
upr := executeWorkload(nc, s, true, false, true)
upr := executeWorkload(nc, s, true, false, true, true)
if len(upr.Profile) > 1 {
sr.Results = append(sr.Results, upr)
}
}
}
}
npr := executeWorkload(nc, s, false, false, false)
sr.Results = append(sr.Results, npr)
if netperf {
npr := executeWorkload(nc, s, true, false, false, false)
sr.Results = append(sr.Results, npr)
}
if iperf3 {
ipr := executeWorkload(nc, s, false, true, false)
ipr := executeWorkload(nc, s, false, false, true, false)
if len(ipr.Profile) > 1 {
sr.Results = append(sr.Results, ipr)
}
}
if uperf {
upr := executeWorkload(nc, s, false, false, true)
upr := executeWorkload(nc, s, false, false, false, true)
if len(upr.Profile) > 1 {
sr.Results = append(sr.Results, upr)
}
Expand Down Expand Up @@ -244,13 +242,11 @@ var rootCmd = &cobra.Command{
if len(searchURL) > 1 {
jdocs, err := archive.BuildDocs(sr, uid)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
esClient, err := archive.Connect(searchURL, index, true)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
log.Infof("Indexing [%d] documents in %s with UUID %s", len(jdocs), index, uid)
resp, err := (*esClient).Index(jdocs, indexers.IndexingOpts{})
Expand Down Expand Up @@ -278,20 +274,17 @@ var rootCmd = &cobra.Command{
}
err = archive.WriteCSVResult(sr)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
if pavail {
err = archive.WritePromCSVResult(sr)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
}
err = archive.WriteSpecificCSV(sr)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
// Initially we are just checking against TCP_STREAM results.
retCode := 0
Expand Down Expand Up @@ -320,8 +313,7 @@ func cleanup(client *kubernetes.Clientset) {
log.Info("Cleaning resources created by k8s-netperf")
svcList, err := k8s.GetServices(client, namespace)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
for svc := range svcList.Items {
err = k8s.DestroyService(client, svcList.Items[svc])
Expand All @@ -331,8 +323,7 @@ func cleanup(client *kubernetes.Clientset) {
}
dpList, err := k8s.GetDeployments(client, namespace)
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
for dp := range dpList.Items {
err = k8s.DestroyDeployment(client, dpList.Items[dp])
Expand All @@ -341,18 +332,18 @@ func cleanup(client *kubernetes.Clientset) {
}
_, err := k8s.WaitForDelete(client, dpList.Items[dp])
if err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
}

}

func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, iperf3 bool, uperf bool) result.Data {
func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, netperfDriver, iperf3, uperf bool) result.Data {
serverIP := ""
service := false
sameNode := true
Client := s.Client
var driver drivers.Driver
if nc.Service {
service = true
if iperf3 {
Expand All @@ -377,19 +368,6 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
Client = s.ClientHost
}
npr := result.Data{}
if iperf3 {
// iperf doesn't support all tests cases
if !iperf.TestSupported(nc.Profile) {
return npr
}
}
if uperf {
// uperf doesn't support all tests cases
if !uperf_driver.TestSupported(nc.Profile) {
return npr
}
}

npr.Config = nc
npr.Metric = nc.Metric
npr.Service = service
Expand All @@ -404,64 +382,52 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe
log.Debugf("Executing workloads. hostNetwork is %t, service is %t", hostNet, service)
for i := 0; i < nc.Samples; i++ {
nr := sample.Sample{}
if netperfDriver {
driver = drivers.NewDriver("netperf")
npr.Driver = "netperf"
}
if iperf3 {
driver = drivers.NewDriver("iperf3")
npr.Driver = "iperf3"
r, err := iperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
if err != nil {
log.Error(err)
os.Exit(1)
}
nr, err = iperf.ParseResults(&r)
if err != nil {
log.Error(err)
os.Exit(1)
}
} else if uperf {
}
if uperf {
driver = drivers.NewDriver("uperf")
npr.Driver = "uperf"
r, err := uperf_driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
if err != nil {
log.Error(err)
os.Exit(1)
}
nr, err = uperf_driver.ParseResults(&r)
if err != nil {
log.Error(err)
os.Exit(1)
}
} else {
npr.Driver = "netperf"
r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
if err != nil {
log.Error(err)
os.Exit(1)
}
nr, err = netperf.ParseResults(&r)
if err != nil {
log.Error(err)
try := 0
success := false
// Retry the current test.
for try < retry {
log.Warn("Rerunning test.")
r, err := netperf.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
if err != nil {
log.Error(err)
continue
}
nr, err = netperf.ParseResults(&r)
if err != nil {
log.Error(err)
try++
} else {
success = true
break
}
}
// Check if test is supported
if driver.IsTestSupported(nc.Profile) {
log.Warnf("Test %s is not supported with driver %s. Skipping.", nc.Profile, npr.Driver)
return npr
}
r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
if err != nil {
log.Fatal(err)
}
nr, err = driver.ParseResults(&r)
if err != nil {
log.Error(err)
try := 0
success := false
// Retry the current test.
for try < retry {
log.Warn("Rerunning test.")
r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP)
if err != nil {
log.Error(err)
continue
}
if !success {
log.Error("test was unsuccessful after retry.")
os.Exit(1)
nr, err = driver.ParseResults(&r)
if err != nil {
log.Error(err)
try++
} else {
success = true
break
}
}
if !success {
log.Fatal("test was unsuccessful after retry.")
}
}
npr.LossSummary = append(npr.LossSummary, float64(nr.LossPercent))
npr.RetransmitSummary = append(npr.RetransmitSummary, nr.Retransmits)
Expand All @@ -479,8 +445,9 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, ipe

func main() {
rootCmd.Flags().StringVar(&cfgfile, "config", "netperf.yml", "K8s netperf Configuration File")
rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver (along with netperf)")
rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver (along with netperf)")
rootCmd.Flags().BoolVar(&netperf, "netperf", true, "Use netperf as load driver")
rootCmd.Flags().BoolVar(&iperf3, "iperf", false, "Use iperf3 as load driver")
rootCmd.Flags().BoolVar(&uperf, "uperf", false, "Use uperf as load driver")
rootCmd.Flags().BoolVar(&clean, "clean", true, "Clean-up resources created by k8s-netperf")
rootCmd.Flags().BoolVar(&json, "json", false, "Instead of human-readable output, return JSON to stdout")
rootCmd.Flags().BoolVar(&nl, "local", false, "Run network performance tests with Server-Pods/Client-Pods on the same Node")
Expand All @@ -493,10 +460,7 @@ func main() {
rootCmd.Flags().BoolVar(&showMetrics, "metrics", false, "Show all system metrics retrieved from prom")
rootCmd.Flags().Float64Var(&tcpt, "tcp-tolerance", 10, "Allowed %diff from hostNetwork to podNetwork, anything above tolerance will result in k8s-netperf exiting 1.")
rootCmd.Flags().BoolVar(&version, "version", false, "k8s-netperf version")

if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
log.Fatal(err)
}

}
49 changes: 49 additions & 0 deletions pkg/drivers/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package drivers

import (
"bytes"

"github.com/cloud-bulldozer/k8s-netperf/pkg/config"
"github.com/cloud-bulldozer/k8s-netperf/pkg/sample"
apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type Driver interface {
IsTestSupported(string) bool
Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error)
ParseResults(stdout *bytes.Buffer) (sample.Sample, error)
}

type netperf struct {
driverName string
}

type iperf3 struct {
driverName string
}

type uperf struct {
driverName string
}

// NewDriver creates a new driver based on the provided driver name.
//
// It takes a string parameter `driverName` and returns a Driver.
func NewDriver(driverName string) Driver {
switch driverName {
case "iperf3":
return &iperf3{
driverName: driverName,
}
case "uperf":
return &uperf{
driverName: driverName,
}
default:
return &netperf{
driverName: driverName,
}
}
}
Loading

0 comments on commit 9d1bc90

Please sign in to comment.