diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go
index a84c841f89e..a78b863af5b 100644
--- a/pkg/hive/hive.go
+++ b/pkg/hive/hive.go
@@ -61,7 +61,6 @@ type Service struct {
 	metrics           metrics
 	inLimiter         *ratelimit.Limiter
 	outLimiter        *ratelimit.Limiter
-	clearMtx          sync.Mutex
 	quit              chan struct{}
 	wg                sync.WaitGroup
 	peersChan         chan pb.Peers
@@ -243,13 +242,8 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St
 }
 
 func (s *Service) disconnect(peer p2p.Peer) error {
-
-	s.clearMtx.Lock()
-	defer s.clearMtx.Unlock()
-
 	s.inLimiter.Clear(peer.Address.ByteString())
 	s.outLimiter.Clear(peer.Address.ByteString())
-
 	return nil
 }
diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go
index e410bda6a4d..c37764b76ee 100644
--- a/pkg/puller/puller.go
+++ b/pkg/puller/puller.go
@@ -258,9 +258,11 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
 	}
 
 	/*
-		The syncing behavior diverges for peers outside and winthin the storage radius.
+		The syncing behavior diverges for peers outside and within the storage radius.
 		For neighbor peers, we sync ALL bins greater than or equal to the storage radius.
 		For peers with PO lower than the storage radius, we must sync ONLY the bin that is the PO.
+		For peers with a PO lower than the storage radius and even lower than the allowed minimum threshold,
+		no syncing is done.
 	*/
 
 	if peer.po >= storageRadius {
@@ -289,9 +291,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
 			p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po])
 		}
 	} else {
-		for bin := uint8(0); bin < p.bins; bin++ {
-			peer.cancelBin(bin)
-		}
+		peer.stop()
 	}
 
 	return nil
@@ -356,11 +356,11 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
 			loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
 		}
 
+		_ = p.limiter.WaitN(ctx, count)
+
 		if isHistorical {
 			p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count))
 			p.rate.Add(count)
-			// rate limit historical syncing
-			_ = p.limiter.WaitN(ctx, count)
 		} else {
 			p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
 		}
diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go
index f3b99e140f0..a30308f794a 100644
--- a/pkg/pullsync/pullsync.go
+++ b/pkg/pullsync/pullsync.go
@@ -23,6 +23,7 @@ import (
 	"github.com/ethersphere/bee/v2/pkg/p2p/protobuf"
 	"github.com/ethersphere/bee/v2/pkg/postage"
 	"github.com/ethersphere/bee/v2/pkg/pullsync/pb"
+	"github.com/ethersphere/bee/v2/pkg/ratelimit"
 	"github.com/ethersphere/bee/v2/pkg/soc"
 	"github.com/ethersphere/bee/v2/pkg/storage"
 	"github.com/ethersphere/bee/v2/pkg/storer"
@@ -45,10 +46,12 @@ var (
 )
 
 const (
-	MaxCursor                = math.MaxUint64
-	DefaultMaxPage    uint64 = 250
-	pageTimeout              = time.Second
-	makeOfferTimeout         = 15 * time.Minute
+	MaxCursor                       = math.MaxUint64
+	DefaultMaxPage           uint64 = 250
+	pageTimeout                     = time.Second
+	makeOfferTimeout                = 15 * time.Minute
+	handleMaxChunksPerSecond        = 100
+	handleRequestsLimitRate         = time.Second / handleMaxChunksPerSecond // handle max 100 chunks per second per peer
 )
 
 // Interface is the PullSync interface.
@@ -74,6 +77,8 @@ type Syncer struct {
 
 	maxPage uint64
 
+	limiter *ratelimit.Limiter
+
 	Interface
 	io.Closer
 }
@@ -96,6 +101,7 @@ func New(
 		logger:         logger.WithName(loggerName).Register(),
 		quit:           make(chan struct{}),
 		maxPage:        maxPage,
+		limiter:        ratelimit.New(handleRequestsLimitRate, int(maxPage)),
 	}
 }
 
@@ -113,9 +119,109 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec {
 				Handler: s.cursorHandler,
 			},
 		},
+		DisconnectIn:  s.disconnect,
+		DisconnectOut: s.disconnect,
 	}
 }
 
+// handler handles an incoming request to sync an interval
+func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
+
+	select {
+	case <-s.quit:
+		return nil
+	default:
+		s.syncInProgress.Add(1)
+		defer s.syncInProgress.Add(-1)
+	}
+
+	r := protobuf.NewReader(stream)
+	defer func() {
+		if err != nil {
+			_ = stream.Reset()
+		} else {
+			_ = stream.FullClose()
+		}
+	}()
+
+	ctx, cancel := context.WithCancel(streamCtx)
+	defer cancel()
+
+	go func() {
+		select {
+		case <-s.quit:
+			cancel()
+		case <-ctx.Done():
+			return
+		}
+	}()
+
+	var rn pb.Get
+	if err := r.ReadMsgWithContext(ctx, &rn); err != nil {
+		return fmt.Errorf("read get range: %w", err)
+	}
+
+	// recreate the reader to allow the first one to be garbage collected
+	// before the makeOffer function call, to reduce the total memory allocated
+	// while makeOffer is executing (waiting for the new chunks)
+	w, r := protobuf.NewWriterAndReader(stream)
+
+	// make an offer to the upstream peer in return for the requested range
+	offer, err := s.makeOffer(ctx, rn)
+	if err != nil {
+		return fmt.Errorf("make offer: %w", err)
+	}
+
+	if err := w.WriteMsgWithContext(ctx, offer); err != nil {
+		return fmt.Errorf("write offer: %w", err)
+	}
+
+	// we don't have any hashes to offer in this range (the
+	// interval is empty). nothing more to do
+	if len(offer.Chunks) == 0 {
+		return nil
+	}
+
+	s.metrics.SentOffered.Add(float64(len(offer.Chunks)))
+
+	var want pb.Want
+	if err := r.ReadMsgWithContext(ctx, &want); err != nil {
+		return fmt.Errorf("read want: %w", err)
+	}
+
+	chs, err := s.processWant(ctx, offer, &want)
+	if err != nil {
+		return fmt.Errorf("process want: %w", err)
+	}
+
+	for _, c := range chs {
+		var stamp []byte
+		if c.Stamp() != nil {
+			stamp, err = c.Stamp().MarshalBinary()
+			if err != nil {
+				return fmt.Errorf("serialise stamp: %w", err)
+			}
+		}
+
+		deliver := pb.Delivery{Address: c.Address().Bytes(), Data: c.Data(), Stamp: stamp}
+		if err := w.WriteMsgWithContext(ctx, &deliver); err != nil {
+			return fmt.Errorf("write delivery: %w", err)
+		}
+		s.metrics.Sent.Inc()
+	}
+
+	// slow down future requests
+	waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs)))
+	if err != nil {
+		return fmt.Errorf("rate limiter: %w", err)
+	}
+	if waitDur > 0 {
+		s.logger.Debug("rate limited peer", "wait_duration", waitDur, "peer_address", p.Address)
+	}
+
+	return nil
+}
+
 // Sync syncs a batch of chunks starting at a start BinID.
 // It returns the BinID of highest chunk that was synced from the given
 // batch and the total number of chunks the downstream peer has sent.
@@ -283,94 +389,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
 	return topmost, chunksPut, chunkErr
 }
 
-// handler handles an incoming request to sync an interval
-func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Stream) (err error) {
-	select {
-	case <-s.quit:
-		return nil
-	default:
-		s.syncInProgress.Add(1)
-		defer s.syncInProgress.Add(-1)
-	}
-
-	r := protobuf.NewReader(stream)
-	defer func() {
-		if err != nil {
-			_ = stream.Reset()
-		} else {
-			_ = stream.FullClose()
-		}
-	}()
-
-	ctx, cancel := context.WithCancel(streamCtx)
-	defer cancel()
-
-	go func() {
-		select {
-		case <-s.quit:
-			cancel()
-		case <-ctx.Done():
-			return
-		}
-	}()
-
-	var rn pb.Get
-	if err := r.ReadMsgWithContext(ctx, &rn); err != nil {
-		return fmt.Errorf("read get range: %w", err)
-	}
-
-	// recreate the reader to allow the first one to be garbage collected
-	// before the makeOffer function call, to reduce the total memory allocated
-	// while makeOffer is executing (waiting for the new chunks)
-	w, r := protobuf.NewWriterAndReader(stream)
-
-	// make an offer to the upstream peer in return for the requested range
-	offer, err := s.makeOffer(ctx, rn)
-	if err != nil {
-		return fmt.Errorf("make offer: %w", err)
-	}
-
-	if err := w.WriteMsgWithContext(ctx, offer); err != nil {
-		return fmt.Errorf("write offer: %w", err)
-	}
-
-	// we don't have any hashes to offer in this range (the
-	// interval is empty). nothing more to do
-	if len(offer.Chunks) == 0 {
-		return nil
-	}
-
-	s.metrics.SentOffered.Add(float64(len(offer.Chunks)))
-
-	var want pb.Want
-	if err := r.ReadMsgWithContext(ctx, &want); err != nil {
-		return fmt.Errorf("read want: %w", err)
-	}
-
-	chs, err := s.processWant(ctx, offer, &want)
-	if err != nil {
-		return fmt.Errorf("process want: %w", err)
-	}
-
-	for _, c := range chs {
-		var stamp []byte
-		if c.Stamp() != nil {
-			stamp, err = c.Stamp().MarshalBinary()
-			if err != nil {
-				return fmt.Errorf("serialise stamp: %w", err)
-			}
-		}
-
-		deliver := pb.Delivery{Address: c.Address().Bytes(), Data: c.Data(), Stamp: stamp}
-		if err := w.WriteMsgWithContext(ctx, &deliver); err != nil {
-			return fmt.Errorf("write delivery: %w", err)
-		}
-		s.metrics.Sent.Inc()
-	}
-
-	return nil
-}
-
 // makeOffer tries to assemble an offer for a given requested interval.
 func (s *Syncer) makeOffer(ctx context.Context, rn pb.Get) (*pb.Offer, error) {
 
@@ -552,6 +570,11 @@ func (s *Syncer) cursorHandler(ctx context.Context, p p2p.Peer, stream p2p.Strea
 	return nil
 }
 
+func (s *Syncer) disconnect(peer p2p.Peer) error {
+	s.limiter.Clear(peer.Address.ByteString())
+	return nil
+}
+
 func (s *Syncer) Close() error {
 	s.logger.Info("pull syncer shutting down")
 	close(s.quit)
diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go
index fb02542c1a1..77872de2849 100644
--- a/pkg/ratelimit/ratelimit.go
+++ b/pkg/ratelimit/ratelimit.go
@@ -8,6 +8,7 @@
 package ratelimit
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -32,7 +33,29 @@ func New(r time.Duration, burst int) *Limiter {
 
 // Allow checks if the limiter that belongs to 'key' has not exceeded the limit.
 func (l *Limiter) Allow(key string, count int) bool {
+	return l.getLimiter(key).AllowN(time.Now(), count)
+}
+
+// Wait blocks until the limiter permits count events to happen. It returns the time duration
+// the limiter waited for the events to be allowed.
+func (l *Limiter) Wait(ctx context.Context, key string, count int) (time.Duration, error) {
+	limiter := l.getLimiter(key)
+
+	n := time.Now()
+
+	if limiter.AllowN(n, count) {
+		return 0, nil
+	}
+
+	err := limiter.WaitN(ctx, count)
+
+	return time.Since(n), err
+}
+
+// getLimiter returns the limiter that belongs to 'key', creating a new one if it does not exist yet.
+func (l *Limiter) getLimiter(key string) *rate.Limiter {
 	l.mtx.Lock()
+	defer l.mtx.Unlock()
 
 	limiter, ok := l.limiter[key]
 	if !ok {
@@ -40,16 +63,11 @@ func (l *Limiter) Allow(key string, count int) bool {
 		l.limiter[key] = limiter
 	}
 
-	// We are intentionally not defer calling Unlock in order to reduce locking extent.
-	// Individual limiter is capable for handling concurrent calls.
-	l.mtx.Unlock()
-
-	return limiter.AllowN(time.Now(), count)
+	return limiter
 }
 
 // Clear deletes the limiter that belongs to 'key'
 func (l *Limiter) Clear(key string) {
-
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
diff --git a/pkg/ratelimit/ratelimit_test.go b/pkg/ratelimit/ratelimit_test.go
index e31fe5722bc..7839dc524d4 100644
--- a/pkg/ratelimit/ratelimit_test.go
+++ b/pkg/ratelimit/ratelimit_test.go
@@ -5,6 +5,7 @@
 package ratelimit_test
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -41,3 +42,37 @@ func TestRateLimit(t *testing.T) {
 		t.Fatal("want allowed")
 	}
 }
+
+func TestWait(t *testing.T) {
+	t.Parallel()
+
+	var (
+		key   = "test"
+		rate  = time.Second
+		burst = 4
+	)
+
+	limiter := ratelimit.New(rate, burst)
+
+	if !limiter.Allow(key, 1) {
+		t.Fatal("want allowed")
+	}
+
+	waitDur, err := limiter.Wait(context.Background(), key, 1)
+	if err != nil {
+		t.Fatalf("got err %v", err)
+	}
+
+	if waitDur != 0 {
+		t.Fatalf("expected the limiter to NOT wait, got %s", waitDur)
+	}
+
+	waitDur, err = limiter.Wait(context.Background(), key, burst)
+	if err != nil {
+		t.Fatalf("got err %v", err)
+	}
+
+	if waitDur < rate {
+		t.Fatalf("expected the limiter to wait at least %s, got %s", rate, waitDur)
+	}
+}
diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go
index 6d44f98987e..fe389048344 100644
--- a/pkg/storer/reserve.go
+++ b/pkg/storer/reserve.go
@@ -109,8 +109,6 @@ func (db *DB) countWithinRadius(ctx context.Context) (int, error) {
 	db.metrics.ReserveMissingBatch.Set(float64(missing))
 	reserveSizeWithinRadius.Store(uint64(count))
 
-	db.ReserveSizeWithinRadius()
-
 	return count, err
 }
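Below is a minimal sketch, not part of the change set above, showing how the new Limiter.Wait API introduced in this diff behaves when driven the way pullsync's handler drives it. It assumes the module path already used by the imports in this diff; the peer key "peer-1" and the standalone main function are hypothetical illustration, while the 100 chunks/s rate and the 250-chunk burst mirror handleMaxChunksPerSecond and DefaultMaxPage from the diff (pullsync keys the limiter by peer.Address.ByteString()).

// Sketch (assumed usage, not part of the diff): exercising ratelimit.Limiter.Wait
// the way pullsync's handler does after delivering a page of chunks.
// The peer key "peer-1" is hypothetical.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ethersphere/bee/v2/pkg/ratelimit"
)

func main() {
	// Mirror the pullsync settings above: at most 100 chunks per second per peer,
	// with a burst equal to one page of 250 chunks.
	const handleMaxChunksPerSecond = 100
	limiter := ratelimit.New(time.Second/handleMaxChunksPerSecond, 250)

	peer := "peer-1"

	// The first full page fits within the burst, so Wait returns immediately.
	waited, err := limiter.Wait(context.Background(), peer, 250)
	fmt.Println(waited, err) // ~0s <nil>

	// A second full page exceeds the remaining tokens, so Wait blocks until the
	// bucket refills (about 2.5s at 100 chunks/s) and reports the time it waited.
	waited, err = limiter.Wait(context.Background(), peer, 250)
	fmt.Println(waited, err) // ~2.5s <nil>

	// On disconnect the per-peer limiter state is dropped, as Syncer.disconnect does.
	limiter.Clear(peer)
}

A related design note visible in the hunks above: puller.go now calls p.limiter.WaitN outside the isHistorical branch, so both historical and live deliveries count against that limiter, and pkg/hive no longer needs clearMtx because Limiter.Clear and the new getLimiter handle their own locking.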