From be41e107d087aecc6b8c65c45a5b986158158db5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Sevilla?= Date: Wed, 22 May 2024 19:28:31 +0200 Subject: [PATCH] Wait for namespace deletion (#142) Signed-off-by: Raul Sevilla --- cmd/k8s-netperf/k8s-netperf.go | 28 +------ pkg/drivers/iperf.go | 2 +- pkg/drivers/netperf.go | 2 +- pkg/drivers/uperf.go | 2 +- pkg/k8s/kubernetes.go | 141 +++++++++++++++++---------------- 5 files changed, 75 insertions(+), 100 deletions(-) diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index 6e3c5373..33f4eb5b 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -27,7 +27,6 @@ import ( "k8s.io/client-go/tools/clientcmd" ) -const namespace = "netperf" const index = "k8s-netperf" const retry = 3 @@ -306,32 +305,7 @@ var rootCmd = &cobra.Command{ } func cleanup(client *kubernetes.Clientset) { - log.Info("Cleaning resources created by k8s-netperf") - svcList, err := k8s.GetServices(client, namespace) - if err != nil { - log.Fatal(err) - } - for svc := range svcList.Items { - err = k8s.DestroyService(client, svcList.Items[svc]) - if err != nil { - log.Error(err) - } - } - dpList, err := k8s.GetDeployments(client, namespace) - if err != nil { - log.Fatal(err) - } - for dp := range dpList.Items { - err = k8s.DestroyDeployment(client, dpList.Items[dp]) - if err != nil { - log.Error(err) - } - _, err := k8s.WaitForDelete(client, dpList.Items[dp]) - if err != nil { - log.Fatal(err) - } - } - err = k8s.DestroyNamespace(client) + err := k8s.DestroyNamespace(client) if err != nil { log.Error(err) os.Exit(1) diff --git a/pkg/drivers/iperf.go b/pkg/drivers/iperf.go index d2437fec..e98652f3 100644 --- a/pkg/drivers/iperf.go +++ b/pkg/drivers/iperf.go @@ -56,7 +56,7 @@ func (i *iperf3) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, id := uuid.New() file := fmt.Sprintf("/tmp/iperf-%s", id.String()) pod := client.Items[0] - log.Debugf("🔥 Client (%s,%s) starting iperf3 against server : %s", pod.Name, pod.Status.PodIP, serverIP) + log.Debugf("🔥 Client (%s,%s) starting iperf3 against server: %s", pod.Name, pod.Status.PodIP, serverIP) config.Show(nc, i.driverName) tcp := true if !strings.Contains(nc.Profile, "STREAM") { diff --git a/pkg/drivers/netperf.go b/pkg/drivers/netperf.go index 2c68b08d..ad172440 100644 --- a/pkg/drivers/netperf.go +++ b/pkg/drivers/netperf.go @@ -38,7 +38,7 @@ const omniOptions = "rt_latency,p99_latency,throughput,throughput_units,remote_r func (n *netperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, client apiv1.PodList, serverIP string) (bytes.Buffer, error) { var stdout, stderr bytes.Buffer pod := client.Items[0] - log.Debugf("🔥 Client (%s,%s) starting netperf against server : %s", pod.Name, pod.Status.PodIP, serverIP) + log.Debugf("🔥 Client (%s,%s) starting netperf against server: %s", pod.Name, pod.Status.PodIP, serverIP) config.Show(nc, n.driverName) cmd := []string{superNetperf, strconv.Itoa(nc.Parallelism), strconv.Itoa(k8s.NetperfServerDataPort), "-H", serverIP, "-l", diff --git a/pkg/drivers/uperf.go b/pkg/drivers/uperf.go index 3bd9321d..60a1a160 100644 --- a/pkg/drivers/uperf.go +++ b/pkg/drivers/uperf.go @@ -142,7 +142,7 @@ func (u *uperf) Run(c *kubernetes.Clientset, rc rest.Config, nc config.Config, c var exec remotecommand.Executor pod := client.Items[0] - log.Debugf("🔥 Client (%s,%s) starting uperf against server : %s", pod.Name, pod.Status.PodIP, serverIP) + log.Debugf("🔥 Client (%s,%s) starting uperf against server: %s", pod.Name, pod.Status.PodIP, serverIP) config.Show(nc, u.driverName) filePath, err := createUperfProfile(c, rc, nc, pod, serverIP) diff --git a/pkg/k8s/kubernetes.go b/pkg/k8s/kubernetes.go index 7ff96ddd..024c7f4f 100644 --- a/pkg/k8s/kubernetes.go +++ b/pkg/k8s/kubernetes.go @@ -8,7 +8,7 @@ import ( log "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/cloud-bulldozer/k8s-netperf/pkg/metrics" appsv1 "k8s.io/api/apps/v1" - apiv1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -27,9 +27,9 @@ type DeploymentParams struct { Image string Labels map[string]string Commands [][]string - PodAffinity apiv1.PodAffinity - PodAntiAffinity apiv1.PodAntiAffinity - NodeAffinity apiv1.NodeAffinity + PodAffinity corev1.PodAffinity + PodAntiAffinity corev1.PodAntiAffinity + NodeAffinity corev1.NodeAffinity Port int } @@ -77,7 +77,7 @@ func BuildInfra(client *kubernetes.Clientset) error { log.Infof("♻️ Namespace already exists, reusing it") } else { log.Infof("🔨 Creating namespace: %s", namespace) - _, err := client.CoreV1().Namespaces().Create(context.TODO(), &apiv1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) + _, err := client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("😥 Unable to create namespace: %v", err) } @@ -87,7 +87,7 @@ func BuildInfra(client *kubernetes.Clientset) error { log.Infof("♻️ Service account already exists, reusing it") } else { log.Infof("🔨 Creating service account: %s", sa) - _, err = client.CoreV1().ServiceAccounts(namespace).Create(context.TODO(), &apiv1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: sa}}, metav1.CreateOptions{}) + _, err = client.CoreV1().ServiceAccounts(namespace).Create(context.TODO(), &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: sa}}, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("😥 Unable to create service account: %v", err) } @@ -167,19 +167,19 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { } // Schedule pods to nodes with role worker=, but not nodes with infra= and workload= - workerNodeSelectorExpression := &apiv1.NodeSelector{ - NodeSelectorTerms: []apiv1.NodeSelectorTerm{ + workerNodeSelectorExpression := &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ { - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "node-role.kubernetes.io/worker", Operator: apiv1.NodeSelectorOpIn, Values: []string{""}}, - {Key: "node-role.kubernetes.io/infra", Operator: apiv1.NodeSelectorOpNotIn, Values: []string{""}}, - {Key: "node-role.kubernetes.io/workload", Operator: apiv1.NodeSelectorOpNotIn, Values: []string{""}}, + MatchExpressions: []corev1.NodeSelectorRequirement{ + {Key: "node-role.kubernetes.io/worker", Operator: corev1.NodeSelectorOpIn, Values: []string{""}}, + {Key: "node-role.kubernetes.io/infra", Operator: corev1.NodeSelectorOpNotIn, Values: []string{""}}, + {Key: "node-role.kubernetes.io/workload", Operator: corev1.NodeSelectorOpNotIn, Values: []string{""}}, }, }, }, } - clientRoleAffinity := []apiv1.PodAffinityTerm{ + clientRoleAffinity := []corev1.PodAffinityTerm{ { LabelSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -202,12 +202,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Port: NetperfServerCtlPort, } if z != "" && numNodes > 1 { - cdp.NodeAffinity = apiv1.NodeAffinity{ + cdp.NodeAffinity = corev1.NodeAffinity{ PreferredDuringSchedulingIgnoredDuringExecution: zoneNodeSelectorExpression(z), } } - cdp.NodeAffinity = apiv1.NodeAffinity{ + cdp.NodeAffinity = corev1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression, } s.Client, err = deployDeployment(client, cdp) @@ -267,7 +267,7 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Commands: [][]string{{"/bin/bash", "-c", "sleep 10000000"}}, Port: NetperfServerCtlPort, } - cdpAcross.PodAntiAffinity = apiv1.PodAntiAffinity{ + cdpAcross.PodAntiAffinity = corev1.PodAntiAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: clientRoleAffinity, } @@ -283,12 +283,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { } if z != "" { if numNodes > 1 { - cdpAcross.NodeAffinity = apiv1.NodeAffinity{ + cdpAcross.NodeAffinity = corev1.NodeAffinity{ PreferredDuringSchedulingIgnoredDuringExecution: zoneNodeSelectorExpression(z), RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression, } } else { - cdpAcross.NodeAffinity = apiv1.NodeAffinity{ + cdpAcross.NodeAffinity = corev1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression, } } @@ -296,11 +296,11 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { if ncount > 1 { if s.HostNetwork { - cdpHostAcross.NodeAffinity = apiv1.NodeAffinity{ + cdpHostAcross.NodeAffinity = corev1.NodeAffinity{ PreferredDuringSchedulingIgnoredDuringExecution: zoneNodeSelectorExpression(z), RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression, } - cdpHostAcross.PodAntiAffinity = apiv1.PodAntiAffinity{ + cdpHostAcross.PodAntiAffinity = corev1.PodAntiAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: clientRoleAffinity, } s.ClientHost, err = deployDeployment(client, cdpHostAcross) @@ -340,23 +340,23 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { Port: NetperfServerCtlPort, } if s.NodeLocal { - sdp.PodAffinity = apiv1.PodAffinity{ + sdp.PodAffinity = corev1.PodAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: clientRoleAffinity, } } if z != "" { - var affinity apiv1.NodeAffinity + var affinity corev1.NodeAffinity if numNodes > 1 { nodeZone := zoneNodeSelectorExpression(z) if s.AcrossAZ { nodeZone = zoneNodeSelectorExpression(acrossZone) } - affinity = apiv1.NodeAffinity{ + affinity = corev1.NodeAffinity{ PreferredDuringSchedulingIgnoredDuringExecution: nodeZone, RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression, } } else { - affinity = apiv1.NodeAffinity{ + affinity = corev1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: workerNodeSelectorExpression, } } @@ -364,8 +364,8 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { sdpHost.NodeAffinity = affinity } if ncount > 1 { - antiAffinity := apiv1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ + antiAffinity := corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ { LabelSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -377,8 +377,8 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { }, } sdp.PodAntiAffinity = antiAffinity - antiAffinity = apiv1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ + antiAffinity = corev1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ { LabelSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -412,13 +412,13 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { return nil } -func zoneNodeSelectorExpression(zone string) []apiv1.PreferredSchedulingTerm { - return []apiv1.PreferredSchedulingTerm{ +func zoneNodeSelectorExpression(zone string) []corev1.PreferredSchedulingTerm { + return []corev1.PreferredSchedulingTerm{ { Weight: 100, - Preference: apiv1.NodeSelectorTerm{ - MatchExpressions: []apiv1.NodeSelectorRequirement{ - {Key: "topology.kubernetes.io/zone", Operator: apiv1.NodeSelectorOpIn, Values: []string{zone}}, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + {Key: "topology.kubernetes.io/zone", Operator: corev1.NodeSelectorOpIn, Values: []string{zone}}, }, }, }, @@ -427,8 +427,8 @@ func zoneNodeSelectorExpression(zone string) []apiv1.PreferredSchedulingTerm { // deployDeployment Manages the creation and waits for the pods to become ready. // returns a podList which is associated with the Deployment. -func deployDeployment(client *kubernetes.Clientset, dp DeploymentParams) (apiv1.PodList, error) { - pods := apiv1.PodList{} +func deployDeployment(client *kubernetes.Clientset, dp DeploymentParams) (corev1.PodList, error) { + pods := corev1.PodList{} _, err := CreateDeployment(dp, client) if err != nil { return pods, fmt.Errorf("😥 Unable to create deployment: %v", err) @@ -468,22 +468,21 @@ func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) { return false, fmt.Errorf("❌ Deployment had issues") } -// WaitForDelete return true if the deployment is deleted, false otherwise. Error if it goes bad. -func WaitForDelete(c *kubernetes.Clientset, dp appsv1.Deployment) (bool, error) { - log.Infof("⏰ Waiting for %s Deployment to deleted...", dp.Name) +// WaitForDelete return nil if the namespace is deleted, error otherwise +func waitForNamespaceDelete(c *kubernetes.Clientset, nsName string) error { + log.Infof("⏰ Waiting for %s Namespace to be deleted...", nsName) // Timeout in seconds - timeout := int64(300) - dw, err := c.AppsV1().Deployments(dp.Namespace).Watch(context.TODO(), metav1.ListOptions{TimeoutSeconds: &timeout}) + ns, err := c.CoreV1().Namespaces().Watch(context.TODO(), metav1.ListOptions{FieldSelector: "metadata.name=" + nsName}) if err != nil { - return false, err + return err } - defer dw.Stop() - for event := range dw.ResultChan() { + defer ns.Stop() + for event := range ns.ResultChan() { if event.Type == watch.Deleted { - return true, nil + return nil } } - return false, fmt.Errorf("❌ Deployment delete issues") + return fmt.Errorf("❌ Namespace delete issues: %v", err) } // GetZone will determine if we have a multiAZ/Zone cloud. @@ -528,16 +527,16 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv dc := client.AppsV1().Deployments(dp.Namespace) // Add containers to deployment - var cmdContainers []apiv1.Container + var cmdContainers []corev1.Container for i := 0; i < len(dp.Commands); i++ { // each container should have a unique name containerName := fmt.Sprintf("%s-%d", dp.Name, i) cmdContainers = append(cmdContainers, - apiv1.Container{ + corev1.Container{ Name: containerName, Image: dp.Image, Command: dp.Commands[i], - ImagePullPolicy: apiv1.PullAlways, + ImagePullPolicy: corev1.PullAlways, }) } @@ -550,19 +549,19 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv Selector: &metav1.LabelSelector{ MatchLabels: dp.Labels, }, - Template: apiv1.PodTemplateSpec{ + Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: dp.Labels, Annotations: map[string]string{ "sidecar.istio.io/inject": "true", }, }, - Spec: apiv1.PodSpec{ + Spec: corev1.PodSpec{ TerminationGracePeriodSeconds: pointer.Int64(1), ServiceAccountName: sa, HostNetwork: dp.HostNetwork, Containers: cmdContainers, - Affinity: &apiv1.Affinity{ + Affinity: &corev1.Affinity{ NodeAffinity: &dp.NodeAffinity, PodAffinity: &dp.PodAffinity, PodAntiAffinity: &dp.PodAntiAffinity, @@ -614,9 +613,9 @@ func GetPodNodeInfo(c *kubernetes.Clientset, dp DeploymentParams) (metrics.NodeI // GetPods searches for a specific set of pods from DeploymentParms // It returns a PodList if the deployment is found. // NOTE : Since we can update the replicas to be > 1, is why I return a PodList. -func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (apiv1.PodList, error) { +func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (corev1.PodList, error) { d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{}) - npl := apiv1.PodList{} + npl := corev1.PodList{} if err != nil { return npl, fmt.Errorf("❌ Failure to capture deployment: %v", err) } @@ -639,7 +638,7 @@ func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (apiv1.PodList, error } // CreateService will build a k8s service -func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Service, error) { +func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*corev1.Service, error) { s, err := client.CoreV1().Services(sp.Namespace).Get(context.TODO(), sp.Name, metav1.GetOptions{}) log.Debugf("Looking for service %s in namespace %s", sp.Name, sp.Namespace) if err == nil { @@ -648,41 +647,41 @@ func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Servi } log.Infof("🚀 Creating service for %s in namespace %s", sp.Name, sp.Namespace) sc := client.CoreV1().Services(sp.Namespace) - service := &apiv1.Service{ + service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: sp.Name, Namespace: sp.Namespace, }, - Spec: apiv1.ServiceSpec{ - Ports: []apiv1.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ { Name: fmt.Sprintf("%s-tcp-ctl", sp.Name), - Protocol: apiv1.ProtocolTCP, + Protocol: corev1.ProtocolTCP, TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.CtlPort)), Port: sp.CtlPort, }, { Name: fmt.Sprintf("%s-udp-ctl", sp.Name), - Protocol: apiv1.ProtocolUDP, + Protocol: corev1.ProtocolUDP, TargetPort: intstr.Parse(fmt.Sprintf("%d", sp.CtlPort)), Port: sp.CtlPort, }, }, - Type: apiv1.ServiceType("ClusterIP"), + Type: corev1.ServiceType("ClusterIP"), Selector: sp.Labels, }, } for _, port := range sp.DataPorts { service.Spec.Ports = append(service.Spec.Ports, - apiv1.ServicePort{ + corev1.ServicePort{ Name: fmt.Sprintf("%s-tcp-%d", sp.Name, port), - Protocol: apiv1.ProtocolTCP, + Protocol: corev1.ProtocolTCP, TargetPort: intstr.Parse(fmt.Sprintf("%d", port)), Port: port, }, - apiv1.ServicePort{ + corev1.ServicePort{ Name: fmt.Sprintf("%s-udp-%d", sp.Name, port), - Protocol: apiv1.ProtocolUDP, + Protocol: corev1.ProtocolUDP, TargetPort: intstr.Parse(fmt.Sprintf("%d", port)), Port: port, }, @@ -692,7 +691,7 @@ func CreateService(sp ServiceParams, client *kubernetes.Clientset) (*apiv1.Servi } // GetServices retrieve all services for a given namespoace, in this case for netperf -func GetServices(client *kubernetes.Clientset, namespace string) (*apiv1.ServiceList, error) { +func GetServices(client *kubernetes.Clientset, namespace string) (*corev1.ServiceList, error) { services, err := client.CoreV1().Services(namespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err @@ -710,7 +709,7 @@ func GetDeployments(client *kubernetes.Clientset, namespace string) (*appsv1.Dep } // DestroyService cleans up a specific service from a namespace -func DestroyService(client *kubernetes.Clientset, serv apiv1.Service) error { +func DestroyService(client *kubernetes.Clientset, serv corev1.Service) error { deletePolicy := metav1.DeletePropagationForeground return client.CoreV1().Services(serv.Namespace).Delete(context.TODO(), serv.Name, metav1.DeleteOptions{ PropagationPolicy: &deletePolicy, @@ -721,10 +720,12 @@ func DestroyService(client *kubernetes.Clientset, serv apiv1.Service) error { func DestroyNamespace(client *kubernetes.Clientset) error { _, err := client.CoreV1().Namespaces().Get(context.TODO(), namespace, metav1.GetOptions{}) if err == nil { - deletePolicy := metav1.DeletePropagationForeground - return client.CoreV1().Namespaces().Delete(context.TODO(), namespace, metav1.DeleteOptions{ - PropagationPolicy: &deletePolicy, - }) + log.Info("Cleaning resources created by k8s-netperf") + err = client.CoreV1().Namespaces().Delete(context.TODO(), namespace, metav1.DeleteOptions{}) + if err != nil { + return err + } + return waitForNamespaceDelete(client, namespace) } return nil }