diff --git a/.gitignore b/.gitignore index 91a84f94d..c21cd5127 100644 --- a/.gitignore +++ b/.gitignore @@ -79,4 +79,5 @@ tags vendor/* /main /Dockerfile-withvendor -Dockerfile-TZSH \ No newline at end of file +Dockerfile-TZSH +skaffold.yaml \ No newline at end of file diff --git a/pkg/controller/distributedrediscluster/helper.go b/pkg/controller/distributedrediscluster/helper.go index fca7aa18b..34790b492 100644 --- a/pkg/controller/distributedrediscluster/helper.go +++ b/pkg/controller/distributedrediscluster/helper.go @@ -142,7 +142,8 @@ type IWaitHandle interface { // we get a result from handler.Handler() or the timeout expires func waiting(handler IWaitHandle, reqLogger logr.Logger) error { timeout := time.After(handler.Timeout()) - tick := time.Tick(handler.Tick()) + tick := time.NewTicker(time.Second) + defer tick.Stop() // Keep trying until we're timed out or got a result or got an error for { select { @@ -150,7 +151,7 @@ func waiting(handler IWaitHandle, reqLogger logr.Logger) error { case <-timeout: return fmt.Errorf("%s timed out", handler.Name()) // Got a tick, we should check on Handler() - case <-tick: + case <-tick.C: err := handler.Handler() if err == nil { return nil diff --git a/pkg/controller/distributedrediscluster/sync_handler.go b/pkg/controller/distributedrediscluster/sync_handler.go index 3d758ce8e..471a33c85 100644 --- a/pkg/controller/distributedrediscluster/sync_handler.go +++ b/pkg/controller/distributedrediscluster/sync_handler.go @@ -36,6 +36,11 @@ func (r *ReconcileDistributedRedisCluster) ensureCluster(ctx *syncContext) error } return StopRetry.Wrap(err, "stop retry") } + + // Redis only load db from append only file when AOF ON, because of + // we only backed up the RDB file when doing data backup, so we set + // "appendonly no" force here when do restore. + dbLoadedFromDiskWhenRestore(cluster, ctx.reqLogger) labels := getLabels(cluster) if err := r.ensurer.EnsureRedisConfigMap(cluster, labels); err != nil { return Kubernetes.Wrap(err, "EnsureRedisConfigMap") @@ -98,9 +103,19 @@ func (r *ReconcileDistributedRedisCluster) validate(cluster *redisv1alpha1.Distr if update || updateDefault { return r.crController.UpdateCR(cluster) } + return nil } +func dbLoadedFromDiskWhenRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) { + if cluster.IsRestoreFromBackup() && !cluster.IsRestored() { + if cluster.Spec.Config != nil { + reqLogger.Info("force appendonly = no when do restore") + cluster.Spec.Config["appendonly"] = "no" + } + } +} + func (r *ReconcileDistributedRedisCluster) validateRestore(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) (bool, error) { update := false if cluster.Status.Restore.Backup == nil { diff --git a/pkg/controller/manager/ensurer.go b/pkg/controller/manager/ensurer.go index 495998eb8..d2aa94560 100644 --- a/pkg/controller/manager/ensurer.go +++ b/pkg/controller/manager/ensurer.go @@ -2,6 +2,7 @@ package manager import ( "strconv" + "strings" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" @@ -189,7 +190,7 @@ func (r *realEnsureResource) EnsureRedisSvc(cluster *redisv1alpha1.DistributedRe func (r *realEnsureResource) EnsureRedisConfigMap(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) error { cmName := configmaps.RedisConfigMapName(cluster.Name) - _, err := r.configMapClient.GetConfigMap(cluster.Namespace, cmName) + drcCm, err := r.configMapClient.GetConfigMap(cluster.Namespace, cmName) if err != nil { if errors.IsNotFound(err) { r.logger.WithValues("ConfigMap.Namespace", cluster.Namespace, "ConfigMap.Name", cmName). @@ -201,6 +202,13 @@ func (r *realEnsureResource) EnsureRedisConfigMap(cluster *redisv1alpha1.Distrib } else { return err } + } else { + if isRedisConfChanged(drcCm.Data[configmaps.RedisConfKey], cluster.Spec.Config, r.logger) { + cm := configmaps.NewConfigMapForCR(cluster, labels) + if err2 := r.configMapClient.UpdateConfigMap(cm); err2 != nil { + return err2 + } + } } if cluster.IsRestoreFromBackup() { @@ -237,3 +245,26 @@ func (r *realEnsureResource) EnsureRedisOSMSecret(cluster *redisv1alpha1.Distrib } return nil } + +func isRedisConfChanged(confInCm string, currentConf map[string]string, log logr.Logger) bool { + lines := strings.Split(strings.TrimSuffix(confInCm, "\n"), "\n") + if len(lines) != len(currentConf) { + return true + } + for _, line := range lines { + line = strings.TrimSuffix(line, " ") + confLine := strings.SplitN(line, " ", 2) + if len(confLine) == 2 { + if valueInCurrentConf, ok := currentConf[confLine[0]]; !ok { + return true + } else { + if valueInCurrentConf != confLine[1] { + return true + } + } + } else { + log.Info("custom config is invalid", "raw", line, "split", confLine) + } + } + return false +} diff --git a/pkg/controller/manager/ensurer_test.go b/pkg/controller/manager/ensurer_test.go new file mode 100644 index 000000000..b046f520d --- /dev/null +++ b/pkg/controller/manager/ensurer_test.go @@ -0,0 +1,152 @@ +package manager + +import ( + "testing" + + "github.com/go-logr/logr" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +var log = logf.Log.WithName("test") + +func Test_isRedisConfChanged(t *testing.T) { + type args struct { + confInCm string + currentConf map[string]string + log logr.Logger + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "should false", + args: args{ + confInCm: `appendfsync everysec +appendonly yes +auto-aof-rewrite-min-size 67108864 +save 900 1 300 10`, + currentConf: map[string]string{ + "appendfsync": "everysec", + "appendonly": "yes", + "auto-aof-rewrite-min-size": "67108864", + "save": "900 1 300 10", + }, + log: log, + }, + want: false, + }, + { + name: "should false with newline", + args: args{ + confInCm: `appendfsync everysec +appendonly yes +auto-aof-rewrite-min-size 67108864 +save 900 1 300 10 +`, + currentConf: map[string]string{ + "appendfsync": "everysec", + "appendonly": "yes", + "auto-aof-rewrite-min-size": "67108864", + "save": "900 1 300 10", + }, + log: log, + }, + want: false, + }, + { + name: "should true, compare value", + args: args{ + confInCm: `appendfsync everysec +appendonly yes +auto-aof-rewrite-min-size 6710886 +save 900 1 300 10 +`, + currentConf: map[string]string{ + "appendfsync": "everysec", + "appendonly": "yes", + "auto-aof-rewrite-min-size": "67108864", + "save": "900 1 300 10", + }, + log: log, + }, + want: true, + }, + { + name: "should true, add current", + args: args{ + confInCm: `appendfsync everysec +appendonly yes +save 900 1 300 10 +`, + currentConf: map[string]string{ + "appendfsync": "everysec", + "appendonly": "yes", + "auto-aof-rewrite-min-size": "67108864", + "save": "900 1 300 10", + }, + log: log, + }, + want: true, + }, + { + name: "should true, del current", + args: args{ + confInCm: `appendfsync everysec +appendonly yes +auto-aof-rewrite-min-size 67108864 +save 900 1 300 10 +`, + currentConf: map[string]string{ + "appendfsync": "everysec", + "appendonly": "yes", + "save": "900 1 300 10", + }, + log: log, + }, + want: true, + }, + { + name: "should true, compare key", + args: args{ + confInCm: `appendfsync everysec +appendonly yes +save 900 1 300 10 +`, + currentConf: map[string]string{ + "appendonly": "yes", + "auto-aof-rewrite-min-size": "67108864", + "save": "900 1 300 10", + }, + log: log, + }, + want: true, + }, + { + name: "should true, compare save", + args: args{ + confInCm: `appendfsync everysec +appendonly yes +auto-aof-rewrite-min-size 67108864 +save 900 1 300 10 +`, + currentConf: map[string]string{ + "appendfsync": "everysec", + "appendonly": "yes", + "auto-aof-rewrite-min-size": "67108864", + "save": "900 1", + }, + log: log, + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isRedisConfChanged(tt.args.confInCm, tt.args.currentConf, tt.args.log); got != tt.want { + t.Errorf("isRedisConfChanged() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/redisutil/admin.go b/pkg/redisutil/admin.go index f41c6256e..dad0a8c3b 100644 --- a/pkg/redisutil/admin.go +++ b/pkg/redisutil/admin.go @@ -388,12 +388,7 @@ func (a *Admin) MigrateKeys(addr string, dest *Node, slots []Slot, batch int, ti break } - var args []string - if replace { - args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "REPLACE", "KEYS"}, keys...) - } else { - args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "KEYS"}, keys...) - } + args := a.migrateCmdArgs(dest, timeoutStr, replace, keys) resp = c.Cmd("MIGRATE", args) if err := a.Connections().ValidateResp(resp, addr, "Unable to run command MIGRATE"); err != nil { @@ -432,12 +427,7 @@ func (a *Admin) MigrateKeysInSlot(addr string, dest *Node, slot Slot, batch int, break } - var args []string - if replace { - args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "REPLACE", "KEYS"}, keys...) - } else { - args = append([]string{dest.IP, dest.Port, "", "0", timeoutStr, "KEYS"}, keys...) - } + args := a.migrateCmdArgs(dest, timeoutStr, replace, keys) resp = c.Cmd("MIGRATE", args) if err := a.Connections().ValidateResp(resp, addr, "Unable to run command MIGRATE"); err != nil { @@ -448,6 +438,20 @@ func (a *Admin) MigrateKeysInSlot(addr string, dest *Node, slot Slot, batch int, return keyCount, nil } +func (a *Admin) migrateCmdArgs(dest *Node, timeoutStr string, replace bool, keys []string) []string { + args := []string{dest.IP, dest.Port, "", "0", timeoutStr} + if password, ok := a.Connections().GetAUTH(); ok { + args = append(args, "AUTH", password) + } + if replace { + args = append(args, "REPLACE", "KEYS") + } else { + args = append(args, "KEYS") + } + args = append(args, keys...) + return args +} + // ForgetNode used to force other redis cluster node to forget a specific node func (a *Admin) ForgetNode(id string) error { infos, _ := a.GetClusterInfos() diff --git a/pkg/redisutil/connections.go b/pkg/redisutil/connections.go index 5703c1b06..5d576459b 100644 --- a/pkg/redisutil/connections.go +++ b/pkg/redisutil/connections.go @@ -58,6 +58,8 @@ type IAdminConnections interface { ValidatePipeResp(c IClient, addr, errMessage string) bool // Reset close all connections and clear the connection map Reset() + // GetAUTH return password and true if connection password is set, else return false. + GetAUTH() (string, bool) } // AdminConnections connection map for redis cluster @@ -98,6 +100,14 @@ func NewAdminConnections(addrs []string, options *AdminOptions, log logr.Logger) return cnx } +// GetAUTH return password and true if connection password is set, else return false. +func (cnx *AdminConnections) GetAUTH() (string, bool) { + if len(cnx.password) > 0 { + return cnx.password, true + } + return "", false +} + // Reconnect force a reconnection on the given address // is the adress is not part of the map, act like Add func (cnx *AdminConnections) Reconnect(addr string) error { diff --git a/pkg/resources/configmaps/configmap.go b/pkg/resources/configmaps/configmap.go index 4d07237fd..7b7c58eee 100644 --- a/pkg/resources/configmaps/configmap.go +++ b/pkg/resources/configmaps/configmap.go @@ -1,7 +1,9 @@ package configmaps import ( + "bytes" "fmt" + "sort" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -9,7 +11,11 @@ import ( redisv1alpha1 "github.com/ucloud/redis-cluster-operator/pkg/apis/redis/v1alpha1" ) -const RestoreSucceeded = "succeeded" +const ( + RestoreSucceeded = "succeeded" + + RedisConfKey = "redis.conf" +) // NewConfigMapForCR creates a new ConfigMap for the given Cluster func NewConfigMapForCR(cluster *redisv1alpha1.DistributedRedisCluster, labels map[string]string) *corev1.ConfigMap { @@ -45,6 +51,8 @@ if [ -f ${CLUSTER_CONFIG} ]; then fi exec "$@"` + redisConfContent := generateRedisConfContent(cluster.Spec.Config) + return &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: RedisConfigMapName(cluster.Name), @@ -55,10 +63,32 @@ exec "$@"` Data: map[string]string{ "shutdown.sh": shutdownContent, "fix-ip.sh": fixIPContent, + RedisConfKey: redisConfContent, }, } } +func generateRedisConfContent(configMap map[string]string) string { + if configMap == nil { + return "" + } + + var buffer bytes.Buffer + + keys := make([]string, 0, len(configMap)) + for k := range configMap { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + buffer.WriteString(fmt.Sprintf("%s %s", k, configMap[k])) + buffer.WriteString("\n") + } + + return buffer.String() +} + func RedisConfigMapName(clusterName string) string { return fmt.Sprintf("%s-%s", "redis-cluster", clusterName) } diff --git a/pkg/resources/configmaps/configmap_test.go b/pkg/resources/configmaps/configmap_test.go new file mode 100644 index 000000000..c040c05aa --- /dev/null +++ b/pkg/resources/configmaps/configmap_test.go @@ -0,0 +1,84 @@ +package configmaps + +import ( + "testing" +) + +func Test_generateRedisConfContent(t *testing.T) { + confMap := map[string]string{ + "activerehashing": "yes", + "appendfsync": "everysec", + "appendonly": "yes", + "auto-aof-rewrite-min-size": "67108864", + "auto-aof-rewrite-percentage": "100", + "cluster-node-timeout": "15000", + "cluster-require-full-coverage": "yes", + "hash-max-ziplist-entries": "512", + "hash-max-ziplist-value": "64", + "hll-sparse-max-bytes": "3000", + "list-compress-depth": "0", + "maxmemory": "1000000000", + "maxmemory-policy": "noeviction", + "maxmemory-samples": "5", + "no-appendfsync-on-rewrite": "no", + "notify-keyspace-events": "", + "repl-backlog-size": "1048576", + "repl-backlog-ttl": "3600", + "set-max-intset-entries": "512", + "slowlog-log-slower-than": "10000", + "slowlog-max-len": "128", + "stop-writes-on-bgsave-error": "yes", + "tcp-keepalive": "0", + "timeout": "0", + "zset-max-ziplist-entries": "128", + "zset-max-ziplist-value": "64", + } + want := `activerehashing yes +appendfsync everysec +appendonly yes +auto-aof-rewrite-min-size 67108864 +auto-aof-rewrite-percentage 100 +cluster-node-timeout 15000 +cluster-require-full-coverage yes +hash-max-ziplist-entries 512 +hash-max-ziplist-value 64 +hll-sparse-max-bytes 3000 +list-compress-depth 0 +maxmemory 1000000000 +maxmemory-policy noeviction +maxmemory-samples 5 +no-appendfsync-on-rewrite no +notify-keyspace-events +repl-backlog-size 1048576 +repl-backlog-ttl 3600 +set-max-intset-entries 512 +slowlog-log-slower-than 10000 +slowlog-max-len 128 +stop-writes-on-bgsave-error yes +tcp-keepalive 0 +timeout 0 +zset-max-ziplist-entries 128 +zset-max-ziplist-value 64 +` + type args struct { + configMap map[string]string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "test", + args: struct{ configMap map[string]string }{configMap: confMap}, + want: want, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := generateRedisConfContent(tt.args.configMap); got != tt.want { + t.Errorf("generateRedisConfContent()\n[%v], want\n[%v]", got, tt.want) + } + }) + } +} diff --git a/pkg/resources/statefulsets/statefulset.go b/pkg/resources/statefulsets/statefulset.go index e9d47245e..938ef9d67 100644 --- a/pkg/resources/statefulsets/statefulset.go +++ b/pkg/resources/statefulsets/statefulset.go @@ -149,6 +149,7 @@ func getRedisCommand(cluster *redisv1alpha1.DistributedRedisCluster, password *c cmd := []string{ "/conf/fix-ip.sh", "redis-server", + "/conf/redis.conf", "--cluster-enabled yes", "--cluster-config-file /data/nodes.conf", }