Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(updates): re-run loop when update is made #110

Merged
merged 2 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ vet: ## Run go vet against code.
.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
GOBIN=$(LOCALBIN) go install github.com/onsi/ginkgo/v2/[email protected]
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -r -p -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" $(GINKGO) -vv -r -p -coverprofile cover.out
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adds more verbose logging to the test runs.


##@ Build

Expand Down
88 changes: 76 additions & 12 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()

It("Check for connectivity", func() {
stopChan := make(chan struct{}, 1)
_, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
Expect(err).To(BeNil())

// Insert test data
Expect(rc.Set(ctx, "foo", "bar", 0).Err()).To(BeNil())

defer close(stopChan)
})

Expand Down Expand Up @@ -294,23 +298,22 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Expect(podRoles[resources.Replica]).To(HaveLen(2))
})

It("Update to image should be propagated successfully", func() {
newImage := resources.DragonflyImage + ":v1.9.0"
It("Updates should be propagated successfully", func() {
// Update df to the latest
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &df)
Expect(err).To(BeNil())

df.Spec.Image = newImage
df.Spec.Image = fmt.Sprintf("%s:%s", resources.DragonflyImage, "v1.9.0")
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())
})

time.Sleep(30 * time.Second)

// Wait until Dragonfly object is marked resources-created
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
It("Check for values in statefulset", func() {
// Wait until Dragonfly object is marked ready
err := waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
Expand All @@ -323,8 +326,8 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
}, &ss)
Expect(err).To(BeNil())

// check for pod image
Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(newImage))
// check for pod image
Expect(ss.Spec.Template.Spec.Containers[0].Image).To(Equal(df.Spec.Image))

// Check if there are relevant pods with expected roles
var pods corev1.PodList
Expand All @@ -337,11 +340,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// Get the pods along with their roles
podRoles := make(map[string][]string)
for _, pod := range pods.Items {
Expect(pod.Spec.Containers[0].Image).To(Equal(df.Spec.Image))
role, ok := pod.Labels[resources.Role]
// error if there is no label
Expect(ok).To(BeTrue())
// verify the role to match the label
podRoles[role] = append(podRoles[role], pod.Name)

}

// One Master & Two Replicas
Expand Down Expand Up @@ -414,7 +419,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

// Wait until Dragonfly object is marked resources-created
// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expand Down Expand Up @@ -446,6 +451,46 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()

// check for affinity
Expect(ss.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is important as it verifies that the statefulset update actually translated into a rollout of the pods.

// check for pods too
var pods corev1.PodList
err = k8sClient.List(ctx, &pods, client.InNamespace(namespace), client.MatchingLabels{
"app": name,
resources.KubernetesPartOfLabelKey: "dragonfly",
})
Expect(err).To(BeNil())

for _, pod := range pods.Items {
// check for pod args
Expect(pod.Spec.Containers[0].Args).To(Equal(expectedArgs))

// check for pod resources
Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU].Equal(newResources.Limits[corev1.ResourceCPU])).To(BeTrue())
Expect(pod.Spec.Containers[0].Resources.Limits[corev1.ResourceMemory].Equal(newResources.Limits[corev1.ResourceMemory])).To(BeTrue())
Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU].Equal(newResources.Requests[corev1.ResourceCPU])).To(BeTrue())
Expect(pod.Spec.Containers[0].Resources.Requests[corev1.ResourceMemory].Equal(newResources.Requests[corev1.ResourceMemory])).To(BeTrue())

// check for annotations
Expect(pod.ObjectMeta.Annotations).To(Equal(newAnnotations))

// check for tolerations
Expect(pod.Spec.Tolerations).To(ContainElements(newTolerations))

// check for affinity
Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
}
})

It("Check for data", func() {
stopChan := make(chan struct{}, 1)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password)
Expect(err).To(BeNil())

// Check for test data
data, err := rc.Get(ctx, "foo").Result()
Expect(err).To(BeNil())
Expect(data).To(Equal("bar"))
defer close(stopChan)
})

It("Cleanup", func() {
Expand Down Expand Up @@ -609,12 +654,20 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
TLSSecretRef: &corev1.SecretReference{
Name: "df-tls",
},
Authentication: &resourcesv1.Authentication{
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed because an authentication mechanism is required when a TLS certificate is configured.

PasswordFromSecret: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: "df-password",
},
Key: "password",
},
},
},
}

Context("Dragonfly TLS creation", func() {
It("Should create successfully", func() {
// create the secret
// create the secrets
cert, key, err := generateSelfSignedCert(name)
Expect(err).To(BeNil())

Expand All @@ -630,6 +683,17 @@ var _ = Describe("Dragonfly Server TLS tests", Ordered, FlakeAttempts(3), func()
})
Expect(err).To(BeNil())

err = k8sClient.Create(ctx, &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "df-password",
Namespace: namespace,
},
StringData: map[string]string{
"password": "df-pass-1",
},
})
Expect(err).To(BeNil())

err = k8sClient.Create(ctx, &df)
Expect(err).To(BeNil())

Expand Down
12 changes: 9 additions & 3 deletions e2e/dragonfly_pod_lifecycle_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,17 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
It("Initial Master is elected", func() {
err := k8sClient.Create(ctx, &df)
Expect(err).To(BeNil())
})

It("Check for resources, functional pods and status", func() {

// Wait until Dragonfly object is marked initialized
waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseResourcesCreated, 2*time.Minute)
waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)

// Check for service and statefulset
var ss appsv1.StatefulSet
err = k8sClient.Get(ctx, types.NamespacedName{
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expand Down Expand Up @@ -104,7 +107,7 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(
Expect(podRoles[resources.Replica]).To(HaveLen(replicas - 1))
})

It("New Master is elected as old one dies", func() {
It("Delete old master", func() {

// Get & Delete the old master
var pod corev1.Pod
Expand All @@ -116,12 +119,15 @@ var _ = Describe("DF Pod Lifecycle Reconciler", Ordered, FlakeAttempts(3), func(

err = k8sClient.Delete(ctx, &pod)
Expect(err).To(BeNil())
})

It("New master is elected", func() {

// Wait until the reconcile loop has run. This is needed because the status was
// already "ready" beforehand, so the test could otherwise move forward before the reconcile loop is triggered.
time.Sleep(1 * time.Minute)

err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
err := waitForStatefulSetReady(ctx, k8sClient, name, namespace, 1*time.Minute)
Expect(err).To(BeNil())

err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace st
return false, nil
}

if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas {
if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas && statefulSet.Status.UpdatedReplicas == statefulSet.Status.Replicas {
return true, nil
}

Expand Down
47 changes: 25 additions & 22 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}

log.Info(fmt.Sprintf("%d/%d are in stable state", fullSyncedUpdatedReplicas, len(replicas)))
log.Info(fmt.Sprintf("%d/%d replicas are in stable state", fullSyncedUpdatedReplicas, len(replicas)))

// if we are here it means that all latest replicas are in stable sync
// delete older version replicas
Expand Down Expand Up @@ -201,7 +201,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
log.Error(err, "could not delete master")
return ctrl.Result{Requeue: true}, err
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Deleting master %s")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", fmt.Sprintf("Deleting master %s", master.Name))
}

// If we are here all are on latest version
Expand All @@ -216,31 +216,15 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

return ctrl.Result{}, nil
} else {
// This is an Update
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

log.Info("Updated resources for object")

// perform a rollout only if the pod spec has changed
var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Check if the pod spec has changed
log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas)
if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")
Expand All @@ -253,12 +237,31 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{Requeue: true}, err
}

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")

// requeue so that the rollout is processed
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")

// Is this a Dragonfly object update?
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

log.Info("Updated resources for object")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
16 changes: 16 additions & 0 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ func (dfi *DragonflyInstance) configureReplication(ctx context.Context) error {
}

func (dfi *DragonflyInstance) updateStatus(ctx context.Context, phase string) error {
// get latest df object first
if err := dfi.client.Get(ctx, types.NamespacedName{
Name: dfi.df.Name,
Namespace: dfi.df.Namespace,
}, dfi.df); err != nil {
return err
}

dfi.log.Info("Updating status", "phase", phase)
dfi.df.Status.Phase = phase
if err := dfi.client.Status().Update(ctx, dfi.df); err != nil {
Expand Down Expand Up @@ -252,6 +260,14 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
return err
}

// retry if there are pods that are not running
for _, pod := range pods.Items {
if pod.Status.Phase != corev1.PodRunning {
dfi.log.Info("not all pods are running. retrying", "pod", pod.Name)
return nil
}
}

// check for one master and all replicas
podRoles := make(map[string][]string)
for _, pod := range pods.Items {
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/dragonfly_pod_lifecycle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque
} else {
log.Info(fmt.Sprintf("Master exists. Configuring %s as replica", pod.Status.PodIP))
if err := dfi.configureReplica(ctx, &pod); err != nil {
log.Error(err, "could not mark replica from db")
log.Error(err, "could not mark replica from db. retrying")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func (r *DfPodLifeCycleReconciler) Reconcile(ctx context.Context, req ctrl.Reque
log.Info("Non-deletion event for a pod with an existing role. checking if something is wrong", "pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), "role", role)

if err := dfi.checkAndConfigureReplication(ctx); err != nil {
log.Error(err, "could not check and configure replication")
log.Error(err, "could not check and configure replication. retrying")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}

Expand Down
2 changes: 0 additions & 2 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ import (
const (
PhaseResourcesCreated string = "resources-created"

PhaseResourcesUpdated string = "resources-updated"

PhaseReady string = "ready"
)

Expand Down