From 10c442b880d3d705bf4792d82f86c4bbf1afbfd4 Mon Sep 17 00:00:00 2001
From: Viacheslav Gonkivskyi
Date: Tue, 2 Apr 2024 19:15:24 +0300
Subject: [PATCH] feat(p2p/peerTracker): remove gc and add subscriptions

---
 p2p/exchange.go      |   6 +-
 p2p/exchange_test.go |  70 +++++++++++-----
 p2p/peer_tracker.go  | 195 +++++++++++++++++++++----------------------
 3 files changed, 149 insertions(+), 122 deletions(-)

diff --git a/p2p/exchange.go b/p2p/exchange.go
index fbf1f23a..9f0cf9de 100644
--- a/p2p/exchange.go
+++ b/p2p/exchange.go
@@ -80,10 +80,11 @@ func NewExchange[H header.Header[H]](
 		}
 	}
 
+	id := protocolID(params.networkID)
 	ex := &Exchange[H]{
 		host:        host,
-		protocolID:  protocolID(params.networkID),
-		peerTracker: newPeerTracker(host, gater, params.pidstore, metrics),
+		protocolID:  id,
+		peerTracker: newPeerTracker(host, gater, params.networkID, params.pidstore, metrics),
 		Params:      params,
 		metrics:     metrics,
 	}
@@ -98,7 +99,6 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
 	ex.ctx, ex.cancel = context.WithCancel(context.Background())
 	log.Infow("client: starting client", "protocol ID", ex.protocolID)
 
-	go ex.peerTracker.gc()
 	go ex.peerTracker.track()
 
 	// bootstrap the peerTracker with trusted peers as well as previously seen
diff --git a/p2p/exchange_test.go b/p2p/exchange_test.go
index 1a775bd3..466b0b82 100644
--- a/p2p/exchange_test.go
+++ b/p2p/exchange_test.go
@@ -8,12 +8,14 @@ import (
 
 	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/sync"
+	"github.com/libp2p/go-libp2p"
 	libhost "github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/peerstore"
 	blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
 	"github.com/libp2p/go-libp2p/p2p/net/conngater"
+	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
 	swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
 	"github.com/stretchr/testify/assert"
@@ -242,7 +244,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
 		require.NoError(t, err)
 		servers[index].Start(context.Background()) //nolint:errcheck
 		exchange.peerTracker.peerLk.Lock()
-		exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()}
+		exchange.peerTracker.trackedPeers[hosts[index].ID()] = struct{}{}
 		exchange.peerTracker.peerLk.Unlock()
 	}
 
@@ -262,12 +264,20 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
 // TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
 // from another peer with lower score after receiving header.ErrNotFound
 func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
-	hosts := createMocknet(t, 3)
+	hosts := quicHosts(t, 3)
+
 	// create client + server(it does not have needed headers)
 	exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
+
+	serverSideStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10)
+	tmServerSideStore := &timedOutStore{timeout: time.Millisecond * 200, Store: *serverSideStore}
+
+	hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
+	hosts[0].ConnManager().TagPeer(hosts[2].ID(), string(protocolID(networkID)), 90)
+
 	// create one more server(with more headers in the store)
 	serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
-		hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
+		hosts[2], tmServerSideStore,
 		WithNetworkID[ServerParameters](networkID),
 	)
 	require.NoError(t, err)
@@ -275,17 +285,17 @@ func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
 	t.Cleanup(func() {
 		serverSideEx.Stop(context.Background()) //nolint:errcheck
 	})
+
 	exchg.peerTracker.peerLk.Lock()
-	exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
+	exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
 	exchg.peerTracker.peerLk.Unlock()
-
 	h, err := store.GetByHeight(context.Background(), 5)
 	require.NoError(t, err)
 
 	_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
 	require.NoError(t, err)
 
 	// ensure that peerScore for the second peer is changed
-	newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
+	newPeerScore := score(t, exchg.peerTracker.host, hosts[2].ID())
 	require.NotEqual(t, 20, newPeerScore)
 }
 
@@ -464,7 +474,9 @@ func TestExchange_HandleHeaderWithDifferentChainID(t *testing.T) {
 func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
 	// create blankhost because mocknet does not support deadlines
 	swarm0 := swarm.GenSwarm(t)
-	host0 := blankhost.NewBlankHost(swarm0)
+	mngr, err := connmgr.NewConnManager(0, 50)
+	require.NoError(t, err)
+	host0 := blankhost.NewBlankHost(swarm0, blankhost.WithConnectionManager(mngr))
 	swarm1 := swarm.GenSwarm(t)
 	host1 := blankhost.NewBlankHost(swarm1)
 	swarm2 := swarm.GenSwarm(t)
@@ -495,9 +507,10 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
 	t.Cleanup(func() {
 		serverSideEx.Stop(context.Background()) //nolint:errcheck
 	})
-	prevScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
+	prevScore := score(t, exchg.host, host1.ID())
 	exchg.peerTracker.peerLk.Lock()
-	exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
+	host0.ConnManager().TagPeer(host2.ID(), string(protocolID(networkID)), 100)
+	exchg.peerTracker.trackedPeers[host2.ID()] = struct{}{}
 	exchg.peerTracker.peerLk.Unlock()
 
 	gen, err := store.GetByHeight(context.Background(), 1)
@@ -505,14 +518,14 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
 	_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
 	require.NoError(t, err)
 
-	newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
+	newPeerScore := score(t, exchg.host, host1.ID())
 	assert.NotEqual(t, newPeerScore, prevScore)
 }
 
 // TestExchange_RequestPartialRange enusres in case of receiving a partial response
 // from server, Exchange will re-request remaining headers from another peer
 func TestExchange_RequestPartialRange(t *testing.T) {
-	hosts := createMocknet(t, 3)
+	hosts := quicHosts(t, 3)
 	exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
 
 	// create one more server(with more headers in the store)
@@ -523,13 +536,14 @@ func TestExchange_RequestPartialRange(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	t.Cleanup(cancel)
 
+	hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
+
 	require.NoError(t, err)
 	require.NoError(t, serverSideEx.Start(ctx))
 
 	exchg.peerTracker.peerLk.Lock()
-	prevScoreBefore1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
-	prevScoreBefore2 := 50
-	// reducing peerScore of the second server, so our exchange will request host[1] first.
-	exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
+	prevScoreBefore1 := score(t, exchg.host, hosts[1].ID())
+	prevScoreBefore2 := score(t, exchg.host, hosts[2].ID())
+	exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
 	exchg.peerTracker.peerLk.Unlock()
 
 	gen, err := store.GetByHeight(context.Background(), 1)
@@ -540,8 +554,8 @@ func TestExchange_RequestPartialRange(t *testing.T) {
 	require.NoError(t, err)
 
 	exchg.peerTracker.peerLk.Lock()
-	prevScoreAfter1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
-	prevScoreAfter2 := exchg.peerTracker.trackedPeers[hosts[2].ID()].peerScore
+	prevScoreAfter1 := score(t, exchg.host, hosts[1].ID())
+	prevScoreAfter2 := score(t, exchg.host, hosts[2].ID())
 	exchg.peerTracker.peerLk.Unlock()
 
 	assert.NotEqual(t, prevScoreBefore1, prevScoreAfter1)
@@ -561,7 +575,6 @@ func createP2PExAndServer(
 	host, tpeer libhost.Host,
 ) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
 	store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)
-
 	serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
 		WithNetworkID[ServerParameters](networkID),
 	)
@@ -582,7 +595,7 @@ func createP2PExAndServer(
 	time.Sleep(time.Millisecond * 100) // give peerTracker time to add a trusted peer
 
 	ex.peerTracker.peerLk.Lock()
-	ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100.0}
+	ex.peerTracker.trackedPeers[tpeer.ID()] = struct{}{}
 	ex.peerTracker.peerLk.Unlock()
 
 	t.Cleanup(func() {
@@ -595,12 +608,15 @@ func createP2PExAndServer(
 
 func quicHosts(t *testing.T, n int) []libhost.Host {
 	hosts := make([]libhost.Host, n)
+	var err error
 	for i := range hosts {
-		swrm := swarm.GenSwarm(t, swarm.OptDisableTCP)
-		hosts[i] = blankhost.NewBlankHost(swrm)
+		require.NoError(t, err)
+		hosts[i], err = libp2p.New()
 		for _, host := range hosts[:i] {
 			hosts[i].Peerstore().AddAddrs(host.ID(), host.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
 			host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Network().ListenAddresses(), peerstore.PermanentAddrTTL)
+			err = hosts[i].Connect(context.Background(), peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
+			require.NoError(t, err)
 		}
 	}
 
@@ -647,3 +663,15 @@ func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.D
 	time.Sleep(t.timeout)
 	return nil, header.ErrNoHead
 }
+
+func (t *timedOutStore) GetRange(ctx context.Context, from, to uint64) ([]*headertest.DummyHeader, error) {
+	time.Sleep(t.timeout)
+	return t.Store.GetRange(ctx, from, to)
+}
+
+func score(t *testing.T, h libhost.Host, id peer.ID) int {
+	t.Helper()
+	tags := h.ConnManager().GetTagInfo(id)
+	tag, _ := tags.Tags[string(protocolID(networkID))]
+	return tag
+}
diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go
index e2c4f3cf..f7b9ad27 100644
--- a/p2p/peer_tracker.go
+++ b/p2p/peer_tracker.go
@@ -2,6 +2,8 @@ package p2p
 
 import (
 	"context"
+	"errors"
+	"slices"
 	"sync"
 	"time"
 
@@ -9,33 +11,20 @@ import (
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
 	libpeer "github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/libp2p/go-libp2p/p2p/net/conngater"
 )
 
-// defaultScore specifies the score for newly connected peers.
-const defaultScore float32 = 1
-
-var (
-	// maxAwaitingTime specifies the duration that gives to the disconnected peer to be back online,
-	// otherwise it will be removed on the next GC cycle.
-	maxAwaitingTime = time.Hour
-	// gcCycle defines the duration after which the peerTracker starts removing peers.
-	gcCycle = time.Minute * 5
-)
-
 type peerTracker struct {
-	host      host.Host
-	connGater *conngater.BasicConnectionGater
-	metrics   *exchangeMetrics
-
-	peerLk sync.RWMutex
+	host       host.Host
+	connGater  *conngater.BasicConnectionGater
+	metrics    *exchangeMetrics
+	protocolID protocol.ID
+	peerLk     sync.RWMutex
 	// trackedPeers contains active peers that we can request to.
 	// we cache the peer once they disconnect,
 	// so we can guarantee that peerQueue will only contain active peers
-	trackedPeers map[libpeer.ID]*peerStat
-	// disconnectedPeers contains disconnected peers. In case if peer does not return
-	// online until pruneDeadline, it will be removed and its score will be lost
-	disconnectedPeers map[libpeer.ID]*peerStat
+	trackedPeers map[libpeer.ID]struct{}
 
 	// an optional interface used to periodically dump
 	// good peers during garbage collection
@@ -44,27 +33,28 @@ type peerTracker struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 	// done is used to gracefully stop the peerTracker.
-	// It allows to wait until track() and gc() will be stopped.
+	// It allows to wait until track() will be stopped.
 	done chan struct{}
 }
 
 func newPeerTracker(
 	h host.Host,
 	connGater *conngater.BasicConnectionGater,
+	networkID string,
 	pidstore PeerIDStore,
 	metrics *exchangeMetrics,
 ) *peerTracker {
 	ctx, cancel := context.WithCancel(context.Background())
 	return &peerTracker{
-		host:              h,
-		connGater:         connGater,
-		metrics:           metrics,
-		trackedPeers:      make(map[libpeer.ID]*peerStat),
-		disconnectedPeers: make(map[libpeer.ID]*peerStat),
-		pidstore:          pidstore,
-		ctx:               ctx,
-		cancel:            cancel,
-		done:              make(chan struct{}, 2),
+		host:         h,
+		connGater:    connGater,
+		protocolID:   protocolID(networkID),
+		metrics:      metrics,
+		trackedPeers: make(map[libpeer.ID]struct{}),
+		pidstore:     pidstore,
+		ctx:          ctx,
+		cancel:       cancel,
+		done:         make(chan struct{}),
 	}
 }
 
@@ -75,6 +65,10 @@ func newPeerTracker(
 // NOTE: bootstrap is intended to be used with an on-disk peerstore.Peerstore as
 // the peerTracker needs access to the previously-seen peers' AddrInfo on start.
 func (p *peerTracker) bootstrap(trusted []libpeer.ID) error {
+	// store peers that have been already connected
+	for _, c := range p.host.Network().Conns() {
+		go p.connected(c.RemotePeer())
+	}
 	for _, trust := range trusted {
 		go p.connectToPeer(p.ctx, trust)
 	}
@@ -102,7 +96,6 @@ func (p *peerTracker) connectToPeer(ctx context.Context, peer libpeer.ID) {
 		log.Debugw("failed to connect to peer", "id", peer.String(), "err", err)
 		return
 	}
-	log.Debugw("connected to peer", "id", peer.String())
 }
 
 func (p *peerTracker) track() {
@@ -110,33 +103,48 @@ func (p *peerTracker) track() {
 		p.done <- struct{}{}
 	}()
 
-	// store peers that have been already connected
-	for _, c := range p.host.Network().Conns() {
-		p.connected(c.RemotePeer())
+	connSubs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
+	if err != nil {
+		log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err)
+		return
 	}
 
-	subs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
+	identifySub, err := p.host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
 	if err != nil {
-		log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err)
+		log.Errorw("subscribing to EvtPeerIdentificationCompleted", "err", err)
+		return
+	}
+
+	protocolSub, err := p.host.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
+	if err != nil {
+		log.Errorw("subscribing to EvtPeerProtocolsUpdated", "err", err)
 		return
 	}
 
 	for {
 		select {
 		case <-p.ctx.Done():
-			err = subs.Close()
+			err = connSubs.Close()
+			err = errors.Join(err, identifySub.Close(), protocolSub.Close())
 			if err != nil {
-				log.Errorw("closing subscription", "err", err)
+				log.Errorw("closing subscriptions", "err", err)
 			}
 			return
-		case subscription := <-subs.Out():
-			ev := subscription.(event.EvtPeerConnectednessChanged)
-			switch ev.Connectedness {
-			case network.Connected:
-				p.connected(ev.Peer)
-			case network.NotConnected:
+		case connSubscription := <-connSubs.Out():
+			ev := connSubscription.(event.EvtPeerConnectednessChanged)
+			if network.NotConnected == ev.Connectedness {
+				p.disconnected(ev.Peer)
+			}
+		case subscription := <-identifySub.Out():
+			ev := subscription.(event.EvtPeerIdentificationCompleted)
+			p.connected(ev.Peer)
+		case subscription := <-protocolSub.Out():
+			ev := subscription.(event.EvtPeerProtocolsUpdated)
+			if slices.Contains(ev.Removed, p.protocolID) {
 				p.disconnected(ev.Peer)
+				break
 			}
+			p.connected(ev.Peer)
 		}
 	}
 }
@@ -157,10 +165,23 @@ func (p *peerTracker) getPeers(max int) []libpeer.ID {
 }
 
 func (p *peerTracker) connected(pID libpeer.ID) {
+	if err := pID.Validate(); err != nil {
+		return
+	}
+
 	if p.host.ID() == pID {
 		return
 	}
 
+	// check that peer supports our protocol id.
+	protocol, err := p.host.Peerstore().SupportsProtocols(pID, p.protocolID)
+	if err != nil {
+		return
+	}
+	if !slices.Contains(protocol, p.protocolID) {
+		return
+	}
+
 	for _, c := range p.host.Network().ConnsToPeer(pID) {
 		// check if connection is short-termed and skip this peer
 		if c.Stat().Limited {
@@ -170,31 +191,30 @@ func (p *peerTracker) connected(pID libpeer.ID) {
 
 	p.peerLk.Lock()
 	defer p.peerLk.Unlock()
-
-	// additional check in p.trackedPeers should be done,
-	// because libp2p does not emit multiple Connected events per 1 peer
-	stats, ok := p.disconnectedPeers[pID]
-	if !ok {
-		stats = &peerStat{peerID: pID, peerScore: defaultScore}
-	} else {
-		delete(p.disconnectedPeers, pID)
+	if _, ok := p.trackedPeers[pID]; ok {
+		return
 	}
-	p.trackedPeers[pID] = stats
+
+	log.Debugw("connected to peer", "id", pID.String())
+	p.trackedPeers[pID] = struct{}{}
 	p.metrics.peersTracked(1)
 }
 
 func (p *peerTracker) disconnected(pID libpeer.ID) {
+	if err := pID.Validate(); err != nil {
+		return
+	}
+
 	p.peerLk.Lock()
 	defer p.peerLk.Unlock()
-	stats, ok := p.trackedPeers[pID]
-	if !ok {
+	if _, ok := p.trackedPeers[pID]; !ok {
 		return
 	}
-	stats.pruneDeadline = time.Now().Add(maxAwaitingTime)
-	p.disconnectedPeers[pID] = stats
 	delete(p.trackedPeers, pID)
+	p.host.ConnManager().UntagPeer(pID, string(p.protocolID))
+
 	p.metrics.peersTracked(-1)
 	p.metrics.peersDisconnected(1)
 }
@@ -202,44 +222,16 @@ func (p *peerTracker) peers() []*peerStat {
 	p.peerLk.RLock()
 	defer p.peerLk.RUnlock()
 
-	peers := make([]*peerStat, 0, len(p.trackedPeers))
-	for _, stat := range p.trackedPeers {
-		peers = append(peers, stat)
-	}
-	return peers
-}
-
-// gc goes through connected and disconnected peers once every gcPeriod
-// and removes:
-// * disconnected peers which have been disconnected for more than maxAwaitingTime;
-// * connected peers whose scores are less than or equal than defaultScore;
-func (p *peerTracker) gc() {
-	ticker := time.NewTicker(gcCycle)
-	for {
-		select {
-		case <-p.ctx.Done():
-			p.done <- struct{}{}
-			return
-		case <-ticker.C:
-			p.cleanUpDisconnectedPeers()
-			p.dumpPeers(p.ctx)
-		}
-	}
-}
-
-func (p *peerTracker) cleanUpDisconnectedPeers() {
-	p.peerLk.Lock()
-	defer p.peerLk.Unlock()
-
-	now := time.Now()
-	var deletedDisconnectedNum int
-	for id, peer := range p.disconnectedPeers {
-		if peer.pruneDeadline.Before(now) {
-			delete(p.disconnectedPeers, id)
-			deletedDisconnectedNum++
+	peers := make([]*peerStat, 0)
+	for peerID := range p.trackedPeers {
+		score := 0
+		if info := p.host.ConnManager().GetTagInfo(peerID); info != nil {
+			score = info.Tags[string(p.protocolID)]
 		}
+		peers = append(peers, &peerStat{peerID: peerID, peerScore: score})
 	}
-	p.metrics.peersDisconnected(-deletedDisconnectedNum)
+	return peers
 }
 
 // dumpPeers stores peers to the peerTracker's PeerIDStore if
@@ -272,12 +264,10 @@
 func (p *peerTracker) stop(ctx context.Context) error {
 	p.cancel()
 
-	for i := 0; i < cap(p.done); i++ {
-		select {
-		case <-p.done:
-		case <-ctx.Done():
-			return ctx.Err()
-		}
+	select {
+	case <-p.done:
+	case <-ctx.Done():
+		return ctx.Err()
 	}
 
 	// dump remaining tracked peers
@@ -287,6 +277,10 @@ func (p *peerTracker) stop(ctx context.Context) error {
 
 // blockPeer blocks a peer on the networking level and removes it from the local cache.
 func (p *peerTracker) blockPeer(pID libpeer.ID, reason error) {
+	if err := pID.Validate(); err != nil {
+		return
+	}
+
 	// add peer to the blacklist, so we can't connect to it in the future.
 	err := p.connGater.BlockPeer(pID)
 	if err != nil {
@@ -301,3 +295,8 @@ func (p *peerTracker) blockPeer(pID libpeer.ID, reason error) {
 	log.Warnw("header/p2p: blocked peer", "pID", pID, "reason", reason)
 	p.metrics.peerBlocked()
 }
+
+func (p *peerTracker) updateScore(stats *peerStat, size uint64, duration time.Duration) {
+	score := stats.updateStats(size, duration)
+	p.host.ConnManager().TagPeer(stats.peerID, string(p.protocolID), score)
+}
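
The commit replaces the periodic gc() pass with libp2p event-bus subscriptions: track() now reacts to EvtPeerIdentificationCompleted, EvtPeerProtocolsUpdated and EvtPeerConnectednessChanged instead of pruning disconnected peers on a timer. The standalone Go sketch below is not part of the patch; the event and API names are the stock go-libp2p ones, everything else (the program itself, the print statements) is illustrative only.

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
)

func main() {
	// A throwaway host; the real code reuses the Exchange's existing host.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Subscribe to the same three event types the new track() loop consumes.
	sub, err := h.EventBus().Subscribe([]interface{}{
		new(event.EvtPeerIdentificationCompleted),
		new(event.EvtPeerProtocolsUpdated),
		new(event.EvtPeerConnectednessChanged),
	})
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	// Blocks until events arrive; with no connections this simply waits.
	for e := range sub.Out() {
		switch ev := e.(type) {
		case event.EvtPeerIdentificationCompleted:
			// identify finished: the peer's supported protocols are now known.
			fmt.Println("identified:", ev.Peer)
		case event.EvtPeerProtocolsUpdated:
			// the peer added or removed protocols after being tracked.
			fmt.Println("protocols updated:", ev.Peer)
		case event.EvtPeerConnectednessChanged:
			if ev.Connectedness == network.NotConnected {
				fmt.Println("disconnected:", ev.Peer)
			}
		}
	}
}

Subscribing once per event type, as the patch does, works the same way; the slice form shown here just funnels all three into a single channel.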
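Peer scores also move out of the tracker's own maps: updateScore() writes them as connection-manager tags keyed by the protocol ID (TagPeer), disconnected() drops them (UntagPeer), and peers() plus the test-side score() helper read them back with GetTagInfo. A minimal sketch of that round trip follows; the tag string and the readScore helper are placeholders, since the real code derives the tag via protocolID(networkID) and computes the value with peerStat.updateStats.

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

// readScore mirrors the test helper: an untagged peer simply reads as 0.
func readScore(h host.Host, id peer.ID, tag string) int {
	info := h.ConnManager().GetTagInfo(id)
	if info == nil {
		return 0
	}
	return info.Tags[tag]
}

func main() {
	// Tags need a real connection manager; this is why the blank host in the
	// timeout test is now built with connmgr.NewConnManager as well.
	mngr, err := connmgr.NewConnManager(0, 50)
	if err != nil {
		panic(err)
	}
	h, err := libp2p.New(libp2p.ConnectionManager(mngr))
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Placeholder tag; the patch uses string(protocolID(networkID)).
	const tag = "/example/header-ex"
	id := h.ID()

	h.ConnManager().TagPeer(id, tag, 100) // what updateScore does after a request
	fmt.Println(readScore(h, id, tag))    // 100
	h.ConnManager().UntagPeer(id, tag)    // what disconnected does
	fmt.Println(readScore(h, id, tag))    // 0
}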