diff --git a/internal/controller/dragonfly_instance.go b/internal/controller/dragonfly_instance.go index 0e2b143..c3e5fc4 100644 --- a/internal/controller/dragonfly_instance.go +++ b/internal/controller/dragonfly_instance.go @@ -168,15 +168,26 @@ func (dfi *DragonflyInstance) masterExists(ctx context.Context) (bool, error) { } func (dfi *DragonflyInstance) getMasterIp(ctx context.Context) (string, error) { - dfi.log.Info("retrieving ip of the master") + dfi.log.Info("retrieving IP of the master") pods, err := dfi.getPods(ctx) if err != nil { return "", err } for _, pod := range pods.Items { - if pod.Status.Phase == corev1.PodRunning && pod.Status.ContainerStatuses[0].Ready && pod.Labels[resources.Role] == resources.Master { - return pod.Status.PodIP, nil + if pod.Status.Phase == corev1.PodRunning && + pod.Status.ContainerStatuses[0].Ready && + pod.Labels[resources.Role] == resources.Master { + + masterIp, hasMasterIp := pod.Annotations[resources.MasterIp] + if hasMasterIp { + dfi.log.Info("Retrieved Master IP from annotation", "masterIp", masterIp) + return masterIp, nil + } + + // Fallback: Use PodIP directly + masterIp = pod.Status.PodIP + return masterIp, nil } } @@ -207,7 +218,7 @@ func (dfi *DragonflyInstance) configureReplica(ctx context.Context, pod *corev1. // connected to the right master func (dfi *DragonflyInstance) checkReplicaRole(ctx context.Context, pod *corev1.Pod, masterIp string) (bool, error) { redisClient := redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort), + Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)), }) defer redisClient.Close() @@ -251,7 +262,6 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context) return err } - // retry if there are pods that are not running for _, pod := range pods.Items { if pod.Status.Phase != corev1.PodRunning { dfi.log.Info("not all pods are running. retrying", "pod", pod.Name) @@ -259,28 +269,28 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context) } } - // check for one master and all replicas podRoles := make(map[string][]string) for _, pod := range pods.Items { - podRoles[pod.Labels[resources.Role]] = append(podRoles[pod.Labels[resources.Role]], pod.Name) + role := pod.Labels[resources.Role] + podRoles[role] = append(podRoles[role], pod.Name) } if len(podRoles[resources.Master]) != 1 { dfi.log.Info("incorrect number of masters. reconfiguring replication", "masters", podRoles[resources.Master]) if err = dfi.configureReplication(ctx); err != nil { - return err + return fmt.Errorf("failed to configure replication: %w", err) } } if len(podRoles[resources.Replica]) != len(pods.Items)-1 { dfi.log.Info("incorrect number of replicas", "replicas", podRoles[resources.Replica]) - // configure non replica pods as replicas for _, pod := range pods.Items { if pod.Labels[resources.Role] == "" { if pod.Status.Phase == corev1.PodRunning && pod.Status.ContainerStatuses[0].Ready && pod.Status.PodIP != "" { + dfi.log.Info("Marking pod as replica", "pod", pod.Name) if err := dfi.configureReplica(ctx, &pod); err != nil { - return err + return fmt.Errorf("failed to configure pod as replica: %w", err) } } } @@ -289,27 +299,27 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context) masterIp, err := dfi.getMasterIp(ctx) if err != nil { - return err + return fmt.Errorf("failed to retrieve master IP: %w", err) } for _, pod := range pods.Items { if pod.Labels[resources.Role] == resources.Replica { ok, err := dfi.checkReplicaRole(ctx, &pod, masterIp) if err != nil { - return err + return fmt.Errorf("failed to check replica role: %w", err) } - // configuring to the right master + // Configure to the right master if not correct if !ok { - dfi.log.Info("configuring pod as replica to the right master", "pod", pod.Name) + dfi.log.Info("Configuring pod as replica to the correct master", "pod", pod.Name) if err := dfi.configureReplica(ctx, &pod); err != nil { - return err + return fmt.Errorf("failed to configure replica to correct master: %w", err) } } } } - dfi.log.Info("all pods are configured correctly", "dfi", dfi.df.Name) + dfi.log.Info("All pods are configured correctly", "dfi", dfi.df.Name) return nil } @@ -335,8 +345,10 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma }) defer redisClient.Close() + masterIp = strings.Trim(masterIp, "[]") + dfi.log.Info("Trying to invoke SLAVE OF command", "pod", pod.Name, "master", masterIp, "addr", redisClient.Options().Addr) - resp, err := redisClient.SlaveOf(ctx, masterIp, fmt.Sprint(resources.DragonflyAdminPort)).Result() + resp, err := redisClient.SlaveOf(ctx, masterIp, strconv.Itoa(resources.DragonflyAdminPort)).Result() if err != nil { return fmt.Errorf("error running SLAVE OF command: %s", err) } @@ -345,11 +357,14 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma return fmt.Errorf("response of `SLAVE OF` on replica is not OK: %s", resp) } - dfi.log.Info("Marking pod role as replica", "pod", pod.Name) + dfi.log.Info("Marking pod role as replica", "pod", pod.Name, "masterIp", masterIp) pod.Labels[resources.Role] = resources.Replica - pod.Labels[resources.MasterIp] = masterIp + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[resources.MasterIp] = masterIp if err := dfi.client.Update(ctx, pod); err != nil { - return fmt.Errorf("could not update replica label") + return fmt.Errorf("could not update replica annotation: %w", err) } return nil @@ -373,8 +388,14 @@ func (dfi *DragonflyInstance) replicaOfNoOne(ctx context.Context, pod *corev1.Po return fmt.Errorf("response of `SLAVE OF NO ONE` on master is not OK: %s", resp) } - dfi.log.Info("Marking pod role as master", "pod", pod.Name) + masterIp := pod.Status.PodIP + + dfi.log.Info("Marking pod role as master", "pod", pod.Name, "masterIp", masterIp) pod.Labels[resources.Role] = resources.Master + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[resources.MasterIp] = masterIp if err := dfi.client.Update(ctx, pod); err != nil { return err } diff --git a/internal/controller/util.go b/internal/controller/util.go index 562be1c..8d73312 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -20,6 +20,8 @@ import ( "context" "errors" "fmt" + "net" + "strconv" "strings" "github.com/dragonflydb/dragonfly-operator/internal/resources" @@ -88,8 +90,10 @@ func getLatestReplica(ctx context.Context, c client.Client, statefulSet *appsv1. // replTakeover runs the replTakeOver on the given replica pod func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) error { + addr := net.JoinHostPort(newMaster.Status.PodIP, strconv.Itoa(resources.DragonflyPort)) + redisClient := redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("%s:%d", newMaster.Status.PodIP, resources.DragonflyAdminPort), + Addr: addr, }) defer redisClient.Close() @@ -102,7 +106,6 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e return fmt.Errorf("response of `REPLTAKEOVER` on replica is not OK: %s", resp) } - // update the label on the pod newMaster.Labels[resources.Role] = resources.Master if err := c.Update(ctx, newMaster); err != nil { return fmt.Errorf("error updating the role label on the pod: %w", err) @@ -111,13 +114,15 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e } func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) { - // wait until pod IP is ready + // Ensure PodIP and Pod Phase are ready if pod.Status.PodIP == "" || pod.Status.Phase != corev1.PodRunning { return false, nil } + addr := net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)) + redisClient := redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort), + Addr: addr, }) defer redisClient.Close() @@ -135,14 +140,7 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) { return false, errors.New("empty info") } - data := map[string]string{} - for _, line := range strings.Split(info, "\n") { - if line == "" || strings.HasPrefix(line, "#") { - continue - } - kv := strings.Split(line, ":") - data[kv[0]] = strings.TrimSuffix(kv[1], "\r") - } + data := parseRedisInfo(info) if data["master_sync_in_progress"] == "1" { return false, nil @@ -158,3 +156,18 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) { return true, nil } + +// Helper function to parse Redis INFO data +func parseRedisInfo(info string) map[string]string { + data := map[string]string{} + for _, line := range strings.Split(info, "\n") { + if line == "" || strings.HasPrefix(line, "#") { + continue + } + kv := strings.Split(line, ":") + if len(kv) == 2 { + data[kv[0]] = strings.TrimSuffix(kv[1], "\r") + } + } + return data +} diff --git a/internal/resources/const.go b/internal/resources/const.go index ec6a3b5..e4c8ca6 100644 --- a/internal/resources/const.go +++ b/internal/resources/const.go @@ -56,7 +56,7 @@ const ( // KubernetesPartOfLabel is the name of a higher level application this one is part of KubernetesPartOfLabelKey = "app.kubernetes.io/part-of" - MasterIp string = "master-ip" + MasterIp string = "operator.dragonflydb.io/masterIP" Role string = "role"