Skip to content

Commit

Permalink
fix(getter): fix freeze and combine errors return
Browse files Browse the repository at this point in the history
  • Loading branch information
zelig committed Feb 28, 2024
1 parent 652977f commit 166974d
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 320 deletions.
4 changes: 2 additions & 2 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down
28 changes: 8 additions & 20 deletions pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,33 +151,20 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
if rLevel == 0 {
t.Skip("NA")
}
req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fileDownloadResource(refResponse.Reference.String()), nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set(api.SwarmLookAheadBufferSizeHeader, "0")
req.Header.Set(api.SwarmRedundancyStrategyHeader, "0")
req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "false")
req.Header.Set(api.SwarmChunkRetrievalTimeoutHeader, fetchTimeout.String())

resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
t.Fatalf("expected status %d; got %d", http.StatusOK, resp.StatusCode)
}
_, err = dataReader.Seek(0, io.SeekStart)
if err != nil {
t.Fatal(err)
}
ok, err := dataReader.Equal(resp.Body)
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("there should be missing data")
_, err = client.Do(req)
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected error %v; got %v", io.ErrUnexpectedEOF, err)
}
})

Expand All @@ -186,6 +173,7 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
if err != nil {
t.Fatal(err)
}
req.Header.Set(api.SwarmLookAheadBufferSizeHeader, "0")
req.Header.Set(api.SwarmRedundancyStrategyHeader, "3")
req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "true")
req.Header.Set(api.SwarmChunkRetrievalTimeoutHeader, fetchTimeout.String())
Expand Down
19 changes: 13 additions & 6 deletions pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/pkg/file/splitter"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/log"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/util/testutil"
"github.com/ethersphere/bee/pkg/util/testutil/pseudorand"
"github.com/ethersphere/bee/pkg/util/testutil/racedetection"
"gitlab.com/nolash/go-mockbytes"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -1112,14 +1112,15 @@ func TestJoinerRedundancy(t *testing.T) {
strategyTimeout := 100 * time.Millisecond
// all data can be read back
readCheck := func(t *testing.T, expErr error) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), 15*strategyTimeout)
defer cancel()

strategyTimeoutStr := strategyTimeout.String()
decodeTimeoutStr := (10 * strategyTimeout).String()
fallback := true
s := getter.RACE

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1168,14 +1169,14 @@ func TestJoinerRedundancy(t *testing.T) {
}
}
t.Run("no recovery possible with no chunk stored", func(t *testing.T) {
readCheck(t, storage.ErrNotFound)
readCheck(t, context.DeadlineExceeded)
})

if err := putter.store(shardCnt - 1); err != nil {
t.Fatal(err)
}
t.Run("no recovery possible with 1 short of shardCnt chunks stored", func(t *testing.T) {
readCheck(t, storage.ErrNotFound)
readCheck(t, context.DeadlineExceeded)
})

if err := putter.store(1); err != nil {
Expand Down Expand Up @@ -1252,15 +1253,21 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, levels int, canRead bool) {
ctx := context.Background()
strategyTimeout := 100 * time.Millisecond
decodingTimeout := 600 * time.Millisecond
if racedetection.IsOn() {
decodingTimeout *= 2
}

strategyTimeoutStr := strategyTimeout.String()
decodingTimeoutStr := (2 * strategyTimeout).String()

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr)
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(ctx, time.Duration(levels)*(3*strategyTimeout+decodingTimeout))
defer cancel()
j, _, err := joiner.New(ctx, store, store, addr)
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 166974d

Please sign in to comment.