From d6094aab27a63cca7b344fb627696add3f474fa2 Mon Sep 17 00:00:00 2001 From: Raul Sevilla Date: Thu, 22 Feb 2024 11:04:08 +0100 Subject: [PATCH] Parallelism with pod 2 service Signed-off-by: Raul Sevilla --- cmd/k8s-netperf/k8s-netperf.go | 2 +- containers/Containerfile | 5 +-- containers/super-netperf | 69 ++++++++++++++++++++++++++++++++++ pkg/config/config.go | 11 ++---- pkg/drivers/iperf.go | 49 +++++++----------------- pkg/drivers/netperf.go | 7 ++-- pkg/k8s/kubernetes.go | 59 +++++++++++++++++------------ 7 files changed, 128 insertions(+), 74 deletions(-) create mode 100755 containers/super-netperf diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index 3e7d30c2..0b4e3076 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -151,7 +151,7 @@ var rootCmd = &cobra.Command{ } else { acrossAZ = true } - + time.Sleep(5 * time.Second) // Wait some seconds to ensure service is ready // Run through each test for _, nc := range s.Configs { // Determine the metric for the test diff --git a/containers/Containerfile b/containers/Containerfile index 26cee777..7e6aee3d 100644 --- a/containers/Containerfile +++ b/containers/Containerfile @@ -32,6 +32,5 @@ WORKDIR ../ RUN rm -rf netperf && \ rm -rf iperf-3.13-mt-beta3 && \ - dnf clean all && \ - curl -O https://raw.githubusercontent.com/jtaleric/tinker/main/networking/super-netperf && \ - chmod +x super-netperf + dnf clean all +COPY super-netperf /usr/bin/super-netperf diff --git a/containers/super-netperf b/containers/super-netperf new file mode 100755 index 00000000..efcf1f2e --- /dev/null +++ b/containers/super-netperf @@ -0,0 +1,69 @@ +#!/usr/bin/env bash +# +# This work came from inspiration from : https://github.com/borkmann/stuff/blob/master/super_netperf +# +# We wanted to take advantage of NetPerf's OMNI output for our work. To do this, we had to make some modifications. +# +# run_netperf(number_of_netperfs) +# +run_netperf() { + loops=$1 + shift + port=$1 + shift + for ((i=0; i&1 > /tmp/result-${i} & + ((port++)) + done + wait + return 0 +} + +# +# Assumption here is the user passed the -- -k rt_latency,p99_latency,throughput,throughput_units +# Which is taking advantage of the OMNI output +# +process_netperf() { + # Flush buffers + sync + tp=0 # Throughput + l=0 # Latency + rtl=0 # RT latency + send=0 + recv=0 + retrans=0 + u="" + top="" + for file in `ls /tmp/result-*`; do + top=$(head -n 1 $file) + t=$(cat $file | grep "THROUGHPUT=" | awk -F= '{print $2}') + s=$(cat $file | grep "LOCAL_SEND_CALLS=" | awk -F= '{print $2}') + r=$(cat $file | grep "REMOTE_RECV_CALLS=" | awk -F= '{print $2}') + rt=$(cat $file | grep "LOCAL_TRANSPORT_RETRANS=" | awk -F= '{print $2}') + rrtl=$(cat $file | grep "RT_LATENCY=" | awk -F= '{print $2}') + if [[ $rrtl == "-1.000" ]]; then + rtl="-1.000" + else + rtl=$(echo $rtl+$rrtl | bc) + fi + rl=$(cat $file | grep "P99_LATENCY=" | awk -F= '{print $2}') + l=$(echo $l+rl | bc) + tp=$(echo $tp+$t | bc) + send=$(echo $send+$s | bc) + recv=$(echo $recv+$r | bc) + retrans=$(echo $retrans+$rt | bc) + u=$(cat $file | grep "UNITS") + filename=$(basename $file) + mv $file /tmp/old-$filename + done + echo "$top" + echo "RT_LATENCY=$rtl" + echo "P99_LATENCY=$rl" + echo "THROUGHPUT=$tp" + echo "LOCAL_TRANSPORT_RETRANS=$retrans" + echo "REMOTE_RECV_CALLS=$recv" + echo "LOCAL_SEND_CALLS=$send" + echo "$u" +} +run_netperf $@ +process_netperf diff --git a/pkg/config/config.go b/pkg/config/config.go index 3486f0ce..8daeb3db 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -2,7 +2,7 @@ package config import ( "fmt" - "io/ioutil" + "os" "regexp" apiv1 "k8s.io/api/core/v1" @@ -67,11 +67,6 @@ func validConfig(cfg Config) (bool, error) { if cfg.Parallelism < 1 { return false, fmt.Errorf("parallelism must be > 0") } - if cfg.Service { - if cfg.Parallelism > 1 { - return false, fmt.Errorf("parallelism must be 1 when using a service") - } - } return true, nil } @@ -80,7 +75,7 @@ func validConfig(cfg Config) (bool, error) { // Returns Config struct func ParseConf(fn string) ([]Config, error) { log.Infof("📒 Reading %s file. ", fn) - buf, err := ioutil.ReadFile(fn) + buf, err := os.ReadFile(fn) if err != nil { return nil, err } @@ -107,7 +102,7 @@ func ParseConf(fn string) ([]Config, error) { // Returns Config struct func ParseV2Conf(fn string) ([]Config, error) { log.Infof("📒 Reading %s file - using ConfigV2 Method. ", fn) - buf, err := ioutil.ReadFile(fn) + buf, err := os.ReadFile(fn) if err != nil { return nil, err } diff --git a/pkg/drivers/iperf.go b/pkg/drivers/iperf.go index 5abf4e20..84f764ef 100644 --- a/pkg/drivers/iperf.go +++ b/pkg/drivers/iperf.go @@ -66,43 +66,22 @@ func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, tcp = false } var cmd []string - if nc.Service { - if tcp { - cmd = []string{"iperf3", "-P", "1", "-c", - serverIP, "-J", "-t", - fmt.Sprint(nc.Duration), - "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(k8s.IperfServerCtlPort), - fmt.Sprintf("--logfile=%s", file), - } - } else { - cmd = []string{"iperf3", "-P", "1", "-c", - serverIP, "-t", - fmt.Sprint(nc.Duration), "-u", "-J", - "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(k8s.IperfServerCtlPort), - "-b", "0", - fmt.Sprintf("--logfile=%s", file), - } + if tcp { + cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c", + serverIP, "-t", + fmt.Sprint(nc.Duration), + "-l", fmt.Sprint(nc.MessageSize), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), + fmt.Sprintf("--logfile=%s", file), } } else { - if tcp { - cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c", - serverIP, "-t", - fmt.Sprint(nc.Duration), - "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(k8s.IperfServerCtlPort), - fmt.Sprintf("--logfile=%s", file), - } - } else { - cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c", - serverIP, "-t", - fmt.Sprint(nc.Duration), "-u", - "-l", fmt.Sprint(nc.MessageSize), - "-p", fmt.Sprint(k8s.IperfServerCtlPort), - "-b", "0", - fmt.Sprintf("--logfile=%s", file), - } + cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c", + serverIP, "-t", + fmt.Sprint(nc.Duration), "-u", + "-l", fmt.Sprint(nc.MessageSize), + "-p", fmt.Sprint(k8s.IperfServerCtlPort), + "-b", "0", + fmt.Sprintf("--logfile=%s", file), } } log.Debug(cmd) diff --git a/pkg/drivers/netperf.go b/pkg/drivers/netperf.go index f0ad8144..a4239e19 100644 --- a/pkg/drivers/netperf.go +++ b/pkg/drivers/netperf.go @@ -28,6 +28,8 @@ func init() { } } +const superNetperf = "super-netperf" + // omniOptions are netperf specific options that we will pass to the netperf client. const omniOptions = "rt_latency,p99_latency,throughput,throughput_units,remote_recv_calls,local_send_calls,local_transport_retrans" @@ -40,17 +42,16 @@ func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, config.Show(nc, n.driverName) var cmd []string if nc.Service { - cmd = []string{"bash", "super-netperf", "1", "-H", + cmd = []string{superNetperf, strconv.Itoa(nc.Parallelism), strconv.Itoa(k8s.NetperfServerDataPort), "-H", serverIP, "-l", fmt.Sprint(nc.Duration), "-t", nc.Profile, "--", "-k", fmt.Sprint(omniOptions), "-m", fmt.Sprint(nc.MessageSize), - "-P", fmt.Sprint(k8s.NetperfServerDataPort), "-R", "1"} } else { - cmd = []string{"bash", "super-netperf", strconv.Itoa(nc.Parallelism), "-H", + cmd = []string{superNetperf, strconv.Itoa(nc.Parallelism), strconv.Itoa(k8s.NetperfServerDataPort), "-H", serverIP, "-l", fmt.Sprint(nc.Duration), "-t", nc.Profile, diff --git a/pkg/k8s/kubernetes.go b/pkg/k8s/kubernetes.go index 615614be..a6d8b6dd 100644 --- a/pkg/k8s/kubernetes.go +++ b/pkg/k8s/kubernetes.go @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "k8s.io/utils/pointer" ) // DeploymentParams describes the deployment @@ -37,7 +38,7 @@ type ServiceParams struct { Namespace string Labels map[string]string CtlPort int32 - DataPort int32 + DataPorts []int32 } const sa string = "netperf" @@ -66,6 +67,7 @@ const clientRole = "client-local" const clientAcrossRole = "client-across" const hostNetServerRole = "host-server" const hostNetClientRole = "host-client" +const k8sNetperfImage = "quay.io/rsevilla/k8s-netperf:latest" // BuildSUT Build the k8s env to run network performance tests func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { @@ -141,7 +143,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Name: "client", Namespace: "netperf", Replicas: 1, - Image: "quay.io/cloud-bulldozer/netperf:latest", + Image: k8sNetperfImage, Labels: map[string]string{"role": clientRole}, Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}}, Port: NetperfServerCtlPort, @@ -168,7 +170,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Namespace: "netperf", Labels: map[string]string{"role": serverRole}, CtlPort: IperfServerCtlPort, - DataPort: IperfServerDataPort, + DataPorts: []int32{IperfServerDataPort}, } s.IperfService, err = CreateService(iperfSVC, client) if err != nil { @@ -181,7 +183,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Namespace: "netperf", Labels: map[string]string{"role": serverRole}, CtlPort: UperfServerCtlPort, - DataPort: UperfServerDataPort, + DataPorts: []int32{UperfServerDataPort}, } s.UperfService, err = CreateService(uperfSVC, client) if err != nil { @@ -189,12 +191,16 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { } // Create netperf service + var netperfDataPorts []int32 + for i := 0; i < 16; i++ { + netperfDataPorts = append(netperfDataPorts, NetperfServerDataPort+int32(i)) + } netperfSVC := ServiceParams{ Name: "netperf-service", Namespace: "netperf", Labels: map[string]string{"role": serverRole}, CtlPort: NetperfServerCtlPort, - DataPort: NetperfServerDataPort, + DataPorts: netperfDataPorts, } s.NetperfService, err = CreateService(netperfSVC, client) if err != nil { @@ -204,7 +210,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Name: "client-across", Namespace: "netperf", Replicas: 1, - Image: "quay.io/cloud-bulldozer/netperf:latest", + Image: k8sNetperfImage, Labels: map[string]string{"role": clientAcrossRole}, Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}}, Port: NetperfServerCtlPort, @@ -218,7 +224,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Namespace: "netperf", Replicas: 1, HostNetwork: true, - Image: "quay.io/cloud-bulldozer/netperf:latest", + Image: k8sNetperfImage, Labels: map[string]string{"role": hostNetClientRole}, Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}}, Port: NetperfServerCtlPort, @@ -266,7 +272,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Namespace: "netperf", Replicas: 1, HostNetwork: true, - Image: "quay.io/cloud-bulldozer/netperf:latest", + Image: k8sNetperfImage, Labels: map[string]string{"role": hostNetServerRole}, Commands: dpCommands, Port: NetperfServerCtlPort, @@ -276,7 +282,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Name: "server", Namespace: "netperf", Replicas: 1, - Image: "quay.io/cloud-bulldozer/netperf:latest", + Image: k8sNetperfImage, Labels: map[string]string{"role": serverRole}, Commands: dpCommands, Port: NetperfServerCtlPort, @@ -500,9 +506,10 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv }, }, Spec: apiv1.PodSpec{ - ServiceAccountName: sa, - HostNetwork: dp.HostNetwork, - Containers: cmdContainers, + TerminationGracePeriodSeconds: pointer.Int64(1), + ServiceAccountName: sa, + HostNetwork: dp.HostNetwork, + Containers: cmdContainers, Affinity: &apiv1.Affinity{ NodeAffinity: &dp.NodeAffinity, PodAffinity: &dp.PodAffinity, @@ -608,23 +615,27 @@ func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Servi TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.CtlPort)), Port: sp.CtlPort, }, - { - Name: fmt.Sprintf("%s-data-tcp", sp.Name), - Protocol: apiv1.ProtocolTCP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)), - Port: sp.DataPort, - }, - { - Name: fmt.Sprintf("%s-data-udp", sp.Name), - Protocol: apiv1.ProtocolUDP, - TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)), - Port: sp.DataPort, - }, }, Type: apiv1.ServiceType("ClusterIP"), Selector: sp.Labels, }, } + for _, port := range sp.DataPorts { + service.Spec.Ports = append(service.Spec.Ports, + apiv1.ServicePort{ + Name: fmt.Sprintf("%s-tcp-%d", sp.Name, port), + Protocol: apiv1.ProtocolTCP, + TargetPort: intstr.Parse(fmt.Sprintf("%d", port)), + Port: port, + }, + apiv1.ServicePort{ + Name: fmt.Sprintf("%s-udp-%d", sp.Name, port), + Protocol: apiv1.ProtocolUDP, + TargetPort: intstr.Parse(fmt.Sprintf("%d", port)), + Port: port, + }, + ) + } return sc.Create(context.TODO(), service, metav1.CreateOptions{}) }