Skip to content

Commit

Permalink
fix: incorrectly placed context timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
zelig committed Jan 10, 2024
1 parent 5f8862a commit 0624ea4
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 50 deletions.
18 changes: 6 additions & 12 deletions pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
if err != nil {
t.Fatal(err)
}
fetchTimeout := 200 * time.Millisecond
fetchTimeout := 100 * time.Millisecond
store := mockstorer.NewForgettingStore(inmemchunkstore.New())
storerMock := mockstorer.NewWithChunkStore(store)
client, _, _, _ := newTestServer(t, testServerOptions{
Expand Down Expand Up @@ -149,24 +149,18 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
if rLevel == 0 {
t.Skip("NA")
}
req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fileDownloadResource(refResponse.Reference.String()), nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set(api.SwarmRedundancyStrategyHeader, "0")
req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "false")
req.Header.Set(api.SwarmChunkRetrievalTimeoutHeader, fetchTimeout.String())

resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("expected status %d; got %d", http.StatusOK, resp.StatusCode)
}
_, err = io.ReadAll(resp.Body)
if !errors.Is(err, io.ErrUnexpectedEOF) {
_, err = client.Do(req)
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected error %v; got %v", io.ErrUnexpectedEOF, err)
}
})
Expand Down
4 changes: 1 addition & 3 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,7 @@ func (j *joiner) readAtOffset(

func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) {
eg.Go(func() error {
ctx, cancel := context.WithTimeout(j.ctx, j.decoders.fetcherTimeout)
defer cancel()
ch, err := g.Get(ctx, addr)
ch, err := g.Get(j.ctx, addr)
if err != nil {
return err
}
Expand Down
64 changes: 34 additions & 30 deletions pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ import (
// if retrieves children of an intermediate chunk potentially using erasure decoding
// it caches sibling chunks if erasure decoding started already
type decoder struct {
fetcher storage.Getter // network retrieval interface to fetch chunks
putter storage.Putter // interface to local storage to save reconstructed chunks
addrs []swarm.Address // all addresses of the intermediate chunk
inflight []atomic.Bool // locks to protect wait channels and RS buffer
cache map[string]int // map from chunk address shard position index
waits []chan struct{} // wait channels for each chunk
rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding
ready chan struct{} // signal channel for successful retrieval of shardCnt chunks
lastLen int // length of the last data chunk in the RS buffer
shardCnt int // number of data shards
parityCnt int // number of parity shards
wg sync.WaitGroup // wait group to wait for all goroutines to finish
mu sync.Mutex // mutex to protect buffer
fetchedCnt atomic.Int32 // count successful retrievals
cancel func() // cancel function for RS decoding
remove func() // callback to remove decoder from decoders cache
fetcher storage.Getter // network retrieval interface to fetch chunks
putter storage.Putter // interface to local storage to save reconstructed chunks
addrs []swarm.Address // all addresses of the intermediate chunk
inflight []atomic.Bool // locks to protect wait channels and RS buffer
cache map[string]int // map from chunk address shard position index
waits []chan struct{} // wait channels for each chunk
rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding
ready chan struct{} // signal channel for successful retrieval of shardCnt chunks
lastLen int // length of the last data chunk in the RS buffer
shardCnt int // number of data shards
parityCnt int // number of parity shards
wg sync.WaitGroup // wait group to wait for all goroutines to finish
mu sync.Mutex // mutex to protect buffer
fetchedCnt atomic.Int32 // count successful retrievals
fetchTimeout time.Duration // timeout for fetching a chunk
cancel func() // cancel function for RS decoding
remove func() // callback to remove decoder from decoders cache
}

type Getter interface {
Expand All @@ -53,18 +54,19 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
strategyTimeout := StrategyTimeout

rsg := &decoder{
fetcher: g,
putter: p,
addrs: addrs,
inflight: make([]atomic.Bool, size),
cache: make(map[string]int, size),
waits: make([]chan struct{}, shardCnt),
rsbuf: make([][]byte, size),
ready: make(chan struct{}, 1),
cancel: cancel,
remove: remove,
shardCnt: shardCnt,
parityCnt: size - shardCnt,
fetcher: g,
putter: p,
addrs: addrs,
inflight: make([]atomic.Bool, size),
cache: make(map[string]int, size),
waits: make([]chan struct{}, shardCnt),
rsbuf: make([][]byte, size),
ready: make(chan struct{}, 1),
cancel: cancel,
remove: remove,
shardCnt: shardCnt,
parityCnt: size - shardCnt,
fetchTimeout: fetchTimeout,
}

// after init, cache and wait channels are immutable, need no locking
Expand All @@ -76,7 +78,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
// prefetch chunks according to strategy
rsg.wg.Add(1)
go func() {
rsg.prefetch(ctx, strategy, strict, strategyTimeout, fetchTimeout)
rsg.prefetch(ctx, strategy, strict, strategyTimeout)
rsg.wg.Done()
}()
return rsg
Expand Down Expand Up @@ -145,7 +147,9 @@ func (g *decoder) fly(i int, up bool) (success bool) {
// it races with erasure recovery which takes precedence even if it started later
// due to the fact that erasure recovery could only implement global locking on all shards
func (g *decoder) fetch(ctx context.Context, i int) {
ch, err := g.fetcher.Get(ctx, g.addrs[i])
fctx, cancel := context.WithTimeout(ctx, g.fetchTimeout)
defer cancel()
ch, err := g.fetcher.Get(fctx, g.addrs[i])
if err != nil {
_ = g.fly(i, false) // unset inflight
return
Expand Down
10 changes: 7 additions & 3 deletions pkg/file/redundancy/getter/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
waitDelayed, waitErased := make(chan error, 1), make(chan error, 1)

// complete retrieval of delayed chunk by putting it into the store after a while
delay := +getter.StrategyTimeout / 4
delay := getter.StrategyTimeout / 4
if s == getter.NONE {
delay += getter.StrategyTimeout
}
Expand All @@ -194,11 +194,15 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
// delayed and erased chunk retrieval completes
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, getter.StrategyTimeout*time.Duration(5-s))
defer cancel()
_, err := g.Get(ctx, addrs[delayed])
waitDelayed <- err
}()
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, getter.StrategyTimeout*time.Duration(5-s))
defer cancel()
_, err := g.Get(ctx, addrs[erased])
waitErased <- err
}()
Expand Down Expand Up @@ -240,10 +244,10 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
case strict:
t.Fatalf("unexpected completion of erased chunk retrieval. got round %d", round)
case s == getter.NONE:
if round < 2 {
if round < 3 {
t.Fatalf("unexpected early completion of erased chunk retrieval. got round %d", round)
}
if round > 2 {
if round > 3 {
t.Fatalf("unexpected late completion of erased chunk retrieval. got round %d", round)
}
case s == getter.DATA:
Expand Down
4 changes: 2 additions & 2 deletions pkg/file/redundancy/getter/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func SetParamsInContext(ctx context.Context, s Strategy, fallbackmode bool, fetc
ctx = SetFetchTimeout(ctx, fetchTimeout)
return ctx
}
func (g *decoder) prefetch(ctx context.Context, strategy int, strict bool, strategyTimeout, fetchTimeout time.Duration) {
func (g *decoder) prefetch(ctx context.Context, strategy int, strict bool, strategyTimeout time.Duration) {
if strict && strategy == NONE {
return
}
Expand All @@ -105,7 +105,7 @@ func (g *decoder) prefetch(ctx context.Context, strategy int, strict bool, strat
defer timer.Stop()
stop = timer.C
}
lctx, cancel := context.WithTimeout(ctx, fetchTimeout)
lctx, cancel := context.WithCancel(ctx)
cancels = append(cancels, cancel)
prefetch(lctx, g, s)

Expand Down

0 comments on commit 0624ea4

Please sign in to comment.