Skip to content

Commit

Permalink
fix(puller): start syncing of peers in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 2, 2023
1 parent 9221ed0 commit 6e571e2
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,22 @@ func (p *Puller) disconnectPeer(addr swarm.Address) {
// recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius.
// Must be called under lock.
func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
var errs error
var wg sync.WaitGroup
for _, peer := range p.syncPeers {
peer.mtx.Lock()
errs = errors.Join(p.syncPeer(ctx, peer, storageRadius))
peer.mtx.Unlock()
}
if errs != nil {
p.logger.Debug("recalculation failed", "error", errs)
wg.Add(1)
go func(peer *syncPeer) {
defer wg.Done()
if err := p.syncPeer(ctx, peer, storageRadius); err != nil {
p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err)
}
}(peer)
}
wg.Wait()
}

// Must be called under syncPeer lock.
func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
peer.mtx.Lock()
defer peer.mtx.Unlock()

if peer.cursors == nil {
cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address)
Expand Down Expand Up @@ -490,8 +493,9 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {

// called when peer disconnects or on shutdown, cleans up ongoing sync operations
func (p *syncPeer) gone() {
for _, c := range p.binCancelFuncs {
for bin, c := range p.binCancelFuncs {
c()
delete(p.binCancelFuncs, bin)
}
p.wg.Wait()
}
Expand Down

0 comments on commit 6e571e2

Please sign in to comment.