From bb8496eb697a7ab4c4161f86b396805e90803e2c Mon Sep 17 00:00:00 2001 From: Aditya Dani Date: Mon, 10 Apr 2023 15:34:13 -0700 Subject: [PATCH] PWX-28113: Adding the ListMembers & RemoveMemberByIP changes back (#115) Revert "Revert "PWX-28113: Fix ListMembers API and add a new RemoveMemberByID API. (#112)" (#113)" This reverts commit 36b33a431eec1e7cfecbc41b95bb1532633953b0. Signed-off-by: Aditya Dani --- etcd/v3/kv_etcd.go | 91 ++++++++++++++---- kvdb.go | 47 ++++++---- kvdb_controller_not_supported.go | 5 +- test/kv.go | 4 +- test/kv_controller.go | 153 ++++++++++++++++++++++++------- wrappers/kv_log.go | 13 ++- wrappers/kv_no_quorum.go | 9 +- 7 files changed, 248 insertions(+), 74 deletions(-) diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go index cf80b665..ab2b80a3 100644 --- a/etcd/v3/kv_etcd.go +++ b/etcd/v3/kv_etcd.go @@ -46,8 +46,8 @@ const ( var ( defaultMachines = []string{"http://127.0.0.1:2379", "http://[::1]:2379"} - // mLock is a lock over the maintenanceClient - mLock sync.Mutex + // maintenanceClientLock is a lock over the maintenanceClient + maintenanceClientLock sync.Mutex ) // watchQ to collect updates without blocking @@ -1455,7 +1455,6 @@ func (et *etcdKV) RemoveMember( nodeName string, nodeIP string, ) error { - fn := "RemoveMember" ctx, cancel := et.MaintenanceContextWithLeader() memberListResponse, err := et.kvClient.MemberList(ctx) cancel() @@ -1484,9 +1483,19 @@ func (et *etcdKV) RemoveMember( } et.kvClient.SetEndpoints(newClientUrls...) et.maintenanceClient.SetEndpoints(newClientUrls...) + + if removeMemberID == uint64(0) { + // Member not found. No need to remove it + return nil + } + return et.removeMember(removeMemberID) +} + +func (et *etcdKV) removeMember(removeMemberID uint64) error { + fn := "removeMember" removeMemberRetries := 5 for i := 0; i < removeMemberRetries; i++ { - ctx, cancel = et.MaintenanceContextWithLeader() + ctx, cancel := et.MaintenanceContextWithLeader() _, err := et.kvClient.MemberRemove(ctx, removeMemberID) cancel() @@ -1497,13 +1506,13 @@ func (et *etcdKV) RemoveMember( return nil } // Check if we need to retry - retry, err := isRetryNeeded(err, fn, nodeName, i) + retry, err := isRetryNeeded(err, fn, "", i) if !retry { // For all others return immediately return err } if i == (removeMemberRetries - 1) { - return fmt.Errorf("too many retries for RemoveMember: %v %v", nodeName, nodeIP) + return fmt.Errorf("too many retries for RemoveMember: %v", removeMemberID) } time.Sleep(2 * time.Second) continue @@ -1513,7 +1522,55 @@ func (et *etcdKV) RemoveMember( return nil } -func (et *etcdKV) ListMembers() (map[string]*kvdb.MemberInfo, error) { +func (et *etcdKV) RemoveMemberByID( + removeMemberID uint64, +) error { + ctx, cancel := et.MaintenanceContextWithLeader() + memberListResponse, err := et.kvClient.MemberList(ctx) + cancel() + if err != nil { + return err + } + var ( + removeMemberClientURLs, newClientURLs []string + found bool + ) + + for _, member := range memberListResponse.Members { + if member.ID == removeMemberID { + found = true + removeMemberClientURLs = append(removeMemberClientURLs, member.ClientURLs...) 
+			break
+		}
+	}
+	if !found {
+		// Member does not exist
+		return nil
+	}
+	currentEndpoints := et.kvClient.Endpoints()
+
+	// Remove the clientURLs for the member which is being removed from
+	// the active set of endpoints
+	for _, currentEndpoint := range currentEndpoints {
+		found := false
+		for _, removeClientURL := range removeMemberClientURLs {
+			if removeClientURL == currentEndpoint {
+				found = true
+				break
+			}
+		}
+		if !found {
+			newClientURLs = append(newClientURLs, currentEndpoint)
+		}
+	}
+
+	et.kvClient.SetEndpoints(newClientURLs...)
+	et.maintenanceClient.SetEndpoints(newClientURLs...)
+
+	return et.removeMember(removeMemberID)
+}
+
+func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) {
 	var (
 		fnMemberStatus = func(cliURL string) (*kvdb.MemberInfo, uint64, error) {
 			if cliURL == "" {
@@ -1549,8 +1606,8 @@
 	}
 	membersMap := make(map[uint64]*kvdb.MemberInfo)
-	mLock.Lock()
-	defer mLock.Unlock()
+	maintenanceClientLock.Lock()
+	defer maintenanceClientLock.Unlock()
 
 	// Get status from Endpoints
 	for _, ep := range et.GetEndpoints() {
@@ -1577,22 +1634,24 @@
 		}
 	}
 
-	// Fill PeerURLs; also, remap with "Name" as a key
-	resp := make(map[string]*kvdb.MemberInfo)
+	// Fill in other details in the MemberInfo object
 	for _, member := range memberListResponse.Members {
 		if mi, has := membersMap[member.ID]; has {
 			mi.PeerUrls = member.PeerURLs
-			resp[member.Name] = mi
+			mi.Name = member.Name
+			mi.HasStarted = len(member.Name) > 0
 		} else {
 			// no status -- add "blank" MemberInfo
-			resp[member.Name] = &kvdb.MemberInfo{
-				PeerUrls: member.PeerURLs,
-				ID:       strconv.FormatUint(member.ID, 16),
+			membersMap[member.ID] = &kvdb.MemberInfo{
+				PeerUrls:   member.PeerURLs,
+				Name:       member.Name,
+				HasStarted: len(member.Name) > 0,
+				ID:         strconv.FormatUint(member.ID, 16),
 			}
 		}
 	}
 
-	return resp, nil
+	return membersMap, nil
 }
 
 func (et *etcdKV) Serialize() ([]byte, error) {
diff --git a/kvdb.go b/kvdb.go
index eb062094..1eb6082d 100644
--- a/kvdb.go
+++ b/kvdb.go
@@ -315,12 +315,12 @@ type Kvdb interface {
 	Snapshot(prefixes []string, consistent bool) (Kvdb, uint64, error)
 	// SnapPut records the key value pair including the index.
 	SnapPut(kvp *KVPair) (*KVPair, error)
-	// Lock specfied key and associate a lockerID with it, probably to identify
+	// LockWithID locks the specified key and associates a lockerID with it, probably to identify
 	// who acquired the lock. The KVPair returned should be used to unlock.
 	LockWithID(key string, lockerID string) (*KVPair, error)
-	// Lock specfied key. The KVPair returned should be used to unlock.
+	// Lock locks the specified key. The KVPair returned should be used to unlock.
 	Lock(key string) (*KVPair, error)
-	// Lock with specified key and associate a lockerID with it.
+	// LockWithTimeout locks the specified key and associates a lockerID with it.
 	// lockTryDuration is the maximum time that can be spent trying to acquire
 	// lock, else return error.
 	// lockHoldDuration is the maximum time the lock can be held, after which
@@ -407,33 +407,48 @@ const (
 
 // MemberInfo represents a member of the kvdb cluster
 type MemberInfo struct {
-	PeerUrls []string
+	// PeerUrls are the URLs on which this member talks to its peers.
+	PeerUrls []string
+	// ClientUrls are the URLs on which clients can reach this member.
 	ClientUrls []string
-	Leader bool
-	DbSize int64
-	IsHealthy bool
-	ID string
+	// Leader indicates if this member is the leader of this cluster.
+	Leader bool
+	// DbSize is the current DB size as seen by this member.
+	DbSize int64
+	// IsHealthy indicates the health of the member.
+	IsHealthy bool
+	// ID is the string representation of the member's ID.
+	ID string
+	// Name of the member. A member which has not started has an empty Name.
+	Name string
+	// HasStarted indicates if this member has successfully started kvdb.
+	HasStarted bool
 }
 
 // Controller interface provides APIs to manage Kvdb Cluster and Kvdb Clients.
 type Controller interface {
-	// AddMember adds a new member to an existing kvdb cluster. Add should be
-	// called on a kvdb node where kvdb is already running. It should be
-	// followed by a Setup call on the actual node
-	// Returns: map of nodeID to peerUrls of all members in the initial cluster or error
+	// AddMember adds a new member to an existing kvdb cluster. It should be
+	// invoked on a node where kvdb is already running, and should be
+	// followed by a Setup call on the node which is being added.
+	// Returns: map of nodeID to peerUrls of all members in the initial cluster or error.
 	AddMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)
 
-	// RemoveMember removes a member from an existing kvdb cluster
+	// RemoveMember removes a member based on its Name from an existing kvdb cluster.
 	// Returns: error if it fails to remove a member
 	RemoveMember(nodeName, nodeIP string) error
 
+	// RemoveMemberByID removes a member based on its ID from an existing kvdb cluster.
+	// Returns: error if it fails to remove a member
+	RemoveMemberByID(memberID uint64) error
+
 	// UpdateMember updates the IP for the given node in an existing kvdb cluster
 	// Returns: map of nodeID to peerUrls of all members from the existing cluster
 	UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error)
 
-	// ListMembers enumerates the members of the kvdb cluster
-	// Returns: the nodeID to memberInfo mappings of all the members
-	ListMembers() (map[string]*MemberInfo, error)
+	// ListMembers enumerates the members of the kvdb cluster. It includes both the
+	// started and unstarted members.
+ // Returns: the member's ID to MemberInfo mappings for all the members + ListMembers() (map[uint64]*MemberInfo, error) // SetEndpoints set the kvdb endpoints for the client SetEndpoints(endpoints []string) error diff --git a/kvdb_controller_not_supported.go b/kvdb_controller_not_supported.go index 7b97adcc..c420a9a3 100644 --- a/kvdb_controller_not_supported.go +++ b/kvdb_controller_not_supported.go @@ -15,12 +15,15 @@ func (c *controllerNotSupported) AddMember(nodeIP, nodePeerPort, nodeName string func (c *controllerNotSupported) RemoveMember(nodeID string, nodeIP string) error { return ErrNotSupported } +func (c *controllerNotSupported) RemoveMemberByID(memberID uint64) error { + return ErrNotSupported +} func (c *controllerNotSupported) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) { return nil, ErrNotSupported } -func (c *controllerNotSupported) ListMembers() (map[string]*MemberInfo, error) { +func (c *controllerNotSupported) ListMembers() (map[uint64]*MemberInfo, error) { return nil, ErrNotSupported } diff --git a/test/kv.go b/test/kv.go index a968a85a..6c517c25 100644 --- a/test/kv.go +++ b/test/kv.go @@ -370,7 +370,7 @@ func deleteTree(kv kvdb.Kvdb, t *testing.T) { prefix := "tree" keys := map[string]string{ prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar", - prefix + "/foo": "baz", + prefix + "/foo": "baz", } for key, val := range keys { @@ -407,7 +407,7 @@ func enumerate(kv kvdb.Kvdb, t *testing.T) { prefix := "enumerate" keys := map[string]string{ prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar", - prefix + "/foo": "baz", + prefix + "/foo": "baz", } kv.DeleteTree(prefix) diff --git a/test/kv_controller.go b/test/kv_controller.go index 6a750977..ce973ba2 100644 --- a/test/kv_controller.go +++ b/test/kv_controller.go @@ -21,11 +21,12 @@ const ( ) var ( - names = []string{"infra0", "infra1", "infra2", "infra3", "infra4"} - clientUrls = []string{"http://127.0.0.1:20379", "http://127.0.0.1:21379", "http://127.0.0.1:22379", "http://127.0.0.1:23379", "http://127.0.0.1:24379"} - peerPorts = []string{"20380", "21380", "22380", "23380", "24380"} - dataDirs = []string{"/tmp/node0", "/tmp/node1", "/tmp/node2", "/tmp/node3", "/tmp/node4"} - cmds map[int]*exec.Cmd + names = []string{"infra0", "infra1", "infra2", "infra3", "infra4"} + clientUrls = []string{"http://127.0.0.1:20379", "http://127.0.0.2:21379", "http://127.0.0.3:22379", "http://127.0.0.4:23379", "http://127.0.0.5:24379"} + peerPorts = []string{"20380", "21380", "22380", "23380", "24380"} + dataDirs = []string{"/tmp/node0", "/tmp/node1", "/tmp/node2", "/tmp/node3", "/tmp/node4"} + cmds map[int]*exec.Cmd + firstMemberID uint64 ) // RunControllerTests is a test suite for kvdb controller APIs @@ -48,9 +49,16 @@ func RunControllerTests(datastoreInit kvdb.DatastoreInit, t *testing.T) { t.Fatalf(err.Error()) } + memberList, err := kv.ListMembers() + require.NoError(t, err, "error on ListMembers") + require.Equal(t, 1, len(memberList), "incorrect number of members") + for id := range memberList { + firstMemberID = id + break + } testAddMember(kv, t) testRemoveMember(kv, t) - testReAdd(kv, t) + testReAddAndRemoveByMemberID(kv, t) testUpdateMember(kv, t) testMemberStatus(kv, t) testDefrag(kv, t) @@ -69,14 +77,47 @@ func testAddMember(kv kvdb.Kvdb, t *testing.T) { initCluster, err := kv.AddMember(localhost, peerPorts[index], names[index]) require.NoError(t, err, "Error on AddMember") require.Equal(t, 2, len(initCluster), "Init Cluster length does not match") + + // Check for unstarted members 
+	memberList, err := kv.ListMembers()
+	require.NoError(t, err, "error on ListMembers")
+	require.Equal(t, len(memberList), 2, "incorrect number of members")
+
+	for memberID, m := range memberList {
+		if memberID != firstMemberID {
+			require.Equal(t, len(m.ClientUrls), 0, "Unexpected no. of client urls on unstarted member")
+			require.False(t, m.IsHealthy, "Unexpected health of unstarted member")
+			require.Empty(t, m.Name, "expected name to be empty")
+			require.False(t, m.HasStarted, "expected member to be unstarted")
+			require.Equal(t, len(m.PeerUrls), 1, "peerURLs should be set for unstarted members")
+			require.Equal(t, m.DbSize, int64(0), "db size should be 0")
+		} else {
+			require.Equal(t, len(m.ClientUrls), 1, "clientURLs should be set for started members")
+			require.True(t, m.IsHealthy, "expected member to be healthy")
+			require.NotEmpty(t, m.Name, "expected name")
+			require.True(t, m.HasStarted, "expected member to be started")
+			require.Equal(t, len(m.PeerUrls), 1, "peerURLs should be set for started members")
+			require.NotEqual(t, m.DbSize, int64(0), "db size should not be 0")
+		}
+	}
+
 	cmd, err := startEtcd(index, initCluster, "existing")
 	require.NoError(t, err, "Error on start etcd")
 	cmds[index] = cmd
-	// Check the list returned
-	list, err := kv.ListMembers()
+	// Check the list again after starting the second member.
+	memberList, err = kv.ListMembers()
 	require.NoError(t, err, "Error on ListMembers")
-	require.Equal(t, 2, len(list), "List returned different length of cluster")
+	require.Equal(t, 2, len(memberList), "List returned different length of cluster")
+
+	for _, m := range memberList {
+		require.True(t, m.IsHealthy, "expected member to be healthy")
+		require.NotEmpty(t, m.Name, "expected name")
+		require.True(t, m.HasStarted, "expected member to be started")
+		require.Equal(t, len(m.PeerUrls), 1, "peerURLs should be set for started members")
+		require.Equal(t, len(m.ClientUrls), 1, "clientURLs should be set for started members")
+		require.NotEqual(t, m.DbSize, int64(0), "db size should not be 0")
+	}
 }
 
 func testRemoveMember(kv kvdb.Kvdb, t *testing.T) {
@@ -95,12 +136,21 @@
 	require.NoError(t, err, "Error on ListMembers")
 	require.Equal(t, 3, len(list), "List returned different length of cluster")
 
+	// Before removing, all endpoints should be set
+	require.Equal(t, len(clientUrls), len(kv.GetEndpoints()), "unexpected endpoints")
+
 	// Remove node 1
 	index = 1
 	controllerLog("Removing node 1")
 	err = kv.RemoveMember(names[index], localhost)
 	require.NoError(t, err, "Error on RemoveMember")
 
+	// Only 2 endpoints should be set and the third one should have been removed
+	require.Equal(t, 2, len(kv.GetEndpoints()), "unexpected endpoints")
+	for _, actualEndpoint := range kv.GetEndpoints() {
+		require.NotEqual(t, actualEndpoint, clientUrls[index], "removed member should not be present")
+	}
+
 	cmd, _ = cmds[index]
 	cmd.Process.Kill()
 	delete(cmds, index)
@@ -116,24 +166,65 @@
 	require.NoError(t, err, "Error on RemoveMember")
 }
 
-func testReAdd(kv kvdb.Kvdb, t *testing.T) {
-	controllerLog("testReAdd")
+func testReAddAndRemoveByMemberID(kv kvdb.Kvdb, t *testing.T) {
+	controllerLog("testReAddAndRemoveByMemberID")
+
 	// Add node 1 back
-	index := 1
+	node1Index := 1
 	controllerLog("Re-adding node 1")
 	// For re-adding we need to delete the data-dir of this member
-	os.RemoveAll(dataDirs[index])
-	initCluster, err := kv.AddMember(localhost, peerPorts[index], names[index])
+	os.RemoveAll(dataDirs[node1Index])
+	initCluster, err := kv.AddMember(localhost, peerPorts[node1Index], names[node1Index])
 	require.NoError(t, err, "Error on AddMember")
 	require.Equal(t, 3, len(initCluster), "Init Cluster length does not match")
-	cmd, err := startEtcd(index, initCluster, "existing")
+	cmd, err := startEtcd(node1Index, initCluster, "existing")
 	require.NoError(t, err, "Error on start etcd")
-	cmds[index] = cmd
+	cmds[node1Index] = cmd
 
 	// Check the list returned
 	list, err := kv.ListMembers()
 	require.NoError(t, err, "Error on ListMembers")
 	require.Equal(t, 3, len(list), "List returned different length of cluster")
+
+	// Remove node 1
+	var removeMemberID uint64
+	for memberID, member := range list {
+		if member.Name == names[node1Index] {
+			removeMemberID = memberID
+			break
+		}
+	}
+	require.NotEqual(t, removeMemberID, uint64(0), "unexpected memberID")
+
+	// Remove on non-existent member should succeed
+	err = kv.RemoveMemberByID(12345)
+	require.NoError(t, err, "unexpected error on removing a non-existent member")
+
+	err = kv.RemoveMemberByID(removeMemberID)
+	require.NoError(t, err, "unexpected error on remove")
+
+	// Only 2 endpoints should be set and the third one should have been removed
+	require.Equal(t, 2, len(kv.GetEndpoints()), "unexpected endpoints")
+	for _, actualEndpoint := range kv.GetEndpoints() {
+		require.NotEqual(t, actualEndpoint, clientUrls[node1Index], "removed member should not be present")
+	}
+
+	// Kill the old etcd process
+	cmd, _ = cmds[node1Index]
+	cmd.Process.Kill()
+	delete(cmds, node1Index)
+
+	// Add node 1 back
+	controllerLog("Re-adding node 1 again")
+	// For re-adding we need to delete the data-dir of this member
+	os.RemoveAll(dataDirs[node1Index])
+	initCluster, err = kv.AddMember(localhost, peerPorts[node1Index], names[node1Index])
+	require.NoError(t, err, "Error on AddMember")
+	require.Equal(t, 3, len(initCluster), "Init Cluster length does not match")
+	cmd, err = startEtcd(node1Index, initCluster, "existing")
+	require.NoError(t, err, "Error on start etcd")
+	cmds[node1Index] = cmd
 }
 
 func testUpdateMember(kv kvdb.Kvdb, t *testing.T) {
@@ -222,30 +313,24 @@
 	var wg sync.WaitGroup
 	wg.Add(numOfGoroutines)
 
-	checkMembers := func(id string, wait int) {
+	checkMembers := func(index string, wait int) {
 		defer wg.Done()
 		// Add a sleep so that all go routines run just around the same time
 		time.Sleep(time.Duration(wait) * time.Second)
-		controllerLog("Listing Members for goroutine no. " + id)
+		controllerLog("Listing Members for goroutine no. " + index)
 		list, err := kv.ListMembers()
-		require.NoError(t, err, "%v: Error on ListMembers", id)
-		require.Equal(t, 5, len(list), "%v: List returned different length of cluster", id)
-
-		downMember, ok := list[names[stoppedIndex]]
-		require.True(t, ok, "%v: Could not find down member", id)
-		require.Equal(t, len(downMember.ClientUrls), 0, "%v: Unexpected no. of client urls on down member", id)
-		require.False(t, downMember.IsHealthy, "%v: Unexpected health of down member", id)
-
-		for name, m := range list {
-			if name == names[stoppedIndex] {
-				continue
-			}
-			if name == names[stoppedIndex2] {
-				continue
+		require.NoError(t, err, "%v: Error on ListMembers", index)
+		require.Equal(t, 5, len(list), "%v: List returned different length of cluster", index)
+
+		for _, m := range list {
+			if m.Name == names[stoppedIndex] || m.Name == names[stoppedIndex2] {
+				require.Equal(t, len(m.ClientUrls), 0, "%v: Unexpected no. of client urls on down member", index)
+				require.False(t, m.IsHealthy, "%v: Unexpected health of down member", index)
+			} else {
+				require.True(t, m.IsHealthy, "%v: Expected member %v to be healthy", index, m.Name)
 			}
-			require.True(t, m.IsHealthy, "%v: Expected member %v to be healthy", id, name)
 		}
-		fmt.Println("checkMembers done for ", id)
+		fmt.Println("checkMembers done for ", index)
 	}
 	for i := 0; i < numOfGoroutines; i++ {
 		go checkMembers(strconv.Itoa(i), numOfGoroutines-1)
diff --git a/wrappers/kv_log.go b/wrappers/kv_log.go
index 3c8938e0..4fcb1ca9 100644
--- a/wrappers/kv_log.go
+++ b/wrappers/kv_log.go
@@ -431,7 +431,7 @@ func (k *logKvWrapper) SetLockHoldDuration(timeout time.Duration) {
 	k.wrappedKvdb.SetLockHoldDuration(timeout)
 }
 
-func (k *logKvWrapper) GetLockTryDuration() time.Duration{
+func (k *logKvWrapper) GetLockTryDuration() time.Duration {
 	return k.wrappedKvdb.GetLockTryDuration()
 }
 
@@ -466,6 +466,15 @@ func (k *logKvWrapper) RemoveMember(nodeName, nodeIP string) error {
 	return err
 }
 
+func (k *logKvWrapper) RemoveMemberByID(removeMemberID uint64) error {
+	err := k.wrappedKvdb.RemoveMemberByID(removeMemberID)
+	k.logger.WithFields(logrus.Fields{
+		opType:    "RemoveMemberByID",
+		errString: err,
+	}).Info()
+	return err
+}
+
 func (k *logKvWrapper) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) {
 	members, err := k.wrappedKvdb.UpdateMember(nodeIP, nodePeerPort, nodeName)
 	k.logger.WithFields(logrus.Fields{
@@ -476,7 +485,7 @@ func (k *logKvWrapper) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[
 	return members, err
 }
 
-func (k *logKvWrapper) ListMembers() (map[string]*kvdb.MemberInfo, error) {
+func (k *logKvWrapper) ListMembers() (map[uint64]*kvdb.MemberInfo, error) {
 	members, err := k.wrappedKvdb.ListMembers()
 	k.logger.WithFields(logrus.Fields{
 		opType: "ListMembers",
diff --git a/wrappers/kv_no_quorum.go b/wrappers/kv_no_quorum.go
index 55ed1b9d..9e67b402 100644
--- a/wrappers/kv_no_quorum.go
+++ b/wrappers/kv_no_quorum.go
@@ -13,7 +13,6 @@ type noKvdbQuorumWrapper struct {
 	randGen *rand.Rand
 }
 
-
 // NewNoKvdbQuorumWrapper constructs a new kvdb.Kvdb.
 func NewNoKvdbQuorumWrapper(
 	kv kvdb.Kvdb,
@@ -78,7 +77,7 @@ func (k *noKvdbQuorumWrapper) Update(
 	return nil, kvdb.ErrNoQuorum
 }
 
-func (k *noKvdbQuorumWrapper) GetLockTryDuration() time.Duration{
+func (k *noKvdbQuorumWrapper) GetLockTryDuration() time.Duration {
 	return k.wrappedKvdb.GetLockTryDuration()
 }
 
@@ -256,11 +255,15 @@ func (k *noKvdbQuorumWrapper) RemoveMember(nodeName, nodeIP string) error {
 	return kvdb.ErrNoQuorum
 }
 
+func (k *noKvdbQuorumWrapper) RemoveMemberByID(removeMemberID uint64) error {
+	return kvdb.ErrNoQuorum
+}
+
 func (k *noKvdbQuorumWrapper) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) {
 	return k.wrappedKvdb.UpdateMember(nodeIP, nodePeerPort, nodeName)
 }
 
-func (k *noKvdbQuorumWrapper) ListMembers() (map[string]*kvdb.MemberInfo, error) {
+func (k *noKvdbQuorumWrapper) ListMembers() (map[uint64]*kvdb.MemberInfo, error) {
 	return k.wrappedKvdb.ListMembers()
 }
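
Usage sketch (illustrative, not part of the applied diff): keying the ListMembers result by the uint64 member ID is what makes unstarted members addressable. An unstarted member reports an empty Name, so the old Name-keyed map collapsed every unstarted member onto the "" key. The helper below shows how the reworked APIs compose; the function and package names and the cleanup policy are hypothetical, while ListMembers, RemoveMemberByID, and the new MemberInfo fields come from this change.

    package kvutil

    import (
        "fmt"

        "github.com/portworx/kvdb"
    )

    // removeUnstartedMembers scans the cluster and removes every member that
    // was added to the cluster but never started kvdb.
    func removeUnstartedMembers(kv kvdb.Kvdb) error {
        // ListMembers now returns map[uint64]*kvdb.MemberInfo, keyed by the
        // member's ID rather than by its Name.
        members, err := kv.ListMembers()
        if err != nil {
            return err
        }
        for id, m := range members {
            // An unstarted member has HasStarted == false and an empty Name,
            // so it could not have been addressed through a Name-keyed map.
            if !m.HasStarted {
                // RemoveMemberByID is a no-op for IDs that are not in the
                // cluster, so racing with another removal does not fail here.
                if err := kv.RemoveMemberByID(id); err != nil {
                    return fmt.Errorf("removing member %x: %v", id, err)
                }
            }
        }
        return nil
    }

Because RemoveMemberByID returns nil for an unknown ID (see the found check in the etcd implementation above), the helper is idempotent and safe to re-run.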