Skip to content

Commit

Permalink
get rid of global variables
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Feb 26, 2025
1 parent ca56aa5 commit e4ae99e
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 67 deletions.
5 changes: 2 additions & 3 deletions network/discovery/dv5_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -66,7 +65,7 @@ func (dvs *DiscV5Service) recentlyTrimmedFilter() func(node *enode.Node) bool {
if err != nil {
return false
}
return !peers.TrimmedRecently.Has(pid)
return !dvs.trimmedRecently.Has(pid)
}
}

Expand Down Expand Up @@ -117,7 +116,7 @@ func (dvs *DiscV5Service) alreadyDiscoveredFilter(logger *zap.Logger) func(node
logger.Warn("could not get peer ID from node record", zap.Error(err))
return false
}
if peers.DiscoveredPeersPool.Has(pID) {
if dvs.discoveredPeersPool.Has(pID) {
// this log line is commented out as it is too spammy
//n.interfaceLogger.Debug(
// "discovery proposed peer, this proposal is already in proposal-pool",
Expand Down
24 changes: 17 additions & 7 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/ssvlabs/ssv/utils/ttl"
"net"
"time"

Expand Down Expand Up @@ -58,6 +59,14 @@ type DiscV5Service struct {
conns peers.ConnectionIndex
subnetsIdx peers.SubnetsIndex

// discoveredPeersPool keeps track of recently discovered peers so we can rank them and choose
// the best candidates to connect to.
discoveredPeersPool *ttl.Map[peer.ID, DiscoveredPeer]
// trimmedRecently keeps track of recently trimmed peers so we don't try to connect to these
// shortly after we've trimmed these (we still might consider connecting to these once they
// are removed from this map after some time passes)
trimmedRecently *ttl.Map[peer.ID, struct{}]

conn *net.UDPConn
sharedConn *SharedUDPConn

Expand All @@ -70,13 +79,14 @@ type DiscV5Service struct {
func newDiscV5Service(pctx context.Context, logger *zap.Logger, opts *Options) (Service, error) {
ctx, cancel := context.WithCancel(pctx)
dvs := DiscV5Service{
ctx: ctx,
cancel: cancel,
conns: opts.ConnIndex,
subnetsIdx: opts.SubnetsIdx,
networkConfig: opts.NetworkConfig,
subnets: opts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
conns: opts.ConnIndex,
subnetsIdx: opts.SubnetsIdx,
networkConfig: opts.NetworkConfig,
subnets: opts.DiscV5Opts.Subnets,
publishLock: make(chan struct{}, 1),
discoveredPeersPool: opts.DiscoveredPeersPool,
}

logger.Debug(
Expand Down
22 changes: 15 additions & 7 deletions network/discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"context"
"github.com/ssvlabs/ssv/utils/ttl"
"io"

"github.com/ethereum/go-ethereum/p2p/enode"
Expand Down Expand Up @@ -33,13 +34,14 @@ type HandleNewPeer func(e PeerEvent)

// Options represents the options passed to create a service
type Options struct {
Host host.Host
DiscV5Opts *DiscV5Options
ConnIndex peers.ConnectionIndex
SubnetsIdx peers.SubnetsIndex
HostAddress string
HostDNS string
NetworkConfig networkconfig.NetworkConfig
Host host.Host
DiscV5Opts *DiscV5Options
ConnIndex peers.ConnectionIndex
SubnetsIdx peers.SubnetsIndex
HostAddress string
HostDNS string
NetworkConfig networkconfig.NetworkConfig
DiscoveredPeersPool *ttl.Map[peer.ID, DiscoveredPeer]
}

// Service is the interface for discovery
Expand All @@ -59,3 +61,9 @@ func NewService(ctx context.Context, logger *zap.Logger, opts Options) (Service,
}
return newDiscV5Service(ctx, logger, &opts)
}

type DiscoveredPeer struct {
peer.AddrInfo
// ConnectRetries keeps track of how many times we tried to connect to this peer.
ConnectRetries int
}
32 changes: 23 additions & 9 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/ssvlabs/ssv/utils/ttl"
"math"
"math/rand"
"os"
Expand Down Expand Up @@ -107,10 +108,21 @@ type p2pNetwork struct {
operatorPKHashToPKCache *hashmap.Map[string, []byte] // used for metrics
operatorSigner keys.OperatorSigner
operatorDataStore operatordatastore.OperatorDataStore

// discoveredPeersPool keeps track of recently discovered peers so we can rank them and choose
// the best candidates to connect to.
discoveredPeersPool *ttl.Map[peer.ID, discovery.DiscoveredPeer]
// trimmedRecently keeps track of recently trimmed peers so we don't try to connect to these
// shortly after we've trimmed these (we still might consider connecting to these once they
// are removed from this map after some time passes)
trimmedRecently *ttl.Map[peer.ID, struct{}]
}

// New creates a new p2p network
func New(logger *zap.Logger, cfg *Config) (*p2pNetwork, error) {
func New(
logger *zap.Logger,
cfg *Config,
) (*p2pNetwork, error) {
ctx, cancel := context.WithCancel(cfg.Ctx)

logger = logger.Named(logging.NameP2PNetwork)
Expand All @@ -129,6 +141,8 @@ func New(logger *zap.Logger, cfg *Config) (*p2pNetwork, error) {
operatorPKHashToPKCache: hashmap.New[string, []byte](),
operatorSigner: cfg.OperatorSigner,
operatorDataStore: cfg.OperatorDataStore,
discoveredPeersPool: ttl.New[peer.ID, discovery.DiscoveredPeer](15*time.Minute, 5*time.Minute),
trimmedRecently: ttl.New[peer.ID, struct{}](30*time.Minute, 5*time.Minute),
}
if err := n.parseTrustedPeers(); err != nil {
return nil, err
Expand Down Expand Up @@ -264,11 +278,11 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
go func() {
// keep discovered peers in a pool so we can choose the best ones
for proposal := range connectorProposals {
discoveredPeer := peers.DiscoveredPeer{
discoveredPeer := discovery.DiscoveredPeer{
AddrInfo: proposal,
ConnectRetries: 0,
}
peers.DiscoveredPeersPool.Set(proposal.ID, discoveredPeer)
n.discoveredPeersPool.Set(proposal.ID, discoveredPeer)

n.interfaceLogger.Debug(
"discovery proposed peer, adding it to the pool",
Expand Down Expand Up @@ -357,7 +371,7 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
}

// peersToConnect is a final set of peers we are gonna try to connect with
peersToConnect := make(map[peer.ID]peers.DiscoveredPeer)
peersToConnect := make(map[peer.ID]discovery.DiscoveredPeer)
// ownSubnetSum represents subnet-sum of peers we already have open connections with
ownSubnetSum := SubnetSum{}
allPeerIDs, err := n.topicsCtrl.Peers("")
Expand All @@ -379,20 +393,20 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
currentSubnetSum := addSubnetSums(ownSubnetSum, peersToConnectSubnetSum)

type peerWithSubnetSum struct {
p peers.DiscoveredPeer
p discovery.DiscoveredPeer
subnetSum SubnetSum
}
// peersByPriority keeps track of best peers (by their peer score)
peersByPriority := lane.NewMaxPriorityQueue[peerWithSubnetSum, float64]()
// minScore and maxScore are used for printing additional debugging info
minScore, maxScore := math.MaxFloat64, 0.0
peers.DiscoveredPeersPool.Range(func(key peer.ID, value peers.DiscoveredPeer) bool {
n.discoveredPeersPool.Range(func(key peer.ID, value discovery.DiscoveredPeer) bool {
const retryLimit = 2
if value.ConnectRetries >= retryLimit {
// this discovered peer has been tried many times already, we'll ignore him but won't
// remove him from DiscoveredPeersPool (since if we do - discovery might suggest this
// remove him from discoveredPeersPool (since if we do - discovery might suggest this
// peer again essentially resetting this peer's retry attempts counter to 0), eventually
// (after some time passes) this peer will automatically get removed from DiscoveredPeersPool
// (after some time passes) this peer will automatically get removed from discoveredPeersPool
// so it can be discovered again (effectively resetting peer's retry attempts counter to 0)

// this log line is commented out as it is too spammy
Expand Down Expand Up @@ -447,7 +461,7 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
// finally, offer the best peers we've picked to connector so it tries to connect these
for _, p := range peersToConnect {
// update retry counter for this peer so we eventually skip it after certain number of retries
peers.DiscoveredPeersPool.Set(p.ID, peers.DiscoveredPeer{
n.discoveredPeersPool.Set(p.ID, discovery.DiscoveredPeer{
AddrInfo: p.AddrInfo,
ConnectRetries: p.ConnectRetries + 1,
})
Expand Down
26 changes: 17 additions & 9 deletions network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,14 @@ func (n *p2pNetwork) SetupHost(logger *zap.Logger) error {
if err != nil {
return errors.Wrap(err, "could not create resource manager")
}
n.connGater = connections.NewConnectionGater(logger, n.cfg.DisableIPRateLimit, n.connectionsAtLimit, n.IsBadPeer, n.atInboundLimit)
n.connGater = connections.NewConnectionGater(
logger,
n.cfg.DisableIPRateLimit,
n.connectionsAtLimit,
n.IsBadPeer,
n.atInboundLimit,
n.trimmedRecently,
)
opts = append(opts, libp2p.ResourceManager(rmgr), libp2p.ConnectionGater(n.connGater))
host, err := libp2p.New(opts...)
if err != nil {
Expand Down Expand Up @@ -234,7 +241,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error {
n.host.SetStreamHandler(peers.NodeInfoProtocol, handshaker.Handler(logger))
logger.Debug("handshaker is ready")

n.connHandler = connections.NewConnHandler(n.ctx, handshaker, n.ActiveSubnets, n.idx, n.idx, n.idx)
n.connHandler = connections.NewConnHandler(n.ctx, handshaker, n.ActiveSubnets, n.idx, n.idx, n.idx, n.discoveredPeersPool)
n.host.Network().Notify(n.connHandler.Handle(logger))
logger.Debug("connection handler is ready")

Expand Down Expand Up @@ -276,13 +283,14 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error {
logger.Info("discovery: using mdns (local)")
}
discOpts := discovery.Options{
Host: n.host,
DiscV5Opts: discV5Opts,
ConnIndex: n.idx,
SubnetsIdx: n.idx,
HostAddress: n.cfg.HostAddress,
HostDNS: n.cfg.HostDNS,
NetworkConfig: n.cfg.Network,
Host: n.host,
DiscV5Opts: discV5Opts,
ConnIndex: n.idx,
SubnetsIdx: n.idx,
HostAddress: n.cfg.HostAddress,
HostDNS: n.cfg.HostDNS,
NetworkConfig: n.cfg.Network,
DiscoveredPeersPool: n.discoveredPeersPool,
}
disc, err := discovery.NewService(n.ctx, logger, discOpts)
if err != nil {
Expand Down
18 changes: 2 additions & 16 deletions network/peers/conn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package peers

import (
"context"
"time"

connmgrcore "github.com/libp2p/go-libp2p/core/connmgr"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -17,19 +15,6 @@ const (
ProtectedTag = "ssv/subnets"
)

type DiscoveredPeer struct {
peer.AddrInfo
// ConnectRetries keeps track of how many times we tried to connect to this peer.
ConnectRetries int
}

var (
// DiscoveredPeersPool keeps track of recently discovered peers so we can rank them and choose
// the best candidates to connect to.
DiscoveredPeersPool = ttl.New[peer.ID, DiscoveredPeer](15*time.Minute, 5*time.Minute)
TrimmedRecently = ttl.New[peer.ID, struct{}](30*time.Minute, 5*time.Minute)
)

// ConnManager is a wrapper on top of go-libp2p/core/connmgr.ConnManager.
// exposing an abstract interface so we can have the flexibility of doing some stuff manually
// rather than relaying on libp2p's connection manager.
Expand All @@ -48,6 +33,7 @@ type connManager struct {
connManager connmgrcore.ConnManager
subnetsIdx SubnetsIndex
gossipScoreIndex GossipScoreIndex
trimmedRecently *ttl.Map[peer.ID, struct{}]
}

// NewConnManager creates a new conn manager.
Expand Down Expand Up @@ -76,7 +62,7 @@ func (c connManager) TrimPeers(ctx context.Context, logger *zap.Logger, net libp
if err := c.disconnect(pid, net); err != nil {
logger.Debug("error closing peer", fields.PeerID(pid), zap.Error(err))
}
TrimmedRecently.Set(pid, struct{}{}) // record stats
c.trimmedRecently.Set(pid, struct{}{}) // record stats
trimmed = append(trimmed, pid)
if len(trimmed) >= maxTrims {
break
Expand Down
15 changes: 12 additions & 3 deletions network/peers/connections/conn_gater.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package connections

import (
"github.com/ssvlabs/ssv/utils/ttl"
"runtime"
"time"

Expand All @@ -13,7 +14,6 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
"github.com/ssvlabs/ssv/logging/fields"
"github.com/ssvlabs/ssv/network/peers"
"go.uber.org/zap"
)

Expand All @@ -36,17 +36,26 @@ type connGater struct {
ipLimiter *leakybucket.Collector
isBadPeer IsBadPeerF
atInboundLimit AtInboundLimitF
trimmedRecently *ttl.Map[peer.ID, struct{}]
}

// NewConnectionGater creates a new instance of ConnectionGater
func NewConnectionGater(logger *zap.Logger, disable bool, atLimit func() bool, isBadPeer IsBadPeerF, atInboundLimit AtInboundLimitF) connmgr.ConnectionGater {
func NewConnectionGater(
logger *zap.Logger,
disable bool,
atLimit func() bool,
isBadPeer IsBadPeerF,
atInboundLimit AtInboundLimitF,
trimmedRecently *ttl.Map[peer.ID, struct{}],
) connmgr.ConnectionGater {
return &connGater{
logger: logger,
disable: disable,
atMaxPeersLimit: atLimit,
ipLimiter: leakybucket.NewCollector(ipLimitRate, ipLimitBurst, ipLimitPeriod, true),
isBadPeer: isBadPeer,
atInboundLimit: atInboundLimit,
trimmedRecently: trimmedRecently,
}
}

Expand Down Expand Up @@ -94,7 +103,7 @@ func (n *connGater) InterceptAccept(multiaddrs libp2pnetwork.ConnMultiaddrs) boo
// InterceptSecured is called for both inbound and outbound connections,
// after a security handshake has taken place and we've authenticated the peer.
func (n *connGater) InterceptSecured(direction libp2pnetwork.Direction, id peer.ID, multiaddrs libp2pnetwork.ConnMultiaddrs) bool {
if peers.TrimmedRecently.Has(id) {
if n.trimmedRecently.Has(id) {
n.logger.Debug(
"InterceptSecured: trying to connect a peer we've recently trimmed",
zap.String("conn_direction", direction.String()),
Expand Down
Loading

0 comments on commit e4ae99e

Please sign in to comment.