From faf37b8a3e86fb70295b9063b8f3f69de58c591a Mon Sep 17 00:00:00 2001 From: Joe Talerico aka rook Date: Fri, 30 Aug 2024 13:29:33 -0400 Subject: [PATCH] Affinity Seeing about building the infra and reuing Affinity logic Signed-off-by: Joe Talerico aka rook --- cmd/k8s-netperf/k8s-netperf.go | 59 +++++-------- pkg/config/config.go | 8 +- pkg/k8s/kubernetes.go | 153 +++++++++++---------------------- pkg/k8s/kubevirt.go | 38 ++++---- 4 files changed, 101 insertions(+), 157 deletions(-) diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index 616eac4c..97013900 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -151,6 +151,7 @@ var rootCmd = &cobra.Command{ } if vm { + s.VM = true // Create a dynamic client dynClient, err := dynamic.NewForConfig(rconfig) if err != nil { @@ -160,25 +161,8 @@ var rootCmd = &cobra.Command{ if err != nil { log.Error(err) } - _, err = k8s.CreateVMServer(kclient, "server") - if err != nil { - log.Error(err) - } - k8s.WaitForVMI(kclient, "server") - host, err := k8s.CreateVMClient(kclient, client, dynClient, "client") - if err != nil { - log.Error(err) - } - update := k8s.UpdateConfig(&s.Configs, host) - for _, cfg := range update { - log.Info(cfg.Profile) - log.Info(cfg.VM) - log.Info(cfg.VMHost) - } - log.Info(host) - k8s.WaitForVMI(kclient, "client") - os.Exit(0) - + s.KClient = kclient + s.DClient = dynClient } // Build the SUT (Deployments) @@ -208,28 +192,31 @@ var rootCmd = &cobra.Command{ if iperf3 { requestedDrivers = append(requestedDrivers, "iperf3") } + // Run through each test - for _, nc := range s.Configs { - // Determine the metric for the test - metric := string("OP/s") - if strings.Contains(nc.Profile, "STREAM") { - metric = "Mb/s" - } - nc.Metric = metric - nc.AcrossAZ = acrossAZ - // No need to run hostNetwork through Service. - var pr result.Data - for _, driver := range requestedDrivers { - if s.HostNetwork && !nc.Service { - pr = executeWorkload(nc, s, true, driver) + if !s.VM { + for _, nc := range s.Configs { + // Determine the metric for the test + metric := string("OP/s") + if strings.Contains(nc.Profile, "STREAM") { + metric = "Mb/s" + } + nc.Metric = metric + nc.AcrossAZ = acrossAZ + // No need to run hostNetwork through Service. + var pr result.Data + for _, driver := range requestedDrivers { + if s.HostNetwork && !nc.Service { + pr = executeWorkload(nc, s, true, driver) + if len(pr.Profile) > 1 { + sr.Results = append(sr.Results, pr) + } + } + pr = executeWorkload(nc, s, false, driver) if len(pr.Profile) > 1 { sr.Results = append(sr.Results, pr) } } - pr = executeWorkload(nc, s, false, driver) - if len(pr.Profile) > 1 { - sr.Results = append(sr.Results, pr) - } } } diff --git a/pkg/config/config.go b/pkg/config/config.go index 45c7bec4..d4382b56 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -5,11 +5,13 @@ import ( "os" "regexp" + kubevirtv1 "github.com/cloud-bulldozer/k8s-netperf/pkg/kubevirt/client-go/clientset/versioned/typed/core/v1" apiv1 "k8s.io/api/core/v1" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/metrics" "gopkg.in/yaml.v3" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -22,8 +24,6 @@ type Config struct { Samples int `yaml:"samples,omitempty"` MessageSize int `yaml:"messagesize,omitempty"` Service bool `default:"false" yaml:"service,omitempty"` - VM bool - VMHost string Metric string AcrossAZ bool } @@ -34,6 +34,8 @@ type PerfScenarios struct { AcrossAZ bool HostNetwork bool Configs []Config + VM bool + VMHost string ServerNodeInfo metrics.NodeInfo ClientNodeInfo metrics.NodeInfo Client apiv1.PodList @@ -46,6 +48,8 @@ type PerfScenarios struct { UperfService *apiv1.Service RestConfig rest.Config ClientSet *kubernetes.Clientset + KClient *kubevirtv1.KubevirtV1Client + DClient *dynamic.DynamicClient } // Tests we will support in k8s-netperf diff --git a/pkg/k8s/kubernetes.go b/pkg/k8s/kubernetes.go index 622d7281..5db9a0ff 100644 --- a/pkg/k8s/kubernetes.go +++ b/pkg/k8s/kubernetes.go @@ -11,11 +11,8 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" ) @@ -213,7 +210,9 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { cdp.NodeAffinity = corev1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression, } - s.Client, err = deployDeployment(client, cdp) + if !s.VM { + s.Client, err = deployDeployment(client, cdp) + } if err != nil { return err } @@ -306,15 +305,30 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { cdpHostAcross.PodAntiAffinity = corev1.PodAntiAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: clientRoleAffinity, } - s.ClientHost, err = deployDeployment(client, cdpHostAcross) - } - if err != nil { - return err + if !s.VM { + s.ClientHost, err = deployDeployment(client, cdpHostAcross) + if err != nil { + return err + } + } else { + _, err := CreateVMClient(s.KClient, client, s.DClient, clientAcrossRole, clientAcrossRole, &cdpHostAcross.PodAntiAffinity, &cdpHostAcross.NodeAffinity) + if err != nil { + return err + } + } } - s.ClientAcross, err = deployDeployment(client, cdpAcross) - if err != nil { - return err + if !s.VM { + s.ClientAcross, err = deployDeployment(client, cdpAcross) + if err != nil { + return err + } + } else { + _, err := CreateVMClient(s.KClient, client, s.DClient, clientAcrossRole, clientAcrossRole, &cdpAcross.PodAntiAffinity, &cdpHostAcross.NodeAffinity) + if err != nil { + return err + } } + } // Use separate containers for servers @@ -397,101 +411,38 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { if ncount > 1 { if s.HostNetwork { - s.ServerHost, err = deployDeployment(client, sdpHost) - if err != nil { - return err + if !s.VM { + s.ServerHost, err = deployDeployment(client, sdpHost) + if err != nil { + return err + } + } else { + _, err = CreateVMServer(s.KClient, serverRole, serverRole, sdp.PodAntiAffinity, sdp.NodeAffinity) + if err != nil { + return err + } } } } - s.Server, err = deployDeployment(client, sdp) - - s.ServerNodeInfo, _ = GetPodNodeInfo(client, sdp) - if !s.NodeLocal { - s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdpAcross) - } - if err != nil { - return err - } - return nil -} - -func CreateVM(dynamicClient dynamic.Interface, namespace, vmName string) error { - gvr := schema.GroupVersionResource{ - Group: "kubevirt.io", - Version: "v1", - Resource: "virtualmachines", - } - - vm := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "kubevirt.io/v1", - "kind": "VirtualMachine", - "metadata": map[string]interface{}{ - "name": vmName, - "namespace": namespace, - }, - "spec": map[string]interface{}{ - "running": true, - "template": map[string]interface{}{ - "metadata": map[string]interface{}{ - "labels": map[string]interface{}{ - "kubevirt.io/domain": vmName, - }, - }, - "spec": map[string]interface{}{ - "domain": map[string]interface{}{ - "cpu": map[string]interface{}{ - "sockets": 2, - "cores": 2, - "threads": 1, - }, - "devices": map[string]interface{}{ - "disks": []interface{}{ - map[string]interface{}{ - "name": "disk0", - "disk": map[string]interface{}{ - "bus": "virtio", - }, - }, - }, - }, - "resources": map[string]interface{}{ - "requests": map[string]interface{}{ - "memory": "4096Mi", - "cpu": "500m", - }, - }, - }, - "volumes": []interface{}{ - map[string]interface{}{ - "name": "disk0", - "containerDisk": map[string]interface{}{ - "image": "kubevirt/fedora-cloud-container-disk-demo:latest", - }, - }, - map[string]interface{}{ - "name": "cloudinit", - "cloudInitNoCloud": map[string]interface{}{ - "userData": `#cloud-config - password: fedora - chpasswd: { expire: False } - runcmd: - - dnf install -y uperf iperf3 git ethtool`, - }, - }, - }, - }, - }, - }, - }, - } - - _, err := dynamicClient.Resource(gvr).Namespace(namespace).Create(context.TODO(), vm, metav1.CreateOptions{}) - if err != nil { - return fmt.Errorf("failed to create VirtualMachine: %v", err) + if !s.VM { + s.Server, err = deployDeployment(client, sdp) + if err != nil { + return err + } + s.ServerNodeInfo, _ = GetPodNodeInfo(client, sdp) + if !s.NodeLocal { + s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdpAcross) + } + if err != nil { + return err + } + } else { + _, err = CreateVMServer(s.KClient, serverRole, serverRole, sdp.PodAntiAffinity, sdp.NodeAffinity) + if err != nil { + return err + } } - fmt.Printf("VirtualMachine %s created successfully in namespace %s\n", vmName, namespace) return nil } diff --git a/pkg/k8s/kubevirt.go b/pkg/k8s/kubevirt.go index d8c544e2..44e5b17e 100644 --- a/pkg/k8s/kubevirt.go +++ b/pkg/k8s/kubevirt.go @@ -7,7 +7,6 @@ import ( b64 "encoding/base64" - "github.com/cloud-bulldozer/k8s-netperf/pkg/config" kubevirtv1 "github.com/cloud-bulldozer/k8s-netperf/pkg/kubevirt/client-go/clientset/versioned/typed/core/v1" log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" corev1 "k8s.io/api/core/v1" @@ -22,16 +21,6 @@ import ( v1 "kubevirt.io/api/core/v1" ) -func UpdateConfig(nc *[]config.Config, host string) []config.Config { - update := make([]config.Config, len(*nc)) - for _, cfg := range *nc { - cfg.VM = true - cfg.VMHost = host - update = append(update, cfg) - } - return update -} - func createCommService(client *kubernetes.Clientset, label map[string]string, name string) error { log.Infof("🚀 Creating service for %s in namespace %s", name, namespace) sc := client.CoreV1().Services(namespace) @@ -105,9 +94,13 @@ func exposeService(client *kubernetes.Clientset, dynamicClient *dynamic.DynamicC return host, nil } -func CreateVMClient(kclient *kubevirtv1.KubevirtV1Client, client *kubernetes.Clientset, dyn *dynamic.DynamicClient, name string) (string, error) { +func CreateVMClient(kclient *kubevirtv1.KubevirtV1Client, client *kubernetes.Clientset, + dyn *dynamic.DynamicClient, role string, name string, + podAff *corev1.PodAntiAffinity, + nodeAff *corev1.NodeAffinity) (string, error) { label := map[string]string{ - "app": fmt.Sprintf("%s", name), + "app": name, + "role": role, } dirname, err := os.UserHomeDir() if err != nil { @@ -130,7 +123,7 @@ chpasswd: { expire: False } runcmd: - dnf install -y uperf iperf3 git ethtool `, string(ssh)) - _, err = CreateVMI(kclient, name, label, b64.StdEncoding.EncodeToString([]byte(data))) + _, err = CreateVMI(kclient, name, label, b64.StdEncoding.EncodeToString([]byte(data)), *podAff, *nodeAff) if err != nil { return "", err } @@ -145,9 +138,12 @@ runcmd: return host, nil } -func CreateVMServer(client *kubevirtv1.KubevirtV1Client, name string) (*v1.VirtualMachineInstance, error) { +func CreateVMServer(client *kubevirtv1.KubevirtV1Client, name string, role string, + podAff corev1.PodAntiAffinity, + nodeAff corev1.NodeAffinity) (*v1.VirtualMachineInstance, error) { label := map[string]string{ - "app": fmt.Sprintf("%s", name), + "app": name, + "role": role, } dirname, err := os.UserHomeDir() if err != nil { @@ -172,10 +168,12 @@ runcmd: - uperf -s -v & - iperf3 -s & `, string(ssh)) - return CreateVMI(client, name, label, b64.StdEncoding.EncodeToString([]byte(data))) + return CreateVMI(client, name, label, b64.StdEncoding.EncodeToString([]byte(data)), podAff, nodeAff) } -func CreateVMI(client *kubevirtv1.KubevirtV1Client, name string, label map[string]string, b64data string) (*v1.VirtualMachineInstance, error) { +func CreateVMI(client *kubevirtv1.KubevirtV1Client, name string, label map[string]string, b64data string, + podAff corev1.PodAntiAffinity, + nodeAff corev1.NodeAffinity) (*v1.VirtualMachineInstance, error) { delSeconds := int64(0) mutliQ := true vmi, err := client.VirtualMachineInstances(namespace).Create(context.TODO(), &v1.VirtualMachineInstance{ @@ -189,6 +187,10 @@ func CreateVMI(client *kubevirtv1.KubevirtV1Client, name string, label map[strin Labels: label, }, Spec: v1.VirtualMachineInstanceSpec{ + Affinity: &k8sv1.Affinity{ + PodAntiAffinity: &podAff, + NodeAffinity: &nodeAff, + }, TerminationGracePeriodSeconds: &delSeconds, Domain: v1.DomainSpec{ Resources: v1.ResourceRequirements{