Skip to content

Commit

Permalink
Merge pull request #30 from ucloud/v0.2.1
Browse files Browse the repository at this point in the history
V0.2.1
  • Loading branch information
gaopenghigh authored Mar 6, 2020
2 parents 3eb1ccd + b43f643 commit 5f80a1a
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,5 @@ tags
vendor/*
/main
/Dockerfile-withvendor
Dockerfile-TZSH
Dockerfile-TZSH
skaffold.yaml
5 changes: 3 additions & 2 deletions pkg/controller/distributedrediscluster/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,16 @@ type IWaitHandle interface {
// we get a result from handler.Handler() or the timeout expires
func waiting(handler IWaitHandle, reqLogger logr.Logger) error {
timeout := time.After(handler.Timeout())
tick := time.Tick(handler.Tick())
tick := time.NewTicker(time.Second)
defer tick.Stop()
// Keep trying until we're timed out or got a result or got an error
for {
select {
// Got a timeout! fail with a timeout error
case <-timeout:
return fmt.Errorf("%s timed out", handler.Name())
// Got a tick, we should check on Handler()
case <-tick:
case <-tick.C:
err := handler.Handler()
if err == nil {
return nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/controller/distributedrediscluster/sync_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func (r *ReconcileDistributedRedisCluster) ensureCluster(ctx *syncContext) error
}
return StopRetry.Wrap(err, "stop retry")
}

// Redis only load db from append only file when AOF ON, because of
// we only backed up the RDB file when doing data backup, so we set
// "appendonly no" force here when do restore.
dbLoadedFromDiskWhenRestore(cluster, ctx.reqLogger)
labels := getLabels(cluster)
if err := r.ensurer.EnsureRedisConfigMap(cluster, labels); err != nil {
return Kubernetes.Wrap(err, "EnsureRedisConfigMap")
Expand Down Expand Up @@ -98,9 +103,19 @@ func (r *ReconcileDistributedRedisCluster) validate(cluster *redisv1alpha1.Distr
if update || updateDefault {
return r.crController.UpdateCR(cluster)
}

return nil
}

func dbLoadedFromDiskWhenRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) {
if cluster.IsRestoreFromBackup() && !cluster.IsRestored() {
if cluster.Spec.Config != nil {
reqLogger.Info("force appendonly = no when do restore")
cluster.Spec.Config["appendonly"] = "no"
}
}
}

func (r *ReconcileDistributedRedisCluster) validateRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) (bool, error) {
update := false
if cluster.Status.Restore.Backup == nil {
Expand Down
33 changes: 32 additions & 1 deletion pkg/controller/manager/ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package manager

import (
"strconv"
"strings"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -189,7 +190,7 @@ func (r *realEnsureResource) EnsureRedisSvc(cluster *redisv1alpha1.DistributedRe

func (r *realEnsureResource) EnsureRedisConfigMap(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error {
cmName := configmaps.RedisConfigMapName(cluster.Name)
_, err := r.configMapClient.GetConfigMap(cluster.Namespace, cmName)
drcCm, err := r.configMapClient.GetConfigMap(cluster.Namespace, cmName)
if err != nil {
if errors.IsNotFound(err) {
r.logger.WithValues("ConfigMap.Namespace", cluster.Namespace, "ConfigMap.Name", cmName).
Expand All @@ -201,6 +202,13 @@ func (r *realEnsureResource) EnsureRedisConfigMap(cluster *redisv1alpha1.Distrib
} else {
return err
}
} else {
if isRedisConfChanged(drcCm.Data[configmaps.RedisConfKey], cluster.Spec.Config, r.logger) {
cm := configmaps.NewConfigMapForCR(cluster, labels)
if err2 := r.configMapClient.UpdateConfigMap(cm); err2 != nil {
return err2
}
}
}

if cluster.IsRestoreFromBackup() {
Expand Down Expand Up @@ -237,3 +245,26 @@ func (r *realEnsureResource) EnsureRedisOSMSecret(cluster *redisv1alpha1.Distrib
}
return nil
}

func isRedisConfChanged(confInCm string, currentConf map[string]string, log logr.Logger) bool {
lines := strings.Split(strings.TrimSuffix(confInCm, "\n"), "\n")
if len(lines) != len(currentConf) {
return true
}
for _, line := range lines {
line = strings.TrimSuffix(line, " ")
confLine := strings.SplitN(line, " ", 2)
if len(confLine) == 2 {
if valueInCurrentConf, ok := currentConf[confLine[0]]; !ok {
return true
} else {
if valueInCurrentConf != confLine[1] {
return true
}
}
} else {
log.Info("custom config is invalid", "raw", line, "split", confLine)
}
}
return false
}
152 changes: 152 additions & 0 deletions pkg/controller/manager/ensurer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package manager

import (
"testing"

"github.com/go-logr/logr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var log = logf.Log.WithName("test")

func Test_isRedisConfChanged(t *testing.T) {
type args struct {
confInCm string
currentConf map[string]string
log logr.Logger
}
tests := []struct {
name string
args args
want bool
}{
{
name: "should false",
args: args{
confInCm: `appendfsync everysec
appendonly yes
auto-aof-rewrite-min-size 67108864
save 900 1 300 10`,
currentConf: map[string]string{
"appendfsync": "everysec",
"appendonly": "yes",
"auto-aof-rewrite-min-size": "67108864",
"save": "900 1 300 10",
},
log: log,
},
want: false,
},
{
name: "should false with newline",
args: args{
confInCm: `appendfsync everysec
appendonly yes
auto-aof-rewrite-min-size 67108864
save 900 1 300 10
`,
currentConf: map[string]string{
"appendfsync": "everysec",
"appendonly": "yes",
"auto-aof-rewrite-min-size": "67108864",
"save": "900 1 300 10",
},
log: log,
},
want: false,
},
{
name: "should true, compare value",
args: args{
confInCm: `appendfsync everysec
appendonly yes
auto-aof-rewrite-min-size 6710886
save 900 1 300 10
`,
currentConf: map[string]string{
"appendfsync": "everysec",
"appendonly": "yes",
"auto-aof-rewrite-min-size": "67108864",
"save": "900 1 300 10",
},
log: log,
},
want: true,
},
{
name: "should true, add current",
args: args{
confInCm: `appendfsync everysec
appendonly yes
save 900 1 300 10
`,
currentConf: map[string]string{
"appendfsync": "everysec",
"appendonly": "yes",
"auto-aof-rewrite-min-size": "67108864",
"save": "900 1 300 10",
},
log: log,
},
want: true,
},
{
name: "should true, del current",
args: args{
confInCm: `appendfsync everysec
appendonly yes
auto-aof-rewrite-min-size 67108864
save 900 1 300 10
`,
currentConf: map[string]string{
"appendfsync": "everysec",
"appendonly": "yes",
"save": "900 1 300 10",
},
log: log,
},
want: true,
},
{
name: "should true, compare key",
args: args{
confInCm: `appendfsync everysec
appendonly yes
save 900 1 300 10
`,
currentConf: map[string]string{
"appendonly": "yes",
"auto-aof-rewrite-min-size": "67108864",
"save": "900 1 300 10",
},
log: log,
},
want: true,
},
{
name: "should true, compare save",
args: args{
confInCm: `appendfsync everysec
appendonly yes
auto-aof-rewrite-min-size 67108864
save 900 1 300 10
`,
currentConf: map[string]string{
"appendfsync": "everysec",
"appendonly": "yes",
"auto-aof-rewrite-min-size": "67108864",
"save": "900 1",
},
log: log,
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isRedisConfChanged(tt.args.confInCm, tt.args.currentConf, tt.args.log); got != tt.want {
t.Errorf("isRedisConfChanged() = %v, want %v", got, tt.want)
}
})
}
}
28 changes: 16 additions & 12 deletions pkg/redisutil/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,7 @@ func (a *Admin) MigrateKeys(addr string, dest *Node, slots []Slot, batch int, ti
break
}

var args []string
if replace {
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "REPLACE", "KEYS"}, keys...)
} else {
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "KEYS"}, keys...)
}
args := a.migrateCmdArgs(dest, timeoutStr, replace, keys)

resp = c.Cmd("MIGRATE", args)
if err := a.Connections().ValidateResp(resp, addr, "Unable to run command MIGRATE"); err != nil {
Expand Down Expand Up @@ -432,12 +427,7 @@ func (a *Admin) MigrateKeysInSlot(addr string, dest *Node, slot Slot, batch int,
break
}

var args []string
if replace {
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "REPLACE", "KEYS"}, keys...)
} else {
args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "KEYS"}, keys...)
}
args := a.migrateCmdArgs(dest, timeoutStr, replace, keys)

resp = c.Cmd("MIGRATE", args)
if err := a.Connections().ValidateResp(resp, addr, "Unable to run command MIGRATE"); err != nil {
Expand All @@ -448,6 +438,20 @@ func (a *Admin) MigrateKeysInSlot(addr string, dest *Node, slot Slot, batch int,
return keyCount, nil
}

func (a *Admin) migrateCmdArgs(dest *Node, timeoutStr string, replace bool, keys []string) []string {
args := []string{dest.IP, dest.Port, "", "0", timeoutStr}
if password, ok := a.Connections().GetAUTH(); ok {
args = append(args, "AUTH", password)
}
if replace {
args = append(args, "REPLACE", "KEYS")
} else {
args = append(args, "KEYS")
}
args = append(args, keys...)
return args
}

// ForgetNode used to force other redis cluster node to forget a specific node
func (a *Admin) ForgetNode(id string) error {
infos, _ := a.GetClusterInfos()
Expand Down
10 changes: 10 additions & 0 deletions pkg/redisutil/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type IAdminConnections interface {
ValidatePipeResp(c IClient, addr, errMessage string) bool
// Reset close all connections and clear the connection map
Reset()
// GetAUTH return password and true if connection password is set, else return false.
GetAUTH() (string, bool)
}

// AdminConnections connection map for redis cluster
Expand Down Expand Up @@ -98,6 +100,14 @@ func NewAdminConnections(addrs []string, options *AdminOptions, log logr.Logger)
return cnx
}

// GetAUTH return password and true if connection password is set, else return false.
func (cnx *AdminConnections) GetAUTH() (string, bool) {
if len(cnx.password) > 0 {
return cnx.password, true
}
return "", false
}

// Reconnect force a reconnection on the given address
// is the adress is not part of the map, act like Add
func (cnx *AdminConnections) Reconnect(addr string) error {
Expand Down
Loading

0 comments on commit 5f80a1a

Please sign in to comment.