From 6e571e2f0c9b3a5f70792bd3911d8c3a1d84a454 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 2 Nov 2023 22:59:27 +0300 Subject: [PATCH] fix(puller): start syncing of peers in parallel --- pkg/puller/puller.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index c4f6933fa97..1f536910223 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -192,19 +192,22 @@ func (p *Puller) disconnectPeer(addr swarm.Address) { // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius. // Must be called under lock. func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) { - var errs error + var wg sync.WaitGroup for _, peer := range p.syncPeers { - peer.mtx.Lock() - errs = errors.Join(p.syncPeer(ctx, peer, storageRadius)) - peer.mtx.Unlock() - } - if errs != nil { - p.logger.Debug("recalculation failed", "error", errs) + wg.Add(1) + go func(peer *syncPeer) { + defer wg.Done() + if err := p.syncPeer(ctx, peer, storageRadius); err != nil { + p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err) + } + }(peer) } + wg.Wait() } -// Must be called under syncPeer lock. func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error { + peer.mtx.Lock() + defer peer.mtx.Unlock() if peer.cursors == nil { cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address) @@ -490,8 +493,9 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer { // called when peer disconnects or on shutdown, cleans up ongoing sync operations func (p *syncPeer) gone() { - for _, c := range p.binCancelFuncs { + for bin, c := range p.binCancelFuncs { c() + delete(p.binCancelFuncs, bin) } p.wg.Wait() }