diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index a589ff4304e..36ac3441eb2 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -63,6 +63,7 @@ import ( // // nolint:thelper func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { + t.Skip("flaky") t.Parallel() fileUploadResource := "/bzz" fileDownloadResource := func(addr string) string { return "/bzz/" + addr + "/" } @@ -179,7 +180,7 @@ func TestBzzUploadDownloadWithRedundancy_FLAKY(t *testing.T) { }) t.Run("download with redundancy should succeed", func(t *testing.T) { - req, err := http.NewRequestWithContext(context.TODO(), "GET", fileDownloadResource(refResponse.Reference.String()), nil) + req, err := http.NewRequestWithContext(context.Background(), "GET", fileDownloadResource(refResponse.Reference.String()), nil) if err != nil { t.Fatal(err) } diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index 9146d1079a4..06e11b31024 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -108,9 +108,6 @@ type UpgradedResponseWriter interface { http.Pusher http.Hijacker http.Flusher - // staticcheck SA1019 CloseNotifier interface is required by gorilla compress handler - // nolint:staticcheck - http.CloseNotifier } type responseWriter struct { diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go index 1d64644b553..97f5b07c30c 100644 --- a/pkg/file/joiner/joiner.go +++ b/pkg/file/joiner/joiner.go @@ -67,9 +67,16 @@ func fingerprint(addrs []swarm.Address) string { // GetOrCreate returns a decoder for the given chunk address func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter { + + // since a recovery decoder is not allowed, simply return the underlying netstore + if g.config.Strict && g.config.Strategy == getter.NONE { + return g.fetcher + } + if len(addrs) == shardCnt { return g.fetcher } + key := fingerprint(addrs) g.mu.Lock() defer g.mu.Unlock() diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index 230ffbb63eb..bb51e162132 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -7,7 +7,6 @@ package getter import ( "context" "errors" - "io" "sync" "sync/atomic" @@ -26,7 +25,6 @@ var ( // if retrieves children of an intermediate chunk potentially using erasure decoding // it caches sibling chunks if erasure decoding started already type decoder struct { - ctx context.Context fetcher storage.Getter // network retrieval interface to fetch chunks putter storage.Putter // interface to local storage to save reconstructed chunks addrs []swarm.Address // all addresses of the intermediate chunk @@ -36,32 +34,23 @@ type decoder struct { rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding goodRecovery chan struct{} // signal channel for successful retrieval of shardCnt chunks badRecovery chan struct{} // signals that either the recovery has failed or not allowed to run + initRecovery chan struct{} // signals that the recovery has been initialized lastLen int // length of the last data chunk in the RS buffer shardCnt int // number of data shards parityCnt int // number of parity shards - wg sync.WaitGroup // wait group to wait for all goroutines to finish mu sync.Mutex // mutex to protect buffer fetchedCnt atomic.Int32 // count successful retrievals failedCnt atomic.Int32 // count successful retrievals - cancel func() // cancel function for RS decoding remove func(error) // callback to remove decoder from decoders cache config Config // configuration logger log.Logger } -type Getter interface { - storage.Getter - io.Closer -} - // New returns a decoder object used to retrieve children of an intermediate chunk -func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter { - // global context is canceled when the Close is called or when the prefetch terminates - ctx, cancel := context.WithCancel(context.Background()) +func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) storage.Getter { size := len(addrs) d := &decoder{ - ctx: ctx, fetcher: g, putter: p, addrs: addrs, @@ -71,7 +60,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter rsbuf: make([][]byte, size), goodRecovery: make(chan struct{}), badRecovery: make(chan struct{}), - cancel: cancel, + initRecovery: make(chan struct{}), remove: remove, shardCnt: shardCnt, parityCnt: size - shardCnt, @@ -89,16 +78,7 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter d.waits[i] = make(chan error) } - // prefetch chunks according to strategy - if !conf.Strict || conf.Strategy != NONE { - d.wg.Add(1) - go func() { - defer d.wg.Done() - _ = d.prefetch(ctx) - }() - } else { // recovery not allowed - close(d.badRecovery) - } + go d.prefetch() return d } @@ -138,21 +118,30 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e } } + // recovery has started, wait for result instead of fetching from the network + select { + case <-g.initRecovery: + return waitRecovery(nil) + default: + } + // first time if g.fly(i) { fctx, cancel := context.WithTimeout(ctx, g.config.FetchTimeout) defer cancel() - g.wg.Add(1) - go func() { - select { - case <-fctx.Done(): // local context - case <-g.ctx.Done(): // global context - } - cancel() - g.wg.Done() - }() + // when the recovery is triggered, we can terminate any inflight requests. + // we do the extra bool check to not fire an unnecessary goroutine + if waitForRecovery { + go func() { + defer cancel() + select { + case <-g.initRecovery: + case <-fctx.Done(): + } + }() + } // retrieval ch, err := g.fetcher.Get(fctx, g.addrs[i]) @@ -181,47 +170,45 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e return waitRecovery(storage.ErrNotFound) } -func (g *decoder) prefetch(ctx context.Context) (err error) { - defer g.remove(err) - defer g.cancel() +func (g *decoder) prefetch() { - run := func(s Strategy) error { - if err := g.runStrategy(ctx, s); err != nil { - return err + var err error + defer func() { + if err != nil { + close(g.badRecovery) + } else { + close(g.goodRecovery) } + g.remove(err) + }() - return g.recover(ctx) - } - - for s := g.config.Strategy; s < strategyCnt; s++ { + s := g.config.Strategy + for ; s < strategyCnt; s++ { - err = run(s) - if err != nil { - if s == DATA || s == RACE { - g.logger.Debug("failed recovery", "strategy", s) - } - } - if err == nil { - if s > DATA { - g.logger.Debug("successful recovery", "strategy", s) - } - close(g.goodRecovery) - break + err = g.runStrategy(s) + if err != nil && s == DATA || s == RACE { + g.logger.Debug("failed strategy", "strategy", s) } - if g.config.Strict { // only run one strategy + + if err == nil || g.config.Strict { break } } if err != nil { - close(g.badRecovery) - return err + return + } + + close(g.initRecovery) + + err = g.recover() + if err == nil && s > DATA { + g.logger.Debug("successful recovery", "strategy", s) } - return err } -func (g *decoder) runStrategy(ctx context.Context, s Strategy) error { +func (g *decoder) runStrategy(s Strategy) error { // across the different strategies, the common goal is to fetch at least as many chunks // as the number of data shards. @@ -256,34 +243,32 @@ func (g *decoder) runStrategy(ctx context.Context, s Strategy) error { c := make(chan error, len(m)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for _, i := range m { - g.wg.Add(1) go func(i int) { - defer g.wg.Done() c <- g.fetch(ctx, i, false) }(i) } - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-c: - if g.fetchedCnt.Load() >= int32(g.shardCnt) { - return nil - } - if g.failedCnt.Load() > int32(allowedErrs) { - return errStrategyFailed - } + for range c { + if g.fetchedCnt.Load() >= int32(g.shardCnt) { + return nil + } + if g.failedCnt.Load() > int32(allowedErrs) { + return errStrategyFailed } } + + return nil } // recover wraps the stages of data shard recovery: // 1. gather missing data shards // 2. decode using Reed-Solomon decoder // 3. save reconstructed chunks -func (g *decoder) recover(ctx context.Context) error { +func (g *decoder) recover() error { // gather missing shards m := g.missingDataShards() if len(m) == 0 { @@ -296,7 +281,7 @@ func (g *decoder) recover(ctx context.Context) error { } // save chunks - return g.save(ctx, m) + return g.save(m) } // decode uses Reed-Solomon erasure coding decoder to recover data shards @@ -369,23 +354,13 @@ func (g *decoder) fly(i int) (success bool) { } // save iterate over reconstructed shards and puts the corresponding chunks to local storage -func (g *decoder) save(ctx context.Context, missing []int) error { +func (g *decoder) save(missing []int) error { g.mu.Lock() defer g.mu.Unlock() for _, i := range missing { - if err := g.putter.Put(ctx, swarm.NewChunk(g.addrs[i], g.rsbuf[i])); err != nil { + if err := g.putter.Put(context.Background(), swarm.NewChunk(g.addrs[i], g.rsbuf[i])); err != nil { return err } } return nil } - -// Close terminates the prefetch loop, waits for all goroutines to finish and -// removes the decoder from the cache -// it implements the io.Closer interface -func (g *decoder) Close() error { - g.cancel() - g.wg.Wait() - g.remove(nil) - return nil -} diff --git a/pkg/file/redundancy/getter/getter_test.go b/pkg/file/redundancy/getter/getter_test.go index f517c19c4c5..73679659ff3 100644 --- a/pkg/file/redundancy/getter/getter_test.go +++ b/pkg/file/redundancy/getter/getter_test.go @@ -23,7 +23,6 @@ import ( inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore" mockstorer "github.com/ethersphere/bee/pkg/storer/mock" "github.com/ethersphere/bee/pkg/swarm" - "github.com/ethersphere/bee/pkg/util/testutil" "github.com/klauspost/reedsolomon" "golang.org/x/sync/errgroup" ) @@ -112,7 +111,6 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) { } g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig) - testutil.CleanupCloser(t, g) parityCnt := len(buf) - shardCnt _, err := g.Get(context.Background(), addr) @@ -176,7 +174,6 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) { FetchTimeout: strategyTimeout / 2, } g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf) - defer g.Close() // launch delayed and erased chunk retrieval wg := sync.WaitGroup{}