From 0624ea47e3e30eab8de8ee5ab72596fc137011d0 Mon Sep 17 00:00:00 2001 From: zelig Date: Wed, 10 Jan 2024 07:47:08 +0100 Subject: [PATCH] fix: incorrectly placed context timeouts --- pkg/api/bzz_test.go | 18 +++---- pkg/file/joiner/joiner.go | 4 +- pkg/file/redundancy/getter/getter.go | 64 ++++++++++++----------- pkg/file/redundancy/getter/getter_test.go | 10 ++-- pkg/file/redundancy/getter/strategies.go | 4 +- 5 files changed, 50 insertions(+), 50 deletions(-) diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 6544f02174b..5027197ba3f 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -72,7 +72,7 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { if err != nil { t.Fatal(err) } - fetchTimeout := 200 * time.Millisecond + fetchTimeout := 100 * time.Millisecond store := mockstorer.NewForgettingStore(inmemchunkstore.New()) storerMock := mockstorer.NewWithChunkStore(store) client, _, _, _ := newTestServer(t, testServerOptions{ @@ -149,7 +149,9 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { if rLevel == 0 { t.Skip("NA") } - req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", fileDownloadResource(refResponse.Reference.String()), nil) if err != nil { t.Fatal(err) } @@ -157,16 +159,8 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) { req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "false") req.Header.Set(api.SwarmChunkRetrievalTimeoutHeader, fetchTimeout.String()) - resp, err := client.Do(req) - if err != nil { - t.Fatal(err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Fatalf("expected status %d; got %d", http.StatusOK, resp.StatusCode) - } - _, err = io.ReadAll(resp.Body) - if !errors.Is(err, io.ErrUnexpectedEOF) { + _, err = client.Do(req) + if !errors.Is(err, 
context.DeadlineExceeded) { t.Fatalf("expected error %v; got %v", context.DeadlineExceeded, err) } }) diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go index c59c2146669..4b50ab47977 100644 --- a/pkg/file/joiner/joiner.go +++ b/pkg/file/joiner/joiner.go @@ -251,9 +251,7 @@ func (j *joiner) readAtOffset( func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) { eg.Go(func() error { - ctx, cancel := context.WithTimeout(j.ctx, j.decoders.fetcherTimeout) - defer cancel() - ch, err := g.Get(ctx, addr) + ch, err := g.Get(j.ctx, addr) if err != nil { return err } diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index 787f251c438..196de329523 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -20,22 +20,23 @@ import ( // if retrieves children of an intermediate chunk potentially using erasure decoding // it caches sibling chunks if erasure decoding started already type decoder struct { - fetcher storage.Getter // network retrieval interface to fetch chunks - putter storage.Putter // interface to local storage to save reconstructed chunks - addrs []swarm.Address // all addresses of the intermediate chunk - inflight []atomic.Bool // locks to protect wait channels and RS buffer - cache map[string]int // map from chunk address shard position index - waits []chan struct{} // wait channels for each chunk - rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding - ready chan struct{} // signal channel for successful retrieval of shardCnt chunks - lastLen int // length of the last data chunk in the RS buffer - shardCnt int // number of data shards - parityCnt int // number of parity shards - wg sync.WaitGroup // wait group to wait for all goroutines to finish - mu sync.Mutex // mutex to protect buffer - fetchedCnt atomic.Int32 // count successful retrievals - cancel func() // cancel function for RS decoding - 
remove func() // callback to remove decoder from decoders cache + fetcher storage.Getter // network retrieval interface to fetch chunks + putter storage.Putter // interface to local storage to save reconstructed chunks + addrs []swarm.Address // all addresses of the intermediate chunk + inflight []atomic.Bool // locks to protect wait channels and RS buffer + cache map[string]int // map from chunk address shard position index + waits []chan struct{} // wait channels for each chunk + rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding + ready chan struct{} // signal channel for successful retrieval of shardCnt chunks + lastLen int // length of the last data chunk in the RS buffer + shardCnt int // number of data shards + parityCnt int // number of parity shards + wg sync.WaitGroup // wait group to wait for all goroutines to finish + mu sync.Mutex // mutex to protect buffer + fetchedCnt atomic.Int32 // count successful retrievals + fetchTimeout time.Duration // timeout for fetching a chunk + cancel func() // cancel function for RS decoding + remove func() // callback to remove decoder from decoders cache } type Getter interface { @@ -53,18 +54,19 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter strategyTimeout := StrategyTimeout rsg := &decoder{ - fetcher: g, - putter: p, - addrs: addrs, - inflight: make([]atomic.Bool, size), - cache: make(map[string]int, size), - waits: make([]chan struct{}, shardCnt), - rsbuf: make([][]byte, size), - ready: make(chan struct{}, 1), - cancel: cancel, - remove: remove, - shardCnt: shardCnt, - parityCnt: size - shardCnt, + fetcher: g, + putter: p, + addrs: addrs, + inflight: make([]atomic.Bool, size), + cache: make(map[string]int, size), + waits: make([]chan struct{}, shardCnt), + rsbuf: make([][]byte, size), + ready: make(chan struct{}, 1), + cancel: cancel, + remove: remove, + shardCnt: shardCnt, + parityCnt: size - shardCnt, + fetchTimeout: fetchTimeout, } // after init, cache 
and wait channels are immutable, need no locking @@ -76,7 +78,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter // prefetch chunks according to strategy rsg.wg.Add(1) go func() { - rsg.prefetch(ctx, strategy, strict, strategyTimeout, fetchTimeout) + rsg.prefetch(ctx, strategy, strict, strategyTimeout) rsg.wg.Done() }() return rsg @@ -145,7 +147,9 @@ func (g *decoder) fly(i int, up bool) (success bool) { // it races with erasure recovery which takes precedence even if it started later // due to the fact that erasure recovery could only implement global locking on all shards func (g *decoder) fetch(ctx context.Context, i int) { - ch, err := g.fetcher.Get(ctx, g.addrs[i]) + fctx, cancel := context.WithTimeout(ctx, g.fetchTimeout) + defer cancel() + ch, err := g.fetcher.Get(fctx, g.addrs[i]) if err != nil { _ = g.fly(i, false) // unset inflight return diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go index 03fda3b8425..e0f59d9bd74 100644 --- a/pkg/file/redundancy/getter/getter_test.go +++ b/pkg/file/redundancy/getter/getter_test.go @@ -176,7 +176,7 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) { waitDelayed, waitErased := make(chan error, 1), make(chan error, 1) // complete retrieval of delayed chunk by putting it into the store after a while - delay := +getter.StrategyTimeout / 4 + delay := getter.StrategyTimeout / 4 if s == getter.NONE { delay += getter.StrategyTimeout } @@ -194,11 +194,15 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) { // delayed and erased chunk retrieval completes go func() { defer wg.Done() + ctx, cancel := context.WithTimeout(ctx, getter.StrategyTimeout*time.Duration(5-s)) + defer cancel() _, err := g.Get(ctx, addrs[delayed]) waitDelayed <- err }() go func() { defer wg.Done() + ctx, cancel := context.WithTimeout(ctx, getter.StrategyTimeout*time.Duration(5-s)) + defer cancel() _, err := g.Get(ctx, 
addrs[erased]) waitErased <- err }() @@ -240,10 +244,10 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) { case strict: t.Fatalf("unexpected completion of erased chunk retrieval. got round %d", round) case s == getter.NONE: - if round < 2 { + if round < 3 { t.Fatalf("unexpected early completion of erased chunk retrieval. got round %d", round) } - if round > 2 { + if round > 3 { t.Fatalf("unexpected late completion of erased chunk retrieval. got round %d", round) } case s == getter.DATA: diff --git a/pkg/file/redundancy/getter/strategies.go b/pkg/file/redundancy/getter/strategies.go index 9b0dcfb7935..b60f307c19c 100644 --- a/pkg/file/redundancy/getter/strategies.go +++ b/pkg/file/redundancy/getter/strategies.go @@ -82,7 +82,7 @@ func SetParamsInContext(ctx context.Context, s Strategy, fallbackmode bool, fetc ctx = SetFetchTimeout(ctx, fetchTimeout) return ctx } -func (g *decoder) prefetch(ctx context.Context, strategy int, strict bool, strategyTimeout, fetchTimeout time.Duration) { +func (g *decoder) prefetch(ctx context.Context, strategy int, strict bool, strategyTimeout time.Duration) { if strict && strategy == NONE { return } @@ -105,7 +105,7 @@ func (g *decoder) prefetch(ctx context.Context, strategy int, strict bool, strat defer timer.Stop() stop = timer.C } - lctx, cancel := context.WithTimeout(ctx, fetchTimeout) + lctx, cancel := context.WithCancel(ctx) cancels = append(cancels, cancel) prefetch(lctx, g, s)