diff --git a/pkg/intervalstore/intervals.go b/pkg/puller/intervalstore/intervals.go
similarity index 100%
rename from pkg/intervalstore/intervals.go
rename to pkg/puller/intervalstore/intervals.go
diff --git a/pkg/intervalstore/intervals_test.go b/pkg/puller/intervalstore/intervals_test.go
similarity index 100%
rename from pkg/intervalstore/intervals_test.go
rename to pkg/puller/intervalstore/intervals_test.go
diff --git a/pkg/intervalstore/main_test.go b/pkg/puller/intervalstore/main_test.go
similarity index 100%
rename from pkg/intervalstore/main_test.go
rename to pkg/puller/intervalstore/main_test.go
diff --git a/pkg/intervalstore/store_test.go b/pkg/puller/intervalstore/store_test.go
similarity index 100%
rename from pkg/intervalstore/store_test.go
rename to pkg/puller/intervalstore/store_test.go
diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go
index 2992ef0a8c3..f2cfd15eee0 100644
--- a/pkg/puller/puller.go
+++ b/pkg/puller/puller.go
@@ -15,9 +15,9 @@ import (
 	"sync"
 	"time"
 
-	"github.com/ethersphere/bee/pkg/intervalstore"
 	"github.com/ethersphere/bee/pkg/log"
 	"github.com/ethersphere/bee/pkg/p2p"
+	"github.com/ethersphere/bee/pkg/puller/intervalstore"
 	"github.com/ethersphere/bee/pkg/pullsync"
 	"github.com/ethersphere/bee/pkg/rate"
 	"github.com/ethersphere/bee/pkg/storage"
@@ -95,7 +95,7 @@ func New(
 		blockLister: blockLister,
 		rate:        rate.New(DefaultHistRateWindow),
 		cancel:      func() { /* Noop, since the context is initialized in the Start(). */ },
-		limiter:     ratelimit.NewLimiter(ratelimit.Every(time.Second/2), int(swarm.MaxBins)), // allows for 2 syncs per second, max bins bursts
+		limiter:     ratelimit.NewLimiter(ratelimit.Every(time.Second/4), int(swarm.MaxBins)), // allows for 4 syncs per second, max bins bursts
 	}
 
 	return p
@@ -129,18 +129,15 @@ func (p *Puller) manage(ctx context.Context) {
 			p.syncPeersMtx.Lock()
 			defer p.syncPeersMtx.Unlock()
 
-			// peersDisconnected is used to mark and prune peers that are no longer connected.
-			peersDisconnected := make(map[string]*syncPeer)
-			for _, peer := range p.syncPeers {
-				peersDisconnected[peer.address.ByteString()] = peer
-			}
-
 			newRadius := p.radius.StorageRadius()
 
 			// reset all intervals below the new radius to resync:
 			// 1. previously evicted chunks
 			// 2. previously ignored chunks due to a higher radius
 			if newRadius < prevRadius {
+				for _, peer := range p.syncPeers {
+					p.disconnectPeer(peer.address)
+				}
 				err := p.resetIntervals(prevRadius)
 				if err != nil {
 					p.logger.Debug("reset lower sync radius failed", "error", err)
@@ -148,6 +145,12 @@ func (p *Puller) manage(ctx context.Context) {
 			}
 			prevRadius = newRadius
 
+			// peersDisconnected is used to mark and prune peers that are no longer connected.
+			peersDisconnected := make(map[string]*syncPeer)
+			for _, peer := range p.syncPeers {
+				peersDisconnected[peer.address.ByteString()] = peer
+			}
+
 			_ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
 				if _, ok := p.syncPeers[addr.ByteString()]; !ok {
 					p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po)
@@ -187,7 +190,7 @@ func (p *Puller) disconnectPeer(addr swarm.Address) {
 	loggerV2.Debug("disconnecting peer", "peer_address", addr)
 	if peer, ok := p.syncPeers[addr.ByteString()]; ok {
 		peer.mtx.Lock()
-		peer.gone()
+		peer.stop()
 		peer.mtx.Unlock()
 	}
 	delete(p.syncPeers, addr.ByteString())
@@ -229,7 +232,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
 
 	if storedEpoch != epoch {
 		// cancel all bins
-		peer.gone()
+		peer.stop()
 
 		p.logger.Debug("peer epoch change detected, resetting past synced intervals", "stored_epoch", storedEpoch, "new_epoch", epoch, "peer_address", peer.address)
 
@@ -285,81 +288,97 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
 
 // syncPeerBin will start historical and live syncing for the peer for a particular bin.
 // Must be called under syncPeer lock.
-func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur uint64) {
-	binCtx, cancel := context.WithCancel(ctx)
-	peer.setBinCancel(cancel, bin)
-	peer.wg.Add(1)
-	p.wg.Add(1)
-	go p.syncWorker(binCtx, peer.address, bin, cur, peer.wg.Done)
-}
-
-func (p *Puller) syncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64, done func()) {
+func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint8, cursor uint64) {
 	loggerV2 := p.logger.V(2).Register()
 
-	p.metrics.SyncWorkerCounter.Inc()
-	defer p.wg.Done()
-	defer p.metrics.SyncWorkerDoneCounter.Inc()
-	defer done()
+	ctx, cancel := context.WithCancel(parentCtx)
+	peer.setBinCancel(cancel, bin)
 
-	loggerV2.Debug("syncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur)
+	sync := func(isHistorical bool, address swarm.Address, start uint64, bin uint8, done func()) {
+		p.metrics.SyncWorkerCounter.Inc()
 
-	for {
+		defer p.wg.Done()
+		defer p.metrics.SyncWorkerDoneCounter.Inc()
+		defer done()
 
-		s, _, _, err := p.nextPeerInterval(peer, bin)
-		if err != nil {
-			p.metrics.SyncWorkerErrCounter.Inc()
-			p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting")
-			return
-		}
-
-		// rate limit historical syncing
-		if s <= cur {
-			_ = p.limiter.Wait(ctx)
-		}
+		var (
+			cursor = start
+			err    error
+		)
 
-		select {
-		case <-ctx.Done():
-			loggerV2.Debug("syncWorker context cancelled", "peer_address", peer, "bin", bin)
-			return
-		default:
-		}
+		for {
+			if isHistorical { // override start with the next interval if historical syncing
+				start, err = p.nextPeerInterval(address, bin)
+				if err != nil {
+					p.metrics.SyncWorkerErrCounter.Inc()
+					p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting")
+					return
+				}
+
+				// historical sync has caught up to the cursor, exit
+				if start > cursor {
+					return
+				}
+				// rate limit historical syncing
+				_ = p.limiter.Wait(ctx)
+			}
 
-		p.metrics.SyncWorkerIterCounter.Inc()
+			select {
+			case <-ctx.Done():
+				loggerV2.Debug("syncWorker context cancelled", "peer_address", address, "bin", bin)
+				return
+			default:
+			}
 
-		syncStart := time.Now()
-		top, count, err := p.syncer.Sync(ctx, peer, bin, s)
+			p.metrics.SyncWorkerIterCounter.Inc()
 
-		if top == math.MaxUint64 {
-			p.metrics.MaxUintErrCounter.Inc()
-			p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", peer, "bin", bin, "from", s, "topmost", top)
quitting", "peer_address", peer, "bin", bin, "from", s, "topmost", top) - return - } + syncStart := time.Now() + top, count, err := p.syncer.Sync(ctx, address, bin, start) - if top <= cur { - p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count)) - p.rate.Add(count) - } else { - p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count)) - } + if top == math.MaxUint64 { + p.metrics.MaxUintErrCounter.Inc() + p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", address, "bin", bin, "from", start, "topmost", top) + return + } - if top >= s { - if err := p.addPeerInterval(peer, bin, s, top); err != nil { + if err != nil { p.metrics.SyncWorkerErrCounter.Inc() - p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", peer) - return + if errors.Is(err, p2p.ErrPeerNotFound) { + p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + return + } + loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) } - loggerV2.Debug("syncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", peer) - } - if err != nil { - p.metrics.SyncWorkerErrCounter.Inc() - if errors.Is(err, p2p.ErrPeerNotFound) { - p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) - return + if isHistorical { + p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count)) + p.rate.Add(count) + } else { + p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count)) + } + + // pulled at least one chunk + if top >= start { + if err := p.addPeerInterval(address, bin, start, top); err != nil { + p.metrics.SyncWorkerErrCounter.Inc() + p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", address) + return + } + loggerV2.Debug("syncWorker pulled", "bin", bin, "start", start, "topmost", top, "isHistorical", isHistorical, "duration", time.Since(syncStart), "peer_address", address) + start = top + 1 } - p.logger.Debug("syncWorker interval failed", "error", err, "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) } } + + if cursor > 0 { + peer.wg.Add(1) + p.wg.Add(1) + go sync(true, peer.address, cursor, bin, peer.wg.Done) + } + + peer.wg.Add(1) + p.wg.Add(1) + go sync(false, peer.address, cursor+1, bin, peer.wg.Done) } func (p *Puller) Close() error { @@ -443,17 +462,17 @@ func (p *Puller) resetIntervals(upto uint8) (err error) { return } -func (p *Puller) nextPeerInterval(peer swarm.Address, bin uint8) (start, end uint64, empty bool, err error) { +func (p *Puller) nextPeerInterval(peer swarm.Address, bin uint8) (uint64, error) { p.intervalMtx.Lock() defer p.intervalMtx.Unlock() i, err := p.getOrCreateInterval(peer, bin) if err != nil { - return 0, 0, false, err + return 0, err } - start, end, empty = i.Next(0) - return start, end, empty, nil + start, _, _ := i.Next(0) + return start, nil } // Must be called underlock. 
@@ -506,7 +525,7 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
 }
 
 // called when peer disconnects or on shutdown, cleans up ongoing sync operations
-func (p *syncPeer) gone() {
+func (p *syncPeer) stop() {
 	for bin, c := range p.binCancelFuncs {
 		c()
 		delete(p.binCancelFuncs, bin)
diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go
index 339e492f488..f76fe292432 100644
--- a/pkg/puller/puller_test.go
+++ b/pkg/puller/puller_test.go
@@ -11,9 +11,9 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ethersphere/bee/pkg/intervalstore"
 	"github.com/ethersphere/bee/pkg/log"
 	"github.com/ethersphere/bee/pkg/puller"
+	"github.com/ethersphere/bee/pkg/puller/intervalstore"
 	mockps "github.com/ethersphere/bee/pkg/pullsync/mock"
 	"github.com/ethersphere/bee/pkg/spinlock"
 	"github.com/ethersphere/bee/pkg/statestore/mock"
@@ -438,11 +438,12 @@ func TestContinueSyncing(t *testing.T) {
 	time.Sleep(100 * time.Millisecond)
 	kad.Trigger()
-	time.Sleep(time.Second)
 
-	calls := len(pullsync.SyncCalls(addr))
-	if calls != 1 {
-		t.Fatalf("unexpected amount of calls, got %d", calls)
+	err := spinlock.Wait(time.Second, func() bool {
+		return len(pullsync.SyncCalls(addr)) == 1
+	})
+	if err != nil {
+		t.Fatal(err)
 	}
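
Note for reviewers: the sketch below is not part of the patch; it distills the historical/live split that the reworked syncPeerBin introduces. The names fetch and syncRange are hypothetical stand-ins for pullsync's Syncer.Sync and the sync closure above, and the limiter mirrors the golang.org/x/time/rate usage in New (Every(time.Second/4) admits four historical rounds per second; the real burst is int(swarm.MaxBins), and here it is simply 4).

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

// Mirrors the puller's limiter: one token every 250ms, i.e. four
// historical sync rounds per second shared by all workers.
var limiter = rate.NewLimiter(rate.Every(time.Second/4), 4)

// fetch is a hypothetical stand-in for Syncer.Sync: starting at start,
// it returns the topmost index it pulled.
func fetch(_ context.Context, start uint64) (uint64, error) {
	time.Sleep(100 * time.Millisecond) // simulate a network round trip
	return start + 9, nil              // pretend every round pulls ten indexes
}

// syncRange is a hypothetical stand-in for the sync closure in
// syncPeerBin: a historical worker is rate limited and exits once it
// passes the cursor snapshot taken at connect time, while a live
// worker runs until its context is cancelled.
func syncRange(ctx context.Context, isHistorical bool, start, cursor uint64, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		if isHistorical {
			if start > cursor { // caught up to the cursor, exit
				return
			}
			_ = limiter.Wait(ctx) // rate limit historical syncing only
		}
		select {
		case <-ctx.Done():
			return
		default:
		}
		top, err := fetch(ctx, start)
		if err != nil {
			return
		}
		fmt.Printf("historical=%v pulled [%d, %d]\n", isHistorical, start, top)
		start = top + 1 // resume after the interval just pulled
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	cursor := uint64(40) // the peer's cursor for this bin at connect time

	if cursor > 0 { // history exists: drain [0, cursor] in the background
		wg.Add(1)
		go syncRange(ctx, true, 0, cursor, &wg)
	}
	wg.Add(1) // live syncing starts right after the cursor
	go syncRange(ctx, false, cursor+1, cursor, &wg)

	time.Sleep(3 * time.Second) // let the historical worker finish
	cancel()                    // stop the live worker, as disconnectPeer would
	wg.Wait()
}

One difference from the sketch: in the patch the historical worker does not carry start in a local but re-reads it from the persisted intervals via nextPeerInterval on every round, so an interrupted node resumes where the last persisted interval ended; the sketch's start = top + 1 stands in for that bookkeeping.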