Skip to content

Commit

Permalink
feat: joiner and loadsaver with rootch init
Browse files Browse the repository at this point in the history
  • Loading branch information
nugaon committed May 22, 2024
1 parent df01783 commit abe6896
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 30 deletions.
5 changes: 4 additions & 1 deletion pkg/api/bzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,10 @@ FETCH:
jsonhttp.InternalServerError(w, "mapStructure feed update")
return
}
address = wc.Address() // FIXME: init manifest with root chunk instead
address = wc.Address()
// modify ls and init with non-existing wrapped chunk
ls = loadsave.NewReadonlyWithRootCh(s.storer.Download(cache), wc)

feedDereferenced = true
curBytes, err := cur.MarshalBinary()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
return d
}

// NewWithRoot creates a new Joiner with the already fetched root chunk.
// A Joiner provides Read, Seek and Size functionalities.
func NewWithRootCh(ctx context.Context, g storage.Getter, putter storage.Putter, rootAddr swarm.Address, rootChunk swarm.Chunk) (file.Joiner, int64, error) {
return new(ctx, g, putter, rootAddr, rootChunk)
}

// New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.
func New(ctx context.Context, g storage.Getter, putter storage.Putter, address swarm.Address) (file.Joiner, int64, error) {
// retrieve the root chunk to read the total data length the be retrieved
Expand All @@ -116,6 +122,10 @@ func New(ctx context.Context, g storage.Getter, putter storage.Putter, address s
return nil, 0, err
}

return new(ctx, g, putter, address, rootChunk)
}

func new(ctx context.Context, g storage.Getter, putter storage.Putter, address swarm.Address, rootChunk swarm.Chunk) (file.Joiner, int64, error) {

Check failure on line 128 in pkg/file/joiner/joiner.go

View workflow job for this annotation

GitHub Actions / Lint

function new has same name as predeclared identifier (predeclared)
chunkData := rootChunk.Data()
rootData := chunkData[swarm.SpanSize:]
refLength := len(address.Bytes())
Expand Down
28 changes: 24 additions & 4 deletions pkg/file/loadsave/loadsave.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type loadSave struct {
getter storage.Getter
putter storage.Putter
pipelineFn func() pipeline.Interface
rootCh swarm.Chunk
}

// New returns a new read-write load-saver.
Expand All @@ -48,14 +49,33 @@ func NewReadonly(getter storage.Getter) file.LoadSaver {
}
}

// NewReadonly returns a new read-only load-saver
// which will error on write.
func NewReadonlyWithRootCh(getter storage.Getter, rootCh swarm.Chunk) file.LoadSaver {
return &loadSave{
getter: getter,
rootCh: rootCh,
}
}

func (ls *loadSave) Load(ctx context.Context, ref []byte) ([]byte, error) {
j, _, err := joiner.New(ctx, ls.getter, ls.putter, swarm.NewAddress(ref))
if err != nil {
return nil, err
var j file.Joiner
if ls.rootCh == nil || !bytes.Equal(ls.rootCh.Address().Bytes(), ref[:swarm.HashSize]) {
joiner, _, err := joiner.New(ctx, ls.getter, ls.putter, swarm.NewAddress(ref))
if err != nil {
return nil, err
}
j = joiner
} else {
joiner, _, err := joiner.NewWithRootCh(ctx, ls.getter, ls.putter, swarm.NewAddress(ref), ls.rootCh)
if err != nil {
return nil, err
}
j = joiner
}

buf := bytes.NewBuffer(nil)
_, err = file.JoinReadAll(ctx, j, buf)
_, err := file.JoinReadAll(ctx, j, buf)
if err != nil {
return nil, err
}
Expand Down
41 changes: 16 additions & 25 deletions pkg/node/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,10 @@ func bootstrapNode(
logger.Info("bootstrap: trying to fetch stamps snapshot")

var (
snapshotReference swarm.Address
reader file.Joiner
l int64
eventsJSON []byte
snapshotRootCh swarm.Chunk
reader file.Joiner
l int64
eventsJSON []byte
)

for i := 0; i < getSnapshotRetries; i++ {
Expand All @@ -210,7 +210,7 @@ func bootstrapNode(
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

snapshotReference, err = getLatestSnapshot(ctx, localStore.Download(true), snapshotFeed)
snapshotRootCh, err = getLatestSnapshot(ctx, localStore.Download(true), snapshotFeed)
if err != nil {
logger.Warning("bootstrap: fetching snapshot failed", "error", err)
continue
Expand All @@ -229,7 +229,7 @@ func bootstrapNode(
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

reader, l, err = joiner.New(ctx, localStore.Download(true), localStore.Cache(), snapshotReference)
reader, l, err = joiner.NewWithRootCh(ctx, localStore.Download(true), localStore.Cache(), snapshotRootCh.Address(), snapshotRootCh)
if err != nil {
logger.Warning("bootstrap: file joiner failed", "error", err)
continue
Expand Down Expand Up @@ -278,7 +278,7 @@ func getLatestSnapshot(
ctx context.Context,
st storage.Getter,
address swarm.Address,
) (swarm.Address, error) {
) (swarm.Chunk, error) {
ls := loadsave.NewReadonly(st)
feedFactory := factory.New(st)

Expand All @@ -287,12 +287,12 @@ func getLatestSnapshot(
ls,
)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("not a manifest: %w", err)
return nil, fmt.Errorf("not a manifest: %w", err)
}

e, err := m.Lookup(ctx, "/")
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("node lookup: %w", err)
return nil, fmt.Errorf("node lookup: %w", err)
}

var (
Expand All @@ -303,46 +303,37 @@ func getLatestSnapshot(
if e := meta["swarm-feed-owner"]; e != "" {
owner, err = hex.DecodeString(e)
if err != nil {
return swarm.ZeroAddress, err
return nil, err
}
}
if e := meta["swarm-feed-topic"]; e != "" {
topic, err = hex.DecodeString(e)
if err != nil {
return swarm.ZeroAddress, err
return nil, err
}
}
if e := meta["swarm-feed-type"]; e != "" {
err := t.FromString(e)
if err != nil {
return swarm.ZeroAddress, err
return nil, err
}
}
if len(owner) == 0 || len(topic) == 0 {
return swarm.ZeroAddress, fmt.Errorf("node lookup: %s", "feed metadata absent")
return nil, fmt.Errorf("node lookup: %s", "feed metadata absent")
}
f := feeds.New(topic, common.BytesToAddress(owner))

l, err := feedFactory.NewLookup(*t, f)
if err != nil {
return swarm.ZeroAddress, fmt.Errorf("feed lookup failed: %w", err)
return nil, fmt.Errorf("feed lookup failed: %w", err)
}

u, _, _, err := l.At(ctx, time.Now().Unix(), 0)
if err != nil {
return swarm.ZeroAddress, err
}

_, ref, err := feeds.LegacyPayload(u)
if err != nil {
wrappedChunk, err := feeds.FromChunk(u)
if err != nil {
return swarm.ZeroAddress, err
}
ref = wrappedChunk.Address()
return nil, err
}

return ref, nil // FIXME root chunk return, use only getWrappedChunk
return feeds.GetWrappedChunk(ctx, st, u)
}

func batchStoreExists(s storage.StateStorer) (bool, error) {
Expand Down

0 comments on commit abe6896

Please sign in to comment.