Discovery zero refresh timer (#8661)
This fixes an issue where the mumbai testnet node struggles to find
peers. Before this fix, test peer counts were typically around 20 in
total across eth66, eth67 and eth68, and some new nodes could struggle
to find even a single peer after days of operation.

These are the numbers after 12 hours of running on a node which
previously could not find any peers: eth66=13, eth67=76, eth68=91.

The root causes of this issue are the following:

- A significant number of mumbai peers around the boot node return
network ids which are different from those currently available in the
DHT
- The available nodes are all consequently busy and return 'too many
peers' for long periods

These issues cause a significant number of discovery timeouts; some of
the queries never receive a response at all.

These timeouts cause the discovery read loop to enter a channel deadlock,
which means that no responses are processed and no timeouts are fired.
This halts the discovery process in the node; from then on it just
re-requests handshakes from a relatively small number of peers.

This check-in fixes the situation with the following changes:

- Remove the deadlock by running the timer in a separate goroutine so
it can run independently of the main request processing (see the sketch
below).
- Allow the discovery process matcher to match on port if no id match
can be established on the initial ping. This allows subsequent node
validation to proceed, and if the node proves to be valid via the
remainder of the look-up and handshake process it is used as a valid
peer.
- Completely unsolicited responses, i.e. those which come from a
completely unknown ip:port combination, continue to be ignored.
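
As an illustration of the first change, here is a minimal sketch of the
pattern (not the actual erigon discovery code, which tracks per-request
deadlines rather than a plain ticker): the timeout timer runs in its own
goroutine and signals the read loop over a channel, so timeouts keep firing
even when some queries never receive a response.

```go
// Minimal sketch, assuming a simplified read loop; names are illustrative.
package main

import (
	"fmt"
	"time"
)

func main() {
	responses := make(chan string)
	timeouts := make(chan struct{}, 1)

	// Timer goroutine: never blocks on the read loop; if the loop is busy the
	// expiration signal is coalesced into the buffered channel.
	go func() {
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for range ticker.C {
			select {
			case timeouts <- struct{}{}:
			default:
			}
		}
	}()

	// Simulate a single late response; other queries never get an answer.
	go func() {
		time.Sleep(300 * time.Millisecond)
		responses <- "pong"
	}()

	// Read loop: responses and timeouts are handled independently, so a
	// missing response can no longer stall timeout processing.
	for i := 0; i < 3; i++ {
		select {
		case r := <-responses:
			fmt.Println("processed response:", r)
		case <-timeouts:
			fmt.Println("expired pending requests")
		}
	}
}
```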
mh0lt authored Nov 7, 2023
1 parent e08b031 commit 509a7af
Showing 20 changed files with 684 additions and 197 deletions.
2 changes: 1 addition & 1 deletion cl/sentinel/sentinel.go
@@ -168,7 +168,7 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) {
// Start stream handlers
handlers.NewConsensusHandlers(s.ctx, s.db, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start()

-net, err := discover.ListenV5(s.ctx, conn, localNode, discCfg)
+net, err := discover.ListenV5(s.ctx, "any", conn, localNode, discCfg)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions cmd/bootnode/main.go
@@ -131,11 +131,11 @@ func main() {
}

if *runv5 {
-if _, err := discover.ListenV5(ctx, conn, ln, cfg); err != nil {
+if _, err := discover.ListenV5(ctx, "any", conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
} else {
-if _, err := discover.ListenUDP(ctx, conn, ln, cfg); err != nil {
+if _, err := discover.ListenUDP(ctx, "any", conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
}
2 changes: 1 addition & 1 deletion cmd/observer/observer/server.go
@@ -183,5 +183,5 @@ func (server *Server) Listen(ctx context.Context) (*discover.UDPv4, error) {

server.logger.Debug("Discovery UDP listener is up", "addr", realAddr)

-return discover.ListenV4(ctx, conn, server.localNode, server.discConfig)
+return discover.ListenV4(ctx, "any", conn, server.localNode, server.discConfig)
}
4 changes: 1 addition & 3 deletions erigon-lib/diagnostics/network.go
@@ -16,8 +16,6 @@

package diagnostics

import "reflect"

func (p PeerStatistics) Type() Type {
-return Type(reflect.TypeOf(p))
+return TypeOf(p)
}
92 changes: 69 additions & 23 deletions erigon-lib/diagnostics/provider.go
@@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"

"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"
@@ -17,15 +18,43 @@ const (
ckChan ctxKey = iota
)

-type Type reflect.Type
+type Type interface {
+reflect.Type
+Context() context.Context
+Err() error
+}
+
+type diagType struct {
+reflect.Type
+}
+
+var cancelled = func() context.Context {
+ctx, cancel := context.WithCancel(context.Background())
+cancel()
+return ctx
+}()
+
+func (t diagType) Context() context.Context {
+providerMutex.Lock()
+defer providerMutex.Unlock()
+if reg := providers[t]; reg != nil {
+return reg.context
+}
+
+return cancelled
+}
+
+func (t diagType) Err() error {
+return t.Context().Err()
+}

type Info interface {
Type() Type
}

func TypeOf(i Info) Type {
t := reflect.TypeOf(i)
-return Type(t)
+return diagType{t}
}

type Provider interface {
@@ -50,7 +79,7 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) {
providerMutex.Lock()
defer providerMutex.Unlock()

-reg, _ := providers[infoType]
+reg := providers[infoType]

if reg != nil {
for _, p := range reg.providers {
@@ -73,14 +102,16 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) {
func StartProviders(ctx context.Context, infoType Type, logger log.Logger) {
providerMutex.Lock()

-reg, _ := providers[infoType]

-toStart := make([]Provider, len(reg.providers))
+reg := providers[infoType]

-for i, provider := range reg.providers {
-toStart[i] = provider
+if reg == nil {
+reg = &registry{}
+providers[infoType] = reg
}

+toStart := make([]Provider, len(reg.providers))
+copy(toStart, reg.providers)

+reg.context = ctx

providerMutex.Unlock()
@@ -105,18 +136,29 @@ func startProvider(ctx context.Context, infoType Type, provider Provider, logger
}
}

-func Send[I Info](ctx context.Context, info I) error {
+func Send[I Info](info I) error {
+ctx := info.Type().Context()

if ctx.Err() != nil {
+if !errors.Is(ctx.Err(), context.Canceled) {
+// drop the diagnostic message if there is
+// no active diagnostic context for the type
+return nil
+}

return ctx.Err()
}

cval := ctx.Value(ckChan)
-if c, ok := cval.(chan I); ok {
-select {
-case c <- info:
-default:
-// drop the diagnostic message if the receiver is busy
-// so the sender is not blocked on non critcal actions

+if cp, ok := cval.(*atomic.Pointer[chan I]); ok {
+if c := (*cp).Load(); c != nil {
+select {
+case *c <- info:
+default:
+// drop the diagnostic message if the receiver is busy
+// so the sender is not blocked on non critcal actions
+}
}
} else {
return fmt.Errorf("unexpected channel type: %T", cval)
@@ -126,16 +168,20 @@ func Send[I Info](ctx context.Context, info I) error {
}

func Context[I Info](ctx context.Context, buffer int) (context.Context, <-chan I, context.CancelFunc) {
-ch := make(chan I, buffer)
-ctx = context.WithValue(ctx, ckChan, ch)
+c := make(chan I, buffer)
+cp := atomic.Pointer[chan I]{}
+cp.Store(&c)
+
+ctx = context.WithValue(ctx, ckChan, &cp)
ctx, cancel := context.WithCancel(ctx)

-return ctx, ch, func() {
-if ch != nil {
-toClose := ch
-ch = nil
-close(toClose)
-}
+return ctx, *cp.Load(), func() {
cancel()

+if cp.CompareAndSwap(&c, nil) {
+ch := c
+c = nil
+close(ch)
+}
}
}
23 changes: 21 additions & 2 deletions erigon-lib/diagnostics/provider_test.go
@@ -31,7 +31,7 @@ func (t *testProvider) StartDiagnostics(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-timer.C:
-diagnostics.Send(ctx, testInfo{count})
+diagnostics.Send(testInfo{count})
count++
}
}
@@ -54,6 +54,25 @@ func TestProviderRegistration(t *testing.T) {
}
}

+func TestDelayedProviderRegistration(t *testing.T) {
+
+time.AfterFunc(1*time.Second, func() {
+// diagnostics provider
+provider := &testProvider{}
+diagnostics.RegisterProvider(provider, diagnostics.TypeOf(testInfo{}), log.Root())
+})
+
+// diagnostics receiver
+ctx, ch, cancel := diagnostics.Context[testInfo](context.Background(), 1)
+diagnostics.StartProviders(ctx, diagnostics.TypeOf(testInfo{}), log.Root())
+
+for info := range ch {
+if info.count == 3 {
+cancel()
+}
+}
+}

func TestProviderFuncRegistration(t *testing.T) {

// diagnostics provider
@@ -68,7 +87,7 @@ func TestProviderFuncRegistration(t *testing.T) {
case <-ctx.Done():
return nil
case <-timer.C:
-diagnostics.Send(ctx, testInfo{count})
+diagnostics.Send(testInfo{count})
count++
}
}
2 changes: 2 additions & 0 deletions eth/stagedsync/stage_headers.go
@@ -204,6 +204,7 @@ Loop:
if req != nil {
peer, sentToPeer = cfg.headerReqSend(ctx, req)
if sentToPeer {
logger.Debug(fmt.Sprintf("[%s] Requested header", logPrefix), "from", req.Number, "length", req.Length)
cfg.hd.UpdateStats(req, false /* skeleton */, peer)
cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
}
Expand Down Expand Up @@ -233,6 +234,7 @@ Loop:
if req != nil {
peer, sentToPeer = cfg.headerReqSend(ctx, req)
if sentToPeer {
logger.Debug(fmt.Sprintf("[%s] Requested skeleton", logPrefix), "from", req.Number, "length", req.Length)
cfg.hd.UpdateStats(req, true /* skeleton */, peer)
lastSkeletonTime = time.Now()
}
42 changes: 27 additions & 15 deletions p2p/dial.go
@@ -43,7 +43,6 @@ const (

// Config for the "Looking for peers" message.
dialStatsLogInterval = 60 * time.Second // printed at most this often
-dialStatsPeerLimit = 20 // but not if more than this many dialed peers

// Endpoint resolution is throttled with bounded backoff.
initialResolveDelay = 60 * time.Second
@@ -94,6 +93,7 @@ var (
// to create peer connections to nodes arriving through the iterator.
type dialScheduler struct {
dialConfig
+mutex sync.Mutex
setupFunc dialSetupFunc
wg sync.WaitGroup
cancel context.CancelFunc
@@ -126,8 +126,8 @@ type dialScheduler struct {
historyTimerTime mclock.AbsTime

// for logStats
-lastStatsLog mclock.AbsTime
-doneSinceLastLog int
+dialed int
+errors map[string]uint
}

type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error
@@ -177,8 +177,9 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF
remPeerCh: make(chan *conn),

subProtocolVersion: subProtocolVersion,
+errors: map[string]uint{},
}
-d.lastStatsLog = d.clock.Now()

d.ctx, d.cancel = context.WithCancel(context.Background())
d.wg.Add(2)
go d.readNodes(it)
@@ -232,6 +233,9 @@ func (d *dialScheduler) loop(it enode.Iterator) {
historyExp = make(chan struct{}, 1)
)

+logTimer := time.NewTicker(dialStatsLogInterval)
+defer logTimer.Stop()

loop:
for {
// Launch new dials if slots are available.
@@ -243,13 +247,15 @@
nodesCh = nil
}
d.rearmHistoryTimer(historyExp)
//d.logStats()

select {
case <-d.ctx.Done():
it.Close()
break loop

+case <-logTimer.C:
+d.logStats()

case node := <-nodesCh:
if err := d.checkDial(node); err != nil {
d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
@@ -261,7 +267,7 @@
id := task.dest.ID()
delete(d.dialing, id)
d.updateStaticPool(id)
-d.doneSinceLastLog++
+d.dialed++

case c := <-d.addPeerCh:
if c.is(dynDialedConn) || c.is(staticDialedConn) {
@@ -337,15 +343,16 @@ func (d *dialScheduler) readNodes(it enode.Iterator) {
// or comes back online.
// nolint
func (d *dialScheduler) logStats() {
-now := d.clock.Now()
-if d.lastStatsLog.Add(dialStatsLogInterval) > now {
-return
-}
-if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers {
-d.log.Info("[p2p] Looking for peers", "protocol", d.subProtocolVersion, "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.doneSinceLastLog, "static", len(d.static))
+vals := []interface{}{"protocol", d.subProtocolVersion,
+"peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.dialed, "static", len(d.static)}
+
+d.mutex.Lock()
+for err, count := range d.errors {
+vals = append(vals, err, count)
}
-d.doneSinceLastLog = 0
-d.lastStatsLog = now
+d.mutex.Unlock()
+
+d.log.Debug("[p2p] Dial scheduler", vals...)
}

// rearmHistoryTimer configures d.historyTimer to fire when the
@@ -543,7 +550,12 @@ func (t *dialTask) resolve(d *dialScheduler) bool {
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
fd, err := d.dialer.Dial(d.ctx, t.dest)
if err != nil {
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
cleanErr := cleanupDialErr(err)
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanErr)

d.mutex.Lock()
d.errors[cleanErr.Error()] = d.errors[cleanErr.Error()] + 1
d.mutex.Unlock()
return &dialError{err}
}
mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
9 changes: 7 additions & 2 deletions p2p/discover/common.go
@@ -86,8 +86,8 @@ func (cfg Config) withDefaults(defaultReplyTimeout time.Duration) Config {
}

// ListenUDP starts listening for discovery packets on the given UDP socket.
-func ListenUDP(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
-return ListenV4(ctx, c, ln, cfg)
+func ListenUDP(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
+return ListenV4(ctx, protocol, c, ln, cfg)
}

// ReadPacket is a packet that couldn't be handled. Those packets are sent to the unhandled
@@ -96,3 +96,8 @@ type ReadPacket struct {
Data []byte
Addr *net.UDPAddr
}

+type UnhandledPacket struct {
+ReadPacket
+Reason error
+}
2 changes: 2 additions & 0 deletions p2p/discover/lookup.go
@@ -155,6 +155,7 @@ func (it *lookup) slowdown() {
func (it *lookup) query(n *node, reply chan<- []*node) {
fails := it.tab.db.FindFails(n.ID(), n.IP())
r, err := it.queryfunc(n)

if err == errClosed {
// Avoid recording failures on shutdown.
reply <- nil
@@ -180,6 +181,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
for _, n := range r {
it.tab.addSeenNode(n)
}

reply <- r
}
