Skip to content

Commit

Permalink
fix(updates): re-run loop when update is made (dragonflydb#110)
Browse files Browse the repository at this point in the history
* fix(updates): re-run loop when update is made

Currently, we sometimes miss updates when we compare the statefulset
version. That version can be stale, as the update may not have propagated yet.

This PR fixes that by requeuing the object when an update is made, so
that we retrieve the latest version when the loop runs again.

* fix comment
  • Loading branch information
Pothulapati authored and diffuse committed Nov 18, 2023
1 parent a5e6a69 commit 84a4fd2
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ vet: ## Run go vet against code.
.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
GOBIN=$(LOCALBIN) go install github.com/onsi/ginkgo/v2/[email protected]
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -r -p -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -vv -r -p -coverprofile cover.out

##@ Build

Expand Down
88 changes: 76 additions & 12 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()

It("Check for connectivity", func() {
stopChan := make(chan struct{}, 1)
_, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
Expect(err).To(BeNil())

// Insert test data
Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil())

defer close(stopChan)
})

Expand Down Expand Up @@ -294,23 +298,22 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Expect(podRoles[resources.Replica]).To(HaveLen(2))
})

It("Update to image should be propagated successfully", func() {
newImage := resources.DragonflyImage + ":v1.9.0"
It("Updates should be propagated successfully", func() {
// Update df to the latest
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &df)
Expect(err).To(BeNil())

df.Spec.Image = newImage
df.Spec.Image = fmt.Sprintf("%s:%s", resources.DragonflyImage, "v1.9.0")
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())
})

time.Sleep(30 * time.Second)

// Wait until Dragonfly object is marked resources-created
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
It("Check for values in statefulset", func() {
// Wait until Dragonfly object is marked ready
err := waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
Expand All @@ -323,8 +326,8 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
}, &ss)
Expect(err).To(BeNil())

// check for pod image
Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(newImage))
// check for image
Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(df.Spec.Image))

// Check if there are relevant pods with expected roles
var pods corev1.PodList
Expand All @@ -337,11 +340,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// Get the pods along with their roles
podRoles := make(map[string][]string)
for _, pod := range pods.Items {
Expect(pod.Spec.Containers[0].Image).To(Equal(df.Spec.Image))
role, ok := pod.Labels[resources.Role]
// error if there is no label
Expect(ok).To(BeTrue())
// verify the role to match the label
podRoles[role] = append(podRoles[role], pod.Name)

}

// One Master & Two Replicas
Expand Down Expand Up @@ -414,7 +419,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

// Wait until Dragonfly object is marked resources-created
// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expand Down Expand Up @@ -446,6 +451,46 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()

// check for affinity
Expect(ss.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))

// check for pods too
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())

for _, pod := range pods.Items {
// check for pod args
Expect(pod.Spec.Containers[0].Args).To(Equal(expectedArgs))

// check for pod resources
Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU].Equal(newResources.Limits[corev1.ResourceCPU])).To(BeTrue())
Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceMemory].Equal(newResources.Limits[corev1.ResourceMemory])).To(BeTrue())
Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU].Equal(newResources.Requests[corev1.ResourceCPU])).To(BeTrue())
Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceMemory].Equal(newResources.Requests[corev1.ResourceMemory])).To(BeTrue())

// check for annotations
Expect(pod.ObjectMeta.Annotations).To(Equal(newAnnotations))

// check for tolerations
Expect(pod.Spec.Tolerations).To(ContainElements(newTolerations))

// check for affinity
Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
}
})

It("Check for data", func() {
stopChan := make(chan struct{}, 1)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
Expect(err).To(BeNil())

// Check for test data
data, err := rc.Get(ctx, "foo").Result()
Expect(err).To(BeNil())
Expect(data).To(Equal("bar"))
defer close(stopChan)
})

It("Cleanup", func() {
Expand Down Expand Up @@ -609,12 +654,20 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
TLSSecretRef: &corev1.SecretReference{
Name: "df-tls",
},
Authentication: &resourcesv1.Authentication{
PasswordFromSecret: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "df-password",
},
Key: "password",
},
},
},
}

Context("Dragonfly TLS creation", func() {
It("Should create successfully", func() {
// create the secret
// create the secrets
cert, key, err := generateSelfSignedCert(name)
Expect(err).To(BeNil())

Expand All @@ -630,6 +683,17 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
})
Expect(err).To(BeNil())

err = k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-password",
Namespace: namespace,
},
StringData: map[string]string{
"password": "df-pass-1",
},
})
Expect(err).To(BeNil())

err = k8sClient.Create(ctx, &df)
Expect(err).To(BeNil())

Expand Down
12 changes: 9 additions & 3 deletions e2e/dragonfly_pod_lifecycle_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,17 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
It("Initial Master is elected", func() {
err := k8sClient.Create(ctx, &df)
Expect(err).To(BeNil())
})

It("Check for resources, functional pods and status", func() {

// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)

// Check for service and statefulset
var ss appsv1.StatefulSet
err = k8sClient.Get(ctx, types.NamespacedName{
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expand Down Expand Up @@ -104,7 +107,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
Expect(podRoles[resources.Replica]).To(HaveLen(replicas - 1))
})

It("New Master is elected as old one dies", func() {
It("Delete old master", func() {

// Get & Delete the old master
var pod corev1.Pod
Expand All @@ -116,12 +119,15 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(

err = k8sClient.Delete(ctx, &pod)
Expect(err).To(BeNil())
})

It("New master is elected", func() {

// Wait until the loop is reconciled. This is needed as status is ready previously
// and the test might move forward even before the reconcile loop is triggered
time.Sleep(1 * time.Minute)

err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
err := waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
Expect(err).To(BeNil())

err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st
return false, nil
}

if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas {
if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas && statefulSet.Status.UpdatedReplicas == statefulSet.Status.Replicas {
return true, nil
}

Expand Down
47 changes: 25 additions & 22 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}

log.Info(fmt.Sprintf("%d/%d are in stable state", fullSyncedUpdatedReplicas, len(replicas)))
log.Info(fmt.Sprintf("%d/%d replicas are in stable state", fullSyncedUpdatedReplicas, len(replicas)))

// if we are here it means that all latest replicas are in stable sync
// delete older version replicas
Expand Down Expand Up @@ -201,7 +201,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
log.Error(err, "could not delete master")
return ctrl.Result{Requeue: true}, err
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Deleting master %s")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", fmt.Sprintf("Deleting master %s", master.Name))
}

// If we are here all are on latest version
Expand All @@ -216,31 +216,15 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

return ctrl.Result{}, nil
} else {
// This is an Update
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

log.Info("Updated resources for object")

// perform a rollout only if the pod spec has changed
var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Check if the pod spec has changed
log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas)
if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")
Expand All @@ -253,12 +237,31 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{Requeue: true}, err
}

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")

// requeue so that the rollout is processed
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")

// Is this a Dragonfly object update?
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

log.Info("Updated resources for object")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
16 changes: 16 additions & 0 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ func (dfi *DragonflyInstance) configureReplication(ctx context.Context) error {
}

func (dfi *DragonflyInstance) updateStatus(ctx context.Context, phase string) error {
// get latest df object first
if err := dfi.client.Get(ctx, types.NamespacedName{
Name: dfi.df.Name,
Namespace: dfi.df.Namespace,
}, dfi.df); err != nil {
return err
}

dfi.log.Info("Updating status", "phase", phase)
dfi.df.Status.Phase = phase
if err := dfi.client.Status().Update(ctx, dfi.df); err != nil {
Expand Down Expand Up @@ -252,6 +260,14 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
return err
}

// retry if there are pods that are not running
for _, pod := range pods.Items {
if pod.Status.Phase != corev1.PodRunning {
dfi.log.Info("not all pods are running. retrying", "pod", pod.Name)
return nil
}
}

// check for one master and all replicas
podRoles := make(map[string][]string)
for _, pod := range pods.Items {
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/dragonfly_pod_lifecycle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque
} else {
log.Info(fmt.Sprintf("Master exists. Configuring %s as replica", pod.Status.PodIP))
if err := dfi.configureReplica(ctx, &pod); err != nil {
log.Error(err, "could not mark replica from db")
log.Error(err, "could not mark replica from db. retrying")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque
log.Info("Non-deletion event for a pod with an existing role. checking if something is wrong", "pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "role", role)

if err := dfi.checkAndConfigureReplication(ctx); err != nil {
log.Error(err, "could not check and configure replication")
log.Error(err, "could not check and configure replication. retrying")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}

Expand Down
2 changes: 0 additions & 2 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ import (
const (
PhaseResourcesCreated string = "resources-created"

PhaseResourcesUpdated string = "resources-updated"

PhaseReady string = "ready"
)

Expand Down

0 comments on commit 84a4fd2

Please sign in to comment.