Skip to content

Commit

Permalink
feat(e2e): Improve data insert tests (dragonflydb#114)
Browse files Browse the repository at this point in the history
* fix(e2e): Improve Data insert tests

* unnecessary changes

* some more e2e fixes

* don't react too fast

* remove statefulset version check
  • Loading branch information
Pothulapati authored and diffuse committed Nov 18, 2023
1 parent a910063 commit a5e6a69
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 103 deletions.
104 changes: 55 additions & 49 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
},
Key: "password",
},
ClientCaCertSecret: &corev1.SecretReference{
Name: "df-client-ca-certs",
},
},
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
Expand All @@ -107,27 +104,16 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
}

Context("Dragonfly resource creation", func() {
password := "df-pass-1"
It("Should create successfully", func() {
// create the secret
err := k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-client-ca-certs",
Namespace: namespace,
},
StringData: map[string]string{
"ca.crt": "foo",
},
})
Expect(err).To(BeNil())

// create the secret
err = k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-secret",
Namespace: namespace,
},
StringData: map[string]string{
"password": "df-pass-1",
"password": password,
},
})
Expect(err).To(BeNil())
Expand Down Expand Up @@ -185,22 +171,6 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
SecretKeyRef: df.Spec.Authentication.PasswordFromSecret,
},
}))

// ClientCACertSecret
Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("%s=%s", resources.TLSCACertDirArg, resources.TLSCACertDir)))
Expect(ss.Spec.Template.Spec.Containers[0].VolumeMounts).To(ContainElement(corev1.VolumeMount{
Name: resources.TLSCACertVolumeName,
MountPath: resources.TLSCACertDir,
}))
Expect(ss.Spec.Template.Spec.Volumes).To(ContainElement(corev1.Volume{
Name: resources.TLSCACertVolumeName,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: df.Spec.Authentication.ClientCaCertSecret.Name,
DefaultMode: func() *int32 { i := int32(420); return &i }(),
},
},
}))
})

It("Check for pod values", func() {
Expand All @@ -219,11 +189,9 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()

It("Check for connectivity", func() {
stopChan := make(chan struct{}, 1)
rc, err := InitRunCmd(ctx, stopChan, name, namespace, "df-pass-1")
defer close(stopChan)
Expect(err).To(BeNil())
err = rc.Start(ctx)
_, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
Expect(err).To(BeNil())
defer close(stopChan)
})

It("Increase in replicas should be propagated successfully", func() {
Expand Down Expand Up @@ -327,7 +295,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
})

It("Update to image should be propagated successfully", func() {
newImage := resources.DragonflyImage + ":v1.1.0"
newImage := resources.DragonflyImage + ":v1.9.0"
// Update df to the latest
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Expand Down Expand Up @@ -494,8 +462,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
})
})

var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {

var _ = Describe("Dragonfly PVC Test with single replica", Ordered, FlakeAttempts(3), func() {
ctx := context.Background()
name := "df-pvc"
namespace := "default"
Expand Down Expand Up @@ -531,14 +498,16 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
},
})
Expect(err).To(BeNil())
})

It("Resources should exist", func() {
// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)

// Check for service and statefulset
var ss appsv1.StatefulSet
err = k8sClient.Get(ctx, types.NamespacedName{
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expand All @@ -561,7 +530,48 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
Expect(pvcs.Items).To(HaveLen(1))
Expect(ss.Spec.Template.Spec.Containers[0].Args).To(ContainElement(fmt.Sprintf("--snapshot_cron=%s", schedule)))

// TODO: Do data insert testing
// Insert Data
stopChan := make(chan struct{}, 1)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "")
Expect(err).To(BeNil())

// Insert test data
Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil())
close(stopChan)

// delete the single replica
var pod corev1.Pod
err = k8sClient.Get(ctx, types.NamespacedName{
Name: fmt.Sprintf("%s-0", name),
Namespace: namespace,
}, &pod)
Expect(err).To(BeNil())

err = k8sClient.Delete(ctx, &pod)
Expect(err).To(BeNil())

time.Sleep(10 * time.Second)

// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)
// check if the pod is created
err = k8sClient.Get(ctx, types.NamespacedName{
Name: fmt.Sprintf("%s-0", name),
Namespace: namespace,
}, &pod)
Expect(err).To(BeNil())

// recreate Redis Client on the new pod
stopChan = make(chan struct{}, 1)
rc, err = checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, "")
defer close(stopChan)
Expect(err).To(BeNil())

// check if the Data exists
data, err := rc.Get(ctx, "foo").Result()
Expect(err).To(BeNil())
Expect(data).To(Equal("bar"))
})

It("Cleanup", func() {
Expand All @@ -579,7 +589,7 @@ var _ = Describe("Dragonfly PVC Test", Ordered, FlakeAttempts(3), func() {
})
})

var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func() {
ctx := context.Background()
name := "df-tls"
namespace := "default"
Expand All @@ -596,12 +606,6 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
Spec: resourcesv1.DragonflySpec{
Replicas: 2,
Args: args,
Env: []corev1.EnvVar{
{
Name: "DFLY_PASSWORD",
Value: "df-pass-1",
},
},
TLSSecretRef: &corev1.SecretReference{
Name: "df-tls",
},
Expand Down Expand Up @@ -629,13 +633,15 @@ var _ = Describe("Dragonfly TLS tests", Ordered, FlakeAttempts(3), func() {
err = k8sClient.Create(ctx, &df)
Expect(err).To(BeNil())

})
It("Resources should exist", func() {
// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)

// Check for service and statefulset
var ss appsv1.StatefulSet
err = k8sClient.Get(ctx, types.NamespacedName{
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expand Down
93 changes: 39 additions & 54 deletions e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"net/http"
"os"
"path/filepath"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -35,10 +34,8 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/util/homedir"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -78,36 +75,18 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st
return false, nil
}

type RunCmd struct {
*redis.Client
*portforward.PortForwarder
}

func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, password string) (*RunCmd, error) {
home := homedir.HomeDir()
if home == "" {
return nil, fmt.Errorf("can't find kube-config")
}
kubeconfig := filepath.Join(home, ".kube", "config")
// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}

// create the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

func checkAndK8sPortForwardRedis(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, stopChan chan struct{}, name, namespace, password string) (*redis.Client, error) {
pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%s", name),
})
if err != nil {
return nil, err
}

if len(pods.Items) == 0 {
return nil, fmt.Errorf("no pods found")
}

var master *corev1.Pod
for _, pod := range pods.Items {
if pod.Labels[resources.Role] == resources.Master {
Expand All @@ -116,16 +95,43 @@ func InitRunCmd(ctx context.Context, stopChan chan struct{}, name, namespace, pa
}
}

fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyPort)
if master == nil {
return nil, fmt.Errorf("no master pod found")
}

fw, err := portForward(ctx, clientset, config, master, stopChan, resources.DragonflyAdminPort)
if err != nil {
return nil, err
}

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("localhost:%d", resources.DragonflyPort),
Password: password,
})
return &RunCmd{Client: redisClient, PortForwarder: fw}, nil
redisOptions := &redis.Options{
Addr: fmt.Sprintf("localhost:9998"),
}

if password != "" {
redisOptions.Password = password
}

redisClient := redis.NewClient(redisOptions)

errChan := make(chan error, 1)
go func() { errChan <- fw.ForwardPorts() }()

select {
case err = <-errChan:
return nil, errors.Wrap(err, "unable to forward ports")
case <-fw.Ready:
}

pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

err = redisClient.Ping(pingCtx).Err()
if err != nil {
return nil, fmt.Errorf("unable to ping instance: %w", err)
}

return redisClient, nil
}

func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, pod *corev1.Pod, stopChan chan struct{}, port int) (*portforward.PortForwarder, error) {
Expand All @@ -142,7 +148,7 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r
}

dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
ports := []string{fmt.Sprintf("%d:%d", port, resources.DragonflyPort)}
ports := []string{fmt.Sprintf("%d:%d", 9998, resources.DragonflyPort)}
readyChan := make(chan struct{}, 1)

fw, err := portforward.New(dialer, ports, stopChan, readyChan, io.Discard, os.Stderr)
Expand All @@ -151,24 +157,3 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r
}
return fw, err
}

func (r *RunCmd) Start(ctx context.Context) error {
errChan := make(chan error, 1)
var err error
go func() { errChan <- r.ForwardPorts() }()

select {
case err = <-errChan:
return errors.Wrap(err, "port forwarding failed")
case <-r.Ready:
}

pingCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

err = r.Ping(pingCtx).Err()
if err != nil {
return fmt.Errorf("unable to ping instance")
}
return nil
}

0 comments on commit a5e6a69

Please sign in to comment.