Skip to content

Commit

Permalink
feat(p2p/peerTracker): remove gc and add subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jul 10, 2024
1 parent 80ae9b4 commit 10c442b
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 122 deletions.
6 changes: 3 additions & 3 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func NewExchange[H header.Header[H]](
}
}

id := protocolID(params.networkID)
ex := &Exchange[H]{
host: host,
protocolID: protocolID(params.networkID),
peerTracker: newPeerTracker(host, gater, params.pidstore, metrics),
protocolID: id,
peerTracker: newPeerTracker(host, gater, params.networkID, params.pidstore, metrics),
Params: params,
metrics: metrics,
}
Expand All @@ -98,7 +99,6 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
ex.ctx, ex.cancel = context.WithCancel(context.Background())
log.Infow("client: starting client", "protocol ID", ex.protocolID)

go ex.peerTracker.gc()
go ex.peerTracker.track()

// bootstrap the peerTracker with trusted peers as well as previously seen
Expand Down
70 changes: 49 additions & 21 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
libhost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -242,7 +244,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
require.NoError(t, err)
servers[index].Start(context.Background()) //nolint:errcheck
exchange.peerTracker.peerLk.Lock()
exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()}
exchange.peerTracker.trackedPeers[hosts[index].ID()] = struct{}{}
exchange.peerTracker.peerLk.Unlock()
}

Expand All @@ -262,30 +264,38 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
// TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
// from another peer with lower score after receiving header.ErrNotFound
func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
hosts := createMocknet(t, 3)
hosts := quicHosts(t, 3)

// create client + server(it does not have needed headers)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

serverSideStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10)
tmServerSideStore := &timedOutStore{timeout: time.Millisecond * 200, Store: *serverSideStore}

hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
hosts[0].ConnManager().TagPeer(hosts[2].ID(), string(protocolID(networkID)), 90)

// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
hosts[2], tmServerSideStore,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
require.NoError(t, serverSideEx.Start(context.Background()))
t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})

exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

h, err := store.GetByHeight(context.Background(), 5)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
require.NoError(t, err)
// ensure that peerScore for the second peer is changed
newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
newPeerScore := score(t, exchg.peerTracker.host, hosts[2].ID())
require.NotEqual(t, 20, newPeerScore)
}

Expand Down Expand Up @@ -464,7 +474,9 @@ func TestExchange_HandleHeaderWithDifferentChainID(t *testing.T) {
func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
// create blankhost because mocknet does not support deadlines
swarm0 := swarm.GenSwarm(t)
host0 := blankhost.NewBlankHost(swarm0)
mngr, err := connmgr.NewConnManager(0, 50)
require.NoError(t, err)
host0 := blankhost.NewBlankHost(swarm0, blankhost.WithConnectionManager(mngr))
swarm1 := swarm.GenSwarm(t)
host1 := blankhost.NewBlankHost(swarm1)
swarm2 := swarm.GenSwarm(t)
Expand Down Expand Up @@ -495,24 +507,25 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})
prevScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
prevScore := score(t, exchg.host, host1.ID())
exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
host0.ConnManager().TagPeer(host2.ID(), string(protocolID(networkID)), 100)
exchg.peerTracker.trackedPeers[host2.ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

gen, err := store.GetByHeight(context.Background(), 1)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
require.NoError(t, err)
newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
newPeerScore := score(t, exchg.host, host1.ID())
assert.NotEqual(t, newPeerScore, prevScore)
}

// TestExchange_RequestPartialRange enusres in case of receiving a partial response
// from server, Exchange will re-request remaining headers from another peer
func TestExchange_RequestPartialRange(t *testing.T) {
hosts := createMocknet(t, 3)
hosts := quicHosts(t, 3)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

// create one more server(with more headers in the store)
Expand All @@ -523,13 +536,14 @@ func TestExchange_RequestPartialRange(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)

require.NoError(t, err)
require.NoError(t, serverSideEx.Start(ctx))
exchg.peerTracker.peerLk.Lock()
prevScoreBefore1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
prevScoreBefore2 := 50
// reducing peerScore of the second server, so our exchange will request host[1] first.
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
prevScoreBefore1 := score(t, exchg.host, hosts[1].ID())
prevScoreBefore2 := score(t, exchg.host, hosts[2].ID())
exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

gen, err := store.GetByHeight(context.Background(), 1)
Expand All @@ -540,8 +554,8 @@ func TestExchange_RequestPartialRange(t *testing.T) {
require.NoError(t, err)

exchg.peerTracker.peerLk.Lock()
prevScoreAfter1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
prevScoreAfter2 := exchg.peerTracker.trackedPeers[hosts[2].ID()].peerScore
prevScoreAfter1 := score(t, exchg.host, hosts[1].ID())
prevScoreAfter2 := score(t, exchg.host, hosts[2].ID())
exchg.peerTracker.peerLk.Unlock()

assert.NotEqual(t, prevScoreBefore1, prevScoreAfter1)
Expand All @@ -561,7 +575,6 @@ func createP2PExAndServer(
host, tpeer libhost.Host,
) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)

serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
WithNetworkID[ServerParameters](networkID),
)
Expand All @@ -582,7 +595,7 @@ func createP2PExAndServer(
time.Sleep(time.Millisecond * 100) // give peerTracker time to add a trusted peer

ex.peerTracker.peerLk.Lock()
ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100.0}
ex.peerTracker.trackedPeers[tpeer.ID()] = struct{}{}
ex.peerTracker.peerLk.Unlock()

t.Cleanup(func() {
Expand All @@ -595,12 +608,15 @@ func createP2PExAndServer(

func quicHosts(t *testing.T, n int) []libhost.Host {
hosts := make([]libhost.Host, n)
var err error
for i := range hosts {
swrm := swarm.GenSwarm(t, swarm.OptDisableTCP)
hosts[i] = blankhost.NewBlankHost(swrm)
require.NoError(t, err)
hosts[i], err = libp2p.New()
for _, host := range hosts[:i] {
hosts[i].Peerstore().AddAddrs(host.ID(), host.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Network().ListenAddresses(), peerstore.PermanentAddrTTL)
err = hosts[i].Connect(context.Background(), peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
require.NoError(t, err)
}
}

Expand Down Expand Up @@ -647,3 +663,15 @@ func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.D
time.Sleep(t.timeout)
return nil, header.ErrNoHead
}

func (t *timedOutStore) GetRange(ctx context.Context, from, to uint64) ([]*headertest.DummyHeader, error) {
time.Sleep(t.timeout)
return t.Store.GetRange(ctx, from, to)
}

func score(t *testing.T, h libhost.Host, id peer.ID) int {
t.Helper()
tags := h.ConnManager().GetTagInfo(id)
tag, _ := tags.Tags[string(protocolID(networkID))]
return tag
}
Loading

0 comments on commit 10c442b

Please sign in to comment.