From 008c0082ef8ae9b2d5f6d58a6d246cf99e7775e4 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 30 May 2024 14:19:41 -0400 Subject: [PATCH] Fix cowHostList can't have hosts with same `ConnectAddress` cowHostList uses HostInfo.Equal to confirm host uniqueness, which relies on `ConnectAddress.Equal`, which does not allow to have different hosts with same `ConnectAddress` --- host_source.go | 10 +++++----- policies.go | 15 +++++++-------- policies_test.go | 26 ++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/host_source.go b/host_source.go index 8dcf371ae..68ae9ad2d 100644 --- a/host_source.go +++ b/host_source.go @@ -145,7 +145,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool { return true } - return h.ConnectAddress().Equal(host.ConnectAddress()) + return h.HostID() == host.HostID() && h.ConnectAddressAndPort() == host.ConnectAddressAndPort() } func (h *HostInfo) Peer() net.IP { @@ -435,10 +435,10 @@ func (h *HostInfo) Hostname() string { } func (h *HostInfo) ConnectAddressAndPort() string { - h.mu.Lock() - defer h.mu.Unlock() - addr, _ := h.connectAddressLocked() - return net.JoinHostPort(addr.String(), strconv.Itoa(h.port)) + h.mu.Lock() + defer h.mu.Unlock() + addr, _ := h.connectAddressLocked() + return net.JoinHostPort(addr.String(), strconv.Itoa(h.port)) } func (h *HostInfo) String() string { diff --git a/policies.go b/policies.go index 98867d2fa..17d2541ca 100644 --- a/policies.go +++ b/policies.go @@ -12,7 +12,6 @@ import ( "fmt" "math" "math/rand" - "net" "sync" "sync/atomic" "time" @@ -64,7 +63,7 @@ func (c *cowHostList) add(host *HostInfo) bool { return true } -func (c *cowHostList) remove(ip net.IP) bool { +func (c *cowHostList) remove(host *HostInfo) bool { c.mu.Lock() l := c.get() size := len(l) @@ -76,7 +75,7 @@ func (c *cowHostList) remove(ip net.IP) bool { found := false newL := make([]*HostInfo, 0, size) for i := 0; i < len(l); i++ { - if !l[i].ConnectAddress().Equal(ip) { + if !l[i].Equal(host) { newL = append(newL, l[i]) } else { found = true @@ -374,7 +373,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) { } func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) { - r.hosts.remove(host.ConnectAddress()) + r.hosts.remove(host) } func (r *roundRobinHostPolicy) HostUp(host *HostInfo) { @@ -566,7 +565,7 @@ func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) { func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) { t.mu.Lock() - if t.hosts.remove(host.ConnectAddress()) { + if t.hosts.remove(host) { meta := t.getMetadataForUpdate() meta.resetTokenRing(t.partitioner, t.hosts.get(), t.logger) t.updateReplicas(meta, t.getKeyspaceName()) @@ -981,9 +980,9 @@ func (d *dcAwareRR) AddHost(host *HostInfo) { func (d *dcAwareRR) RemoveHost(host *HostInfo) { if d.IsLocal(host) { - d.localHosts.remove(host.ConnectAddress()) + d.localHosts.remove(host) } else { - d.remoteHosts.remove(host.ConnectAddress()) + d.remoteHosts.remove(host) } } @@ -1090,7 +1089,7 @@ func (d *rackAwareRR) AddHost(host *HostInfo) { func (d *rackAwareRR) RemoveHost(host *HostInfo) { dist := d.HostTier(host) - d.hosts[dist].remove(host.ConnectAddress()) + d.hosts[dist].remove(host) } func (d *rackAwareRR) HostUp(host *HostInfo) { d.AddHost(host) } diff --git a/policies_test.go b/policies_test.go index 0b0420596..ea8e47439 100644 --- a/policies_test.go +++ b/policies_test.go @@ -44,6 +44,32 @@ func TestRoundRobbin(t *testing.T) { } } +func TestRoundRobbinSameConnectAddress(t *testing.T) { + policy := RoundRobinHostPolicy() + + hosts := [...]*HostInfo{ + {hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1), port: 9042}, + {hostId: "1", connectAddress: net.IPv4(0, 0, 0, 1), port: 9043}, + } + + for _, host := range hosts { + policy.AddHost(host) + } + + got := make(map[string]bool) + it := policy.Pick(nil) + for h := it(); h != nil; h = it() { + id := h.Info().hostId + if got[id] { + t.Fatalf("got duplicate host: %v", id) + } + got[id] = true + } + if len(got) != len(hosts) { + t.Fatalf("expected %d hosts got %d", len(hosts), len(got)) + } +} + // Tests of the token-aware host selection policy implementation with a // round-robin host selection policy fallback. func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) {