From 80ae9b40c5d90800d8a8c7940c738195116635e3 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 26 Feb 2024 16:52:22 +0200 Subject: [PATCH 1/5] remove limiting in peer tracker --- p2p/peer_tracker.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index b5676dd9..e2c4f3cf 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -12,12 +12,8 @@ import ( "github.com/libp2p/go-libp2p/p2p/net/conngater" ) -const ( - // defaultScore specifies the score for newly connected peers. - defaultScore float32 = 1 - // maxPeerTrackerSize specifies the max amount of peers that can be added to the peerTracker. - maxPeerTrackerSize = 100 -) +// defaultScore specifies the score for newly connected peers. +const defaultScore float32 = 1 var ( // maxAwaitingTime specifies the duration that gives to the disconnected peer to be back online, @@ -174,13 +170,6 @@ func (p *peerTracker) connected(pID libpeer.ID) { p.peerLk.Lock() defer p.peerLk.Unlock() - // skip adding the peer to avoid overfilling of the peerTracker with unused peers if: - // peerTracker reaches the maxTrackerSize and there are more connected peers - // than disconnected peers. - if len(p.trackedPeers)+len(p.disconnectedPeers) > maxPeerTrackerSize && - len(p.trackedPeers) > len(p.disconnectedPeers) { - return - } // additional check in p.trackedPeers should be done, // because libp2p does not emit multiple Connected events per 1 peer From 10c442b880d3d705bf4792d82f86c4bbf1afbfd4 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 2 Apr 2024 19:15:24 +0300 Subject: [PATCH 2/5] feat(p2p/peerTracker): remove gc and add subscriptions --- p2p/exchange.go | 6 +- p2p/exchange_test.go | 70 +++++++++++----- p2p/peer_tracker.go | 195 +++++++++++++++++++++---------------------- 3 files changed, 149 insertions(+), 122 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index fbf1f23a..9f0cf9de 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -80,10 +80,11 @@ func NewExchange[H header.Header[H]]( } } + id := protocolID(params.networkID) ex := &Exchange[H]{ host: host, - protocolID: protocolID(params.networkID), - peerTracker: newPeerTracker(host, gater, params.pidstore, metrics), + protocolID: id, + peerTracker: newPeerTracker(host, gater, params.networkID, params.pidstore, metrics), Params: params, metrics: metrics, } @@ -98,7 +99,6 @@ func (ex *Exchange[H]) Start(ctx context.Context) error { ex.ctx, ex.cancel = context.WithCancel(context.Background()) log.Infow("client: starting client", "protocol ID", ex.protocolID) - go ex.peerTracker.gc() go ex.peerTracker.track() // bootstrap the peerTracker with trusted peers as well as previously seen diff --git a/p2p/exchange_test.go b/p2p/exchange_test.go index 1a775bd3..466b0b82 100644 --- a/p2p/exchange_test.go +++ b/p2p/exchange_test.go @@ -8,12 +8,14 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" + "github.com/libp2p/go-libp2p" libhost "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" blankhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/net/conngater" + "github.com/libp2p/go-libp2p/p2p/net/connmgr" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" "github.com/stretchr/testify/assert" @@ -242,7 +244,7 @@ func TestExchange_RequestFullRangeHeaders(t 
*testing.T) { require.NoError(t, err) servers[index].Start(context.Background()) //nolint:errcheck exchange.peerTracker.peerLk.Lock() - exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()} + exchange.peerTracker.trackedPeers[hosts[index].ID()] = struct{}{} exchange.peerTracker.peerLk.Unlock() } @@ -262,12 +264,20 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) { // TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range // from another peer with lower score after receiving header.ErrNotFound func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) { - hosts := createMocknet(t, 3) + hosts := quicHosts(t, 3) + // create client + server(it does not have needed headers) exchg, store := createP2PExAndServer(t, hosts[0], hosts[1]) + + serverSideStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10) + tmServerSideStore := &timedOutStore{timeout: time.Millisecond * 200, Store: *serverSideStore} + + hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100) + hosts[0].ConnManager().TagPeer(hosts[2].ID(), string(protocolID(networkID)), 90) + // create one more server(with more headers in the store) serverSideEx, err := NewExchangeServer[*headertest.DummyHeader]( - hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10), + hosts[2], tmServerSideStore, WithNetworkID[ServerParameters](networkID), ) require.NoError(t, err) @@ -275,17 +285,17 @@ func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) { t.Cleanup(func() { serverSideEx.Stop(context.Background()) //nolint:errcheck }) + exchg.peerTracker.peerLk.Lock() - exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20} + exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{} exchg.peerTracker.peerLk.Unlock() - h, err := store.GetByHeight(context.Background(), 5) require.NoError(t, err) _, err = exchg.GetRangeByHeight(context.Background(), h, 8) require.NoError(t, err) // ensure that peerScore for the second peer is changed - newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score() + newPeerScore := score(t, exchg.peerTracker.host, hosts[2].ID()) require.NotEqual(t, 20, newPeerScore) } @@ -464,7 +474,9 @@ func TestExchange_HandleHeaderWithDifferentChainID(t *testing.T) { func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) { // create blankhost because mocknet does not support deadlines swarm0 := swarm.GenSwarm(t) - host0 := blankhost.NewBlankHost(swarm0) + mngr, err := connmgr.NewConnManager(0, 50) + require.NoError(t, err) + host0 := blankhost.NewBlankHost(swarm0, blankhost.WithConnectionManager(mngr)) swarm1 := swarm.GenSwarm(t) host1 := blankhost.NewBlankHost(swarm1) swarm2 := swarm.GenSwarm(t) @@ -495,9 +507,10 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) { t.Cleanup(func() { serverSideEx.Stop(context.Background()) //nolint:errcheck }) - prevScore := exchg.peerTracker.trackedPeers[host1.ID()].score() + prevScore := score(t, exchg.host, host1.ID()) exchg.peerTracker.peerLk.Lock() - exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200} + host0.ConnManager().TagPeer(host2.ID(), string(protocolID(networkID)), 100) + exchg.peerTracker.trackedPeers[host2.ID()] = struct{}{} exchg.peerTracker.peerLk.Unlock() gen, err := store.GetByHeight(context.Background(), 1) @@ -505,14 +518,14 @@ func 
TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
 	_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
 	require.NoError(t, err)
 
-	newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
+	newPeerScore := score(t, exchg.host, host1.ID())
 	assert.NotEqual(t, newPeerScore, prevScore)
 }
 
 // TestExchange_RequestPartialRange ensures that, in case of receiving a partial response
 // from a server, Exchange will re-request the remaining headers from another peer
 func TestExchange_RequestPartialRange(t *testing.T) {
-	hosts := createMocknet(t, 3)
+	hosts := quicHosts(t, 3)
 	exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
 
 	// create one more server(with more headers in the store)
@@ -523,13 +536,14 @@ func TestExchange_RequestPartialRange(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	t.Cleanup(cancel)
 
+	hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
+
 	require.NoError(t, err)
 	require.NoError(t, serverSideEx.Start(ctx))
 	exchg.peerTracker.peerLk.Lock()
-	prevScoreBefore1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
-	prevScoreBefore2 := 50
-	// reducing peerScore of the second server, so our exchange will request host[1] first.
-	exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
+	prevScoreBefore1 := score(t, exchg.host, hosts[1].ID())
+	prevScoreBefore2 := score(t, exchg.host, hosts[2].ID())
+	exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
 	exchg.peerTracker.peerLk.Unlock()
 
 	gen, err := store.GetByHeight(context.Background(), 1)
@@ -540,8 +554,8 @@ func TestExchange_RequestPartialRange(t *testing.T) {
 	require.NoError(t, err)
 
 	exchg.peerTracker.peerLk.Lock()
-	prevScoreAfter1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
-	prevScoreAfter2 := exchg.peerTracker.trackedPeers[hosts[2].ID()].peerScore
+	prevScoreAfter1 := score(t, exchg.host, hosts[1].ID())
+	prevScoreAfter2 := score(t, exchg.host, hosts[2].ID())
 	exchg.peerTracker.peerLk.Unlock()
 
 	assert.NotEqual(t, prevScoreBefore1, prevScoreAfter1)
@@ -561,7 +575,6 @@ func createP2PExAndServer(
 	host, tpeer libhost.Host,
 ) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
 	store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)
-
 	serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
 		WithNetworkID[ServerParameters](networkID),
 	)
@@ -582,7 +595,7 @@ func createP2PExAndServer(
 	time.Sleep(time.Millisecond * 100) // give peerTracker time to add a trusted peer
 
 	ex.peerTracker.peerLk.Lock()
-	ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100.0}
+	ex.peerTracker.trackedPeers[tpeer.ID()] = struct{}{}
 	ex.peerTracker.peerLk.Unlock()
 
 	t.Cleanup(func() {
@@ -595,12 +608,15 @@ func createP2PExAndServer(
 
 func quicHosts(t *testing.T, n int) []libhost.Host {
 	hosts := make([]libhost.Host, n)
+	var err error
 	for i := range hosts {
-		swrm := swarm.GenSwarm(t, swarm.OptDisableTCP)
-		hosts[i] = blankhost.NewBlankHost(swrm)
+		hosts[i], err = libp2p.New()
+		require.NoError(t, err)
 		for _, host := range hosts[:i] {
 			hosts[i].Peerstore().AddAddrs(host.ID(), host.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
 			host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Network().ListenAddresses(), peerstore.PermanentAddrTTL)
+			err = hosts[i].Connect(context.Background(), peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
+			require.NoError(t, err)
 		}
 	}
 
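Note: the tests above treat libp2p's connection manager as the score store: ConnManager().TagPeer writes a peer's score under the protocol-ID tag, and GetTagInfo reads it back. A minimal sketch of that round trip, assuming a host wired with a real connection manager (e.g. via connmgr.NewConnManager, as the timeout test above does) and the core/host and core/peer types; scoreRoundTrip and the literal 100 are illustrative, not part of the patch:

	// scoreRoundTrip sketches how a score is persisted and read back through
	// the connection manager, the same mechanism the score(t, h, id) helper uses.
	func scoreRoundTrip(h host.Host, id peer.ID, tag string) int {
		h.ConnManager().TagPeer(id, tag, 100) // write the score as a tag value
		info := h.ConnManager().GetTagInfo(id)
		if info == nil { // nil for peers the connection manager does not track
			return 0
		}
		return info.Tags[tag]
	}
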
@@ -647,3 +663,17 @@ func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.D
 	time.Sleep(t.timeout)
 	return nil, header.ErrNoHead
 }
+
+func (t *timedOutStore) GetRange(ctx context.Context, from, to uint64) ([]*headertest.DummyHeader, error) {
+	time.Sleep(t.timeout)
+	return t.Store.GetRange(ctx, from, to)
+}
+
+func score(t *testing.T, h libhost.Host, id peer.ID) int {
+	t.Helper()
+	info := h.ConnManager().GetTagInfo(id)
+	if info == nil {
+		return 0
+	}
+	return info.Tags[string(protocolID(networkID))]
+}
diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go
index e2c4f3cf..f7b9ad27 100644
--- a/p2p/peer_tracker.go
+++ b/p2p/peer_tracker.go
@@ -2,6 +2,8 @@ package p2p
 
 import (
 	"context"
+	"errors"
+	"slices"
 	"sync"
 	"time"
 
@@ -9,33 +11,20 @@ import (
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
 	libpeer "github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/libp2p/go-libp2p/p2p/net/conngater"
 )
 
-// defaultScore specifies the score for newly connected peers.
-const defaultScore float32 = 1
-
-var (
-	// maxAwaitingTime specifies the duration that gives to the disconnected peer to be back online,
-	// otherwise it will be removed on the next GC cycle.
-	maxAwaitingTime = time.Hour
-	// gcCycle defines the duration after which the peerTracker starts removing peers.
-	gcCycle = time.Minute * 5
-)
-
 type peerTracker struct {
-	host      host.Host
-	connGater *conngater.BasicConnectionGater
-	metrics   *exchangeMetrics
-
-	peerLk sync.RWMutex
+	host       host.Host
+	connGater  *conngater.BasicConnectionGater
+	metrics    *exchangeMetrics
+	protocolID protocol.ID
+	peerLk     sync.RWMutex
 	// trackedPeers contains active peers that we can request to.
 	// we cache the peer once they disconnect,
 	// so we can guarantee that peerQueue will only contain active peers
-	trackedPeers map[libpeer.ID]*peerStat
-	// disconnectedPeers contains disconnected peers. In case if peer does not return
-	// online until pruneDeadline, it will be removed and its score will be lost
-	disconnectedPeers map[libpeer.ID]*peerStat
+	trackedPeers map[libpeer.ID]struct{}
 
 	// an optional interface used to periodically dump
 	// good peers during garbage collection
@@ -44,27 +33,28 @@ type peerTracker struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 	// done is used to gracefully stop the peerTracker.
-	// It allows to wait until track() and gc() will be stopped.
+	// It allows to wait until track() will be stopped.
 	done chan struct{}
 }
 
 func newPeerTracker(
 	h host.Host,
 	connGater *conngater.BasicConnectionGater,
+	networkID string,
 	pidstore PeerIDStore,
 	metrics *exchangeMetrics,
 ) *peerTracker {
 	ctx, cancel := context.WithCancel(context.Background())
 	return &peerTracker{
-		host:              h,
-		connGater:         connGater,
-		metrics:           metrics,
-		trackedPeers:      make(map[libpeer.ID]*peerStat),
-		disconnectedPeers: make(map[libpeer.ID]*peerStat),
-		pidstore:          pidstore,
-		ctx:               ctx,
-		cancel:            cancel,
-		done:              make(chan struct{}, 2),
+		host:         h,
+		connGater:    connGater,
+		protocolID:   protocolID(networkID),
+		metrics:      metrics,
+		trackedPeers: make(map[libpeer.ID]struct{}),
+		pidstore:     pidstore,
+		ctx:          ctx,
+		cancel:       cancel,
+		done:         make(chan struct{}),
 	}
 }
 
@@ -75,6 +65,10 @@ func newPeerTracker(
 // NOTE: bootstrap is intended to be used with an on-disk peerstore.Peerstore as
 // the peerTracker needs access to the previously-seen peers' AddrInfo on start.
func (p *peerTracker) bootstrap(trusted []libpeer.ID) error { + // store peers that have been already connected + for _, c := range p.host.Network().Conns() { + go p.connected(c.RemotePeer()) + } for _, trust := range trusted { go p.connectToPeer(p.ctx, trust) } @@ -102,7 +96,6 @@ func (p *peerTracker) connectToPeer(ctx context.Context, peer libpeer.ID) { log.Debugw("failed to connect to peer", "id", peer.String(), "err", err) return } - log.Debugw("connected to peer", "id", peer.String()) } func (p *peerTracker) track() { @@ -110,33 +103,48 @@ func (p *peerTracker) track() { p.done <- struct{}{} }() - // store peers that have been already connected - for _, c := range p.host.Network().Conns() { - p.connected(c.RemotePeer()) + connSubs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) + if err != nil { + log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err) + return } - subs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) + identifySub, err := p.host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{}) if err != nil { - log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err) + log.Errorw("subscribing to EvtPeerIdentificationCompleted", "err", err) + return + } + + protocolSub, err := p.host.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{}) + if err != nil { + log.Errorw("subscribing to EvtPeerProtocolsUpdated", "err", err) return } for { select { case <-p.ctx.Done(): - err = subs.Close() + err = connSubs.Close() + errors.Join(err, identifySub.Close(), protocolSub.Close()) if err != nil { - log.Errorw("closing subscription", "err", err) + log.Errorw("closing subscriptions", "err", err) } return - case subscription := <-subs.Out(): - ev := subscription.(event.EvtPeerConnectednessChanged) - switch ev.Connectedness { - case network.Connected: - p.connected(ev.Peer) - case network.NotConnected: + case connSubscription := <-connSubs.Out(): + ev := connSubscription.(event.EvtPeerConnectednessChanged) + if network.NotConnected == ev.Connectedness { + p.disconnected(ev.Peer) + } + case subscription := <-identifySub.Out(): + ev := subscription.(event.EvtPeerIdentificationCompleted) + p.connected(ev.Peer) + case subscription := <-protocolSub.Out(): + ev := subscription.(event.EvtPeerProtocolsUpdated) + if slices.Contains(ev.Removed, p.protocolID) { p.disconnected(ev.Peer) + break } + p.connected(ev.Peer) } } } @@ -157,10 +165,23 @@ func (p *peerTracker) getPeers(max int) []libpeer.ID { } func (p *peerTracker) connected(pID libpeer.ID) { + if err := pID.Validate(); err != nil { + return + } + if p.host.ID() == pID { return } + // check that peer supports our protocol id. 
+ protocol, err := p.host.Peerstore().SupportsProtocols(pID, p.protocolID) + if err != nil { + return + } + if !slices.Contains(protocol, p.protocolID) { + return + } + for _, c := range p.host.Network().ConnsToPeer(pID) { // check if connection is short-termed and skip this peer if c.Stat().Limited { @@ -170,31 +191,30 @@ func (p *peerTracker) connected(pID libpeer.ID) { p.peerLk.Lock() defer p.peerLk.Unlock() - - // additional check in p.trackedPeers should be done, - // because libp2p does not emit multiple Connected events per 1 peer - stats, ok := p.disconnectedPeers[pID] - if !ok { - stats = &peerStat{peerID: pID, peerScore: defaultScore} - } else { - delete(p.disconnectedPeers, pID) + if _, ok := p.trackedPeers[pID]; ok { + return } - p.trackedPeers[pID] = stats + + log.Debugw("connected to peer", "id", pID.String()) + p.trackedPeers[pID] = struct{}{} p.metrics.peersTracked(1) } func (p *peerTracker) disconnected(pID libpeer.ID) { + if err := pID.Validate(); err != nil { + return + } + p.peerLk.Lock() defer p.peerLk.Unlock() - stats, ok := p.trackedPeers[pID] - if !ok { + if _, ok := p.trackedPeers[pID]; !ok { return } - stats.pruneDeadline = time.Now().Add(maxAwaitingTime) - p.disconnectedPeers[pID] = stats delete(p.trackedPeers, pID) + p.host.ConnManager().UntagPeer(pID, string(p.protocolID)) + p.metrics.peersTracked(-1) p.metrics.peersDisconnected(1) } @@ -202,44 +222,16 @@ func (p *peerTracker) disconnected(pID libpeer.ID) { func (p *peerTracker) peers() []*peerStat { p.peerLk.RLock() defer p.peerLk.RUnlock() - peers := make([]*peerStat, 0, len(p.trackedPeers)) - for _, stat := range p.trackedPeers { - peers = append(peers, stat) - } - return peers -} - -// gc goes through connected and disconnected peers once every gcPeriod -// and removes: -// * disconnected peers which have been disconnected for more than maxAwaitingTime; -// * connected peers whose scores are less than or equal than defaultScore; -func (p *peerTracker) gc() { - ticker := time.NewTicker(gcCycle) - for { - select { - case <-p.ctx.Done(): - p.done <- struct{}{} - return - case <-ticker.C: - p.cleanUpDisconnectedPeers() - p.dumpPeers(p.ctx) - } - } -} -func (p *peerTracker) cleanUpDisconnectedPeers() { - p.peerLk.Lock() - defer p.peerLk.Unlock() - - now := time.Now() - var deletedDisconnectedNum int - for id, peer := range p.disconnectedPeers { - if peer.pruneDeadline.Before(now) { - delete(p.disconnectedPeers, id) - deletedDisconnectedNum++ + peers := make([]*peerStat, 0) + for peerID := range p.trackedPeers { + score := 0 + if info := p.host.ConnManager().GetTagInfo(peerID); info != nil { + score = info.Tags[string(p.protocolID)] } + peers = append(peers, &peerStat{peerID: peerID, peerScore: score}) } - p.metrics.peersDisconnected(-deletedDisconnectedNum) + return peers } // dumpPeers stores peers to the peerTracker's PeerIDStore if @@ -272,12 +264,10 @@ func (p *peerTracker) dumpPeers(ctx context.Context) { func (p *peerTracker) stop(ctx context.Context) error { p.cancel() - for i := 0; i < cap(p.done); i++ { - select { - case <-p.done: - case <-ctx.Done(): - return ctx.Err() - } + select { + case <-p.done: + case <-ctx.Done(): + return ctx.Err() } // dump remaining tracked peers @@ -287,6 +277,10 @@ func (p *peerTracker) stop(ctx context.Context) error { // blockPeer blocks a peer on the networking level and removes it from the local cache. 
func (p *peerTracker) blockPeer(pID libpeer.ID, reason error) { + if err := pID.Validate(); err != nil { + return + } + // add peer to the blacklist, so we can't connect to it in the future. err := p.connGater.BlockPeer(pID) if err != nil { @@ -301,3 +295,8 @@ func (p *peerTracker) blockPeer(pID libpeer.ID, reason error) { log.Warnw("header/p2p: blocked peer", "pID", pID, "reason", reason) p.metrics.peerBlocked() } + +func (p *peerTracker) updateScore(stats *peerStat, size uint64, duration time.Duration) { + score := stats.updateStats(size, duration) + p.host.ConnManager().TagPeer(stats.peerID, string(p.protocolID), score) +} From 39232eb3140b2539f4c54cbc989aa463ec5d5c6b Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 2 Apr 2024 19:17:10 +0300 Subject: [PATCH 3/5] misc(p2p/peerstat): remove pruneDeadline + update peer scoring mechanism --- go.mod | 12 +++++++ go.sum | 1 - p2p/peer_stats.go | 40 ++++++++------------- p2p/peer_stats_test.go | 4 +-- p2p/peer_tracker_test.go | 75 ++++++++-------------------------------- p2p/session.go | 4 +-- p2p/session_test.go | 4 +-- 7 files changed, 46 insertions(+), 94 deletions(-) diff --git a/go.mod b/go.mod index 8d211d0d..6e441aad 100644 --- a/go.mod +++ b/go.mod @@ -24,17 +24,23 @@ require ( github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/containerd/cgroups v1.1.0 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/elastic/gosigar v0.14.2 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect + github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect github.com/google/uuid v1.5.0 // indirect + github.com/gorilla/websocket v1.5.1 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect @@ -69,6 +75,8 @@ require ( github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.15.0 // indirect + github.com/opencontainers/runtime-spec v1.2.0 // indirect + github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.11 // indirect github.com/pion/ice/v2 v2.3.24 // indirect @@ -85,6 +93,7 @@ require ( github.com/pion/transport/v2 v2.2.5 // indirect github.com/pion/turn/v2 v2.1.6 // indirect github.com/pion/webrtc/v3 v3.2.40 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -93,7 +102,10 @@ require ( github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/quic-go v0.44.0 // indirect github.com/quic-go/webtransport-go v0.8.0 // indirect + github.com/raulk/go-watchdog v1.3.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + go.uber.org/dig 
v1.17.1 // indirect + go.uber.org/fx v1.21.1 // indirect go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/go.sum b/go.sum index c73359d8..7fa84993 100644 --- a/go.sum +++ b/go.sum @@ -126,7 +126,6 @@ github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8Nz github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go index 0277fcd4..8c9257d2 100644 --- a/p2p/peer_stats.go +++ b/p2p/peer_stats.go @@ -14,10 +14,7 @@ type peerStat struct { sync.RWMutex peerID peer.ID // score is the average speed per single request - peerScore float32 - // pruneDeadline specifies when disconnected peer will be removed if - // it does not return online. - pruneDeadline time.Time + peerScore int } // updateStats recalculates peer.score by averaging the last score @@ -26,33 +23,28 @@ type peerStat struct { // by dividing the amount by time, so the result score will represent how many bytes // were retrieved in 1 millisecond. This value will then be averaged relative to the // previous peerScore. -func (p *peerStat) updateStats(amount uint64, duration time.Duration) { - p.Lock() - defer p.Unlock() +func (p *peerStat) updateStats(amount uint64, duration time.Duration) int { + if amount == 0 && duration == 0 { + // decrease peerScore by 20% of the peer that failed the request by any reason. + // NOTE: peerScore will not be decreased if the score is less than 100. + p.peerScore -= p.peerScore / 100 * 20 + return p.peerScore + } + averageSpeed := float32(amount) if duration != 0 { averageSpeed /= float32(duration.Milliseconds()) } if p.peerScore == 0.0 { - p.peerScore = averageSpeed - return + p.peerScore = int(averageSpeed * 100) + return p.peerScore } - p.peerScore = (p.peerScore + averageSpeed) / 2 -} - -// decreaseScore decreases peerScore by 20% of the peer that failed the request by any reason. -// NOTE: decreasing peerScore in one session will not affect its position in queue in another -// session(as we can have multiple sessions running concurrently). -// TODO(vgonkivs): to figure out the better scoring increments/decrements -func (p *peerStat) decreaseScore() { - p.Lock() - defer p.Unlock() - - p.peerScore -= p.peerScore / 100 * 20 + p.peerScore = (p.peerScore + int(averageSpeed*100)) / 2 + return p.peerScore } // score reads a peer's latest score from the queue -func (p *peerStat) score() float32 { +func (p *peerStat) score() int { p.RLock() defer p.RUnlock() return p.peerScore @@ -123,10 +115,6 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue { // in case if there are no peer available in current session, it blocks until // the peer will be pushed in. 
func (p *peerQueue) waitPop(ctx context.Context) *peerStat { - // TODO(vgonkivs): implement fallback solution for cases when peer queue is empty. - // As we discussed with @Wondertan there could be 2 possible solutions: - // * use libp2p.Discovery to find new peers outside peerTracker to request headers; - // * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore; select { case <-ctx.Done(): return &peerStat{} diff --git a/p2p/peer_stats_test.go b/p2p/peer_stats_test.go index 1d2c5cfb..36fe797f 100644 --- a/p2p/peer_stats_test.go +++ b/p2p/peer_stats_test.go @@ -107,6 +107,6 @@ func Test_StatDecreaseScore(t *testing.T) { peerScore: 100, } // will decrease score by 20% - pStats.decreaseScore() - require.Equal(t, pStats.score(), float32(80.0)) + pStats.updateStats(0, 0) + require.Equal(t, pStats.score(), 80) } diff --git a/p2p/peer_tracker_test.go b/p2p/peer_tracker_test.go index 8c9a8f28..41796c8a 100644 --- a/p2p/peer_tracker_test.go +++ b/p2p/peer_tracker_test.go @@ -9,67 +9,21 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" testpeer "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/p2p/net/conngater" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestPeerTracker_GC(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - h := createMocknet(t, 1) - - gcCycle = time.Millisecond * 200 - - connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) - require.NoError(t, err) - - pidstore := newDummyPIDStore() - p := newPeerTracker(h[0], connGater, pidstore, nil) - - maxAwaitingTime = time.Millisecond - - peerlist := generateRandomPeerlist(t, 10) - for i := 0; i < 10; i++ { - p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5} - } - - peerlist = generateRandomPeerlist(t, 4) - pid1 := peerlist[2] - pid2 := peerlist[3] - - p.disconnectedPeers[pid1] = &peerStat{peerID: pid1, pruneDeadline: time.Now()} - p.disconnectedPeers[pid2] = &peerStat{peerID: pid2, pruneDeadline: time.Now().Add(time.Minute * 10)} - assert.True(t, len(p.trackedPeers) > 0) - assert.True(t, len(p.disconnectedPeers) > 0) - - go p.track() - go p.gc() - - time.Sleep(time.Millisecond * 500) - - err = p.stop(ctx) - require.NoError(t, err) - - require.Len(t, p.trackedPeers, 10) - require.Nil(t, p.disconnectedPeers[pid1]) - - // ensure good peers were dumped to store - peers, err := pidstore.Load(ctx) - require.NoError(t, err) - require.Equal(t, 10, len(peers)) -} - func TestPeerTracker_BlockPeer(t *testing.T) { h := createMocknet(t, 2) connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) require.NoError(t, err) - p := newPeerTracker(h[0], connGater, nil, nil) - maxAwaitingTime = time.Millisecond + p := newPeerTracker(h[0], connGater, "private", nil, nil) p.blockPeer(h[1].ID(), errors.New("test")) require.Len(t, connGater.ListBlockedPeers(), 1) require.True(t, connGater.ListBlockedPeers()[0] == h[1].ID()) @@ -82,26 +36,25 @@ func TestPeerTracker_Bootstrap(t *testing.T) { connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) require.NoError(t, err) - // mn := createMocknet(t, 10) - mn, err := mocknet.FullMeshConnected(10) 
-	require.NoError(t, err)
+	hosts := make([]host.Host, 10)
+
+	for i := range hosts {
+		hosts[i], err = libp2p.New()
+		require.NoError(t, err)
+		hosts[i].SetStreamHandler(protocolID("private"), nil)
+	}
 
 	// store peers to peerstore
 	prevSeen := make([]peer.ID, 9)
-	for i, peer := range mn.Hosts()[1:] {
+	for i, peer := range hosts[1:] {
+		hosts[0].Peerstore().AddAddrs(peer.ID(), peer.Addrs(), peerstore.PermanentAddrTTL)
 		prevSeen[i] = peer.ID()
-
-		// disconnect so they're not already connected on attempt to
-		// connect
-		err = mn.DisconnectPeers(mn.Hosts()[i].ID(), peer.ID())
-		require.NoError(t, err)
 	}
 	pidstore := newDummyPIDStore()
 	// only store 7 peers to pidstore, and use 2 as trusted
 	err = pidstore.Put(ctx, prevSeen[2:])
 	require.NoError(t, err)
-
-	tracker := newPeerTracker(mn.Hosts()[0], connGater, pidstore, nil)
+	tracker := newPeerTracker(hosts[0], connGater, "private", pidstore, nil)
 
 	go tracker.track()
 
diff --git a/p2p/session.go b/p2p/session.go
index 05b4f18a..2e8594d2 100644
--- a/p2p/session.go
+++ b/p2p/session.go
@@ -198,7 +198,7 @@ func (s *session[H]) doRequest(
 	switch err {
 	case header.ErrNotFound, errEmptyResponse:
 		logFn = log.Debugw
-		stat.decreaseScore()
+		s.peerTracker.updateScore(stat, 0, 0)
 	default:
 		s.peerTracker.blockPeer(stat.peerID, err)
 	}
@@ -234,7 +234,7 @@ func (s *session[H]) doRequest(
 	span.SetStatus(codes.Ok, "")
 
 	// update peer stats
-	stat.updateStats(size, took)
+	s.peerTracker.updateScore(stat, size, took)
 
 	// ensure that we received the correct amount of headers.
 	if remainingHeaders > 0 {
diff --git a/p2p/session_test.go b/p2p/session_test.go
index d7044961..7c8599f5 100644
--- a/p2p/session_test.go
+++ b/p2p/session_test.go
@@ -28,7 +28,7 @@ func Test_Validate(t *testing.T) {
 	ses := newSession(
 		context.Background(),
 		nil,
-		&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
+		&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
 		"",
 		time.Second,
 		nil, withValidation(head),
 	)
@@ -45,7 +45,7 @@ func Test_ValidateFails(t *testing.T) {
 	ses := newSession(
 		context.Background(),
 		nil,
-		&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
+		&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
 		"",
 		time.Second,
 		nil, withValidation(head),
 	)

From b80ef73bf7ac9043f9b34a09c09ae200426fcb0b Mon Sep 17 00:00:00 2001
From: Viacheslav Gonkivskyi
Date: Tue, 2 Apr 2024 19:39:54 +0300
Subject: [PATCH 4/5] fix(p2p): use CloseRead instead of stream.Close

---
 p2p/helpers.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/p2p/helpers.go b/p2p/helpers.go
index 2e4df655..dc8cc1c5 100644
--- a/p2p/helpers.go
+++ b/p2p/helpers.go
@@ -103,7 +103,7 @@ func sendMessage(
 	}
 
 	if err == nil {
-		if closeErr := stream.Close(); closeErr != nil {
+		if closeErr := stream.CloseRead(); closeErr != nil {
 			log.Errorw("closing stream", "err", closeErr)
 		}
 	} else {

From c88c5547deb39d80a344597d6d4979eb39cf9383 Mon Sep 17 00:00:00 2001
From: Viacheslav Gonkivskyi
Date: Tue, 2 Apr 2024 20:37:20 +0300
Subject: [PATCH 5/5] fix(p2p/session): return err if peer tracker is empty

---
 p2p/exchange.go          | 12 ++++--
 p2p/helpers.go           | 10 +++++
 p2p/peer_tracker.go      | 80 ++++++++++++++++++----------------------
 p2p/peer_tracker_test.go |  2 +-
 p2p/session.go           | 11 ++++--
 p2p/session_test.go      | 28 ++++++++++----
 6 files changed, 84 insertions(+), 59 deletions(-)

diff --git a/p2p/exchange.go b/p2p/exchange.go
index 9f0cf9de..834a3b5b 100644
--- a/p2p/exchange.go
+++ b/p2p/exchange.go
@@ -150,9 +150,11 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
 	// 
their Head and verify against the given trusted header. useTrackedPeers := !reqParams.TrustedHead.IsZero() if useTrackedPeers { - trackedPeers := ex.peerTracker.getPeers(maxUntrustedHeadRequests) + trackedPeers := ex.peerTracker.peers(maxUntrustedHeadRequests) if len(trackedPeers) > 0 { - peers = trackedPeers + peers = transform(trackedPeers, func(p *peerStat) peer.ID { + return p.peerID + }) log.Debugw("requesting head from tracked peers", "amount", len(peers)) } } @@ -292,9 +294,13 @@ func (ex *Exchange[H]) GetRangeByHeight( attribute.Int64("to", int64(to)), )) defer span.End() - session := newSession[H]( + session, err := newSession[H]( ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RequestTimeout, ex.metrics, withValidation(from), ) + // TODO(@vgonkivs): decide what to do with this error. Maybe we should fall into "discovery mode" and try to collect peers??? + if err != nil { + return nil, err + } defer session.close() // we request the next header height that we don't have: `fromHead`+1 amount := to - (from.Height() + 1) diff --git a/p2p/helpers.go b/p2p/helpers.go index dc8cc1c5..637fa961 100644 --- a/p2p/helpers.go +++ b/p2p/helpers.go @@ -124,3 +124,13 @@ func convertStatusCodeToError(code p2p_pb.StatusCode) error { return fmt.Errorf("unknown status code %d", code) } } + +// transform applies a provided function to each element of the input slice, +// producing a new slice with the results of the function. +func transform[T, U any](ts []T, f func(T) U) []U { + us := make([]U, len(ts)) + for i := range ts { + us[i] = f(ts[i]) + } + return us +} diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index f7b9ad27..294ade03 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -16,13 +16,13 @@ import ( ) type peerTracker struct { - host host.Host - connGater *conngater.BasicConnectionGater - metrics *exchangeMetrics protocolID protocol.ID - peerLk sync.RWMutex + + host host.Host + connGater *conngater.BasicConnectionGater + + peerLk sync.RWMutex // trackedPeers contains active peers that we can request to. - // we cache the peer once they disconnect, // so we can guarantee that peerQueue will only contain active peers trackedPeers map[libpeer.ID]struct{} @@ -30,6 +30,8 @@ type peerTracker struct { // good peers during garbage collection pidstore PeerIDStore + metrics *exchangeMetrics + ctx context.Context cancel context.CancelFunc // done is used to gracefully stop the peerTracker. 
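Note: the generic transform helper added to p2p/helpers.go above is what lets Head() turn tracked peer stats into plain peer IDs in one expression. A hedged usage sketch; trackedIDs is illustrative and not part of the change:

	// trackedIDs mirrors the Head() call site: it projects peer stats down to
	// their IDs with the new transform helper.
	func trackedIDs(stats []*peerStat) []libpeer.ID {
		return transform(stats, func(s *peerStat) libpeer.ID {
			return s.peerID
		})
	}
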
@@ -103,19 +105,20 @@ func (p *peerTracker) track() { p.done <- struct{}{} }() - connSubs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) + evtBus := p.host.EventBus() + connSubs, err := evtBus.Subscribe(&event.EvtPeerConnectednessChanged{}) if err != nil { log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err) return } - identifySub, err := p.host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{}) + identifySub, err := evtBus.Subscribe(&event.EvtPeerIdentificationCompleted{}) if err != nil { log.Errorw("subscribing to EvtPeerIdentificationCompleted", "err", err) return } - protocolSub, err := p.host.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{}) + protocolSub, err := evtBus.Subscribe(&event.EvtPeerProtocolsUpdated{}) if err != nil { log.Errorw("subscribing to EvtPeerProtocolsUpdated", "err", err) return @@ -124,9 +127,7 @@ func (p *peerTracker) track() { for { select { case <-p.ctx.Done(): - err = connSubs.Close() - errors.Join(err, identifySub.Close(), protocolSub.Close()) - if err != nil { + if err := closeSubscriptions(connSubs, identifySub, protocolSub); err != nil { log.Errorw("closing subscriptions", "err", err) } return @@ -135,35 +136,23 @@ func (p *peerTracker) track() { if network.NotConnected == ev.Connectedness { p.disconnected(ev.Peer) } - case subscription := <-identifySub.Out(): - ev := subscription.(event.EvtPeerIdentificationCompleted) - p.connected(ev.Peer) - case subscription := <-protocolSub.Out(): - ev := subscription.(event.EvtPeerProtocolsUpdated) + case identSubscription := <-identifySub.Out(): + ev := identSubscription.(event.EvtPeerIdentificationCompleted) + if slices.Contains(ev.Protocols, p.protocolID) { + p.connected(ev.Peer) + } + case protocolSubscription := <-protocolSub.Out(): + ev := protocolSubscription.(event.EvtPeerProtocolsUpdated) if slices.Contains(ev.Removed, p.protocolID) { p.disconnected(ev.Peer) - break } - p.connected(ev.Peer) + if slices.Contains(ev.Added, p.protocolID) { + p.connected(ev.Peer) + } } } } -// getPeers returns the tracker's currently tracked peers up to the `max`. -func (p *peerTracker) getPeers(max int) []libpeer.ID { - p.peerLk.RLock() - defer p.peerLk.RUnlock() - - peers := make([]libpeer.ID, 0, max) - for peer := range p.trackedPeers { - peers = append(peers, peer) - if len(peers) == max { - break - } - } - return peers -} - func (p *peerTracker) connected(pID libpeer.ID) { if err := pID.Validate(); err != nil { return @@ -173,15 +162,6 @@ func (p *peerTracker) connected(pID libpeer.ID) { return } - // check that peer supports our protocol id. - protocol, err := p.host.Peerstore().SupportsProtocols(pID, p.protocolID) - if err != nil { - return - } - if !slices.Contains(protocol, p.protocolID) { - return - } - for _, c := range p.host.Network().ConnsToPeer(pID) { // check if connection is short-termed and skip this peer if c.Stat().Limited { @@ -219,17 +199,21 @@ func (p *peerTracker) disconnected(pID libpeer.ID) { p.metrics.peersDisconnected(1) } -func (p *peerTracker) peers() []*peerStat { +// peers returns the tracker's currently tracked peers up to the `max`. 
+func (p *peerTracker) peers(max int) []*peerStat { p.peerLk.RLock() defer p.peerLk.RUnlock() - peers := make([]*peerStat, 0) + peers := make([]*peerStat, 0, max) for peerID := range p.trackedPeers { score := 0 if info := p.host.ConnManager().GetTagInfo(peerID); info != nil { score = info.Tags[string(p.protocolID)] } peers = append(peers, &peerStat{peerID: peerID, peerScore: score}) + if len(peers) == max { + break + } } return peers } @@ -300,3 +284,11 @@ func (p *peerTracker) updateScore(stats *peerStat, size uint64, duration time.Du score := stats.updateStats(size, duration) p.host.ConnManager().TagPeer(stats.peerID, string(p.protocolID), score) } + +func closeSubscriptions(subs ...event.Subscription) error { + var err error + for _, sub := range subs { + err = errors.Join(err, sub.Close()) + } + return err +} diff --git a/p2p/peer_tracker_test.go b/p2p/peer_tracker_test.go index 41796c8a..7151e086 100644 --- a/p2p/peer_tracker_test.go +++ b/p2p/peer_tracker_test.go @@ -62,7 +62,7 @@ func TestPeerTracker_Bootstrap(t *testing.T) { require.NoError(t, err) assert.Eventually(t, func() bool { - return len(tracker.getPeers(7)) > 0 + return len(tracker.peers(7)) > 0 }, time.Millisecond*500, time.Millisecond*100) } diff --git a/p2p/session.go b/p2p/session.go index 2e8594d2..dc297d9d 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -60,23 +60,28 @@ func newSession[H header.Header[H]]( requestTimeout time.Duration, metrics *exchangeMetrics, options ...option[H], -) *session[H] { +) (*session[H], error) { ctx, cancel := context.WithCancel(ctx) ses := &session[H]{ ctx: ctx, cancel: cancel, protocolID: protocolID, host: h, - queue: newPeerQueue(ctx, peerTracker.peers()), peerTracker: peerTracker, requestTimeout: requestTimeout, metrics: metrics, } + peers := peerTracker.peers(len(peerTracker.trackedPeers)) + if len(peers) == 0 { + return nil, errors.New("empty peer tracker") + } + ses.queue = newPeerQueue(ctx, peers) + for _, opt := range options { opt(ses) } - return ses + return ses, nil } // getRangeByHeight requests headers from different peers. 
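Note: with newSession now failing fast on an empty tracker, every constructor call site has to check the returned error, which is why the session tests below first seed a tracked peer and a host. The caller-side pattern, as exchange.go wires it earlier in this patch (validation options elided), is roughly:

	// A session can no longer be built over a tracker with no active peers.
	session, err := newSession[H](
		ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RequestTimeout, ex.metrics,
	)
	if err != nil {
		return nil, err // "empty peer tracker"
	}
	defer session.close()
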
diff --git a/p2p/session_test.go b/p2p/session_test.go
index 7c8599f5..8aa1776a 100644
--- a/p2p/session_test.go
+++ b/p2p/session_test.go
@@ -6,6 +6,8 @@ import (
 	"time"
 
 	"github.com/libp2p/go-libp2p/core/peer"
+	blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
+	swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -25,16 +27,21 @@ func Test_PrepareRequests(t *testing.T) {
 func Test_Validate(t *testing.T) {
 	suite := headertest.NewTestSuite(t)
 	head := suite.Head()
-	ses := newSession(
+	peerID := peer.ID("test")
+	pT := &peerTracker{trackedPeers: make(map[peer.ID]struct{})}
+	pT.trackedPeers[peerID] = struct{}{}
+	pT.host = blankhost.NewBlankHost(swarm.GenSwarm(t))
+	ses, err := newSession(
 		context.Background(),
 		nil,
-		&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
+		pT,
 		"",
 		time.Second,
 		nil, withValidation(head),
 	)
+	require.NoError(t, err)
 
 	headers := suite.GenDummyHeaders(5)
-	err := ses.verify(headers)
+	err = ses.verify(headers)
 	assert.NoError(t, err)
 }
 
@@ -42,17 +49,22 @@ func Test_Validate(t *testing.T) {
 func Test_ValidateFails(t *testing.T) {
 	suite := headertest.NewTestSuite(t)
 	head := suite.Head()
-	ses := newSession(
+
+	peerID := peer.ID("test")
+	pT := &peerTracker{trackedPeers: make(map[peer.ID]struct{})}
+	pT.trackedPeers[peerID] = struct{}{}
+	pT.host = blankhost.NewBlankHost(swarm.GenSwarm(t))
+	ses, err := newSession(
 		context.Background(),
-		nil,
-		&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
+		blankhost.NewBlankHost(swarm.GenSwarm(t)),
+		pT,
 		"",
 		time.Second,
 		nil, withValidation(head),
 	)
-
+	require.NoError(t, err)
 	headers := suite.GenDummyHeaders(5)
 	// break adjacency
 	headers[2] = headers[4]
-	err := ses.verify(headers)
+	err = ses.verify(headers)
 	assert.Error(t, err)
 }
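A closing note on the arithmetic behind the new integer scoring, since it replaces the old float averaging. A worked example against the patch's own updateStats; the byte counts and durations are invented:

	s := &peerStat{peerID: "p"}
	s.updateStats(5120, 4*time.Millisecond) // 5120B / 4ms = 1280 B/ms -> first score: int(1280*100) = 128000
	s.updateStats(2560, 4*time.Millisecond) // 640 B/ms -> averaged: (128000 + 64000) / 2 = 96000
	s.updateStats(0, 0)                     // failed request: 96000 - 96000/100*20 = 76800
	// Because the 20% cut uses integer division in steps of score/100,
	// scores below 100 are never decreased on failure, exactly as the
	// NOTE inside updateStats warns.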