Skip to content

Commit

Permalink
fix(redundancy/getter): wait for recovery and return error (#4581)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Feb 27, 2024
1 parent b905de1 commit 652977f
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 233 deletions.
4 changes: 2 additions & 2 deletions pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV
strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h
strategyTimeout := getter.DefaultStrategyTimeout.String()

ctx := r.Context()
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout)
ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, &strategyTimeout, logger)
if err != nil {
logger.Error(err, err.Error())
jsonhttp.BadRequest(w, "could not parse headers")
Expand Down
26 changes: 20 additions & 6 deletions pkg/api/bzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,33 @@ func TestBzzUploadDownloadWithRedundancy(t *testing.T) {
if rLevel == 0 {
t.Skip("NA")
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fileDownloadResource(refResponse.Reference.String()), nil)
req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil)
if err != nil {
t.Fatal(err)
}
req.Header.Set(api.SwarmRedundancyStrategyHeader, "0")
req.Header.Set(api.SwarmRedundancyFallbackModeHeader, "false")
req.Header.Set(api.SwarmChunkRetrievalTimeoutHeader, fetchTimeout.String())

_, err = client.Do(req)
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected error %v; got %v", io.ErrUnexpectedEOF, err)
resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
t.Fatalf("expected status %d; got %d", http.StatusOK, resp.StatusCode)
}
_, err = dataReader.Seek(0, io.SeekStart)
if err != nil {
t.Fatal(err)
}
ok, err := dataReader.Equal(resp.Body)
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("there should be missing data")
}
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func New(ctx context.Context, g storage.Getter, putter storage.Putter, address s
maxBranching = rLevel.GetMaxShards()
}
} else {
// if root chunk has no redundancy, strategy is ignored and set to NONE and strict is set to true
// if root chunk has no redundancy, strategy is ignored and set to DATA and strict is set to true
conf.Strategy = getter.DATA
conf.Strict = true
}
Expand Down
19 changes: 6 additions & 13 deletions pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/pkg/file/splitter"
filetest "github.com/ethersphere/bee/pkg/file/testing"
"github.com/ethersphere/bee/pkg/log"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
testingc "github.com/ethersphere/bee/pkg/storage/testing"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/util/testutil"
"github.com/ethersphere/bee/pkg/util/testutil/pseudorand"
"github.com/ethersphere/bee/pkg/util/testutil/racedetection"
"gitlab.com/nolash/go-mockbytes"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -1112,15 +1112,14 @@ func TestJoinerRedundancy(t *testing.T) {
strategyTimeout := 100 * time.Millisecond
// all data can be read back
readCheck := func(t *testing.T, expErr error) {
ctx, cancel := context.WithTimeout(context.Background(), 15*strategyTimeout)
defer cancel()
ctx := context.Background()

strategyTimeoutStr := strategyTimeout.String()
decodeTimeoutStr := (10 * strategyTimeout).String()
fallback := true
s := getter.RACE

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodeTimeoutStr, &strategyTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1169,14 +1168,14 @@ func TestJoinerRedundancy(t *testing.T) {
}
}
t.Run("no recovery possible with no chunk stored", func(t *testing.T) {
readCheck(t, context.DeadlineExceeded)
readCheck(t, storage.ErrNotFound)
})

if err := putter.store(shardCnt - 1); err != nil {
t.Fatal(err)
}
t.Run("no recovery possible with 1 short of shardCnt chunks stored", func(t *testing.T) {
readCheck(t, context.DeadlineExceeded)
readCheck(t, storage.ErrNotFound)
})

if err := putter.store(1); err != nil {
Expand Down Expand Up @@ -1253,21 +1252,15 @@ func TestJoinerRedundancyMultilevel(t *testing.T) {
canReadRange := func(t *testing.T, s getter.Strategy, fallback bool, levels int, canRead bool) {
ctx := context.Background()
strategyTimeout := 100 * time.Millisecond
decodingTimeout := 600 * time.Millisecond
if racedetection.IsOn() {
decodingTimeout *= 2
}

strategyTimeoutStr := strategyTimeout.String()
decodingTimeoutStr := (2 * strategyTimeout).String()

ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr)
ctx, err := getter.SetConfigInContext(ctx, &s, &fallback, &decodingTimeoutStr, &strategyTimeoutStr, log.Noop)
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(ctx, time.Duration(levels)*(3*strategyTimeout+decodingTimeout))
defer cancel()
j, _, err := joiner.New(ctx, store, store, addr)
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 652977f

Please sign in to comment.