feat: erasure decoder (#4448)
nugaon authored Dec 4, 2023
1 parent 31d0862 commit cacdbd6
Showing 24 changed files with 944 additions and 118 deletions.
7 changes: 4 additions & 3 deletions pkg/api/bzz.go
@@ -9,14 +9,15 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/topology"
"net/http"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/ethersphere/bee/pkg/topology"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/joiner"
@@ -175,7 +176,7 @@ func (s *Service) fileUploadHandler(
}

factory := requestPipelineFactory(ctx, putter, encrypt, rLevel)
l := loadsave.New(s.storer.ChunkStore(), factory)
l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), factory)

m, err := manifest.NewDefaultManifest(l, encrypt)
if err != nil {
@@ -443,7 +444,7 @@ func (s *Service) serveManifestEntry(

// downloadHandler contains common logic for downloading a Swarm file from the API
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag bool) {
reader, l, err := joiner.New(r.Context(), s.storer.Download(true), reference)
reader, l, err := joiner.New(r.Context(), s.storer.Download(true), s.storer.Cache(), reference)
if err != nil {
if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
logger.Debug("api download: not found ", "address", reference, "error", err)
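Throughout the API layer this commit threads a chunk putter (s.storer.Cache() above) into joiner.New, so chunks reconstructed from parity data can be written back to the local store. Below is a minimal caller sketch of the new signature; the helper name and the getter/putter variables are hypothetical, and only the joiner.New call shape is taken from the diff.

package sketch

import (
	"context"
	"io"

	"github.com/ethersphere/bee/pkg/file/joiner"
	storage "github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// readAll is a hypothetical helper: it joins the file rooted at addr and
// returns its content. getter fetches chunks; putter receives any chunks
// the erasure decoder recovers from parity data.
func readAll(ctx context.Context, getter storage.Getter, putter storage.Putter, addr swarm.Address) ([]byte, error) {
	r, size, err := joiner.New(ctx, getter, putter, addr)
	if err != nil {
		return nil, err
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	return buf, nil
}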
2 changes: 1 addition & 1 deletion pkg/api/bzz_test.go
@@ -554,7 +554,7 @@ func TestFeedIndirection(t *testing.T) {
t.Fatal(err)
}
m, err := manifest.NewDefaultManifest(
loadsave.New(storer.ChunkStore(), pipelineFactory(storer.Cache(), false, 0)),
loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0)),
false,
)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/api/dirs.go
@@ -132,7 +132,7 @@ func storeDir(
loggerV1 := logger.V(1).Build()

p := requestPipelineFn(putter, encrypt, rLevel)
ls := loadsave.New(getter, requestPipelineFactory(ctx, putter, encrypt, rLevel))
ls := loadsave.New(getter, putter, requestPipelineFactory(ctx, putter, encrypt, rLevel))

dirManifest, err := manifest.NewDefaultManifest(ls, encrypt)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/api/feed.go
@@ -196,7 +196,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
logger: logger,
}

l := loadsave.New(s.storer.ChunkStore(), requestPipelineFactory(r.Context(), putter, false, 0))
l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0))
feedManifest, err := manifest.NewDefaultManifest(l, false)
if err != nil {
logger.Debug("create manifest failed", "error", err)
2 changes: 1 addition & 1 deletion pkg/api/pin.go
@@ -50,7 +50,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) {
}

getter := s.storer.Download(true)
traverser := traversal.New(getter)
traverser := traversal.New(getter, s.storer.Cache())

sem := semaphore.NewWeighted(100)
var errTraverse error
2 changes: 1 addition & 1 deletion pkg/file/addresses/addresses_getter_test.go
@@ -63,7 +63,7 @@ func TestAddressesGetterIterateChunkAddresses(t *testing.T) {

addressesGetter := addresses.NewGetter(store, addressIterFunc)

j, _, err := joiner.New(ctx, addressesGetter, rootChunk.Address())
j, _, err := joiner.New(ctx, addressesGetter, store, rootChunk.Address())
if err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion pkg/file/file_test.go
@@ -62,7 +62,7 @@ func testSplitThenJoin(t *testing.T) {
}

// then join
r, l, err := joiner.New(ctx, store, resultAddress)
r, l, err := joiner.New(ctx, store, store, resultAddress)
if err != nil {
t.Fatal(err)
}
135 changes: 99 additions & 36 deletions pkg/file/joiner/joiner.go
@@ -7,51 +7,83 @@ package joiner

import (
"context"
"encoding/binary"
"errors"
"io"
"sync"
"sync/atomic"

"github.com/ethersphere/bee/pkg/bmt"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/encryption/store"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/sync/errgroup"
)

type joiner struct {
addr swarm.Address
rootData []byte
span int64
off int64
refLength int
addr swarm.Address
rootData []byte
span int64
off int64
refLength int
rootParity int
maxBranching int // maximum branching in an intermediate chunk

ctx context.Context
getter storage.Getter
putter storage.Putter // required to save recovered data

chunkToSpan func(data []byte) (redundancy.Level, int64) // returns parity and span value from chunkData
}

// New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.
func New(ctx context.Context, getter storage.Getter, address swarm.Address) (file.Joiner, int64, error) {
func New(ctx context.Context, getter storage.Getter, putter storage.Putter, address swarm.Address) (file.Joiner, int64, error) {
getter = store.New(getter)
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := getter.Get(ctx, address)
if err != nil {
return nil, 0, err
}

var chunkData = rootChunk.Data()

span := int64(binary.LittleEndian.Uint64(chunkData[:swarm.SpanSize]))
chunkData := rootChunk.Data()
rootData := chunkData[swarm.SpanSize:]
refLength := len(address.Bytes())
encryption := false
if refLength != swarm.HashSize {
encryption = true
}
rLevel, span := chunkToSpan(chunkData)
rootParity := 0
maxBranching := swarm.ChunkSize / refLength
spanFn := func(data []byte) (redundancy.Level, int64) {
return 0, int64(bmt.LengthFromSpan(data[:swarm.SpanSize]))
}
// override the defaults if the root chunk has redundancy
if rLevel != redundancy.NONE {
_, parities := file.ReferenceCount(uint64(span), rLevel, encryption)
rootParity = parities
spanFn = chunkToSpan
if encryption {
maxBranching = rLevel.GetMaxEncShards()
} else {
maxBranching = rLevel.GetMaxShards()
}
}

j := &joiner{
addr: rootChunk.Address(),
refLength: len(address.Bytes()),
ctx: ctx,
getter: getter,
span: span,
rootData: chunkData[swarm.SpanSize:],
addr: rootChunk.Address(),
refLength: refLength,
ctx: ctx,
getter: getter,
putter: putter,
span: span,
rootData: rootData,
rootParity: rootParity,
maxBranching: maxBranching,
chunkToSpan: spanFn,
}

return j, span, nil
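For the default (non-redundant) root above, maxBranching is simply swarm.ChunkSize divided by the reference length, e.g. 4096/32 = 128 children per intermediate chunk for plain references or 4096/64 = 64 for encrypted ones. When the span encodes a redundancy level, branching is instead capped at rLevel.GetMaxShards() (or rLevel.GetMaxEncShards() under encryption), with the remaining reference slots assumed to carry parity addresses, and file.ReferenceCount yields the parity count for the root span.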
@@ -81,7 +113,7 @@ func (j *joiner) ReadAt(buffer []byte, off int64) (read int, err error) {
}
var bytesRead int64
var eg errgroup.Group
j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, &eg)
j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, &eg)

err = eg.Wait()
if err != nil {
@@ -93,7 +125,13 @@ func (j *joiner) ReadAt(buffer []byte, off int64) (read int, err error) {

var ErrMalformedTrie = errors.New("malformed tree")

func (j *joiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffset, bytesToRead int64, bytesRead *int64, eg *errgroup.Group) {
func (j *joiner) readAtOffset(
b, data []byte,
cur, subTrieSize, off, bufferOffset, bytesToRead int64,
bytesRead *int64,
parity int,
eg *errgroup.Group,
) {
// we are at a leaf data chunk
if subTrieSize <= int64(len(data)) {
dataOffsetStart := off - cur
@@ -109,14 +147,23 @@ func (j *joiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffse
return
}

pSize, err := file.ChunkPayloadSize(data)
if err != nil {
eg.Go(func() error {
return err
})
return
}
sAddresses, pAddresses := file.ChunkAddresses(data[:pSize], parity, j.refLength)
getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
for cursor := 0; cursor < len(data); cursor += j.refLength {
if bytesToRead == 0 {
break
}

// fast forward the cursor
sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
if cur+sec < off {
sec := j.subtrieSection(data, cursor, pSize, parity, subTrieSize)
if cur+sec <= off {
cur += sec
continue
}
@@ -139,19 +186,20 @@ func (j *joiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffse

func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) {
eg.Go(func() error {
ch, err := j.getter.Get(j.ctx, address)
ch, err := getter.Get(j.ctx, address)
if err != nil {
return err
}

chunkData := ch.Data()[8:]
subtrieSpan := int64(chunkToSpan(ch.Data()))
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
_, subtrieParity := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)

if subtrieSpan > subtrieSpanLimit {
return ErrMalformedTrie
}

j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, eg)
j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg)
return nil
})
}(address, b, cur, subtrieSpan, off, bufferOffset, currentReadSize, subtrieSpanLimit)
@@ -163,8 +211,13 @@ func (j *joiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffse
}
}

// getShards returns the effective number of data references given the intermediate chunk payload length and its parity count
func (j *joiner) getShards(payloadSize, parities int) int {
return (payloadSize - parities*swarm.HashSize) / j.refLength
}

// brute-forces the subtrie size for each of the sections in this intermediate chunk
func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64 {
func (j *joiner) subtrieSection(data []byte, startIdx, payloadSize, parities int, subtrieSize int64) int64 {
// assume we have a trie of size `y` then we can assume that all of
// the forks except for the last one on the right are of equal size
// this is due to how the splitter wraps levels.
@@ -173,9 +226,9 @@ func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64
// where y is the size of the subtrie, refs are the number of references
// x is constant (the brute forced value) and l is the size of the last subtrie
var (
refs = int64(len(data) / refLen) // how many references in the intermediate chunk
branching = int64(4096 / refLen) // branching factor is chunkSize divided by reference length
branchSize = int64(4096)
refs = int64(j.getShards(payloadSize, parities)) // how many effective references in the intermediate chunk
branching = int64(j.maxBranching) // branching factor is chunkSize divided by reference length
branchSize = int64(swarm.ChunkSize)
)
for {
whatsLeft := subtrieSize - (branchSize * (refs - 1))
@@ -186,7 +239,7 @@ func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64
}

// handle last branch edge case
if startIdx == int(refs-1)*refLen {
if startIdx == int(refs-1)*j.refLength {
return subtrieSize - (refs-1)*branchSize
}
return branchSize
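For intuition, a worked example of the arithmetic above (values are illustrative, not taken from the diff): getShards counts each parity reference at swarm.HashSize bytes regardless of encryption, so a 4096-byte payload carrying 16 parities with 64-byte encrypted references yields (4096 - 16*32) / 64 = 56 effective data references. In subtrieSection, with refs = 3 and subtrieSize = 10000 the first iteration already gives whatsLeft = 10000 - 4096*(3-1) = 1808 <= 4096, so every fork except the last spans 4096 bytes and the last fork spans the remaining 1808.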
@@ -229,10 +282,10 @@ func (j *joiner) IterateChunkAddresses(fn swarm.AddressIterFunc) error {
return err
}

return j.processChunkAddresses(j.ctx, fn, j.rootData, j.span)
return j.processChunkAddresses(j.ctx, fn, j.rootData, j.span, j.rootParity)
}

func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIterFunc, data []byte, subTrieSize int64) error {
func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIterFunc, data []byte, subTrieSize int64, parity int) error {
// we are at a leaf data chunk
if subTrieSize <= int64(len(data)) {
return nil
@@ -248,6 +301,12 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter

var wg sync.WaitGroup

eSize, err := file.ChunkPayloadSize(data)
if err != nil {
return err
}
sAddresses, pAddresses := file.ChunkAddresses(data[:eSize], parity, j.refLength)
getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
for cursor := 0; cursor < len(data); cursor += j.refLength {
ref := data[cursor : cursor+j.refLength]
var reportAddr swarm.Address
@@ -262,7 +321,7 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
return err
}

sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
sec := j.subtrieSection(data, cursor, eSize, parity, subTrieSize)
if sec <= swarm.ChunkSize {
continue
}
@@ -273,15 +332,16 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
eg.Go(func() error {
defer wg.Done()

ch, err := j.getter.Get(ectx, address)
ch, err := getter.Get(ectx, address)
if err != nil {
return err
}

chunkData := ch.Data()[8:]
subtrieSpan := int64(chunkToSpan(ch.Data()))
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)

return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan)
return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan, parities)
})
}(address, eg)

Expand All @@ -295,6 +355,9 @@ func (j *joiner) Size() int64 {
return j.span
}

func chunkToSpan(data []byte) uint64 {
return binary.LittleEndian.Uint64(data[:8])
// UTILITIES

func chunkToSpan(data []byte) (redundancy.Level, int64) {
level, spanBytes := redundancy.DecodeSpan(data[:swarm.SpanSize])
return level, int64(bmt.LengthFromSpan(spanBytes))
}
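Both readAtOffset and processChunkAddresses now perform the same per-chunk setup before descending: compute the payload size, split it into data-shard and parity addresses, and wrap the base getter in a redundancy getter that can rebuild missing shards and persist them via the putter. A condensed sketch of that shared pattern under the signatures shown in the hunks above; the wrapper function itself is hypothetical.

package sketch

import (
	"context"

	"github.com/ethersphere/bee/pkg/file"
	"github.com/ethersphere/bee/pkg/file/redundancy/getter"
	storage "github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// fetchShard condenses the per-chunk setup used in readAtOffset and
// processChunkAddresses above: split the intermediate chunk payload into
// data-shard and parity addresses, wrap the base getter, and fetch one shard.
func fetchShard(ctx context.Context, data []byte, parity, refLen int, base storage.Getter, putter storage.Putter) (swarm.Chunk, error) {
	pSize, err := file.ChunkPayloadSize(data) // payload size as computed in the joiner above
	if err != nil {
		return nil, err
	}
	sAddrs, pAddrs := file.ChunkAddresses(data[:pSize], parity, refLen)
	g := getter.New(sAddrs, pAddrs, base, putter)
	// a shard that cannot be fetched directly may be reconstructed from the
	// parity chunks; recovered chunks are handed to putter before returning
	return g.Get(ctx, sAddrs[0])
}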