Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(operator): ipv6 support #269

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,25 @@ func (dfi *DragonflyInstance) masterExists(ctx context.Context) (bool, error) {
}

func (dfi *DragonflyInstance) getMasterIp(ctx context.Context) (string, error) {
dfi.log.Info("retrieving ip of the master")
dfi.log.Info("retrieving IP of the master")
pods, err := dfi.getPods(ctx)
if err != nil {
return "", err
}

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning && pod.Status.ContainerStatuses[0].Ready && pod.Labels[resources.Role] == resources.Master {
return pod.Status.PodIP, nil
if pod.Status.Phase == corev1.PodRunning &&
pod.Status.ContainerStatuses[0].Ready &&
pod.Labels[resources.Role] == resources.Master {

masterIp, hasMasterIp := pod.Annotations[resources.MasterIp]
cyrinux marked this conversation as resolved.
Show resolved Hide resolved
if hasMasterIp {
dfi.log.Info("Retrieved Master IP from annotation", "masterIp", masterIp)
return masterIp, nil
}

masterIp = pod.Status.PodIP
return masterIp, nil
}
}

Expand Down Expand Up @@ -207,7 +217,7 @@ func (dfi *DragonflyInstance) configureReplica(ctx context.Context, pod *corev1.
// connected to the right master
func (dfi *DragonflyInstance) checkReplicaRole(ctx context.Context, pod *corev1.Pod, masterIp string) (bool, error) {
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)),
})
defer redisClient.Close()

Expand Down Expand Up @@ -262,7 +272,8 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
// check for one master and all replicas
podRoles := make(map[string][]string)
for _, pod := range pods.Items {
podRoles[pod.Labels[resources.Role]] = append(podRoles[pod.Labels[resources.Role]], pod.Name)
role := pod.Labels[resources.Role]
podRoles[role] = append(podRoles[role], pod.Name)
}

if len(podRoles[resources.Master]) != 1 {
Expand Down Expand Up @@ -299,7 +310,7 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
return err
}

// configuring to the right master
// Configure to the right master if not correct
if !ok {
dfi.log.Info("configuring pod as replica to the right master", "pod", pod.Name)
if err := dfi.configureReplica(ctx, &pod); err != nil {
Expand All @@ -309,7 +320,7 @@ func (dfi *DragonflyInstance) checkAndConfigureReplication(ctx context.Context)
}
}

dfi.log.Info("all pods are configured correctly", "dfi", dfi.df.Name)
dfi.log.Info("All pods are configured correctly", "dfi", dfi.df.Name)
return nil
}

Expand All @@ -335,8 +346,11 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
})
defer redisClient.Close()

// Sanitize masterIp in case ipv6
masterIp = sanitizeIp(masterIp)

dfi.log.Info("Trying to invoke SLAVE OF command", "pod", pod.Name, "master", masterIp, "addr", redisClient.Options().Addr)
resp, err := redisClient.SlaveOf(ctx, masterIp, fmt.Sprint(resources.DragonflyAdminPort)).Result()
resp, err := redisClient.SlaveOf(ctx, masterIp, strconv.Itoa(resources.DragonflyAdminPort)).Result()
if err != nil {
return fmt.Errorf("error running SLAVE OF command: %s", err)
}
Expand All @@ -345,11 +359,14 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
return fmt.Errorf("response of `SLAVE OF` on replica is not OK: %s", resp)
}

dfi.log.Info("Marking pod role as replica", "pod", pod.Name)
dfi.log.Info("Marking pod role as replica", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.Role] = resources.Replica
pod.Labels[resources.MasterIp] = masterIp
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp
if err := dfi.client.Update(ctx, pod); err != nil {
return fmt.Errorf("could not update replica label")
return fmt.Errorf("could not update replica annotation: %w", err)
}

return nil
Expand All @@ -373,8 +390,14 @@ func (dfi *DragonflyInstance) replicaOfNoOne(ctx context.Context, pod *corev1.Po
return fmt.Errorf("response of `SLAVE OF NO ONE` on master is not OK: %s", resp)
}

dfi.log.Info("Marking pod role as master", "pod", pod.Name)
masterIp := pod.Status.PodIP

dfi.log.Info("Marking pod role as master", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.Role] = resources.Master
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp
if err := dfi.client.Update(ctx, pod); err != nil {
return err
}
Expand Down
37 changes: 26 additions & 11 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/dragonflydb/dragonfly-operator/internal/resources"
Expand Down Expand Up @@ -89,7 +91,7 @@ func getLatestReplica(ctx context.Context, c client.Client, statefulSet *appsv1.
// replTakeover runs the replTakeOver on the given replica pod
func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) error {
redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", newMaster.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(newMaster.Status.PodIP, strconv.Itoa(resources.DragonflyPort)),
})
defer redisClient.Close()

Expand All @@ -111,13 +113,13 @@ func replTakeover(ctx context.Context, c client.Client, newMaster *corev1.Pod) e
}

func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {
// wait until pod IP is ready
// Ensure PodIP and Pod Phase are ready
if pod.Status.PodIP == "" || pod.Status.Phase != corev1.PodRunning {
return false, nil
}

redisClient := redis.NewClient(&redis.Options{
Addr: fmt.Sprintf("%s:%d", pod.Status.PodIP, resources.DragonflyAdminPort),
Addr: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(resources.DragonflyAdminPort)),
})
defer redisClient.Close()

Expand All @@ -135,14 +137,7 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {
return false, errors.New("empty info")
}

data := map[string]string{}
for _, line := range strings.Split(info, "\n") {
if line == "" || strings.HasPrefix(line, "#") {
continue
}
kv := strings.Split(line, ":")
data[kv[0]] = strings.TrimSuffix(kv[1], "\r")
}
data := parseRedisInfo(info)

if data["master_sync_in_progress"] == "1" {
return false, nil
Expand All @@ -158,3 +153,23 @@ func isStableState(ctx context.Context, pod *corev1.Pod) (bool, error) {

return true, nil
}

// Helper function to parse Redis INFO data
func parseRedisInfo(info string) map[string]string {
data := map[string]string{}
for _, line := range strings.Split(info, "\n") {
if line == "" || strings.HasPrefix(line, "#") {
continue
}
kv := strings.Split(line, ":")
if len(kv) == 2 {
data[kv[0]] = strings.TrimSuffix(kv[1], "\r")
}
}
return data
}

// sanitizeIp Ipv6
func sanitizeIp(masterIp string) string {
return strings.Trim(masterIp, "[]")
}
2 changes: 1 addition & 1 deletion internal/resources/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// KubernetesPartOfLabel is the name of a higher level application this one is part of
KubernetesPartOfLabelKey = "app.kubernetes.io/part-of"

MasterIp string = "master-ip"
MasterIp string = "operator.dragonflydb.io/masterIP"

Role string = "role"

Expand Down
Loading