Skip to content

Commit

Permalink
fix: asd
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Jan 22, 2024
1 parent 6cb07cc commit a5945f7
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,31 +294,35 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
ctx, cancel := context.WithCancel(parentCtx)
peer.setBinCancel(cancel, bin)

sync := func(isHistorical bool, address swarm.Address, cursor uint64, bin uint8, done func()) {
sync := func(isHistorical bool, address swarm.Address, start uint64, bin uint8, done func()) {
p.metrics.SyncWorkerCounter.Inc()

defer p.wg.Done()
defer p.metrics.SyncWorkerDoneCounter.Inc()
defer done()

for {
s, err := p.nextPeerInterval(address, bin)
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting")
return
}
var (
cursor = start
err error
)

for {
if isHistorical {
start, err = p.nextPeerInterval(address, bin)
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
p.logger.Error(err, "syncWorker nextPeerInterval failed, quitting")
return
}

// historical sync has caught up to the cursor, exit
if s > cursor {
if start > cursor {
return
}
// rate limit historical syncing
_ = p.limiter.Wait(ctx)
} else if s < cursor {
// livesync always starts at >= cursor
s = cursor
} else {
start += 1
}

select {
Expand All @@ -331,37 +335,39 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
p.metrics.SyncWorkerIterCounter.Inc()

syncStart := time.Now()
top, count, err := p.syncer.Sync(ctx, address, bin, s)
top, count, err := p.syncer.Sync(ctx, address, bin, start)

if top == math.MaxUint64 {
p.metrics.MaxUintErrCounter.Inc()
p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", address, "bin", bin, "from", s, "topmost", top)
p.logger.Error(nil, "syncWorker max uint64 encountered, quitting", "peer_address", address, "bin", bin, "from", start, "topmost", top)
return
}

if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
if errors.Is(err, p2p.ErrPeerNotFound) {
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
return
}
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
}

if isHistorical {
p.metrics.SyncedCounter.WithLabelValues("historical").Add(float64(count))
p.rate.Add(count)
} else {
p.metrics.SyncedCounter.WithLabelValues("live").Add(float64(count))
}

if top >= s {
if err := p.addPeerInterval(address, bin, s, top); err != nil {
// pulled at least one chunk
if top >= start {
if err := p.addPeerInterval(address, bin, start, top); err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", address)
return
}
loggerV2.Debug("syncWorker pulled", "bin", bin, "start", s, "topmost", top, "duration", time.Since(syncStart), "peer_address", address)
}

if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
if errors.Is(err, p2p.ErrPeerNotFound) {
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", s, "topmost", top)
return
}
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", s, "topmost", top)
loggerV2.Debug("syncWorker pulled", "bin", bin, "start", start, "topmost", top, "duration", time.Since(syncStart), "peer_address", address)
start = top
}
}
}
Expand All @@ -374,7 +380,7 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint

peer.wg.Add(1)
p.wg.Add(1)
go sync(false, peer.address, cursor+1, bin, peer.wg.Done)
go sync(false, peer.address, cursor, bin, peer.wg.Done)
}

func (p *Puller) Close() error {
Expand Down

0 comments on commit a5945f7

Please sign in to comment.