diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go index 9b774169..4268067a 100644 --- a/etcd/v3/kv_etcd.go +++ b/etcd/v3/kv_etcd.go @@ -46,8 +46,8 @@ const ( var ( defaultMachines = []string{"http://127.0.0.1:2379", "http://[::1]:2379"} - // maintenanceClientLock is a lock over the maintenanceClient - maintenanceClientLock sync.Mutex + // mLock is a lock over the maintenanceClient + mLock sync.Mutex ) // watchQ to collect updates without blocking @@ -1455,6 +1455,7 @@ func (et *etcdKV) RemoveMember( nodeName string, nodeIP string, ) error { + fn := "RemoveMember" ctx, cancel := et.MaintenanceContextWithLeader() memberListResponse, err := et.kvClient.MemberList(ctx) cancel() @@ -1483,19 +1484,9 @@ func (et *etcdKV) RemoveMember( } et.kvClient.SetEndpoints(newClientUrls...) et.maintenanceClient.SetEndpoints(newClientUrls...) - - if removeMemberID == uint64(0) { - // Member not found. No need to remove it - return nil - } - return et.removeMember(removeMemberID) -} - -func (et *etcdKV) removeMember(removeMemberID uint64) error { - fn := "removeMember" removeMemberRetries := 5 for i := 0; i < removeMemberRetries; i++ { - ctx, cancel := et.MaintenanceContextWithLeader() + ctx, cancel = et.MaintenanceContextWithLeader() _, err := et.kvClient.MemberRemove(ctx, removeMemberID) cancel() @@ -1506,13 +1497,13 @@ func (et *etcdKV) removeMember(removeMemberID uint64) error { return nil } // Check if we need to retry - retry, err := isRetryNeeded(err, fn, "", i) + retry, err := isRetryNeeded(err, fn, nodeName, i) if !retry { // For all others return immediately return err } if i == (removeMemberRetries - 1) { - return fmt.Errorf("too many retries for RemoveMember: %v", removeMemberID) + return fmt.Errorf("too many retries for RemoveMember: %v %v", nodeName, nodeIP) } time.Sleep(2 * time.Second) continue @@ -1522,55 +1513,7 @@ func (et *etcdKV) removeMember(removeMemberID uint64) error { return nil } -func (et *etcdKV) RemoveMemberByID( - removeMemberID uint64, -) error { - 
ctx, cancel := et.MaintenanceContextWithLeader() - memberListResponse, err := et.kvClient.MemberList(ctx) - cancel() - if err != nil { - return err - } - var ( - removeMemberClientURLs, newClientURLs []string - found bool - ) - - for _, member := range memberListResponse.Members { - if member.ID == removeMemberID { - found = true - removeMemberClientURLs = append(removeMemberClientURLs, member.ClientURLs...) - break - } - } - if !found { - // Member does not exist - return nil - } - currentEndpoints := et.kvClient.Endpoints() - - // Remove the clientURLs for the member which is being removed from - // the active set of endpoints - for _, currentEndpoint := range currentEndpoints { - found := false - for _, removeClientURL := range removeMemberClientURLs { - if removeClientURL == currentEndpoint { - found = true - break - } - } - if !found { - newClientURLs = append(newClientURLs, currentEndpoint) - } - } - - et.kvClient.SetEndpoints(newClientURLs...) - et.maintenanceClient.SetEndpoints(newClientURLs...) 
- - return et.removeMember(removeMemberID) -} - -func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) { +func (et *etcdKV) ListMembers() (map[string]*kvdb.MemberInfo, error) { var ( fnMemberStatus = func(cliURL string) (*kvdb.MemberInfo, uint64, error) { if cliURL == "" { @@ -1606,8 +1549,8 @@ func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) { } membersMap := make(map[uint64]*kvdb.MemberInfo) - maintenanceClientLock.Lock() - defer maintenanceClientLock.Unlock() + mLock.Lock() + defer mLock.Unlock() // Get status from Endpoints for _, ep := range et.GetEndpoints() { @@ -1634,24 +1577,22 @@ func (et *etcdKV) ListMembers() (map[uint64]*kvdb.MemberInfo, error) { } } - // Fill in other details in the MemberInfo object + // Fill PeerURLs; also, remap with "Name" as a key + resp := make(map[string]*kvdb.MemberInfo) for _, member := range memberListResponse.Members { if mi, has := membersMap[member.ID]; has { mi.PeerUrls = member.PeerURLs - mi.Name = member.Name - mi.HasStarted = len(member.Name) > 0 + resp[member.Name] = mi } else { // no status -- add "blank" MemberInfo - membersMap[member.ID] = &kvdb.MemberInfo{ - PeerUrls: member.PeerURLs, - Name: member.Name, - HasStarted: len(member.Name) > 0, - ID: strconv.FormatUint(member.ID, 16), + resp[member.Name] = &kvdb.MemberInfo{ + PeerUrls: member.PeerURLs, + ID: strconv.FormatUint(member.ID, 16), } } } - return membersMap, nil + return resp, nil } func (et *etcdKV) Serialize() ([]byte, error) { diff --git a/kvdb.go b/kvdb.go index 1eb6082d..eb062094 100644 --- a/kvdb.go +++ b/kvdb.go @@ -315,12 +315,12 @@ type Kvdb interface { Snapshot(prefixes []string, consistent bool) (Kvdb, uint64, error) // SnapPut records the key value pair including the index. SnapPut(kvp *KVPair) (*KVPair, error) - // LockWithID locks the specified key and associates a lockerID with it, probably to identify + // Lock specified key and associate a lockerID with it, probably to identify // who acquired the lock. 
The KVPair returned should be used to unlock. LockWithID(key string, lockerID string) (*KVPair, error) - // Lock locks the specified key. The KVPair returned should be used to unlock. + // Lock specified key. The KVPair returned should be used to unlock. Lock(key string) (*KVPair, error) - // LockWithTimeout locks with specified key and associates a lockerID with it. + // Lock with specified key and associate a lockerID with it. // lockTryDuration is the maximum time that can be spent trying to acquire // lock, else return error. // lockHoldDuration is the maximum time the lock can be held, after which @@ -407,48 +407,33 @@ const ( // MemberInfo represents a member of the kvdb cluster type MemberInfo struct { - // PeerUrls is this member's URL on which it talks to its peers - PeerUrls []string - // ClientUrls is this member's URL on which clients can reach this member. + PeerUrls []string ClientUrls []string - // Leader indicates if this member is the leader of this cluster. - Leader bool - // DbSize is the current DB size as seen by this member. - DbSize int64 - // IsHealthy indicates the health of the member. - IsHealthy bool - // ID is the string representation of member's ID - ID string - // Name of the member. A member which has not started has an empty Name. - Name string - // HasStarted indicates if this member has successfully started kvdb. - HasStarted bool + Leader bool + DbSize int64 + IsHealthy bool + ID string } // Controller interface provides APIs to manage Kvdb Cluster and Kvdb Clients. type Controller interface { - // AddMember adds a new member to an existing kvdb cluster. Add API should be - // invoked on an existing kvdb node where kvdb is already running. It should be - // followed by a Setup call on the node which is being added. - // Returns: map of nodeID to peerUrls of all members in the initial cluster or error. + // AddMember adds a new member to an existing kvdb cluster. 
Add should be + // called on a kvdb node where kvdb is already running. It should be + // followed by a Setup call on the actual node + // Returns: map of nodeID to peerUrls of all members in the initial cluster or error AddMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) - // RemoveMember removes a member based on its Name from an existing kvdb cluster. + // RemoveMember removes a member from an existing kvdb cluster // Returns: error if it fails to remove a member RemoveMember(nodeName, nodeIP string) error - // RemoveMemberByID removes a member based on its ID from an existing kvdb cluster. - // Returns: error if it fails to remove a member - RemoveMemberByID(memberID uint64) error - // UpdateMember updates the IP for the given node in an existing kvdb cluster // Returns: map of nodeID to peerUrls of all members from the existing cluster UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) - // ListMembers enumerates the members of the kvdb cluster. It includes both the - // started and unstarted members. 
- // Returns: the member's ID to MemberInfo mappings for all the members - ListMembers() (map[uint64]*MemberInfo, error) + // ListMembers enumerates the members of the kvdb cluster + // Returns: the nodeID to memberInfo mappings of all the members + ListMembers() (map[string]*MemberInfo, error) // SetEndpoints set the kvdb endpoints for the client SetEndpoints(endpoints []string) error diff --git a/kvdb_controller_not_supported.go b/kvdb_controller_not_supported.go index c420a9a3..7b97adcc 100644 --- a/kvdb_controller_not_supported.go +++ b/kvdb_controller_not_supported.go @@ -15,15 +15,12 @@ func (c *controllerNotSupported) AddMember(nodeIP, nodePeerPort, nodeName string func (c *controllerNotSupported) RemoveMember(nodeID string, nodeIP string) error { return ErrNotSupported } -func (c *controllerNotSupported) RemoveMemberByID(memberID uint64) error { - return ErrNotSupported -} func (c *controllerNotSupported) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) { return nil, ErrNotSupported } -func (c *controllerNotSupported) ListMembers() (map[uint64]*MemberInfo, error) { +func (c *controllerNotSupported) ListMembers() (map[string]*MemberInfo, error) { return nil, ErrNotSupported } diff --git a/test/kv.go b/test/kv.go index 6c517c25..a968a85a 100644 --- a/test/kv.go +++ b/test/kv.go @@ -370,7 +370,7 @@ func deleteTree(kv kvdb.Kvdb, t *testing.T) { prefix := "tree" keys := map[string]string{ prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar", - prefix + "/foo": "baz", + prefix + "/foo": "baz", } for key, val := range keys { @@ -407,7 +407,7 @@ func enumerate(kv kvdb.Kvdb, t *testing.T) { prefix := "enumerate" keys := map[string]string{ prefix + "/1cbc9a98-072a-4793-8608-01ab43db96c8": "bar", - prefix + "/foo": "baz", + prefix + "/foo": "baz", } kv.DeleteTree(prefix) diff --git a/test/kv_controller.go b/test/kv_controller.go index ce973ba2..6a750977 100644 --- a/test/kv_controller.go +++ b/test/kv_controller.go @@ -21,12 +21,11 
@@ const ( ) var ( - names = []string{"infra0", "infra1", "infra2", "infra3", "infra4"} - clientUrls = []string{"http://127.0.0.1:20379", "http://127.0.0.2:21379", "http://127.0.0.3:22379", "http://127.0.0.4:23379", "http://127.0.0.5:24379"} - peerPorts = []string{"20380", "21380", "22380", "23380", "24380"} - dataDirs = []string{"/tmp/node0", "/tmp/node1", "/tmp/node2", "/tmp/node3", "/tmp/node4"} - cmds map[int]*exec.Cmd - firstMemberID uint64 + names = []string{"infra0", "infra1", "infra2", "infra3", "infra4"} + clientUrls = []string{"http://127.0.0.1:20379", "http://127.0.0.1:21379", "http://127.0.0.1:22379", "http://127.0.0.1:23379", "http://127.0.0.1:24379"} + peerPorts = []string{"20380", "21380", "22380", "23380", "24380"} + dataDirs = []string{"/tmp/node0", "/tmp/node1", "/tmp/node2", "/tmp/node3", "/tmp/node4"} + cmds map[int]*exec.Cmd ) // RunControllerTests is a test suite for kvdb controller APIs @@ -49,16 +48,9 @@ func RunControllerTests(datastoreInit kvdb.DatastoreInit, t *testing.T) { t.Fatalf(err.Error()) } - memberList, err := kv.ListMembers() - require.NoError(t, err, "error on ListMembers") - require.Equal(t, 1, len(memberList), "incorrect number of members") - for id := range memberList { - firstMemberID = id - break - } testAddMember(kv, t) testRemoveMember(kv, t) - testReAddAndRemoveByMemberID(kv, t) + testReAdd(kv, t) testUpdateMember(kv, t) testMemberStatus(kv, t) testDefrag(kv, t) @@ -77,47 +69,14 @@ func testAddMember(kv kvdb.Kvdb, t *testing.T) { initCluster, err := kv.AddMember(localhost, peerPorts[index], names[index]) require.NoError(t, err, "Error on AddMember") require.Equal(t, 2, len(initCluster), "Init Cluster length does not match") - - // Check for unstarted members - memberList, err := kv.ListMembers() - require.NoError(t, err, "error on ListMembers") - require.Equal(t, len(memberList), 2, "incorrect number of members") - - for memberID, m := range memberList { - if memberID != firstMemberID { - require.Equal(t, 
len(m.ClientUrls), 0, "Unexpected no. of client urls on unstarted member") - require.False(t, m.IsHealthy, "Unexpected health of unstarted member") - require.Empty(t, m.Name, "expected name to be empty") - require.False(t, m.HasStarted, "expected member to be unstarted") - require.Equal(t, len(m.PeerUrls), 1, "peerURLs should be set for unstarted members") - require.Equal(t, m.DbSize, int64(0), "db size should be 0") - } else { - require.Equal(t, len(m.ClientUrls), 1, "clientURLs should be set for started members") - require.True(t, m.IsHealthy, "expected member to be healthy") - require.NotEmpty(t, m.Name, "expected name") - require.True(t, m.HasStarted, "expected member to be started") - require.Equal(t, len(m.PeerUrls), 1, "peerURLs should be set for started members") - require.NotEqual(t, m.DbSize, int64(0), "db size should not be 0") - } - } - cmd, err := startEtcd(index, initCluster, "existing") require.NoError(t, err, "Error on start etcd") cmds[index] = cmd - // Check the list again after starting the second member. 
- memberList, err = kv.ListMembers() + // Check the list returned + list, err := kv.ListMembers() require.NoError(t, err, "Error on ListMembers") - require.Equal(t, 2, len(memberList), "List returned different length of cluster") - - for _, m := range memberList { - require.True(t, m.IsHealthy, "expected member to be healthy") - require.NotEmpty(t, m.Name, "expected name") - require.True(t, m.HasStarted, "expected member to be started") - require.Equal(t, len(m.PeerUrls), 1, "peerURLs should be set for started members") - require.Equal(t, len(m.ClientUrls), 1, "clientURLs should be set for started members") - require.NotEqual(t, m.DbSize, 0, "db size should not be 0") - } + require.Equal(t, 2, len(list), "List returned different length of cluster") } func testRemoveMember(kv kvdb.Kvdb, t *testing.T) { @@ -136,21 +95,12 @@ func testRemoveMember(kv kvdb.Kvdb, t *testing.T) { require.NoError(t, err, "Error on ListMembers") require.Equal(t, 3, len(list), "List returned different length of cluster") - // Before removing all endpoints should be set - require.Equal(t, len(clientUrls), len(kv.GetEndpoints()), "unexpected endpoints") - // Remove node 1 index = 1 controllerLog("Removing node 1") err = kv.RemoveMember(names[index], localhost) require.NoError(t, err, "Error on RemoveMember") - // Only 2 endpoints should be set and the third one should have been removed - require.Equal(t, 2, len(kv.GetEndpoints()), "unexpected endpoints") - for _, actualEndpoint := range kv.GetEndpoints() { - require.NotEqual(t, actualEndpoint, clientUrls[index], "removed member should not be present") - } - cmd, _ = cmds[index] cmd.Process.Kill() delete(cmds, index) @@ -166,65 +116,24 @@ func testRemoveMember(kv kvdb.Kvdb, t *testing.T) { require.NoError(t, err, "Error on RemoveMember") } -func testReAddAndRemoveByMemberID(kv kvdb.Kvdb, t *testing.T) { - controllerLog("testReAddAndRemoveByMemberID") - +func testReAdd(kv kvdb.Kvdb, t *testing.T) { + controllerLog("testReAdd") // Add node 1 back 
- node1Index := 1 + index := 1 controllerLog("Re-adding node 1") // For re-adding we need to delete the data-dir of this member - os.RemoveAll(dataDirs[node1Index]) - initCluster, err := kv.AddMember(localhost, peerPorts[node1Index], names[node1Index]) + os.RemoveAll(dataDirs[index]) + initCluster, err := kv.AddMember(localhost, peerPorts[index], names[index]) require.NoError(t, err, "Error on AddMember") require.Equal(t, 3, len(initCluster), "Init Cluster length does not match") - cmd, err := startEtcd(node1Index, initCluster, "existing") + cmd, err := startEtcd(index, initCluster, "existing") require.NoError(t, err, "Error on start etcd") - cmds[node1Index] = cmd + cmds[index] = cmd // Check the list returned list, err := kv.ListMembers() require.NoError(t, err, "Error on ListMembers") require.Equal(t, 3, len(list), "List returned different length of cluster") - - // Remove node 1 - var removeMemberID uint64 - for memberID, member := range list { - if member.Name == names[node1Index] { - removeMemberID = memberID - break - } - } - require.NotEqual(t, removeMemberID, 0, "unexpected memberID") - - // Remove on non-existent member should succeed - err = kv.RemoveMemberByID(12345) - require.NoError(t, err, "unexpected error on removing a non-existent member") - - err = kv.RemoveMemberByID(removeMemberID) - require.NoError(t, err, "unexpected error on remove") - - // Only 2 endpoints should be set and the third one should have been removed - require.Equal(t, 2, len(kv.GetEndpoints()), "unexpected endpoints") - for _, actualEndpoint := range kv.GetEndpoints() { - require.NotEqual(t, actualEndpoint, clientUrls[node1Index], "removed member should not be present") - } - - // Kill the old etcd process - cmd, _ = cmds[node1Index] - cmd.Process.Kill() - delete(cmds, node1Index) - - // Add node 1 back - controllerLog("Re-adding node 1 again") - // For re-adding we need to delete the data-dir of this member - os.RemoveAll(dataDirs[node1Index]) - initCluster, err = 
kv.AddMember(localhost, peerPorts[node1Index], names[node1Index]) - require.NoError(t, err, "Error on AddMember") - require.Equal(t, 3, len(initCluster), "Init Cluster length does not match") - cmd, err = startEtcd(node1Index, initCluster, "existing") - require.NoError(t, err, "Error on start etcd") - cmds[node1Index] = cmd - } func testUpdateMember(kv kvdb.Kvdb, t *testing.T) { @@ -313,24 +222,30 @@ func testMemberStatus(kv kvdb.Kvdb, t *testing.T) { var wg sync.WaitGroup wg.Add(numOfGoroutines) - checkMembers := func(index string, wait int) { + checkMembers := func(id string, wait int) { defer wg.Done() // Add a sleep so that all go routines run just around the same time time.Sleep(time.Duration(wait) * time.Second) - controllerLog("Listing Members for goroutine no. " + index) + controllerLog("Listing Members for goroutine no. " + id) list, err := kv.ListMembers() - require.NoError(t, err, "%v: Error on ListMembers", index) - require.Equal(t, 5, len(list), "%v: List returned different length of cluster", index) - - for _, m := range list { - if m.Name == names[stoppedIndex] || m.Name == names[stoppedIndex2] { - require.Equal(t, len(m.ClientUrls), 0, "%v: Unexpected no. of client urls on down member", index) - require.False(t, m.IsHealthy, "%v: Unexpected health of down member", index) - } else { - require.True(t, m.IsHealthy, "%v: Expected member %v to be healthy", index, m.Name) + require.NoError(t, err, "%v: Error on ListMembers", id) + require.Equal(t, 5, len(list), "%v: List returned different length of cluster", id) + + downMember, ok := list[names[stoppedIndex]] + require.True(t, ok, "%v: Could not find down member", id) + require.Equal(t, len(downMember.ClientUrls), 0, "%v: Unexpected no. 
of client urls on down member", id) + require.False(t, downMember.IsHealthy, "%v: Unexpected health of down member", id) + + for name, m := range list { + if name == names[stoppedIndex] { + continue + } + if name == names[stoppedIndex2] { + continue } + require.True(t, m.IsHealthy, "%v: Expected member %v to be healthy", id, name) } - fmt.Println("checkMembers done for ", index) + fmt.Println("checkMembers done for ", id) } for i := 0; i < numOfGoroutines; i++ { go checkMembers(strconv.Itoa(i), numOfGoroutines-1) diff --git a/wrappers/kv_log.go b/wrappers/kv_log.go index 4fcb1ca9..3c8938e0 100644 --- a/wrappers/kv_log.go +++ b/wrappers/kv_log.go @@ -431,7 +431,7 @@ func (k *logKvWrapper) SetLockHoldDuration(timeout time.Duration) { k.wrappedKvdb.SetLockHoldDuration(timeout) } -func (k *logKvWrapper) GetLockTryDuration() time.Duration { +func (k *logKvWrapper) GetLockTryDuration() time.Duration { return k.wrappedKvdb.GetLockTryDuration() } @@ -466,15 +466,6 @@ func (k *logKvWrapper) RemoveMember(nodeName, nodeIP string) error { return err } -func (k *logKvWrapper) RemoveMemberByID(removeMemberID uint64) error { - err := k.wrappedKvdb.RemoveMemberByID(removeMemberID) - k.logger.WithFields(logrus.Fields{ - opType: "RemoveMemberByID", - errString: err, - }).Info() - return err -} - func (k *logKvWrapper) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) { members, err := k.wrappedKvdb.UpdateMember(nodeIP, nodePeerPort, nodeName) k.logger.WithFields(logrus.Fields{ @@ -485,7 +476,7 @@ func (k *logKvWrapper) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[ return members, err } -func (k *logKvWrapper) ListMembers() (map[uint64]*kvdb.MemberInfo, error) { +func (k *logKvWrapper) ListMembers() (map[string]*kvdb.MemberInfo, error) { members, err := k.wrappedKvdb.ListMembers() k.logger.WithFields(logrus.Fields{ opType: "ListMembers", diff --git a/wrappers/kv_no_quorum.go b/wrappers/kv_no_quorum.go index 9e67b402..55ed1b9d 100644 --- 
a/wrappers/kv_no_quorum.go +++ b/wrappers/kv_no_quorum.go @@ -13,6 +13,7 @@ type noKvdbQuorumWrapper struct { randGen *rand.Rand } + + // NewNoKvdbQuorumWrapper constructs a new kvdb.Kvdb. func NewNoKvdbQuorumWrapper( kv kvdb.Kvdb, @@ -77,7 +78,7 @@ func (k *noKvdbQuorumWrapper) Update( return nil, kvdb.ErrNoQuorum } -func (k *noKvdbQuorumWrapper) GetLockTryDuration() time.Duration { +func (k *noKvdbQuorumWrapper) GetLockTryDuration() time.Duration { return k.wrappedKvdb.GetLockTryDuration() } @@ -255,15 +256,11 @@ func (k *noKvdbQuorumWrapper) RemoveMember(nodeName, nodeIP string) error { return kvdb.ErrNoQuorum } -func (k *noKvdbQuorumWrapper) RemoveMemberByID(removeMemberID uint64) error { - return kvdb.ErrNoQuorum -} - func (k *noKvdbQuorumWrapper) UpdateMember(nodeIP, nodePeerPort, nodeName string) (map[string][]string, error) { return k.wrappedKvdb.UpdateMember(nodeIP, nodePeerPort, nodeName) } -func (k *noKvdbQuorumWrapper) ListMembers() (map[uint64]*kvdb.MemberInfo, error) { +func (k *noKvdbQuorumWrapper) ListMembers() (map[string]*kvdb.MemberInfo, error) { return k.wrappedKvdb.ListMembers() }