Skip to content

Commit

Permalink
chore: remove retrieval ctx scope leak (#4310)
Browse files Browse the repository at this point in the history
  • Loading branch information
janos authored Oct 2, 2023
1 parent c5af787 commit d7c1fd9
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
golang.org/x/time v0.3.0
gopkg.in/yaml.v2 v2.4.0
resenje.org/multex v0.1.0
resenje.org/singleflight v0.2.0
resenje.org/singleflight v0.4.0
resenje.org/web v0.4.3
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1453,8 +1453,8 @@ resenje.org/marshal v0.1.1/go.mod h1:P7Cla6Ju5CFvW4Y8JbRgWX1Hcy4L1w4qcCsyadO7G94
resenje.org/multex v0.1.0 h1:am9Ndt8dIAeGVaztD8ClsSX+e0EP3mj6UdsvjukKZig=
resenje.org/multex v0.1.0/go.mod h1:3rHOoMrzqLNzgGWPcl/1GfzN52g7iaPXhbvTQ8TjGaM=
resenje.org/recovery v0.1.1/go.mod h1:3S6aCVKMJEWsSAb61oZTteaiqkIfQPTr1RdiWnRbhME=
resenje.org/singleflight v0.2.0 h1:nJ17VAZunMiFrfrltQ4Qs4r9MIP1pZC8u+0iSUTNnvQ=
resenje.org/singleflight v0.2.0/go.mod h1:plheHgw2rd77IH3J6aN0Lu2JvMvHXoLknDwb6vN0dsE=
resenje.org/singleflight v0.4.0 h1:NdOEhCxEikK2S2WxGjZV9EGSsItolQKslOOi6pE1tJc=
resenje.org/singleflight v0.4.0/go.mod h1:lAgQK7VfjG6/pgredbQfmV0RvG/uVhKo6vSuZ0vCWfk=
resenje.org/web v0.4.3 h1:G9vceKKGvsVg0WpyafJEEMHfstoxSO8rG/1Bo7fOkhw=
resenje.org/web v0.4.3/go.mod h1:GZw/Jt7IGIYlytsyGdAV5CytZnaQu7GV2u1LLuViihc=
resenje.org/x v0.2.4/go.mod h1:1b2Xpo29FRc3IMvg/u46/IyjySl5IjvtuSjXTA/AOnk=
Expand Down
19 changes: 9 additions & 10 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Syncer struct {
quit chan struct{}
unwrap func(swarm.Chunk)
validStamp postage.ValidStampFn
intervalsSF singleflight.Group
intervalsSF singleflight.Group[string, *collectAddrsResult]
syncInProgress atomic.Int32
binLock *multex.Multex

Expand Down Expand Up @@ -391,19 +391,19 @@ func (s *Syncer) makeOffer(ctx context.Context, rn pb.Get) (*pb.Offer, error) {
return o, nil
}

type collectAddrsResult struct {
chs []*storer.BinC
topmost uint64
}

// collectAddrs collects chunk addresses at a bin starting at some start BinID until a limit is reached.
// The function waits for an unbounded amount of time for the first chunk to arrive.
// After the arrival of the first chunk, the subsequent chunks have a limited amount of time to arrive,
// after which the function returns the collected slice of chunks.
func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*storer.BinC, uint64, error) {
loggerV2 := s.logger.V(2).Register()

type result struct {
chs []*storer.BinC
topmost uint64
}

v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (interface{}, error) {
v, _, err := s.intervalsSF.Do(ctx, sfKey(bin, start), func(ctx context.Context) (*collectAddrsResult, error) {
var (
chs []*storer.BinC
topmost uint64
Expand Down Expand Up @@ -453,13 +453,12 @@ func (s *Syncer) collectAddrs(ctx context.Context, bin uint8, start uint64) ([]*
}
}

return &result{chs: chs, topmost: topmost}, nil
return &collectAddrsResult{chs: chs, topmost: topmost}, nil
})
if err != nil {
return nil, 0, err
}
r := v.(*result)
return r.chs, r.topmost, nil
return v.chs, v.topmost, nil
}

// processWant compares a received Want to a sent Offer and returns
Expand Down
11 changes: 4 additions & 7 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Service struct {
streamer p2p.Streamer
peerSuggester topology.ClosestPeerer
storer Storer
singleflight singleflight.Group
singleflight singleflight.Group[string, swarm.Chunk]
logger log.Logger
accounting accounting.Interface
metrics metrics
Expand Down Expand Up @@ -148,10 +148,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
s.metrics.RequestAttempts.Observe(float64(totalRetrieveAttempts))
}()

// topCtx is passing the tracing span to the first singleflight call
topCtx := ctx

v, _, err := s.singleflight.Do(topCtx, flightRoute, func(ctx context.Context) (interface{}, error) {
v, _, err := s.singleflight.Do(ctx, flightRoute, func(ctx context.Context) (swarm.Chunk, error) {

skip := skippeers.NewList()
defer skip.Close()
Expand Down Expand Up @@ -239,7 +236,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
inflight++

go func() {
ctx := tracing.WithContext(context.Background(), tracing.FromContext(topCtx))
ctx := tracing.WithContext(context.Background(), tracing.FromContext(ctx)) // todo: replace with `ctx := context.WithoutCancel(ctx)` when go 1.21 is supported to pass all context values
span, _, ctx := s.tracer.StartSpanFromContext(ctx, "retrieve-chunk", s.logger, opentracing.Tag{Key: "address", Value: chunkAddr.String()})
defer span.Finish()
s.retrieveChunk(ctx, chunkAddr, peer, resultC, action, origin)
Expand Down Expand Up @@ -273,7 +270,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s

s.metrics.RequestSuccessCounter.Inc()

return v.(swarm.Chunk), nil
return v, nil
}

func (s *Service) retrieveChunk(ctx context.Context, chunkAddr, peer swarm.Address, result chan retrievalResult, action accounting.Action, isOrigin bool) {
Expand Down

0 comments on commit d7c1fd9

Please sign in to comment.