Skip to content

Commit

Permalink
Merge pull request #139 from celestiaorg/min-amount-of-peers-in-tracker
Browse files Browse the repository at this point in the history
feat(peer-tracker): add minAmount of peers tracked in tracker
  • Loading branch information
renaynay authored Dec 22, 2023
2 parents 28ff21c + 30b2efd commit 6afe95f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
69 changes: 47 additions & 22 deletions p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"sort"
"sync"
"time"

Expand All @@ -15,8 +16,10 @@ import (
const (
// defaultScore specifies the score for newly connected peers.
defaultScore float32 = 1
// maxTrackerSize specifies the max amount of peers that can be added to the peerTracker.
// maxPeerTrackerSize specifies the max amount of peers that can be added to the peerTracker.
maxPeerTrackerSize = 100
// minPeerTrackerSizeBeforeGC specifies the minimum amount of tracked peers before the peerTracker starts removing peers with lower peer scores.
minPeerTrackerSizeBeforeGC = 10
)

var (
Expand Down Expand Up @@ -240,31 +243,53 @@ func (p *peerTracker) gc() {
p.done <- struct{}{}
return
case <-ticker.C:
p.peerLk.Lock()

now := time.Now()
var deletedDisconnectedNum int
for id, peer := range p.disconnectedPeers {
if peer.pruneDeadline.Before(now) {
delete(p.disconnectedPeers, id)
deletedDisconnectedNum++
}
}
p.cleanUpDisconnectedPeers()
p.cleanUpTrackedPeers()
p.dumpPeers(p.ctx)
}
}
}

var deletedTrackedNum int
for id, peer := range p.trackedPeers {
if peer.peerScore <= defaultScore {
delete(p.trackedPeers, id)
deletedTrackedNum++
}
}
p.peerLk.Unlock()
func (p *peerTracker) cleanUpDisconnectedPeers() {
p.peerLk.Lock()
defer p.peerLk.Unlock()

p.metrics.peersDisconnected(-deletedDisconnectedNum)
p.metrics.peersTracked(-deletedTrackedNum)
p.dumpPeers(p.ctx)
now := time.Now()
var deletedDisconnectedNum int
for id, peer := range p.disconnectedPeers {
if peer.pruneDeadline.Before(now) {
delete(p.disconnectedPeers, id)
deletedDisconnectedNum++
}
}
p.metrics.peersDisconnected(-deletedDisconnectedNum)
}

func (p *peerTracker) cleanUpTrackedPeers() {
p.peerLk.Lock()
defer p.peerLk.Unlock()

if len(p.trackedPeers) <= minPeerTrackerSizeBeforeGC {
return
}

var deletedTrackedNum int
orderedPeers := make([]*peerStat, 0, len(p.trackedPeers))
for _, peer := range p.trackedPeers {
orderedPeers = append(orderedPeers, peer)
}
sort.Slice(orderedPeers, func(i, j int) bool {
return orderedPeers[i].peerScore < orderedPeers[j].peerScore
})

for _, peer := range orderedPeers[:len(orderedPeers)-minPeerTrackerSizeBeforeGC] {
if peer.peerScore > defaultScore {
break
}
delete(p.trackedPeers, peer.peerID)
deletedTrackedNum++
}
p.metrics.peersTracked(-deletedTrackedNum)
}

// dumpPeers stores peers to the peerTracker's PeerIDStore if
Expand Down
13 changes: 10 additions & 3 deletions p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ func TestPeerTracker_GC(t *testing.T) {

maxAwaitingTime = time.Millisecond

peerlist := generateRandomPeerlist(t, 4)
peerlist := generateRandomPeerlist(t, minPeerTrackerSizeBeforeGC)
for i := 0; i < minPeerTrackerSizeBeforeGC; i++ {
p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5}
}

// add peers to trackedPeers to make total number of peers > maxPeerTrackerSize
peerlist = generateRandomPeerlist(t, 4)
pid1 := peerlist[0]
pid2 := peerlist[1]
pid3 := peerlist[2]
Expand All @@ -54,13 +60,14 @@ func TestPeerTracker_GC(t *testing.T) {
err = p.stop(ctx)
require.NoError(t, err)

require.Nil(t, p.trackedPeers[pid1])
// ensure amount of peers in trackedPeers is equal to minPeerTrackerSizeBeforeGC
require.Len(t, p.trackedPeers, minPeerTrackerSizeBeforeGC)
require.Nil(t, p.disconnectedPeers[pid3])

// ensure good peers were dumped to store
peers, err := pidstore.Load(ctx)
require.NoError(t, err)
assert.Equal(t, 1, len(peers))
require.Equal(t, minPeerTrackerSizeBeforeGC, len(peers))
}

func TestPeerTracker_BlockPeer(t *testing.T) {
Expand Down

0 comments on commit 6afe95f

Please sign in to comment.