diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index af0e7de7524..2992ef0a8c3 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -95,7 +95,7 @@ func New( blockLister: blockLister, rate: rate.New(DefaultHistRateWindow), cancel: func() { /* Noop, since the context is initialized in the Start(). */ }, - limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second), int(swarm.MaxBins)), // allows for 1 sync per second, max bins bursts + limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second/2), int(swarm.MaxBins)), // allows for 2 syncs per second, max bins bursts } return p @@ -305,8 +305,15 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, for { - // rate limit within neighborhood - if bin >= p.radius.StorageRadius() { + s, _, _, err := p.nextPeerInterval(peer, bin) + if err != nil { + p.metrics.SyncWorkerErrCounter.Inc() + p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting") + return + } + + // rate limit historical syncing + if s <= cur { _ = p.limiter.Wait(ctx) } @@ -319,13 +326,6 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, p.metrics.SyncWorkerIterCounter.Inc() - s, _, _, err := p.nextPeerInterval(peer, bin) - if err != nil { - p.metrics.SyncWorkerErrCounter.Inc() - p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting") - return - } - syncStart := time.Now() top, count, err := p.syncer.Sync(ctx, peer, bin, s)