From a70359ae57399a4c90a024b2f9795eac92e4f546 Mon Sep 17 00:00:00 2001 From: Tarun Pothulapati Date: Wed, 11 Oct 2023 06:41:14 +0530 Subject: [PATCH 1/5] fix(e2e): Improve Data insert tests --- e2e/dragonfly_controller_test.go | 100 ++++++++++++++++--------------- e2e/util.go | 69 +++++++++++---------- 2 files changed, 89 insertions(+), 80 deletions(-) diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go index 70092f1..53670c1 100644 --- a/e2e/dragonfly_controller_test.go +++ b/e2e/dragonfly_controller_test.go @@ -81,9 +81,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }, Key: "password", }, - ClientCaCertSecret: &corev1.SecretReference{ - Name: "df-client-ca-certs", - }, }, Affinity: &corev1.Affinity{ NodeAffinity: &corev1.NodeAffinity{ @@ -107,27 +104,16 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() } Context("Dragonfly resource creation", func() { + password := "df-pass-1" It("Should create successfully", func() { // create the secret err := k8sClient.Create(ctx, &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "df-client-ca-certs", - Namespace: namespace, - }, - StringData: map[string]string{ - "ca.crt": "foo", - }, - }) - Expect(err).To(BeNil()) - - // create the secret - err = k8sClient.Create(ctx, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "df-secret", Namespace: namespace, }, StringData: map[string]string{ - "password": "df-pass-1", + "password": password, }, }) Expect(err).To(BeNil()) @@ -185,22 +171,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() SecretKeyRef: df.Spec.Authentication.PasswordFromSecret, }, })) - - // ClientCACertSecret - Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("%s=%s", resources.TLSCACertDirArg, resources.TLSCACertDir))) - Expect(ss.Spec.Template.Spec.Containers[0].VolumeMounts).To(ContainElement(corev1.VolumeMount{ - Name: resources.TLSCACertVolumeName, - MountPath: resources.TLSCACertDir, - })) - Expect(ss.Spec.Template.Spec.Volumes).To(ContainElement(corev1.Volume{ - Name: resources.TLSCACertVolumeName, - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: df.Spec.Authentication.ClientCaCertSecret.Name, - DefaultMode: func() *int32 { i := int32(420); return &i }(), - }, - }, - })) }) It("Check for pod values", func() { @@ -219,11 +189,14 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() It("Check for connectivity", func() { stopChan := make(chan struct{}, 1) - rc, err := InitRunCmd(ctx, stopChan, name, namespace, "df-pass-1") - defer close(stopChan) + _, err := checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, password) Expect(err).To(BeNil()) - err = rc.Start(ctx) + defer close(stopChan) + // check for ping + stopChan = make(chan struct{}, 1) + _, err = checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, password) Expect(err).To(BeNil()) + }) It("Increase in replicas should be propagated successfully", func() { @@ -327,7 +300,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }) It("Update to image should be propagated successfully", func() { - newImage := resources.DragonflyImage + ":v1.1.0" + newImage := resources.DragonflyImage + ":v1.9.0" // Update df to the latest err := k8sClient.Get(ctx, types.NamespacedName{ Name: name, @@ -494,7 +467,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }) }) -var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() { +var _ = Describe("Dragonfly PVC Test with single replica", Ordered, func() { ctx := context.Background() name := "df-pvc" @@ -561,7 +534,47 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() { Expect(pvcs.Items).To(HaveLen(1)) Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("--snapshot_cron=%s", schedule))) - // TODO: Do data insert testing + // Insert Data + stopChan := make(chan struct{}, 1) + rc, err := checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, "") + Expect(err).To(BeNil()) + + // Insert test data + Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil()) + close(stopChan) + + // delete the single replica + var pod corev1.Pod + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: fmt.Sprintf("%s-0", name), + Namespace: namespace, + }, &pod) + Expect(err).To(BeNil()) + + err = k8sClient.Delete(ctx, &pod) + Expect(err).To(BeNil()) + + // Wait until Dragonfly object is marked initialized + waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 2*time.Minute) + waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) + + // check if the pod is created + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: fmt.Sprintf("%s-0", name), + Namespace: namespace, + }, &pod) + Expect(err).To(BeNil()) + + // recreate Redis Client on the new pod + stopChan = make(chan struct{}, 1) + rc, err = checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, "df-pass-1") + defer close(stopChan) + Expect(err).To(BeNil()) + + // check if the Data exists + data, err := rc.Get(ctx, "foo").Result() + Expect(err).To(BeNil()) + Expect(data).To(Equal("bar")) }) It("Cleanup", func() { @@ -579,7 +592,7 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() { }) }) -var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() { +var _ = Describe("Dragonfly Server TLS tests", Ordered, func() { ctx := context.Background() name := "df-tls" namespace := "default" @@ -596,15 +609,6 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() { Spec: resourcesv1.DragonflySpec{ Replicas: 2, Args: args, - Env: []corev1.EnvVar{ - { - Name: "DFLY_PASSWORD", - Value: "df-pass-1", - }, - }, - TLSSecretRef: &corev1.SecretReference{ - Name: "df-tls", - }, }, } diff --git a/e2e/util.go b/e2e/util.go index 053d03e..ca7fb9d 100644 --- a/e2e/util.go +++ b/e2e/util.go @@ -78,12 +78,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st return false, nil } -type RunCmd struct { - *redis.Client - *portforward.PortForwarder -} - -func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*RunCmd, error) { +func checkAndK8sPortForwardRedis(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*redis.Client, error) { home := homedir.HomeDir() if home == "" { return nil, fmt.Errorf("can't find kube-config") @@ -108,6 +103,10 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa return nil, err } + if len(pods.Items) == 0 { + return nil, fmt.Errorf("no pods found") + } + var master *corev1.Pod for _, pod := range pods.Items { if pod.Labels[resources.Role] == resources.Master { @@ -116,16 +115,43 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa } } + if master == nil { + return nil, fmt.Errorf("no master pod found") + } + fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyPort) if err != nil { return nil, err } - redisClient := redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort), - Password: password, - }) - return &RunCmd{Client: redisClient, PortForwarder: fw}, nil + redisOptions := &redis.Options{ + Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort), + } + + if password != "" { + redisOptions.Password = password + } + + redisClient := redis.NewClient(redisOptions) + + errChan := make(chan error, 1) + go func() { errChan <- fw.ForwardPorts() }() + + select { + case err = <-errChan: + return nil, errors.Wrap(err, "unable to forward ports") + case <-fw.Ready: + } + + pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second) + defer cancel() + + err = redisClient.Ping(pingCtx).Err() + if err != nil { + return nil, fmt.Errorf("unable to ping instance: %w", err) + } + + return redisClient, nil } func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, pod *corev1.Pod, stopChan chan struct{}, port int) (*portforward.PortForwarder, error) { @@ -151,24 +177,3 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r } return fw, err } - -func (r *RunCmd) Start(ctx context.Context) error { - errChan := make(chan error, 1) - var err error - go func() { errChan <- r.ForwardPorts() }() - - select { - case err = <-errChan: - return errors.Wrap(err, "port forwarding failed") - case <-r.Ready: - } - - pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second) - defer cancel() - - err = r.Ping(pingCtx).Err() - if err != nil { - return fmt.Errorf("unable to ping instance") - } - return nil -} From 7a2a8d8413089dc642e1a3b2ef194e6bae6568bf Mon Sep 17 00:00:00 2001 From: Tarun Pothulapati Date: Wed, 11 Oct 2023 06:42:58 +0530 Subject: [PATCH 2/5] unnecessary changes --- e2e/dragonfly_controller_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go index 53670c1..f0556c4 100644 --- a/e2e/dragonfly_controller_test.go +++ b/e2e/dragonfly_controller_test.go @@ -467,7 +467,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }) }) -var _ = Describe("Dragonfly PVC Test with single replica", Ordered, func() { +var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempts(3), func() { ctx := context.Background() name := "df-pvc" @@ -592,7 +592,7 @@ var _ = Describe("Dragonfly PVC Test with single replica", Ordered, func() { }) }) -var _ = Describe("Dragonfly Server TLS tests", Ordered, func() { +var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func() { ctx := context.Background() name := "df-tls" namespace := "default" From 2c6662eadb69235acd15e6cdeb58076d80ad9a20 Mon Sep 17 00:00:00 2001 From: Tarun Pothulapati Date: Wed, 11 Oct 2023 14:12:50 +0530 Subject: [PATCH 3/5] some more e2e fixes --- e2e/dragonfly_controller_test.go | 23 ++++++++++++----------- e2e/util.go | 28 ++++------------------------ 2 files changed, 16 insertions(+), 35 deletions(-) diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go index f0556c4..f8c2646 100644 --- a/e2e/dragonfly_controller_test.go +++ b/e2e/dragonfly_controller_test.go @@ -189,14 +189,9 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() It("Check for connectivity", func() { stopChan := make(chan struct{}, 1) - _, err := checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, password) + _, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password) Expect(err).To(BeNil()) defer close(stopChan) - // check for ping - stopChan = make(chan struct{}, 1) - _, err = checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, password) - Expect(err).To(BeNil()) - }) It("Increase in replicas should be propagated successfully", func() { @@ -468,7 +463,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func() }) var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempts(3), func() { - ctx := context.Background() name := "df-pvc" namespace := "default" @@ -504,14 +498,16 @@ var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempt }, }) Expect(err).To(BeNil()) + }) + It("Resources should exist", func() { // Wait until Dragonfly object is marked initialized waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute) waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) // Check for service and statefulset var ss appsv1.StatefulSet - err = k8sClient.Get(ctx, types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: name, Namespace: namespace, }, &ss) @@ -536,7 +532,7 @@ var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempt // Insert Data stopChan := make(chan struct{}, 1) - rc, err := checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, "") + rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "") Expect(err).To(BeNil()) // Insert test data @@ -567,7 +563,7 @@ var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempt // recreate Redis Client on the new pod stopChan = make(chan struct{}, 1) - rc, err = checkAndK8sPortForwardRedis(ctx, stopChan, name, namespace, "df-pass-1") + rc, err = checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "") defer close(stopChan) Expect(err).To(BeNil()) @@ -609,6 +605,9 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func() Spec: resourcesv1.DragonflySpec{ Replicas: 2, Args: args, + TLSSecretRef: &corev1.SecretReference{ + Name: "df-tls", + }, }, } @@ -633,13 +632,15 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func() err = k8sClient.Create(ctx, &df) Expect(err).To(BeNil()) + }) + It("Resources should exist", func() { // Wait until Dragonfly object is marked initialized waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute) waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) // Check for service and statefulset var ss appsv1.StatefulSet - err = k8sClient.Get(ctx, types.NamespacedName{ + err := k8sClient.Get(ctx, types.NamespacedName{ Name: name, Namespace: namespace, }, &ss) diff --git a/e2e/util.go b/e2e/util.go index ca7fb9d..84605db 100644 --- a/e2e/util.go +++ b/e2e/util.go @@ -22,7 +22,6 @@ import ( "io" "net/http" "os" - "path/filepath" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,10 +34,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" - "k8s.io/client-go/util/homedir" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -78,24 +75,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st return false, nil } -func checkAndK8sPortForwardRedis(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*redis.Client, error) { - home := homedir.HomeDir() - if home == "" { - return nil, fmt.Errorf("can't find kube-config") - } - kubeconfig := filepath.Join(home, ".kube", "config") - // use the current context in kubeconfig - config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) - if err != nil { - return nil, err - } - - // create the clientset - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - +func checkAndK8sPortForwardRedis(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, stopChan chan struct{}, name, namespace, password string) (*redis.Client, error) { pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ LabelSelector: fmt.Sprintf("app=%s", name), }) @@ -119,13 +99,13 @@ func checkAndK8sPortForwardRedis(ctx context.Context, stopChan chan struct{}, na return nil, fmt.Errorf("no master pod found") } - fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyPort) + fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyAdminPort) if err != nil { return nil, err } redisOptions := &redis.Options{ - Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort), + Addr: fmt.Sprintf("localhost:9998"), } if password != "" { @@ -168,7 +148,7 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) - ports := []string{fmt.Sprintf("%d:%d", port, resources.DragonflyPort)} + ports := []string{fmt.Sprintf("%d:%d", 9998, resources.DragonflyPort)} readyChan := make(chan struct{}, 1) fw, err := portforward.New(dialer, ports, stopChan, readyChan, io.Discard, os.Stderr) From abbc24794b3db43a07d01b48373fd21e6f1c70e6 Mon Sep 17 00:00:00 2001 From: Tarun Pothulapati Date: Wed, 11 Oct 2023 21:25:07 +0530 Subject: [PATCH 4/5] don't react too fast --- e2e/dragonfly_controller_test.go | 3 ++- e2e/util.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go index f8c2646..183dffe 100644 --- a/e2e/dragonfly_controller_test.go +++ b/e2e/dragonfly_controller_test.go @@ -550,10 +550,11 @@ var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempt err = k8sClient.Delete(ctx, &pod) Expect(err).To(BeNil()) + time.Sleep(10 * time.Second) + // Wait until Dragonfly object is marked initialized waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 2*time.Minute) waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute) - // check if the pod is created err = k8sClient.Get(ctx, types.NamespacedName{ Name: fmt.Sprintf("%s-0", name), diff --git a/e2e/util.go b/e2e/util.go index 84605db..589544a 100644 --- a/e2e/util.go +++ b/e2e/util.go @@ -68,7 +68,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st return false, nil } - if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas { + if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas && statefulSet.Status.CurrentRevision == statefulSet.Status.UpdateRevision { return true, nil } From b8682520e775649ce380e31ec9b49cf6e5035dd3 Mon Sep 17 00:00:00 2001 From: Tarun Pothulapati Date: Fri, 13 Oct 2023 14:01:51 +0530 Subject: [PATCH 5/5] remove statefulset version check --- e2e/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/util.go b/e2e/util.go index 589544a..84605db 100644 --- a/e2e/util.go +++ b/e2e/util.go @@ -68,7 +68,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st return false, nil } - if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas && statefulSet.Status.CurrentRevision == statefulSet.Status.UpdateRevision { + if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas { return true, nil }