fix: upload store fixes (#4562)
acha-bill authored Feb 5, 2024
1 parent 4532f79 commit 771a40b
Showing 7 changed files with 250 additions and 172 deletions.
38 changes: 16 additions & 22 deletions pkg/storer/debug.go
@@ -16,8 +16,8 @@ import (
)

type UploadStat struct {
TotalUploaded int
TotalSynced int
TotalUploaded uint64
TotalSynced uint64
}

type PinningStat struct {
@@ -80,28 +80,22 @@ func (db *DB) DebugInfo(ctx context.Context) (Info, error) {
})

var (
uploaded int
synced int
uploaded uint64
synced uint64
)
eg.Go(func() error {
return upload.IterateAll(
db.repo.IndexStore(),
func(_ swarm.Address, isSynced bool) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
case <-db.quit:
return true, ErrDBQuit
default:
}

uploaded++
if isSynced {
synced++
}
return false, nil
},
)
return upload.IterateAllTagItems(db.repo.IndexStore(), func(ti *upload.TagItem) (bool, error) {
select {
case <-ctx.Done():
return true, ctx.Err()
case <-db.quit:
return true, ErrDBQuit
default:
}
uploaded += ti.Split
synced += ti.Synced
return false, nil
})
})

var (
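For orientation, here is a minimal sketch of the new counting approach in isolation. It assumes, as the diff suggests, that upload.TagItem carries per-tag Split and Synced counters, and that the helper lives in pkg/storer next to debug.go; everything beyond those names is an assumption, not part of the commit. The point of the change is that the totals now come from one index entry per tag rather than one entry per uploaded chunk.

```go
package storer

import (
	storage "github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storer/internal/upload"
)

// uploadTotals sums one TagItem per tag instead of scanning one uploadItem per
// chunk, which is what DebugInfo effectively does after this change.
func uploadTotals(st storage.Store) (uploaded, synced uint64, err error) {
	err = upload.IterateAllTagItems(st, func(ti *upload.TagItem) (bool, error) {
		uploaded += ti.Split // chunks stored under this tag
		synced += ti.Synced  // chunks already reported as synced
		return false, nil    // false => keep iterating
	})
	return uploaded, synced, err
}
```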
63 changes: 29 additions & 34 deletions pkg/storer/internal/upload/uploadstore.go
@@ -17,7 +17,6 @@ import (
"github.com/ethersphere/bee/pkg/storage/storageutil"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstamp"
"github.com/ethersphere/bee/pkg/storer/internal/stampindex"
"github.com/ethersphere/bee/pkg/swarm"
)

@@ -231,10 +230,17 @@ type uploadItem struct {
TagID uint64
Uploaded int64
Synced int64

// IdFunc overrides the ID method.
// This is used to get the ID of items whose address and batchID were not marshalled.
IdFunc func() string
}

// ID implements the storage.Item interface.
func (i uploadItem) ID() string {
if i.IdFunc != nil {
return i.IdFunc()
}
return storageutil.JoinFields(i.Address.ByteString(), string(i.BatchID))
}
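
The new IdFunc hook exists because an uploadItem's Address and BatchID form the store key and are not part of the marshalled value, so an item rebuilt by an iterator cannot recompute its own ID. Below is a rough sketch of what the hook enables; the helper name and its placement inside the upload package are assumptions, not code from this commit.

```go
package upload

import (
	storage "github.com/ethersphere/bee/pkg/storage"
)

// deleteByIteratedKey removes an upload record using the raw key observed
// during iteration; without IdFunc the item's ID would be built from empty
// Address/BatchID fields and the delete would miss the stored entry.
func deleteByIteratedKey(st storage.Store, key string) error {
	ui := new(uploadItem)
	ui.IdFunc = func() string { return key } // reuse the exact key from the iteration result
	return st.Delete(ui)                     // resolved via Namespace() plus the injected ID
}
```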

@@ -351,10 +357,6 @@ func (i dirtyTagItem) String() string {
return storageutil.JoinFields(i.Namespace(), i.ID())
}

// stampIndexUploadNamespace represents the
// namespace name of the stamp index for upload.
const stampIndexUploadNamespace = "upload"

var (
// errPutterAlreadyClosed is returned when trying to Put a new chunk
// after the putter has been closed.
@@ -420,28 +422,6 @@ func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer stora
return nil
}

switch item, loaded, err := stampindex.LoadOrStore(
s.IndexStore(),
writer,
stampIndexUploadNamespace,
chunk,
); {
case err != nil:
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
case loaded && item.ChunkIsImmutable:
return errOverwriteOfImmutableBatch
case loaded && !item.ChunkIsImmutable:
prev := binary.BigEndian.Uint64(item.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev > curr {
return errOverwriteOfNewerBatch
}
err = stampindex.Store(writer, stampIndexUploadNamespace, chunk)
if err != nil {
return fmt.Errorf("failed updating stamp index: %w", err)
}
}

u.split++

if err := s.ChunkStore().Put(ctx, chunk); err != nil {
@@ -711,10 +691,9 @@ func Report(
return fmt.Errorf("failed deleting chunk %s: %w", chunk.Address(), err)
}

ui.Synced = now().UnixNano()
err = batch.Put(ui)
err = batch.Delete(ui)
if err != nil {
return fmt.Errorf("failed updating uploadItem %s: %w", ui, err)
return fmt.Errorf("failed deleting uploadItem %s: %w", ui, err)
}

return batch.Commit()
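
The behavioural change in Report is that a reported chunk's uploadItem is now removed outright rather than kept around with Synced set to a timestamp; per-tag progress lives in the TagItem counters instead. The sketch below illustrates the resulting tail of the flow under that reading, with error wrapping trimmed; the helper and its exact parameters are assumed, not copied from the commit.

```go
package upload

import (
	"context"

	storage "github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storer/internal"
	"github.com/ethersphere/bee/pkg/swarm"
)

// finishReport is a hypothetical helper illustrating the end of Report: the
// chunk payload and its upload record are dropped in the same write batch.
func finishReport(ctx context.Context, s internal.Storage, batch storage.Batch, ui *uploadItem, addr swarm.Address) error {
	if err := s.ChunkStore().Delete(ctx, addr); err != nil { // drop the chunk data
		return err
	}
	if err := batch.Delete(ui); err != nil { // drop the upload record instead of updating it
		return err
	}
	return batch.Commit()
}
```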
@@ -848,15 +827,31 @@ func DeleteTag(st storage.Store, tagID uint64) error {
return nil
}

func IterateAll(st storage.Store, iterateFn func(addr swarm.Address, isSynced bool) (bool, error)) error {
func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error)) error {
return st.Iterate(
storage.Query{
Factory: func() storage.Item { return new(uploadItem) },
},
func(r storage.Result) (bool, error) {
address := swarm.NewAddress([]byte(r.ID[:32]))
synced := r.Entry.(*uploadItem).Synced != 0
return iterateFn(address, synced)
ui := r.Entry.(*uploadItem)
ui.IdFunc = func() string {
return r.ID
}
return iterateFn(ui)
},
)
}

func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) error {
return st.Iterate(
storage.Query{
Factory: func() storage.Item {
return new(TagItem)
},
},
func(result storage.Result) (bool, error) {
ti := result.Entry.(*TagItem)
return cb(ti)
},
)
}
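
A hedged usage sketch of the reworked IterateAll: each yielded storage.Item arrives with its key pre-bound via IdFunc, so a generic pass can act on records without knowing how their keys are composed. The caller below is assumed to live in pkg/storer, which may import the internal upload package; it is purely illustrative and is not the migration step added elsewhere in this commit.

```go
package storer

import (
	storage "github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storer/internal/upload"
)

// dropAllUploadItems collects every upload record and deletes it afterwards;
// collecting first avoids mutating the store while the iteration is running.
func dropAllUploadItems(st storage.Store) error {
	var items []storage.Item
	if err := upload.IterateAll(st, func(item storage.Item) (bool, error) {
		items = append(items, item) // key already bound via IdFunc by IterateAll
		return false, nil
	}); err != nil {
		return err
	}
	for _, item := range items {
		if err := st.Delete(item); err != nil {
			return err
		}
	}
	return nil
}
```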
164 changes: 48 additions & 116 deletions pkg/storer/internal/upload/uploadstore_test.go
@@ -7,7 +7,6 @@ package upload_test
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math"
@@ -16,7 +15,6 @@ import (
"testing"
"time"

"github.com/ethersphere/bee/pkg/postage"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/storagetest"
chunktest "github.com/ethersphere/bee/pkg/storage/testing"
@@ -528,20 +526,27 @@ func TestChunkPutter(t *testing.T) {

t.Run("iterate all", func(t *testing.T) {
count := 0
err := upload.IterateAll(ts.IndexStore(), func(addr swarm.Address, synced bool) (bool, error) {
count++
if synced {
t.Fatal("expected synced to be false")
}
has, err := ts.ChunkStore().Has(context.Background(), addr)
if err != nil {
t.Fatalf("unexpected error in Has(...): %v", err)
}
if !has {
t.Fatalf("expected chunk to be present %s", addr.String())
}
return false, nil
})
err := ts.IndexStore().Iterate(
storage.Query{
Factory: func() storage.Item { return new(upload.UploadItem) },
},
func(r storage.Result) (bool, error) {
address := swarm.NewAddress([]byte(r.ID[:32]))
synced := r.Entry.(*upload.UploadItem).Synced != 0
count++
if synced {
t.Fatal("expected synced to be false")
}
has, err := ts.ChunkStore().Has(context.Background(), address)
if err != nil {
t.Fatalf("unexpected error in Has(...): %v", err)
}
if !has {
t.Fatalf("expected chunk to be present %s", address.String())
}
return false, nil
},
)
if err != nil {
t.Fatalf("IterateAll(...): unexpected error %v", err)
}
@@ -573,6 +578,28 @@ func TestChunkPutter(t *testing.T) {
if diff := cmp.Diff(wantTI, ti); diff != "" {
t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff)
}

t.Run("iterate all tag items", func(t *testing.T) {
var tagItemsCount, uploaded, synced uint64
err := upload.IterateAllTagItems(ts.IndexStore(), func(ti *upload.TagItem) (bool, error) {
uploaded += ti.Split
synced += ti.Synced
tagItemsCount++
return false, nil
})
if err != nil {
t.Fatalf("IterateAllTagItems(...): unexpected error %v", err)
}
if tagItemsCount != 1 {
t.Fatalf("unexpected tagItemsCount: want 1 have %d", tagItemsCount)
}
if uploaded != 20 {
t.Fatalf("unexpected uploaded: want 20 have %d", uploaded)
}
if synced != 0 {
t.Fatalf("unexpected synced: want 0 have %d", synced)
}
})
})

t.Run("error after close", func(t *testing.T) {
@@ -711,28 +738,20 @@ func TestChunkReporter(t *testing.T) {
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}
err = ts.IndexStore().Get(ui)
has, err := ts.IndexStore().Has(ui)
if err != nil {
t.Fatalf("Get(...): unexpected error: %v", err)
}
wantUI := &upload.UploadItem{
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
TagID: tag.TagID,
Uploaded: now().UnixNano(),
Synced: now().UnixNano(),
t.Fatalf("unexpected error: %v", err)
}

if diff := cmp.Diff(wantUI, ui); diff != "" {
t.Fatalf("Get(...): unexpected UploadItem (-want +have):\n%s", diff)
if has {
t.Fatalf("expected to not be found: %s", ui)
}

pi := &upload.PushItem{
Timestamp: now().UnixNano(),
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}
has, err := ts.IndexStore().Has(pi)
has, err = ts.IndexStore().Has(pi)
if err != nil {
t.Fatalf("Has(...): unexpected error: %v", err)
}
@@ -780,93 +799,6 @@ func TestChunkReporter(t *testing.T) {
})
}

func TestStampIndexHandling(t *testing.T) {
t.Parallel()

ts := newTestStorage(t)

tag, err := upload.NextTag(ts.IndexStore())
if err != nil {
t.Fatalf("failed creating tag: %v", err)
}

putter, err := upload.NewPutter(ts, tag.TagID)
if err != nil {
t.Fatalf("failed creating putter: %v", err)
}

t.Run("put chunk with immutable batch", func(t *testing.T) {
chunk := chunktest.GenerateTestRandomChunk()
chunk = chunk.WithBatch(
chunk.Radius(),
chunk.Depth(),
chunk.BucketDepth(),
true,
)
if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(chunk.Stamp())

want := upload.ErrOverwriteOfImmutableBatch
have := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2)
if !errors.Is(have, want) {
t.Fatalf("Put(...): unexpected error:\n\twant: %v\n\thave: %v", want, have)
}
})

t.Run("put existing index with older batch timestamp", func(t *testing.T) {
chunk := chunktest.GenerateTestRandomChunk()
if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

decTS := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
encTS := make([]byte, 8)
binary.BigEndian.PutUint64(encTS, decTS-1)

stamp := postage.NewStamp(
chunk.Stamp().BatchID(),
chunk.Stamp().Index(),
encTS,
chunk.Stamp().Sig(),
)

chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(stamp)

want := upload.ErrOverwriteOfNewerBatch
have := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2)
if !errors.Is(have, want) {
t.Fatalf("Put(...): unexpected error:\n\twant: %v\n\thave: %v", want, have)
}
})

t.Run("put existing chunk with newer batch timestamp", func(t *testing.T) {
chunk := chunktest.GenerateTestRandomChunk()
if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

decTS := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
encTS := make([]byte, 8)
binary.BigEndian.PutUint64(encTS, decTS+1)

stamp := postage.NewStamp(
chunk.Stamp().BatchID(),
chunk.Stamp().Index(),
encTS,
chunk.Stamp().Sig(),
)

chunk2 := chunktest.GenerateTestRandomChunk().WithStamp(stamp)

if err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk2); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}
})
}

func TestNextTagID(t *testing.T) {
t.Parallel()

1 change: 1 addition & 0 deletions pkg/storer/migration/all_steps.go
@@ -21,6 +21,7 @@ func AfterInitSteps(
2: step_02,
3: step_03(chunkStore, reserve.ChunkType),
4: step_04(sharkyPath, sharkyNoOfShards),
5: step_05,
}
}

1 change: 1 addition & 0 deletions pkg/storer/migration/export_test.go
@@ -9,4 +9,5 @@ var (
Step_02 = step_02
Step_03 = step_03
Step_04 = step_04
Step_05 = step_05
)