diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 0945bdee05e..8f019b43e14 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -36,6 +36,8 @@ type Op struct { Err chan error Direct bool Span opentracing.Span + + identityAddress swarm.Address } type OpChan <-chan *Op @@ -218,7 +220,9 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { idAddress, err := soc.IdentityAddress(op.Chunk) if err != nil { op.Err <- err + continue } + op.identityAddress = idAddress if s.inflight.set(idAddress, op.Chunk.Stamp().BatchID()) { if op.Direct { select { @@ -245,12 +249,8 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) { loggerV1 := logger.V(1).Build() - idAddress, err := soc.IdentityAddress(op.Chunk) - if err != nil { - return true, err - } - defer s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID()) + defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) if _, err := s.validStamp(op.Chunk); err != nil { loggerV1.Warning( @@ -278,7 +278,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( return true, err } case errors.Is(err, pushsync.ErrShallowReceipt): - if retry := s.shallowReceipt(idAddress); retry { + if retry := s.shallowReceipt(op.identityAddress); retry { return true, err } if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil { @@ -300,13 +300,11 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error { loggerV1 := logger.V(1).Build() - idAddress, err := soc.IdentityAddress(op.Chunk) - if err != nil { - return err - } + + var err error defer func() { - s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID()) + s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) select { case op.Err <- err: default: