Skip to content

Commit

Permalink
fix: pusher identity fix
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 11, 2024
1 parent 0aae083 commit e7067c4
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Op struct {
Err chan error
Direct bool
Span opentracing.Span

identityAddress swarm.Address
}

type OpChan <-chan *Op
Expand Down Expand Up @@ -218,7 +220,9 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
op.Err <- err
continue
}
op.identityAddress = idAddress
if s.inflight.set(idAddress, op.Chunk.Stamp().BatchID()) {
if op.Direct {
select {
Expand All @@ -245,12 +249,8 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {

func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) {
loggerV1 := logger.V(1).Build()
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
return true, err
}

defer s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())

if _, err := s.validStamp(op.Chunk); err != nil {
loggerV1.Warning(
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
return true, err
}
case errors.Is(err, pushsync.ErrShallowReceipt):
if retry := s.shallowReceipt(idAddress); retry {
if retry := s.shallowReceipt(op.identityAddress); retry {
return true, err
}
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
Expand All @@ -300,13 +300,11 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (

func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error {
loggerV1 := logger.V(1).Build()
idAddress, err := soc.IdentityAddress(op.Chunk)
if err != nil {
return err
}

var err error

defer func() {
s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
select {
case op.Err <- err:
default:
Expand Down

0 comments on commit e7067c4

Please sign in to comment.