diff --git a/internal/constant/env.go b/internal/constant/env.go index 037c24c09a8..f6d4c98eb49 100644 --- a/internal/constant/env.go +++ b/internal/constant/env.go @@ -39,4 +39,6 @@ const ( KBEnvServicePort = "KB_SERVICE_PORT" KBEnvDataPath = "KB_DATA_PATH" KBEnvTTL = "KB_TTL" + KBEnvMaxLag = "KB_MAX_LAG" + KBEnvEnableHA = "KB_ENABLE_HA" ) diff --git a/lorry/binding/mysql/mysql.go b/lorry/binding/mysql/mysql.go index 80c161e9c51..db0067f9661 100644 --- a/lorry/binding/mysql/mysql.go +++ b/lorry/binding/mysql/mysql.go @@ -159,7 +159,7 @@ func (mysqlOps *MysqlOperations) GetRoleForReplication(ctx context.Context, requ cluster := k8sStore.GetClusterFromCache() if cluster == nil || !cluster.IsLocked() { return "", nil - } else if !dcsStore.HasLock() { + } else if !dcsStore.HasLease() { return SECONDARY, nil } diff --git a/lorry/binding/operation_volume_protection.go b/lorry/binding/operation_volume_protection.go index af228a973b7..1a7acd40b1b 100644 --- a/lorry/binding/operation_volume_protection.go +++ b/lorry/binding/operation_volume_protection.go @@ -95,13 +95,13 @@ func init() { } if err := optVolProt.Requester.init(context.Background()); err != nil { - optVolProt.Logger.Error(err, "init requester error") + optVolProt.Logger.Info("init requester failed", "error", err) return } optVolProt.Pod = os.Getenv(constant.KBEnvPodName) if err := optVolProt.initVolumes(); err != nil { - optVolProt.Logger.Error(err, "init volumes to monitor error") + optVolProt.Logger.Info("init volumes to monitor failed", "error", err) } optVolProt.Logger.Info(fmt.Sprintf("succeed to init volume protection, pod: %s, spec: %s", optVolProt.Pod, optVolProt.buildVolumesMsg())) } @@ -403,7 +403,7 @@ var _ volumeStatsRequester = &httpsVolumeStatsRequester{} func (r *httpsVolumeStatsRequester) init(ctx context.Context) error { var err error if r.cli, err = httpClient(); err != nil { - r.logger.Error(err, "build HTTP client error at setup") + // r.logger.Error(err, "build HTTP client error at setup") return err } // if r.req, err = httpRequest(ctx); err != nil { diff --git a/lorry/dcs/dcs.go b/lorry/dcs/dcs.go index bc678e2ec38..9dd54714cb0 100644 --- a/lorry/dcs/dcs.go +++ b/lorry/dcs/dcs.go @@ -47,12 +47,12 @@ type DCS interface { DeleteSwitchover() error // cluster scope leader lock - AttempAcquireLock() error - CreateLock() error - IsLockExist() (bool, error) - HasLock() bool - ReleaseLock() error - UpdateLock() error + AttempAcquireLease() error + CreateLease() error + IsLeaseExist() (bool, error) + HasLease() bool + ReleaseLease() error + UpdateLease() error GetLeader() (*Leader, error) } diff --git a/lorry/dcs/k8s.go b/lorry/dcs/k8s.go index 805f933bd9c..eb111cbbe2e 100644 --- a/lorry/dcs/k8s.go +++ b/lorry/dcs/k8s.go @@ -124,7 +124,7 @@ func (store *KubernetesStore) Initialize(cluster *Cluster) error { store.logger.Error(err, "Create Ha ConfigMap failed") } - err = store.CreateLock() + err = store.CreateLease() if err != nil { store.logger.Error(err, "Create Leader ConfigMap failed") } @@ -244,7 +244,7 @@ func (store *KubernetesStore) GetLeaderConfigMap() (*corev1.ConfigMap, error) { return leaderConfigMap, err } -func (store *KubernetesStore) IsLockExist() (bool, error) { +func (store *KubernetesStore) IsLeaseExist() (bool, error) { leaderConfigMap, err := store.GetLeaderConfigMap() appCluster, ok := store.cluster.resource.(*appsv1alpha1.Cluster) if leaderConfigMap != nil && ok && leaderConfigMap.CreationTimestamp.Before(&appCluster.CreationTimestamp) { @@ -255,8 +255,8 @@ func (store *KubernetesStore) IsLockExist() (bool, error) { return leaderConfigMap != nil, err } -func (store *KubernetesStore) CreateLock() error { - isExist, err := store.IsLockExist() +func (store *KubernetesStore) CreateLease() error { + isExist, err := store.IsLeaseExist() if isExist || err != nil { return err } @@ -347,7 +347,7 @@ func (store *KubernetesStore) DeleteLeader() error { return err } -func (store *KubernetesStore) AttempAcquireLock() error { +func (store *KubernetesStore) AttempAcquireLease() error { now := strconv.FormatInt(time.Now().Unix(), 10) ttl := store.cluster.HaConfig.ttl leaderName := store.currentMemberName @@ -366,7 +366,7 @@ func (store *KubernetesStore) AttempAcquireLock() error { } cm, err := store.clientset.CoreV1().ConfigMaps(store.namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}) if err != nil { - store.logger.Error(err, "Acquire lock failed") + store.logger.Error(err, "Acquire lease failed") } else { store.cluster.Leader.Resource = cm } @@ -374,16 +374,16 @@ func (store *KubernetesStore) AttempAcquireLock() error { return err } -func (store *KubernetesStore) HasLock() bool { +func (store *KubernetesStore) HasLease() bool { return store.cluster != nil && store.cluster.Leader != nil && store.cluster.Leader.Name == store.currentMemberName } -func (store *KubernetesStore) UpdateLock() error { +func (store *KubernetesStore) UpdateLease() error { configMap := store.cluster.Leader.Resource.(*corev1.ConfigMap) annotations := configMap.GetAnnotations() if annotations["leader"] != store.currentMemberName { - return errors.Errorf("lost lock") + return errors.Errorf("lost lease") } ttl := store.cluster.HaConfig.ttl annotations["ttl"] = strconv.Itoa(ttl) @@ -399,8 +399,8 @@ func (store *KubernetesStore) UpdateLock() error { return err } -func (store *KubernetesStore) ReleaseLock() error { - store.logger.Info("release lock") +func (store *KubernetesStore) ReleaseLease() error { + store.logger.Info("release lease") configMap := store.cluster.Leader.Resource.(*corev1.ConfigMap) configMap.Annotations["leader"] = "" @@ -410,7 +410,7 @@ func (store *KubernetesStore) ReleaseLock() error { } _, err := store.clientset.CoreV1().ConfigMaps(store.namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}) if err != nil { - store.logger.Error(err, "release lock failed") + store.logger.Error(err, "release lease failed") } // TODO: if response status code is 409, it means operation conflict. return err @@ -425,12 +425,18 @@ func (store *KubernetesStore) CreateHaConfig(cluster *Cluster) error { store.logger.Info(fmt.Sprintf("Create Ha ConfigMap: %s", haName)) ttl := viper.GetString(constant.KBEnvTTL) - maxLag := viper.GetString("KB_MAX_LAG") + maxLag := viper.GetString(constant.KBEnvMaxLag) + enableHA := viper.GetString(constant.KBEnvEnableHA) + if enableHA == "" { + // disable HA by default + enableHA = "false" + } haConfigMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: haName, Annotations: map[string]string{ "ttl": ttl, + "enable": enableHA, "MaxLagOnSwitchover": maxLag, }, }, @@ -468,6 +474,13 @@ func (store *KubernetesStore) GetHaConfig() (*HaConfig, error) { if err != nil { maxLagOnSwitchover = 1048576 } + + enable := false + enableStr := annotations["enable"] + if enableStr != "" { + enable, err = strconv.ParseBool(enableStr) + } + deleteMembers := make(map[string]MemberToDelete) str := annotations["delete-members"] if str != "" { @@ -480,6 +493,7 @@ func (store *KubernetesStore) GetHaConfig() (*HaConfig, error) { return &HaConfig{ index: configmap.ResourceVersion, ttl: ttl, + enable: enable, maxLagOnSwitchover: int64(maxLagOnSwitchover), DeleteMembers: deleteMembers, resource: configmap, diff --git a/lorry/dcs/types.go b/lorry/dcs/types.go index 75697a9d731..a9410f532fa 100644 --- a/lorry/dcs/types.go +++ b/lorry/dcs/types.go @@ -114,6 +114,7 @@ type MemberToDelete struct { type HaConfig struct { index string ttl int + enable bool maxLagOnSwitchover int64 DeleteMembers map[string]MemberToDelete resource any @@ -123,6 +124,10 @@ func (c *HaConfig) GetTTL() int { return c.ttl } +func (c *HaConfig) IsEnable() bool { + return c.enable +} + func (c *HaConfig) GetMaxLagOnSwitchover() int64 { return c.maxLagOnSwitchover } diff --git a/lorry/highavailability/ha.go b/lorry/highavailability/ha.go index b8b05fbb9d7..a36d0e6d2d0 100644 --- a/lorry/highavailability/ha.go +++ b/lorry/highavailability/ha.go @@ -87,6 +87,10 @@ func (ha *Ha) RunCycle() { return } + if !cluster.HaConfig.IsEnable() { + return + } + currentMember := cluster.GetMemberWithName(ha.dbManager.GetCurrentMemberName()) if cluster.HaConfig.IsDeleting(currentMember) { @@ -97,8 +101,8 @@ func (ha *Ha) RunCycle() { if !ha.dbManager.IsRunning() { ha.logger.Info("DB Service is not running, wait for lorryctl to start it") - if ha.dcs.HasLock() { - _ = ha.dcs.ReleaseLock() + if ha.dcs.HasLease() { + _ = ha.dcs.ReleaseLease() } _ = ha.dbManager.Start(ha.ctx, cluster) return @@ -123,8 +127,8 @@ func (ha *Ha) RunCycle() { case !ha.dbManager.IsCurrentMemberHealthy(ha.ctx, cluster): ha.logger.Info("DB Service is not healthy, do some recover") - if ha.dcs.HasLock() { - _ = ha.dcs.ReleaseLock() + if ha.dcs.HasLease() { + _ = ha.dcs.ReleaseLease() } // dbManager.Recover() @@ -132,25 +136,25 @@ func (ha *Ha) RunCycle() { ha.logger.Info("Cluster has no leader, attempt to take the leader") if ha.IsHealthiestMember(ha.ctx, cluster) { cluster.Leader.DBState = DBState - if ha.dcs.AttempAcquireLock() == nil { + if ha.dcs.AttempAcquireLease() == nil { err := ha.dbManager.Promote(ha.ctx, cluster) if err != nil { ha.logger.Error(err, "Take the leader failed") - _ = ha.dcs.ReleaseLock() + _ = ha.dcs.ReleaseLease() } else { ha.logger.Info("Take the leader success!") } } } - case ha.dcs.HasLock(): + case ha.dcs.HasLease(): ha.logger.Info("This member is Cluster's leader") if cluster.Switchover != nil { if cluster.Switchover.Leader == ha.dbManager.GetCurrentMemberName() || (cluster.Switchover.Candidate != "" && cluster.Switchover.Candidate != ha.dbManager.GetCurrentMemberName()) { if ha.HasOtherHealthyMember(cluster) { _ = ha.dbManager.Demote(ha.ctx) - _ = ha.dcs.ReleaseLock() + _ = ha.dcs.ReleaseLease() break } @@ -168,7 +172,7 @@ func (ha *Ha) RunCycle() { // role services as the source of truth. // for replicationset cluster, HasOtherHealthyLeader will always be false. ha.logger.Info("Release leader") - _ = ha.dcs.ReleaseLock() + _ = ha.dcs.ReleaseLease() break } err := ha.dbManager.Promote(ha.ctx, cluster) @@ -178,13 +182,13 @@ func (ha *Ha) RunCycle() { } ha.logger.Info("Refresh leader ttl") - _ = ha.dcs.UpdateLock() + _ = ha.dcs.UpdateLease() if int(cluster.Replicas) < len(ha.dbManager.GetMemberAddrs(ha.ctx, cluster)) && cluster.Replicas != 0 { ha.DecreaseClusterReplicas(cluster) } - case !ha.dcs.HasLock(): + case !ha.dcs.HasLease(): if cluster.Switchover != nil { break } @@ -248,20 +252,26 @@ func (ha *Ha) Start() { isRootCreated, err = ha.dbManager.IsRootCreated(ha.ctx) } - isExist, _ := ha.dcs.IsLockExist() + isExist, _ := ha.dcs.IsLeaseExist() for !isExist { if ok, _ := ha.dbManager.IsLeader(context.Background(), cluster); ok { - _ = ha.dcs.Initialize(cluster) + err := ha.dcs.Initialize(cluster) + if err != nil { + ha.logger.Error(err, "DCS initialize failed") + time.Sleep(5 * time.Second) + continue + } break } + ha.logger.Info("Waiting for the database Leader to be ready.") time.Sleep(5 * time.Second) - isExist, _ = ha.dcs.IsLockExist() + isExist, _ = ha.dcs.IsLeaseExist() } for { ha.RunCycle() - time.Sleep(1 * time.Second) + time.Sleep(10 * time.Second) } } @@ -277,7 +287,7 @@ func (ha *Ha) DecreaseClusterReplicas(cluster *dcs3.Cluster) { ha.logger.Info(fmt.Sprintf("The last pod %s is the primary member and cannot be deleted. waiting "+ "for The controller to perform a switchover to a new primary member before this pod can be removed. ", deleteHost)) _ = ha.dbManager.Demote(ha.ctx) - _ = ha.dcs.ReleaseLock() + _ = ha.dcs.ReleaseLease() return } memberName := strings.Split(deleteHost, ".")[0] @@ -359,7 +369,7 @@ func (ha *Ha) DeleteCurrentMember(ctx context.Context, cluster *dcs3.Cluster) er defer ha.deleteLock.Unlock() // if current member is leader, take a switchover first - if ha.dcs.HasLock() { + if ha.dcs.HasLease() { for cluster.Switchover != nil { ha.logger.Info("cluster is doing switchover, wait for it to finish") return nil