Skip to content

Commit

Permalink
fix(operator): ipv6 support
Browse files Browse the repository at this point in the history
  • Loading branch information
cyrinux committed Dec 6, 2024
1 parent 4012589 commit 1e6d949
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 24 deletions.
47 changes: 35 additions & 12 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,25 @@ func (dfi *DragonflyInstance) masterExists(ctx context.Context) (bool, error) {
}

func (dfi *DragonflyInstance) getMasterIp(ctx context.Context) (string, error) {
dfi.log.Info("retrieving ip of the master")
dfi.log.Info("retrieving IP of the master")
pods, err := dfi.getPods(ctx)
if err != nil {
return "", err
}

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning && pod.Status.ContainerStatuses[0].Ready && pod.Labels[resources.Role] == resources.Master {
return pod.Status.PodIP, nil
if pod.Status.Phase == corev1.PodRunning &&
pod.Status.ContainerStatuses[0].Ready &&
pod.Labels[resources.Role] == resources.Master {

masterIp, hasMasterIp := pod.Annotations[resources.MasterIp]
if hasMasterIp {
dfi.log.Info("Retrieved Master IP from annotation", "masterIp", masterIp)
return masterIp, nil
}

masterIp = pod.Status.PodIP
return masterIp, nil
}
}

Expand Down Expand Up @@ -207,7 +217,7 @@ func (dfi *DragonflyInstance) configureReplica(ctx context.Context, pod *corev1.
// connected to the right master
func (dfi *DragonflyInstance) checkReplicaRole(ctx context.Context, pod *corev1.Pod, masterIp string) (bool, error) {
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)),
})
defer redisClient.Close()

Expand Down Expand Up @@ -262,7 +272,8 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
// check for one master and all replicas
podRoles := make(map[string][]string)
for _, pod := range pods.Items {
podRoles[pod.Labels[resources.Role]] = append(podRoles[pod.Labels[resources.Role]], pod.Name)
role := pod.Labels[resources.Role]
podRoles[role] = append(podRoles[role], pod.Name)
}

if len(podRoles[resources.Master]) != 1 {
Expand Down Expand Up @@ -299,7 +310,7 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
return err
}

// configuring to the right master
// Configure to the right master if not correct
if !ok {
dfi.log.Info("configuring pod as replica to the right master", "pod", pod.Name)
if err := dfi.configureReplica(ctx, &pod); err != nil {
Expand All @@ -309,7 +320,7 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
}
}

dfi.log.Info("all pods are configured correctly", "dfi", dfi.df.Name)
dfi.log.Info("All pods are configured correctly", "dfi", dfi.df.Name)
return nil
}

Expand All @@ -335,8 +346,11 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
})
defer redisClient.Close()

// Sanitize masterIp in case ipv6
masterIp = sanitizeIp(masterIp)

dfi.log.Info("Trying to invoke SLAVE OF command", "pod", pod.Name, "master", masterIp, "addr", redisClient.Options().Addr)
resp, err := redisClient.SlaveOf(ctx, masterIp, fmt.Sprint(resources.DragonflyAdminPort)).Result()
resp, err := redisClient.SlaveOf(ctx, masterIp, strconv.Itoa(resources.DragonflyAdminPort)).Result()
if err != nil {
return fmt.Errorf("error running SLAVE OF command: %s", err)
}
Expand All @@ -345,11 +359,14 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
return fmt.Errorf("response of `SLAVE OF` on replica is not OK: %s", resp)
}

dfi.log.Info("Marking pod role as replica", "pod", pod.Name)
dfi.log.Info("Marking pod role as replica", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.Role] = resources.Replica
pod.Labels[resources.MasterIp] = masterIp
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp
if err := dfi.client.Update(ctx, pod); err != nil {
return fmt.Errorf("could not update replica label")
return fmt.Errorf("could not update replica annotation: %w", err)
}

return nil
Expand All @@ -373,8 +390,14 @@ func (dfi *DragonflyInstance) replicaOfNoOne(ctx context.Context, pod *corev1.Po
return fmt.Errorf("response of `SLAVE OF NO ONE` on master is not OK: %s", resp)
}

dfi.log.Info("Marking pod role as master", "pod", pod.Name)
masterIp := pod.Status.PodIP

dfi.log.Info("Marking pod role as master", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.Role] = resources.Master
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp
if err := dfi.client.Update(ctx, pod); err != nil {
return err
}
Expand Down
38 changes: 27 additions & 11 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/dragonflydb/dragonfly-operator/internal/resources"
Expand Down Expand Up @@ -89,7 +91,7 @@ func getLatestReplica(ctx context.Context, c client.Client, statefulSet *appsv1.
// replTakeover runs the replTakeOver on the given replica pod
func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) error {
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", newMaster.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(newMaster.Status.PodIP, strconv.Itoa(resources.DragonflyPort)),
})
defer redisClient.Close()

Expand All @@ -111,13 +113,13 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e
}

func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {
// wait until pod IP is ready
// Ensure PodIP and Pod Phase are ready
if pod.Status.PodIP == "" || pod.Status.Phase != corev1.PodRunning {
return false, nil
}

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)),
})
defer redisClient.Close()

Expand All @@ -135,14 +137,7 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {
return false, errors.New("empty info")
}

data := map[string]string{}
for _, line := range strings.Split(info, "\n") {
if line == "" || strings.HasPrefix(line, "#") {
continue
}
kv := strings.Split(line, ":")
data[kv[0]] = strings.TrimSuffix(kv[1], "\r")
}
data := parseRedisInfo(info)

if data["master_sync_in_progress"] == "1" {
return false, nil
Expand All @@ -158,3 +153,24 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {

return true, nil
}

// Helper function to parse Redis INFO data
func parseRedisInfo(info string) map[string]string {
data := map[string]string{}
for _, line := range strings.Split(info, "\n") {
if line == "" || strings.HasPrefix(line, "#") {
continue
}
kv := strings.Split(line, ":")
if len(kv) == 2 {
data[kv[0]] = strings.TrimSuffix(kv[1], "\r")
}
}
return data
}

// sanitizeIp Ipv6
func sanitizeIp(masterIp string) string {
masterIp = strings.Trim(masterIp, "[]")
return masterIp
}
2 changes: 1 addition & 1 deletion internal/resources/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// KubernetesPartOfLabel is the name of a higher level application this one is part of
KubernetesPartOfLabelKey = "app.kubernetes.io/part-of"

MasterIp string = "master-ip"
MasterIp string = "operator.dragonflydb.io/masterIP"

Role string = "role"

Expand Down

0 comments on commit 1e6d949

Please sign in to comment.