Skip to content

Commit

Permalink
fix(pushsync): multiplexer freeze (#4950)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Jan 21, 2025
1 parent fd88cbc commit 904f186
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/pusher/inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type attempts struct {
func (a *attempts) try(idAddress swarm.Address) bool {
a.mtx.Lock()
defer a.mtx.Unlock()

key := idAddress.ByteString()
a.attempts[key]++
return a.attempts[key] < a.retryCount
Expand Down
10 changes: 8 additions & 2 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
return true, err
}
case errors.Is(err, pushsync.ErrShallowReceipt):
if retry := s.shallowReceipt(op.identityAddress); retry {
if s.shallowReceipt(op.identityAddress) {
return true, err
}
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
Expand Down Expand Up @@ -330,6 +330,12 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
if err != nil {
loggerV1.Error(err, "pusher: failed to store chunk")
}
case errors.Is(err, pushsync.ErrShallowReceipt):
if s.shallowReceipt(op.identityAddress) {
return err
}
// out of attempts for retry, swallow error
err = nil
case err != nil:
loggerV1.Error(err, "pusher: failed PushChunkToClosest")
}
Expand Down Expand Up @@ -362,7 +368,7 @@ func (s *Service) Close() error {
// Wait for chunks worker to finish
select {
case <-s.chunksWorkerQuitC:
case <-time.After(6 * time.Second):
case <-time.After(10 * time.Second):
}
return nil
}
17 changes: 5 additions & 12 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,6 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
skip := skippeers.NewList()
defer skip.Close()

neighborsOnly := false

for sentErrorsLeft > 0 {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -434,22 +432,14 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
if inflight == 0 {
return nil, err
}
// inflight request in progress, wait for its result
ps.logger.Debug("next peer", "chunk_address", ch.Address(), "error", err)
continue
}

peerPO := swarm.Proximity(peer.Bytes(), ch.Address().Bytes())

// all future requests should land directly into the neighborhood
if neighborsOnly && peerPO < rad {
skip.Forever(idAddress, peer)
continue
}

// since we can reach into the neighborhood of the chunk
// act as the multiplexer and push the chunk in parallel to multiple peers
if peerPO >= rad {
neighborsOnly = true
if swarm.Proximity(peer.Bytes(), ch.Address().Bytes()) >= rad {
for ; parallelForwards > 0; parallelForwards-- {
retry()
sentErrorsLeft++
Expand Down Expand Up @@ -514,6 +504,8 @@ func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool, skipLis
}

func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) {

// here we use a background timeout context because we do not want another push attempt to cancel this one
ctx, cancel := context.WithTimeout(context.Background(), defaultTTL)
defer cancel()

Expand All @@ -536,6 +528,7 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes
select {
case resultChan <- receiptResult{pushTime: now, peer: peer, err: err, receipt: receipt}:
case <-parentCtx.Done():
ps.logger.Debug("push result parent context canceled", "chunk_address", ch.Address(), "peer_address", peer)
}
}()

Expand Down

0 comments on commit 904f186

Please sign in to comment.