Skip to content

Commit

Permalink
Parallelism in pod 2 service
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Sevilla <[email protected]>
  • Loading branch information
rsevilla87 committed Feb 22, 2024
1 parent c99337e commit 2ac5e86
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 74 deletions.
2 changes: 1 addition & 1 deletion cmd/k8s-netperf/k8s-netperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ var rootCmd = &cobra.Command{
} else {
acrossAZ = true
}

time.Sleep(5 * time.Second) // Wait some seconds to ensure service is ready
// Run through each test
for _, nc := range s.Configs {
// Determine the metric for the test
Expand Down
5 changes: 2 additions & 3 deletions containers/Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,5 @@ WORKDIR ../

RUN rm -rf netperf && \
rm -rf iperf-3.13-mt-beta3 && \
dnf clean all && \
curl -O https://raw.githubusercontent.com/jtaleric/tinker/main/networking/super-netperf && \
chmod +x super-netperf
dnf clean all
COPY super-netperf /usr/bin/super-netperf
69 changes: 69 additions & 0 deletions containers/super-netperf
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env bash
#
# This work came from inspiration from : https://github.com/borkmann/stuff/blob/master/super_netperf
#
# We wanted to take advantage of NetPerf's OMNI output for our work. To do this, we had to make some modifications.
#
# run_netperf(number_of_netperfs)
#
run_netperf() {
loops=$1
shift
port=$1
shift
for ((i=0; i<loops; i++)); do
netperf -s 2 $@ -P $port 2>&1 > /tmp/result-${i} &
((port++))
done
wait
return 0
}

#
# Assumption here is the user passed the -- -k rt_latency,p99_latency,throughput,throughput_units
# Which is taking advantage of the OMNI output
#
process_netperf() {
# Flush buffers
sync
tp=0 # Throughput
l=0 # Latency
rtl=0 # RT latency
send=0
recv=0
retrans=0
u=""
top=""
for file in `ls /tmp/result-*`; do
top=$(head -n 1 $file)
t=$(cat $file | grep "THROUGHPUT=" | awk -F= '{print $2}')
s=$(cat $file | grep "LOCAL_SEND_CALLS=" | awk -F= '{print $2}')
r=$(cat $file | grep "REMOTE_RECV_CALLS=" | awk -F= '{print $2}')
rt=$(cat $file | grep "LOCAL_TRANSPORT_RETRANS=" | awk -F= '{print $2}')
rrtl=$(cat $file | grep "RT_LATENCY=" | awk -F= '{print $2}')
if [[ $rrtl == "-1.000" ]]; then
rtl="-1.000"
else
rtl=$(echo $rtl+$rrtl | bc)
fi
rl=$(cat $file | grep "P99_LATENCY=" | awk -F= '{print $2}')
l=$(echo $l+rl | bc)
tp=$(echo $tp+$t | bc)
send=$(echo $send+$s | bc)
recv=$(echo $recv+$r | bc)
retrans=$(echo $retrans+$rt | bc)
u=$(cat $file | grep "UNITS")
filename=$(basename $file)
mv $file /tmp/old-$filename
done
echo "$top"
echo "RT_LATENCY=$rtl"
echo "P99_LATENCY=$rl"
echo "THROUGHPUT=$tp"
echo "LOCAL_TRANSPORT_RETRANS=$retrans"
echo "REMOTE_RECV_CALLS=$recv"
echo "LOCAL_SEND_CALLS=$send"
echo "$u"
}
run_netperf $@
process_netperf
11 changes: 3 additions & 8 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package config

import (
"fmt"
"io/ioutil"
"os"
"regexp"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -67,11 +67,6 @@ func validConfig(cfg Config) (bool, error) {
if cfg.Parallelism < 1 {
return false, fmt.Errorf("parallelism must be > 0")
}
if cfg.Service {
if cfg.Parallelism > 1 {
return false, fmt.Errorf("parallelism must be 1 when using a service")
}
}
return true, nil
}

Expand All @@ -80,7 +75,7 @@ func validConfig(cfg Config) (bool, error) {
// Returns Config struct
func ParseConf(fn string) ([]Config, error) {
log.Infof("📒 Reading %s file. ", fn)
buf, err := ioutil.ReadFile(fn)
buf, err := os.ReadFile(fn)
if err != nil {
return nil, err
}
Expand All @@ -107,7 +102,7 @@ func ParseConf(fn string) ([]Config, error) {
// Returns Config struct
func ParseV2Conf(fn string) ([]Config, error) {
log.Infof("📒 Reading %s file - using ConfigV2 Method. ", fn)
buf, err := ioutil.ReadFile(fn)
buf, err := os.ReadFile(fn)
if err != nil {
return nil, err
}
Expand Down
49 changes: 14 additions & 35 deletions pkg/drivers/iperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,43 +66,22 @@ func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config,
tcp = false
}
var cmd []string
if nc.Service {
if tcp {
cmd = []string{"iperf3", "-P", "1", "-c",
serverIP, "-J", "-t",
fmt.Sprint(nc.Duration),
"-l", fmt.Sprint(nc.MessageSize),
"-p", fmt.Sprint(k8s.IperfServerCtlPort),
fmt.Sprintf("--logfile=%s", file),
}
} else {
cmd = []string{"iperf3", "-P", "1", "-c",
serverIP, "-t",
fmt.Sprint(nc.Duration), "-u", "-J",
"-l", fmt.Sprint(nc.MessageSize),
"-p", fmt.Sprint(k8s.IperfServerCtlPort),
"-b", "0",
fmt.Sprintf("--logfile=%s", file),
}
if tcp {
cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c",
serverIP, "-t",
fmt.Sprint(nc.Duration),
"-l", fmt.Sprint(nc.MessageSize),
"-p", fmt.Sprint(k8s.IperfServerCtlPort),
fmt.Sprintf("--logfile=%s", file),
}
} else {
if tcp {
cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c",
serverIP, "-t",
fmt.Sprint(nc.Duration),
"-l", fmt.Sprint(nc.MessageSize),
"-p", fmt.Sprint(k8s.IperfServerCtlPort),
fmt.Sprintf("--logfile=%s", file),
}
} else {
cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c",
serverIP, "-t",
fmt.Sprint(nc.Duration), "-u",
"-l", fmt.Sprint(nc.MessageSize),
"-p", fmt.Sprint(k8s.IperfServerCtlPort),
"-b", "0",
fmt.Sprintf("--logfile=%s", file),
}
cmd = []string{"iperf3", "-J", "-P", strconv.Itoa(nc.Parallelism), "-c",
serverIP, "-t",
fmt.Sprint(nc.Duration), "-u",
"-l", fmt.Sprint(nc.MessageSize),
"-p", fmt.Sprint(k8s.IperfServerCtlPort),
"-b", "0",
fmt.Sprintf("--logfile=%s", file),
}
}
log.Debug(cmd)
Expand Down
7 changes: 4 additions & 3 deletions pkg/drivers/netperf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func init() {
}
}

const superNetperf = "super-netperf"

// omniOptions are netperf specific options that we will pass to the netperf client.
const omniOptions = "rt_latency,p99_latency,throughput,throughput_units,remote_recv_calls,local_send_calls,local_transport_retrans"

Expand All @@ -40,17 +42,16 @@ func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config,
config.Show(nc, n.driverName)
var cmd []string
if nc.Service {
cmd = []string{"bash", "super-netperf", "1", "-H",
cmd = []string{superNetperf, strconv.Itoa(nc.Parallelism), strconv.Itoa(k8s.NetperfServerDataPort), "-H",
serverIP, "-l",
fmt.Sprint(nc.Duration),
"-t", nc.Profile,
"--",
"-k", fmt.Sprint(omniOptions),
"-m", fmt.Sprint(nc.MessageSize),
"-P", fmt.Sprint(k8s.NetperfServerDataPort),
"-R", "1"}
} else {
cmd = []string{"bash", "super-netperf", strconv.Itoa(nc.Parallelism), "-H",
cmd = []string{superNetperf, strconv.Itoa(nc.Parallelism), strconv.Itoa(k8s.NetperfServerDataPort), "-H",
serverIP, "-l",
fmt.Sprint(nc.Duration),
"-t", nc.Profile,
Expand Down
59 changes: 35 additions & 24 deletions pkg/k8s/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
)

// DeploymentParams describes the deployment
Expand All @@ -37,7 +38,7 @@ type ServiceParams struct {
Namespace string
Labels map[string]string
CtlPort int32
DataPort int32
DataPorts []int32
}

const sa string = "netperf"
Expand Down Expand Up @@ -66,6 +67,7 @@ const clientRole = "client-local"
const clientAcrossRole = "client-across"
const hostNetServerRole = "host-server"
const hostNetClientRole = "host-client"
const k8sNetperfImage = "quay.io/rsevilla/k8s-netperf:latest"

// BuildSUT Build the k8s env to run network performance tests
func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Expand Down Expand Up @@ -141,7 +143,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Name: "client",
Namespace: "netperf",
Replicas: 1,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Image: k8sNetperfImage,
Labels: map[string]string{"role": clientRole},
Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}},
Port: NetperfServerCtlPort,
Expand All @@ -168,7 +170,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Namespace: "netperf",
Labels: map[string]string{"role": serverRole},
CtlPort: IperfServerCtlPort,
DataPort: IperfServerDataPort,
DataPorts: []int32{IperfServerDataPort},
}
s.IperfService, err = CreateService(iperfSVC, client)
if err != nil {
Expand All @@ -181,20 +183,24 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Namespace: "netperf",
Labels: map[string]string{"role": serverRole},
CtlPort: UperfServerCtlPort,
DataPort: UperfServerDataPort,
DataPorts: []int32{UperfServerDataPort},
}
s.UperfService, err = CreateService(uperfSVC, client)
if err != nil {
return fmt.Errorf("😥 Unable to create uperf service")
}

// Create netperf service
var netperfDataPorts []int32
for i := 0; i < 16; i++ {
netperfDataPorts = append(netperfDataPorts, NetperfServerDataPort+int32(i))
}
netperfSVC := ServiceParams{
Name: "netperf-service",
Namespace: "netperf",
Labels: map[string]string{"role": serverRole},
CtlPort: NetperfServerCtlPort,
DataPort: NetperfServerDataPort,
DataPorts: netperfDataPorts,
}
s.NetperfService, err = CreateService(netperfSVC, client)
if err != nil {
Expand All @@ -204,7 +210,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Name: "client-across",
Namespace: "netperf",
Replicas: 1,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Image: k8sNetperfImage,
Labels: map[string]string{"role": clientAcrossRole},
Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}},
Port: NetperfServerCtlPort,
Expand All @@ -218,7 +224,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Namespace: "netperf",
Replicas: 1,
HostNetwork: true,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Image: k8sNetperfImage,
Labels: map[string]string{"role": hostNetClientRole},
Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}},
Port: NetperfServerCtlPort,
Expand Down Expand Up @@ -266,7 +272,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Namespace: "netperf",
Replicas: 1,
HostNetwork: true,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Image: k8sNetperfImage,
Labels: map[string]string{"role": hostNetServerRole},
Commands: dpCommands,
Port: NetperfServerCtlPort,
Expand All @@ -276,7 +282,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error {
Name: "server",
Namespace: "netperf",
Replicas: 1,
Image: "quay.io/cloud-bulldozer/netperf:latest",
Image: k8sNetperfImage,
Labels: map[string]string{"role": serverRole},
Commands: dpCommands,
Port: NetperfServerCtlPort,
Expand Down Expand Up @@ -500,9 +506,10 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv
},
},
Spec: apiv1.PodSpec{
ServiceAccountName: sa,
HostNetwork: dp.HostNetwork,
Containers: cmdContainers,
TerminationGracePeriodSeconds: pointer.Int64(1),
ServiceAccountName: sa,
HostNetwork: dp.HostNetwork,
Containers: cmdContainers,
Affinity: &apiv1.Affinity{
NodeAffinity: &dp.NodeAffinity,
PodAffinity: &dp.PodAffinity,
Expand Down Expand Up @@ -608,23 +615,27 @@ func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Servi
TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.CtlPort)),
Port: sp.CtlPort,
},
{
Name: fmt.Sprintf("%s-data-tcp", sp.Name),
Protocol: apiv1.ProtocolTCP,
TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)),
Port: sp.DataPort,
},
{
Name: fmt.Sprintf("%s-data-udp", sp.Name),
Protocol: apiv1.ProtocolUDP,
TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.DataPort)),
Port: sp.DataPort,
},
},
Type: apiv1.ServiceType("ClusterIP"),
Selector: sp.Labels,
},
}
for _, port := range sp.DataPorts {
service.Spec.Ports = append(service.Spec.Ports,
apiv1.ServicePort{
Name: fmt.Sprintf("%s-tcp-%d", sp.Name, port),
Protocol: apiv1.ProtocolTCP,
TargetPort: intstr.Parse(fmt.Sprintf("%d", port)),
Port: port,
},
apiv1.ServicePort{
Name: fmt.Sprintf("%s-udp-%d", sp.Name, port),
Protocol: apiv1.ProtocolUDP,
TargetPort: intstr.Parse(fmt.Sprintf("%d", port)),
Port: port,
},
)
}
return sc.Create(context.TODO(), service, metav1.CreateOptions{})
}

Expand Down

0 comments on commit 2ac5e86

Please sign in to comment.