Skip to content

Commit

Permalink
fix: delete leave member in ha loop (#7062)
Browse files Browse the repository at this point in the history
(cherry picked from commit e5c256e)
  • Loading branch information
kizuna-lek committed Apr 17, 2024
1 parent 69dae95 commit 5422cb2
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 86 deletions.
4 changes: 2 additions & 2 deletions pkg/lorry/client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ var _ = Describe("Lorry HTTP Client", func() {
mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(2)
mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
mockDCSStore.EXPECT().GetCluster().Return(cluster, nil)
mockDCSStore.EXPECT().UpdateHaConfig().Return(nil)
mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(2)
Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
})
Expand All @@ -457,7 +457,7 @@ var _ = Describe("Lorry HTTP Client", func() {
mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(4)
mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(2)
mockDCSStore.EXPECT().GetCluster().Return(cluster, nil).Times(2)
mockDCSStore.EXPECT().UpdateHaConfig().Return(nil)
mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(3)
// first leave
Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
Expand Down
23 changes: 23 additions & 0 deletions pkg/lorry/dcs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,29 @@ func (c *HaConfig) FinishDeleted(member *Member) {
c.DeleteMembers[member.Name] = *memberToDelete
}

// TryToRemoveDeleteRecord removes a stale (duplicated) delete record for the
// given member from the ha config: a record stored under the member's name
// whose UID no longer matches the member's current UID. It reports whether a
// record was removed; when true, the caller is responsible for persisting the
// change (e.g. via DCS UpdateHaConfig).
func (c *HaConfig) TryToRemoveDeleteRecord(member *Member) bool {
	// Guard: callers may pass the result of GetMemberWithName, which can be
	// nil when the member is not (yet) present in the cluster view.
	if member == nil {
		return false
	}
	memberToDelete := c.GetDuplicatedMemberToDelete(member)
	if memberToDelete != nil {
		delete(c.DeleteMembers, member.Name)
		return true
	}

	return false
}

// GetDuplicatedMemberToDelete gets a previous duplicated delete record in the
// ha configmap: an entry keyed by the member's name whose UID differs from the
// member's current UID, i.e. a record left behind by an earlier incarnation of
// the pod. It returns nil when the member is nil, when no record exists, or
// when the record belongs to the current incarnation (matching UID).
func (c *HaConfig) GetDuplicatedMemberToDelete(member *Member) *MemberToDelete {
	// Guard: a nil member (e.g. GetMemberWithName miss) has no record to match.
	if member == nil {
		return nil
	}

	memberToDelete, ok := c.DeleteMembers[member.Name]
	if !ok {
		return nil
	}

	// Same UID means the record targets the live member, not a duplicate.
	if memberToDelete.UID == member.UID {
		return nil
	}
	return &memberToDelete
}

func (c *HaConfig) GetMemberToDelete(member *Member) *MemberToDelete {
memberToDelete, ok := c.DeleteMembers[member.Name]
if !ok {
Expand Down
91 changes: 7 additions & 84 deletions pkg/lorry/highavailability/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ package highavailability

import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -42,7 +38,6 @@ type Ha struct {
dbManager engines.DBManager
dcs dcs3.DCS
logger logr.Logger
deleteLock sync.Mutex
disableDNSChecker bool
}

Expand Down Expand Up @@ -87,7 +82,6 @@ func (ha *Ha) RunCycle() {

if cluster.HaConfig.IsDeleting(currentMember) {
ha.logger.Info("Current Member is deleted!")
_ = ha.DeleteCurrentMember(ha.ctx, cluster)
return
}

Expand Down Expand Up @@ -187,10 +181,6 @@ func (ha *Ha) RunCycle() {
ha.logger.Info("Refresh leader ttl")
_ = ha.dcs.UpdateLease()

if int(cluster.Replicas) < len(ha.dbManager.GetMemberAddrs(ha.ctx, cluster)) && cluster.Replicas != 0 {
ha.DecreaseClusterReplicas(cluster)
}

case !ha.dcs.HasLease():
if cluster.Switchover != nil {
break
Expand Down Expand Up @@ -228,6 +218,13 @@ func (ha *Ha) Start() {
util.WaitForPodReady(false)
}

// Delete duplicate member deletion records.
removed := cluster.HaConfig.TryToRemoveDeleteRecord(cluster.GetMemberWithName(ha.dbManager.GetCurrentMemberName()))
if removed {
ha.logger.Info("Found previous duplicated delete record, remove it")
_ = ha.dcs.UpdateHaConfig()
}

ha.logger.Info(fmt.Sprintf("cluster: %v", cluster))
isInitialized, err := ha.dbManager.IsClusterInitialized(context.TODO(), cluster)
for err != nil || !isInitialized {
Expand Down Expand Up @@ -288,30 +285,6 @@ func (ha *Ha) Start() {
}
}

// DecreaseClusterReplicas removes the trailing surplus member from the
// database cluster when the DB engine reports more member addresses than the
// desired replica count (the caller checks that condition).
// NOTE(review): errors from Demote/ReleaseLease/LeaveMemberFromCluster are
// deliberately ignored — this is best-effort work retried on the next HA cycle.
func (ha *Ha) DecreaseClusterReplicas(cluster *dcs3.Cluster) {
	hosts := ha.dbManager.GetMemberAddrs(ha.ctx, cluster)
	// Sort so the lexicographically last host is chosen deterministically;
	// assumes caller guarantees hosts is non-empty (replicas < len(hosts)).
	sort.Strings(hosts)
	deleteHost := hosts[len(hosts)-1]
	ha.logger.Info("Delete member", "name", deleteHost)
	// The pods in the cluster are managed by a StatefulSet. If the replica count is decreased,
	// then the last pod will be removed first.
	//
	if strings.HasPrefix(deleteHost, ha.dbManager.GetCurrentMemberName()) {
		// The member to remove is this pod itself and it is currently the
		// primary: demote and give up the leader lease so the controller can
		// switch over before the pod is actually removed.
		ha.logger.Info(fmt.Sprintf("The last pod %s is the primary member and cannot be deleted. waiting "+
			"for The controller to perform a switchover to a new primary member before this pod can be removed. ", deleteHost))
		_ = ha.dbManager.Demote(ha.ctx)
		_ = ha.dcs.ReleaseLease()
		return
	}
	// Host addresses are "<pod>.<headless-svc>..."; the pod name is the member name.
	memberName := strings.Split(deleteHost, ".")[0]
	member := cluster.GetMemberWithName(memberName)
	if member != nil {
		// Member is still declared in the cluster spec — the scale-down has not
		// reached it yet, so do not evict it from the DB cluster.
		ha.logger.Info(fmt.Sprintf("member %s exists, do not delete", memberName))
		return
	}
	_ = ha.dbManager.LeaveMemberFromCluster(ha.ctx, cluster, memberName)
}

func (ha *Ha) IsHealthiestMember(ctx context.Context, cluster *dcs3.Cluster) bool {
currentMemberName := ha.dbManager.GetCurrentMemberName()
currentMember := cluster.GetMemberWithName(currentMemberName)
Expand Down Expand Up @@ -373,56 +346,6 @@ func (ha *Ha) HasOtherHealthyMember(cluster *dcs3.Cluster) bool {
return false
}

// DeleteCurrentMember removes the current member from the database cluster
// after its delete has been requested in the ha config. If this member holds
// the leader lease it first triggers a switchover and returns, letting a later
// HA cycle finish the removal. On success the delete record is marked finished
// and the ha config is persisted.
//
// Returns nil when there is nothing to do or the work is deferred (switchover
// pending), and a non-nil error when the switchover, data move, or cluster
// leave fails.
func (ha *Ha) DeleteCurrentMember(ctx context.Context, cluster *dcs3.Cluster) error {
	currentMember := cluster.GetMemberWithName(ha.dbManager.GetCurrentMemberName())
	// Already fully deleted: nothing left to do.
	if cluster.HaConfig.IsDeleted(currentMember) {
		return nil
	}

	// Serialize concurrent delete attempts (e.g. HA cycle racing a leave op).
	ha.deleteLock.Lock()
	defer ha.deleteLock.Unlock()

	// If the current member is the leader, take a switchover first.
	if ha.dcs.HasLease() {
		// Was `for`, but the body unconditionally returns, so it could run at
		// most once — `if` states the intent: defer to the next HA cycle.
		if cluster.Switchover != nil {
			ha.logger.Info("cluster is doing switchover, wait for it to finish")
			return nil
		}

		leaderMember := cluster.GetLeaderMember()
		if len(ha.dbManager.HasOtherHealthyMembers(ctx, cluster, leaderMember.Name)) == 0 {
			message := "cluster has no other healthy members"
			ha.logger.Info(message)
			return errors.New(message)
		}

		err := ha.dcs.CreateSwitchover(leaderMember.Name, "")
		if err != nil {
			ha.logger.Error(err, "switchover failed")
			return err
		}

		ha.logger.Info("cluster is doing switchover, wait for it to finish")
		return nil
	}

	// redistribute the data of the current member among other members if needed
	err := ha.dbManager.MoveData(ctx, cluster)
	if err != nil {
		ha.logger.Error(err, "Move data failed")
		return err
	}

	// remove current member from db cluster
	err = ha.dbManager.LeaveMemberFromCluster(ctx, cluster, ha.dbManager.GetCurrentMemberName())
	if err != nil {
		// Fixed log-message typo: "form" -> "from".
		ha.logger.Error(err, "Delete member from cluster failed")
		return err
	}
	cluster.HaConfig.FinishDeleted(currentMember)
	return ha.dcs.UpdateHaConfig()
}

func (ha *Ha) isMinimumLag(ctx context.Context, cluster *dcs3.Cluster, member *dcs3.Member) bool {
isCurrentLagging, currentLag := ha.dbManager.IsMemberLagging(ctx, cluster, member)
if isCurrentLagging {
Expand Down
5 changes: 5 additions & 0 deletions pkg/lorry/operations/replica/leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,10 @@ func (s *Leave) Do(ctx context.Context, req *operations.OpsRequest) (*operations
return nil, err
}

if cluster.HaConfig.IsDeleting(currentMember) {
cluster.HaConfig.FinishDeleted(currentMember)
_ = s.dcsStore.UpdateHaConfig()
}

return nil, nil
}

0 comments on commit 5422cb2

Please sign in to comment.