Skip to content

Commit

Permalink
fix: small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Feb 26, 2024
1 parent d0c3e41 commit b2b8423
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 78 deletions.
2 changes: 1 addition & 1 deletion pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func New(ctx context.Context, g storage.Getter, putter storage.Putter, address s
maxBranching = rLevel.GetMaxShards()
}
} else {
// if root chunk has no redundancy, strategy is ignored and set to NONE and strict is set to true
// if root chunk has no redundancy, strategy is ignored and set to DATA and strict is set to true
conf.Strategy = getter.DATA
conf.Strict = true
}
Expand Down
155 changes: 78 additions & 77 deletions pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/klauspost/reedsolomon"
)

var (
errStrategyNotAllowed = errors.New("strategy not allowed")
errStrategyFailed = errors.New("strategy failed")
)

// decoder is a private implementation of storage.Getter
// if retrieves children of an intermediate chunk potentially using erasure decoding
// it caches sibling chunks if erasure decoding started already
Expand All @@ -29,7 +34,7 @@ type decoder struct {
waits []chan error // wait channels for each chunk
rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding
goodRecovery chan struct{} // signal channel for successful retrieval of shardCnt chunks
badRecovery chan struct{} //
badRecovery chan struct{} // signals that either the recovery has failed or not allowed to run
lastLen int // length of the last data chunk in the RS buffer
shardCnt int // number of data shards
parityCnt int // number of parity shards
Expand Down Expand Up @@ -86,8 +91,8 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
if !conf.Strict || conf.Strategy != NONE {
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.err = d.prefetch(ctx)
d.wg.Done()
}()
} else { // recovery not allowed
close(d.badRecovery)
Expand All @@ -110,10 +115,9 @@ func (g *decoder) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, err
return swarm.NewChunk(addr, g.getData(i)), nil
}

// fetch retrieves a chunk from the underlying storage
// it must be called asynchonously and only once for each chunk (singleflight pattern)
// it races with erasure recovery which takes precedence even if it started later
// due to the fact that erasure recovery could only implement global locking on all shards
// fetch retrieves a chunk from the netstore if it is the first time the chunk is fetched.
// If the fetch fails and waiting for the recovery is allowed, the function will wait
// for either a good or bad recovery signal.
func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err error) {

waitRecovery := func(err error) error {
Expand Down Expand Up @@ -141,17 +145,17 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
g.wg.Add(1)
defer g.wg.Done()

defer close(g.waits[i])

// retrieval
ch, err := g.fetcher.Get(fctx, g.addrs[i])
if err != nil {
g.failedCnt.Add(1)
close(g.waits[i])
return waitRecovery(err)
}

g.fetchedCnt.Add(1)
g.setData(i, ch.Data())
close(g.waits[i])
return nil
}

Expand All @@ -168,63 +172,6 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
return waitRecovery(storage.ErrNotFound)
}

func (g *decoder) unattemptedDataShards() (m []int) {
for i := 0; i < g.shardCnt; i++ {
select {
case <-g.waits[i]: // attempted
continue
default:
m = append(m, i) // remember the missing chunk
}
}
return m
}

func (g *decoder) missingDataShards() (m []int) {
for i := 0; i < g.shardCnt; i++ {
if g.getData(i) == nil {
m = append(m, i)
}
}
return m
}

// decode uses Reed-Solomon erasure coding decoder to recover data shards
// it must be called after shqrdcnt shards are retrieved
// it must be called under g.mu mutex protection
func (g *decoder) decode(ctx context.Context) error {
g.mu.Lock()
defer g.mu.Unlock()
enc, err := reedsolomon.New(g.shardCnt, g.parityCnt)
if err != nil {
return err
}

// decode data
return enc.ReconstructData(g.rsbuf)
}

// recover wraps the stages of data shard recovery:
// 1. gather missing data shards
// 2. decode using Reed-Solomon decoder
// 3. save reconstructed chunks
func (g *decoder) recover(ctx context.Context) error {

// gather missing shards
m := g.missingDataShards()
if len(m) == 0 {
return nil
}

// decode using Reed-Solomon decoder
if err := g.decode(ctx); err != nil {
return err
}

// save chunks
return g.save(ctx, m)
}

func (g *decoder) prefetch(ctx context.Context) error {
defer g.remove()

Expand Down Expand Up @@ -276,15 +223,15 @@ func (g *decoder) runStrategy(ctx context.Context, s Strategy) error {

switch s {
case NONE:
return errors.New("prefetch not allowed")
return errStrategyNotAllowed
case DATA:
// only retrieve data shards
m = g.unattemptedDataShards()
allowedErrs = 0
case PROX:
// proximity driven selective fetching
// NOT IMPLEMENTED
return errors.New("prefetch not allowed")
return errStrategyNotAllowed
case RACE:
allowedErrs = g.parityCnt
// retrieve all chunks at once enabling race among chunks
Expand All @@ -294,34 +241,88 @@ func (g *decoder) runStrategy(ctx context.Context, s Strategy) error {
}
}

errC := make(chan error, len(m))
c := make(chan error, len(m))

for _, i := range m {
g.wg.Add(1)
go func(i int) {
defer g.wg.Done()
errC <- g.fetch(ctx, i, false)
c <- g.fetch(ctx, i, false)
}(i)
}

cnt := 0

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-errC:
if g.failedCnt.Load() > int32(allowedErrs) {
return errors.New("strategy failed")
}
cnt++
if cnt == len(m) {
case <-c:
if g.fetchedCnt.Load() >= int32(g.shardCnt) {
return nil
}
if g.failedCnt.Load() > int32(allowedErrs) {
return errStrategyFailed
}
}
}
}

// recover wraps the stages of data shard recovery:
// 1. gather missing data shards
// 2. decode using Reed-Solomon decoder
// 3. save reconstructed chunks
func (g *decoder) recover(ctx context.Context) error {
// gather missing shards
m := g.missingDataShards()
if len(m) == 0 {
return nil // recovery is not needed as there are no missing data chunks
}

// decode using Reed-Solomon decoder
if err := g.decode(ctx); err != nil {
return err
}

// save chunks
return g.save(ctx, m)
}

// decode uses Reed-Solomon erasure coding decoder to recover data shards
// it must be called after shqrdcnt shards are retrieved
func (g *decoder) decode(ctx context.Context) error {
g.mu.Lock()
defer g.mu.Unlock()

enc, err := reedsolomon.New(g.shardCnt, g.parityCnt)
if err != nil {
return err
}

// decode data
return enc.ReconstructData(g.rsbuf)
}

func (g *decoder) unattemptedDataShards() (m []int) {
for i := 0; i < g.shardCnt; i++ {
select {
case <-g.waits[i]: // attempted
continue
default:
m = append(m, i) // remember the missing chunk
}
}
return m
}

// it must be called under mutex protection
func (g *decoder) missingDataShards() (m []int) {
for i := 0; i < g.shardCnt; i++ {
if g.getData(i) == nil {
m = append(m, i)
}
}
return m
}

// setData sets the data shard in the RS buffer
func (g *decoder) setData(i int, chdata []byte) {
g.mu.Lock()
Expand Down

0 comments on commit b2b8423

Please sign in to comment.