From 58d867b5ab774cbde82a8c21b180f2514a92c023 Mon Sep 17 00:00:00 2001
From: Tarun Pothulapati
Date: Wed, 18 Oct 2023 14:49:54 +0530
Subject: [PATCH] fix(updates): re-run loop when update is made (#110)

* fix(updates): re-run loop when update is made

Currently, we sometimes miss updates when we compare the statefulset
version, as that version can be stale: the update may not have
propagated yet. This PR fixes that by requeuing the object whenever an
update is made, so that we retrieve the latest version when the loop
runs again.

* fix comment
---
 Makefile                                      |  2 +-
 e2e/dragonfly_controller_test.go              | 88 ++++++++++++++++---
 ...dragonfly_pod_lifecycle_controller_test.go | 12 ++-
 e2e/util.go                                   |  2 +-
 internal/controller/dragonfly_controller.go   | 47 +++++-----
 internal/controller/dragonfly_instance.go     | 16 ++++
 .../dragonfly_pod_lifecycle_controller.go     |  4 +-
 internal/controller/util.go                   |  2 -
 8 files changed, 130 insertions(+), 43 deletions(-)

diff --git a/Makefile b/Makefile
index 3b0b3ab..f70905b 100644
--- a/Makefile
+++ b/Makefile
@@ -59,7 +59,7 @@ vet: ## Run go vet against code.
 .PHONY: test
 test: manifests generate fmt vet envtest ## Run tests.
 	GOBIN=$(LOCALBIN) go install github.com/onsi/ginkgo/v2/ginkgo@v2.12.1
-	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -r -p -coverprofile cover.out
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -vv -r -p -coverprofile cover.out

 ##@ Build

diff --git a/e2e/dragonfly_controller_test.go b/e2e/dragonfly_controller_test.go
index 183dffe..c433816 100644
--- a/e2e/dragonfly_controller_test.go
+++ b/e2e/dragonfly_controller_test.go
@@ -189,8 +189,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 	It("Check for connectivity", func() {
 		stopChan := make(chan struct{}, 1)

-		_, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
+		rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
 		Expect(err).To(BeNil())
+
+		// Insert test data
+		Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil())
+
 		defer close(stopChan)
 	})

@@ -294,8 +298,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		Expect(podRoles[resources.Replica]).To(HaveLen(2))
 	})

-	It("Update to image should be propagated successfully", func() {
-		newImage := resources.DragonflyImage + ":v1.9.0"
+	It("Updates should be propagated successfully", func() {
 		// Update df to the latest
 		err := k8sClient.Get(ctx, types.NamespacedName{
 			Name:      name,
@@ -303,14 +306,14 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		}, &df)
 		Expect(err).To(BeNil())

-		df.Spec.Image = newImage
+		df.Spec.Image = fmt.Sprintf("%s:%s", resources.DragonflyImage, "v1.9.0")
 		err = k8sClient.Update(ctx, &df)
 		Expect(err).To(BeNil())
+	})

-		time.Sleep(30 * time.Second)
-
-		// Wait until Dragonfly object is marked resources-created
-		err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
+	It("Check for values in statefulset", func() {
+		// Wait until Dragonfly object is marked ready
+		err := waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
 		Expect(err).To(BeNil())
 		err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
 		Expect(err).To(BeNil())
@@ -323,8 +326,8 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		}, &ss)
 		Expect(err).To(BeNil())

-		// check for pod image
-		Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(newImage))
+		// check for image
+		Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(df.Spec.Image))

 		// Check if there are relevant pods with expected roles
 		var pods corev1.PodList
@@ -337,11 +340,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		// Get the pods along with their roles
 		podRoles := make(map[string][]string)
 		for _, pod := range pods.Items {
+			Expect(pod.Spec.Containers[0].Image).To(Equal(df.Spec.Image))
 			role, ok := pod.Labels[resources.Role]
 			// error if there is no label
 			Expect(ok).To(BeTrue())
 			// verify the role to match the label
 			podRoles[role] = append(podRoles[role], pod.Name)
+
 		}

 		// One Master & Two Replicas
@@ -414,7 +419,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		err = k8sClient.Update(ctx, &df)
 		Expect(err).To(BeNil())

-		// Wait until Dragonfly object is marked resources-created
+		// Wait until Dragonfly object is marked ready
 		err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
 		Expect(err).To(BeNil())
 		err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
@@ -446,6 +451,46 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
 		// check for affinity
 		Expect(ss.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
+
+		// check for pods too
+		var pods corev1.PodList
+		err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
+			"app":                              name,
+			resources.KubernetesPartOfLabelKey: "dragonfly",
+		})
+		Expect(err).To(BeNil())
+
+		for _, pod := range pods.Items {
+			// check for pod args
+			Expect(pod.Spec.Containers[0].Args).To(Equal(expectedArgs))
+
+			// check for pod resources
+			Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU].Equal(newResources.Limits[corev1.ResourceCPU])).To(BeTrue())
+			Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceMemory].Equal(newResources.Limits[corev1.ResourceMemory])).To(BeTrue())
+			Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU].Equal(newResources.Requests[corev1.ResourceCPU])).To(BeTrue())
+			Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceMemory].Equal(newResources.Requests[corev1.ResourceMemory])).To(BeTrue())
+
+			// check for annotations
+			Expect(pod.ObjectMeta.Annotations).To(Equal(newAnnotations))
+
+			// check for tolerations
+			Expect(pod.Spec.Tolerations).To(ContainElements(newTolerations))
+
+			// check for affinity
+			Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
+		}
+	})
+
+	It("Check for data", func() {
+		stopChan := make(chan struct{}, 1)
+		rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
+		Expect(err).To(BeNil())
+
+		// Check for test data
+		data, err := rc.Get(ctx, "foo").Result()
+		Expect(err).To(BeNil())
+		Expect(data).To(Equal("bar"))
+
 		defer close(stopChan)
 	})

 	It("Cleanup", func() {
@@ -609,12 +654,20 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
 			TLSSecretRef: &corev1.SecretReference{
 				Name: "df-tls",
 			},
+			Authentication: &resourcesv1.Authentication{
+				PasswordFromSecret: &corev1.SecretKeySelector{
+					LocalObjectReference: corev1.LocalObjectReference{
+						Name: "df-password",
+					},
+					Key: "password",
+				},
+			},
 		},
 	}

 	Context("Dragonfly TLS creation", func() {
 		It("Should create successfully", func() {
-			// create the secret
+			// create the secrets
 			cert, key, err := generateSelfSignedCert(name)
 			Expect(err).To(BeNil())

@@ -630,6 +683,17 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
 			})
 			Expect(err).To(BeNil())

+			err = k8sClient.Create(ctx, &corev1.Secret{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "df-password",
+					Namespace: namespace,
+				},
+				StringData: map[string]string{
+					"password": "df-pass-1",
+				},
+			})
+			Expect(err).To(BeNil())
+
 			err = k8sClient.Create(ctx, &df)
 			Expect(err).To(BeNil())

diff --git a/e2e/dragonfly_pod_lifecycle_controller_test.go b/e2e/dragonfly_pod_lifecycle_controller_test.go
index 682ca61..6955b7d 100644
--- a/e2e/dragonfly_pod_lifecycle_controller_test.go
+++ b/e2e/dragonfly_pod_lifecycle_controller_test.go
@@ -56,6 +56,9 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 	It("Initial Master is elected", func() {
 		err := k8sClient.Create(ctx, &df)
 		Expect(err).To(BeNil())
+	})
+
+	It("Check for resources, functional pods and status", func() {
 		// Wait until Dragonfly object is marked initialized
 		waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)

@@ -63,7 +66,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 		// Check for service and statefulset
 		var ss appsv1.StatefulSet
-		err = k8sClient.Get(ctx, types.NamespacedName{
+		err := k8sClient.Get(ctx, types.NamespacedName{
 			Name:      name,
 			Namespace: namespace,
 		}, &ss)
@@ -104,7 +107,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 		Expect(podRoles[resources.Replica]).To(HaveLen(replicas - 1))
 	})

-	It("New Master is elected as old one dies", func() {
+	It("Delete old master", func() {
 		// Get & Delete the old master
 		var pod corev1.Pod

@@ -116,12 +119,15 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
 		err = k8sClient.Delete(ctx, &pod)
 		Expect(err).To(BeNil())
+	})
+
+	It("New master is elected", func() {
 		// Wait until the loop is reconciled. This is needed as status is ready previously
 		// and the test might move forward even before the reconcile loop is triggered
 		time.Sleep(1 * time.Minute)

-		err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
+		err := waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
 		Expect(err).To(BeNil())

 		err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute)
diff --git a/e2e/util.go b/e2e/util.go
index 84605db..342ed46 100644
--- a/e2e/util.go
+++ b/e2e/util.go
@@ -68,7 +68,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st
 		return false, nil
 	}

-	if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas {
+	if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas && statefulSet.Status.UpdatedReplicas == statefulSet.Status.Replicas {
 		return true, nil
 	}

diff --git a/internal/controller/dragonfly_controller.go b/internal/controller/dragonfly_controller.go
index 6f51555..48a594e 100644
--- a/internal/controller/dragonfly_controller.go
+++ b/internal/controller/dragonfly_controller.go
@@ -161,7 +161,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 			}
 		}

-		log.Info(fmt.Sprintf("%d/%d are in stable state", fullSyncedUpdatedReplicas, len(replicas)))
+		log.Info(fmt.Sprintf("%d/%d replicas are in stable state", fullSyncedUpdatedReplicas, len(replicas)))

 		// if we are here it means that all latest replicas are in stable sync
 		// delete older version replicas
@@ -201,7 +201,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 				log.Error(err, "could not delete master")
 				return ctrl.Result{Requeue: true}, err
 			}
-			r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Deleting master %s")
+			r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", fmt.Sprintf("Deleting master %s", master.Name))
 		}

 		// If we are here all are on latest version
@@ -216,24 +216,6 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

 		return ctrl.Result{}, nil
 	} else {
-		// This is an Update
-		log.Info("updating existing resources")
-		newResources, err := resources.GetDragonflyResources(ctx, &df)
-		if err != nil {
-			log.Error(err, "could not get resources")
-			return ctrl.Result{}, err
-		}
-
-		// update all resources
-		for _, resource := range newResources {
-			if err := r.Update(ctx, resource); err != nil {
-				log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
-				return ctrl.Result{}, err
-			}
-		}
-
-		log.Info("Updated resources for object")
-
 		// perform a rollout only if the pod spec has changed
 		var statefulSet appsv1.StatefulSet
 		if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
 			return ctrl.Result{}, err
 		}

+		// Check if the pod spec has changed
+		log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas)
 		if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
 			log.Info("Pod spec has changed, performing a rollout")
 			r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")
@@ -253,12 +237,31 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 				return ctrl.Result{Requeue: true}, err
 			}

+			r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
+
 			// requeue so that the rollout is processed
 			return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
 		}
-		r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")
+
+		// Is this a Dragonfly object update?
+		log.Info("updating existing resources")
+		newResources, err := resources.GetDragonflyResources(ctx, &df)
+		if err != nil {
+			log.Error(err, "could not get resources")
+			return ctrl.Result{}, err
+		}
+
+		// update all resources
+		for _, resource := range newResources {
+			if err := r.Update(ctx, resource); err != nil {
+				log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
+				return ctrl.Result{}, err
+			}
+		}
+
+		log.Info("Updated resources for object")
+		return ctrl.Result{Requeue: true}, nil
 	}
-
-	return ctrl.Result{}, nil
 }

 // SetupWithManager sets up the controller with the Manager.
diff --git a/internal/controller/dragonfly_instance.go b/internal/controller/dragonfly_instance.go
index b3331a6..9a16cc0 100644
--- a/internal/controller/dragonfly_instance.go
+++ b/internal/controller/dragonfly_instance.go
@@ -144,6 +144,14 @@ func (dfi *DragonflyInstance) configureReplication(ctx context.Context) error {
 }

 func (dfi *DragonflyInstance) updateStatus(ctx context.Context, phase string) error {
+	// get latest df object first
+	if err := dfi.client.Get(ctx, types.NamespacedName{
+		Name:      dfi.df.Name,
+		Namespace: dfi.df.Namespace,
+	}, dfi.df); err != nil {
+		return err
+	}
+
 	dfi.log.Info("Updating status", "phase", phase)
 	dfi.df.Status.Phase = phase
 	if err := dfi.client.Status().Update(ctx, dfi.df); err != nil {
@@ -252,6 +260,14 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
 		return err
 	}

+	// retry if there are pods that are not running
+	for _, pod := range pods.Items {
+		if pod.Status.Phase != corev1.PodRunning {
+			dfi.log.Info("not all pods are running. retrying", "pod", pod.Name)
+			return nil
+		}
+	}
+
 	// check for one master and all replicas
 	podRoles := make(map[string][]string)
 	for _, pod := range pods.Items {
diff --git a/internal/controller/dragonfly_pod_lifecycle_controller.go b/internal/controller/dragonfly_pod_lifecycle_controller.go
index 17d0af5..a598238 100644
--- a/internal/controller/dragonfly_pod_lifecycle_controller.go
+++ b/internal/controller/dragonfly_pod_lifecycle_controller.go
@@ -120,7 +120,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 		} else {
 			log.Info(fmt.Sprintf("Master exists. Configuring %s as replica", pod.Status.PodIP))
 			if err := dfi.configureReplica(ctx, &pod); err != nil {
-				log.Error(err, "could not mark replica from db")
+				log.Error(err, "could not mark replica from db. retrying")
 				return ctrl.Result{RequeueAfter: 5 * time.Second}, err
 			}

@@ -148,7 +148,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque
 		log.Info("Non-deletion event for a pod with an existing role. checking if something is wrong", "pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "role", role)

 		if err := dfi.checkAndConfigureReplication(ctx); err != nil {
-			log.Error(err, "could not check and configure replication")
+			log.Error(err, "could not check and configure replication. retrying")
retrying") return ctrl.Result{RequeueAfter: 5 * time.Second}, err } diff --git a/internal/controller/util.go b/internal/controller/util.go index eb91857..5a017d1 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -34,8 +34,6 @@ import ( const ( PhaseResourcesCreated string = "resources-created" - PhaseResourcesUpdated string = "resources-updated" - PhaseReady string = "ready" )