diff --git a/pkg/statestore/storeadapter/migration.go b/pkg/statestore/storeadapter/migration.go index 89fc5b5e7f4..42a4dcf2239 100644 --- a/pkg/statestore/storeadapter/migration.go +++ b/pkg/statestore/storeadapter/migration.go @@ -11,7 +11,7 @@ import ( "github.com/ethersphere/bee/pkg/storage/migration" ) -func allSteps(st storage.BatchedStore) migration.Steps { +func allSteps(st storage.BatchStore) migration.Steps { return map[uint64]migration.StepFn{ 1: epochMigration(st), 2: deletePrefix(st, "sync_interval"), @@ -22,7 +22,7 @@ func allSteps(st storage.BatchedStore) migration.Steps { } } -func deletePrefix(s storage.BatchedStore, prefix string) migration.StepFn { +func deletePrefix(s storage.BatchStore, prefix string) migration.StepFn { return func() error { store := &StateStorerAdapter{s} return store.Iterate(prefix, func(key, val []byte) (stop bool, err error) { @@ -31,7 +31,7 @@ func deletePrefix(s storage.BatchedStore, prefix string) migration.StepFn { } } -func epochMigration(s storage.BatchedStore) migration.StepFn { +func epochMigration(s storage.BatchStore) migration.StepFn { return func() error { diff --git a/pkg/statestore/storeadapter/storeadapter.go b/pkg/statestore/storeadapter/storeadapter.go index ab595d7c633..e34ef923792 100644 --- a/pkg/statestore/storeadapter/storeadapter.go +++ b/pkg/statestore/storeadapter/storeadapter.go @@ -229,7 +229,7 @@ func (s *StateStorerAdapter) deleteKeys(keys []string) error { } // NewStateStorerAdapter creates a new StateStorerAdapter. -func NewStateStorerAdapter(storage storage.BatchedStore) (*StateStorerAdapter, error) { +func NewStateStorerAdapter(storage storage.BatchStore) (*StateStorerAdapter, error) { err := migration.Migrate(storage, "migration", allSteps(storage)) if err != nil { return nil, err diff --git a/pkg/storage/cache/cache.go b/pkg/storage/cache/cache.go index bf407e35f11..fa1647ae673 100644 --- a/pkg/storage/cache/cache.go +++ b/pkg/storage/cache/cache.go @@ -15,12 +15,12 @@ func key(key storage.Key) string { return storageutil.JoinFields(key.Namespace(), key.ID()) } -var _ storage.BatchedStore = (*Cache)(nil) +var _ storage.BatchStore = (*Cache)(nil) // Cache is a wrapper around a storage.Store that adds a layer // of in-memory caching for the Get and Has operations. type Cache struct { - storage.BatchedStore + storage.BatchStore lru *lru.Cache[string, []byte] metrics metrics @@ -29,7 +29,7 @@ type Cache struct { // Wrap adds a layer of in-memory caching to storage.Reader Get and Has operations. // It returns an error if the capacity is less than or equal to zero or if the // given store implements storage.Tx -func Wrap(store storage.BatchedStore, capacity int) (*Cache, error) { +func Wrap(store storage.BatchStore, capacity int) (*Cache, error) { lru, err := lru.New[string, []byte](capacity) if err != nil { return nil, err @@ -39,7 +39,7 @@ func Wrap(store storage.BatchedStore, capacity int) (*Cache, error) { } // MustWrap is like Wrap but panics on error. -func MustWrap(store storage.BatchedStore, capacity int) *Cache { +func MustWrap(store storage.BatchStore, capacity int) *Cache { c, err := Wrap(store, capacity) if err != nil { panic(err) @@ -66,7 +66,7 @@ func (c *Cache) Get(i storage.Item) error { return i.Unmarshal(val) } - if err := c.BatchedStore.Get(i); err != nil { + if err := c.BatchStore.Get(i); err != nil { return err } @@ -87,7 +87,7 @@ func (c *Cache) Has(k storage.Key) (bool, error) { } c.metrics.CacheMiss.Inc() - return c.BatchedStore.Has(k) + return c.BatchStore.Has(k) } // Put implements storage.Store interface. @@ -95,12 +95,12 @@ func (c *Cache) Has(k storage.Key) (bool, error) { // call to Put and Has will be able to retrieve the item from cache. func (c *Cache) Put(i storage.Item) error { c.add(i) - return c.BatchedStore.Put(i) + return c.BatchStore.Put(i) } // Delete implements storage.Store interface. // On a call it also removes the item from the cache. func (c *Cache) Delete(i storage.Item) error { _ = c.lru.Remove(key(i)) - return c.BatchedStore.Delete(i) + return c.BatchStore.Delete(i) } diff --git a/pkg/storage/migration/index.go b/pkg/storage/migration/index.go index bd239e36170..5365b5fe910 100644 --- a/pkg/storage/migration/index.go +++ b/pkg/storage/migration/index.go @@ -62,7 +62,7 @@ func (o *options) applyAll(opts []option) { // NewStepOnIndex creates new migration step with update and/or delete operation. // Migration will iterate on all elements selected by query and delete or update items // based on supplied callback functions. -func NewStepOnIndex(s storage.BatchedStore, query storage.Query, opts ...option) StepFn { +func NewStepOnIndex(s storage.BatchStore, query storage.Query, opts ...option) StepFn { o := defaultOptions() o.applyAll(opts) diff --git a/pkg/storage/migration/migration_test.go b/pkg/storage/migration/migration_test.go index 83de5add073..84242348746 100644 --- a/pkg/storage/migration/migration_test.go +++ b/pkg/storage/migration/migration_test.go @@ -281,7 +281,7 @@ func TestMigrate(t *testing.T) { }) } -func assertObjectExists(t *testing.T, s storage.BatchedStore, keys ...storage.Key) { +func assertObjectExists(t *testing.T, s storage.BatchStore, keys ...storage.Key) { t.Helper() for _, key := range keys { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 48332167d53..b76d3f9a275 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -189,8 +189,8 @@ type Writer interface { Delete(Item) error } -// BatchedStore is a store that supports batching of Writer method calls. -type BatchedStore interface { +// BatchStore is a store that supports batching of Writer method calls. +type BatchStore interface { Store Batcher } diff --git a/pkg/storage/storagetest/batch.go b/pkg/storage/storagetest/batch.go index dde203daeea..16fa04a2dcd 100644 --- a/pkg/storage/storagetest/batch.go +++ b/pkg/storage/storagetest/batch.go @@ -13,7 +13,7 @@ import ( "github.com/google/go-cmp/cmp" ) -func TestBatchedStore(t *testing.T, bs storage.BatchedStore) { +func TestBatchedStore(t *testing.T, bs storage.BatchStore) { item := &obj1{Id: "id", SomeInt: 1, Buf: []byte("data")} t.Run("duplicates are rejected", func(t *testing.T) { diff --git a/pkg/storage/storagetest/storage.go b/pkg/storage/storagetest/storage.go index b5609d84587..3a7327949ef 100644 --- a/pkg/storage/storagetest/storage.go +++ b/pkg/storage/storagetest/storage.go @@ -851,7 +851,7 @@ func BenchmarkStore(b *testing.B, s storage.Store) { // BenchmarkBatchedStore provides a benchmark suite for the // storage.BatchedStore. Only the Write and Delete methods are tested. -func BenchmarkBatchedStore(b *testing.B, bs storage.BatchedStore) { +func BenchmarkBatchedStore(b *testing.B, bs storage.BatchStore) { b.Run("WriteInBatches", func(b *testing.B) { BenchmarkWriteInBatches(b, bs) }) @@ -946,7 +946,7 @@ func BenchmarkWriteSequential(b *testing.B, db storage.Store) { doWrite(b, db, g) } -func BenchmarkWriteInBatches(b *testing.B, bs storage.BatchedStore) { +func BenchmarkWriteInBatches(b *testing.B, bs storage.BatchStore) { g := newSequentialEntryGenerator(b.N) batch := bs.Batch(context.Background()) resetBenchmark(b) @@ -965,7 +965,7 @@ func BenchmarkWriteInBatches(b *testing.B, bs storage.BatchedStore) { } } -func BenchmarkWriteInFixedSizeBatches(b *testing.B, bs storage.BatchedStore) { +func BenchmarkWriteInFixedSizeBatches(b *testing.B, bs storage.BatchStore) { g := newSequentialEntryGenerator(b.N) writer := newBatchDBWriter(bs) resetBenchmark(b) @@ -1016,7 +1016,7 @@ func BenchmarkDeleteSequential(b *testing.B, db storage.Store) { doDelete(b, db, g) } -func BenchmarkDeleteInBatches(b *testing.B, bs storage.BatchedStore) { +func BenchmarkDeleteInBatches(b *testing.B, bs storage.BatchStore) { g := newSequentialEntryGenerator(b.N) doWrite(b, bs, g) resetBenchmark(b) @@ -1034,7 +1034,7 @@ func BenchmarkDeleteInBatches(b *testing.B, bs storage.BatchedStore) { } } -func BenchmarkDeleteInFixedSizeBatches(b *testing.B, bs storage.BatchedStore) { +func BenchmarkDeleteInFixedSizeBatches(b *testing.B, bs storage.BatchStore) { g := newSequentialEntryGenerator(b.N) doWrite(b, bs, g) resetBenchmark(b) diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 073487ed258..1fda090bd25 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -55,12 +55,12 @@ type Storage interface { type store struct { sharky *sharky.Store - bstore storage.BatchedStore + bstore storage.BatchStore metrics metrics chunkLocker *multex.Multex } -func NewStorage(sharky *sharky.Store, bstore storage.BatchedStore) Storage { +func NewStorage(sharky *sharky.Store, bstore storage.BatchStore) Storage { return &store{sharky, bstore, newMetrics(), multex.New()} } @@ -201,9 +201,10 @@ func (t *transaction) Commit() (err error) { for _, l := range t.sharkyTrx.releasedLocs { h := handleMetric("sharky_release", t.metrics) - if rerr := t.sharkyTrx.sharky.Release(context.TODO(), l); rerr != nil { + rerr := t.sharkyTrx.sharky.Release(context.TODO(), l) + h(rerr) + if rerr != nil { err = errors.Join(err, fmt.Errorf("failed releasing location afer commit %s: %w", l, rerr)) - h(err) } } @@ -250,18 +251,19 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) } func (c *chunkStoreTrx) lock(addr swarm.Address) func() { - if c.readOnly { // directly lock + // directly lock + if c.readOnly { c.globalLocker.Lock(addr.ByteString()) - } else if _, ok := c.lockedAddrs[addr.ByteString()]; !ok { // lock chunk only once in the same transaction + return func() { c.globalLocker.Unlock(addr.ByteString()) } + } + + // lock chunk only once in the same transaction + if _, ok := c.lockedAddrs[addr.ByteString()]; !ok { c.globalLocker.Lock(addr.ByteString()) c.lockedAddrs[addr.ByteString()] = struct{}{} } - return func() { - if c.readOnly { - c.globalLocker.Unlock(addr.ByteString()) - } - } + return func() {} // unlocking the chunk will be done in the Commit() } type sharkyTrx struct { diff --git a/pkg/storer/migration/all_steps.go b/pkg/storer/migration/all_steps.go index 59f5a4b28e2..39f2b5a7438 100644 --- a/pkg/storer/migration/all_steps.go +++ b/pkg/storer/migration/all_steps.go @@ -26,7 +26,7 @@ func AfterInitSteps( } // BeforeInitSteps lists all migration steps for localstore IndexStore before the localstore is intiated. -func BeforeInitSteps(st storage.BatchedStore) migration.Steps { +func BeforeInitSteps(st storage.BatchStore) migration.Steps { return map[uint64]migration.StepFn{ 1: RefCountSizeInc(st), } diff --git a/pkg/storer/migration/refCntSize.go b/pkg/storer/migration/refCntSize.go index 53ebfcafdfc..3a28d99affe 100644 --- a/pkg/storer/migration/refCntSize.go +++ b/pkg/storer/migration/refCntSize.go @@ -101,7 +101,7 @@ func (r OldRetrievalIndexItem) String() string { return storageutil.JoinFields(r.Namespace(), r.ID()) } -func RefCountSizeInc(s storage.BatchedStore) func() error { +func RefCountSizeInc(s storage.BatchStore) func() error { return func() error { logger := log.NewLogger("migration-RefCountSizeInc", log.WithSink(os.Stdout))