diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 2fff5739af2..9011e86c7f6 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -148,6 +148,15 @@ func (s *Service) chunksWorker(warmupTime time.Duration, tracer *tracing.Tracer) ) defer func() { + // no peer was found, which may mean that the node is suffering from connection issues + // we must slow down the pusher to prevent constant retries + if errors.Is(err, topology.ErrNotFound) { + select { + case <-time.After(time.Second * 5): + case <-s.quit: + } + } + wg.Done() <-sem if doRepeat { diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 6ef29619935..0755c215e23 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -340,8 +340,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo // If no peer can be found from an origin peer, the origin peer may store the chunk. // Non-origin peers store the chunk if the chunk is within depth. // For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk. - peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), ps.fullNode && !origin, topology.Select{Reachable: true, Healthy: true}, ps.skipList.ChunkPeers(ch.Address())...) - + peer, err := ps.closestPeer(ch.Address(), origin) if errors.Is(err, topology.ErrNotFound) { if ps.skipList.PruneExpiresAfter(ch.Address(), overDraftRefresh) == 0 { //no overdraft peers, we have depleted ALL peers if inflight == 0 { @@ -420,6 +419,23 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo return nil, ErrNoPush } +func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool) (swarm.Address, error) { + + skipList := ps.skipList.ChunkPeers(chunkAddress) + includeSelf := ps.fullNode && !origin + + peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...) 
+ if errors.Is(err, topology.ErrNotFound) { + peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...) + if errors.Is(err, topology.ErrNotFound) { + return ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...) + } + return peer, err + } + + return peer, err +} + func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) { span := tracing.FromContext(parentCtx) diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go index 8f6be31e364..5fb48d57948 100644 --- a/pkg/retrieval/retrieval.go +++ b/pkg/retrieval/retrieval.go @@ -366,7 +366,19 @@ func (s *Service) prepareCredit(ctx context.Context, peer, chunk swarm.Address, // retrieve request. func (s *Service) closestPeer(addr swarm.Address, skipPeers []swarm.Address, allowUpstream bool) (swarm.Address, error) { - closest, err := s.peerSuggester.ClosestPeer(addr, false, topology.Select{Reachable: true, Healthy: true}, skipPeers...) + var ( + closest swarm.Address + err error + ) + + closest, err = s.peerSuggester.ClosestPeer(addr, false, topology.Select{Reachable: true, Healthy: true}, skipPeers...) + if errors.Is(err, topology.ErrNotFound) { + closest, err = s.peerSuggester.ClosestPeer(addr, false, topology.Select{Reachable: true}, skipPeers...) + if errors.Is(err, topology.ErrNotFound) { + closest, err = s.peerSuggester.ClosestPeer(addr, false, topology.Select{}, skipPeers...) + } + } + if err != nil { return swarm.Address{}, err }