From b2b8423a35b4a4f29e633d500213bc15cd539060 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 26 Feb 2024 14:46:53 +0300 Subject: [PATCH] fix: small fixes --- pkg/file/joiner/joiner.go | 2 +- pkg/file/redundancy/getter/getter.go | 155 ++++++++++++++------------- 2 files changed, 79 insertions(+), 78 deletions(-) diff --git a/pkg/file/joiner/joiner.go b/pkg/file/joiner/joiner.go index dc9c850084c..2b91708b031 100644 --- a/pkg/file/joiner/joiner.go +++ b/pkg/file/joiner/joiner.go @@ -129,7 +129,7 @@ func New(ctx context.Context, g storage.Getter, putter storage.Putter, address s maxBranching = rLevel.GetMaxShards() } } else { - // if root chunk has no redundancy, strategy is ignored and set to NONE and strict is set to true + // if root chunk has no redundancy, strategy is ignored and set to DATA and strict is set to true conf.Strategy = getter.DATA conf.Strict = true } diff --git a/pkg/file/redundancy/getter/getter.go b/pkg/file/redundancy/getter/getter.go index fe6f595ecf1..d58535be238 100644 --- a/pkg/file/redundancy/getter/getter.go +++ b/pkg/file/redundancy/getter/getter.go @@ -17,6 +17,11 @@ import ( "github.com/klauspost/reedsolomon" ) +var ( + errStrategyNotAllowed = errors.New("strategy not allowed") + errStrategyFailed = errors.New("strategy failed") +) + // decoder is a private implementation of storage.Getter // if retrieves children of an intermediate chunk potentially using erasure decoding // it caches sibling chunks if erasure decoding started already @@ -29,7 +34,7 @@ type decoder struct { waits []chan error // wait channels for each chunk rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding goodRecovery chan struct{} // signal channel for successful retrieval of shardCnt chunks - badRecovery chan struct{} // + badRecovery chan struct{} // signals that either the recovery has failed or not allowed to run lastLen int // length of the last data chunk in the RS buffer shardCnt int // number 
of data shards parityCnt int // number of parity shards @@ -86,8 +91,8 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter if !conf.Strict || conf.Strategy != NONE { d.wg.Add(1) go func() { + defer d.wg.Done() d.err = d.prefetch(ctx) - d.wg.Done() }() } else { // recovery not allowed close(d.badRecovery) @@ -110,10 +115,9 @@ func (g *decoder) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, err return swarm.NewChunk(addr, g.getData(i)), nil } -// fetch retrieves a chunk from the underlying storage -// it must be called asynchonously and only once for each chunk (singleflight pattern) -// it races with erasure recovery which takes precedence even if it started later -// due to the fact that erasure recovery could only implement global locking on all shards +// fetch retrieves a chunk from the netstore if it is the first time the chunk is fetched. +// If the fetch fails and waiting for the recovery is allowed, the function will wait +// for either a good or bad recovery signal. 
func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err error) { waitRecovery := func(err error) error { @@ -141,17 +145,17 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e g.wg.Add(1) defer g.wg.Done() + defer close(g.waits[i]) + // retrieval ch, err := g.fetcher.Get(fctx, g.addrs[i]) if err != nil { g.failedCnt.Add(1) - close(g.waits[i]) return waitRecovery(err) } g.fetchedCnt.Add(1) g.setData(i, ch.Data()) - close(g.waits[i]) return nil } @@ -168,63 +172,6 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e return waitRecovery(storage.ErrNotFound) } -func (g *decoder) unattemptedDataShards() (m []int) { - for i := 0; i < g.shardCnt; i++ { - select { - case <-g.waits[i]: // attempted - continue - default: - m = append(m, i) // remember the missing chunk - } - } - return m -} - -func (g *decoder) missingDataShards() (m []int) { - for i := 0; i < g.shardCnt; i++ { - if g.getData(i) == nil { - m = append(m, i) - } - } - return m -} - -// decode uses Reed-Solomon erasure coding decoder to recover data shards -// it must be called after shqrdcnt shards are retrieved -// it must be called under g.mu mutex protection -func (g *decoder) decode(ctx context.Context) error { - g.mu.Lock() - defer g.mu.Unlock() - enc, err := reedsolomon.New(g.shardCnt, g.parityCnt) - if err != nil { - return err - } - - // decode data - return enc.ReconstructData(g.rsbuf) -} - -// recover wraps the stages of data shard recovery: -// 1. gather missing data shards -// 2. decode using Reed-Solomon decoder -// 3. 
save reconstructed chunks -func (g *decoder) recover(ctx context.Context) error { - - // gather missing shards - m := g.missingDataShards() - if len(m) == 0 { - return nil - } - - // decode using Reed-Solomon decoder - if err := g.decode(ctx); err != nil { - return err - } - - // save chunks - return g.save(ctx, m) -} - func (g *decoder) prefetch(ctx context.Context) error { defer g.remove() @@ -276,7 +223,7 @@ func (g *decoder) runStrategy(ctx context.Context, s Strategy) error { switch s { case NONE: - return errors.New("prefetch not allowed") + return errStrategyNotAllowed case DATA: // only retrieve data shards m = g.unattemptedDataShards() @@ -284,7 +231,7 @@ func (g *decoder) runStrategy(ctx context.Context, s Strategy) error { case PROX: // proximity driven selective fetching // NOT IMPLEMENTED - return errors.New("prefetch not allowed") + return errStrategyNotAllowed case RACE: allowedErrs = g.parityCnt // retrieve all chunks at once enabling race among chunks @@ -294,34 +241,88 @@ func (g *decoder) runStrategy(ctx context.Context, s Strategy) error { } } - errC := make(chan error, len(m)) + c := make(chan error, len(m)) for _, i := range m { g.wg.Add(1) go func(i int) { defer g.wg.Done() - errC <- g.fetch(ctx, i, false) + c <- g.fetch(ctx, i, false) }(i) } - cnt := 0 - for { select { case <-ctx.Done(): return ctx.Err() - case <-errC: - if g.failedCnt.Load() > int32(allowedErrs) { - return errors.New("strategy failed") - } - cnt++ - if cnt == len(m) { + case <-c: + if g.fetchedCnt.Load() >= int32(g.shardCnt) { return nil } + if g.failedCnt.Load() > int32(allowedErrs) { + return errStrategyFailed + } } } } +// recover wraps the stages of data shard recovery: +// 1. gather missing data shards +// 2. decode using Reed-Solomon decoder +// 3. 
save reconstructed chunks +func (g *decoder) recover(ctx context.Context) error { + // gather missing shards + m := g.missingDataShards() + if len(m) == 0 { + return nil // recovery is not needed as there are no missing data chunks + } + + // decode using Reed-Solomon decoder + if err := g.decode(ctx); err != nil { + return err + } + + // save chunks + return g.save(ctx, m) +} + +// decode uses Reed-Solomon erasure coding decoder to recover data shards +// it must be called after shardCnt shards are retrieved +func (g *decoder) decode(ctx context.Context) error { + g.mu.Lock() + defer g.mu.Unlock() + + enc, err := reedsolomon.New(g.shardCnt, g.parityCnt) + if err != nil { + return err + } + + // decode data + return enc.ReconstructData(g.rsbuf) +} + +func (g *decoder) unattemptedDataShards() (m []int) { + for i := 0; i < g.shardCnt; i++ { + select { + case <-g.waits[i]: // attempted + continue + default: + m = append(m, i) // remember the missing chunk + } + } + return m +} + +// it must be called under mutex protection +func (g *decoder) missingDataShards() (m []int) { + for i := 0; i < g.shardCnt; i++ { + if g.getData(i) == nil { + m = append(m, i) + } + } + return m +} + // setData sets the data shard in the RS buffer func (g *decoder) setData(i int, chdata []byte) { g.mu.Lock()