Skip to content

Commit

Permalink
PWX-28113: Fix ListMembers API and add a new RemoveMemberByID API. (p…
Browse files Browse the repository at this point in the history
…ortworx#112)

* PWX-28114: Fix ListMembers API and add a new RemoveMemberByID API.

ListMembers API
- The ListMembers API used to return a map[string]*MemberInfo. The key
to this map was the name of a member.
- However, when there is an unstarted etcd member, the name of that member
is empty.
- Changed the API to return a map[uint64]*MemberInfo, where the key is
a unique ID which is always set (even for unstarted members).
- Also added HasStarted and Name fields to MemberInfo for easy lookup.

RemoveMemberByID API
- Introduced a new API to remove members based on their IDs.

UTs
- Fixed existing UTs to handle the new API.
- Added tests for RemoveMemberByID API.
- Added more checks to existing UTs.

Signed-off-by: Aditya Dani <[email protected]>

* PWX-28113: fixup: review comments

Signed-off-by: Aditya Dani <[email protected]>

---------

Signed-off-by: Aditya Dani <[email protected]>
  • Loading branch information
adityadani authored Mar 31, 2023
1 parent 2decd55 commit 5cc46fd
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 74 deletions.
91 changes: 75 additions & 16 deletions etcd/v3/kv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const (

var (
defaultMachines = []string{"http://127.0.0.1:2379", "http://[::1]:2379"}
// mLock is a lock over the maintenanceClient
mLock sync.Mutex
// maintenanceClientLock is a lock over the maintenanceClient
maintenanceClientLock sync.Mutex
)

// watchQ to collect updates without blocking
Expand Down Expand Up @@ -1455,7 +1455,6 @@ func (et *etcdKV) RemoveMember(
nodeName string,
nodeIP string,
) error {
fn := "RemoveMember"
ctx, cancel := et.MaintenanceContextWithLeader()
memberListResponse, err := et.kvClient.MemberList(ctx)
cancel()
Expand Down Expand Up @@ -1484,9 +1483,19 @@ func (et *etcdKV) RemoveMember(
}
et.kvClient.SetEndpoints(newClientUrls...)
et.maintenanceClient.SetEndpoints(newClientUrls...)

if removeMemberID == uint64(0) {
// Member not found. No need to remove it
return nil
}
return et.removeMember(removeMemberID)
}

func (et *etcdKV) removeMember(removeMemberID uint64) error {
fn := "removeMember"
removeMemberRetries := 5
for i := 0; i < removeMemberRetries; i++ {
ctx, cancel = et.MaintenanceContextWithLeader()
ctx, cancel := et.MaintenanceContextWithLeader()
_, err := et.kvClient.MemberRemove(ctx, removeMemberID)
cancel()

Expand All @@ -1497,13 +1506,13 @@ func (et *etcdKV) RemoveMember(
return nil
}
// Check if we need to retry
retry, err := isRetryNeeded(err, fn, nodeName, i)
retry, err := isRetryNeeded(err, fn, "", i)
if !retry {
// For all others return immediately
return err
}
if i == (removeMemberRetries - 1) {
return fmt.Errorf("too many retries for RemoveMember: %v %v", nodeName, nodeIP)
return fmt.Errorf("too many retries for RemoveMember: %v", removeMemberID)
}
time.Sleep(2 * time.Second)
continue
Expand All @@ -1513,7 +1522,55 @@ func (et *etcdKV) RemoveMember(
return nil
}

func (et *etcdKV) ListMembers() (map[string]*kvdb.MemberInfo, error) {
// RemoveMemberByID removes the member whose ID equals removeMemberID.
// It first fetches the current member list; if no member carries that ID the
// call is a no-op and nil is returned. Otherwise the client URLs of that
// member are dropped from the endpoints used by both the kv client and the
// maintenance client before the removal itself is delegated to removeMember.
func (et *etcdKV) RemoveMemberByID(
	removeMemberID uint64,
) error {
	listCtx, listCancel := et.MaintenanceContextWithLeader()
	listing, err := et.kvClient.MemberList(listCtx)
	listCancel()
	if err != nil {
		return err
	}

	// Gather the client URLs of the departing member into a set. The set
	// stays nil when the ID is not present in the member list.
	var departingURLs map[string]struct{}
	for _, candidate := range listing.Members {
		if candidate.ID != removeMemberID {
			continue
		}
		departingURLs = make(map[string]struct{}, len(candidate.ClientURLs))
		for _, clientURL := range candidate.ClientURLs {
			departingURLs[clientURL] = struct{}{}
		}
		break
	}
	if departingURLs == nil {
		// Member does not exist
		return nil
	}

	// Keep, in their current order, only the endpoints that do not belong
	// to the member which is being removed.
	var keptEndpoints []string
	for _, endpoint := range et.kvClient.Endpoints() {
		if _, departing := departingURLs[endpoint]; departing {
			continue
		}
		keptEndpoints = append(keptEndpoints, endpoint)
	}

	et.kvClient.SetEndpoints(keptEndpoints...)
	et.maintenanceClient.SetEndpoints(keptEndpoints...)

	return et.removeMember(removeMemberID)
}

func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) {
var (
fnMemberStatus = func(cliURL string) (*kvdb.MemberInfo, uint64, error) {
if cliURL == "" {
Expand Down Expand Up @@ -1549,8 +1606,8 @@ func (et *etcdKV) ListMembers() (map[string]*kvdb.MemberInfo, error) {
}

membersMap := make(map[uint64]*kvdb.MemberInfo)
mLock.Lock()
defer mLock.Unlock()
maintenanceClientLock.Lock()
defer maintenanceClientLock.Unlock()

// Get status from Endpoints
for _, ep := range et.GetEndpoints() {
Expand All @@ -1577,22 +1634,24 @@ func (et *etcdKV) ListMembers() (map[string]*kvdb.MemberInfo, error) {
}
}

// Fill PeerURLs; also, remap with "Name" as a key
resp := make(map[string]*kvdb.MemberInfo)
// Fill in other details in the MemberInfo object
for _, member := range memberListResponse.Members {
if mi, has := membersMap[member.ID]; has {
mi.PeerUrls = member.PeerURLs
resp[member.Name] = mi
mi.Name = member.Name
mi.HasStarted = len(member.Name) > 0
} else {
// no status -- add "blank" MemberInfo
resp[member.Name] = &kvdb.MemberInfo{
PeerUrls: member.PeerURLs,
ID: strconv.FormatUint(member.ID, 16),
membersMap[member.ID] = &kvdb.MemberInfo{
PeerUrls: member.PeerURLs,
Name: member.Name,
HasStarted: len(member.Name) > 0,
ID: strconv.FormatUint(member.ID, 16),
}
}
}

return resp, nil
return membersMap, nil
}

func (et *etcdKV) Serialize() ([]byte, error) {
Expand Down
47 changes: 31 additions & 16 deletions kvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ type Kvdb interface {
Snapshot(prefixes []string, consistent bool) (Kvdb, uint64, error)
// SnapPut records the key value pair including the index.
SnapPut(kvp *KVPair) (*KVPair, error)
// Lock specfied key and associate a lockerID with it, probably to identify
// LockWithID locks the specified key and associates a lockerID with it, probably to identify
// who acquired the lock. The KVPair returned should be used to unlock.
LockWithID(key string, lockerID string) (*KVPair, error)
// Lock specfied key. The KVPair returned should be used to unlock.
// Lock locks the specified key. The KVPair returned should be used to unlock.
Lock(key string) (*KVPair, error)
// Lock with specified key and associate a lockerID with it.
// LockWithTimeout locks the specified key and associates a lockerID with it.
// lockTryDuration is the maximum time that can be spent trying to acquire
// lock, else return error.
// lockHoldDuration is the maximum time the lock can be held, after which
Expand Down Expand Up @@ -407,33 +407,48 @@ const (

// MemberInfo represents a member of the kvdb cluster
type MemberInfo struct {
PeerUrls []string
// PeerUrls are the URLs on which this member talks to its peers.
PeerUrls []string
// ClientUrls are the URLs on which clients can reach this member.
ClientUrls []string
Leader bool
DbSize int64
IsHealthy bool
ID string
// Leader indicates if this member is the leader of this cluster.
Leader bool
// DbSize is the current DB size as seen by this member.
DbSize int64
// IsHealthy indicates the health of the member.
IsHealthy bool
// ID is the string representation of member's ID
ID string
// Name of the member. A member which has not started has an empty Name.
Name string
// HasStarted indicates if this member has successfully started kvdb.
HasStarted bool
}

// Controller interface provides APIs to manage Kvdb Cluster and Kvdb Clients.
type Controller interface {
// AddMember adds a new member to an existing kvdb cluster. Add should be
// called on a kvdb node where kvdb is already running. It should be
// followed by a Setup call on the actual node
// Returns: map of nodeID to peerUrls of all members in the initial cluster or error
// AddMember adds a new member to an existing kvdb cluster. Add API should be
// invoked on an existing kvdb node where kvdb is already running. It should be
// followed by a Setup call on the node which is being added.
// Returns: map of nodeID to peerUrls of all members in the initial cluster or error.
AddMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)

// RemoveMember removes a member from an existing kvdb cluster
// RemoveMember removes a member based on its Name from an existing kvdb cluster.
// Returns: error if it fails to remove a member
RemoveMember(nodeName, nodeIP string) error

// RemoveMemberByID removes a member based on its ID from an existing kvdb cluster.
// Returns: error if it fails to remove a member
RemoveMemberByID(memberID uint64) error

// UpdateMember updates the IP for the given node in an existing kvdb cluster
// Returns: map of nodeID to peerUrls of all members from the existing cluster
UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)

// ListMembers enumerates the members of the kvdb cluster
// Returns: the nodeID to memberInfo mappings of all the members
ListMembers() (map[string]*MemberInfo, error)
// ListMembers enumerates the members of the kvdb cluster. It includes both the
// started and unstarted members.
// Returns: the member's ID to MemberInfo mappings for all the members
ListMembers() (map[uint64]*MemberInfo, error)

// SetEndpoints set the kvdb endpoints for the client
SetEndpoints(endpoints []string) error
Expand Down
5 changes: 4 additions & 1 deletion kvdb_controller_not_supported.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ func (c *controllerNotSupported) AddMember(nodeIP, nodePeerPort, nodeName string
// RemoveMember is not implemented by controllerNotSupported; it ignores its
// arguments and always returns ErrNotSupported.
func (c *controllerNotSupported) RemoveMember(nodeID string, nodeIP string) error {
return ErrNotSupported
}
// RemoveMemberByID is not implemented by controllerNotSupported; it ignores
// memberID and always returns ErrNotSupported.
func (c *controllerNotSupported) RemoveMemberByID(memberID uint64) error {
return ErrNotSupported
}

// UpdateMember is not implemented by controllerNotSupported; it ignores its
// arguments and always returns a nil map together with ErrNotSupported.
func (c *controllerNotSupported) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) {
return nil, ErrNotSupported
}

func (c *controllerNotSupported) ListMembers() (map[string]*MemberInfo, error) {
func (c *controllerNotSupported) ListMembers() (map[uint64]*MemberInfo, error) {
return nil, ErrNotSupported
}

Expand Down
4 changes: 2 additions & 2 deletions test/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func deleteTree(kv kvdb.Kvdb, t *testing.T) {
prefix := "tree"
keys := map[string]string{
prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar",
prefix + "/foo": "baz",
prefix + "/foo": "baz",
}

for key, val := range keys {
Expand Down Expand Up @@ -407,7 +407,7 @@ func enumerate(kv kvdb.Kvdb, t *testing.T) {
prefix := "enumerate"
keys := map[string]string{
prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar",
prefix + "/foo": "baz",
prefix + "/foo": "baz",
}

kv.DeleteTree(prefix)
Expand Down
Loading

0 comments on commit 5cc46fd

Please sign in to comment.