chore: add ha switch (#5357)
xuriwuyun authored Oct 9, 2023
1 parent 084feaa commit 9de830b
Showing 7 changed files with 71 additions and 40 deletions.
2 changes: 2 additions & 0 deletions internal/constant/env.go
@@ -39,4 +39,6 @@ const (
KBEnvServicePort = "KB_SERVICE_PORT"
KBEnvDataPath = "KB_DATA_PATH"
KBEnvTTL = "KB_TTL"
KBEnvMaxLag = "KB_MAX_LAG"
KBEnvEnableHA = "KB_ENABLE_HA"
)
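
A minimal sketch of how the new KB_ENABLE_HA switch could be consumed, assuming viper is wired to read process environment variables (the exact wiring in lorry may differ); an empty value falls back to HA disabled, matching the default added later in CreateHaConfig:

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/spf13/viper"
)

// Mirrors the constant added in internal/constant/env.go.
const KBEnvEnableHA = "KB_ENABLE_HA"

func main() {
	// Assumption: viper reads values straight from the environment here;
	// lorry may configure viper differently.
	viper.AutomaticEnv()

	enableStr := viper.GetString(KBEnvEnableHA)
	if enableStr == "" {
		// Disable HA by default, like CreateHaConfig does.
		enableStr = "false"
	}
	enabled, err := strconv.ParseBool(enableStr)
	if err != nil {
		enabled = false
	}
	fmt.Printf("HA enabled: %v\n", enabled)
}
```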
2 changes: 1 addition & 1 deletion lorry/binding/mysql/mysql.go
@@ -159,7 +159,7 @@ func (mysqlOps *MysqlOperations) GetRoleForReplication(ctx context.Context, requ
cluster := k8sStore.GetClusterFromCache()
if cluster == nil || !cluster.IsLocked() {
return "", nil
} else if !dcsStore.HasLock() {
} else if !dcsStore.HasLease() {
return SECONDARY, nil
}

6 changes: 3 additions & 3 deletions lorry/binding/operation_volume_protection.go
@@ -95,13 +95,13 @@ func init() {
}

if err := optVolProt.Requester.init(context.Background()); err != nil {
optVolProt.Logger.Error(err, "init requester error")
optVolProt.Logger.Info("init requester failed", "error", err)
return
}

optVolProt.Pod = os.Getenv(constant.KBEnvPodName)
if err := optVolProt.initVolumes(); err != nil {
optVolProt.Logger.Error(err, "init volumes to monitor error")
optVolProt.Logger.Info("init volumes to monitor failed", "error", err)
}
optVolProt.Logger.Info(fmt.Sprintf("succeed to init volume protection, pod: %s, spec: %s", optVolProt.Pod, optVolProt.buildVolumesMsg()))
}
@@ -403,7 +403,7 @@ var _ volumeStatsRequester = &httpsVolumeStatsRequester{}
func (r *httpsVolumeStatsRequester) init(ctx context.Context) error {
var err error
if r.cli, err = httpClient(); err != nil {
r.logger.Error(err, "build HTTP client error at setup")
// r.logger.Error(err, "build HTTP client error at setup")
return err
}
// if r.req, err = httpRequest(ctx); err != nil {
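The two hunks above demote non-fatal init failures from logger.Error to logger.Info with the error attached as a key/value pair. A small, self-contained sketch of the two logr call shapes (funcr is used here only to get a runnable logger; it is not what lorry uses):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/go-logr/logr/funcr"
)

func main() {
	// funcr provides a real logr.Logger backed by a plain print function,
	// just so the example runs outside lorry.
	logger := funcr.New(func(prefix, args string) {
		fmt.Println(prefix, args)
	}, funcr.Options{})

	err := errors.New("connection refused")

	// Error level: for failures the caller treats as real errors.
	logger.Error(err, "init requester error")

	// Info level with the error as a key/value pair: the pattern the
	// commit switches to for non-fatal init problems.
	logger.Info("init requester failed", "error", err)
}
```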
12 changes: 6 additions & 6 deletions lorry/dcs/dcs.go
@@ -47,12 +47,12 @@ type DCS interface {
DeleteSwitchover() error

// cluster scope leader lock
AttempAcquireLock() error
CreateLock() error
IsLockExist() (bool, error)
HasLock() bool
ReleaseLock() error
UpdateLock() error
AttempAcquireLease() error
CreateLease() error
IsLeaseExist() (bool, error)
HasLease() bool
ReleaseLease() error
UpdateLease() error

GetLeader() (*Leader, error)
}
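The lock methods on the DCS interface are renamed to lease terminology. Below is a hypothetical sketch of how a caller might drive the renamed methods: acquire the lease, renew it while held, and release it on exit. The fake implementation and loop are illustration only, not lorry's HA loop:

```go
package main

import (
	"fmt"
	"time"
)

// leaseDCS is a trimmed-down stand-in for the renamed lease methods on the
// DCS interface; the real interface also has GetLeader, switchover helpers, etc.
type leaseDCS interface {
	AttempAcquireLease() error
	HasLease() bool
	UpdateLease() error
	ReleaseLease() error
}

// fakeDCS is a toy in-memory implementation used only to make the sketch run.
type fakeDCS struct{ held bool }

func (f *fakeDCS) AttempAcquireLease() error { f.held = true; return nil }
func (f *fakeDCS) HasLease() bool            { return f.held }
func (f *fakeDCS) UpdateLease() error        { return nil }
func (f *fakeDCS) ReleaseLease() error       { f.held = false; return nil }

// runLeaderLoop sketches how a caller might drive the lease API: acquire,
// renew periodically while held, and release on the way out.
func runLeaderLoop(dcs leaseDCS, rounds int) {
	if err := dcs.AttempAcquireLease(); err != nil {
		fmt.Println("could not acquire lease:", err)
		return
	}
	defer func() {
		if dcs.HasLease() {
			_ = dcs.ReleaseLease()
		}
	}()
	for i := 0; i < rounds; i++ {
		if err := dcs.UpdateLease(); err != nil {
			fmt.Println("lost lease:", err)
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	runLeaderLoop(&fakeDCS{}, 3)
	fmt.Println("done")
}
```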
40 changes: 27 additions & 13 deletions lorry/dcs/k8s.go
@@ -124,7 +124,7 @@ func (store *KubernetesStore) Initialize(cluster *Cluster) error {
store.logger.Error(err, "Create Ha ConfigMap failed")
}

err = store.CreateLock()
err = store.CreateLease()
if err != nil {
store.logger.Error(err, "Create Leader ConfigMap failed")
}
@@ -244,7 +244,7 @@ func (store *KubernetesStore) GetLeaderConfigMap() (*corev1.ConfigMap, error) {
return leaderConfigMap, err
}

func (store *KubernetesStore) IsLockExist() (bool, error) {
func (store *KubernetesStore) IsLeaseExist() (bool, error) {
leaderConfigMap, err := store.GetLeaderConfigMap()
appCluster, ok := store.cluster.resource.(*appsv1alpha1.Cluster)
if leaderConfigMap != nil && ok && leaderConfigMap.CreationTimestamp.Before(&appCluster.CreationTimestamp) {
@@ -255,8 +255,8 @@ func (store *KubernetesStore) IsLockExist() (bool, error) {
return leaderConfigMap != nil, err
}

func (store *KubernetesStore) CreateLock() error {
isExist, err := store.IsLockExist()
func (store *KubernetesStore) CreateLease() error {
isExist, err := store.IsLeaseExist()
if isExist || err != nil {
return err
}
@@ -347,7 +347,7 @@ func (store *KubernetesStore) DeleteLeader() error {
return err
}

func (store *KubernetesStore) AttempAcquireLock() error {
func (store *KubernetesStore) AttempAcquireLease() error {
now := strconv.FormatInt(time.Now().Unix(), 10)
ttl := store.cluster.HaConfig.ttl
leaderName := store.currentMemberName
@@ -366,24 +366,24 @@
}
cm, err := store.clientset.CoreV1().ConfigMaps(store.namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
if err != nil {
store.logger.Error(err, "Acquire lock failed")
store.logger.Error(err, "Acquire lease failed")
} else {
store.cluster.Leader.Resource = cm
}

return err
}

func (store *KubernetesStore) HasLock() bool {
func (store *KubernetesStore) HasLease() bool {
return store.cluster != nil && store.cluster.Leader != nil && store.cluster.Leader.Name == store.currentMemberName
}

func (store *KubernetesStore) UpdateLock() error {
func (store *KubernetesStore) UpdateLease() error {
configMap := store.cluster.Leader.Resource.(*corev1.ConfigMap)

annotations := configMap.GetAnnotations()
if annotations["leader"] != store.currentMemberName {
return errors.Errorf("lost lock")
return errors.Errorf("lost lease")
}
ttl := store.cluster.HaConfig.ttl
annotations["ttl"] = strconv.Itoa(ttl)
@@ -399,8 +399,8 @@ func (store *KubernetesStore) UpdateLock() error {
return err
}

func (store *KubernetesStore) ReleaseLock() error {
store.logger.Info("release lock")
func (store *KubernetesStore) ReleaseLease() error {
store.logger.Info("release lease")
configMap := store.cluster.Leader.Resource.(*corev1.ConfigMap)
configMap.Annotations["leader"] = ""

@@ -410,7 +410,7 @@ func (store *KubernetesStore) ReleaseLock() error {
}
_, err := store.clientset.CoreV1().ConfigMaps(store.namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
if err != nil {
store.logger.Error(err, "release lock failed")
store.logger.Error(err, "release lease failed")
}
// TODO: if response status code is 409, it means operation conflict.
return err
@@ -425,12 +425,18 @@ func (store *KubernetesStore) CreateHaConfig(cluster *Cluster) error {

store.logger.Info(fmt.Sprintf("Create Ha ConfigMap: %s", haName))
ttl := viper.GetString(constant.KBEnvTTL)
maxLag := viper.GetString("KB_MAX_LAG")
maxLag := viper.GetString(constant.KBEnvMaxLag)
enableHA := viper.GetString(constant.KBEnvEnableHA)
if enableHA == "" {
// disable HA by default
enableHA = "false"
}
haConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: haName,
Annotations: map[string]string{
"ttl": ttl,
"enable": enableHA,
"MaxLagOnSwitchover": maxLag,
},
},
@@ -468,6 +474,13 @@ func (store *KubernetesStore) GetHaConfig() (*HaConfig, error) {
if err != nil {
maxLagOnSwitchover = 1048576
}

enable := false
enableStr := annotations["enable"]
if enableStr != "" {
enable, err = strconv.ParseBool(enableStr)
}

deleteMembers := make(map[string]MemberToDelete)
str := annotations["delete-members"]
if str != "" {
@@ -480,6 +493,7 @@ func (store *KubernetesStore) GetHaConfig() (*HaConfig, error) {
return &HaConfig{
index: configmap.ResourceVersion,
ttl: ttl,
enable: enable,
maxLagOnSwitchover: int64(maxLagOnSwitchover),
DeleteMembers: deleteMembers,
resource: configmap,
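GetHaConfig now reads an "enable" annotation from the HA ConfigMap. Here is an illustrative variant of that parsing with an explicit fallback branch (the committed code assigns the strconv.ParseBool error to err without a separate fallback):

```go
package main

import (
	"fmt"
	"strconv"
)

// parseEnable converts the "enable" annotation on the HA ConfigMap into a
// bool, treating an empty or malformed value as "HA disabled". Illustration
// only; not the code in this commit.
func parseEnable(annotations map[string]string) bool {
	enableStr := annotations["enable"]
	if enableStr == "" {
		return false
	}
	enable, err := strconv.ParseBool(enableStr)
	if err != nil {
		return false
	}
	return enable
}

func main() {
	fmt.Println(parseEnable(map[string]string{"enable": "true"})) // true
	fmt.Println(parseEnable(map[string]string{"enable": "oops"})) // false
	fmt.Println(parseEnable(map[string]string{}))                 // false
}
```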
5 changes: 5 additions & 0 deletions lorry/dcs/types.go
@@ -114,6 +114,7 @@ type MemberToDelete struct {
type HaConfig struct {
index string
ttl int
enable bool
maxLagOnSwitchover int64
DeleteMembers map[string]MemberToDelete
resource any
@@ -123,6 +124,10 @@ func (c *HaConfig) GetTTL() int {
return c.ttl
}

func (c *HaConfig) IsEnable() bool {
return c.enable
}

func (c *HaConfig) GetMaxLagOnSwitchover() int64 {
return c.maxLagOnSwitchover
}
44 changes: 27 additions & 17 deletions lorry/highavailability/ha.go
@@ -87,6 +87,10 @@ func (ha *Ha) RunCycle() {
return
}

if !cluster.HaConfig.IsEnable() {
return
}

currentMember := cluster.GetMemberWithName(ha.dbManager.GetCurrentMemberName())

if cluster.HaConfig.IsDeleting(currentMember) {
@@ -97,8 +101,8 @@

if !ha.dbManager.IsRunning() {
ha.logger.Info("DB Service is not running, wait for lorryctl to start it")
if ha.dcs.HasLock() {
_ = ha.dcs.ReleaseLock()
if ha.dcs.HasLease() {
_ = ha.dcs.ReleaseLease()
}
_ = ha.dbManager.Start(ha.ctx, cluster)
return
@@ -123,34 +127,34 @@

case !ha.dbManager.IsCurrentMemberHealthy(ha.ctx, cluster):
ha.logger.Info("DB Service is not healthy, do some recover")
if ha.dcs.HasLock() {
_ = ha.dcs.ReleaseLock()
if ha.dcs.HasLease() {
_ = ha.dcs.ReleaseLease()
}
// dbManager.Recover()

case !cluster.IsLocked():
ha.logger.Info("Cluster has no leader, attempt to take the leader")
if ha.IsHealthiestMember(ha.ctx, cluster) {
cluster.Leader.DBState = DBState
if ha.dcs.AttempAcquireLock() == nil {
if ha.dcs.AttempAcquireLease() == nil {
err := ha.dbManager.Promote(ha.ctx, cluster)
if err != nil {
ha.logger.Error(err, "Take the leader failed")
_ = ha.dcs.ReleaseLock()
_ = ha.dcs.ReleaseLease()
} else {
ha.logger.Info("Take the leader success!")
}
}
}

case ha.dcs.HasLock():
case ha.dcs.HasLease():
ha.logger.Info("This member is Cluster's leader")
if cluster.Switchover != nil {
if cluster.Switchover.Leader == ha.dbManager.GetCurrentMemberName() ||
(cluster.Switchover.Candidate != "" && cluster.Switchover.Candidate != ha.dbManager.GetCurrentMemberName()) {
if ha.HasOtherHealthyMember(cluster) {
_ = ha.dbManager.Demote(ha.ctx)
_ = ha.dcs.ReleaseLock()
_ = ha.dcs.ReleaseLease()
break
}

@@ -168,7 +172,7 @@
// role services as the source of truth.
// for replicationset cluster, HasOtherHealthyLeader will always be false.
ha.logger.Info("Release leader")
_ = ha.dcs.ReleaseLock()
_ = ha.dcs.ReleaseLease()
break
}
err := ha.dbManager.Promote(ha.ctx, cluster)
@@ -178,13 +182,13 @@
}

ha.logger.Info("Refresh leader ttl")
_ = ha.dcs.UpdateLock()
_ = ha.dcs.UpdateLease()

if int(cluster.Replicas) < len(ha.dbManager.GetMemberAddrs(ha.ctx, cluster)) && cluster.Replicas != 0 {
ha.DecreaseClusterReplicas(cluster)
}

case !ha.dcs.HasLock():
case !ha.dcs.HasLease():
if cluster.Switchover != nil {
break
}
@@ -248,20 +252,26 @@ func (ha *Ha) Start() {
isRootCreated, err = ha.dbManager.IsRootCreated(ha.ctx)
}

isExist, _ := ha.dcs.IsLockExist()
isExist, _ := ha.dcs.IsLeaseExist()
for !isExist {
if ok, _ := ha.dbManager.IsLeader(context.Background(), cluster); ok {
_ = ha.dcs.Initialize(cluster)
err := ha.dcs.Initialize(cluster)
if err != nil {
ha.logger.Error(err, "DCS initialize failed")
time.Sleep(5 * time.Second)
continue
}
break
}

ha.logger.Info("Waiting for the database Leader to be ready.")
time.Sleep(5 * time.Second)
isExist, _ = ha.dcs.IsLockExist()
isExist, _ = ha.dcs.IsLeaseExist()
}

for {
ha.RunCycle()
time.Sleep(1 * time.Second)
time.Sleep(10 * time.Second)
}
}

Expand All @@ -277,7 +287,7 @@ func (ha *Ha) DecreaseClusterReplicas(cluster *dcs3.Cluster) {
ha.logger.Info(fmt.Sprintf("The last pod %s is the primary member and cannot be deleted. waiting "+
"for The controller to perform a switchover to a new primary member before this pod can be removed. ", deleteHost))
_ = ha.dbManager.Demote(ha.ctx)
_ = ha.dcs.ReleaseLock()
_ = ha.dcs.ReleaseLease()
return
}
memberName := strings.Split(deleteHost, ".")[0]
@@ -359,7 +369,7 @@ func (ha *Ha) DeleteCurrentMember(ctx context.Context, cluster *dcs3.Cluster) er
defer ha.deleteLock.Unlock()

// if current member is leader, take a switchover first
if ha.dcs.HasLock() {
if ha.dcs.HasLease() {
for cluster.Switchover != nil {
ha.logger.Info("cluster is doing switchover, wait for it to finish")
return nil
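Start now retries DCS initialization instead of proceeding after a failure, and the main loop interval grows from 1s to 10s. A toy sketch of that retry pattern follows, with the backoff made a parameter so the example runs quickly; initFn stands in for dcs.Initialize(cluster):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// initializeWithRetry keeps calling initFn until it succeeds or attempts run
// out, sleeping between tries. This is the shape of the retry the commit adds
// around dcs.Initialize, which sleeps 5 seconds between attempts.
func initializeWithRetry(initFn func() error, attempts int, backoff time.Duration) error {
	for i := 0; i < attempts; i++ {
		if err := initFn(); err != nil {
			fmt.Println("DCS initialize failed:", err)
			time.Sleep(backoff)
			continue
		}
		return nil
	}
	return errors.New("gave up initializing DCS")
}

func main() {
	calls := 0
	err := initializeWithRetry(func() error {
		calls++
		if calls < 2 {
			return errors.New("transient apiserver error")
		}
		return nil
	}, 3, 10*time.Millisecond)
	fmt.Println("initialized:", err == nil)
	// Once initialized, the committed loop calls RunCycle every 10 seconds.
}
```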
