Skip to content

Commit

Permalink
chore: try to workaround master label with ipv6 issue
Browse files Browse the repository at this point in the history
  • Loading branch information
cyrinux committed Dec 5, 2024
1 parent 4012589 commit 51e573d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 34 deletions.
63 changes: 42 additions & 21 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,26 @@ func (dfi *DragonflyInstance) masterExists(ctx context.Context) (bool, error) {
}

func (dfi *DragonflyInstance) getMasterIp(ctx context.Context) (string, error) {
dfi.log.Info("retrieving ip of the master")
dfi.log.Info("retrieving IP of the master")
pods, err := dfi.getPods(ctx)
if err != nil {
return "", err
}

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning && pod.Status.ContainerStatuses[0].Ready && pod.Labels[resources.Role] == resources.Master {
return pod.Status.PodIP, nil
if pod.Status.Phase == corev1.PodRunning &&
pod.Status.ContainerStatuses[0].Ready &&
pod.Labels[resources.Role] == resources.Master {

masterIp, hasMasterIp := pod.Annotations[resources.MasterIp]
if hasMasterIp {
dfi.log.Info("Retrieved Master IP from annotation", "masterIp", masterIp)
return masterIp, nil
}

// Fallback: Use PodIP directly
masterIp = pod.Status.PodIP
return masterIp, nil
}
}

Expand Down Expand Up @@ -207,7 +218,7 @@ func (dfi *DragonflyInstance) configureReplica(ctx context.Context, pod *corev1.
// connected to the right master
func (dfi *DragonflyInstance) checkReplicaRole(ctx context.Context, pod *corev1.Pod, masterIp string) (bool, error) {
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)),
})
defer redisClient.Close()

Expand Down Expand Up @@ -251,36 +262,35 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
return err
}

// retry if there are pods that are not running
for _, pod := range pods.Items {
if pod.Status.Phase != corev1.PodRunning {
dfi.log.Info("not all pods are running. retrying", "pod", pod.Name)
return nil
}
}

// check for one master and all replicas
podRoles := make(map[string][]string)
for _, pod := range pods.Items {
podRoles[pod.Labels[resources.Role]] = append(podRoles[pod.Labels[resources.Role]], pod.Name)
role := pod.Labels[resources.Role]
podRoles[role] = append(podRoles[role], pod.Name)
}

if len(podRoles[resources.Master]) != 1 {
dfi.log.Info("incorrect number of masters. reconfiguring replication", "masters", podRoles[resources.Master])
if err = dfi.configureReplication(ctx); err != nil {
return err
return fmt.Errorf("failed to configure replication: %w", err)
}
}

if len(podRoles[resources.Replica]) != len(pods.Items)-1 {
dfi.log.Info("incorrect number of replicas", "replicas", podRoles[resources.Replica])

// configure non replica pods as replicas
for _, pod := range pods.Items {
if pod.Labels[resources.Role] == "" {
if pod.Status.Phase == corev1.PodRunning && pod.Status.ContainerStatuses[0].Ready && pod.Status.PodIP != "" {
dfi.log.Info("Marking pod as replica", "pod", pod.Name)
if err := dfi.configureReplica(ctx, &pod); err != nil {
return err
return fmt.Errorf("failed to configure pod as replica: %w", err)
}
}
}
Expand All @@ -289,27 +299,27 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)

masterIp, err := dfi.getMasterIp(ctx)
if err != nil {
return err
return fmt.Errorf("failed to retrieve master IP: %w", err)
}

for _, pod := range pods.Items {
if pod.Labels[resources.Role] == resources.Replica {
ok, err := dfi.checkReplicaRole(ctx, &pod, masterIp)
if err != nil {
return err
return fmt.Errorf("failed to check replica role: %w", err)
}

// configuring to the right master
// Configure to the right master if not correct
if !ok {
dfi.log.Info("configuring pod as replica to the right master", "pod", pod.Name)
dfi.log.Info("Configuring pod as replica to the correct master", "pod", pod.Name)
if err := dfi.configureReplica(ctx, &pod); err != nil {
return err
return fmt.Errorf("failed to configure replica to correct master: %w", err)
}
}
}
}

dfi.log.Info("all pods are configured correctly", "dfi", dfi.df.Name)
dfi.log.Info("All pods are configured correctly", "dfi", dfi.df.Name)
return nil
}

Expand All @@ -335,8 +345,10 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
})
defer redisClient.Close()

masterIp = strings.Trim(masterIp, "[]")

dfi.log.Info("Trying to invoke SLAVE OF command", "pod", pod.Name, "master", masterIp, "addr", redisClient.Options().Addr)
resp, err := redisClient.SlaveOf(ctx, masterIp, fmt.Sprint(resources.DragonflyAdminPort)).Result()
resp, err := redisClient.SlaveOf(ctx, masterIp, strconv.Itoa(resources.DragonflyAdminPort)).Result()
if err != nil {
return fmt.Errorf("error running SLAVE OF command: %s", err)
}
Expand All @@ -345,11 +357,14 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
return fmt.Errorf("response of `SLAVE OF` on replica is not OK: %s", resp)
}

dfi.log.Info("Marking pod role as replica", "pod", pod.Name)
dfi.log.Info("Marking pod role as replica", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.Role] = resources.Replica
pod.Labels[resources.MasterIp] = masterIp
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp
if err := dfi.client.Update(ctx, pod); err != nil {
return fmt.Errorf("could not update replica label")
return fmt.Errorf("could not update replica annotation: %w", err)
}

return nil
Expand All @@ -373,8 +388,14 @@ func (dfi *DragonflyInstance) replicaOfNoOne(ctx context.Context, pod *corev1.Po
return fmt.Errorf("response of `SLAVE OF NO ONE` on master is not OK: %s", resp)
}

dfi.log.Info("Marking pod role as master", "pod", pod.Name)
masterIp := pod.Status.PodIP

dfi.log.Info("Marking pod role as master", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.Role] = resources.Master
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp
if err := dfi.client.Update(ctx, pod); err != nil {
return err
}
Expand Down
37 changes: 25 additions & 12 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/dragonflydb/dragonfly-operator/internal/resources"
Expand Down Expand Up @@ -88,8 +90,10 @@ func getLatestReplica(ctx context.Context, c client.Client, statefulSet *appsv1.

// replTakeover runs the replTakeOver on the given replica pod
func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) error {
addr := net.JoinHostPort(newMaster.Status.PodIP, strconv.Itoa(resources.DragonflyPort))

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", newMaster.Status.PodIP, resources.DragonflyAdminPort),
Addr: addr,
})
defer redisClient.Close()

Expand All @@ -102,7 +106,6 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e
return fmt.Errorf("response of `REPLTAKEOVER` on replica is not OK: %s", resp)
}

// update the label on the pod
newMaster.Labels[resources.Role] = resources.Master
if err := c.Update(ctx, newMaster); err != nil {
return fmt.Errorf("error updating the role label on the pod: %w", err)
Expand All @@ -111,13 +114,15 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e
}

func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {
// wait until pod IP is ready
// Ensure PodIP and Pod Phase are ready
if pod.Status.PodIP == "" || pod.Status.Phase != corev1.PodRunning {
return false, nil
}

addr := net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort))

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: addr,
})
defer redisClient.Close()

Expand All @@ -135,14 +140,7 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {
return false, errors.New("empty info")
}

data := map[string]string{}
for _, line := range strings.Split(info, "\n") {
if line == "" || strings.HasPrefix(line, "#") {
continue
}
kv := strings.Split(line, ":")
data[kv[0]] = strings.TrimSuffix(kv[1], "\r")
}
data := parseRedisInfo(info)

if data["master_sync_in_progress"] == "1" {
return false, nil
Expand All @@ -158,3 +156,18 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {

return true, nil
}

// Helper function to parse Redis INFO data
func parseRedisInfo(info string) map[string]string {
data := map[string]string{}
for _, line := range strings.Split(info, "\n") {
if line == "" || strings.HasPrefix(line, "#") {
continue
}
kv := strings.Split(line, ":")
if len(kv) == 2 {
data[kv[0]] = strings.TrimSuffix(kv[1], "\r")
}
}
return data
}
2 changes: 1 addition & 1 deletion internal/resources/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// KubernetesPartOfLabel is the name of a higher level application this one is part of
KubernetesPartOfLabelKey = "app.kubernetes.io/part-of"

MasterIp string = "master-ip"
MasterIp string = "operator.dragonflydb.io/masterIP"

Role string = "role"

Expand Down

0 comments on commit 51e573d

Please sign in to comment.