Skip to content

Commit

Permalink
feat: save missing chunks with putter
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon committed Nov 21, 2023
1 parent c85a884 commit 1a402fc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
8 changes: 5 additions & 3 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ type joiner struct {

ctx context.Context
getter storage.Getter
putter storage.Putter // required to save recovered data

chunkToSpan func(data []byte) (int, int64) // returns parity and span value from chunkData
}

// New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.
func New(ctx context.Context, getter storage.Getter, address swarm.Address) (file.Joiner, int64, error) {
func New(ctx context.Context, getter storage.Getter, putter storage.Putter, address swarm.Address) (file.Joiner, int64, error) {
getter = store.New(getter)
// retrieve the root chunk to read the total data length the be retrieved
rootChunk, err := getter.Get(ctx, address)
Expand Down Expand Up @@ -87,6 +88,7 @@ func New(ctx context.Context, getter storage.Getter, address swarm.Address) (fil
refLength: refLength,
ctx: ctx,
getter: getter,
putter: putter,
span: span,
rootData: rootData,
rootParity: rootParity,
Expand Down Expand Up @@ -163,7 +165,7 @@ func (j *joiner) readAtOffset(
return
}
sAddresses, pAddresses := chunkAddresses(data[:pSize], parity, j.refLength)
getter := getter.New(sAddresses, pAddresses, j.getter)
getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
for cursor := 0; cursor < len(data); cursor += j.refLength {
if bytesToRead == 0 {
break
Expand Down Expand Up @@ -313,7 +315,7 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
return err
}
sAddresses, pAddresses := chunkAddresses(data[:eSize], parity, j.refLength)
getter := getter.New(sAddresses, pAddresses, j.getter)
getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
for cursor := 0; cursor < len(data); cursor += j.refLength {
ref := data[cursor : cursor+j.refLength]
var reportAddr swarm.Address
Expand Down
25 changes: 22 additions & 3 deletions pkg/file/redundancy/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type inflightChunk struct {
// it caches sibling chunks if erasure decoding was called on the level already
type getter struct {
storage.Getter
storage.Putter
mu sync.Mutex
sAddresses []swarm.Address // shard addresses
pAddresses []swarm.Address // parity addresses
Expand All @@ -35,7 +36,7 @@ type getter struct {
}

// New returns a getter object which is used to retrieve children of an intermediate chunk
func New(sAddresses, pAddresses []swarm.Address, g storage.Getter) storage.Getter {
func New(sAddresses, pAddresses []swarm.Address, g storage.Getter, p storage.Putter) storage.Getter {
encrypted := len(sAddresses[0].Bytes()) == swarm.HashSize*2
shards := len(sAddresses)
parities := len(pAddresses)
Expand All @@ -45,6 +46,7 @@ func New(sAddresses, pAddresses []swarm.Address, g storage.Getter) storage.Gette

return &getter{
Getter: g,
Putter: p,
sAddresses: sAddresses,
pAddresses: pAddresses,
cache: cache,
Expand Down Expand Up @@ -193,17 +195,26 @@ func (g *getter) cautiousStrategy(ctx context.Context) error {
return fmt.Errorf("redundancy getter: there are %d missing chunks in order to do recovery", requiredChunks-retrieved)
}

return g.erasureDecode()
return g.erasureDecode(ctx)
}

// erasureDecode perform Reed-Solomon recovery on data
// assumes it is called after filling up cache with the required amount of shards and parities
func (g *getter) erasureDecode() error {
func (g *getter) erasureDecode(ctx context.Context) error {
enc, err := reedsolomon.New(len(g.sAddresses), len(g.pAddresses))
if err != nil {
return err
}

// missing chunks
missingAddresses := make([]swarm.Address, len(g.sAddresses))
for _, addr := range g.sAddresses {
c := g.erasureData[g.cache[addr.String()].pos]
if c == nil {
missingAddresses = append(missingAddresses, addr)
}
}

err = enc.ReconstructData(g.erasureData)
if err != nil {
return err
Expand All @@ -216,6 +227,14 @@ func (g *getter) erasureDecode() error {
close(c.wait)
}
}
// save missing chunks
for _, addr := range missingAddresses {
data := g.erasureData[g.cache[addr.String()].pos]
err := g.Putter.Put(ctx, swarm.NewChunk(addr, data))
if err != nil {
return err
}
}
return nil
}

Expand Down

0 comments on commit 1a402fc

Please sign in to comment.