Skip to content

Commit

Permalink
Revert "PWX-28113: Fix ListMembers API and add a new RemoveMemberByID…
Browse files Browse the repository at this point in the history
  • Loading branch information
adityadani authored Mar 31, 2023
1 parent 5cc46fd commit 36b33a4
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 248 deletions.
91 changes: 16 additions & 75 deletions etcd/v3/kv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ const (

var (
defaultMachines = []string{"http://127.0.0.1:2379", "http://[::1]:2379"}
// maintenanceClientLock is a lock over the maintenanceClient
maintenanceClientLock sync.Mutex
// mLock is a lock over the maintenanceClient
mLock sync.Mutex
)

// watchQ to collect updates without blocking
Expand Down Expand Up @@ -1455,6 +1455,7 @@ func (et *etcdKV) RemoveMember(
nodeName string,
nodeIP string,
) error {
fn := "RemoveMember"
ctx, cancel := et.MaintenanceContextWithLeader()
memberListResponse, err := et.kvClient.MemberList(ctx)
cancel()
Expand Down Expand Up @@ -1483,19 +1484,9 @@ func (et *etcdKV) RemoveMember(
}
et.kvClient.SetEndpoints(newClientUrls...)
et.maintenanceClient.SetEndpoints(newClientUrls...)

if removeMemberID == uint64(0) {
// Member not found. No need to remove it
return nil
}
return et.removeMember(removeMemberID)
}

func (et *etcdKV) removeMember(removeMemberID uint64) error {
fn := "removeMember"
removeMemberRetries := 5
for i := 0; i < removeMemberRetries; i++ {
ctx, cancel := et.MaintenanceContextWithLeader()
ctx, cancel = et.MaintenanceContextWithLeader()
_, err := et.kvClient.MemberRemove(ctx, removeMemberID)
cancel()

Expand All @@ -1506,13 +1497,13 @@ func (et *etcdKV) removeMember(removeMemberID uint64) error {
return nil
}
// Check if we need to retry
retry, err := isRetryNeeded(err, fn, "", i)
retry, err := isRetryNeeded(err, fn, nodeName, i)
if !retry {
// For all others return immediately
return err
}
if i == (removeMemberRetries - 1) {
return fmt.Errorf("too many retries for RemoveMember: %v", removeMemberID)
return fmt.Errorf("too many retries for RemoveMember: %v %v", nodeName, nodeIP)
}
time.Sleep(2 * time.Second)
continue
Expand All @@ -1522,55 +1513,7 @@ func (et *etcdKV) removeMember(removeMemberID uint64) error {
return nil
}

func (et *etcdKV) RemoveMemberByID(
removeMemberID uint64,
) error {
ctx, cancel := et.MaintenanceContextWithLeader()
memberListResponse, err := et.kvClient.MemberList(ctx)
cancel()
if err != nil {
return err
}
var (
removeMemberClientURLs, newClientURLs []string
found bool
)

for _, member := range memberListResponse.Members {
if member.ID == removeMemberID {
found = true
removeMemberClientURLs = append(removeMemberClientURLs, member.ClientURLs...)
break
}
}
if !found {
// Member does not exist
return nil
}
currentEndpoints := et.kvClient.Endpoints()

// Remove the clientURLs for the member which is being removed from
// the active set of endpoints
for _, currentEndpoint := range currentEndpoints {
found := false
for _, removeClientURL := range removeMemberClientURLs {
if removeClientURL == currentEndpoint {
found = true
break
}
}
if !found {
newClientURLs = append(newClientURLs, currentEndpoint)
}
}

et.kvClient.SetEndpoints(newClientURLs...)
et.maintenanceClient.SetEndpoints(newClientURLs...)

return et.removeMember(removeMemberID)
}

func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) {
func (et *etcdKV) ListMembers() (map[string]*kvdb.MemberInfo, error) {
var (
fnMemberStatus = func(cliURL string) (*kvdb.MemberInfo, uint64, error) {
if cliURL == "" {
Expand Down Expand Up @@ -1606,8 +1549,8 @@ func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) {
}

membersMap := make(map[uint64]*kvdb.MemberInfo)
maintenanceClientLock.Lock()
defer maintenanceClientLock.Unlock()
mLock.Lock()
defer mLock.Unlock()

// Get status from Endpoints
for _, ep := range et.GetEndpoints() {
Expand All @@ -1634,24 +1577,22 @@ func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) {
}
}

// Fill in other details in the MemberInfo object
// Fill PeerURLs; also, remap with "Name" as a key
resp := make(map[string]*kvdb.MemberInfo)
for _, member := range memberListResponse.Members {
if mi, has := membersMap[member.ID]; has {
mi.PeerUrls = member.PeerURLs
mi.Name = member.Name
mi.HasStarted = len(member.Name) > 0
resp[member.Name] = mi
} else {
// no status -- add "blank" MemberInfo
membersMap[member.ID] = &kvdb.MemberInfo{
PeerUrls: member.PeerURLs,
Name: member.Name,
HasStarted: len(member.Name) > 0,
ID: strconv.FormatUint(member.ID, 16),
resp[member.Name] = &kvdb.MemberInfo{
PeerUrls: member.PeerURLs,
ID: strconv.FormatUint(member.ID, 16),
}
}
}

return membersMap, nil
return resp, nil
}

func (et *etcdKV) Serialize() ([]byte, error) {
Expand Down
47 changes: 16 additions & 31 deletions kvdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ type Kvdb interface {
Snapshot(prefixes []string, consistent bool) (Kvdb, uint64, error)
// SnapPut records the key value pair including the index.
SnapPut(kvp *KVPair) (*KVPair, error)
// LockWithID locks the specified key and associates a lockerID with it, probably to identify
// Lock specfied key and associate a lockerID with it, probably to identify
// who acquired the lock. The KVPair returned should be used to unlock.
LockWithID(key string, lockerID string) (*KVPair, error)
// Lock locks the specified key. The KVPair returned should be used to unlock.
// Lock specfied key. The KVPair returned should be used to unlock.
Lock(key string) (*KVPair, error)
// LockWithTimeout locks with specified key and associates a lockerID with it.
// Lock with specified key and associate a lockerID with it.
// lockTryDuration is the maximum time that can be spent trying to acquire
// lock, else return error.
// lockHoldDuration is the maximum time the lock can be held, after which
Expand Down Expand Up @@ -407,48 +407,33 @@ const (

// MemberInfo represents a member of the kvdb cluster
type MemberInfo struct {
// PeerUrls is this member's URL on which it talks to its peers
PeerUrls []string
// ClientUrls is this member's URL on which clients can reach this member.
PeerUrls []string
ClientUrls []string
// Leader indicates if this member is the leader of this cluster.
Leader bool
// DbSize is the current DB size as seen by this member.
DbSize int64
// IsHealthy indicates the health of the member.
IsHealthy bool
// ID is the string representation of member's ID
ID string
// Name of the member. A member which has not started has an empty Name.
Name string
// HasStarted indicates if this member has successfully started kvdb.
HasStarted bool
Leader bool
DbSize int64
IsHealthy bool
ID string
}

// Controller interface provides APIs to manage Kvdb Cluster and Kvdb Clients.
type Controller interface {
// AddMember adds a new member to an existing kvdb cluster. Add API should be
// invoked on an existing kvdb node where kvdb is already running. It should be
// followed by a Setup call on the node which is being added.
// Returns: map of nodeID to peerUrls of all members in the initial cluster or error.
// AddMember adds a new member to an existing kvdb cluster. Add should be
// called on a kvdb node where kvdb is already running. It should be
// followed by a Setup call on the actual node
// Returns: map of nodeID to peerUrls of all members in the initial cluster or error
AddMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)

// RemoveMember removes a member based on its Name from an existing kvdb cluster.
// RemoveMember removes a member from an existing kvdb cluster
// Returns: error if it fails to remove a member
RemoveMember(nodeName, nodeIP string) error

// RemoveMemberByID removes a member based on its ID from an existing kvdb cluster.
// Returns: error if it fails to remove a member
RemoveMemberByID(memberID uint64) error

// UpdateMember updates the IP for the given node in an existing kvdb cluster
// Returns: map of nodeID to peerUrls of all members from the existing cluster
UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)

// ListMembers enumerates the members of the kvdb cluster. It includes both the
// started and unstarted members.
// Returns: the member's ID to MemberInfo mappings for all the members
ListMembers() (map[uint64]*MemberInfo, error)
// ListMembers enumerates the members of the kvdb cluster
// Returns: the nodeID to memberInfo mappings of all the members
ListMembers() (map[string]*MemberInfo, error)

// SetEndpoints set the kvdb endpoints for the client
SetEndpoints(endpoints []string) error
Expand Down
5 changes: 1 addition & 4 deletions kvdb_controller_not_supported.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ func (c *controllerNotSupported) AddMember(nodeIP, nodePeerPort, nodeName string
func (c *controllerNotSupported) RemoveMember(nodeID string, nodeIP string) error {
return ErrNotSupported
}
func (c *controllerNotSupported) RemoveMemberByID(memberID uint64) error {
return ErrNotSupported
}

func (c *controllerNotSupported) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) {
return nil, ErrNotSupported
}

func (c *controllerNotSupported) ListMembers() (map[uint64]*MemberInfo, error) {
func (c *controllerNotSupported) ListMembers() (map[string]*MemberInfo, error) {
return nil, ErrNotSupported
}

Expand Down
4 changes: 2 additions & 2 deletions test/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func deleteTree(kv kvdb.Kvdb, t *testing.T) {
prefix := "tree"
keys := map[string]string{
prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar",
prefix + "/foo": "baz",
prefix + "/foo": "baz",
}

for key, val := range keys {
Expand Down Expand Up @@ -407,7 +407,7 @@ func enumerate(kv kvdb.Kvdb, t *testing.T) {
prefix := "enumerate"
keys := map[string]string{
prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar",
prefix + "/foo": "baz",
prefix + "/foo": "baz",
}

kv.DeleteTree(prefix)
Expand Down
Loading

0 comments on commit 36b33a4

Please sign in to comment.