From 5914dfd5e7a542402e93cb9d6aca24b48703e2b9 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 4 Oct 2023 18:36:27 +0300 Subject: [PATCH] feat(retrieval, pushsync): include all errors for failed requests --- pkg/pushsync/pushsync.go | 10 +++++++++- pkg/retrieval/retrieval.go | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 6ef29619935..6ea2d0a5c1d 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -326,6 +326,8 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo } } + var joinedErrs error + retry() for sentErrorsLeft > 0 { @@ -357,6 +359,8 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo continue // there is still an inflight request, wait for it's result } + joinedErrs = errors.Join(joinedErrs, err) + ps.logger.Debug("sleeping to refresh overdraft balance", "chunk_address", ch.Address()) select { @@ -372,6 +376,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo if inflight == 0 { return nil, err } + joinedErrs = errors.Join(joinedErrs, err) ps.logger.Debug("next peer", "chunk_address", ch.Address(), "error", err) continue } @@ -389,6 +394,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo if err != nil { retry() ps.skipList.Add(ch.Address(), peer, overDraftRefresh) + joinedErrs = errors.Join(joinedErrs, err) continue } ps.skipList.Add(ch.Address(), peer, sanctionWait) @@ -408,6 +414,8 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo return result.receipt, nil } + joinedErrs = errors.Join(joinedErrs, result.err) + ps.metrics.TotalFailedSendAttempts.Inc() ps.logger.Debug("could not push to peer", "chunk_address", ch.Address(), "peer_address", result.peer, "error", result.err) @@ -417,7 +425,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo } } - return nil, ErrNoPush + return nil, errors.Join(joinedErrs, ErrNoPush) } func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) { diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go index 8f6be31e364..ec080c74910 100644 --- a/pkg/retrieval/retrieval.go +++ b/pkg/retrieval/retrieval.go @@ -180,6 +180,8 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s retry() + var joinedErrs error + inflight := 0 for errorsLeft > 0 { @@ -203,6 +205,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s loggerV1.Debug("no peers left", "chunk_address", chunkAddr, "errors_left", errorsLeft, "isOrigin", origin, "error", err) return nil, err } + joinedErrs = errors.Join(joinedErrs, err) continue // there is still an inflight request, wait for it's result } @@ -222,6 +225,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s loggerV1.Debug("peer selection", "chunk_address", chunkAddr, "error", err) return nil, err } + joinedErrs = errors.Join(joinedErrs, err) continue } @@ -229,8 +233,10 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s if err != nil { skip.Add(chunkAddr, peer, overDraftRefresh) retry() + joinedErrs = errors.Join(joinedErrs, err) continue } + skip.Add(chunkAddr, peer, skippeers.MaxDuration) inflight++ @@ -251,6 +257,8 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s return res.chunk, nil } + joinedErrs = errors.Join(joinedErrs, res.err) + loggerV1.Debug("failed to get chunk", "chunk_address", chunkAddr, "peer_address", res.peer, "peer_proximity", swarm.Proximity(res.peer.Bytes(), s.addr.Bytes()), "error", res.err) @@ -260,7 +268,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s } } - return nil, storage.ErrNotFound + return nil, errors.Join(joinedErrs, storage.ErrNotFound) }) if err != nil { s.metrics.RequestFailureCounter.Inc()