Skip to content

Commit

Permalink
fix(storer): fixes from load testing (#4091)
Browse files · Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored May 23, 2023
1 parent 3b4eba0 commit ec22e42
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 36 deletions.
4 changes: 2 additions & 2 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameDataDir, filepath.Join(c.homeDir, ".bee"), "data directory")
cmd.Flags().Uint64(optionNameCacheCapacity, 1000000, fmt.Sprintf("cache capacity in chunks, multiply by %d to get approximate capacity in bytes", swarm.ChunkSize))
cmd.Flags().Uint64(optionNameDBOpenFilesLimit, 200, "number of open files allowed by database")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 32*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 32*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Uint64(optionNameDBBlockCacheCapacity, 4*1024*1024, "size of block cache of the database in bytes")
cmd.Flags().Uint64(optionNameDBWriteBufferSize, 4*1024*1024, "size of the database write buffer in bytes")
cmd.Flags().Bool(optionNameDBDisableSeeksCompaction, false, "disables db compactions triggered by seeks")
cmd.Flags().String(optionNamePassword, "", "password for decrypting keys")
cmd.Flags().String(optionNamePasswordFile, "", "path to a file that contains password for decrypting keys")
Expand Down
8 changes: 2 additions & 6 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,23 +262,19 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
s.metrics.DbOps.Inc()
s.metrics.LastReceived.WithLabelValues(fmt.Sprintf("%d", bin)).Set(float64(time.Now().Unix()))

putter := s.store.ReservePutter(ctx)
for _, c := range chunksToPut {
if err := putter.Put(ctx, c); err != nil {
if err := s.store.ReservePut(ctx, c); err != nil {
// in case of these errors, no new items are added to the storage, so it
// is safe to continue with the next chunk
if errors.Is(err, storage.ErrOverwriteNewerChunk) || errors.Is(err, storage.ErrOverwriteOfImmutableBatch) {
s.logger.Debug("overwrite newer chunk", "error", err, "peer_address", peer, "chunk", c)
chunkErr = errors.Join(chunkErr, err)
continue
}
return 0, 0, errors.Join(chunkErr, err, putter.Cleanup())
return 0, 0, errors.Join(chunkErr, err)
}
chunksPut++
}
if err := putter.Done(swarm.ZeroAddress); err != nil {
return 0, 0, errors.Join(chunkErr, err)
}
}

return topmost, chunksPut, chunkErr
Expand Down
4 changes: 2 additions & 2 deletions pkg/pullsync/pullsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestIncoming_WantAll(t *testing.T) {

// should have all
haveChunks(t, clientDb, chunks...)
if p := clientDb.PutCalls(); p != 1 {
if p := clientDb.PutCalls(); p != len(chunks) {
t.Fatalf("want %d puts but got %d", len(chunks), p)
}
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestIncoming_WantErrors(t *testing.T) {
}

haveChunks(t, clientDb, append(tChunks[:1], tChunks[3:5]...)...)
if p := clientDb.PutCalls(); p != 1 {
if p := clientDb.PutCalls(); p != len(chunks)-1 {
t.Fatalf("want %d puts but got %d", len(chunks), p)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/shed/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (

var (
defaultOpenFilesLimit = uint64(256)
defaultBlockCacheCapacity = uint64(32 * 1024 * 1024)
defaultWriteBufferSize = uint64(32 * 1024 * 1024)
defaultBlockCacheCapacity = uint64(1 * 1024 * 1024)
defaultWriteBufferSize = uint64(1 * 1024 * 1024)
defaultDisableSeeksCompaction = false
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (pi *proxyItem) Unmarshal(data []byte) error {
if pi == nil || pi.obj == nil {
return nil
}

switch m := pi.obj.(type) {
case encoding.BinaryUnmarshaler:
return m.UnmarshalBinary(data)
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/storagetest/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"
"time"

postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
)
Expand Down Expand Up @@ -310,12 +311,12 @@ func doDeleteChunk(b *testing.B, db storage.ChunkStore, g keyGenerator) {
}
}

func doWriteChunk(b *testing.B, db storage.ChunkStore, g entryGenerator) {
func doWriteChunk(b *testing.B, db storage.Putter, g entryGenerator) {
b.Helper()

for i := 0; i < b.N; i++ {
addr := swarm.MustParseHexAddress(string(g.Key(i)))
chunk := swarm.NewChunk(addr, g.Value(i))
chunk := swarm.NewChunk(addr, g.Value(i)).WithStamp(postagetesting.MustNewStamp())
if err := db.Put(context.Background(), chunk); err != nil {
b.Fatalf("write key '%s': %v", string(g.Key(i)), err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/storagetest/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ func RunChunkStoreBenchmarkTests(b *testing.B, s storage.ChunkStore) {
})
}

func BenchmarkChunkStoreWriteSequential(b *testing.B, s storage.ChunkStore) {
func BenchmarkChunkStoreWriteSequential(b *testing.B, s storage.Putter) {
b.Helper()

doWriteChunk(b, s, newSequentialEntryGenerator(b.N))
}

func BenchmarkChunkStoreWriteRandom(b *testing.B, s storage.ChunkStore) {
func BenchmarkChunkStoreWriteRandom(b *testing.B, s storage.Putter) {
b.Helper()

doWriteChunk(b, s, newFullRandomEntryGenerator(0, b.N))
Expand Down
5 changes: 5 additions & 0 deletions pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func LoadWithBatchID(s storage.Store, namespace string, addr swarm.Address, batc
var stamp swarm.Stamp

cnt := 0
found := false
err := s.Iterate(
storage.Query{
Factory: func() storage.Item {
Expand All @@ -165,6 +166,7 @@ func LoadWithBatchID(s storage.Store, namespace string, addr swarm.Address, batc
item := res.Entry.(*item)
if batchID == nil || bytes.Equal(batchID, item.stamp.BatchID()) {
stamp = item.stamp
found = true
return true, nil
}
return false, nil
Expand All @@ -176,6 +178,9 @@ func LoadWithBatchID(s storage.Store, namespace string, addr swarm.Address, batc
if cnt == 0 {
return nil, storage.ErrNoStampsForChunk
}
if !found {
return nil, fmt.Errorf("stamp not found for batchID %x: %w", batchID, storage.ErrNotFound)
}

return stamp, nil
}
Expand Down
23 changes: 18 additions & 5 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,14 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C
// 4. Update the stamp index
newStampIndex = false

oldChunk := &batchRadiusItem{Bin: po, BatchID: chunk.Stamp().BatchID(), Address: item.ChunkAddress}
oldChunkPO := swarm.Proximity(r.baseAddr.Bytes(), item.ChunkAddress.Bytes())
oldChunk := &batchRadiusItem{Bin: oldChunkPO, BatchID: chunk.Stamp().BatchID(), Address: item.ChunkAddress}
err := indexStore.Get(oldChunk)
if err != nil {
return false, fmt.Errorf("failed getting old chunk item to replace: %w", err)
}

err = removeChunk(store, oldChunk)
err = removeChunk(ctx, store, oldChunk)
if err != nil {
return false, fmt.Errorf("failed removing older chunk: %w", err)
}
Expand Down Expand Up @@ -330,12 +331,17 @@ func (r *Reserve) EvictBatchBin(ctx context.Context, store internal.Storage, bin

c, err := store.ChunkStore().Get(ctx, item.Address)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
continue
}
return 0, err
}

cb(c)

err = removeChunk(store, item)
r.putterMu.Lock()
err = removeChunk(ctx, store, item)
r.putterMu.Unlock()
if err != nil {
return 0, err
}
Expand All @@ -344,11 +350,18 @@ func (r *Reserve) EvictBatchBin(ctx context.Context, store internal.Storage, bin
return len(evicted), nil
}

func removeChunk(store internal.Storage, item *batchRadiusItem) error {
func removeChunk(ctx context.Context, store internal.Storage, item *batchRadiusItem) error {

indexStore := store.IndexStore()
chunkStore := store.ChunkStore()

if has, err := indexStore.Has(item); err != nil || !has {
// removeChunk is called from two places, if we collect the chunk for eviction
// but if it is already removed because of postage stamp index collision or
// vice versa, we should return early
return err
}

err := indexStore.Delete(&chunkBinItem{Bin: item.Bin, BinID: item.BinID})
if err != nil {
return err
Expand All @@ -369,7 +382,7 @@ func removeChunk(store internal.Storage, item *batchRadiusItem) error {
return err
}

err = chunkStore.Delete(context.TODO(), item.Address)
err = chunkStore.Delete(ctx, item.Address)
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/storer/mock/mockreserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ func (s *ReserveStore) ReservePutter(ctx context.Context) storer.PutterSession {
s.mtx.Lock()
defer s.mtx.Unlock()

s.putCalls++

return &reservePutterSession{
Putter: storage.PutterFunc(func(ctx context.Context, c swarm.Chunk) error {
return s.put(ctx, c)
Expand Down
17 changes: 17 additions & 0 deletions pkg/storer/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pullerMock "github.com/ethersphere/bee/pkg/puller/mock"
"github.com/ethersphere/bee/pkg/spinlock"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/storagetest"
chunk "github.com/ethersphere/bee/pkg/storage/testing"
storer "github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstamp"
Expand Down Expand Up @@ -829,3 +830,19 @@ func checkSaved(t *testing.T, st *storer.DB, ch swarm.Chunk, stampSaved, chunkSt
t.Fatalf("wanted err %s, got err %s", chunkStoreWantedErr, err)
}
}

// BenchmarkReservePutter measures the throughput of DB.ReservePut by feeding
// it sequentially keyed chunks via the shared storagetest benchmark driver.
func BenchmarkReservePutter(b *testing.B) {
	baseAddr := swarm.RandAddress(b)
	// NOTE(review): diskStorer/dbTestOps are file-local test helpers (defined
	// elsewhere in this file); presumably this builds a disk-backed storer with
	// a 10000-chunk reserve capacity and a 1s wake-up interval — confirm
	// against their definitions.
	storer, err := diskStorer(b, dbTestOps(baseAddr, 10000, nil, nil, time.Second))()
	if err != nil {
		b.Fatal(err)
	}

	// Adapt ReservePut to the storage.Putter interface expected by
	// storagetest.BenchmarkChunkStoreWriteSequential.
	putter := storage.PutterFunc(func(ctx context.Context, ch swarm.Chunk) error {
		return storer.ReservePut(ctx, ch)
	})

	// Exclude the setup above from the timed region and report allocs/op.
	b.ResetTimer()
	b.ReportAllocs()
	storagetest.BenchmarkChunkStoreWriteSequential(b, putter)
}
4 changes: 2 additions & 2 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func initInmemRepository() (storage.Repository, io.Closer, error) {
}

txStore := leveldbstore.NewTxStore(store)
txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)
txChunkStore := chunkstore.NewTxChunkStore(store, sharky)

return storage.NewRepository(txStore, txChunkStore), closer(store, sharky), nil
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func initDiskRepository(ctx context.Context, basePath string, opts *Options) (st
}

txStore := leveldbstore.NewTxStore(store)
txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)
txChunkStore := chunkstore.NewTxChunkStore(store, sharky)

return storage.NewRepository(txStore, txChunkStore), closer(store, sharky, recoveryCloser), nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/storer/storer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,31 +225,31 @@ func makeDiskStorer(t *testing.T, opts *storer.Options) *storer.DB {
return lstore
}

func newStorer(t *testing.T, path string, opts *storer.Options) (*storer.DB, error) {
t.Helper()
func newStorer(tb testing.TB, path string, opts *storer.Options) (*storer.DB, error) {
tb.Helper()
lstore, err := storer.New(context.Background(), path, opts)
if err == nil {
t.Cleanup(func() {
tb.Cleanup(func() {
err := lstore.Close()
if err != nil {
t.Errorf("failed closing storer: %v", err)
tb.Errorf("failed closing storer: %v", err)
}
})
}

return lstore, err
}

func diskStorer(t *testing.T, opts *storer.Options) func() (*storer.DB, error) {
t.Helper()
func diskStorer(tb testing.TB, opts *storer.Options) func() (*storer.DB, error) {
tb.Helper()
return func() (*storer.DB, error) {
return newStorer(t, t.TempDir(), opts)
return newStorer(tb, tb.TempDir(), opts)
}
}

func memStorer(t *testing.T, opts *storer.Options) func() (*storer.DB, error) {
t.Helper()
func memStorer(tb testing.TB, opts *storer.Options) func() (*storer.DB, error) {
tb.Helper()
return func() (*storer.DB, error) {
return newStorer(t, "", opts)
return newStorer(tb, "", opts)
}
}

0 comments on commit ec22e42

Please sign in to comment.