From a5e6a69d0c6c6535b6add250aa9ed8746253776c Mon Sep 17 00:00:00 2001 From: Tarun Pothulapati Date: Fri, 13 Oct 2023 18:43:01 +0530 Subject: [PATCH] feat(e2e): Improve data insert tests (#114) * fix(e2e): Improve Data insert tests * unnecessary changes * some more e2e fixes * don't react too fast * remove statefulset version check --- e2e/dragonfly_controller_test.go | 104 ++++++++++++++++--------------- e2e/util.go | 93 ++++++++++++--------------- 2 files changed, 94 insertions(+), 103 deletions(-) diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go index 70092f1..183dffe 100644 --- a/e2e/dragonfly_controller_test.go +++ b/e2e/dragonfly_controller_test.go @@ -81,9 +81,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }, Key: "password", }, - ClientCaCertSecret: &corev1.SecretReference{ - Name: "df-client-ca-certs", - }, }, Affinity: &corev1.Affinity{ NodeAffinity: &corev1.NodeAffinity{ @@ -107,27 +104,16 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() } Context("Dragonfly resource creation", func() { + password := "df-pass-1" It("Should create successfully", func() { // create the secret err := k8sClient.Create(ctx, &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "df-client-ca-certs", - Namespace: namespace, - }, - StringData: map[string]string{ - "ca.crt": "foo", - }, - }) - Expect(err).To(BeNil()) - - // create the secret - err = k8sClient.Create(ctx, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "df-secret", Namespace: namespace, }, StringData: map[string]string{ - "password": "df-pass-1", + "password": password, }, }) Expect(err).To(BeNil()) @@ -185,22 +171,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() SecretKeyRef: df.Spec.Authentication.PasswordFromSecret, }, })) - - // ClientCACertSecret - Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("%s=%s", resources.TLSCACertDirArg, resources.TLSCACertDir))) - Expect(ss.Spec.Template.Spec.Containers[0].VolumeMounts).To(ContainElement(corev1.VolumeMount{ - Name: resources.TLSCACertVolumeName, - MountPath: resources.TLSCACertDir, - })) - Expect(ss.Spec.Template.Spec.Volumes).To(ContainElement(corev1.Volume{ - Name: resources.TLSCACertVolumeName, - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: df.Spec.Authentication.ClientCaCertSecret.Name, - DefaultMode: func() *int32 { i := int32(420); return &i }(), - }, - }, - })) }) It("Check for pod values", func() { @@ -219,11 +189,9 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() It("Check for connectivity", func() { stopChan := make(chan struct{}, 1) - rc, err := InitRunCmd(ctx, stopChan, name, namespace, "df-pass-1") - defer close(stopChan) - Expect(err).To(BeNil()) - err = rc.Start(ctx) + _, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password) Expect(err).To(BeNil()) + defer close(stopChan) }) It("Increase in replicas should be propagated successfully", func() { @@ -327,7 +295,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }) It("Update to image should be propagated successfully", func() { - newImage := resources.DragonflyImage + ":v1.1.0" + newImage := resources.DragonflyImage + ":v1.9.0" // Update df to the latest err := k8sClient.Get(ctx, types.NamespacedName{ Name: name, @@ -494,8 +462,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }) }) -var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() { - +var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempts(3), func() { ctx := context.Background() name := "df-pvc" namespace := "default" @@ -531,14 +498,16 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() { }, }) Expect(err).To(BeNil()) + }) + It("Resources should exist", func() { // Wait until Dragonfly object is marked initialized waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute) waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) // Check for service and statefulset var ss appsv1.StatefulSet - err = k8sClient.Get(ctx, types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: name, Namespace: namespace, }, &ss) @@ -561,7 +530,48 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() { Expect(pvcs.Items).To(HaveLen(1)) Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("--snapshot_cron=%s", schedule))) - // TODO: Do data insert testing + // Insert Data + stopChan := make(chan struct{}, 1) + rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "") + Expect(err).To(BeNil()) + + // Insert test data + Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil()) + close(stopChan) + + // delete the single replica + var pod corev1.Pod + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: fmt.Sprintf("%s-0", name), + Namespace: namespace, + }, &pod) + Expect(err).To(BeNil()) + + err = k8sClient.Delete(ctx, &pod) + Expect(err).To(BeNil()) + + time.Sleep(10 * time.Second) + + // Wait until Dragonfly object is marked initialized + waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 2*time.Minute) + waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) + // check if the pod is created + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: fmt.Sprintf("%s-0", name), + Namespace: namespace, + }, &pod) + Expect(err).To(BeNil()) + + // recreate Redis Client on the new pod + stopChan = make(chan struct{}, 1) + rc, err = checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "") + defer close(stopChan) + Expect(err).To(BeNil()) + + // check if the Data exists + data, err := rc.Get(ctx, "foo").Result() + Expect(err).To(BeNil()) + Expect(data).To(Equal("bar")) }) It("Cleanup", func() { @@ -579,7 +589,7 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() { }) }) -var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() { +var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func() { ctx := context.Background() name := "df-tls" namespace := "default" @@ -596,12 +606,6 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() { Spec: resourcesv1.DragonflySpec{ Replicas: 2, Args: args, - Env: []corev1.EnvVar{ - { - Name: "DFLY_PASSWORD", - Value: "df-pass-1", - }, - }, TLSSecretRef: &corev1.SecretReference{ Name: "df-tls", }, @@ -629,13 +633,15 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() { err = k8sClient.Create(ctx, &df) Expect(err).To(BeNil()) + }) + It("Resources should exist", func() { // Wait until Dragonfly object is marked initialized waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute) waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) // Check for service and statefulset var ss appsv1.StatefulSet - err = k8sClient.Get(ctx, types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: name, Namespace: namespace, }, &ss) diff --git a/e2e/util.go b/e2e/util.go index 053d03e..84605db 100644 --- a/e2e/util.go +++ b/e2e/util.go @@ -22,7 +22,6 @@ import ( "io" "net/http" "os" - "path/filepath" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,10 +34,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" - "k8s.io/client-go/util/homedir" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -78,29 +75,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st return false, nil } -type RunCmd struct { - *redis.Client - *portforward.PortForwarder -} - -func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*RunCmd, error) { - home := homedir.HomeDir() - if home == "" { - return nil, fmt.Errorf("can't find kube-config") - } - kubeconfig := filepath.Join(home, ".kube", "config") - // use the current context in kubeconfig - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - return nil, err - } - - // create the clientset - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - +func checkAndK8sPortForwardRedis(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, stopChan chan struct{}, name, namespace, password string) (*redis.Client, error) { pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ LabelSelector: fmt.Sprintf("app=%s", name), }) @@ -108,6 +83,10 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa return nil, err } + if len(pods.Items) == 0 { + return nil, fmt.Errorf("no pods found") + } + var master *corev1.Pod for _, pod := range pods.Items { if pod.Labels[resources.Role] == resources.Master { @@ -116,16 +95,43 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa } } - fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyPort) + if master == nil { + return nil, fmt.Errorf("no master pod found") + } + + fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyAdminPort) if err != nil { return nil, err } - redisClient := redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort), - Password: password, - }) - return &RunCmd{Client: redisClient, PortForwarder: fw}, nil + redisOptions := &redis.Options{ + Addr: fmt.Sprintf("localhost:9998"), + } + + if password != "" { + redisOptions.Password = password + } + + redisClient := redis.NewClient(redisOptions) + + errChan := make(chan error, 1) + go func() { errChan <- fw.ForwardPorts() }() + + select { + case err = <-errChan: + return nil, errors.Wrap(err, "unable to forward ports") + case <-fw.Ready: + } + + pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() + + err = redisClient.Ping(pingCtx).Err() + if err != nil { + return nil, fmt.Errorf("unable to ping instance: %w", err) + } + + return redisClient, nil } func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, pod *corev1.Pod, stopChan chan struct{}, port int) (*portforward.PortForwarder, error) { @@ -142,7 +148,7 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) - ports := []string{fmt.Sprintf("%d:%d", port, resources.DragonflyPort)} + ports := []string{fmt.Sprintf("%d:%d", 9998, resources.DragonflyPort)} readyChan := make(chan struct{}, 1) fw, err := portforward.New(dialer, ports, stopChan, readyChan, io.Discard, os.Stderr) @@ -151,24 +157,3 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r } return fw, err } - -func (r *RunCmd) Start(ctx context.Context) error { - errChan := make(chan error, 1) - var err error - go func() { errChan <- r.ForwardPorts() }() - - select { - case err = <-errChan: - return errors.Wrap(err, "port forwarding failed") - case <-r.Ready: - } - - pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second) - defer cancel() - - err = r.Ping(pingCtx).Err() - if err != nil { - return fmt.Errorf("unable to ping instance") - } - return nil -}