feat: erasure decoder #4448

Merged 45 commits on Dec 4, 2023

Commits
2d6c2e0
feat: @zelig scratch
nugaon Nov 13, 2023
aa80189
feat: span deserialization with parity
nugaon Nov 14, 2023
35a56ca
feat: erasure table GetMinShards
nugaon Nov 15, 2023
efdb5fe
feat: getLevel and getErasureTable in redundancy level
nugaon Nov 15, 2023
b5c65c8
feat: chunkPayloadSize and getShards for subtrieSection calculation
nugaon Nov 15, 2023
2bfe47a
fix: payloadsize calculation
nugaon Nov 15, 2023
c42d9d4
fix: chunkPayloadSize condition operator and return after grouperr
nugaon Nov 15, 2023
144b51f
fix: wrong parameter passing
nugaon Nov 15, 2023
8d4f0f9
feat: getter init
nugaon Nov 17, 2023
f0d386f
refactor: renaming and close channel
nugaon Nov 20, 2023
4a48b43
feat: cautious strategy and erasureData
nugaon Nov 20, 2023
95dc830
refactor: not needed iterator variable
nugaon Nov 20, 2023
61584cf
refactor: not needed variables
nugaon Nov 20, 2023
9c15d29
feat: erasuredata mutex
nugaon Nov 21, 2023
20bae64
feat: save missing chunks with putter
nugaon Nov 21, 2023
3f0d877
refactor: merge address iteration
nugaon Nov 23, 2023
2259222
refactor: missing chunks filtering
nugaon Nov 23, 2023
52f47b9
refactor: move functions to file utils
nugaon Nov 24, 2023
c2a247d
feat: handle chunk addr is not in intermediate ch
nugaon Nov 27, 2023
166281f
fix: negation on chainnelIsClosed
nugaon Nov 27, 2023
076681c
refactor: cannot recover error
nugaon Nov 27, 2023
a6a05a1
feat: is not recovered error
nugaon Nov 27, 2023
768d78c
refactor: error types and get address check
nugaon Nov 27, 2023
1e0973c
test: unit
nugaon Nov 27, 2023
aab5a70
fix: joiner interface changes
nugaon Nov 27, 2023
9c6e229
refactor: move pipeline creation logic from hashtrie test
nugaon Nov 28, 2023
517639b
fix: wrong parenthesis at rootShard calculation
nugaon Nov 28, 2023
1df8bac
fix: add mutex locking to processing method
nugaon Nov 29, 2023
0c36f1e
fix: span changes affect the chunk data of the next iteration
nugaon Nov 29, 2023
62df2bd
test: integration readat
nugaon Nov 29, 2023
087623a
refactor: move newErasureHashTrieWriter to hashtrie test
nugaon Nov 30, 2023
d2fa862
fix: pass chunkStore and cache object for joiner putter
nugaon Nov 30, 2023
25f8828
fix: add mutex lock around reconstructData
nugaon Dec 1, 2023
043e6bb
refactor: error lint issue
nugaon Dec 1, 2023
6620030
refactor: write error checks
nugaon Dec 1, 2023
13fc099
fix: put mutex lock because of race condition
nugaon Dec 1, 2023
a7e7d03
fix: add mutex lock to getAfterProcessed
nugaon Dec 1, 2023
14705a2
fix: mutex
nugaon Dec 1, 2023
c667ce5
fix: readAt offset checking
nugaon Dec 1, 2023
972ac95
test: encrypted joiner
nugaon Dec 1, 2023
9e092c8
feat: referenceCount
nugaon Dec 3, 2023
abaa2b0
fix: use new span encoding in joiner
nugaon Dec 3, 2023
e196141
refactor: remove unused getLevel function
nugaon Dec 3, 2023
012c8c6
refactor: unused variable
nugaon Dec 3, 2023
9cd6f19
fix: close channel if it is not closed
nugaon Dec 4, 2023
Files changed
7 changes: 4 additions & 3 deletions pkg/api/bzz.go
@@ -9,14 +9,15 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethersphere/bee/pkg/topology"
"net/http"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/ethersphere/bee/pkg/topology"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/pkg/feeds"
"github.com/ethersphere/bee/pkg/file/joiner"
@@ -175,7 +176,7 @@ func (s *Service) fileUploadHandler(
}

factory := requestPipelineFactory(ctx, putter, encrypt, rLevel)
l := loadsave.New(s.storer.ChunkStore(), factory)
l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), factory)

m, err := manifest.NewDefaultManifest(l, encrypt)
if err != nil {
@@ -443,7 +444,7 @@ func (s *Service) serveManifestEntry(

// downloadHandler contains common logic for downloading a Swarm file from the API
func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *http.Request, reference swarm.Address, additionalHeaders http.Header, etag bool) {
reader, l, err := joiner.New(r.Context(), s.storer.Download(true), reference)
reader, l, err := joiner.New(r.Context(), s.storer.Download(true), s.storer.Cache(), reference)
if err != nil {
if errors.Is(err, storage.ErrNotFound) || errors.Is(err, topology.ErrNotFound) {
logger.Debug("api download: not found ", "address", reference, "error", err)
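For orientation, a minimal sketch of the new call shape used throughout this PR; ctx, storer and reference are hypothetical stand-ins, and the putter argument (here the cache) is where the erasure decoder persists any chunks it reconstructs:

	reader, size, err := joiner.New(ctx, storer.Download(true), storer.Cache(), reference)
	if err != nil {
		return err
	}
	buf := make([]byte, size)
	if _, err := reader.ReadAt(buf, 0); err != nil {
		return err
	}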
2 changes: 1 addition & 1 deletion pkg/api/bzz_test.go
@@ -554,7 +554,7 @@ func TestFeedIndirection(t *testing.T) {
t.Fatal(err)
}
m, err := manifest.NewDefaultManifest(
loadsave.New(storer.ChunkStore(), pipelineFactory(storer.Cache(), false, 0)),
loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0)),
false,
)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/api/dirs.go
@@ -132,7 +132,7 @@ func storeDir(
loggerV1 := logger.V(1).Build()

p := requestPipelineFn(putter, encrypt, rLevel)
ls := loadsave.New(getter, requestPipelineFactory(ctx, putter, encrypt, rLevel))
ls := loadsave.New(getter, putter, requestPipelineFactory(ctx, putter, encrypt, rLevel))

dirManifest, err := manifest.NewDefaultManifest(ls, encrypt)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/api/feed.go
@@ -196,7 +196,7 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) {
logger: logger,
}

l := loadsave.New(s.storer.ChunkStore(), requestPipelineFactory(r.Context(), putter, false, 0))
l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0))
feedManifest, err := manifest.NewDefaultManifest(l, false)
if err != nil {
logger.Debug("create manifest failed", "error", err)
2 changes: 1 addition & 1 deletion pkg/api/pin.go
@@ -50,7 +50,7 @@ func (s *Service) pinRootHash(w http.ResponseWriter, r *http.Request) {
}

getter := s.storer.Download(true)
traverser := traversal.New(getter)
traverser := traversal.New(getter, s.storer.Cache())

sem := semaphore.NewWeighted(100)
var errTraverse error
2 changes: 1 addition & 1 deletion pkg/file/addresses/addresses_getter_test.go
@@ -63,7 +63,7 @@ func TestAddressesGetterIterateChunkAddresses(t *testing.T) {

addressesGetter := addresses.NewGetter(store, addressIterFunc)

j, _, err := joiner.New(ctx, addressesGetter, rootChunk.Address())
j, _, err := joiner.New(ctx, addressesGetter, store, rootChunk.Address())
if err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion pkg/file/file_test.go
@@ -62,7 +62,7 @@ func testSplitThenJoin(t *testing.T) {
}

// then join
r, l, err := joiner.New(ctx, store, resultAddress)
r, l, err := joiner.New(ctx, store, store, resultAddress)
if err != nil {
t.Fatal(err)
}
135 changes: 99 additions & 36 deletions pkg/file/joiner/joiner.go
@@ -7,51 +7,83 @@ package joiner

import (
"context"
"encoding/binary"
"errors"
"io"
"sync"
"sync/atomic"

"github.com/ethersphere/bee/pkg/bmt"
"github.com/ethersphere/bee/pkg/encryption"
"github.com/ethersphere/bee/pkg/encryption/store"
"github.com/ethersphere/bee/pkg/file"
"github.com/ethersphere/bee/pkg/file/redundancy"
"github.com/ethersphere/bee/pkg/file/redundancy/getter"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/sync/errgroup"
)

type joiner struct {
addr swarm.Address
rootData []byte
span int64
off int64
refLength int
addr swarm.Address
rootData []byte
span int64
off int64
refLength int
rootParity int
maxBranching int // maximum branching in an intermediate chunk

ctx context.Context
getter storage.Getter
putter storage.Putter // required to save recovered data

chunkToSpan func(data []byte) (redundancy.Level, int64) // returns parity and span value from chunkData
}

// New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.
func New(ctx context.Context, getter storage.Getter, address swarm.Address) (file.Joiner, int64, error) {
func New(ctx context.Context, getter storage.Getter, putter storage.Putter, address swarm.Address) (file.Joiner, int64, error) {
getter = store.New(getter)
// retrieve the root chunk to read the total data length to be retrieved
rootChunk, err := getter.Get(ctx, address)
if err != nil {
return nil, 0, err
}

var chunkData = rootChunk.Data()

span := int64(binary.LittleEndian.Uint64(chunkData[:swarm.SpanSize]))
chunkData := rootChunk.Data()
rootData := chunkData[swarm.SpanSize:]
refLength := len(address.Bytes())
encryption := false
if refLength != swarm.HashSize {
encryption = true
}
rLevel, span := chunkToSpan(chunkData)
rootParity := 0
maxBranching := swarm.ChunkSize / refLength
spanFn := func(data []byte) (redundancy.Level, int64) {
return 0, int64(bmt.LengthFromSpan(data[:swarm.SpanSize]))
}
// override the defaults if the root chunk has redundancy
if rLevel != redundancy.NONE {
_, parities := file.ReferenceCount(uint64(span), rLevel, encryption)
rootParity = parities
spanFn = chunkToSpan
if encryption {
maxBranching = rLevel.GetMaxEncShards()
} else {
maxBranching = rLevel.GetMaxShards()
}
}
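// Illustrative note, not a line from this PR: without redundancy an unencrypted
// intermediate chunk packs swarm.ChunkSize/refLength = 4096/32 = 128 references;
// with redundancy the effective branching drops to rLevel.GetMaxShards() (or
// GetMaxEncShards() under encryption) because parity references consume payload.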

j := &joiner{
addr: rootChunk.Address(),
refLength: len(address.Bytes()),
ctx: ctx,
getter: getter,
span: span,
rootData: chunkData[swarm.SpanSize:],
addr: rootChunk.Address(),
refLength: refLength,
ctx: ctx,
getter: getter,
putter: putter,
span: span,
rootData: rootData,
rootParity: rootParity,
maxBranching: maxBranching,
chunkToSpan: spanFn,
}

return j, span, nil
@@ -81,7 +113,7 @@ func (j *joiner) ReadAt(buffer []byte, off int64) (read int, err error) {
}
var bytesRead int64
var eg errgroup.Group
j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, &eg)
j.readAtOffset(buffer, j.rootData, 0, j.span, off, 0, readLen, &bytesRead, j.rootParity, &eg)

err = eg.Wait()
if err != nil {
@@ -93,7 +125,13 @@

var ErrMalformedTrie = errors.New("malformed tree")

func (j *joiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffset, bytesToRead int64, bytesRead *int64, eg *errgroup.Group) {
func (j *joiner) readAtOffset(
b, data []byte,
cur, subTrieSize, off, bufferOffset, bytesToRead int64,
bytesRead *int64,
parity int,
eg *errgroup.Group,
) {
// we are at a leaf data chunk
if subTrieSize <= int64(len(data)) {
dataOffsetStart := off - cur
@@ -109,14 +147,23 @@ func (j *joiner) readAtOffset(
return
}

pSize, err := file.ChunkPayloadSize(data)
if err != nil {
eg.Go(func() error {
return err
})
return
}
sAddresses, pAddresses := file.ChunkAddresses(data[:pSize], parity, j.refLength)
getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
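// Hedged summary of the new redundancy getter: the decoder returned by
// getter.New serves Get from the data shard addresses (sAddresses) and, when a
// shard cannot be fetched, attempts reconstruction from the parity addresses
// (pAddresses), saving recovered chunks through j.putter.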
for cursor := 0; cursor < len(data); cursor += j.refLength {
if bytesToRead == 0 {
break
}

// fast forward the cursor
sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
if cur+sec < off {
sec := j.subtrieSection(data, cursor, pSize, parity, subTrieSize)
if cur+sec <= off {
cur += sec
continue
}
@@ -139,19 +186,20 @@

func(address swarm.Address, b []byte, cur, subTrieSize, off, bufferOffset, bytesToRead, subtrieSpanLimit int64) {
eg.Go(func() error {
ch, err := j.getter.Get(j.ctx, address)
ch, err := getter.Get(j.ctx, address)
if err != nil {
return err
}

chunkData := ch.Data()[8:]
subtrieSpan := int64(chunkToSpan(ch.Data()))
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
_, subtrieParity := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)
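// file.ReferenceCount, introduced in this PR, derives from the child span, its
// redundancy level and the encryption flag how many references the child chunk
// holds and how many of those are parities.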

if subtrieSpan > subtrieSpanLimit {
return ErrMalformedTrie
}

j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, eg)
j.readAtOffset(b, chunkData, cur, subtrieSpan, off, bufferOffset, currentReadSize, bytesRead, subtrieParity, eg)
return nil
})
}(address, b, cur, subtrieSpan, off, bufferOffset, currentReadSize, subtrieSpanLimit)
@@ -163,8 +211,13 @@ func (j *joiner) readAtOffset(b, data []byte, cur, subTrieSize, off, bufferOffse
}
}

// getShards returns the effective number of references in an intermediate chunk, given its payload length and parity count
func (j *joiner) getShards(payloadSize, parities int) int {
return (payloadSize - parities*swarm.HashSize) / j.refLength
}
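// Worked example (illustrative): for an unencrypted trie (refLength =
// swarm.HashSize = 32) and a full 4096-byte payload carrying 10 parity
// references, getShards = (4096 - 10*32) / 32 = 118 effective references.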

// brute-forces the subtrie size for each of the sections in this intermediate chunk
func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64 {
func (j *joiner) subtrieSection(data []byte, startIdx, payloadSize, parities int, subtrieSize int64) int64 {
// assume we have a trie of size `y` then we can assume that all of
// the forks except for the last one on the right are of equal size
// this is due to how the splitter wraps levels.
@@ -173,9 +226,9 @@ func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64
// where y is the size of the subtrie, refs are the number of references
// x is constant (the brute forced value) and l is the size of the last subtrie
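// Worked example (illustrative): refs = 3 and subtrieSize = 9000 gives
// whatsLeft = 9000 - 2*4096 = 808 on the first pass, which already fits in one
// branch, so the first two sections span 4096 bytes each and the last one 808.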
var (
refs = int64(len(data) / refLen) // how many references in the intermediate chunk
branching = int64(4096 / refLen) // branching factor is chunkSize divided by reference length
branchSize = int64(4096)
refs = int64(j.getShards(payloadSize, parities)) // how many effective references in the intermediate chunk
branching = int64(j.maxBranching) // branching factor is chunkSize divided by reference length
branchSize = int64(swarm.ChunkSize)
)
for {
whatsLeft := subtrieSize - (branchSize * (refs - 1))
@@ -186,7 +239,7 @@ func subtrieSection(data []byte, startIdx, refLen int, subtrieSize int64) int64
}

// handle last branch edge case
if startIdx == int(refs-1)*refLen {
if startIdx == int(refs-1)*j.refLength {
return subtrieSize - (refs-1)*branchSize
}
return branchSize
@@ -229,10 +282,10 @@ func (j *joiner) IterateChunkAddresses(fn swarm.AddressIterFunc) error {
return err
}

return j.processChunkAddresses(j.ctx, fn, j.rootData, j.span)
return j.processChunkAddresses(j.ctx, fn, j.rootData, j.span, j.rootParity)
}

func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIterFunc, data []byte, subTrieSize int64) error {
func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIterFunc, data []byte, subTrieSize int64, parity int) error {
// we are at a leaf data chunk
if subTrieSize <= int64(len(data)) {
return nil
@@ -248,6 +301,12 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter

var wg sync.WaitGroup

eSize, err := file.ChunkPayloadSize(data)
if err != nil {
return err
}
sAddresses, pAddresses := file.ChunkAddresses(data[:eSize], parity, j.refLength)
getter := getter.New(sAddresses, pAddresses, j.getter, j.putter)
for cursor := 0; cursor < len(data); cursor += j.refLength {
ref := data[cursor : cursor+j.refLength]
var reportAddr swarm.Address
@@ -262,7 +321,7 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
return err
}

sec := subtrieSection(data, cursor, j.refLength, subTrieSize)
sec := j.subtrieSection(data, cursor, eSize, parity, subTrieSize)
if sec <= swarm.ChunkSize {
continue
}
@@ -273,15 +332,16 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
eg.Go(func() error {
defer wg.Done()

ch, err := j.getter.Get(ectx, address)
ch, err := getter.Get(ectx, address)
if err != nil {
return err
}

chunkData := ch.Data()[8:]
subtrieSpan := int64(chunkToSpan(ch.Data()))
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)

return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan)
return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan, parities)
})
}(address, eg)

@@ -295,6 +355,9 @@ func (j *joiner) Size() int64 {
return j.span
}

func chunkToSpan(data []byte) uint64 {
return binary.LittleEndian.Uint64(data[:8])
}

// UTILITIES

func chunkToSpan(data []byte) (redundancy.Level, int64) {
level, spanBytes := redundancy.DecodeSpan(data[:swarm.SpanSize])
return level, int64(bmt.LengthFromSpan(spanBytes))
}
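// Decoding sketch (hedged): for a legacy chunk redundancy.DecodeSpan reports
// redundancy.NONE and leaves the span bytes untouched, so chunkToSpan reduces
// to the old binary.LittleEndian.Uint64 read and pre-redundancy content stays
// joinable:
//
//	level, span := chunkToSpan(ch.Data())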