fix(kademlia, reacher): various connection fixes (#4412)
istae authored Oct 24, 2023
1 parent b6ea8b3 commit a3decde
Showing 14 changed files with 92 additions and 159 deletions.
1 change: 0 additions & 1 deletion go.mod
@@ -21,7 +21,6 @@ require (
     github.com/gorilla/mux v1.8.0
     github.com/gorilla/websocket v1.5.0
     github.com/hashicorp/go-multierror v1.1.1
-    github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
     github.com/hashicorp/golang-lru/v2 v2.0.5
     github.com/ipfs/go-cid v0.4.1
     github.com/kardianos/service v1.2.0
1 change: 0 additions & 1 deletion go.sum
@@ -438,7 +438,6 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
 github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
-github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs=
 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4=
 github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
76 changes: 31 additions & 45 deletions pkg/hive/hive.go
@@ -28,7 +28,6 @@ import (
     "github.com/ethersphere/bee/pkg/p2p/protobuf"
     "github.com/ethersphere/bee/pkg/ratelimit"
     "github.com/ethersphere/bee/pkg/swarm"
-    lru "github.com/hashicorp/golang-lru"
     ma "github.com/multiformats/go-multiaddr"
     manet "github.com/multiformats/go-multiaddr/net"
 )
@@ -42,11 +41,8 @@ const (
     peersStreamName        = "peers"
     messageTimeout         = 1 * time.Minute // maximum allowed time for a message to be read or written.
     maxBatchSize           = 30
-    pingTimeout            = time.Second * 5 // time to wait for ping to succeed
-    batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
-    cacheSize              = 100000
-    bitsPerByte            = 8
-    cachePrefix            = swarm.MaxBins / bitsPerByte // enough bytes (32 bits) to uniquely identify a peer
+    pingTimeout            = time.Second * 15 // time to wait for ping to succeed
+    batchValidationTimeout = 5 * time.Minute  // prevent lock contention on peer validation
 )
 
 var (
@@ -70,17 +66,11 @@ type Service struct {
     wg                sync.WaitGroup
     peersChan         chan pb.Peers
     sem               *semaphore.Weighted
-    lru               *lru.Cache // cache for unreachable peers
     bootnode          bool
     allowPrivateCIDRs bool
 }
 
-func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, logger log.Logger) (*Service, error) {
-    lruCache, err := lru.New(cacheSize)
-    if err != nil {
-        return nil, err
-    }
-
+func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, logger log.Logger) *Service {
     svc := &Service{
         streamer: streamer,
         logger:   logger.WithName(loggerName).Register(),
@@ -91,8 +81,7 @@ func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, network
         outLimiter:        ratelimit.New(limitRate, limitBurst),
         quit:              make(chan struct{}),
         peersChan:         make(chan pb.Peers),
-        sem:               semaphore.NewWeighted(int64(31)),
-        lru:               lruCache,
+        sem:               semaphore.NewWeighted(int64(swarm.MaxBins)),
         bootnode:          bootnode,
         allowPrivateCIDRs: allowPrivateCIDRs,
     }
@@ -101,7 +90,7 @@ func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, network
         svc.startCheckPeersHandler()
     }
 
-    return svc, nil
+    return svc
 }
 
 func (s *Service) Protocol() p2p.ProtocolSpec {
@@ -299,53 +288,29 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
     mtx := sync.Mutex{}
     wg := sync.WaitGroup{}
 
-    for _, p := range peers.Peers {
-
-        overlay := swarm.NewAddress(p.Overlay)
-        cacheOverlay := overlay.ByteString()[:cachePrefix]
-
-        // cached peer, skip
-        if _, ok := s.lru.Get(cacheOverlay); ok {
-            continue
-        }
-
-        // if peer exists already in the addressBook, skip
-        if _, err := s.addressBook.Get(overlay); err == nil {
-            _ = s.lru.Add(cacheOverlay, nil)
-            continue
-        }
+    addPeer := func(newPeer *pb.BzzAddress, multiUnderlay ma.Multiaddr) {
 
         err := s.sem.Acquire(ctx, 1)
         if err != nil {
            return
        }
 
        wg.Add(1)
-        go func(newPeer *pb.BzzAddress, cacheOverlay string) {
-
+        go func() {
            s.metrics.PeerConnectAttempts.Inc()
 
            defer func() {
                s.sem.Release(1)
-                // mark peer as seen
-                _ = s.lru.Add(cacheOverlay, nil)
                wg.Done()
            }()
 
-            multiUnderlay, err := ma.NewMultiaddrBytes(newPeer.Underlay)
-            if err != nil {
-                s.metrics.PeerUnderlayErr.Inc()
-                s.logger.Error(err, "multi address underlay")
-                return
-            }
-
            ctx, cancel := context.WithTimeout(ctx, pingTimeout)
            defer cancel()
 
            start := time.Now()
 
            // check if the underlay is usable by doing a raw ping using libp2p
-            if _, err = s.streamer.Ping(ctx, multiUnderlay); err != nil {
+            if _, err := s.streamer.Ping(ctx, multiUnderlay); err != nil {
                s.metrics.PingFailureTime.Observe(time.Since(start).Seconds())
                s.metrics.UnreachablePeers.Inc()
                s.logger.Debug("unreachable peer underlay", "peer_address", hex.EncodeToString(newPeer.Overlay), "underlay", multiUnderlay)
@@ -362,7 +327,7 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
                Nonce:   newPeer.Nonce,
            }
 
-            err = s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
+            err := s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
            if err != nil {
                s.metrics.StorePeerErr.Inc()
                s.logger.Warning("skipping peer in response", "peer_address", newPeer.String(), "error", err)
@@ -372,7 +337,28 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
            mtx.Lock()
            peersToAdd = append(peersToAdd, bzzAddress.Overlay)
            mtx.Unlock()
-        }(p, cacheOverlay)
+        }()
+
+    }
+
+    for _, p := range peers.Peers {
+
+        multiUnderlay, err := ma.NewMultiaddrBytes(p.Underlay)
+        if err != nil {
+            s.metrics.PeerUnderlayErr.Inc()
+            s.logger.Debug("multi address underlay", "error", err)
+            continue
+        }
+
+        // if the peer already exists in the addressBook
+        // and the underlays match, skip it
+        addr, err := s.addressBook.Get(swarm.NewAddress(p.Overlay))
+        if err == nil && addr.Underlay.Equal(multiUnderlay) {
+            continue
+        }
+
+        // add the peer; it does not exist in the addressbook yet
+        addPeer(p, multiUnderlay)
     }
     wg.Wait()
 
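Note: taken together, the hive.go changes above drop the LRU "seen peer" cache in favor of a direct address-book lookup (a peer is skipped only when its stored underlay matches the advertised one), and vet the remaining candidates with semaphore-bounded concurrent pings. Below is a minimal, self-contained sketch of that pattern; candidate, vetPeers, and the ping callback are toy stand-ins for the real pb.BzzAddress, checkAndAddPeers, and p2p.StreamerPinger, not the project's API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

type candidate struct{ overlay, underlay string }

// vetPeers pings unknown candidates concurrently (bounded by sem) and
// returns the overlays that answered within the timeout.
func vetPeers(ctx context.Context, cands []candidate, known map[string]string, ping func(context.Context, string) error) []string {
	var (
		mtx sync.Mutex
		wg  sync.WaitGroup
		sem = semaphore.NewWeighted(8) // the real code uses swarm.MaxBins
		out []string
	)

	for _, c := range cands {
		// a peer whose stored underlay already matches is considered vetted
		if u, ok := known[c.overlay]; ok && u == c.underlay {
			continue
		}
		if err := sem.Acquire(ctx, 1); err != nil {
			break // context cancelled
		}
		wg.Add(1)
		go func(c candidate) {
			defer func() { sem.Release(1); wg.Done() }()
			pctx, cancel := context.WithTimeout(ctx, 15*time.Second) // pingTimeout
			defer cancel()
			if err := ping(pctx, c.underlay); err != nil {
				return // unreachable, drop it
			}
			mtx.Lock()
			out = append(out, c.overlay)
			mtx.Unlock()
		}(c)
	}
	wg.Wait()
	return out
}

func main() {
	ok := vetPeers(context.Background(),
		[]candidate{{"peer-a", "/ip4/192.0.2.1/tcp/1634"}},
		map[string]string{},
		func(ctx context.Context, underlay string) error { return nil })
	fmt.Println(ok) // [peer-a]
}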
8 changes: 4 additions & 4 deletions pkg/hive/hive_test.go
@@ -50,7 +50,7 @@ func TestHandlerRateLimit(t *testing.T) {
     // new recorder for handling Ping
     streamer := streamtest.New()
     // create a hive server that handles the incoming stream
-    server, _ := hive.New(streamer, addressbookclean, networkID, false, true, logger)
+    server := hive.New(streamer, addressbookclean, networkID, false, true, logger)
     testutil.CleanupCloser(t, server)
 
     serverAddress := swarm.RandAddress(t)
@@ -90,7 +90,7 @@ func TestHandlerRateLimit(t *testing.T) {
     }
 
     // create a hive client that will do broadcast
-    client, _ := hive.New(serverRecorder, addressbook, networkID, false, true, logger)
+    client := hive.New(serverRecorder, addressbook, networkID, false, true, logger)
     err := client.BroadcastPeers(context.Background(), serverAddress, peers...)
     if err != nil {
         t.Fatal(err)
@@ -255,7 +255,7 @@ func TestBroadcastPeers_FLAKY(t *testing.T) {
         streamer = streamtest.New()
     }
     // create a hive server that handles the incoming stream
-    server, _ := hive.New(streamer, addressbookclean, networkID, false, true, logger)
+    server := hive.New(streamer, addressbookclean, networkID, false, true, logger)
     testutil.CleanupCloser(t, server)
 
     // setup the stream recorder to record stream data
@@ -264,7 +264,7 @@ func TestBroadcastPeers_FLAKY(t *testing.T) {
     )
 
     // create a hive client that will do broadcast
-    client, _ := hive.New(recorder, addressbook, networkID, false, tc.allowPrivateCIDRs, logger)
+    client := hive.New(recorder, addressbook, networkID, false, tc.allowPrivateCIDRs, logger)
     if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil {
         t.Fatal(err)
     }
5 changes: 1 addition & 4 deletions pkg/node/bootstrap.go
@@ -116,10 +116,7 @@ func bootstrapNode(
     b.p2pService = p2ps
     b.p2pHalter = p2ps
 
-    hive, err := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, o.AllowPrivateCIDRs, logger)
-    if err != nil {
-        return nil, fmt.Errorf("hive: %w", err)
-    }
+    hive := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, o.AllowPrivateCIDRs, logger)
 
     if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
         return nil, fmt.Errorf("hive service: %w", err)
5 changes: 1 addition & 4 deletions pkg/node/node.go
@@ -719,10 +719,7 @@ func NewBee(
         return nil, fmt.Errorf("pingpong service: %w", err)
     }
 
-    hive, err := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, o.AllowPrivateCIDRs, logger)
-    if err != nil {
-        return nil, fmt.Errorf("hive: %w", err)
-    }
+    hive := hive.New(p2ps, addressbook, networkID, o.BootnodeMode, o.AllowPrivateCIDRs, logger)
 
     if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
         return nil, fmt.Errorf("hive service: %w", err)
63 changes: 12 additions & 51 deletions pkg/p2p/libp2p/internal/reacher/reacher.go
@@ -17,25 +17,15 @@ import (
 )
 
 const (
-    pingTimeout        = time.Second * 5
-    pingMaxAttempts    = 3
+    pingTimeout        = time.Second * 15
     workers            = 8
-    retryAfterDuration = time.Second * 15
-)
-
-type peerState int
-
-const (
-    waiting peerState = iota
-    inProgress
+    retryAfterDuration = time.Minute * 5
 )
 
 type peer struct {
     overlay    swarm.Address
     addr       ma.Multiaddr
     retryAfter time.Time
-    attempts   int
-    state      peerState
 }
 
 type reacher struct {
@@ -56,7 +46,6 @@ type reacher struct {
 
 type Options struct {
     PingTimeout        time.Duration
-    PingMaxAttempts    int
     Workers            int
     RetryAfterDuration time.Duration
 }
@@ -75,7 +64,6 @@ func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reach
     if o == nil {
         o = &Options{
             PingTimeout:        pingTimeout,
-            PingMaxAttempts:    pingMaxAttempts,
             Workers:            workers,
             RetryAfterDuration: retryAfterDuration,
         }
@@ -148,11 +136,7 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
     for p := range c {
 
         r.mu.Lock()
-        p.attempts++
-        var (
-            overlay  = p.overlay
-            attempts = p.attempts
-        )
+        overlay := p.overlay
         r.mu.Unlock()
 
         now := time.Now()
@@ -166,26 +150,12 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
             r.metrics.Pings.WithLabelValues("success").Inc()
             r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
             r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic)
-            r.deletePeer(p)
-            continue
-        }
-
-        r.metrics.Pings.WithLabelValues("failure").Inc()
-        r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
-
-        // max attempts have been reached
-        if attempts >= r.options.PingMaxAttempts {
+        } else {
+            r.metrics.Pings.WithLabelValues("failure").Inc()
+            r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
             r.notifier.Reachable(overlay, p2p.ReachabilityStatusPrivate)
-            r.deletePeer(p)
-            continue
         }
 
-        // mark peer as 'waiting', increase retry-after duration, and notify workers about more work
-        r.mu.Lock()
-        p.state = waiting
-        p.retryAfter = time.Now().Add(r.options.RetryAfterDuration * time.Duration(attempts))
-        r.mu.Unlock()
-
         r.notifyManage()
     }
 }
@@ -194,18 +164,16 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
     r.mu.Lock()
     defer r.mu.Unlock()
 
-    now := time.Now()
-    nextClosest := time.Time{}
+    var (
+        now         = time.Now()
+        nextClosest time.Time
+    )
 
     for _, p := range r.peers {
 
-        if p.state == inProgress {
-            continue
-        }
-
-        // here, retry after is in the past so we can ping this peer
+        // retry after has expired, retry
         if now.After(p.retryAfter) {
-            p.state = inProgress
+            p.retryAfter = time.Now().Add(r.options.RetryAfterDuration)
             return p, 0
         }
@@ -230,13 +198,6 @@ func (r *reacher) notifyManage() {
     }
 }
 
-func (r *reacher) deletePeer(p *peer) {
-    r.mu.Lock()
-    defer r.mu.Unlock()
-
-    delete(r.peers, p.overlay.ByteString())
-}
-
 // Connected adds a new peer to the queue for testing reachability.
 func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
     r.mu.Lock()
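Note: the reacher rewrite above drops the waiting/inProgress state machine, the attempt counter, and deletePeer. Every tracked peer now just carries a retryAfter timestamp that is pushed forward each time a worker picks it up, so a peer is re-pinged roughly once per RetryAfterDuration until Disconnected removes it. A rough standalone sketch of the new tryAcquirePeer logic, with a toy peer type in place of the real one:

package main

import (
	"fmt"
	"time"
)

type peer struct {
	overlay    string
	retryAfter time.Time
}

// tryAcquire returns a peer that is due for a ping, or how long to sleep
// until the next one becomes due.
func tryAcquire(peers map[string]*peer, retryAfter time.Duration) (*peer, time.Duration) {
	now := time.Now()
	var nextClosest time.Time

	for _, p := range peers {
		// retry-after has expired: hand the peer out and push its timestamp
		// forward, so it is retried whether or not this ping succeeds
		if now.After(p.retryAfter) {
			p.retryAfter = now.Add(retryAfter)
			return p, 0
		}
		if nextClosest.IsZero() || p.retryAfter.Before(nextClosest) {
			nextClosest = p.retryAfter
		}
	}

	if nextClosest.IsZero() {
		return nil, 0 // nothing tracked yet
	}
	return nil, time.Until(nextClosest) // wait for the closest retry-after
}

func main() {
	peers := map[string]*peer{"a": {overlay: "a"}} // zero retryAfter: due now
	p, _ := tryAcquire(peers, 5*time.Minute)       // retryAfterDuration in the real code
	fmt.Println(p.overlay)                         // "a"; next attempt in ~5m
}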
3 changes: 0 additions & 3 deletions pkg/p2p/libp2p/internal/reacher/reacher_test.go
@@ -20,7 +20,6 @@ import (
 
 var defaultOptions = reacher.Options{
     PingTimeout:        time.Second * 5,
-    PingMaxAttempts:    3,
     Workers:            8,
     RetryAfterDuration: time.Millisecond,
 }
@@ -121,8 +120,6 @@ func TestDisconnected(t *testing.T) {
     r.Connected(swarm.RandAddress(t), nil)
     r.Connected(disconnectedOverlay, disconnectedMa)
     r.Disconnected(disconnectedOverlay)
-
-    time.Sleep(time.Millisecond * 50) // wait for reachable func to be called
 }
 
 type mock struct {
2 changes: 1 addition & 1 deletion pkg/spinlock/wait.go
@@ -13,7 +13,7 @@ var ErrTimedOut = errors.New("timed out waiting for condition")
 
 // Wait blocks execution until condition is satisfied or until it times out.
 func Wait(timeoutDur time.Duration, cond func() bool) error {
-    return WaitWithInterval(timeoutDur, time.Millisecond*20, cond)
+    return WaitWithInterval(timeoutDur, time.Millisecond*50, cond)
 }
 
 // WaitWithInterval blocks execution until condition is satisfied or until it times out.
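Note: with the default poll interval raised from 20ms to 50ms, spinlock.Wait behaves as sketched below. The condition and timings here are illustrative, not taken from the repository's tests; only the Wait signature comes from the diff above.

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/ethersphere/bee/pkg/spinlock"
)

func main() {
	var done atomic.Bool
	time.AfterFunc(200*time.Millisecond, func() { done.Store(true) })

	// cond is polled every 50ms (the new default) until it returns true
	// or the 2s budget runs out
	if err := spinlock.Wait(2*time.Second, func() bool { return done.Load() }); err != nil {
		fmt.Println("timed out:", err)
		return
	}
	fmt.Println("condition met")
}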
4 changes: 4 additions & 0 deletions pkg/topology/kademlia/export_test.go
@@ -93,3 +93,7 @@ func closestPeer(peers *pslice.PSlice, addr swarm.Address) (swarm.Address, error
 
     return closest, nil
 }
+
+func (k *Kad) Trigger() {
+    k.manageC <- struct{}{}
+}
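Note: the newly exported Trigger lets kademlia tests kick the manage loop on demand (and then poll for the expected state) instead of sleeping and hoping the loop ran. A toy illustration of the channel-nudge pattern; the Kad here is a stand-in, not the real kademlia type:

package main

import "fmt"

type Kad struct{ manageC chan struct{} }

// Trigger mirrors the exported test hook: it nudges the manage loop once.
func (k *Kad) Trigger() { k.manageC <- struct{}{} }

func main() {
	k := &Kad{manageC: make(chan struct{})}
	done := make(chan struct{})

	go func() { // stand-in for the kademlia manage loop
		<-k.manageC
		fmt.Println("manage loop woke up")
		close(done)
	}()

	k.Trigger() // a test calls this, then polls for the expected state
	<-done
}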
(4 more changed files not shown.)