Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(reconciler): recreate missing resources #222

Merged
merged 8 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,11 +418,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

GinkgoLogr.Info("start timestamp", "timestamp", time.Now().UTC())
// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
GinkgoLogr.Info("end timestamp", "timestamp", time.Now().UTC())

// Check for service and statefulset
var ss appsv1.StatefulSet
Expand Down Expand Up @@ -478,18 +480,26 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// check for affinity
Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
}
// Update df to the latest
err = k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &df)
Expect(err).To(BeNil())
GinkgoLogr.Info("df arg propagate phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate)

})

It("Check for data", func() {
stopChan := make(chan struct{}, 1)
defer close(stopChan)
rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password, 6395)
Expect(err).To(BeNil())

// Check for test data
data, err := rc.Get(ctx, "foo").Result()
Expect(err).To(BeNil())
Expect(data).To(Equal("bar"))
defer close(stopChan)
})

It("Change Service specification to LoadBalancer", func() {
Expand All @@ -512,11 +522,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Labels: newLabels,
}

GinkgoLogr.Info("df phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate)
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
Expand All @@ -533,6 +544,19 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Expect(svc.Labels).To(Equal(newLabels))
})

It("Should recreate missing statefulset", func() {
var ss appsv1.StatefulSet
err := k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &ss)
Expect(err).To(BeNil())

Expect(k8sClient.Delete(ctx, &ss)).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)
Expect(err).To(BeNil())
})

It("Cleanup", func() {
var df resourcesv1.Dragonfly
err := k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -919,6 +943,8 @@ func isDragonflyInphase(ctx context.Context, c client.Client, name, namespace, p
return false, nil
}

GinkgoLogr.Info("dragonfly phase", "phase", df.Status.Phase, "update", df.Status.IsRollingUpdate)

// Ready means we also want rolling update to be false
if phase == controller.PhaseReady {
// check for replicas
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/onsi/gomega v1.33.1
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.5.3
github.com/samber/lo v1.47.0
k8s.io/api v0.30.2
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRci
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
144 changes: 103 additions & 41 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dragonflydb/dragonfly-operator/internal/resources"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -91,7 +92,49 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Created resources")
return ctrl.Result{}, nil
} else if df.Status.IsRollingUpdate {
}

// Ensure all resources exist before moving forward.
missingResources, err := r.getMissingResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}
for _, resource := range missingResources {
// recreate missing resources
if err := r.Create(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not create resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Update all resources even if the df is in rollout state to ensure
// that newer updates don't get blocked by failed update attempts.
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

log.Info("Updated resources for object")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")

if df.Status.IsRollingUpdate {
// This is a Rollout
log.Info("Rolling out new version")
var updatedStatefulset appsv1.StatefulSet
Expand Down Expand Up @@ -127,6 +170,14 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
} else {
log.Info("found pod without label", "pod", pod.Name)
if isFailedToStart(&pod) {
// This is a new pod which is trying to be ready, but couldn't start due to misconfig.
// Delete the pod and create a new one.
if err := r.Delete(ctx, &pod); err != nil {
log.Error(err, "could not delete pod")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
}
// retry after they are ready
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
Expand Down Expand Up @@ -234,63 +285,74 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

return ctrl.Result{}, nil
} else {
} else if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
// perform a rollout only if the pod spec has changed
var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Check if the pod spec has changed
log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas)
if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")

// Start rollout and update status
// update status so that we can track progress
df.Status.IsRollingUpdate = true
if err := r.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")

// requeue so that the rollout is processed
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
// Start rollout and update status
// update status so that we can track progress
df.Status.IsRollingUpdate = true
if err := r.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}

// Is this a Dragonfly object update?
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
}
return ctrl.Result{Requeue: true}, nil
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
func isFailedToStart(pod *corev1.Pod) bool {
for _, containerStatus := range pod.Status.ContainerStatuses {
if (containerStatus.State.Waiting != nil && isFailureReason(containerStatus.State.Waiting.Reason)) ||
(containerStatus.State.Terminated != nil && isFailureReason(containerStatus.State.Terminated.Reason)) {
return true
}
}
return false
}

// isFailureReason checks if the given reason indicates a failure.
func isFailureReason(reason string) bool {
return reason == "ErrImagePull" ||
reason == "ImagePullBackOff" ||
reason == "CrashLoopBackOff" ||
reason == "RunContainerError"
}

log.Info("Updated resources for object")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")
return ctrl.Result{Requeue: true}, nil
func (r *DragonflyReconciler) getMissingResources(ctx context.Context, df *dfv1alpha1.Dragonfly) ([]client.Object, error) {
resources, err := resources.GetDragonflyResources(ctx, df)
if err != nil {
return nil, err
}
missingResources := make([]client.Object, 0)
for _, resource := range resources {
obj := resource.DeepCopyObject().(client.Object)

err := r.Get(ctx, client.ObjectKey{
Namespace: df.Namespace,
Name: resource.GetName(),
}, obj)

if errors.IsNotFound(err) {
missingResources = append(missingResources, resource)
} else if err != nil {
return nil, fmt.Errorf("failed to get resource %s/%s: %w", df.Namespace, resource.GetName(), err)
}
}
return missingResources, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *DragonflyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// Listen only to spec changes
For(&dfv1alpha1.Dragonfly{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.Service{}).
Owns(&appsv1.StatefulSet{}, builder.MatchEveryOwner).
Owns(&corev1.Service{}, builder.MatchEveryOwner).
Named("Dragonfly").
Complete(r)
}