diff --git a/Makefile b/Makefile
index 3b0b3ab..f70905b 100644
--- a/Makefile
+++ b/Makefile
@@ -59,7 +59,7 @@ vet: ## Run go vet against code.
 .PHONY: test
 test: manifests generate fmt vet envtest ## Run tests.
 	GOBIN=$(LOCALBIN) go install github.com/onsi/ginkgo/v2/ginkgo@v2.12.1
-	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -r -p -coverprofile cover.out
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -vv -r -p -coverprofile cover.out
 
 ##@ Build
 
diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go
index 183dffe..c433816 100644
--- a/e2e/dragonfly_controller_test.go
+++ b/e2e/dragonfly_controller_test.go
@@ -189,8 +189,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 	It("Check for connectivity", func() {
 		stopChan := make(chan struct{}, 1)
-		_, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
+		rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
 		Expect(err).To(BeNil())
+
+		// Insert test data
+		Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil())
+
 		defer close(stopChan)
 	})
 
@@ -294,8 +298,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		Expect(podRoles[resources.Replica]).To(HaveLen(2))
 	})
 
-	It("Update to image should be propagated successfully", func() {
-		newImage := resources.DragonflyImage + ":v1.9.0"
+	It("Updates should be propagated successfully", func() {
 		// Update df to the latest
 		err := k8sClient.Get(ctx, types.NamespacedName{
 			Name:      name,
@@ -303,14 +306,14 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		}, &df)
 		Expect(err).To(BeNil())
 
-		df.Spec.Image = newImage
+		df.Spec.Image = fmt.Sprintf("%s:%s", resources.DragonflyImage, "v1.9.0")
 		err = k8sClient.Update(ctx, &df)
 		Expect(err).To(BeNil())
+	})
 
-		time.Sleep(30 * time.Second)
-
-		// Wait until Dragonfly object is marked resources-created
-		err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
+	It("Check for values in statefulset", func() {
+		// Wait until Dragonfly object is marked ready
+		err := waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
 		Expect(err).To(BeNil())
 		err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
 		Expect(err).To(BeNil())
@@ -323,8 +326,8 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		}, &ss)
 		Expect(err).To(BeNil())
 
-		// check for pod image
-		Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(newImage))
+		// check for image
+		Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(df.Spec.Image))
 
 		// Check if there are relevant pods with expected roles
 		var pods corev1.PodList
@@ -337,11 +340,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		// Get the pods along with their roles
 		podRoles := make(map[string][]string)
 		for _, pod := range pods.Items {
+			Expect(pod.Spec.Containers[0].Image).To(Equal(df.Spec.Image))
 			role, ok := pod.Labels[resources.Role]
 			// error if there is no label
 			Expect(ok).To(BeTrue())
 			// verify the role to match the label
 			podRoles[role] = append(podRoles[role], pod.Name)
+
 		}
 
 		// One Master & Two Replicas
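The seed-then-verify flow in the hunks above assumes checkAndK8sPortForwardRedis hands back a live go-redis client that talks to the Dragonfly master through a port-forward. A minimal sketch of the same pattern outside the Ginkgo suite, assuming go-redis v9 and a forwarded localhost port (the address, password, and key names here are illustrative, not the suite's actual values):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()

	// Assumed: a port-forward already exposes the Dragonfly master locally.
	rc := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "df-pass-1", // the e2e suite reads this from a Secret
	})
	defer rc.Close()

	// Seed a key before the rollout...
	if err := rc.Set(ctx, "foo", "bar", 0).Err(); err != nil {
		panic(err)
	}

	// ...then read it back after the rollout to prove the data survived.
	val, err := rc.Get(ctx, "foo").Result()
	if err != nil || val != "bar" {
		panic(fmt.Sprintf("data lost across rollout: val=%q, err=%v", val, err))
	}
}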
@@ -414,7 +419,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		err = k8sClient.Update(ctx, &df)
 		Expect(err).To(BeNil())
 
-		// Wait until Dragonfly object is marked resources-created
+		// Wait until Dragonfly object is marked ready
 		err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
 		Expect(err).To(BeNil())
 		err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
@@ -446,6 +451,46 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		// check for affinity
 		Expect(ss.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
+
+		// check for pods too
+		var pods corev1.PodList
+		err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
+			"app":                              name,
+			resources.KubernetesPartOfLabelKey: "dragonfly",
+		})
+		Expect(err).To(BeNil())
+
+		for _, pod := range pods.Items {
+			// check for pod args
+			Expect(pod.Spec.Containers[0].Args).To(Equal(expectedArgs))
+
+			// check for pod resources
+			Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU].Equal(newResources.Limits[corev1.ResourceCPU])).To(BeTrue())
+			Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceMemory].Equal(newResources.Limits[corev1.ResourceMemory])).To(BeTrue())
+			Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU].Equal(newResources.Requests[corev1.ResourceCPU])).To(BeTrue())
+			Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceMemory].Equal(newResources.Requests[corev1.ResourceMemory])).To(BeTrue())
+
+			// check for annotations
+			Expect(pod.ObjectMeta.Annotations).To(Equal(newAnnotations))
+
+			// check for tolerations
+			Expect(pod.Spec.Tolerations).To(ContainElements(newTolerations))
+
+			// check for affinity
+			Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
+		}
+	})
+
+	It("Check for data", func() {
+		stopChan := make(chan struct{}, 1)
+		rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
+		Expect(err).To(BeNil())
+
+		// Check for test data
+		data, err := rc.Get(ctx, "foo").Result()
+		Expect(err).To(BeNil())
+		Expect(data).To(Equal("bar"))
+
+		defer close(stopChan)
 	})
 
 	It("Cleanup", func() {
@@ -609,12 +654,20 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
 			TLSSecretRef: &corev1.SecretReference{
 				Name: "df-tls",
 			},
+			Authentication: &resourcesv1.Authentication{
+				PasswordFromSecret: &corev1.SecretKeySelector{
+					LocalObjectReference: corev1.LocalObjectReference{
+						Name: "df-password",
+					},
+					Key: "password",
+				},
+			},
 		},
 	}
 
 	Context("Dragonfly TLS creation", func() {
 		It("Should create successfully", func() {
-			// create the secret
+			// create the secrets
 			cert, key, err := generateSelfSignedCert(name)
 			Expect(err).To(BeNil())
 
@@ -630,6 +683,17 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
 			})
 			Expect(err).To(BeNil())
 
+			err = k8sClient.Create(ctx, &corev1.Secret{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "df-password",
+					Namespace: namespace,
+				},
+				StringData: map[string]string{
+					"password": "df-pass-1",
+				},
+			})
+			Expect(err).To(BeNil())
+
 			err = k8sClient.Create(ctx, &df)
 			Expect(err).To(BeNil())
 
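The TLS test above calls generateSelfSignedCert, whose implementation is not part of this diff. For context, a sketch of what such a helper typically looks like with only the standard library, assuming it returns PEM-encoded certificate and key for a common name (the whole body is an assumption, not the repo's actual code):

package e2e

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"math/big"
	"time"
)

// generateSelfSignedCert returns a PEM-encoded self-signed certificate and
// private key valid for commonName; a sketch, not the repo's implementation.
func generateSelfSignedCert(commonName string) ([]byte, []byte, error) {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return nil, nil, err
	}

	tmpl := x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: commonName},
		NotBefore:    time.Now(),
		NotAfter:     time.Now().Add(24 * time.Hour),
		KeyUsage:     x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
		ExtKeyUsage:  []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
		DNSNames:     []string{commonName},
	}

	// Self-signed: the template is both subject and issuer.
	der, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, nil, err
	}

	certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der})
	keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
	return certPEM, keyPEM, nil
}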
diff --git a/e2e/dragonfly_pod_lifecycle_controller_test.go b/e2e/dragonfly_pod_lifecycle_controller_test.go
index 682ca61..6955b7d 100644
--- a/e2e/dragonfly_pod_lifecycle_controller_test.go
+++ b/e2e/dragonfly_pod_lifecycle_controller_test.go
@@ -56,6 +56,9 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 	It("Initial Master is elected", func() {
 		err := k8sClient.Create(ctx, &df)
 		Expect(err).To(BeNil())
+	})
+
+	It("Check for resources, functional pods and status", func() {
 		// Wait until Dragonfly object is marked initialized
 		waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)
 
@@ -63,7 +66,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 		// Check for service and statefulset
 		var ss appsv1.StatefulSet
-		err = k8sClient.Get(ctx, types.NamespacedName{
+		err := k8sClient.Get(ctx, types.NamespacedName{
 			Name:      name,
 			Namespace: namespace,
 		}, &ss)
@@ -104,7 +107,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 		Expect(podRoles[resources.Replica]).To(HaveLen(replicas - 1))
 	})
 
-	It("New Master is elected as old one dies", func() {
+	It("Delete old master", func() {
 		// Get & Delete the old master
 		var pod corev1.Pod
 
@@ -116,12 +119,15 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 		err = k8sClient.Delete(ctx, &pod)
 		Expect(err).To(BeNil())
+	})
+
+	It("New master is elected", func() {
 		// Wait until the loop is reconciled. This is needed as status is ready previously
 		// and the test might move forward even before the reconcile loop is triggered
 		time.Sleep(1 * time.Minute)
 
-		err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
+		err := waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
 		Expect(err).To(BeNil())
 
 		err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute)
diff --git a/e2e/util.go b/e2e/util.go
index 84605db..342ed46 100644
--- a/e2e/util.go
+++ b/e2e/util.go
@@ -68,7 +68,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st
 		return false, nil
 	}
 
-	if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas {
+	if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas && statefulSet.Status.UpdatedReplicas == statefulSet.Status.Replicas {
 		return true, nil
 	}
 
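The strengthened predicate above matters because ReadyReplicas alone can be satisfied mid-rollout, while old-revision pods are still Ready; requiring UpdatedReplicas to equal Replicas means the helper only reports ready once the rollout has fully converged. A sketch of the complete helper, where everything outside the changed condition is inferred from the hunk rather than copied from the repo:

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// isStatefulSetReady reports whether the StatefulSet has all desired replicas
// ready *and* all replicas on the current revision.
func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace string) (bool, error) {
	var statefulSet appsv1.StatefulSet
	if err := c.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, &statefulSet); err != nil {
		return false, err
	}

	if statefulSet.Spec.Replicas == nil {
		return false, nil
	}

	// Ready alone is not enough during a rollout; also require that every
	// replica has been updated to the current revision.
	if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas &&
		statefulSet.Status.UpdatedReplicas == statefulSet.Status.Replicas {
		return true, nil
	}

	return false, nil
}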
diff --git a/internal/controller/dragonfly_controller.go b/internal/controller/dragonfly_controller.go
index 6f51555..48a594e 100644
--- a/internal/controller/dragonfly_controller.go
+++ b/internal/controller/dragonfly_controller.go
@@ -161,7 +161,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 		}
 	}
 
-	log.Info(fmt.Sprintf("%d/%d are in stable state", fullSyncedUpdatedReplicas, len(replicas)))
+	log.Info(fmt.Sprintf("%d/%d replicas are in stable state", fullSyncedUpdatedReplicas, len(replicas)))
 
 	// if we are here it means that all latest replicas are in stable sync
 	// delete older version replicas
@@ -201,7 +201,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 			log.Error(err, "could not delete master")
 			return ctrl.Result{Requeue: true}, err
 		}
-		r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Deleting master %s")
+		r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", fmt.Sprintf("Deleting master %s", master.Name))
 	}
 
 	// If we are here all are on latest version
@@ -216,24 +216,6 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 
 		return ctrl.Result{}, nil
 	} else {
-		// This is an Update
-		log.Info("updating existing resources")
-		newResources, err := resources.GetDragonflyResources(ctx, &df)
-		if err != nil {
-			log.Error(err, "could not get resources")
-			return ctrl.Result{}, err
-		}
-
-		// update all resources
-		for _, resource := range newResources {
-			if err := r.Update(ctx, resource); err != nil {
-				log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
-				return ctrl.Result{}, err
-			}
-		}
-
-		log.Info("Updated resources for object")
-
 		// perform a rollout only if the pod spec has changed
 		var statefulSet appsv1.StatefulSet
 		if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
@@ -241,6 +223,8 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 			return ctrl.Result{}, err
 		}
 
+		// Check if the pod spec has changed
+		log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas)
 		if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
 			log.Info("Pod spec has changed, performing a rollout")
 			r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")
@@ -253,12 +237,31 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 				return ctrl.Result{Requeue: true}, err
 			}
 
+			r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
+
 			// requeue so that the rollout is processed
 			return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
 		}
-		r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")
+
+		// Is this a Dragonfly object update?
+		log.Info("updating existing resources")
+		newResources, err := resources.GetDragonflyResources(ctx, &df)
+		if err != nil {
+			log.Error(err, "could not get resources")
+			return ctrl.Result{}, err
+		}
+
+		// update all resources
+		for _, resource := range newResources {
+			if err := r.Update(ctx, resource); err != nil {
+				log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
+				return ctrl.Result{}, err
+			}
+		}
+
+		log.Info("Updated resources for object")
+
+		return ctrl.Result{Requeue: true}, nil
 	}
-	return ctrl.Result{}, nil
 }
 
 // SetupWithManager sets up the controller with the Manager.
diff --git a/internal/controller/dragonfly_instance.go b/internal/controller/dragonfly_instance.go
index b3331a6..9a16cc0 100644
--- a/internal/controller/dragonfly_instance.go
+++ b/internal/controller/dragonfly_instance.go
@@ -144,6 +144,14 @@ func (dfi *DragonflyInstance) configureReplication(ctx context.Context) error {
 }
 
 func (dfi *DragonflyInstance) updateStatus(ctx context.Context, phase string) error {
+	// get latest df object first
+	if err := dfi.client.Get(ctx, types.NamespacedName{
+		Name:      dfi.df.Name,
+		Namespace: dfi.df.Namespace,
+	}, dfi.df); err != nil {
+		return err
+	}
+
 	dfi.log.Info("Updating status", "phase", phase)
 	dfi.df.Status.Phase = phase
 	if err := dfi.client.Status().Update(ctx, dfi.df); err != nil {
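The re-fetch added to updateStatus prevents "object has been modified" errors from writing a stale copy, but a single Get followed by Status().Update can still race with a concurrent writer. A common hardening is client-go's conflict-retry helper, sketched below on the repo's DragonflyInstance type (the method name is hypothetical; this is an illustrative alternative, not what the patch does):

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/retry"
)

// updateStatusWithRetry is a hypothetical variant of updateStatus: it re-runs
// the read-modify-write whenever the API server rejects a stale resourceVersion.
func (dfi *DragonflyInstance) updateStatusWithRetry(ctx context.Context, phase string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-fetch on every attempt so each write is based on the latest copy.
		if err := dfi.client.Get(ctx, types.NamespacedName{
			Name:      dfi.df.Name,
			Namespace: dfi.df.Namespace,
		}, dfi.df); err != nil {
			return err
		}
		dfi.df.Status.Phase = phase
		return dfi.client.Status().Update(ctx, dfi.df)
	})
}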
retrying", "pod", pod.Name) + return nil + } + } + // check for one master and all replicas podRoles := make(map[string][]string) for _, pod := range pods.Items { diff --git a/internal/controller/dragonfly_pod_lifecycle_controller.go b/internal/controller/dragonfly_pod_lifecycle_controller.go index 17d0af5..a598238 100644 --- a/internal/controller/dragonfly_pod_lifecycle_controller.go +++ b/internal/controller/dragonfly_pod_lifecycle_controller.go @@ -120,7 +120,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque } else { log.Info(fmt.Sprintf("Master exists. Configuring %s as replica", pod.Status.PodIP)) if err := dfi.configureReplica(ctx, &pod); err != nil { - log.Error(err, "could not mark replica from db") + log.Error(err, "could not mark replica from db. retrying") return ctrl.Result{RequeueAfter: 5 * time.Second}, err } @@ -148,7 +148,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque log.Info("Non-deletion event for a pod with an existing role. checking if something is wrong", "pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "role", role) if err := dfi.checkAndConfigureReplication(ctx); err != nil { - log.Error(err, "could not check and configure replication") + log.Error(err, "could not check and configure replication. retrying") return ctrl.Result{RequeueAfter: 5 * time.Second}, err } diff --git a/internal/controller/util.go b/internal/controller/util.go index eb91857..5a017d1 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -34,8 +34,6 @@ import ( const ( PhaseResourcesCreated string = "resources-created" - PhaseResourcesUpdated string = "resources-updated" - PhaseReady string = "ready" )