Skip to content

Commit

Permalink
fix: kademlia to use storage radius as neighborhood depth (#4410)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 19, 2023
1 parent 61d90a9 commit aa68793
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 82 deletions.
5 changes: 4 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func NewBee(
var swapService *swap.Service

kad, err := kademlia.New(swarmAddress, addressbook, hive, p2ps, logger,
kademlia.Options{Bootnodes: bootnodes, BootnodeMode: o.BootnodeMode, StaticNodes: o.StaticNodes, IgnoreRadius: !chainEnabled, DataDir: o.DataDir})
kademlia.Options{Bootnodes: bootnodes, BootnodeMode: o.BootnodeMode, StaticNodes: o.StaticNodes, DataDir: o.DataDir})
if err != nil {
return nil, fmt.Errorf("unable to create kademlia: %w", err)
}
Expand Down Expand Up @@ -967,6 +967,9 @@ func NewBee(
if prev == uint32(swarm.MaxBins) {
close(initialRadiusC)
}
if !o.FullNodeMode { // light and ultra-light nodes do not have a reserve worker to set the radius.
kad.SetStorageRadius(r)
}
case <-ctx.Done():
unsub()
return
Expand Down
16 changes: 14 additions & 2 deletions pkg/topology/kademlia/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,20 @@ const (
type PeerFilterFunc = peerFilterFunc
type FilterFunc = filtersFunc

func (k *Kad) IsWithinDepth(addr swarm.Address) bool {
return swarm.Proximity(k.base.Bytes(), addr.Bytes()) >= k.NeighborhoodDepth()
func (k *Kad) IsWithinConnectionDepth(addr swarm.Address) bool {
return swarm.Proximity(k.base.Bytes(), addr.Bytes()) >= k.ConnectionDepth()
}

func (k *Kad) ConnectionDepth() uint8 {
k.depthMu.RLock()
defer k.depthMu.RUnlock()
return k.depth
}

func (k *Kad) StorageRadius() uint8 {
k.depthMu.RLock()
defer k.depthMu.RUnlock()
return k.storageRadius
}

// IsBalanced returns if Kademlia is balanced to bin.
Expand Down
45 changes: 14 additions & 31 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ type Options struct {
PruneFunc pruneFunc
StaticNodes []swarm.Address
FilterFunc filtersFunc
IgnoreRadius bool
DataDir string

BitSuffixLength *int
Expand All @@ -106,7 +105,6 @@ type kadOptions struct {
PruneFunc pruneFunc
StaticNodes []swarm.Address
FilterFunc filtersFunc
IgnoreRadius bool

TimeToRetry time.Duration
ShortRetry time.Duration
Expand All @@ -127,7 +125,6 @@ func newKadOptions(o Options) kadOptions {
PruneFunc: o.PruneFunc,
StaticNodes: o.StaticNodes,
FilterFunc: o.FilterFunc,
IgnoreRadius: o.IgnoreRadius,
// copy or use default
TimeToRetry: defaultValDuration(o.TimeToRetry, defaultTimeToRetry),
ShortRetry: defaultValDuration(o.ShortRetry, defaultShortRetry),
Expand Down Expand Up @@ -285,7 +282,7 @@ func (k *Kad) connectBalanced(wg *sync.WaitGroup, peerConnChan chan<- *peerConnI
return false
}

depth := k.NeighborhoodDepth()
depth := k.neighborhoodDepth()

for i := range k.commonBinPrefixes {

Expand Down Expand Up @@ -351,7 +348,7 @@ func (k *Kad) connectNeighbours(wg *sync.WaitGroup, peerConnChan chan<- *peerCon
_ = k.knownPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) {

// out of depth, skip bin
if po < k.NeighborhoodDepth() {
if po < k.neighborhoodDepth() {
return false, true, nil
}

Expand Down Expand Up @@ -445,9 +442,7 @@ func (k *Kad) connectionAttemptsHandler(ctx context.Context, wg *sync.WaitGroup,
k.metrics.TotalOutboundConnections.Inc()
k.collector.Record(peer.addr, im.PeerLogIn(time.Now(), im.PeerConnectionDirectionOutbound))

k.depthMu.Lock()
k.recalcDepth()
k.depthMu.Unlock()

k.logger.Info("connected to peer", "peer_address", peer.addr, "proximity_order", peer.po)
k.notifyManageLoop()
Expand Down Expand Up @@ -580,7 +575,7 @@ func (k *Kad) manage() {
}

if k.bootnode {
depth := k.NeighborhoodDepth()
depth := k.neighborhoodDepth()

k.metrics.CurrentDepth.Set(float64(depth))
k.metrics.CurrentlyKnownPeers.Set(float64(k.knownPeers.Length()))
Expand All @@ -589,12 +584,12 @@ func (k *Kad) manage() {
continue
}

oldDepth := k.NeighborhoodDepth()
oldDepth := k.neighborhoodDepth()
k.connectBalanced(&wg, balanceChan)
k.connectNeighbours(&wg, neighbourhoodChan)
wg.Wait()

depth := k.NeighborhoodDepth()
depth := k.neighborhoodDepth()

k.opt.PruneFunc(depth)

Expand Down Expand Up @@ -835,6 +830,9 @@ func binSaturated(oversaturationAmount int, staticNode staticPeerFunc) binSatura
// recalcDepth calculates, assigns the new depth, and returns if depth has changed
func (k *Kad) recalcDepth() {

k.depthMu.Lock()
defer k.depthMu.Unlock()

var (
peers = k.connectedPeers
filter = k.opt.FilterFunc(im.Reachability(false))
Expand Down Expand Up @@ -893,10 +891,6 @@ func (k *Kad) recalcDepth() {
return false, false, nil
})

if k.storageRadius < depth && !k.opt.IgnoreRadius {
depth = k.storageRadius
}

if depth > candidate {
depth = candidate
}
Expand Down Expand Up @@ -974,7 +968,7 @@ func (k *Kad) connect(ctx context.Context, peer swarm.Address, ma ma.Multiaddr)
func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) error {
var addrs []swarm.Address

depth := k.NeighborhoodDepth()
depth := k.neighborhoodDepth()
isNeighbor := swarm.Proximity(peer.Bytes(), k.base.Bytes()) >= depth

outer:
Expand Down Expand Up @@ -1147,9 +1141,7 @@ func (k *Kad) onConnected(ctx context.Context, addr swarm.Address) error {

k.waitNext.Remove(addr)

k.depthMu.Lock()
k.recalcDepth()
k.depthMu.Unlock()

k.notifyManageLoop()
k.notifyPeerSig()
Expand All @@ -1168,9 +1160,7 @@ func (k *Kad) Disconnected(peer p2p.Peer) {
k.metrics.TotalInboundDisconnections.Inc()
k.collector.Record(peer.Address, im.PeerLogOut(time.Now()))

k.depthMu.Lock()
k.recalcDepth()
k.depthMu.Unlock()

k.notifyManageLoop()
k.notifyPeerSig()
Expand Down Expand Up @@ -1288,9 +1278,7 @@ func (k *Kad) Reachable(addr swarm.Address, status p2p.ReachabilityStatus) {
k.collector.Record(addr, im.PeerReachability(status))
k.logger.Debug("reachability of peer updated", "peer_address", addr, "reachability", status)
if status == p2p.ReachabilityStatusPublic {
k.depthMu.Lock()
k.recalcDepth()
k.depthMu.Unlock()
k.notifyManageLoop()
}
}
Expand Down Expand Up @@ -1357,11 +1345,11 @@ func filterOps(filter topology.Select) []im.FilterOp {
}

// NeighborhoodDepth returns the current Kademlia depth.
func (k *Kad) NeighborhoodDepth() uint8 {
func (k *Kad) neighborhoodDepth() uint8 {
k.depthMu.RLock()
defer k.depthMu.RUnlock()

return k.depth
return k.storageRadius
}

func (k *Kad) SetStorageRadius(d uint8) {
Expand All @@ -1377,13 +1365,8 @@ func (k *Kad) SetStorageRadius(d uint8) {
k.metrics.CurrentStorageDepth.Set(float64(k.storageRadius))
k.logger.Debug("kademlia set storage radius", "radius", k.storageRadius)

oldDepth := k.depth
k.recalcDepth()

if oldDepth != k.depth {
k.notifyManageLoop()
k.notifyPeerSig()
}
k.notifyManageLoop()
k.notifyPeerSig()
}

func (k *Kad) Snapshot() *topology.KadParams {
Expand Down Expand Up @@ -1433,7 +1416,7 @@ func (k *Kad) Snapshot() *topology.KadParams {
Connected: k.connectedPeers.Length(),
Timestamp: time.Now(),
NNLowWatermark: k.opt.LowWaterMark,
Depth: k.NeighborhoodDepth(),
Depth: k.neighborhoodDepth(),
Reachability: k.reachability.String(),
NetworkAvailability: k.p2p.NetworkStatus().String(),
Bins: topology.KadBins{
Expand Down
Loading

0 comments on commit aa68793

Please sign in to comment.