Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(retrieval, pushsync): collect all errors for failed requests #4377

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
}
}

var joinedErrs error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: maybe allOccurredErrors would be clearer. The same applies below.


retry()

for sentErrorsLeft > 0 {
Expand Down Expand Up @@ -357,6 +359,8 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
continue // there is still an inflight request, wait for its result
}

joinedErrs = errors.Join(joinedErrs, err)

ps.logger.Debug("sleeping to refresh overdraft balance", "chunk_address", ch.Address())

select {
Expand All @@ -372,6 +376,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
if inflight == 0 {
return nil, err
}
joinedErrs = errors.Join(joinedErrs, err)
ps.logger.Debug("next peer", "chunk_address", ch.Address(), "error", err)
continue
}
Expand All @@ -389,6 +394,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
if err != nil {
retry()
ps.skipList.Add(ch.Address(), peer, overDraftRefresh)
joinedErrs = errors.Join(joinedErrs, err)
continue
}
ps.skipList.Add(ch.Address(), peer, sanctionWait)
Expand All @@ -408,6 +414,8 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
return result.receipt, nil
}

joinedErrs = errors.Join(joinedErrs, result.err)

ps.metrics.TotalFailedSendAttempts.Inc()
ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err)

Expand All @@ -417,7 +425,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
}
}

return nil, ErrNoPush
return nil, errors.Join(joinedErrs, ErrNoPush)
}

func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s

retry()

var joinedErrs error

inflight := 0

for errorsLeft > 0 {
Expand All @@ -203,6 +205,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
loggerV1.Debug("no peers left", "chunk_address", chunkAddr, "errors_left", errorsLeft, "isOrigin", origin, "error", err)
return nil, err
}
joinedErrs = errors.Join(joinedErrs, err)
continue // there is still an inflight request, wait for its result
}

Expand All @@ -222,15 +225,18 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
loggerV1.Debug("peer selection", "chunk_address", chunkAddr, "error", err)
return nil, err
}
joinedErrs = errors.Join(joinedErrs, err)
continue
}

action, err := s.prepareCredit(ctx, peer, chunkAddr, origin)
if err != nil {
skip.Add(chunkAddr, peer, overDraftRefresh)
retry()
joinedErrs = errors.Join(joinedErrs, err)
continue
}

skip.Add(chunkAddr, peer, skippeers.MaxDuration)

inflight++
Expand All @@ -251,6 +257,8 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
return res.chunk, nil
}

joinedErrs = errors.Join(joinedErrs, res.err)

loggerV1.Debug("failed to get chunk", "chunk_address", chunkAddr, "peer_address", res.peer,
"peer_proximity", swarm.Proximity(res.peer.Bytes(), s.addr.Bytes()), "error", res.err)

Expand All @@ -260,7 +268,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
}
}

return nil, storage.ErrNotFound
return nil, errors.Join(joinedErrs, storage.ErrNotFound)
})
if err != nil {
s.metrics.RequestFailureCounter.Inc()
Expand Down
Loading