Skip to content

Commit

Permalink
feat(pushsync, retrieval): varying levels of filtering for the closes… (
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 10, 2023
1 parent f42feec commit 9d14736
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 3 deletions.
9 changes: 9 additions & 0 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer)
)

defer func() {
// no peer was found which may mean that the node is suffering from connections issues
// we must slow down the pusher to prevent constant retries
if errors.Is(err, topology.ErrNotFound) {
select {
case <-time.After(time.Second * 5):
case <-s.quit:
}
}

wg.Done()
<-sem
if doRepeat {
Expand Down
20 changes: 18 additions & 2 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
// If no peer can be found from an origin peer, the origin peer may store the chunk.
// Non-origin peers store the chunk if the chunk is within depth.
// For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk.
peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), ps.fullNode && !origin, topology.Select{Reachable: true, Healthy: true}, ps.skipList.ChunkPeers(ch.Address())...)

peer, err := ps.closestPeer(ch.Address(), origin)
if errors.Is(err, topology.ErrNotFound) {
if ps.skipList.PruneExpiresAfter(ch.Address(), overDraftRefresh) == 0 { //no overdraft peers, we have depleted ALL peers
if inflight == 0 {
Expand Down Expand Up @@ -420,6 +419,23 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
return nil, ErrNoPush
}

func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool) (swarm.Address, error) {

skipList := ps.skipList.ChunkPeers(chunkAddress)
includeSelf := ps.fullNode && !origin

peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...)
if errors.Is(err, topology.ErrNotFound) {
peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...)
if errors.Is(err, topology.ErrNotFound) {
return ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...)
}
return peer, err
}

return peer, err
}

func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) {

span := tracing.FromContext(parentCtx)
Expand Down
14 changes: 13 additions & 1 deletion pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,19 @@ func (s *Service) prepareCredit(ctx context.Context, peer, chunk swarm.Address,
// retrieve request.
func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, allowUpstream bool) (swarm.Address, error) {

closest, err := s.peerSuggester.ClosestPeer(addr, false, topology.Select{Reachable: true, Healthy: true}, skipPeers...)
var (
closest swarm.Address
err error
)

closest, err = s.peerSuggester.ClosestPeer(addr, false, topology.Select{Reachable: true, Healthy: true}, skipPeers...)
if errors.Is(err, topology.ErrNotFound) {
closest, err = s.peerSuggester.ClosestPeer(addr, false, topology.Select{Reachable: true}, skipPeers...)
if errors.Is(err, topology.ErrNotFound) {
closest, err = s.peerSuggester.ClosestPeer(addr, false, topology.Select{}, skipPeers...)
}
}

if err != nil {
return swarm.Address{}, err
}
Expand Down

0 comments on commit 9d14736

Please sign in to comment.