From a5945f7fc61727bd48c28190d999cd9579deb6ae Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 22 Jan 2024 22:00:26 +0300 Subject: [PATCH] fix: asd --- pkg/puller/puller.go | 60 ++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index ee1cbd5f00d..0c90d351945 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -294,31 +294,35 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint ctx, cancel := context.WithCancel(parentCtx) peer.setBinCancel(cancel, bin) - sync := func(isHistorical bool, address swarm.Address, cursor uint64, bin uint8, done func()) { + sync := func(isHistorical bool, address swarm.Address, start uint64, bin uint8, done func()) { p.metrics.SyncWorkerCounter.Inc() defer p.wg.Done() defer p.metrics.SyncWorkerDoneCounter.Inc() defer done() - for { - s, err := p.nextPeerInterval(address, bin) - if err != nil { - p.metrics.SyncWorkerErrCounter.Inc() - p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting") - return - } + var ( + cursor = start + err error + ) + for { if isHistorical { + start, err = p.nextPeerInterval(address, bin) + if err != nil { + p.metrics.SyncWorkerErrCounter.Inc() + p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting") + return + } + // historical sync has caught up to the cursor, exit - if s > cursor { + if start > cursor { return } // rate limit historical syncing _ = p.limiter.Wait(ctx) - } else if s < cursor { - // livesync always starts at >= cursor - s = cursor + } else { + start += 1 } select { @@ -331,14 +335,23 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncWorkerIterCounter.Inc() syncStart := time.Now() - top, count, err := p.syncer.Sync(ctx, address, bin, s) + top, count, err := p.syncer.Sync(ctx, address, bin, start) if top == math.MaxUint64 { p.metrics.MaxUintErrCounter.Inc() - p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", address, "bin", bin, "from", s, "topmost", top) + p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", address, "bin", bin, "from", start, "topmost", top) return } + if err != nil { + p.metrics.SyncWorkerErrCounter.Inc() + if errors.Is(err, p2p.ErrPeerNotFound) { + p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + return + } + loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top) + } + if isHistorical { p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count)) p.rate.Add(count) @@ -346,22 +359,15 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count)) } - if top >= s { - if err := p.addPeerInterval(address, bin, s, top); err != nil { + // pulled at least one chunk + if top >= start { + if err := p.addPeerInterval(address, bin, start, top); err != nil { p.metrics.SyncWorkerErrCounter.Inc() p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", address) return } - loggerV2.Debug("syncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", address) - } - - if err != nil { - p.metrics.SyncWorkerErrCounter.Inc() - if errors.Is(err, p2p.ErrPeerNotFound) { - p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", s, "topmost", top) - return - } - loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", s, "topmost", top) + loggerV2.Debug("syncWorker pulled", "bin", bin, "start", start, "topmost", top, "duration", time.Since(syncStart), "peer_address", address) + start = top } } } @@ -374,7 +380,7 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint peer.wg.Add(1) p.wg.Add(1) - go sync(false, peer.address, cursor+1, bin, peer.wg.Done) + go sync(false, peer.address, cursor, bin, peer.wg.Done) } func (p *Puller) Close() error {