diff --git a/internal/controller/dragonfly_controller.go b/internal/controller/dragonfly_controller.go index 1d36e09..496af1a 100644 --- a/internal/controller/dragonfly_controller.go +++ b/internal/controller/dragonfly_controller.go @@ -138,7 +138,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( fullSyncedUpdatedReplicas := 0 for _, replica := range replicas { // Check only with latest replicas - onLatestVersion, err := isPodOnLatestVersion(ctx, r.Client, &replica, &updatedStatefulset) + onLatestVersion, err := isPodOnLatestVersion(&replica, &updatedStatefulset) if err != nil { log.Error(err, "could not check if pod is on latest version") return ctrl.Result{RequeueAfter: 5 * time.Second}, err @@ -146,7 +146,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if onLatestVersion { // check if the replica had a full sync log.Info("New Replica found. Checking if replica had a full sync", "pod", replica.Name) - isStableState, err := isStableState(ctx, r.Client, &replica) + isStableState, err := isStableState(ctx, &replica) if err != nil { log.Error(err, "could not check if pod is in stable state") return ctrl.Result{RequeueAfter: 5 * time.Second}, err @@ -167,7 +167,7 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // delete older version replicas for _, replica := range replicas { // Check if pod is on latest version - onLatestVersion, err := isPodOnLatestVersion(ctx, r.Client, &replica, &updatedStatefulset) + onLatestVersion, err := isPodOnLatestVersion(&replica, &updatedStatefulset) if err != nil { log.Error(err, "could not check if pod is on latest version") return ctrl.Result{RequeueAfter: 5 * time.Second}, err @@ -186,13 +186,17 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } } - latestReplica, err := getLatestReplica(ctx, r.Client, &updatedStatefulset) - if err != nil { - log.Error(err, "could not get latest replica") - return ctrl.Result{RequeueAfter: 5 * time.Second}, err + var latestReplica *corev1.Pod + var err error + if len(replicas) > 0 { + latestReplica, err = getLatestReplica(ctx, r.Client, &updatedStatefulset) + if err != nil { + log.Error(err, "could not get latest replica") + return ctrl.Result{RequeueAfter: 5 * time.Second}, err + } } - masterOnLatest, err := isPodOnLatestVersion(ctx, r.Client, &master, &updatedStatefulset) + masterOnLatest, err := isPodOnLatestVersion(&master, &updatedStatefulset) if err != nil { log.Error(err, "could not check if pod is on latest version") return ctrl.Result{RequeueAfter: 5 * time.Second}, err @@ -202,10 +206,12 @@ func (r *DragonflyReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // are on latest version if !masterOnLatest { // Update master now - log.Info("Running REPLTAKEOVER on replica", "pod", master.Name) - if err := replTakeover(ctx, r.Client, latestReplica); err != nil { - log.Error(err, "could not update master") - return ctrl.Result{RequeueAfter: 5 * time.Second}, err + if latestReplica != nil { + log.Info("Running REPLTAKEOVER on replica", "pod", master.Name) + if err := replTakeover(ctx, r.Client, latestReplica); err != nil { + log.Error(err, "could not update master") + return ctrl.Result{RequeueAfter: 5 * time.Second}, err + } } r.EventRecorder.Event(&df, corev1.EventTypeNormal, "Rollout", fmt.Sprintf("Shutting down master %s", master.Name)) diff --git a/internal/controller/dragonfly_instance.go b/internal/controller/dragonfly_instance.go index f877c6f..c094d4d 100644 --- a/internal/controller/dragonfly_instance.go +++ b/internal/controller/dragonfly_instance.go @@ -23,7 +23,6 @@ import ( "strings" dfv1alpha1 "github.com/dragonflydb/dragonfly-operator/api/v1alpha1" - resourcesv1 "github.com/dragonflydb/dragonfly-operator/api/v1alpha1" "github.com/dragonflydb/dragonfly-operator/internal/resources" "github.com/go-logr/logr" "github.com/redis/go-redis/v9" @@ -36,7 +35,7 @@ import ( // and provides methods to handle replication. type DragonflyInstance struct { // Dragonfly is the relevant Dragonfly CRD that it performs actions over - df *resourcesv1.Dragonfly + df *dfv1alpha1.Dragonfly client client.Client log logr.Logger @@ -65,17 +64,6 @@ func GetDragonflyInstanceFromPod(ctx context.Context, c client.Client, pod *core }, nil } -func (dfi *DragonflyInstance) getStatus(ctx context.Context) (string, error) { - if err := dfi.client.Get(ctx, types.NamespacedName{ - Name: dfi.df.Name, - Namespace: dfi.df.Namespace, - }, dfi.df); err != nil { - return "", err - } - - return dfi.df.Status.Phase, nil -} - func (dfi *DragonflyInstance) configureReplication(ctx context.Context) error { dfi.log.Info("Configuring replication") diff --git a/internal/controller/util.go b/internal/controller/util.go index d3d688b..562be1c 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -21,14 +21,12 @@ import ( "errors" "fmt" "strings" - "time" "github.com/dragonflydb/dragonfly-operator/internal/resources" "github.com/redis/go-redis/v9" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -40,7 +38,7 @@ const ( // isPodOnLatestVersion returns if the Given pod is on the updatedRevision // of the given statefulset or not -func isPodOnLatestVersion(ctx context.Context, c client.Client, pod *corev1.Pod, statefulSet *appsv1.StatefulSet) (bool, error) { +func isPodOnLatestVersion(pod *corev1.Pod, statefulSet *appsv1.StatefulSet) (bool, error) { // Get the pod's revision podRevision, ok := pod.Labels[appsv1.StatefulSetRevisionLabel] if !ok { @@ -74,7 +72,7 @@ func getLatestReplica(ctx context.Context, c client.Client, statefulSet *appsv1. // Iterate over the pods and find a replica which is on the latest version for _, pod := range podList.Items { - isLatest, err := isPodOnLatestVersion(ctx, c, &pod, statefulSet) + isLatest, err := isPodOnLatestVersion(&pod, statefulSet) if err != nil { return nil, err } @@ -112,43 +110,7 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e return nil } -func waitForStatefulSetReady(ctx context.Context, c client.Client, name, namespace string, maxDuration time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, maxDuration) - defer cancel() - for { - select { - case <-ctx.Done(): - return fmt.Errorf("timed out waiting for statefulset to be ready") - default: - // Check if the statefulset is ready - ready, err := isStatefulSetReady(ctx, c, name, namespace) - if err != nil { - return err - } - if ready { - return nil - } - } - } -} - -func isStatefulSetReady(ctx context.Context, c client.Client, name, namespace string) (bool, error) { - var statefulSet appsv1.StatefulSet - if err := c.Get(ctx, types.NamespacedName{ - Name: name, - Namespace: namespace, - }, &statefulSet); err != nil { - return false, nil - } - - if statefulSet.Status.ReadyReplicas == *statefulSet.Spec.Replicas { - return true, nil - } - - return false, nil -} - -func isStableState(ctx context.Context, c client.Client, pod *corev1.Pod) (bool, error) { +func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) { // wait until pod IP is ready if pod.Status.PodIP == "" || pod.Status.Phase != corev1.PodRunning { return false, nil