Skip to content

Commit

Permalink
feat(puller): limiter (#4504)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Dec 13, 2023
1 parent 8eb322d commit c40734e
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
ratelimit "golang.org/x/time/rate"
)

// loggerName is the tree path name of the logger for this package.
Expand Down Expand Up @@ -65,6 +66,8 @@ type Puller struct {
rate *rate.Rate // rate of historical syncing

start sync.Once

limiter *ratelimit.Limiter
}

func New(
Expand Down Expand Up @@ -92,6 +95,7 @@ func New(
blockLister: blockLister,
rate: rate.New(DefaultHistRateWindow),
cancel: func() { /* Noop, since the context is initialized in the Start(). */ },
limiter: ratelimit.NewLimiter(ratelimit.Every(time.Second), int(swarm.MaxBins)), // allows for 1 sync per second, max bins bursts
}

return p
Expand Down Expand Up @@ -301,15 +305,18 @@ func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8,

for {

// rate limit within neighborhood
if bin >= p.radius.StorageRadius() {
_ = p.limiter.Wait(ctx)
}

select {
case <-ctx.Done():
loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin)
return
default:
}

p.metrics.SyncWorkerIterCounter.Inc()

s, _, _, err := p.nextPeerInterval(peer, bin)
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
Expand Down

0 comments on commit c40734e

Please sign in to comment.