diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index 7f078391..2975550d 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -207,12 +207,12 @@ var rootCmd = &cobra.Command{ var pr result.Data for _, driver := range requestedDrivers { if s.HostNetwork && !nc.Service { - pr = executeWorkload(nc, s, true, driver) + pr = executeWorkload(nc, s, true, driver, false) if len(pr.Profile) > 1 { sr.Results = append(sr.Results, pr) } } - pr = executeWorkload(nc, s, false, driver) + pr = executeWorkload(nc, s, false, driver, false) if len(pr.Profile) > 1 { sr.Results = append(sr.Results, pr) } @@ -220,18 +220,44 @@ var rootCmd = &cobra.Command{ } } else { log.Info("Connecting via ssh to the VMI") - err = k8s.SSHConnect(&s) + client, err := k8s.SSHConnect(&s) if err != nil { log.Fatal(err) } + s.SSHClient = client + for _, nc := range s.Configs { + // Determine the metric for the test + metric := string("OP/s") + if strings.Contains(nc.Profile, "STREAM") { + metric = "Mb/s" + } + nc.Metric = metric + nc.AcrossAZ = acrossAZ + // No need to run hostNetwork through Service. + var pr result.Data + for _, driver := range requestedDrivers { + if s.HostNetwork && !nc.Service { + pr = executeWorkload(nc, s, true, driver, true) + if len(pr.Profile) > 1 { + sr.Results = append(sr.Results, pr) + } + } + pr = executeWorkload(nc, s, false, driver, true) + if len(pr.Profile) > 1 { + sr.Results = append(sr.Results, pr) + } + } + } } if pavail { for i, npr := range sr.Results { - sr.Results[i].ClientMetrics, _ = metrics.QueryNodeCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime) - sr.Results[i].ServerMetrics, _ = metrics.QueryNodeCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime) - sr.Results[i].ClientPodCPU, _ = metrics.TopPodCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime) - sr.Results[i].ServerPodCPU, _ = metrics.TopPodCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime) + if len(npr.ClientNodeInfo.Hostname) > 0 && len(npr.ServerNodeInfo.Hostname) > 0 { + sr.Results[i].ClientMetrics, _ = metrics.QueryNodeCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime) + sr.Results[i].ServerMetrics, _ = metrics.QueryNodeCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime) + sr.Results[i].ClientPodCPU, _ = metrics.TopPodCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime) + sr.Results[i].ServerPodCPU, _ = metrics.TopPodCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime) + } } } @@ -332,14 +358,17 @@ func cleanup(client *kubernetes.Clientset) { } // executeWorkload executes the workload and returns the result data. -func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, driverName string) result.Data { +func executeWorkload(nc config.Config, + s config.PerfScenarios, + hostNet bool, + driverName string, virt bool) result.Data { serverIP := "" Client := s.Client var driver drivers.Driver if nc.Service { - if iperf3 { + if driverName == "iperf3" { serverIP = s.IperfService.Spec.ClusterIP - } else if uperf { + } else if driverName == "uperf" { serverIP = s.UperfService.Spec.ClusterIP } else { serverIP = s.NetperfService.Spec.ClusterIP @@ -387,7 +416,7 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, dri log.Warnf("Test %s is not supported with driver %s. Skipping.", nc.Profile, npr.Driver) return npr } - r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) + r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP, &s) if err != nil { log.Fatal(err) } @@ -399,7 +428,7 @@ func executeWorkload(nc config.Config, s config.PerfScenarios, hostNet bool, dri // Retry the current test. for try < retry { log.Warn("Rerunning test.") - r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP) + r, err := driver.Run(s.ClientSet, s.RestConfig, nc, Client, serverIP, &s) if err != nil { log.Error(err) continue diff --git a/pkg/config/config.go b/pkg/config/config.go index d4382b56..e40d08fa 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -6,6 +6,7 @@ import ( "regexp" kubevirtv1 "github.com/cloud-bulldozer/k8s-netperf/pkg/kubevirt/client-go/clientset/versioned/typed/core/v1" + "github.com/melbahja/goph" apiv1 "k8s.io/api/core/v1" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" @@ -50,6 +51,7 @@ type PerfScenarios struct { ClientSet *kubernetes.Clientset KClient *kubevirtv1.KubevirtV1Client DClient *dynamic.DynamicClient + SSHClient *goph.Client } // Tests we will support in k8s-netperf diff --git a/pkg/drivers/driver.go b/pkg/drivers/driver.go index cd265c76..7359115a 100644 --- a/pkg/drivers/driver.go +++ b/pkg/drivers/driver.go @@ -12,7 +12,7 @@ import ( type Driver interface { IsTestSupported(string) bool - Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) + Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string, perf *config.PerfScenarios) (bytes.Buffer, error) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) } diff --git a/pkg/drivers/iperf.go b/pkg/drivers/iperf.go index e98652f3..5e99334b 100644 --- a/pkg/drivers/iperf.go +++ b/pkg/drivers/iperf.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "time" "encoding/json" @@ -51,7 +52,11 @@ func (i *iperf3) IsTestSupported(test string) bool { } // Run will invoke iperf3 in a client container -func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { +func (i *iperf3) Run(c *kubernetes.Clientset, + rc rest.Config, + nc config.Config, + client apiv1.PodList, + serverIP string, perf *config.PerfScenarios) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer id := uuid.New() file := fmt.Sprintf("/tmp/iperf-%s", id.String()) @@ -85,68 +90,124 @@ func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, } } log.Debug(cmd) - req := c.CoreV1().RESTClient(). - Post(). - Namespace(pod.Namespace). - Resource("pods"). - Name(pod.Name). - SubResource("exec"). - VersionedParams(&apiv1.PodExecOptions{ - Container: pod.Spec.Containers[0].Name, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) - if err != nil { - return stdout, err - } - // Connect this process' std{in,out,err} to the remote shell process. - err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }) - if err != nil { - return stdout, err + if !perf.VM { + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return stdout, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return stdout, err + } + } else { + retry := 3 + present := false + sshclient, err := k8s.SSHConnect(perf) + if err != nil { + return stdout, err + } + for i := 0; i <= retry; i++ { + log.Debug("⏰ Waiting for iperf3 to be present on VM") + _, err = sshclient.Run("until iperf3 -h; do sleep 30; done") + if err != nil { + time.Sleep(10 * time.Second) + continue + } else { + present = true + break + } + } + if !present { + sshclient.Close() + return stdout, fmt.Errorf("iperf3 binary is not present on the VM") + } + var stdout []byte + ran := false + for i := 0; i <= retry; i++ { + stdout, err = sshclient.Run(strings.Join(cmd[:], " ")) + if err != nil { + log.Debugf("Failed running command %s", err) + log.Debugf("⏰ Retrying iperf3 command -- cloud-init still finishing up") + time.Sleep(60 * time.Second) + continue + } else { + ran = true + break + } + } + sshclient.Close() + if !ran { + return *bytes.NewBuffer(stdout), fmt.Errorf("Unable to run iperf3") + } } //Empty buffer stdout = bytes.Buffer{} stderr = bytes.Buffer{} - req = c.CoreV1().RESTClient(). - Post(). - Namespace(pod.Namespace). - Resource("pods"). - Name(pod.Name). - SubResource("exec"). - VersionedParams(&apiv1.PodExecOptions{ - Container: pod.Spec.Containers[0].Name, - Command: []string{"cat", file}, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - exec, err = remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) - if err != nil { - return stdout, err - } - // Connect this process' std{in,out,err} to the remote shell process. - err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }) - if err != nil { - return stdout, err + if !perf.VM { + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: []string{"cat", file}, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return stdout, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return stdout, err + } + log.Debug(strings.TrimSpace(stdout.String())) + return stdout, nil + } else { + sshclient, err := k8s.SSHConnect(perf) + if err != nil { + return stdout, err + } + stdout, err := sshclient.Run(fmt.Sprintf("cat %s", file)) + if err != nil { + sshclient.Close() + return *bytes.NewBuffer(stdout), err + } + log.Debug(strings.TrimSpace(bytes.NewBuffer(stdout).String())) + sshclient.Close() + return *bytes.NewBuffer(stdout), nil } - - log.Debug(strings.TrimSpace(stdout.String())) - return stdout, nil } // ParseResults accepts the stdout from the execution of the benchmark. diff --git a/pkg/drivers/netperf.go b/pkg/drivers/netperf.go index ad172440..750c09ba 100644 --- a/pkg/drivers/netperf.go +++ b/pkg/drivers/netperf.go @@ -7,6 +7,7 @@ import ( "math" "strconv" "strings" + "time" apiv1 "k8s.io/api/core/v1" @@ -35,7 +36,7 @@ const omniOptions = "rt_latency,p99_latency,throughput,throughput_units,remote_r // Run will use the k8s client to run the netperf binary in the container image // it will return a bytes.Buffer of the stdout. -func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { +func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string, perf *config.PerfScenarios) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer pod := client.Items[0] log.Debugf("🔥 Client (%s,%s) starting netperf against server: %s", pod.Name, pod.Status.PodIP, serverIP) @@ -49,35 +50,84 @@ func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, "-m", fmt.Sprint(nc.MessageSize), "-R", "1"} log.Debug(cmd) - req := c.CoreV1().RESTClient(). - Post(). - Namespace(pod.Namespace). - Resource("pods"). - Name(pod.Name). - SubResource("exec"). - VersionedParams(&apiv1.PodExecOptions{ - Container: pod.Spec.Containers[0].Name, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) - if err != nil { - return stdout, err - } - // Connect this process' std{in,out,err} to the remote shell process. - err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }) - if err != nil { - return stdout, err + if !perf.VM { + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return stdout, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return stdout, err + } + log.Debug(strings.TrimSpace(stdout.String())) + return stdout, nil + } else { + retry := 3 + present := false + sshclient, err := k8s.SSHConnect(perf) + if err != nil { + return stdout, err + } + for i := 0; i <= retry; i++ { + log.Debug("⏰ Waiting for netperf to be present on VM") + _, err = sshclient.Run("until which netperf; do sleep 30; done") + if err != nil { + time.Sleep(10 * time.Second) + continue + } else { + present = true + break + } + } + if !present { + sshclient.Close() + return stdout, fmt.Errorf("netperf binary is not present on the VM") + } + var stdout []byte + ran := false + for i := 0; i <= retry; i++ { + _, err = sshclient.Run(fmt.Sprintf("netperf -H %s -l 1 -- %s", serverIP, strconv.Itoa(k8s.NetperfServerDataPort))) + if err != nil { + log.Debugf("Failed running command %s", err) + log.Debugf("⏰ Retrying netperf command -- cloud-init still finishing up") + time.Sleep(60 * time.Second) + continue + } else { + ran = true + break + } + } + stdout, err = sshclient.Run(strings.Join(cmd[:], " ")) + if err != nil { + return *bytes.NewBuffer(stdout), fmt.Errorf("Failed running command %s", err) + } + sshclient.Close() + if !ran { + return *bytes.NewBuffer(stdout), fmt.Errorf("Unable to run iperf3") + } else { + log.Debug(bytes.NewBuffer(stdout)) + return *bytes.NewBuffer(stdout), nil + } } - log.Debug(strings.TrimSpace(stdout.String())) - return stdout, nil } // ParseResults accepts the stdout from the execution of the benchmark. It also needs diff --git a/pkg/drivers/uperf.go b/pkg/drivers/uperf.go index 60a1a160..93e99b85 100644 --- a/pkg/drivers/uperf.go +++ b/pkg/drivers/uperf.go @@ -7,6 +7,7 @@ import ( "regexp" "strconv" "strings" + "time" apiv1 "k8s.io/api/core/v1" @@ -51,7 +52,7 @@ func (u *uperf) IsTestSupported(test string) bool { // uperf needs "rr" or "stream" profiles which are config files passed to uperf command through -m option // We need to create these profiles based on the test using provided configuration -func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Config, pod apiv1.Pod, serverIP string) (string, error) { +func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Config, pod apiv1.Pod, serverIP string, perf *config.PerfScenarios) (string, error) { var stdout, stderr bytes.Buffer var fileContent string @@ -96,48 +97,65 @@ func createUperfProfile(c *kubernetes.Clientset, rc rest.Config, nc config.Confi filePath = fmt.Sprintf("/tmp/uperf-rr-%s-%d-%d", protocol, nc.MessageSize, nc.Parallelism) } - var cmd []string - uperfCmd := "echo '" + fileContent + "' > " + filePath - cmd = []string{"bash", "-c", uperfCmd} - //Empty buffer stdout = bytes.Buffer{} - req := c.CoreV1().RESTClient(). - Post(). - Namespace(pod.Namespace). - Resource("pods"). - Name(pod.Name). - SubResource("exec"). - VersionedParams(&apiv1.PodExecOptions{ - Container: pod.Spec.Containers[0].Name, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) - if err != nil { - return filePath, err - } - // Connect this process' std{in,out,err} to the remote shell process. - err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }) - if err != nil { - return filePath, err - } + if !perf.VM { + var cmd []string + uperfCmd := "echo '" + fileContent + "' > " + filePath + cmd = []string{"bash", "-c", uperfCmd} + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err := remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return filePath, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return filePath, err + } - log.Debug(strings.TrimSpace(stdout.String())) + log.Debug(strings.TrimSpace(stdout.String())) + return filePath, nil + } else { + + var cmd []string + uperfCmd := "echo '" + fileContent + "' > " + filePath + cmd = []string{uperfCmd} + sshclient, err := k8s.SSHConnect(perf) + if err != nil { + return filePath, err + } + log.Debug(strings.Join(cmd[:], " ")) + _, err = sshclient.Run(strings.Join(cmd[:], " ")) + if err != nil { + return filePath, err + } + sshclient.Close() + } return filePath, nil } // Run will invoke uperf in a client container -func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { +func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string, perf *config.PerfScenarios) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer var exec remotecommand.Executor @@ -145,7 +163,8 @@ func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, c log.Debugf("🔥 Client (%s,%s) starting uperf against server: %s", pod.Name, pod.Status.PodIP, serverIP) config.Show(nc, u.driverName) - filePath, err := createUperfProfile(c, rc, nc, pod, serverIP) + log.Debug("Creating uperf configuration file") + filePath, err := createUperfProfile(c, rc, nc, pod, serverIP, perf) if err != nil { return stdout, err } @@ -157,35 +176,78 @@ func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, c cmd := []string{"uperf", "-v", "-a", "-R", "-i", "1", "-m", filePath, "-P", fmt.Sprint(k8s.UperfServerCtlPort)} log.Debug(cmd) - req := c.CoreV1().RESTClient(). - Post(). - Namespace(pod.Namespace). - Resource("pods"). - Name(pod.Name). - SubResource("exec"). - VersionedParams(&apiv1.PodExecOptions{ - Container: pod.Spec.Containers[0].Name, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: true, - }, scheme.ParameterCodec) - exec, err = remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) - if err != nil { - return stdout, err - } - // Connect this process' std{in,out,err} to the remote shell process. - err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - }) - if err != nil { - return stdout, err + if !perf.VM { + req := c.CoreV1().RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&apiv1.PodExecOptions{ + Container: pod.Spec.Containers[0].Name, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: true, + }, scheme.ParameterCodec) + exec, err = remotecommand.NewSPDYExecutor(&rc, "POST", req.URL()) + if err != nil { + return stdout, err + } + // Connect this process' std{in,out,err} to the remote shell process. + err = exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return stdout, err + } + return stdout, nil + } else { + retry := 3 + present := false + sshclient, err := k8s.SSHConnect(perf) + if err != nil { + return stdout, err + } + for i := 0; i <= retry; i++ { + log.Debug("⏰ Waiting for uperf to be present on VM") + _, err = sshclient.Run("until uperf -h; do sleep 30; done") + if err != nil { + time.Sleep(10 * time.Second) + continue + } else { + present = true + break + } + } + if !present { + sshclient.Close() + return stdout, fmt.Errorf("uperf binary is not present on the VM") + } + var stdout []byte + ran := false + for i := 0; i <= retry; i++ { + stdout, err = sshclient.Run(strings.Join(cmd[:], " ")) + if err != nil { + log.Debugf("Failed running command %s", err) + log.Debugf("⏰ Retrying uperf command -- cloud-init still finishing up") + time.Sleep(60 * time.Second) + continue + } else { + ran = true + break + } + } + sshclient.Close() + if !ran { + return *bytes.NewBuffer(stdout), fmt.Errorf("Unable to run uperf") + } else { + return *bytes.NewBuffer(stdout), nil + } } - - return stdout, nil } // ParseResults accepts the stdout from the execution of the benchmark. @@ -197,6 +259,11 @@ func (u *uperf) ParseResults(stdout *bytes.Buffer) (sample.Sample, error) { transactions := regexp.MustCompile(`timestamp_ms:(.*) name:Txn2 nr_bytes:(.*) nr_ops:(.*)\r`).FindAllStringSubmatch(stdout.String(), -1) + // VM output does not have the \r. + if len(transactions) < 1 { + transactions = regexp.MustCompile(`timestamp_ms:(.*) name:Txn2 nr_bytes:(.*) nr_ops:(.*)`).FindAllStringSubmatch(stdout.String(), -1) + } + var prevTimestamp, normLtcy float64 var prevBytes, prevOps, normOps float64 var byteSummary, latSummary, opSummary []float64 diff --git a/pkg/k8s/kubernetes.go b/pkg/k8s/kubernetes.go index 46553de8..4dfb95b0 100644 --- a/pkg/k8s/kubernetes.go +++ b/pkg/k8s/kubernetes.go @@ -317,6 +317,11 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { } s.VMHost = host WaitForVMI(s.KClient, clientAcrossRole) + s.ClientAcross, err = GetNakedPods(s.ClientSet, fmt.Sprintf("app=%s", clientAcrossRole)) + if err != nil { + return err + } + s.ClientNodeInfo, _ = GetNakedPodNodeInfo(client, fmt.Sprintf("app=%s", clientAcrossRole)) } } if !s.VM { @@ -331,6 +336,11 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { } s.VMHost = host WaitForVMI(s.KClient, clientAcrossRole) + s.ClientAcross, err = GetNakedPods(s.ClientSet, fmt.Sprintf("app=%s", clientAcrossRole)) + if err != nil { + return err + } + s.ClientNodeInfo, _ = GetNakedPodNodeInfo(client, fmt.Sprintf("app=%s", clientAcrossRole)) } } @@ -426,6 +436,11 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { return err } WaitForVMI(s.KClient, serverRole) + s.ServerHost, err = GetNakedPods(s.ClientSet, fmt.Sprintf("app=%s", serverRole)) + if err != nil { + return err + } + s.ServerNodeInfo, _ = GetNakedPodNodeInfo(client, fmt.Sprintf("app=%s", serverRole)) } } } @@ -447,6 +462,11 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { return err } WaitForVMI(s.KClient, serverRole) + s.Server, err = GetNakedPods(s.ClientSet, fmt.Sprintf("app=%s", serverRole)) + if err != nil { + return err + } + s.ServerNodeInfo, _ = GetNakedPodNodeInfo(client, fmt.Sprintf("app=%s", serverRole)) } return nil @@ -615,6 +635,7 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv // GetNodeLabels Return Labels for a specific node func GetNodeLabels(c *kubernetes.Clientset, node string) (map[string]string, error) { + log.Debugf("Looking for Node labels for node - %s", node) nodeInfo, err := c.CoreV1().Nodes().Get(context.TODO(), node, metav1.GetOptions{}) if err != nil { return nil, err @@ -650,6 +671,29 @@ func GetPodNodeInfo(c *kubernetes.Clientset, dp DeploymentParams) (metrics.NodeI return info, nil } +// GetNakedPodNodeInfo collects the node information for a specific pod +func GetNakedPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInfo, error) { + var info metrics.NodeInfo + listOpt := metav1.ListOptions{ + LabelSelector: label, + } + pods, err := c.CoreV1().Pods(namespace).List(context.TODO(), listOpt) + if err != nil { + return info, fmt.Errorf("❌ Failure to capture pods: %v", err) + } + for pod := range pods.Items { + p := pods.Items[pod] + if pods.Items[pod].DeletionTimestamp != nil { + continue + } else { + info.IP = p.Status.HostIP + info.Hostname = p.Spec.NodeName + } + } + log.Debugf("Machine with lablel %s is Running on %s with IP %s", label, info.Hostname, info.IP) + return info, nil +} + // GetPods searches for a specific set of pods from DeploymentParms // It returns a PodList if the deployment is found. // NOTE : Since we can update the replicas to be > 1, is why I return a PodList. @@ -677,6 +721,27 @@ func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (corev1.PodList, erro return npl, nil } +func GetNakedPods(c *kubernetes.Clientset, label string) (corev1.PodList, error) { + npl := corev1.PodList{} + listOpt := metav1.ListOptions{ + LabelSelector: label, + } + log.Infof("Looking for pods with label %s", fmt.Sprint(label)) + pods, err := c.CoreV1().Pods(namespace).List(context.TODO(), listOpt) + if err != nil { + return npl, fmt.Errorf("❌ Failure to capture pods: %v", err) + } + for pod := range pods.Items { + if pods.Items[pod].DeletionTimestamp != nil { + continue + } else { + npl.Items = append(npl.Items, pods.Items[pod]) + } + } + return npl, nil + +} + // CreateService will build a k8s service func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*corev1.Service, error) { s, err := client.CoreV1().Services(sp.Namespace).Get(context.TODO(), sp.Name, metav1.GetOptions{}) diff --git a/pkg/k8s/kubevirt.go b/pkg/k8s/kubevirt.go index ca0ab859..f2df1095 100644 --- a/pkg/k8s/kubevirt.go +++ b/pkg/k8s/kubevirt.go @@ -45,20 +45,20 @@ func connect(config *goph.Config) (*goph.Client, error) { return nil, fmt.Errorf("Unable to connect via ssh after %d attempts", retry) } -func SSHConnect(conf *config.PerfScenarios) error { +func SSHConnect(conf *config.PerfScenarios) (*goph.Client, error) { dir, err := os.UserHomeDir() if err != nil { - return fmt.Errorf("Unable to retrieve users homedir. %s", err) + return nil, fmt.Errorf("Unable to retrieve users homedir. %s", err) } key := fmt.Sprintf("%s/.ssh/id_rsa", dir) keyd, err := os.ReadFile(key) auth, err := goph.RawKey(string(keyd), "") if err != nil { - return fmt.Errorf("Unable to retrieve sshkey. Error : %s", err) + return nil, fmt.Errorf("Unable to retrieve sshkey. Error : %s", err) } user := "fedora" addr := conf.VMHost - log.Infof("Attempting to connect with : %s@%s", user, addr) + log.Debugf("Attempting to connect with : %s@%s", user, addr) config := goph.Config{ User: user, @@ -70,16 +70,10 @@ func SSHConnect(conf *config.PerfScenarios) error { client, err := connect(&config) if err != nil { - return fmt.Errorf("Unable to connect via ssh. Error: %s", err) + return nil, fmt.Errorf("Unable to connect via ssh. Error: %s", err) } - defer client.Close() - out, err := client.Run("ls /etc") - if err != nil { - return err - } - log.Info(string(out)) - return nil + return client, nil } func createCommService(client *kubernetes.Clientset, label map[string]string, name string) error { @@ -182,7 +176,19 @@ ssh_deletekeys: false password: fedora chpasswd: { expire: False } runcmd: - - dnf install -y uperf iperf3 git ethtool + - export HOME=/home/fedora + - dnf install -y --nodocs uperf iperf3 git ethtool automake gcc bc lksctp-tools-devel texinfo --enablerepo=* + - git clone https://github.com/HewlettPackard/netperf + - cd netperf + - git reset --hard 3bc455b23f901dae377ca0a558e1e32aa56b31c4 + - curl -o netperf.diff https://raw.githubusercontent.com/cloud-bulldozer/k8s-netperf/main/containers/netperf.diff + - git apply netperf.diff + - ./autogen.sh + - ./configure --enable-sctp=yes --enable-demo=yes + - make && make install + - cd + - curl -o /usr/bin/super-netperf https://raw.githubusercontent.com/cloud-bulldozer/k8s-netperf/main/containers/super-netperf + - chmod 0777 /usr/bin/super-netperf `, string(ssh)) _, err = CreateVMI(kclient, name, label, b64.StdEncoding.EncodeToString([]byte(data)), *podAff, *nodeAff) if err != nil { @@ -225,10 +231,21 @@ ssh_deletekeys: false password: fedora chpasswd: { expire: False } runcmd: - - dnf install -y uperf iperf3 git ethtool - - uperf -s -v & - - iperf3 -s & -`, string(ssh)) + - dnf install -y --nodocs uperf iperf3 git ethtool + - dnf install -y --nodocs automake gcc bc lksctp-tools-devel texinfo --enablerepo=* + - git clone https://github.com/HewlettPackard/netperf + - cd netperf + - git reset --hard 3bc455b23f901dae377ca0a558e1e32aa56b31c4 + - curl -o netperf.diff https://raw.githubusercontent.com/cloud-bulldozer/k8s-netperf/main/containers/netperf.diff + - git apply netperf.diff + - ./autogen.sh + - ./configure --enable-sctp=yes --enable-demo=yes + - make && make install + - cd + - uperf -s -v -P %d & + - iperf3 -s -p %d & + - netserver & +`, string(ssh), UperfServerCtlPort, IperfServerCtlPort) return CreateVMI(client, name, label, b64.StdEncoding.EncodeToString([]byte(data)), podAff, nodeAff) } @@ -306,7 +323,7 @@ func CreateVMI(client *kubevirtv1.KubevirtV1Client, name string, label map[strin } func WaitForVMI(client *kubevirtv1.KubevirtV1Client, name string) error { - log.Infof("Wating for VMI (%s) to be in state running", name) + log.Infof("⏰ Wating for VMI (%s) to be in state running", name) vmw, err := client.VirtualMachineInstances(namespace).Watch(context.TODO(), metav1.ListOptions{}) if err != nil { return err @@ -318,7 +335,7 @@ func WaitForVMI(client *kubevirtv1.KubevirtV1Client, name string) error { return fmt.Errorf("Unable to watch VMI %s", name) } if d.Name == name { - log.Infof("Found in state (%s)", d.Status.Phase) + log.Debugf("Found in state (%s)", d.Status.Phase) if d.Status.Phase == "Running" { return nil }