Skip to content

Commit

Permalink
Merge branch 'dragonflydb:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
sigarus authored Dec 12, 2024
2 parents d7f0b16 + b07f733 commit adb5611
Show file tree
Hide file tree
Showing 15 changed files with 1,700 additions and 112 deletions.
8 changes: 8 additions & 0 deletions api/v1alpha1/dragonfly_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ type DragonflySpec struct {
// +optional
// +kubebuilder:validation:Optional
ServiceSpec *ServiceSpec `json:"serviceSpec,omitempty"`

// (Optional) Dragonfly pod init containers
// +optional
// +kubebuilder:validation:Optional
InitContainers []corev1.Container `json:"initContainers,omitempty"`
}

type ServiceSpec struct {
Expand Down Expand Up @@ -221,6 +226,9 @@ type DragonflyStatus struct {

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase",description="The current phase of the Dragonfly cluster"
//+kubebuilder:printcolumn:name="Rolling Update",type="boolean",JSONPath=".status.isRollingUpdate",description="Indicates if a rolling update is in progress"
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".spec.replicas",description="Number of replicas"

// Dragonfly is the Schema for the dragonflies API
type Dragonfly struct {
Expand Down
7 changes: 7 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions charts/dragonfly-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: v1.1.7
version: v1.1.8

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v1.1.7"
appVersion: "v1.1.8"
1,443 changes: 1,442 additions & 1 deletion config/crd/bases/dragonflydb.io_dragonflies.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kind: Kustomization
images:
- name: controller
newName: docker.dragonflydb.io/dragonflydb/operator
newTag: v1.1.7
newTag: v1.1.8
36 changes: 31 additions & 5 deletions e2e/dragonfly_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
}, &df)
Expect(err).To(BeNil())

df.Spec.Image = fmt.Sprintf("%s:%s", resources.DragonflyImage, "v1.21.2")
df.Spec.Image = fmt.Sprintf("%s:%s", resources.DragonflyImage, "v1.24.0")
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())
})
Expand Down Expand Up @@ -418,11 +418,13 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

GinkgoLogr.Info("start timestamp", "timestamp", time.Now().UTC())
// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
GinkgoLogr.Info("end timestamp", "timestamp", time.Now().UTC())

// Check for service and statefulset
var ss appsv1.StatefulSet
Expand Down Expand Up @@ -478,18 +480,26 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
// check for affinity
Expect(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution).To(Equal(newAffinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
}
// Update df to the latest
err = k8sClient.Get(ctx, types.NamespacedName{
Name: name,
Namespace: namespace,
}, &df)
Expect(err).To(BeNil())
GinkgoLogr.Info("df arg propagate phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate)

})

It("Check for data", func() {
	// stopChan is handed to the port-forward helper; presumably closing it
	// tears the forward down (confirm against checkAndK8sPortForwardRedis).
	// It must be closed exactly once: a second close of the same channel
	// panics, so only this single deferred close is kept.
	stopChan := make(chan struct{}, 1)
	defer close(stopChan)

	rc, err := checkAndK8sPortForwardRedis(ctx, clientset, cfg, stopChan, name, namespace, password, 6395)
	Expect(err).To(BeNil())

	// Check for test data: the key "foo" is expected to still hold "bar".
	data, err := rc.Get(ctx, "foo").Result()
	Expect(err).To(BeNil())
	Expect(data).To(Equal("bar"))
})

It("Change Service specification to LoadBalancer", func() {
Expand All @@ -512,11 +522,12 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Labels: newLabels,
}

GinkgoLogr.Info("df phase", "phase", df.Status.Phase, "rolling-update", df.Status.IsRollingUpdate)
err = k8sClient.Update(ctx, &df)
Expect(err).To(BeNil())

// Wait until Dragonfly object is marked ready
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 3*time.Minute)
err = waitForDragonflyPhase(ctx, k8sClient, name, namespace, controller.PhaseReady, 1*time.Minute)
Expect(err).To(BeNil())
err = waitForStatefulSetReady(ctx, k8sClient, name, namespace, 3*time.Minute)
Expect(err).To(BeNil())
Expand All @@ -533,6 +544,19 @@ var _ = Describe("Dragonfly Lifecycle tests", Ordered, FlakeAttempts(3), func()
Expect(svc.Labels).To(Equal(newLabels))
})

It("Should recreate missing statefulset", func() {
	// Look up the StatefulSet that shares the Dragonfly object's name and namespace.
	key := types.NamespacedName{
		Name:      name,
		Namespace: namespace,
	}
	var sts appsv1.StatefulSet
	Expect(k8sClient.Get(ctx, key, &sts)).To(BeNil())

	// Delete it out from under the operator.
	deleteErr := k8sClient.Delete(ctx, &sts)
	Expect(deleteErr).To(BeNil())

	// A StatefulSet with the same name must report ready again within two minutes.
	readyErr := waitForStatefulSetReady(ctx, k8sClient, name, namespace, 2*time.Minute)
	Expect(readyErr).To(BeNil())
})

It("Cleanup", func() {
var df resourcesv1.Dragonfly
err := k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -613,8 +637,8 @@ user john on #0c8e2b662f1c0f1 -@all +@string +hset
Expect(err).To(BeNil())
Expect(result).To(HaveLen(2))
Expect(result).To(ContainElements(
"user default on nopass ~* +@all",
"user john on #0c8e2b662f1c0f -@all +@string +hset",
"user default on nopass ~* resetchannels +@all",
"user john on #0c8e2b662f1c0f resetchannels -@all +@string +hset",
))
})
It("Cleanup", func() {
Expand Down Expand Up @@ -919,6 +943,8 @@ func isDragonflyInphase(ctx context.Context, c client.Client, name, namespace, p
return false, nil
}

GinkgoLogr.Info("dragonfly phase", "phase", df.Status.Phase, "update", df.Status.IsRollingUpdate)

// Ready means we also want rolling update to be false
if phase == controller.PhaseReady {
// check for replicas
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/onsi/gomega v1.33.1
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.5.3
github.com/samber/lo v1.47.0
k8s.io/api v0.30.2
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRci
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
145 changes: 104 additions & 41 deletions internal/controller/dragonfly_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dragonflydb/dragonfly-operator/internal/resources"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -91,7 +92,49 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Created resources")
return ctrl.Result{}, nil
} else if df.Status.IsRollingUpdate {
}

// Ensure all resources exist before moving forward.
missingResources, err := r.getMissingResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}
for _, resource := range missingResources {
// recreate missing resources
if err := r.Create(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not create resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Update all resources even if the df is in rollout state to ensure
// that newer updates don't get blocked by failed update attempts.
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
}

log.Info("Updated resources for object")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")

if df.Status.IsRollingUpdate {
// This is a Rollout
log.Info("Rolling out new version")
var updatedStatefulset appsv1.StatefulSet
Expand Down Expand Up @@ -127,6 +170,14 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
} else {
log.Info("found pod without label", "pod", pod.Name)
if isFailedToStart(&pod) {
// This is a new pod which is trying to be ready, but couldn't start due to misconfig.
// Delete the pod and create a new one.
if err := r.Delete(ctx, &pod); err != nil {
log.Error(err, "could not delete pod")
return ctrl.Result{RequeueAfter: 5 * time.Second}, err
}
}
// retry after they are ready
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
Expand Down Expand Up @@ -234,62 +285,74 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

return ctrl.Result{}, nil
} else {
} else if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
// perform a rollout only if the pod spec has changed
var statefulSet appsv1.StatefulSet
if err := r.Get(ctx, client.ObjectKey{Namespace: df.Namespace, Name: df.Name}, &statefulSet); err != nil {
log.Error(err, "could not get statefulset")
return ctrl.Result{}, err
}

// Check if the pod spec has changed
log.Info("Checking if pod spec has changed", "updatedReplicas", statefulSet.Status.UpdatedReplicas, "currentReplicas", statefulSet.Status.Replicas)
if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")

// Start rollout and update status
// update status so that we can track progress
df.Status.IsRollingUpdate = true
if err := r.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}

r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
log.Info("Pod spec has changed, performing a rollout")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", "Starting a rollout")

// requeue so that the rollout is processed
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
// Start rollout and update status
// update status so that we can track progress
df.Status.IsRollingUpdate = true
if err := r.Status().Update(ctx, &df); err != nil {
log.Error(err, "could not update the Dragonfly object")
return ctrl.Result{Requeue: true}, err
}

// Is this a Dragonfly object update?
log.Info("updating existing resources")
newResources, err := resources.GetDragonflyResources(ctx, &df)
if err != nil {
log.Error(err, "could not get resources")
return ctrl.Result{}, err
}
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Performing a rollout")
}
return ctrl.Result{Requeue: true}, nil
}

// update all resources
for _, resource := range newResources {
if err := r.Update(ctx, resource); err != nil {
log.Error(err, fmt.Sprintf("could not update resource %s/%s/%s", resource.GetObjectKind(), resource.GetNamespace(), resource.GetName()))
return ctrl.Result{}, err
}
// isFailedToStart reports whether any container of pod, including its init
// containers, is waiting or terminated with a reason that isFailureReason
// classifies as a start-up failure.
//
// Init container statuses are inspected as well as regular container
// statuses: a pod whose init container cannot pull its image or keeps
// crashing never gets to start its main containers, so looking only at
// ContainerStatuses would miss it.
func isFailedToStart(pod *corev1.Pod) bool {
	statusGroups := [][]corev1.ContainerStatus{
		pod.Status.InitContainerStatuses,
		pod.Status.ContainerStatuses,
	}
	for _, statuses := range statusGroups {
		// Index instead of ranging by value to avoid copying each status struct.
		for i := range statuses {
			state := &statuses[i].State
			if state.Waiting != nil && isFailureReason(state.Waiting.Reason) {
				return true
			}
			if state.Terminated != nil && isFailureReason(state.Terminated.Reason) {
				return true
			}
		}
	}
	return false
}

// isFailureReason reports whether reason is one of the container state
// reasons treated as a failure to start: an image pull error or back-off,
// a crash loop back-off, or a container run error. The comparison is exact
// and case-sensitive.
func isFailureReason(reason string) bool {
	switch reason {
	case "ErrImagePull", "ImagePullBackOff", "CrashLoopBackOff", "RunContainerError":
		return true
	default:
		return false
	}
}

log.Info("Updated resources for object")
r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Resources", "Updated resources")
return ctrl.Result{Requeue: true}, nil
// getMissingResources returns those objects produced by
// resources.GetDragonflyResources for df that the reconciler's client reports
// as not found. Lookups use df's namespace together with each object's name.
//
// The returned slice is empty (never nil) when nothing is missing. Any lookup
// error other than "not found" aborts the scan and is returned wrapped.
func (r *DragonflyReconciler) getMissingResources(ctx context.Context, df *dfv1alpha1.Dragonfly) ([]client.Object, error) {
	desired, err := resources.GetDragonflyResources(ctx, df)
	if err != nil {
		return nil, err
	}

	absent := []client.Object{}
	for _, want := range desired {
		// Read into a deep copy so the generated object itself is not
		// overwritten by whatever the client returns.
		probe := want.DeepCopyObject().(client.Object)
		key := client.ObjectKey{
			Namespace: df.Namespace,
			Name:      want.GetName(),
		}

		switch getErr := r.Get(ctx, key, probe); {
		case errors.IsNotFound(getErr):
			absent = append(absent, want)
		case getErr != nil:
			return nil, fmt.Errorf("failed to get resource %s/%s: %w", df.Namespace, want.GetName(), getErr)
		}
	}
	return absent, nil
}

// SetupWithManager sets up the controller with the Manager.
//
// The controller is named "Dragonfly". It reconciles Dragonfly objects and
// also watches the StatefulSets and Services they own. Each owned type is
// registered exactly once, with builder.MatchEveryOwner; an additional plain
// Owns() for the same type would only add a second, redundant watch.
func (r *DragonflyReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		// Listen only to spec changes
		For(&dfv1alpha1.Dragonfly{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		Owns(&appsv1.StatefulSet{}, builder.MatchEveryOwner).
		Owns(&corev1.Service{}, builder.MatchEveryOwner).
		Named("Dragonfly").
		Complete(r)
}
6 changes: 4 additions & 2 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"

dfv1alpha1 "github.com/dragonflydb/dragonfly-operator/api/v1alpha1"
Expand Down Expand Up @@ -329,7 +331,7 @@ func (dfi *DragonflyInstance) getPods(ctx context.Context) (*corev1.PodList, err
// to the given master instance
func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, masterIp string) error {
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)),
})
defer redisClient.Close()

Expand Down Expand Up @@ -357,7 +359,7 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
// along while updating other pods to be replicas
func (dfi *DragonflyInstance) replicaOfNoOne(ctx context.Context, pod *corev1.Pod) error {
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)),
})
defer redisClient.Close()

Expand Down
Loading

0 comments on commit adb5611

Please sign in to comment.