From e4ae99e9a4a033e99ee4cb5cd0763598e43712d4 Mon Sep 17 00:00:00 2001 From: iurii Date: Wed, 26 Feb 2025 20:36:29 +0200 Subject: [PATCH] get rid of global variables --- network/discovery/dv5_filters.go | 5 ++-- network/discovery/dv5_service.go | 24 ++++++++++++----- network/discovery/service.go | 22 +++++++++++----- network/p2p/p2p.go | 32 ++++++++++++++++------- network/p2p/p2p_setup.go | 26 +++++++++++------- network/peers/conn_manager.go | 18 ++----------- network/peers/connections/conn_gater.go | 15 ++++++++--- network/peers/connections/conn_handler.go | 31 +++++++++++++--------- 8 files changed, 106 insertions(+), 67 deletions(-) diff --git a/network/discovery/dv5_filters.go b/network/discovery/dv5_filters.go index b7d8e1f959..c7d85dc2c5 100644 --- a/network/discovery/dv5_filters.go +++ b/network/discovery/dv5_filters.go @@ -5,7 +5,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enr" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/ssvlabs/ssv/network/commons" - "github.com/ssvlabs/ssv/network/peers" "github.com/ssvlabs/ssv/network/records" "go.uber.org/zap" ) @@ -66,7 +65,7 @@ func (dvs *DiscV5Service) recentlyTrimmedFilter() func(node *enode.Node) bool { if err != nil { return false } - return !peers.TrimmedRecently.Has(pid) + return !dvs.trimmedRecently.Has(pid) } } @@ -117,7 +116,7 @@ func (dvs *DiscV5Service) alreadyDiscoveredFilter(logger *zap.Logger) func(node logger.Warn("could not get peer ID from node record", zap.Error(err)) return false } - if peers.DiscoveredPeersPool.Has(pID) { + if dvs.discoveredPeersPool.Has(pID) { // this log line is commented out as it is too spammy //n.interfaceLogger.Debug( // "discovery proposed peer, this proposal is already in proposal-pool", diff --git a/network/discovery/dv5_service.go b/network/discovery/dv5_service.go index fed572e317..8f24a57178 100644 --- a/network/discovery/dv5_service.go +++ b/network/discovery/dv5_service.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/ssvlabs/ssv/utils/ttl" "net" "time" @@ -58,6 +59,14 @@ type DiscV5Service struct { conns peers.ConnectionIndex subnetsIdx peers.SubnetsIndex + // discoveredPeersPool keeps track of recently discovered peers so we can rank them and choose + // the best candidates to connect to. + discoveredPeersPool *ttl.Map[peer.ID, DiscoveredPeer] + // trimmedRecently keeps track of recently trimmed peers so we don't try to connect to these + // shortly after we've trimmed these (we still might consider connecting to these once they + // are removed from this map after some time passes) + trimmedRecently *ttl.Map[peer.ID, struct{}] + conn *net.UDPConn sharedConn *SharedUDPConn @@ -70,13 +79,14 @@ type DiscV5Service struct { func newDiscV5Service(pctx context.Context, logger *zap.Logger, opts *Options) (Service, error) { ctx, cancel := context.WithCancel(pctx) dvs := DiscV5Service{ - ctx: ctx, - cancel: cancel, - conns: opts.ConnIndex, - subnetsIdx: opts.SubnetsIdx, - networkConfig: opts.NetworkConfig, - subnets: opts.DiscV5Opts.Subnets, - publishLock: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + conns: opts.ConnIndex, + subnetsIdx: opts.SubnetsIdx, + networkConfig: opts.NetworkConfig, + subnets: opts.DiscV5Opts.Subnets, + publishLock: make(chan struct{}, 1), + discoveredPeersPool: opts.DiscoveredPeersPool, } logger.Debug( diff --git a/network/discovery/service.go b/network/discovery/service.go index cecda743fb..6866d11a5b 100644 --- a/network/discovery/service.go +++ b/network/discovery/service.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "github.com/ssvlabs/ssv/utils/ttl" "io" "github.com/ethereum/go-ethereum/p2p/enode" @@ -33,13 +34,14 @@ type HandleNewPeer func(e PeerEvent) // Options represents the options passed to create a service type Options struct { - Host host.Host - DiscV5Opts *DiscV5Options - ConnIndex peers.ConnectionIndex - SubnetsIdx peers.SubnetsIndex - HostAddress string - HostDNS string - NetworkConfig networkconfig.NetworkConfig + Host host.Host + DiscV5Opts *DiscV5Options + ConnIndex peers.ConnectionIndex + SubnetsIdx peers.SubnetsIndex + HostAddress string + HostDNS string + NetworkConfig networkconfig.NetworkConfig + DiscoveredPeersPool *ttl.Map[peer.ID, DiscoveredPeer] } // Service is the interface for discovery @@ -59,3 +61,9 @@ func NewService(ctx context.Context, logger *zap.Logger, opts Options) (Service, } return newDiscV5Service(ctx, logger, &opts) } + +type DiscoveredPeer struct { + peer.AddrInfo + // ConnectRetries keeps track of how many times we tried to connect to this peer. + ConnectRetries int +} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 799ad5316e..bb21f2b92b 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/ssvlabs/ssv/utils/ttl" "math" "math/rand" "os" @@ -107,10 +108,21 @@ type p2pNetwork struct { operatorPKHashToPKCache *hashmap.Map[string, []byte] // used for metrics operatorSigner keys.OperatorSigner operatorDataStore operatordatastore.OperatorDataStore + + // discoveredPeersPool keeps track of recently discovered peers so we can rank them and choose + // the best candidates to connect to. + discoveredPeersPool *ttl.Map[peer.ID, discovery.DiscoveredPeer] + // trimmedRecently keeps track of recently trimmed peers so we don't try to connect to these + // shortly after we've trimmed these (we still might consider connecting to these once they + // are removed from this map after some time passes) + trimmedRecently *ttl.Map[peer.ID, struct{}] } // New creates a new p2p network -func New(logger *zap.Logger, cfg *Config) (*p2pNetwork, error) { +func New( + logger *zap.Logger, + cfg *Config, +) (*p2pNetwork, error) { ctx, cancel := context.WithCancel(cfg.Ctx) logger = logger.Named(logging.NameP2PNetwork) @@ -129,6 +141,8 @@ func New(logger *zap.Logger, cfg *Config) (*p2pNetwork, error) { operatorPKHashToPKCache: hashmap.New[string, []byte](), operatorSigner: cfg.OperatorSigner, operatorDataStore: cfg.OperatorDataStore, + discoveredPeersPool: ttl.New[peer.ID, discovery.DiscoveredPeer](15*time.Minute, 5*time.Minute), + trimmedRecently: ttl.New[peer.ID, struct{}](30*time.Minute, 5*time.Minute), } if err := n.parseTrustedPeers(); err != nil { return nil, err @@ -264,11 +278,11 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error { go func() { // keep discovered peers in a pool so we can choose the best ones for proposal := range connectorProposals { - discoveredPeer := peers.DiscoveredPeer{ + discoveredPeer := discovery.DiscoveredPeer{ AddrInfo: proposal, ConnectRetries: 0, } - peers.DiscoveredPeersPool.Set(proposal.ID, discoveredPeer) + n.discoveredPeersPool.Set(proposal.ID, discoveredPeer) n.interfaceLogger.Debug( "discovery proposed peer, adding it to the pool", @@ -357,7 +371,7 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error { } // peersToConnect is a final set of peers we are gonna try to connect with - peersToConnect := make(map[peer.ID]peers.DiscoveredPeer) + peersToConnect := make(map[peer.ID]discovery.DiscoveredPeer) // ownSubnetSum represents subnet-sum of peers we already have open connections with ownSubnetSum := SubnetSum{} allPeerIDs, err := n.topicsCtrl.Peers("") @@ -379,20 +393,20 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error { currentSubnetSum := addSubnetSums(ownSubnetSum, peersToConnectSubnetSum) type peerWithSubnetSum struct { - p peers.DiscoveredPeer + p discovery.DiscoveredPeer subnetSum SubnetSum } // peersByPriority keeps track of best peers (by their peer score) peersByPriority := lane.NewMaxPriorityQueue[peerWithSubnetSum, float64]() // minScore and maxScore are used for printing additional debugging info minScore, maxScore := math.MaxFloat64, 0.0 - peers.DiscoveredPeersPool.Range(func(key peer.ID, value peers.DiscoveredPeer) bool { + n.discoveredPeersPool.Range(func(key peer.ID, value discovery.DiscoveredPeer) bool { const retryLimit = 2 if value.ConnectRetries >= retryLimit { // this discovered peer has been tried many times already, we'll ignore him but won't - // remove him from DiscoveredPeersPool (since if we do - discovery might suggest this + // remove him from discoveredPeersPool (since if we do - discovery might suggest this // peer again essentially resetting this peer's retry attempts counter to 0), eventually - // (after some time passes) this peer will automatically get removed from DiscoveredPeersPool + // (after some time passes) this peer will automatically get removed from discoveredPeersPool // so it can be discovered again (effectively resetting peer's retry attempts counter to 0) // this log line is commented out as it is too spammy @@ -447,7 +461,7 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error { // finally, offer the best peers we've picked to connector so it tries to connect these for _, p := range peersToConnect { // update retry counter for this peer so we eventually skip it after certain number of retries - peers.DiscoveredPeersPool.Set(p.ID, peers.DiscoveredPeer{ + n.discoveredPeersPool.Set(p.ID, discovery.DiscoveredPeer{ AddrInfo: p.AddrInfo, ConnectRetries: p.ConnectRetries + 1, }) diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index d030d8c025..22541fb200 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -135,7 +135,14 @@ func (n *p2pNetwork) SetupHost(logger *zap.Logger) error { if err != nil { return errors.Wrap(err, "could not create resource manager") } - n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer, n.atInboundLimit) + n.connGater = connections.NewConnectionGater( + logger, + n.cfg.DisableIPRateLimit, + n.connectionsAtLimit, + n.IsBadPeer, + n.atInboundLimit, + n.trimmedRecently, + ) opts = append(opts, libp2p.ResourceManager(rmgr), libp2p.ConnectionGater(n.connGater)) host, err := libp2p.New(opts...) if err != nil { @@ -234,7 +241,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error { n.host.SetStreamHandler(peers.NodeInfoProtocol, handshaker.Handler(logger)) logger.Debug("handshaker is ready") - n.connHandler = connections.NewConnHandler(n.ctx, handshaker, n.ActiveSubnets, n.idx, n.idx, n.idx) + n.connHandler = connections.NewConnHandler(n.ctx, handshaker, n.ActiveSubnets, n.idx, n.idx, n.idx, n.discoveredPeersPool) n.host.Network().Notify(n.connHandler.Handle(logger)) logger.Debug("connection handler is ready") @@ -276,13 +283,14 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error { logger.Info("discovery: using mdns (local)") } discOpts := discovery.Options{ - Host: n.host, - DiscV5Opts: discV5Opts, - ConnIndex: n.idx, - SubnetsIdx: n.idx, - HostAddress: n.cfg.HostAddress, - HostDNS: n.cfg.HostDNS, - NetworkConfig: n.cfg.Network, + Host: n.host, + DiscV5Opts: discV5Opts, + ConnIndex: n.idx, + SubnetsIdx: n.idx, + HostAddress: n.cfg.HostAddress, + HostDNS: n.cfg.HostDNS, + NetworkConfig: n.cfg.Network, + DiscoveredPeersPool: n.discoveredPeersPool, } disc, err := discovery.NewService(n.ctx, logger, discOpts) if err != nil { diff --git a/network/peers/conn_manager.go b/network/peers/conn_manager.go index c421ece9ef..32f4f3c758 100644 --- a/network/peers/conn_manager.go +++ b/network/peers/conn_manager.go @@ -2,8 +2,6 @@ package peers import ( "context" - "time" - connmgrcore "github.com/libp2p/go-libp2p/core/connmgr" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -17,19 +15,6 @@ const ( ProtectedTag = "ssv/subnets" ) -type DiscoveredPeer struct { - peer.AddrInfo - // ConnectRetries keeps track of how many times we tried to connect to this peer. - ConnectRetries int -} - -var ( - // DiscoveredPeersPool keeps track of recently discovered peers so we can rank them and choose - // the best candidates to connect to. - DiscoveredPeersPool = ttl.New[peer.ID, DiscoveredPeer](15*time.Minute, 5*time.Minute) - TrimmedRecently = ttl.New[peer.ID, struct{}](30*time.Minute, 5*time.Minute) -) - // ConnManager is a wrapper on top of go-libp2p/core/connmgr.ConnManager. // exposing an abstract interface so we can have the flexibility of doing some stuff manually // rather than relaying on libp2p's connection manager. @@ -48,6 +33,7 @@ type connManager struct { connManager connmgrcore.ConnManager subnetsIdx SubnetsIndex gossipScoreIndex GossipScoreIndex + trimmedRecently *ttl.Map[peer.ID, struct{}] } // NewConnManager creates a new conn manager. @@ -76,7 +62,7 @@ func (c connManager) TrimPeers(ctx context.Context, logger *zap.Logger, net libp if err := c.disconnect(pid, net); err != nil { logger.Debug("error closing peer", fields.PeerID(pid), zap.Error(err)) } - TrimmedRecently.Set(pid, struct{}{}) // record stats + c.trimmedRecently.Set(pid, struct{}{}) // record stats trimmed = append(trimmed, pid) if len(trimmed) >= maxTrims { break diff --git a/network/peers/connections/conn_gater.go b/network/peers/connections/conn_gater.go index 7c37b3ed5d..cbd20655de 100644 --- a/network/peers/connections/conn_gater.go +++ b/network/peers/connections/conn_gater.go @@ -1,6 +1,7 @@ package connections import ( + "github.com/ssvlabs/ssv/utils/ttl" "runtime" "time" @@ -13,7 +14,6 @@ import ( manet "github.com/multiformats/go-multiaddr/net" leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket" "github.com/ssvlabs/ssv/logging/fields" - "github.com/ssvlabs/ssv/network/peers" "go.uber.org/zap" ) @@ -36,10 +36,18 @@ type connGater struct { ipLimiter *leakybucket.Collector isBadPeer IsBadPeerF atInboundLimit AtInboundLimitF + trimmedRecently *ttl.Map[peer.ID, struct{}] } // NewConnectionGater creates a new instance of ConnectionGater -func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, isBadPeer IsBadPeerF, atInboundLimit AtInboundLimitF) connmgr.ConnectionGater { +func NewConnectionGater( + logger *zap.Logger, + disable bool, + atLimit func() bool, + isBadPeer IsBadPeerF, + atInboundLimit AtInboundLimitF, + trimmedRecently *ttl.Map[peer.ID, struct{}], +) connmgr.ConnectionGater { return &connGater{ logger: logger, disable: disable, @@ -47,6 +55,7 @@ func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, i ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true), isBadPeer: isBadPeer, atInboundLimit: atInboundLimit, + trimmedRecently: trimmedRecently, } } @@ -94,7 +103,7 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo // InterceptSecured is called for both inbound and outbound connections, // after a security handshake has taken place and we've authenticated the peer. func (n *connGater) InterceptSecured(direction libp2pnetwork.Direction, id peer.ID, multiaddrs libp2pnetwork.ConnMultiaddrs) bool { - if peers.TrimmedRecently.Has(id) { + if n.trimmedRecently.Has(id) { n.logger.Debug( "InterceptSecured: trying to connect a peer we've recently trimmed", zap.String("conn_direction", direction.String()), diff --git a/network/peers/connections/conn_handler.go b/network/peers/connections/conn_handler.go index 5261e2be31..88ae33c5d3 100644 --- a/network/peers/connections/conn_handler.go +++ b/network/peers/connections/conn_handler.go @@ -2,6 +2,8 @@ package connections import ( "context" + "github.com/ssvlabs/ssv/network/discovery" + "github.com/ssvlabs/ssv/utils/ttl" "sync" "time" @@ -25,11 +27,12 @@ type ConnHandler interface { type connHandler struct { ctx context.Context - handshaker Handshaker - subnetsProvider SubnetsProvider - subnetsIndex peers.SubnetsIndex - connIdx peers.ConnectionIndex - peerInfos peers.PeerInfoIndex + handshaker Handshaker + subnetsProvider SubnetsProvider + subnetsIndex peers.SubnetsIndex + connIdx peers.ConnectionIndex + peerInfos peers.PeerInfoIndex + discoveredPeersPool *ttl.Map[peer.ID, discovery.DiscoveredPeer] } // NewConnHandler creates a new connection handler @@ -40,14 +43,16 @@ func NewConnHandler( subnetsIndex peers.SubnetsIndex, connIdx peers.ConnectionIndex, peerInfos peers.PeerInfoIndex, + discoveredPeersPool *ttl.Map[peer.ID, discovery.DiscoveredPeer], ) ConnHandler { return &connHandler{ - ctx: ctx, - handshaker: handshaker, - subnetsProvider: subnetsProvider, - subnetsIndex: subnetsIndex, - connIdx: connIdx, - peerInfos: peerInfos, + ctx: ctx, + handshaker: handshaker, + subnetsProvider: subnetsProvider, + subnetsIndex: subnetsIndex, + connIdx: connIdx, + peerInfos: peerInfos, + discoveredPeersPool: discoveredPeersPool, } } @@ -191,13 +196,13 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle { ch.peerInfos.SetState(conn.RemotePeer(), peers.StateConnected) logger.Debug("peer connected") - // if this connection is the one we found through discovery - remove it from DiscoveredPeersPool + // if this connection is the one we found through discovery - remove it from discoveredPeersPool // so we won't be retrying connecting that same peer again until discovery stumbles upon it again // (discovery also filters out peers we are already connected to, meaning it should re-discover // that same peer only after we'll disconnect him). Note, this is best-effort solution meaning // we still might try connecting to this peer even though we've connected him here - this is // because it would be hard to implement the prevention for this that works atomically - peers.DiscoveredPeersPool.Delete(conn.RemotePeer()) + ch.discoveredPeersPool.Delete(conn.RemotePeer()) }() }, DisconnectedF: func(net libp2pnetwork.Network, conn libp2pnetwork.Conn) {