Skip to content

Commit

Permalink
Merge pull request #373 from sylwiaszunejko/merge-rings
Browse files Browse the repository at this point in the history
Merge ring and ringDescriber
  • Loading branch information
dkropachev authored Dec 13, 2024
2 parents 0ca791b + b128b35 commit e12494d
Show file tree
Hide file tree
Showing 16 changed files with 167 additions and 239 deletions.
6 changes: 3 additions & 3 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func TestReconnection(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

h := session.ring.allHosts()[0]
h := session.hostSource.getHostsList()[0]
session.handleNodeDown(h.ConnectAddress(), h.Port())

if h.State() != NodeDown {
Expand Down Expand Up @@ -1675,7 +1675,7 @@ func TestPrepare_PreparedCacheEviction(t *testing.T) {
}

// Walk through all the configured hosts and test cache retention and eviction
for _, host := range session.ring.hosts {
for _, host := range session.hostSource.hosts {
_, ok := session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host.HostID(), session.cfg.Keyspace, "SELECT id,mod FROM prepcachetest WHERE id = 0"))
if ok {
t.Errorf("expected first select to be purged but was in cache for host=%q", host)
Expand Down Expand Up @@ -2275,7 +2275,7 @@ func TestTokenAwareConnPool(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

expectedPoolSize := cluster.NumConns * len(session.ring.allHosts())
expectedPoolSize := cluster.NumConns * len(session.hostSource.getHostsList())

// wait for pool to fill
for i := 0; i < 50; i++ {
Expand Down
4 changes: 2 additions & 2 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (c *controlConn) setupConn(conn *Conn) error {
return err
}

host = c.session.ring.addOrUpdate(host)
host = c.session.hostSource.addOrUpdate(host)

if c.session.cfg.filterHost(host) {
return fmt.Errorf("host was filtered: %v", host.ConnectAddress())
Expand Down Expand Up @@ -385,7 +385,7 @@ func (c *controlConn) reconnect() {
}

func (c *controlConn) attemptReconnect() (*Conn, error) {
hosts := c.session.ring.allHosts()
hosts := c.session.hostSource.getHostsList()
hosts = shuffleHosts(hosts)

// keep the old behavior of connecting to the old host first by moving it to
Expand Down
4 changes: 2 additions & 2 deletions control_ccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}()
assertNodeDown := func() error {
hosts := session.ring.currentHosts()
hosts := session.hostSource.getHostsMap()
if len(hosts) != 1 {
return fmt.Errorf("expected 1 host in ring but there were %v", len(hosts))
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}
assertNodeUp := func() error {
hosts := session.ring.currentHosts()
hosts := session.hostSource.getHostsMap()
if len(hosts) != len(allCcmHosts) {
return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts))
}
Expand Down
4 changes: 2 additions & 2 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) {
s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort)
}

host, ok := s.ring.getHostByIP(eventIp.String())
host, ok := s.hostSource.getHostByIP(eventIp.String())
if !ok {
s.debounceRingRefresh()
return
Expand Down Expand Up @@ -242,7 +242,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port)
}

host, ok := s.ring.getHostByIP(ip.String())
host, ok := s.hostSource.getHostByIP(ip.String())
if ok {
host.setState(NodeDown)
if s.cfg.filterHost(host) {
Expand Down
6 changes: 3 additions & 3 deletions events_ccm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestEventNodeDownControl(t *testing.T) {
}
session.pool.mu.RUnlock()

host := session.ring.getHost(node.Addr)
host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if host.IsUp() {
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestEventNodeDown(t *testing.T) {
t.Fatal("node not removed after remove event")
}

host := session.ring.getHost(node.Addr)
host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if host.IsUp() {
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestEventNodeUp(t *testing.T) {
t.Fatal("node not added after node added event")
}

host := session.ring.getHost(node.Addr)
host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if !host.IsUp() {
Expand Down
2 changes: 1 addition & 1 deletion export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ var TestLogger = &testLogger{}
var WaitUntilPoolsStopFilling = waitUntilPoolsStopFilling

func GetRingAllHosts(sess *Session) []*HostInfo {
return sess.ring.allHosts()
return sess.hostSource.getHostsList()
}
10 changes: 4 additions & 6 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,18 +675,18 @@ func (s *Session) refreshRingNow() error {
}

func (s *Session) refreshRing() error {
hosts, partitioner, err := s.hostSource.GetHosts()
hosts, partitioner, err := s.hostSource.GetHostsFromSystem()
if err != nil {
return err
}
prevHosts := s.ring.currentHosts()
prevHosts := s.hostSource.getHostsMap()

for _, h := range hosts {
if s.cfg.filterHost(h) {
continue
}

if host, ok := s.ring.addHostIfMissing(h); !ok {
if host, ok := s.hostSource.addHostIfMissing(h); !ok {
s.startPoolFill(h)
} else {
// host (by hostID) already exists; determine if IP has changed
Expand All @@ -702,7 +702,7 @@ func (s *Session) refreshRing() error {
// host IP has changed
// remove old HostInfo (w/old IP)
s.removeHost(existing)
if _, alreadyExists := s.ring.addHostIfMissing(h); alreadyExists {
if _, alreadyExists := s.hostSource.addHostIfMissing(h); alreadyExists {
return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists)
}
// add new HostInfo (same hostID, new IP)
Expand All @@ -716,8 +716,6 @@ func (s *Session) refreshRing() error {
s.metadataDescriber.removeTabletsWithHost(host)
s.removeHost(host)
}

s.metadata.setPartitioner(partitioner)
s.policy.SetPartitioner(partitioner)

return nil
Expand Down
4 changes: 2 additions & 2 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func TestAuthentication(t *testing.T) {
session.Close()
}

func TestGetHosts(t *testing.T) {
func TestGetHostsFromSystem(t *testing.T) {
clusterHosts := getClusterHosts()
cluster := createCluster()
session := createSessionFromCluster(cluster, t)

hosts, partitioner, err := session.hostSource.GetHosts()
hosts, partitioner, err := session.hostSource.GetHostsFromSystem()

assertTrue(t, "err == nil", err == nil)
assertEqual(t, "len(hosts)", len(clusterHosts), len(hosts))
Expand Down
10 changes: 2 additions & 8 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,10 +982,7 @@ func (d *dcAwareRR) IsOperational(session *Session) error {
return nil
}

hosts, _, err := session.hostSource.GetHosts()
if err != nil {
return fmt.Errorf("gocql: unable to check if session is operational: %v", err)
}
hosts := session.hostSource.getHostsList()
for _, host := range hosts {
if !session.cfg.filterHost(host) && host.DataCenter() == d.local {
// Policy can work properly only if there is at least one host from target DC
Expand Down Expand Up @@ -1103,10 +1100,7 @@ func (d *rackAwareRR) IsOperational(session *Session) error {
if session.cfg.disableInit || session.cfg.disableControlConn {
return nil
}
hosts, _, err := session.hostSource.GetHosts()
if err != nil {
return fmt.Errorf("gocql: unable to check if session is operational: %v", err)
}
hosts := session.hostSource.getHostsList()
for _, host := range hosts {
if !session.cfg.filterHost(host) && host.DataCenter() == d.localDC && host.Rack() == d.localRack {
// Policy can work properly only if there is at least one host from target DC+Rack
Expand Down
143 changes: 0 additions & 143 deletions ring.go

This file was deleted.

Loading

0 comments on commit e12494d

Please sign in to comment.