Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: upload store fixes #4562

Merged
merged 8 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions pkg/storer/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
)

type UploadStat struct {
TotalUploaded int
TotalSynced int
TotalUploaded uint64
TotalSynced uint64
}

type PinningStat struct {
Expand Down Expand Up @@ -80,28 +80,22 @@ func (db *DB) DebugInfo(ctx context.Context) (Info, error) {
})

var (
uploaded int
synced int
uploaded uint64
synced uint64
)
eg.Go(func() error {
return upload.IterateAll(
db.repo.IndexStore(),
func(_ swarm.Address, isSynced bool) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
case <-db.quit:
return true, ErrDBQuit
default:
}

uploaded++
if isSynced {
synced++
}
return false, nil
},
)
return upload.IterateAllTagItems(db.repo.IndexStore(), func(ti *upload.TagItem) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
case <-db.quit:
return true, ErrDBQuit
default:
}
uploaded += ti.Split
synced += ti.Synced
return false, nil
})
})

var (
Expand Down
63 changes: 29 additions & 34 deletions pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/ethersphere/bee/pkg/storage/storageutil"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstamp"
"github.com/ethersphere/bee/pkg/storer/internal/stampindex"
"github.com/ethersphere/bee/pkg/swarm"
)

Expand Down Expand Up @@ -231,10 +230,17 @@ type uploadItem struct {
TagID uint64
Uploaded int64
Synced int64

// IdFunc overrides the ID method.
// This used to get the ID from the item where the address and batchID were not marshalled.
IdFunc func() string
}

// ID implements the storage.Item interface.
func (i uploadItem) ID() string {
if i.IdFunc != nil {
return i.IdFunc()
}
return storageutil.JoinFields(i.Address.ByteString(), string(i.BatchID))
}

Expand Down Expand Up @@ -351,10 +357,6 @@ func (i dirtyTagItem) String() string {
return storageutil.JoinFields(i.Namespace(), i.ID())
}

// stampIndexUploadNamespace represents the
// namespace name of the stamp index for upload.
const stampIndexUploadNamespace = "upload"

var (
// errPutterAlreadyClosed is returned when trying to Put a new chunk
// after the putter has been closed.
Expand Down Expand Up @@ -420,28 +422,6 @@ func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer stora
return nil
}

switch item, loaded, err := stampindex.LoadOrStore(
s.IndexStore(),
writer,
stampIndexUploadNamespace,
chunk,
); {
case err != nil:
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
case loaded && item.ChunkIsImmutable:
return errOverwriteOfImmutableBatch
case loaded && !item.ChunkIsImmutable:
prev := binary.BigEndian.Uint64(item.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev > curr {
return errOverwriteOfNewerBatch
}
err = stampindex.Store(writer, stampIndexUploadNamespace, chunk)
if err != nil {
return fmt.Errorf("failed updating stamp index: %w", err)
}
}

u.split++

if err := s.ChunkStore().Put(ctx, chunk); err != nil {
Expand Down Expand Up @@ -711,10 +691,9 @@ func Report(
return fmt.Errorf("failed deleting chunk %s: %w", chunk.Address(), err)
}

ui.Synced = now().UnixNano()
err = batch.Put(ui)
err = batch.Delete(ui)
if err != nil {
return fmt.Errorf("failed updating uploadItem %s: %w", ui, err)
return fmt.Errorf("failed deleting uploadItem %s: %w", ui, err)
}

return batch.Commit()
Expand Down Expand Up @@ -848,15 +827,31 @@ func DeleteTag(st storage.Store, tagID uint64) error {
return nil
}

func IterateAll(st storage.Store, iterateFn func(addr swarm.Address, isSynced bool) (bool, error)) error {
func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error)) error {
return st.Iterate(
storage.Query{
Factory: func() storage.Item { return new(uploadItem) },
},
func(r storage.Result) (bool, error) {
address := swarm.NewAddress([]byte(r.ID[:32]))
synced := r.Entry.(*uploadItem).Synced != 0
return iterateFn(address, synced)
ui := r.Entry.(*uploadItem)
ui.IdFunc = func() string {
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
return r.ID
}
return iterateFn(ui)
},
)
}

func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) error {
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
return st.Iterate(
storage.Query{
Factory: func() storage.Item {
return new(TagItem)
},
},
func(result storage.Result) (bool, error) {
ti := result.Entry.(*TagItem)
return cb(ti)
},
)
}
Expand Down
164 changes: 48 additions & 116 deletions pkg/storer/internal/upload/uploadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package upload_test
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math"
Expand All @@ -16,7 +15,6 @@ import (
"testing"
"time"

"github.com/ethersphere/bee/pkg/postage"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/storagetest"
chunktest "github.com/ethersphere/bee/pkg/storage/testing"
Expand Down Expand Up @@ -528,20 +526,27 @@ func TestChunkPutter(t *testing.T) {

t.Run("iterate all", func(t *testing.T) {
count := 0
err := upload.IterateAll(ts.IndexStore(), func(addr swarm.Address, synced bool) (bool, error) {
count++
if synced {
t.Fatal("expected synced to be false")
}
has, err := ts.ChunkStore().Has(context.Background(), addr)
if err != nil {
t.Fatalf("unexpected error in Has(...): %v", err)
}
if !has {
t.Fatalf("expected chunk to be present %s", addr.String())
}
return false, nil
})
err := ts.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item { return new(upload.UploadItem) },
},
func(r storage.Result) (bool, error) {
address := swarm.NewAddress([]byte(r.ID[:32]))
synced := r.Entry.(*upload.UploadItem).Synced != 0
count++
if synced {
t.Fatal("expected synced to be false")
}
has, err := ts.ChunkStore().Has(context.Background(), address)
if err != nil {
t.Fatalf("unexpected error in Has(...): %v", err)
}
if !has {
t.Fatalf("expected chunk to be present %s", address.String())
}
return false, nil
},
)
if err != nil {
t.Fatalf("IterateAll(...): unexpected error %v", err)
}
Expand Down Expand Up @@ -573,6 +578,28 @@ func TestChunkPutter(t *testing.T) {
if diff := cmp.Diff(wantTI, ti); diff != "" {
t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff)
}

t.Run("iterate all tag items", func(t *testing.T) {
var tagItemsCount, uploaded, synced uint64
err := upload.IterateAllTagItems(ts.IndexStore(), func(ti *upload.TagItem) (bool, error) {
uploaded += ti.Split
synced += ti.Synced
tagItemsCount++
return false, nil
})
if err != nil {
t.Fatalf("IterateAllTagItems(...): unexpected error %v", err)
}
if tagItemsCount != 1 {
t.Fatalf("unexpected tagItemsCount: want 1 have %d", tagItemsCount)
}
if uploaded != 20 {
t.Fatalf("unexpected uploaded: want 20 have %d", uploaded)
}
if synced != 0 {
t.Fatalf("unexpected synced: want 0 have %d", synced)
}
})
})

t.Run("error after close", func(t *testing.T) {
Expand Down Expand Up @@ -711,28 +738,20 @@ func TestChunkReporter(t *testing.T) {
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}
err = ts.IndexStore().Get(ui)
has, err := ts.IndexStore().Has(ui)
if err != nil {
t.Fatalf("Get(...): unexpected error: %v", err)
}
wantUI := &upload.UploadItem{
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
TagID: tag.TagID,
Uploaded: now().UnixNano(),
Synced: now().UnixNano(),
t.Fatalf("unexpected error: %v", err)
}

if diff := cmp.Diff(wantUI, ui); diff != "" {
t.Fatalf("Get(...): unexpected UploadItem (-want +have):\n%s", diff)
if has {
t.Fatalf("expected to not be found: %s", ui)
}

pi := &upload.PushItem{
Timestamp: now().UnixNano(),
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}
has, err := ts.IndexStore().Has(pi)
has, err = ts.IndexStore().Has(pi)
if err != nil {
t.Fatalf("Has(...): unexpected error: %v", err)
}
Expand Down Expand Up @@ -780,93 +799,6 @@ func TestChunkReporter(t *testing.T) {
})
}

func TestStampIndexHandling(t *testing.T) {
t.Parallel()

ts := newTestStorage(t)

tag, err := upload.NextTag(ts.IndexStore())
if err != nil {
t.Fatalf("failed creating tag: %v", err)
}

putter, err := upload.NewPutter(ts, tag.TagID)
if err != nil {
t.Fatalf("failed creating putter: %v", err)
}

t.Run("put chunk with immutable batch", func(t *testing.T) {
chunk := chunktest.GenerateTestRandomChunk()
chunk = chunk.WithBatch(
chunk.Radius(),
chunk.Depth(),
chunk.BucketDepth(),
true,
)
if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(chunk.Stamp())

want := upload.ErrOverwriteOfImmutableBatch
have := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2)
if !errors.Is(have, want) {
t.Fatalf("Put(...): unexpected error:\n\twant: %v\n\thave: %v", want, have)
}
})

t.Run("put existing index with older batch timestamp", func(t *testing.T) {
chunk := chunktest.GenerateTestRandomChunk()
if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

decTS := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
encTS := make([]byte, 8)
binary.BigEndian.PutUint64(encTS, decTS-1)

stamp := postage.NewStamp(
chunk.Stamp().BatchID(),
chunk.Stamp().Index(),
encTS,
chunk.Stamp().Sig(),
)

chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(stamp)

want := upload.ErrOverwriteOfNewerBatch
have := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2)
if !errors.Is(have, want) {
t.Fatalf("Put(...): unexpected error:\n\twant: %v\n\thave: %v", want, have)
}
})

t.Run("put existing chunk with newer batch timestamp", func(t *testing.T) {
chunk := chunktest.GenerateTestRandomChunk()
if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

decTS := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
encTS := make([]byte, 8)
binary.BigEndian.PutUint64(encTS, decTS+1)

stamp := postage.NewStamp(
chunk.Stamp().BatchID(),
chunk.Stamp().Index(),
encTS,
chunk.Stamp().Sig(),
)

chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(stamp)

if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}
})
}

func TestNextTagID(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions pkg/storer/migration/all_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func AfterInitSteps(
2: step_02,
3: step_03(chunkStore, reserve.ChunkType),
4: step_04(sharkyPath, sharkyNoOfShards),
5: step_05,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/storer/migration/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ var (
Step_02 = step_02
Step_03 = step_03
Step_04 = step_04
Step_05 = step_05
)
Loading
Loading