From ec22e42528657fa78aa9af11d09c6b02da225ee7 Mon Sep 17 00:00:00 2001
From: aloknerurkar
Date: Tue, 23 May 2023 15:05:25 +0530
Subject: [PATCH] fix(storer): fixes from load testing (#4091)

---
 cmd/bee/cmd/cmd.go                           |  4 ++--
 pkg/pullsync/pullsync.go                     |  8 ++-----
 pkg/pullsync/pullsync_test.go                |  4 ++--
 pkg/shed/db.go                               |  4 ++--
 pkg/storage/statestore.go                    |  2 +-
 pkg/storage/storagetest/benchmark.go         |  5 +++--
 pkg/storage/storagetest/chunkstore.go        |  4 ++--
 pkg/storer/internal/chunkstamp/chunkstamp.go |  5 +++++
 pkg/storer/internal/reserve/reserve.go       | 23 +++++++++++++++-----
 pkg/storer/mock/mockreserve.go               |  2 --
 pkg/storer/reserve_test.go                   | 17 +++++++++++++++
 pkg/storer/storer.go                         |  4 ++--
 pkg/storer/storer_test.go                    | 20 ++++++++---------
 13 files changed, 66 insertions(+), 36 deletions(-)

diff --git a/cmd/bee/cmd/cmd.go b/cmd/bee/cmd/cmd.go
index a5ecdfc6c2d..d34b9689649 100644
--- a/cmd/bee/cmd/cmd.go
+++ b/cmd/bee/cmd/cmd.go
@@ -240,8 +240,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
 	cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
 	cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
 	cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
-	cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
-	cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
+	cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 4*1024*1024, "size of block cache of the database in bytes")
+	cmd.Flags().Uint64(optionNameDBWriteBufferSize, 4*1024*1024, "size of the database write buffer in bytes")
 	cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
 	cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
 	cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go
index 469e4120710..db96e217fd9 100644
--- a/pkg/pullsync/pullsync.go
+++ b/pkg/pullsync/pullsync.go
@@ -262,9 +262,8 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
 		s.metrics.DbOps.Inc()
 		s.metrics.LastReceived.WithLabelValues(fmt.Sprintf("%d", bin)).Set(float64(time.Now().Unix()))
 
-		putter := s.store.ReservePutter(ctx)
 		for _, c := range chunksToPut {
-			if err := putter.Put(ctx, c); err != nil {
+			if err := s.store.ReservePut(ctx, c); err != nil {
 				// in case of these errors, no new items are added to the storage, so it
 				// is safe to continue with the next chunk
 				if errors.Is(err, storage.ErrOverwriteNewerChunk) || errors.Is(err, storage.ErrOverwriteOfImmutableBatch) {
@@ -272,13 +271,10 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
 					chunkErr = errors.Join(chunkErr, err)
 					continue
 				}
-				return 0, 0, errors.Join(chunkErr, err, putter.Cleanup())
+				return 0, 0, errors.Join(chunkErr, err)
 			}
 			chunksPut++
 		}
-		if err := putter.Done(swarm.ZeroAddress); err != nil {
-			return 0, 0, errors.Join(chunkErr, err)
-		}
 	}
 
 	return topmost, chunksPut, chunkErr
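Note on the pullsync change above: chunks now go into the reserve one by one through ReservePut instead of through a putter session, so a failing chunk no longer forces a session-wide Cleanup or a final Done call. A minimal sketch of the resulting loop shape, assuming only the one ReservePut method; the reserveStorer interface and putAll function are illustrative names, not types from the repo:

package example

import (
	"context"
	"errors"

	storage "github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/swarm"
)

// reserveStorer is a trimmed, illustrative view of the storer surface the
// sync loop needs after this patch; the real interface in pkg/storer is wider.
type reserveStorer interface {
	ReservePut(ctx context.Context, chunk swarm.Chunk) error
}

// putAll mirrors the post-patch loop: each chunk goes straight into the
// reserve, per-chunk stamp errors are collected, and there is no putter
// session to clean up on the error path.
func putAll(ctx context.Context, store reserveStorer, chunks []swarm.Chunk) (int, error) {
	var (
		put      int
		chunkErr error
	)
	for _, c := range chunks {
		if err := store.ReservePut(ctx, c); err != nil {
			// These errors leave no partial state behind, so the next
			// chunk can still be attempted.
			if errors.Is(err, storage.ErrOverwriteNewerChunk) ||
				errors.Is(err, storage.ErrOverwriteOfImmutableBatch) {
				chunkErr = errors.Join(chunkErr, err)
				continue
			}
			return put, errors.Join(chunkErr, err)
		}
		put++
	}
	return put, chunkErr
}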
diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go
index 27335c3c943..09131b9f44f 100644
--- a/pkg/pullsync/pullsync_test.go
+++ b/pkg/pullsync/pullsync_test.go
@@ -140,7 +140,7 @@ func TestIncoming_WantAll(t *testing.T) {
 
 	// should have all
 	haveChunks(t, clientDb, chunks...)
-	if p := clientDb.PutCalls(); p != 1 {
+	if p := clientDb.PutCalls(); p != len(chunks) {
 		t.Fatalf("want %d puts but got %d", len(chunks), p)
 	}
 }
@@ -202,7 +202,7 @@ func TestIncoming_WantErrors(t *testing.T) {
 	}
 
 	haveChunks(t, clientDb, append(tChunks[:1], tChunks[3:5]...)...)
-	if p := clientDb.PutCalls(); p != 1 {
+	if p := clientDb.PutCalls(); p != len(chunks)-1 {
 		t.Fatalf("want %d puts but got %d", len(chunks), p)
 	}
 }
diff --git a/pkg/shed/db.go b/pkg/shed/db.go
index 79cda4e5f82..1106897d008 100644
--- a/pkg/shed/db.go
+++ b/pkg/shed/db.go
@@ -34,8 +34,8 @@ import (
 
 var (
 	defaultOpenFilesLimit         = uint64(256)
-	defaultBlockCacheCapacity     = uint64(32 * 1024 * 1024)
-	defaultWriteBufferSize        = uint64(32 * 1024 * 1024)
+	defaultBlockCacheCapacity     = uint64(1 * 1024 * 1024)
+	defaultWriteBufferSize        = uint64(1 * 1024 * 1024)
 	defaultDisableSeeksCompaction = false
 )
 
diff --git a/pkg/storage/statestore.go b/pkg/storage/statestore.go
index c0b49048e5c..2c0a3371202 100644
--- a/pkg/storage/statestore.go
+++ b/pkg/storage/statestore.go
@@ -74,7 +74,7 @@ func (pi *proxyItem) Unmarshal(data []byte) error {
 	if pi == nil || pi.obj == nil {
 		return nil
 	}
- 
+
 	switch m := pi.obj.(type) {
 	case encoding.BinaryUnmarshaler:
 		return m.UnmarshalBinary(data)
diff --git a/pkg/storage/storagetest/benchmark.go b/pkg/storage/storagetest/benchmark.go
index a6dc02e0b56..2dd2c734e66 100644
--- a/pkg/storage/storagetest/benchmark.go
+++ b/pkg/storage/storagetest/benchmark.go
@@ -15,6 +15,7 @@ import (
 	"testing"
 	"time"
 
+	postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
 	storage "github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/swarm"
 )
@@ -310,12 +311,12 @@ func doDeleteChunk(b *testing.B, db storage.ChunkStore, g keyGenerator) {
 	}
 }
 
-func doWriteChunk(b *testing.B, db storage.ChunkStore, g entryGenerator) {
+func doWriteChunk(b *testing.B, db storage.Putter, g entryGenerator) {
 	b.Helper()
 
 	for i := 0; i < b.N; i++ {
 		addr := swarm.MustParseHexAddress(string(g.Key(i)))
-		chunk := swarm.NewChunk(addr, g.Value(i))
+		chunk := swarm.NewChunk(addr, g.Value(i)).WithStamp(postagetesting.MustNewStamp())
 		if err := db.Put(context.Background(), chunk); err != nil {
 			b.Fatalf("write key '%s': %v", string(g.Key(i)), err)
 		}
diff --git a/pkg/storage/storagetest/chunkstore.go b/pkg/storage/storagetest/chunkstore.go
index d1e71122d8d..dbd3e4d79a3 100644
--- a/pkg/storage/storagetest/chunkstore.go
+++ b/pkg/storage/storagetest/chunkstore.go
@@ -194,13 +194,13 @@ func RunChunkStoreBenchmarkTests(b *testing.B, s storage.ChunkStore) {
 	})
 }
 
-func BenchmarkChunkStoreWriteSequential(b *testing.B, s storage.ChunkStore) {
+func BenchmarkChunkStoreWriteSequential(b *testing.B, s storage.Putter) {
 	b.Helper()
 
 	doWriteChunk(b, s, newSequentialEntryGenerator(b.N))
 }
 
-func BenchmarkChunkStoreWriteRandom(b *testing.B, s storage.ChunkStore) {
+func BenchmarkChunkStoreWriteRandom(b *testing.B, s storage.Putter) {
 	b.Helper()
 
 	doWriteChunk(b, s, newFullRandomEntryGenerator(0, b.N))
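Note on the storagetest changes above: doWriteChunk and the two write benchmarks now accept storage.Putter instead of storage.ChunkStore, and every generated chunk carries a postage stamp (via postagetesting.MustNewStamp) so that stamp-validating stores can be benchmarked too. A hedged sketch of what the relaxed signature enables; countingPutter is a made-up put-only implementation, not a type from the repo:

package example

import (
	"context"
	"sync/atomic"
	"testing"

	"github.com/ethersphere/bee/pkg/storage/storagetest"
	"github.com/ethersphere/bee/pkg/swarm"
)

// countingPutter implements only storage.Putter; before this patch the write
// benchmarks demanded a full storage.ChunkStore even though they never read.
type countingPutter struct{ n atomic.Int64 }

func (p *countingPutter) Put(_ context.Context, _ swarm.Chunk) error {
	p.n.Add(1)
	return nil
}

// BenchmarkCountingPutter drives the relaxed helper with a put-only store.
func BenchmarkCountingPutter(b *testing.B) {
	storagetest.BenchmarkChunkStoreWriteSequential(b, &countingPutter{})
}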
diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go
index e9c02e0005a..c3384e649d9 100644
--- a/pkg/storer/internal/chunkstamp/chunkstamp.go
+++ b/pkg/storer/internal/chunkstamp/chunkstamp.go
@@ -151,6 +151,7 @@ func LoadWithBatchID(s storage.Store, namespace string, addr swarm.Address, batc
 	var stamp swarm.Stamp
 
 	cnt := 0
+	found := false
 	err := s.Iterate(
 		storage.Query{
 			Factory: func() storage.Item {
@@ -165,6 +166,7 @@ func LoadWithBatchID(s storage.Store, namespace string, addr swarm.Address, batc
 			item := res.Entry.(*item)
 			if batchID == nil || bytes.Equal(batchID, item.stamp.BatchID()) {
 				stamp = item.stamp
+				found = true
 				return true, nil
 			}
 			return false, nil
@@ -176,6 +178,9 @@ func LoadWithBatchID(s storage.Store, namespace string, addr swarm.Address, batc
 	if cnt == 0 {
 		return nil, storage.ErrNoStampsForChunk
 	}
+	if !found {
+		return nil, fmt.Errorf("stamp not found for batchID %x: %w", batchID, storage.ErrNotFound)
+	}
 
 	return stamp, nil
 }
diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go
index 6e5926c7ad6..969ffb768eb 100644
--- a/pkg/storer/internal/reserve/reserve.go
+++ b/pkg/storer/internal/reserve/reserve.go
@@ -127,13 +127,14 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C
 		// 4. Update the stamp index
 		newStampIndex = false
 
-		oldChunk := &batchRadiusItem{Bin: po, BatchID: chunk.Stamp().BatchID(), Address: item.ChunkAddress}
+		oldChunkPO := swarm.Proximity(r.baseAddr.Bytes(), item.ChunkAddress.Bytes())
+		oldChunk := &batchRadiusItem{Bin: oldChunkPO, BatchID: chunk.Stamp().BatchID(), Address: item.ChunkAddress}
 		err := indexStore.Get(oldChunk)
 		if err != nil {
 			return false, fmt.Errorf("failed getting old chunk item to replace: %w", err)
 		}
 
-		err = removeChunk(store, oldChunk)
+		err = removeChunk(ctx, store, oldChunk)
 		if err != nil {
 			return false, fmt.Errorf("failed removing older chunk: %w", err)
 		}
@@ -330,12 +331,17 @@ func (r *Reserve) EvictBatchBin(ctx context.Context, store internal.Storage, bin
 
 		c, err := store.ChunkStore().Get(ctx, item.Address)
 		if err != nil {
+			if errors.Is(err, storage.ErrNotFound) {
+				continue
+			}
 			return 0, err
 		}
 
 		cb(c)
 
-		err = removeChunk(store, item)
+		r.putterMu.Lock()
+		err = removeChunk(ctx, store, item)
+		r.putterMu.Unlock()
 		if err != nil {
 			return 0, err
 		}
@@ -344,11 +350,18 @@ func (r *Reserve) EvictBatchBin(ctx context.Context, store internal.Storage, bin
 	return len(evicted), nil
 }
 
-func removeChunk(store internal.Storage, item *batchRadiusItem) error {
+func removeChunk(ctx context.Context, store internal.Storage, item *batchRadiusItem) error {
 	indexStore := store.IndexStore()
 	chunkStore := store.ChunkStore()
 
+	if has, err := indexStore.Has(item); err != nil || !has {
+		// removeChunk is called from two places: the chunk may be collected for
+		// eviction after it was already removed because of a postage stamp index
+		// collision, or vice versa, so we return early if it is already gone.
+		return err
+	}
+
 	err := indexStore.Delete(&chunkBinItem{Bin: item.Bin, BinID: item.BinID})
 	if err != nil {
 		return err
 	}
@@ -369,7 +382,7 @@ func removeChunk(store internal.Storage, item *batchRadiusItem) error {
 		return err
 	}
 
-	err = chunkStore.Delete(context.TODO(), item.Address)
+	err = chunkStore.Delete(ctx, item.Address)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go
index 9413863fcb1..b684dec3c34 100644
--- a/pkg/storer/mock/mockreserve.go
+++ b/pkg/storer/mock/mockreserve.go
@@ -196,8 +196,6 @@ func (s *ReserveStore) ReservePutter(ctx context.Context) storer.PutterSession {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
 
-	s.putCalls++
-
 	return &reservePutterSession{
 		Putter: storage.PutterFunc(func(ctx context.Context, c swarm.Chunk) error {
 			return s.put(ctx, c)
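Note on the reserve changes above: removeChunk can now be reached concurrently from batch eviction and from stamp-index replacement, so it first checks whether the batchRadiusItem still exists and returns early if it is already gone. This is an idempotent-delete guard; a stripped-down sketch of the pattern against the storage.Store interface, where removeOnce is an illustrative name, not a repo function:

package example

import (
	storage "github.com/ethersphere/bee/pkg/storage"
)

// removeOnce deletes item only if it is still present. Two callers racing to
// remove the same entry both return cleanly: the loser of the race observes
// the miss and returns nil instead of an error.
func removeOnce(s storage.Store, item storage.Item) error {
	if has, err := s.Has(item); err != nil || !has {
		return err // nil when the item is already gone
	}
	return s.Delete(item)
}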
"github.com/ethersphere/bee/pkg/spinlock" storage "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storage/storagetest" chunk "github.com/ethersphere/bee/pkg/storage/testing" storer "github.com/ethersphere/bee/pkg/storer" "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" @@ -829,3 +830,19 @@ func checkSaved(t *testing.T, st *storer.DB, ch swarm.Chunk, stampSaved, chunkSt t.Fatalf("wanted err %s, got err %s", chunkStoreWantedErr, err) } } + +func BenchmarkReservePutter(b *testing.B) { + baseAddr := swarm.RandAddress(b) + storer, err := diskStorer(b, dbTestOps(baseAddr, 10000, nil, nil, time.Second))() + if err != nil { + b.Fatal(err) + } + + putter := storage.PutterFunc(func(ctx context.Context, ch swarm.Chunk) error { + return storer.ReservePut(ctx, ch) + }) + + b.ResetTimer() + b.ReportAllocs() + storagetest.BenchmarkChunkStoreWriteSequential(b, putter) +} diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 96558e68ae3..9083ae03955 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -200,7 +200,7 @@ func initInmemRepository() (storage.Repository, io.Closer, error) { } txStore := leveldbstore.NewTxStore(store) - txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky) + txChunkStore := chunkstore.NewTxChunkStore(store, sharky) return storage.NewRepository(txStore, txChunkStore), closer(store, sharky), nil } @@ -273,7 +273,7 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st } txStore := leveldbstore.NewTxStore(store) - txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky) + txChunkStore := chunkstore.NewTxChunkStore(store, sharky) return storage.NewRepository(txStore, txChunkStore), closer(store, sharky, recoveryCloser), nil } diff --git a/pkg/storer/storer_test.go b/pkg/storer/storer_test.go index 9a8395fe706..e89e02bcf47 100644 --- a/pkg/storer/storer_test.go +++ b/pkg/storer/storer_test.go @@ -225,14 +225,14 @@ func makeDiskStorer(t *testing.T, opts *storer.Options) *storer.DB { return lstore } -func newStorer(t *testing.T, path string, opts *storer.Options) (*storer.DB, error) { - t.Helper() +func newStorer(tb testing.TB, path string, opts *storer.Options) (*storer.DB, error) { + tb.Helper() lstore, err := storer.New(context.Background(), path, opts) if err == nil { - t.Cleanup(func() { + tb.Cleanup(func() { err := lstore.Close() if err != nil { - t.Errorf("failed closing storer: %v", err) + tb.Errorf("failed closing storer: %v", err) } }) } @@ -240,16 +240,16 @@ func newStorer(t *testing.T, path string, opts *storer.Options) (*storer.DB, err return lstore, err } -func diskStorer(t *testing.T, opts *storer.Options) func() (*storer.DB, error) { - t.Helper() +func diskStorer(tb testing.TB, opts *storer.Options) func() (*storer.DB, error) { + tb.Helper() return func() (*storer.DB, error) { - return newStorer(t, t.TempDir(), opts) + return newStorer(tb, tb.TempDir(), opts) } } -func memStorer(t *testing.T, opts *storer.Options) func() (*storer.DB, error) { - t.Helper() +func memStorer(tb testing.TB, opts *storer.Options) func() (*storer.DB, error) { + tb.Helper() return func() (*storer.DB, error) { - return newStorer(t, "", opts) + return newStorer(tb, "", opts) } }