From 84eef090ee5430cb8a5df8352438b17cfd0e41af Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 13 Dec 2024 12:39:10 +0100 Subject: [PATCH 1/7] Move ringDescriber to a separate file and make in not rely on Session --- conn.go | 2 +- control.go | 2 +- host_source.go | 134 ++++++---------------------------------------- ring_describer.go | 117 ++++++++++++++++++++++++++++++++++++++++ session.go | 6 ++- 5 files changed, 139 insertions(+), 122 deletions(-) create mode 100644 ring_describer.go diff --git a/conn.go b/conn.go index ae02bd71c..d5b76c82e 100644 --- a/conn.go +++ b/conn.go @@ -1697,7 +1697,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { } for _, row := range rows { - host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port}) + host, err := hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port}, c.session.cfg.translateAddressPort) if err != nil { goto cont } diff --git a/control.go b/control.go index b30b44ea3..db256ac17 100644 --- a/control.go +++ b/control.go @@ -294,7 +294,7 @@ type connHost struct { func (c *controlConn) setupConn(conn *Conn) error { // we need up-to-date host info for the filterHost call below iter := conn.querySystemLocal(context.TODO()) - host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.conn.RemoteAddr().(*net.TCPAddr).Port) + host, err := hostInfoFromIter(iter, conn.host.connectAddress, conn.conn.RemoteAddr().(*net.TCPAddr).Port, c.session.cfg.translateAddressPort) if err != nil { return err } diff --git a/host_source.go b/host_source.go index a0bab9ad0..870ea858c 100644 --- a/host_source.go +++ b/host_source.go @@ -25,7 +25,6 @@ package gocql import ( - "context" "errors" "fmt" "net" @@ -445,14 +444,6 @@ func (h *HostInfo) String() string { h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens)) } -// Polls system.peers at a specific interval to find new hosts -type ringDescriber struct { - session *Session - mu sync.Mutex - prevHosts []*HostInfo - prevPartitioner string -} - // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces func checkSystemSchema(control *controlConn) (bool, error) { iter := control.query("SELECT * FROM system_schema.keyspaces") @@ -471,7 +462,7 @@ func checkSystemSchema(control *controlConn) (bool, error) { // Given a map that represents a row from either system.local or system.peers // return as much information as we can in *HostInfo -func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*HostInfo, error) { +func hostInfoFromMap(row map[string]interface{}, host *HostInfo, translateAddressPort func(addr net.IP, port int) (net.IP, int)) (*HostInfo, error) { const assertErrorMsg = "Assertion failed for %s" var ok bool @@ -583,14 +574,14 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (* // Not sure what the port field will be called until the JIRA issue is complete } - ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port) + ip, port := translateAddressPort(host.ConnectAddress(), host.port) host.connectAddress = ip host.port = port return host, nil } -func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int) (*HostInfo, error) { +func hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int, translateAddressPort func(addr net.IP, port int) (net.IP, int)) (*HostInfo, error) { rows, err := iter.SliceMap() if err != nil { // TODO(zariel): make typed error @@ -601,106 +592,13 @@ func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPor return nil, errors.New("query returned 0 rows") } - host, err := s.hostInfoFromMap(rows[0], &HostInfo{connectAddress: connectAddress, port: defaultPort}) + host, err := hostInfoFromMap(rows[0], &HostInfo{connectAddress: connectAddress, port: defaultPort}, translateAddressPort) if err != nil { return nil, err } return host, nil } -// Ask the control node for the local host info -func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) { - if r.session.control == nil { - return nil, errNoControl - } - - iter := r.session.control.withConnHost(func(ch *connHost) *Iter { - return ch.conn.querySystemLocal(context.TODO()) - }) - - if iter == nil { - return nil, errNoControl - } - - host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port) - if err != nil { - return nil, fmt.Errorf("could not retrieve local host info: %w", err) - } - return host, nil -} - -// Ask the control node for host info on all it's known peers -func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, error) { - if r.session.control == nil { - return nil, errNoControl - } - - var peers []*HostInfo - iter := r.session.control.withConnHost(func(ch *connHost) *Iter { - return ch.conn.querySystemPeers(context.TODO(), localHost.version) - }) - - if iter == nil { - return nil, errNoControl - } - - rows, err := iter.SliceMap() - if err != nil { - // TODO(zariel): make typed error - return nil, fmt.Errorf("unable to fetch peer host info: %s", err) - } - - for _, row := range rows { - // extract all available info about the peer - host, err := r.session.hostInfoFromMap(row, &HostInfo{port: r.session.cfg.Port}) - if err != nil { - return nil, err - } else if !isValidPeer(host) { - // If it's not a valid peer - r.session.logger.Printf("Found invalid peer '%s' "+ - "Likely due to a gossip or snitch issue, this host will be ignored", host) - continue - } - - peers = append(peers, host) - } - - return peers, nil -} - -// Return true if the host is a valid peer -func isValidPeer(host *HostInfo) bool { - return !(len(host.RPCAddress()) == 0 || - host.hostId == "" || - host.dataCenter == "" || - host.rack == "" || - len(host.tokens) == 0) -} - -// GetHosts returns a list of hosts found via queries to system.local and system.peers -func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { - r.mu.Lock() - defer r.mu.Unlock() - - localHost, err := r.getLocalHostInfo() - if err != nil { - return r.prevHosts, r.prevPartitioner, err - } - - peerHosts, err := r.getClusterPeerInfo(localHost) - if err != nil { - return r.prevHosts, r.prevPartitioner, err - } - - hosts := append([]*HostInfo{localHost}, peerHosts...) - var partitioner string - if len(hosts) > 0 { - partitioner = hosts[0].Partitioner() - } - - return hosts, partitioner, nil -} - // debounceRingRefresh submits a ring refresh request to the ring refresh debouncer. func (s *Session) debounceRingRefresh() { s.ringRefresher.debounce() @@ -716,21 +614,21 @@ func (s *Session) refreshRing() error { return err } -func refreshRing(r *ringDescriber) error { - hosts, partitioner, err := r.GetHosts() +func refreshRing(s *Session) error { + hosts, partitioner, err := s.hostSource.GetHosts() if err != nil { return err } - prevHosts := r.session.ring.currentHosts() + prevHosts := s.ring.currentHosts() for _, h := range hosts { - if r.session.cfg.filterHost(h) { + if s.cfg.filterHost(h) { continue } - if host, ok := r.session.ring.addHostIfMissing(h); !ok { - r.session.startPoolFill(h) + if host, ok := s.ring.addHostIfMissing(h); !ok { + s.startPoolFill(h) } else { // host (by hostID) already exists; determine if IP has changed newHostID := h.HostID() @@ -744,23 +642,23 @@ func refreshRing(r *ringDescriber) error { } else { // host IP has changed // remove old HostInfo (w/old IP) - r.session.removeHost(existing) - if _, alreadyExists := r.session.ring.addHostIfMissing(h); alreadyExists { + s.removeHost(existing) + if _, alreadyExists := s.ring.addHostIfMissing(h); alreadyExists { return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists) } // add new HostInfo (same hostID, new IP) - r.session.startPoolFill(h) + s.startPoolFill(h) } } delete(prevHosts, h.HostID()) } for _, host := range prevHosts { - r.session.removeHost(host) + s.removeHost(host) } - r.session.metadata.setPartitioner(partitioner) - r.session.policy.SetPartitioner(partitioner) + s.metadata.setPartitioner(partitioner) + s.policy.SetPartitioner(partitioner) return nil } diff --git a/ring_describer.go b/ring_describer.go new file mode 100644 index 000000000..138987442 --- /dev/null +++ b/ring_describer.go @@ -0,0 +1,117 @@ +package gocql + +import ( + "context" + "fmt" + "sync" +) + +// Polls system.peers at a specific interval to find new hosts +type ringDescriber struct { + control *controlConn + cfg *ClusterConfig + logger StdLogger + mu sync.Mutex + prevHosts []*HostInfo + prevPartitioner string +} + +func (r *ringDescriber) setControlConn(c *controlConn) { + r.mu.Lock() + defer r.mu.Unlock() + + r.control = c +} + +// Ask the control node for the local host info +func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) { + if r.control == nil { + return nil, errNoControl + } + + iter := r.control.withConnHost(func(ch *connHost) *Iter { + return ch.conn.querySystemLocal(context.TODO()) + }) + + if iter == nil { + return nil, errNoControl + } + + host, err := hostInfoFromIter(iter, nil, r.cfg.Port, r.cfg.translateAddressPort) + if err != nil { + return nil, fmt.Errorf("could not retrieve local host info: %w", err) + } + return host, nil +} + +// Ask the control node for host info on all it's known peers +func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, error) { + if r.control == nil { + return nil, errNoControl + } + + var peers []*HostInfo + iter := r.control.withConnHost(func(ch *connHost) *Iter { + return ch.conn.querySystemPeers(context.TODO(), localHost.version) + }) + + if iter == nil { + return nil, errNoControl + } + + rows, err := iter.SliceMap() + if err != nil { + // TODO(zariel): make typed error + return nil, fmt.Errorf("unable to fetch peer host info: %s", err) + } + + for _, row := range rows { + // extract all available info about the peer + host, err := hostInfoFromMap(row, &HostInfo{port: r.cfg.Port}, r.cfg.translateAddressPort) + if err != nil { + return nil, err + } else if !isValidPeer(host) { + // If it's not a valid peer + r.logger.Printf("Found invalid peer '%s' "+ + "Likely due to a gossip or snitch issue, this host will be ignored", host) + continue + } + + peers = append(peers, host) + } + + return peers, nil +} + +// Return true if the host is a valid peer +func isValidPeer(host *HostInfo) bool { + return !(len(host.RPCAddress()) == 0 || + host.hostId == "" || + host.dataCenter == "" || + host.rack == "" || + len(host.tokens) == 0) +} + +// GetHosts returns a list of hosts found via queries to system.local and system.peers +func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { + r.mu.Lock() + defer r.mu.Unlock() + + localHost, err := r.getLocalHostInfo() + if err != nil { + return r.prevHosts, r.prevPartitioner, err + } + + peerHosts, err := r.getClusterPeerInfo(localHost) + if err != nil { + return r.prevHosts, r.prevPartitioner, err + } + + hosts := append([]*HostInfo{localHost}, peerHosts...) + var partitioner string + if len(hosts) > 0 { + partitioner = hosts[0].Partitioner() + } + + return hosts, partitioner, nil +} diff --git a/session.go b/session.go index d04a13672..71b96f098 100644 --- a/session.go +++ b/session.go @@ -166,8 +166,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) { s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo) - s.hostSource = &ringDescriber{session: s} - s.ringRefresher = newRefreshDebouncer(ringRefreshDebounceTime, func() error { return refreshRing(s.hostSource) }) + s.hostSource = &ringDescriber{cfg: &s.cfg, logger: s.logger} + s.ringRefresher = newRefreshDebouncer(ringRefreshDebounceTime, func() error { return refreshRing(s) }) if cfg.PoolConfig.HostSelectionPolicy == nil { cfg.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy() @@ -255,6 +255,8 @@ func (s *Session) init() error { } } + s.hostSource.setControlConn(s.control) + for _, host := range hosts { // In case when host lookup is disabled and when we are in unit tests, // host are not discovered, and we are missing host ID information used From 742d5f232ccd1362a895b4a77995cacf9eafd401 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 13 Dec 2024 13:05:01 +0100 Subject: [PATCH 2/7] Merge ring and ringDescriber --- cassandra_test.go | 9 +-- control.go | 4 +- control_ccm_test.go | 4 +- events.go | 4 +- events_ccm_test.go | 6 +- host_source.go | 6 +- ring.go | 167 -------------------------------------------- ring_describer.go | 160 +++++++++++++++++++++++++++++++++++++++++- ring_test.go | 4 +- session.go | 17 +++-- 10 files changed, 186 insertions(+), 195 deletions(-) delete mode 100644 ring.go diff --git a/cassandra_test.go b/cassandra_test.go index ec6969190..010e57441 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -32,7 +32,6 @@ import ( "context" "errors" "fmt" - "github.com/stretchr/testify/require" "io" "math" "math/big" @@ -45,6 +44,8 @@ import ( "time" "unicode" + "github.com/stretchr/testify/require" + "gopkg.in/inf.v0" ) @@ -808,7 +809,7 @@ func TestReconnection(t *testing.T) { session := createSessionFromCluster(cluster, t) defer session.Close() - h := session.ring.allHosts()[0] + h := session.hostSource.allHosts()[0] session.handleNodeDown(h.ConnectAddress(), h.Port()) if h.State() != NodeDown { @@ -1613,7 +1614,7 @@ func TestPrepare_PreparedCacheEviction(t *testing.T) { } // Walk through all the configured hosts and test cache retention and eviction - for _, host := range session.ring.hosts { + for _, host := range session.hostSource.hosts { _, ok := session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host.HostID(), session.cfg.Keyspace, "SELECT id,mod FROM prepcachetest WHERE id = 0")) if ok { t.Errorf("expected first select to be purged but was in cache for host=%q", host) @@ -2769,7 +2770,7 @@ func TestTokenAwareConnPool(t *testing.T) { session := createSessionFromCluster(cluster, t) defer session.Close() - expectedPoolSize := cluster.NumConns * len(session.ring.allHosts()) + expectedPoolSize := cluster.NumConns * len(session.hostSource.allHosts()) // wait for pool to fill for i := 0; i < 10; i++ { diff --git a/control.go b/control.go index db256ac17..a376e3661 100644 --- a/control.go +++ b/control.go @@ -299,7 +299,7 @@ func (c *controlConn) setupConn(conn *Conn) error { return err } - host = c.session.ring.addOrUpdate(host) + host = c.session.hostSource.addOrUpdate(host) if c.session.cfg.filterHost(host) { return fmt.Errorf("host was filtered: %v", host.ConnectAddress()) @@ -385,7 +385,7 @@ func (c *controlConn) reconnect() { } func (c *controlConn) attemptReconnect() (*Conn, error) { - hosts := c.session.ring.allHosts() + hosts := c.session.hostSource.allHosts() hosts = shuffleHosts(hosts) // keep the old behavior of connecting to the old host first by moving it to diff --git a/control_ccm_test.go b/control_ccm_test.go index 426a59aef..f2dd159ca 100644 --- a/control_ccm_test.go +++ b/control_ccm_test.go @@ -131,7 +131,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) { }() assertNodeDown := func() error { - hosts := session.ring.currentHosts() + hosts := session.hostSource.currentHosts() if len(hosts) != 1 { return fmt.Errorf("expected 1 host in ring but there were %v", len(hosts)) } @@ -169,7 +169,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) { } assertNodeUp := func() error { - hosts := session.ring.currentHosts() + hosts := session.hostSource.currentHosts() if len(hosts) != len(allCcmHosts) { return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts)) } diff --git a/events.go b/events.go index 93b001acc..bfddf16bb 100644 --- a/events.go +++ b/events.go @@ -217,7 +217,7 @@ func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) { s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort) } - host, ok := s.ring.getHostByIP(eventIp.String()) + host, ok := s.hostSource.getHostByIP(eventIp.String()) if !ok { s.debounceRingRefresh() return @@ -256,7 +256,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) { s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port) } - host, ok := s.ring.getHostByIP(ip.String()) + host, ok := s.hostSource.getHostByIP(ip.String()) if ok { host.setState(NodeDown) if s.cfg.filterHost(host) { diff --git a/events_ccm_test.go b/events_ccm_test.go index a105985bc..1e3c96018 100644 --- a/events_ccm_test.go +++ b/events_ccm_test.go @@ -104,7 +104,7 @@ func TestEventNodeDownControl(t *testing.T) { } session.pool.mu.RUnlock() - host := session.ring.getHost(node.Addr) + host := session.hostSource.getHost(node.Addr) if host == nil { t.Fatal("node not in metadata ring") } else if host.IsUp() { @@ -146,7 +146,7 @@ func TestEventNodeDown(t *testing.T) { t.Fatal("node not removed after remove event") } - host := session.ring.getHost(node.Addr) + host := session.hostSource.getHost(node.Addr) if host == nil { t.Fatal("node not in metadata ring") } else if host.IsUp() { @@ -203,7 +203,7 @@ func TestEventNodeUp(t *testing.T) { t.Fatal("node not added after node added event") } - host := session.ring.getHost(node.Addr) + host := session.hostSource.getHost(node.Addr) if host == nil { t.Fatal("node not in metadata ring") } else if !host.IsUp() { diff --git a/host_source.go b/host_source.go index 870ea858c..bb1724b20 100644 --- a/host_source.go +++ b/host_source.go @@ -620,14 +620,14 @@ func refreshRing(s *Session) error { return err } - prevHosts := s.ring.currentHosts() + prevHosts := s.hostSource.currentHosts() for _, h := range hosts { if s.cfg.filterHost(h) { continue } - if host, ok := s.ring.addHostIfMissing(h); !ok { + if host, ok := s.hostSource.addHostIfMissing(h); !ok { s.startPoolFill(h) } else { // host (by hostID) already exists; determine if IP has changed @@ -643,7 +643,7 @@ func refreshRing(s *Session) error { // host IP has changed // remove old HostInfo (w/old IP) s.removeHost(existing) - if _, alreadyExists := s.ring.addHostIfMissing(h); alreadyExists { + if _, alreadyExists := s.hostSource.addHostIfMissing(h); alreadyExists { return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists) } // add new HostInfo (same hostID, new IP) diff --git a/ring.go b/ring.go deleted file mode 100644 index 6821c0df2..000000000 --- a/ring.go +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/* - * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 - * Copyright (c) 2016, The Gocql authors, - * provided under the BSD-3-Clause License. - * See the NOTICE file distributed with this work for additional information. - */ - -package gocql - -import ( - "fmt" - "sync" - "sync/atomic" -) - -type ring struct { - // endpoints are the set of endpoints which the driver will attempt to connect - // to in the case it can not reach any of its hosts. They are also used to boot - // strap the initial connection. - endpoints []*HostInfo - - mu sync.RWMutex - // hosts are the set of all hosts in the cassandra ring that we know of. - // key of map is host_id. - hosts map[string]*HostInfo - // hostIPToUUID maps host native address to host_id. - hostIPToUUID map[string]string - - hostList []*HostInfo - pos uint32 - - // TODO: we should store the ring metadata here also. -} - -func (r *ring) rrHost() *HostInfo { - r.mu.RLock() - defer r.mu.RUnlock() - if len(r.hostList) == 0 { - return nil - } - - pos := int(atomic.AddUint32(&r.pos, 1) - 1) - return r.hostList[pos%len(r.hostList)] -} - -func (r *ring) getHostByIP(ip string) (*HostInfo, bool) { - r.mu.RLock() - defer r.mu.RUnlock() - hi, ok := r.hostIPToUUID[ip] - return r.hosts[hi], ok -} - -func (r *ring) getHost(hostID string) *HostInfo { - r.mu.RLock() - host := r.hosts[hostID] - r.mu.RUnlock() - return host -} - -func (r *ring) allHosts() []*HostInfo { - r.mu.RLock() - hosts := make([]*HostInfo, 0, len(r.hosts)) - for _, host := range r.hosts { - hosts = append(hosts, host) - } - r.mu.RUnlock() - return hosts -} - -func (r *ring) currentHosts() map[string]*HostInfo { - r.mu.RLock() - hosts := make(map[string]*HostInfo, len(r.hosts)) - for k, v := range r.hosts { - hosts[k] = v - } - r.mu.RUnlock() - return hosts -} - -func (r *ring) addOrUpdate(host *HostInfo) *HostInfo { - if existingHost, ok := r.addHostIfMissing(host); ok { - existingHost.update(host) - host = existingHost - } - return host -} - -func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) { - if host.invalidConnectAddr() { - panic(fmt.Sprintf("invalid host: %v", host)) - } - hostID := host.HostID() - - r.mu.Lock() - if r.hosts == nil { - r.hosts = make(map[string]*HostInfo) - } - if r.hostIPToUUID == nil { - r.hostIPToUUID = make(map[string]string) - } - - existing, ok := r.hosts[hostID] - if !ok { - r.hosts[hostID] = host - r.hostIPToUUID[host.nodeToNodeAddress().String()] = hostID - existing = host - r.hostList = append(r.hostList, host) - } - r.mu.Unlock() - return existing, ok -} - -func (r *ring) removeHost(hostID string) bool { - r.mu.Lock() - if r.hosts == nil { - r.hosts = make(map[string]*HostInfo) - } - if r.hostIPToUUID == nil { - r.hostIPToUUID = make(map[string]string) - } - - h, ok := r.hosts[hostID] - if ok { - for i, host := range r.hostList { - if host.HostID() == hostID { - r.hostList = append(r.hostList[:i], r.hostList[i+1:]...) - break - } - } - delete(r.hostIPToUUID, h.nodeToNodeAddress().String()) - } - delete(r.hosts, hostID) - r.mu.Unlock() - return ok -} - -type clusterMetadata struct { - mu sync.RWMutex - partitioner string -} - -func (c *clusterMetadata) setPartitioner(partitioner string) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.partitioner != partitioner { - // TODO: update other things now - c.partitioner = partitioner - } -} diff --git a/ring_describer.go b/ring_describer.go index 138987442..ac2fa398e 100644 --- a/ring_describer.go +++ b/ring_describer.go @@ -1,9 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40 + * Copyright (c) 2016, The Gocql authors, + * provided under the BSD-3-Clause License. + * See the NOTICE file distributed with this work for additional information. + */ + package gocql import ( "context" "fmt" "sync" + "sync/atomic" ) // Polls system.peers at a specific interval to find new hosts @@ -11,9 +36,25 @@ type ringDescriber struct { control *controlConn cfg *ClusterConfig logger StdLogger - mu sync.Mutex prevHosts []*HostInfo prevPartitioner string + + // endpoints are the set of endpoints which the driver will attempt to connect + // to in the case it can not reach any of its hosts. They are also used to boot + // strap the initial connection. + endpoints []*HostInfo + + mu sync.RWMutex + // hosts are the set of all hosts in the cassandra ring that we know of. + // key of map is host_id. + hosts map[string]*HostInfo + // hostIPToUUID maps host native address to host_id. + hostIPToUUID map[string]string + + hostList []*HostInfo + pos uint32 + + // TODO: we should store the ring metadata here also. } func (r *ringDescriber) setControlConn(c *controlConn) { @@ -115,3 +156,120 @@ func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { return hosts, partitioner, nil } + +func (r *ringDescriber) rrHost() *HostInfo { + r.mu.RLock() + defer r.mu.RUnlock() + if len(r.hostList) == 0 { + return nil + } + + pos := int(atomic.AddUint32(&r.pos, 1) - 1) + return r.hostList[pos%len(r.hostList)] +} + +func (r *ringDescriber) getHostByIP(ip string) (*HostInfo, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + hi, ok := r.hostIPToUUID[ip] + return r.hosts[hi], ok +} + +func (r *ringDescriber) getHost(hostID string) *HostInfo { + r.mu.RLock() + host := r.hosts[hostID] + r.mu.RUnlock() + return host +} + +func (r *ringDescriber) allHosts() []*HostInfo { + r.mu.RLock() + hosts := make([]*HostInfo, 0, len(r.hosts)) + for _, host := range r.hosts { + hosts = append(hosts, host) + } + r.mu.RUnlock() + return hosts +} + +func (r *ringDescriber) currentHosts() map[string]*HostInfo { + r.mu.RLock() + hosts := make(map[string]*HostInfo, len(r.hosts)) + for k, v := range r.hosts { + hosts[k] = v + } + r.mu.RUnlock() + return hosts +} + +func (r *ringDescriber) addOrUpdate(host *HostInfo) *HostInfo { + if existingHost, ok := r.addHostIfMissing(host); ok { + existingHost.update(host) + host = existingHost + } + return host +} + +func (r *ringDescriber) addHostIfMissing(host *HostInfo) (*HostInfo, bool) { + if host.invalidConnectAddr() { + panic(fmt.Sprintf("invalid host: %v", host)) + } + hostID := host.HostID() + + r.mu.Lock() + if r.hosts == nil { + r.hosts = make(map[string]*HostInfo) + } + if r.hostIPToUUID == nil { + r.hostIPToUUID = make(map[string]string) + } + + existing, ok := r.hosts[hostID] + if !ok { + r.hosts[hostID] = host + r.hostIPToUUID[host.nodeToNodeAddress().String()] = hostID + existing = host + r.hostList = append(r.hostList, host) + } + r.mu.Unlock() + return existing, ok +} + +func (r *ringDescriber) removeHost(hostID string) bool { + r.mu.Lock() + if r.hosts == nil { + r.hosts = make(map[string]*HostInfo) + } + if r.hostIPToUUID == nil { + r.hostIPToUUID = make(map[string]string) + } + + h, ok := r.hosts[hostID] + if ok { + for i, host := range r.hostList { + if host.HostID() == hostID { + r.hostList = append(r.hostList[:i], r.hostList[i+1:]...) + break + } + } + delete(r.hostIPToUUID, h.nodeToNodeAddress().String()) + } + delete(r.hosts, hostID) + r.mu.Unlock() + return ok +} + +type clusterMetadata struct { + mu sync.RWMutex + partitioner string +} + +func (c *clusterMetadata) setPartitioner(partitioner string) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.partitioner != partitioner { + // TODO: update other things now + c.partitioner = partitioner + } +} diff --git a/ring_test.go b/ring_test.go index 3e9533ecd..c7a797947 100644 --- a/ring_test.go +++ b/ring_test.go @@ -30,7 +30,7 @@ import ( ) func TestRing_AddHostIfMissing_Missing(t *testing.T) { - ring := &ring{} + ring := &ringDescriber{} host := &HostInfo{hostId: MustRandomUUID().String(), connectAddress: net.IPv4(1, 1, 1, 1)} h1, ok := ring.addHostIfMissing(host) @@ -44,7 +44,7 @@ func TestRing_AddHostIfMissing_Missing(t *testing.T) { } func TestRing_AddHostIfMissing_Existing(t *testing.T) { - ring := &ring{} + ring := &ringDescriber{} host := &HostInfo{hostId: MustRandomUUID().String(), connectAddress: net.IPv4(1, 1, 1, 1)} ring.addHostIfMissing(host) diff --git a/session.go b/session.go index 71b96f098..5c1f15f3d 100644 --- a/session.go +++ b/session.go @@ -72,7 +72,6 @@ type Session struct { pool *policyConnPool policy HostSelectionPolicy - ring ring metadata clusterMetadata mu sync.RWMutex @@ -216,7 +215,7 @@ func (s *Session) init() error { if err != nil { return err } - s.ring.endpoints = hosts + s.hostSource.endpoints = hosts if !s.cfg.disableControlConn { s.control = createControlConn(s) @@ -284,7 +283,7 @@ func (s *Session) init() error { // again atomic.AddInt64(&left, 1) for _, host := range hostMap { - host := s.ring.addOrUpdate(host) + host := s.hostSource.addOrUpdate(host) if s.cfg.filterHost(host) { continue } @@ -346,7 +345,7 @@ func (s *Session) init() error { newer, _ := checkSystemSchema(s.control) s.useSystemSchema = newer } else { - version := s.ring.rrHost().Version() + version := s.hostSource.rrHost().Version() s.useSystemSchema = version.AtLeast(3, 0, 0) s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0) } @@ -390,11 +389,11 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) { for { select { case <-reconnectTicker.C: - hosts := s.ring.allHosts() + hosts := s.hostSource.allHosts() - // Print session.ring for debug. + // Print session.hostSource for debug. if gocqlDebug { - buf := bytes.NewBufferString("Session.ring:") + buf := bytes.NewBufferString("Session.hostSource:") for _, h := range hosts { buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]") } @@ -560,7 +559,7 @@ func (s *Session) removeHost(h *HostInfo) { s.policy.RemoveHost(h) hostID := h.HostID() s.pool.removeHost(hostID) - s.ring.removeHost(hostID) + s.hostSource.removeHost(hostID) } // KeyspaceMetadata returns the schema metadata for the keyspace specified. Returns an error if the keyspace does not exist. @@ -576,7 +575,7 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { } func (s *Session) getConn() *Conn { - hosts := s.ring.allHosts() + hosts := s.hostSource.allHosts() for _, host := range hosts { if !host.IsUp() { continue From 02554acdf6f3c904087ae5bfb66ec9ef42d634e6 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 6 Dec 2024 15:25:27 +0100 Subject: [PATCH 3/7] Remove unused endpoints field in ringDescriber --- ring_describer.go | 5 ----- session.go | 1 - 2 files changed, 6 deletions(-) diff --git a/ring_describer.go b/ring_describer.go index ac2fa398e..b70ef360e 100644 --- a/ring_describer.go +++ b/ring_describer.go @@ -39,11 +39,6 @@ type ringDescriber struct { prevHosts []*HostInfo prevPartitioner string - // endpoints are the set of endpoints which the driver will attempt to connect - // to in the case it can not reach any of its hosts. They are also used to boot - // strap the initial connection. - endpoints []*HostInfo - mu sync.RWMutex // hosts are the set of all hosts in the cassandra ring that we know of. // key of map is host_id. diff --git a/session.go b/session.go index 5c1f15f3d..bd43b9db7 100644 --- a/session.go +++ b/session.go @@ -215,7 +215,6 @@ func (s *Session) init() error { if err != nil { return err } - s.hostSource.endpoints = hosts if !s.cfg.disableControlConn { s.control = createControlConn(s) From 68dbc596aa4040081c4b8b742b0088a75ec8b572 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 11 Dec 2024 16:32:08 +0100 Subject: [PATCH 4/7] Remove rrHost method from ringDescriber It was only used once in Session init(), but init is only called once and it does not make sense to do round robin there. --- ring_describer.go | 63 +++++++++++++++++++++++++++++++++++++---------- session.go | 2 +- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/ring_describer.go b/ring_describer.go index b70ef360e..eb01abad8 100644 --- a/ring_describer.go +++ b/ring_describer.go @@ -28,7 +28,6 @@ import ( "context" "fmt" "sync" - "sync/atomic" ) // Polls system.peers at a specific interval to find new hosts @@ -45,11 +44,6 @@ type ringDescriber struct { hosts map[string]*HostInfo // hostIPToUUID maps host native address to host_id. hostIPToUUID map[string]string - - hostList []*HostInfo - pos uint32 - - // TODO: we should store the ring metadata here also. } func (r *ringDescriber) setControlConn(c *controlConn) { @@ -152,6 +146,56 @@ func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { return hosts, partitioner, nil } +// Given an ip/port return HostInfo for the specified ip/port +func (r *ringDescriber) getHostInfo(hostID UUID) (*HostInfo, error) { + var host *HostInfo + for _, table := range []string{"system.peers", "system.local"} { + ch := r.control.getConn() + var iter *Iter + if ch.host.HostID() == hostID.String() { + host = ch.host + iter = nil + } + + if table == "system.peers" { + if ch.conn.getIsSchemaV2() { + iter = ch.conn.querySystem(context.TODO(), qrySystemPeersV2) + } else { + iter = ch.conn.querySystem(context.TODO(), qrySystemPeers) + } + } else { + iter = ch.conn.query(context.TODO(), fmt.Sprintf("SELECT * FROM %s", table)) + } + + if iter != nil { + rows, err := iter.SliceMap() + if err != nil { + return nil, err + } + + for _, row := range rows { + h, err := hostInfoFromMap(row, &HostInfo{port: r.cfg.Port}, r.cfg.translateAddressPort) + if err != nil { + return nil, err + } + + if h.HostID() == hostID.String() { + host = h + break + } + } + } + } + + if host == nil { + return nil, errors.New("unable to fetch host info: invalid control connection") + } else if host.invalidConnectAddr() { + return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", host.connectAddress, host) + } + + return host, nil +} + func (r *ringDescriber) rrHost() *HostInfo { r.mu.RLock() defer r.mu.RUnlock() @@ -224,7 +268,6 @@ func (r *ringDescriber) addHostIfMissing(host *HostInfo) (*HostInfo, bool) { r.hosts[hostID] = host r.hostIPToUUID[host.nodeToNodeAddress().String()] = hostID existing = host - r.hostList = append(r.hostList, host) } r.mu.Unlock() return existing, ok @@ -241,12 +284,6 @@ func (r *ringDescriber) removeHost(hostID string) bool { h, ok := r.hosts[hostID] if ok { - for i, host := range r.hostList { - if host.HostID() == hostID { - r.hostList = append(r.hostList[:i], r.hostList[i+1:]...) - break - } - } delete(r.hostIPToUUID, h.nodeToNodeAddress().String()) } delete(r.hosts, hostID) diff --git a/session.go b/session.go index bd43b9db7..d62cade73 100644 --- a/session.go +++ b/session.go @@ -344,7 +344,7 @@ func (s *Session) init() error { newer, _ := checkSystemSchema(s.control) s.useSystemSchema = newer } else { - version := s.hostSource.rrHost().Version() + version := s.hostSource.allHosts()[0].Version() s.useSystemSchema = version.AtLeast(3, 0, 0) s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0) } From 48b98def8ec39857977f4a7a7661bab3dc748307 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 11 Dec 2024 16:37:50 +0100 Subject: [PATCH 5/7] Update prevHosts/Partitioner when getting hosts in ringDecriber --- ring_describer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ring_describer.go b/ring_describer.go index eb01abad8..019d3fdec 100644 --- a/ring_describer.go +++ b/ring_describer.go @@ -143,6 +143,9 @@ func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { partitioner = hosts[0].Partitioner() } + r.prevHosts = hosts + r.prevPartitioner = partitioner + return hosts, partitioner, nil } From e7d753800a49d0f1da32fb5b4ec2f4ad81e86cb3 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 11 Dec 2024 16:48:29 +0100 Subject: [PATCH 6/7] Remove unused clusterMetadata struct --- host_source.go | 2 -- ring_describer.go | 15 --------------- session.go | 2 -- 3 files changed, 19 deletions(-) diff --git a/host_source.go b/host_source.go index bb1724b20..38f54ea02 100644 --- a/host_source.go +++ b/host_source.go @@ -656,8 +656,6 @@ func refreshRing(s *Session) error { for _, host := range prevHosts { s.removeHost(host) } - - s.metadata.setPartitioner(partitioner) s.policy.SetPartitioner(partitioner) return nil } diff --git a/ring_describer.go b/ring_describer.go index 019d3fdec..97f2eb266 100644 --- a/ring_describer.go +++ b/ring_describer.go @@ -293,18 +293,3 @@ func (r *ringDescriber) removeHost(hostID string) bool { r.mu.Unlock() return ok } - -type clusterMetadata struct { - mu sync.RWMutex - partitioner string -} - -func (c *clusterMetadata) setPartitioner(partitioner string) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.partitioner != partitioner { - // TODO: update other things now - c.partitioner = partitioner - } -} diff --git a/session.go b/session.go index d62cade73..f3f961f6d 100644 --- a/session.go +++ b/session.go @@ -72,8 +72,6 @@ type Session struct { pool *policyConnPool policy HostSelectionPolicy - metadata clusterMetadata - mu sync.RWMutex control *controlConn From 1f3976d024ec3e694313ff9308c09d6689126e29 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Fri, 13 Dec 2024 12:22:07 +0100 Subject: [PATCH 7/7] Change the names of the hosts getters in ringDescriber --- cassandra_test.go | 4 ++-- control.go | 2 +- control_ccm_test.go | 4 ++-- host_source.go | 4 ++-- integration_test.go | 4 ++-- ring_describer.go | 8 ++++---- session.go | 8 ++++---- 7 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cassandra_test.go b/cassandra_test.go index 010e57441..50c8de47f 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -809,7 +809,7 @@ func TestReconnection(t *testing.T) { session := createSessionFromCluster(cluster, t) defer session.Close() - h := session.hostSource.allHosts()[0] + h := session.hostSource.getHostsList()[0] session.handleNodeDown(h.ConnectAddress(), h.Port()) if h.State() != NodeDown { @@ -2770,7 +2770,7 @@ func TestTokenAwareConnPool(t *testing.T) { session := createSessionFromCluster(cluster, t) defer session.Close() - expectedPoolSize := cluster.NumConns * len(session.hostSource.allHosts()) + expectedPoolSize := cluster.NumConns * len(session.hostSource.getHostsList()) // wait for pool to fill for i := 0; i < 10; i++ { diff --git a/control.go b/control.go index a376e3661..daa662dd0 100644 --- a/control.go +++ b/control.go @@ -385,7 +385,7 @@ func (c *controlConn) reconnect() { } func (c *controlConn) attemptReconnect() (*Conn, error) { - hosts := c.session.hostSource.allHosts() + hosts := c.session.hostSource.getHostsList() hosts = shuffleHosts(hosts) // keep the old behavior of connecting to the old host first by moving it to diff --git a/control_ccm_test.go b/control_ccm_test.go index f2dd159ca..ecccb1ff0 100644 --- a/control_ccm_test.go +++ b/control_ccm_test.go @@ -131,7 +131,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) { }() assertNodeDown := func() error { - hosts := session.hostSource.currentHosts() + hosts := session.hostSource.getHostsMap() if len(hosts) != 1 { return fmt.Errorf("expected 1 host in ring but there were %v", len(hosts)) } @@ -169,7 +169,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) { } assertNodeUp := func() error { - hosts := session.hostSource.currentHosts() + hosts := session.hostSource.getHostsMap() if len(hosts) != len(allCcmHosts) { return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts)) } diff --git a/host_source.go b/host_source.go index 38f54ea02..ad150a4b6 100644 --- a/host_source.go +++ b/host_source.go @@ -615,12 +615,12 @@ func (s *Session) refreshRing() error { } func refreshRing(s *Session) error { - hosts, partitioner, err := s.hostSource.GetHosts() + hosts, partitioner, err := s.hostSource.GetHostsFromSystem() if err != nil { return err } - prevHosts := s.hostSource.currentHosts() + prevHosts := s.hostSource.getHostsMap() for _, h := range hosts { if s.cfg.filterHost(h) { diff --git a/integration_test.go b/integration_test.go index 61ffbf504..0cb936f13 100644 --- a/integration_test.go +++ b/integration_test.go @@ -62,12 +62,12 @@ func TestAuthentication(t *testing.T) { session.Close() } -func TestGetHosts(t *testing.T) { +func TestGetHostsFromSystem(t *testing.T) { clusterHosts := getClusterHosts() cluster := createCluster() session := createSessionFromCluster(cluster, t) - hosts, partitioner, err := session.hostSource.GetHosts() + hosts, partitioner, err := session.hostSource.GetHostsFromSystem() assertTrue(t, "err == nil", err == nil) assertEqual(t, "len(hosts)", len(clusterHosts), len(hosts)) diff --git a/ring_describer.go b/ring_describer.go index 97f2eb266..6a1bec50a 100644 --- a/ring_describer.go +++ b/ring_describer.go @@ -122,8 +122,8 @@ func isValidPeer(host *HostInfo) bool { len(host.tokens) == 0) } -// GetHosts returns a list of hosts found via queries to system.local and system.peers -func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { +// GetHostsFromSystem returns a list of hosts found via queries to system.local and system.peers +func (r *ringDescriber) GetHostsFromSystem() ([]*HostInfo, string, error) { r.mu.Lock() defer r.mu.Unlock() @@ -224,7 +224,7 @@ func (r *ringDescriber) getHost(hostID string) *HostInfo { return host } -func (r *ringDescriber) allHosts() []*HostInfo { +func (r *ringDescriber) getHostsList() []*HostInfo { r.mu.RLock() hosts := make([]*HostInfo, 0, len(r.hosts)) for _, host := range r.hosts { @@ -234,7 +234,7 @@ func (r *ringDescriber) allHosts() []*HostInfo { return hosts } -func (r *ringDescriber) currentHosts() map[string]*HostInfo { +func (r *ringDescriber) getHostsMap() map[string]*HostInfo { r.mu.RLock() hosts := make(map[string]*HostInfo, len(r.hosts)) for k, v := range r.hosts { diff --git a/session.go b/session.go index f3f961f6d..c83fa353f 100644 --- a/session.go +++ b/session.go @@ -235,7 +235,7 @@ func (s *Session) init() error { if !s.cfg.DisableInitialHostLookup { var partitioner string - newHosts, partitioner, err := s.hostSource.GetHosts() + newHosts, partitioner, err := s.hostSource.GetHostsFromSystem() if err != nil { return err } @@ -342,7 +342,7 @@ func (s *Session) init() error { newer, _ := checkSystemSchema(s.control) s.useSystemSchema = newer } else { - version := s.hostSource.allHosts()[0].Version() + version := s.hostSource.getHostsList()[0].Version() s.useSystemSchema = version.AtLeast(3, 0, 0) s.hasAggregatesAndFunctions = version.AtLeast(2, 2, 0) } @@ -386,7 +386,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) { for { select { case <-reconnectTicker.C: - hosts := s.hostSource.allHosts() + hosts := s.hostSource.getHostsList() // Print session.hostSource for debug. if gocqlDebug { @@ -572,7 +572,7 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { } func (s *Session) getConn() *Conn { - hosts := s.hostSource.allHosts() + hosts := s.hostSource.getHostsList() for _, host := range hosts { if !host.IsUp() { continue