From e295f799eccb2f78d47a9e455c0c0eec80052121 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 23 Jan 2024 19:46:37 +0300 Subject: [PATCH] chore: metrics --- pkg/storer/cachestore.go | 4 +- pkg/storer/export_test.go | 4 +- pkg/storer/internal/cache/cache.go | 18 ++-- pkg/storer/internal/cache/cache_test.go | 13 +-- .../internal/chunkstamp/chunkstamp_test.go | 11 +-- .../internal/chunkstore/chunkstore_test.go | 22 ++--- pkg/storer/internal/internal.go | 17 ++-- pkg/storer/internal/pinning/pinning.go | 17 ++-- pkg/storer/internal/pinning/pinning_test.go | 33 +++---- pkg/storer/internal/reserve/reserve.go | 16 ++-- pkg/storer/internal/reserve/reserve_test.go | 3 +- .../internal/stampindex/stampindex_test.go | 7 +- pkg/storer/internal/transaction/metrics.go | 41 +++++++++ .../internal/{ => transaction}/transaction.go | 89 +++++++++++-------- .../{ => transaction}/transaction_test.go | 10 +-- pkg/storer/internal/upload/uploadstore.go | 17 ++-- .../internal/upload/uploadstore_test.go | 65 +++++++------- pkg/storer/migration/all_steps.go | 4 +- pkg/storer/migration/all_steps_test.go | 3 +- pkg/storer/migration/step_02.go | 4 +- pkg/storer/migration/step_02_test.go | 3 +- pkg/storer/migration/step_03.go | 12 +-- pkg/storer/migration/step_03_test.go | 9 +- pkg/storer/migration/step_04.go | 4 +- pkg/storer/migration/step_04_test.go | 10 +-- pkg/storer/pinstore.go | 7 +- pkg/storer/reserve.go | 8 +- pkg/storer/storer.go | 22 ++--- pkg/storer/storer_test.go | 7 +- pkg/storer/uploadstore.go | 11 +-- 30 files changed, 283 insertions(+), 208 deletions(-) create mode 100644 pkg/storer/internal/transaction/metrics.go rename pkg/storer/internal/{ => transaction}/transaction.go (80%) rename pkg/storer/internal/{ => transaction}/transaction_test.go (96%) diff --git a/pkg/storer/cachestore.go b/pkg/storer/cachestore.go index e732c92afc9..73be9d058fb 100644 --- a/pkg/storer/cachestore.go +++ b/pkg/storer/cachestore.go @@ -11,7 +11,7 @@ import ( "time" storage "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) @@ -97,7 +97,7 @@ func (db *DB) Cache() storage.Putter { } // CacheShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore. -func (db *DB) CacheShallowCopy(ctx context.Context, store internal.Storage, addrs ...swarm.Address) error { +func (db *DB) CacheShallowCopy(ctx context.Context, store transaction.Storage, addrs ...swarm.Address) error { defer db.triggerCacheEviction() dur := captureDuration(time.Now()) err := db.cacheObj.ShallowCopy(ctx, store, addrs...) diff --git a/pkg/storer/export_test.go b/pkg/storer/export_test.go index 6899b89d8c5..cdceaca2d51 100644 --- a/pkg/storer/export_test.go +++ b/pkg/storer/export_test.go @@ -5,16 +5,16 @@ package storer import ( - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/events" "github.com/ethersphere/bee/pkg/storer/internal/reserve" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" ) func (db *DB) Reserve() *reserve.Reserve { return db.reserve } -func (db *DB) Storage() internal.Storage { +func (db *DB) Storage() transaction.Storage { return db.storage } diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go index 5bc734e2c19..30c65e10317 100644 --- a/pkg/storer/internal/cache/cache.go +++ b/pkg/storer/internal/cache/cache.go @@ -15,7 +15,7 @@ import ( "time" storage "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "resenje.org/multex" ) @@ -72,7 +72,7 @@ func (c *Cache) Capacity() uint64 { return uint64(c.capacity) } // Putter returns a Storage.Putter instance which adds the chunk to the underlying // chunkstore and also adds a Cache entry for the chunk. -func (c *Cache) Putter(store internal.Storage) storage.Putter { +func (c *Cache) Putter(store transaction.Storage) storage.Putter { return storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error { c.chunkLock.Lock(chunk.Address().ByteString()) @@ -127,7 +127,7 @@ func (c *Cache) Putter(store internal.Storage) storage.Putter { // part of cache it will update the cache indexes. If the operation to update the // cache indexes fail, we need to fail the operation as this should signal the user // of this getter to rollback the operation. -func (c *Cache) Getter(store internal.Storage) storage.Getter { +func (c *Cache) Getter(store transaction.Storage) storage.Getter { return storage.GetterFunc(func(ctx context.Context, address swarm.Address) (swarm.Chunk, error) { c.chunkLock.Lock(address.ByteString()) @@ -188,7 +188,7 @@ func (c *Cache) Getter(store internal.Storage) storage.Getter { // ShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore. func (c *Cache) ShallowCopy( ctx context.Context, - store internal.Storage, + store transaction.Storage, addrs ...swarm.Address, ) (err error) { @@ -197,7 +197,7 @@ func (c *Cache) ShallowCopy( defer func() { if err != nil { - err = store.Run(func(s internal.Store) error { + err = store.Run(func(s transaction.Store) error { var rerr error for _, addr := range addrs { rerr = errors.Join(rerr, s.ChunkStore().Delete(context.Background(), addr)) @@ -210,7 +210,7 @@ func (c *Cache) ShallowCopy( //consider only the amount that can fit, the rest should be deleted from the chunkstore. if len(addrs) > c.capacity { - _ = store.Run(func(s internal.Store) error { + _ = store.Run(func(s transaction.Store) error { for _, addr := range addrs[:len(addrs)-c.capacity] { _ = s.ChunkStore().Delete(ctx, addr) } @@ -221,7 +221,7 @@ func (c *Cache) ShallowCopy( entriesToAdd := make([]*cacheEntry, 0, len(addrs)) - err = store.Run(func(s internal.Store) error { + err = store.Run(func(s transaction.Store) error { for _, addr := range addrs { entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()} if has, err := s.IndexStore().Has(entry); err == nil && has { @@ -257,7 +257,7 @@ func (c *Cache) ShallowCopy( // specifies the number of entries to remove. func (c *Cache) RemoveOldest( ctx context.Context, - st internal.Storage, + st transaction.Storage, count uint64, ) error { if count <= 0 { @@ -299,7 +299,7 @@ func (c *Cache) RemoveOldest( end = len(evictItems) } - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { for _, entry := range evictItems[i:end] { err = s.IndexStore().Delete(entry) if err != nil { diff --git a/pkg/storer/internal/cache/cache_test.go b/pkg/storer/internal/cache/cache_test.go index 57042cdf59e..ad23d8cc7f9 100644 --- a/pkg/storer/internal/cache/cache_test.go +++ b/pkg/storer/internal/cache/cache_test.go @@ -19,6 +19,7 @@ import ( chunktest "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/cache" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "github.com/google/go-cmp/cmp" ) @@ -86,7 +87,7 @@ func TestCacheEntryItem(t *testing.T) { } } -func newTestStorage(t *testing.T) internal.Storage { +func newTestStorage(t *testing.T) transaction.Storage { t.Helper() return internal.NewInmemStorage() } @@ -230,7 +231,7 @@ func TestCache(t *testing.T) { for i := 0; i < 5; i++ { extraChunk := chunktest.GenerateTestRandomChunk() - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.TODO(), extraChunk) }) if err != nil { @@ -274,7 +275,7 @@ func TestCache(t *testing.T) { // return retErr // } - // return st.Run(func(s internal.Store) error { + // return st.Run(func(s transaction.Store) error { // return s.IndexStore().Put(i) // }) // } @@ -321,7 +322,7 @@ func TestShallowCopy(t *testing.T) { // the chunkstore with chunks. for _, ch := range chunks { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.Background(), ch) }) if err != nil { @@ -353,7 +354,7 @@ func TestShallowCopy(t *testing.T) { // add the chunks to chunkstore. This simulates the reserve already populating // the chunkstore with chunks. for _, ch := range chunks1 { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.Background(), ch) }) if err != nil { @@ -395,7 +396,7 @@ func TestShallowCopyOverCap(t *testing.T) { // the chunkstore with chunks. for _, ch := range chunks { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.Background(), ch) }) if err != nil { diff --git a/pkg/storer/internal/chunkstamp/chunkstamp_test.go b/pkg/storer/internal/chunkstamp/chunkstamp_test.go index d9338cee2e2..e68b509c184 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp_test.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp_test.go @@ -15,6 +15,7 @@ import ( chunktest "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "github.com/google/go-cmp/cmp" ) @@ -152,7 +153,7 @@ func TestStoreLoadDelete(t *testing.T) { t.Fatalf("Get(...): unexpected error: have: %v; want: %v", err, storage.ErrNotFound) } - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return chunkstamp.Store(s.IndexStore(), ns, chunk) }); err != nil { t.Fatalf("Store(...): unexpected error: %v", err) @@ -199,13 +200,13 @@ func TestStoreLoadDelete(t *testing.T) { t.Run("delete stored stamp", func(t *testing.T) { if i%2 == 0 { - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return chunkstamp.Delete(s.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()) }); err != nil { t.Fatalf("Delete(...): unexpected error: %v", err) } } else { - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return chunkstamp.DeleteWithStamp(s.IndexStore(), ns, chunk.Address(), chunk.Stamp()) }); err != nil { t.Fatalf("DeleteWithStamp(...): unexpected error: %v", err) @@ -223,13 +224,13 @@ func TestStoreLoadDelete(t *testing.T) { t.Run("delete all stored stamp index", func(t *testing.T) { - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return chunkstamp.Store(s.IndexStore(), ns, chunk) }); err != nil { t.Fatalf("Store(...): unexpected error: %v", err) } - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return chunkstamp.DeleteAll(s.IndexStore(), ns, chunk.Address()) }); err != nil { t.Fatalf("DeleteAll(...): unexpected error: %v", err) diff --git a/pkg/storer/internal/chunkstore/chunkstore_test.go b/pkg/storer/internal/chunkstore/chunkstore_test.go index e49b6ec9700..8401a400a66 100644 --- a/pkg/storer/internal/chunkstore/chunkstore_test.go +++ b/pkg/storer/internal/chunkstore/chunkstore_test.go @@ -18,8 +18,8 @@ import ( "github.com/ethersphere/bee/pkg/storage/inmemstore" "github.com/ethersphere/bee/pkg/storage/storagetest" chunktest "github.com/ethersphere/bee/pkg/storage/testing" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "github.com/spf13/afero" "github.com/stretchr/testify/assert" @@ -119,7 +119,7 @@ func TestChunkStore(t *testing.T) { t.Fatal(err) } - st := internal.NewStorage(sharky, store) + st := transaction.NewStorage(sharky, store) t.Cleanup(func() { if err := store.Close(); err != nil { @@ -131,7 +131,7 @@ func TestChunkStore(t *testing.T) { t.Run("put chunks", func(t *testing.T) { for _, ch := range testChunks { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.TODO(), ch) }) if err != nil { @@ -144,7 +144,7 @@ func TestChunkStore(t *testing.T) { for idx, ch := range testChunks { // only put duplicates for odd numbered indexes if idx%2 != 0 { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.TODO(), ch) }) if err != nil { @@ -216,7 +216,7 @@ func TestChunkStore(t *testing.T) { for idx, ch := range testChunks { // Delete all even numbered indexes along with 0 if idx%2 == 0 { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Delete(context.TODO(), ch.Address()) }) if err != nil { @@ -279,7 +279,7 @@ func TestChunkStore(t *testing.T) { t.Run("delete duplicate chunks", func(t *testing.T) { for idx, ch := range testChunks { if idx%2 != 0 { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Delete(context.TODO(), ch.Address()) }) if err != nil { @@ -313,7 +313,7 @@ func TestChunkStore(t *testing.T) { t.Run("delete duplicate chunks again", func(t *testing.T) { for idx, ch := range testChunks { if idx%2 != 0 { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.ChunkStore().Delete(context.TODO(), ch.Address()) }) if err != nil { @@ -357,7 +357,7 @@ func TestIterateLocations(t *testing.T) { ctx := context.Background() for _, ch := range testChunks { - assert.NoError(t, st.Run(func(s internal.Store) error { return s.ChunkStore().Put(ctx, ch) })) + assert.NoError(t, st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(ctx, ch) })) } readCount := 0 @@ -391,7 +391,7 @@ func TestIterateLocations_Stop(t *testing.T) { defer cancel() for _, ch := range testChunks { - assert.NoError(t, st.Run(func(s internal.Store) error { return s.ChunkStore().Put(ctx, ch) })) + assert.NoError(t, st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(ctx, ch) })) } readCount := 0 @@ -422,7 +422,7 @@ func TestIterateLocations_Stop(t *testing.T) { } type chunkStore struct { - internal.Storage + transaction.Storage sharky *sharky.Store } @@ -438,5 +438,5 @@ func makeStorage(t *testing.T) *chunkStore { assert.NoError(t, sharky.Close()) }) - return &chunkStore{internal.NewStorage(sharky, store), sharky} + return &chunkStore{transaction.NewStorage(sharky, store), sharky} } diff --git a/pkg/storer/internal/internal.go b/pkg/storer/internal/internal.go index dd145119583..e948efc37f1 100644 --- a/pkg/storer/internal/internal.go +++ b/pkg/storer/internal/internal.go @@ -11,15 +11,16 @@ import ( "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/inmemchunkstore" "github.com/ethersphere/bee/pkg/storage/inmemstore" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) // PutterCloserWithReference provides a Putter which can be closed with a root // swarm reference associated with this session. type PutterCloserWithReference interface { - Put(context.Context, Store, swarm.Chunk) error + Put(context.Context, transaction.Store, swarm.Chunk) error Close(storage.IndexStore, swarm.Address) error - Cleanup(Storage) error + Cleanup(transaction.Storage) error } var emptyAddr = make([]byte, swarm.HashSize) @@ -46,7 +47,7 @@ func AddressBytesOrZero(addr swarm.Address) []byte { // NewInmemStorage constructs a inmem Storage implementation which can be used // for the tests in the internal packages. -func NewInmemStorage() Storage { +func NewInmemStorage() transaction.Storage { ts := &inmemStorage{ indexStore: inmemstore.New(), chunkStore: inmemchunkstore.New(), @@ -60,7 +61,7 @@ type inmemStorage struct { chunkStore storage.ChunkStore } -func (t *inmemStorage) NewTransaction() (Transaction, func()) { +func (t *inmemStorage) NewTransaction() (transaction.Transaction, func()) { return &inmemTrx{t.indexStore, t.chunkStore}, func() {} } @@ -81,9 +82,11 @@ func (t *inmemTrx) IndexStore() storage.IndexStore { return t.indexStore } func (t *inmemTrx) ChunkStore() storage.ChunkStore { return t.chunkStore } func (t *inmemTrx) Commit() error { return nil } -func (t *inmemStorage) ReadOnly() ReadOnlyStore { return &inmemReadOnly{t.indexStore, t.chunkStore} } -func (t *inmemStorage) Close() error { return nil } -func (t *inmemStorage) Run(f func(s Store) error) error { +func (t *inmemStorage) ReadOnly() transaction.ReadOnlyStore { + return &inmemReadOnly{t.indexStore, t.chunkStore} +} +func (t *inmemStorage) Close() error { return nil } +func (t *inmemStorage) Run(f func(s transaction.Store) error) error { trx, done := t.NewTransaction() defer done() return f(trx) diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go index 8e172323f69..f5bf9a0c285 100644 --- a/pkg/storer/internal/pinning/pinning.go +++ b/pkg/storer/internal/pinning/pinning.go @@ -15,6 +15,7 @@ import ( storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/storageutil" "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "github.com/google/uuid" ) @@ -79,7 +80,7 @@ type collectionPutter struct { // Put adds a chunk to the pin collection. // The user of the putter MUST mutex lock the call to prevent data-races across multiple upload sessions. -func (c *collectionPutter) Put(ctx context.Context, st internal.Store, ch swarm.Chunk) error { +func (c *collectionPutter) Put(ctx context.Context, st transaction.Store, ch swarm.Chunk) error { // do not allow any Puts after putter was closed if c.closed { @@ -137,7 +138,7 @@ func (c *collectionPutter) Close(st storage.IndexStore, root swarm.Address) erro return nil } -func (c *collectionPutter) Cleanup(st internal.Storage) error { +func (c *collectionPutter) Cleanup(st transaction.Storage) error { if c.closed { return nil } @@ -146,7 +147,7 @@ func (c *collectionPutter) Cleanup(st internal.Storage) error { return fmt.Errorf("pin store: failed deleting collection chunks: %w", err) } - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { return s.IndexStore().Delete(&dirtyCollection{UUID: c.collection.UUID}) }) if err != nil { @@ -158,7 +159,7 @@ func (c *collectionPutter) Cleanup(st internal.Storage) error { } // CleanupDirty will iterate over all the dirty collections and delete them. -func CleanupDirty(st internal.Storage) error { +func CleanupDirty(st transaction.Storage) error { dirtyCollections := make([]*dirtyCollection, 0) err := st.ReadOnly().IndexStore().Iterate( @@ -211,7 +212,7 @@ func Pins(st storage.Reader) ([]swarm.Address, error) { return pins, nil } -func deleteCollectionChunks(ctx context.Context, st internal.Storage, collectionUUID []byte) error { +func deleteCollectionChunks(ctx context.Context, st transaction.Storage, collectionUUID []byte) error { chunksToDelete := make([]*pinChunkItem, 0) err := st.ReadOnly().IndexStore().Iterate( @@ -231,7 +232,7 @@ func deleteCollectionChunks(ctx context.Context, st internal.Storage, collection batchCnt := 1000 for i := 0; i < len(chunksToDelete); i += batchCnt { - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { end := i + batchCnt if end > len(chunksToDelete) { end = len(chunksToDelete) @@ -257,7 +258,7 @@ func deleteCollectionChunks(ctx context.Context, st internal.Storage, collection } // DeletePin will delete the root pin and all the chunks that are part of this collection. -func DeletePin(ctx context.Context, st internal.Storage, root swarm.Address) error { +func DeletePin(ctx context.Context, st transaction.Storage, root swarm.Address) error { collection := &pinCollectionItem{Addr: root} err := st.ReadOnly().IndexStore().Get(collection) @@ -269,7 +270,7 @@ func DeletePin(ctx context.Context, st internal.Storage, root swarm.Address) err return err } - return st.Run(func(s internal.Store) error { + return st.Run(func(s transaction.Store) error { err := s.IndexStore().Delete(collection) if err != nil { return fmt.Errorf("pin store: failed deleting root collection: %w", err) diff --git a/pkg/storer/internal/pinning/pinning_test.go b/pkg/storer/internal/pinning/pinning_test.go index 4e13cabe892..0c5b9c9707d 100644 --- a/pkg/storer/internal/pinning/pinning_test.go +++ b/pkg/storer/internal/pinning/pinning_test.go @@ -16,6 +16,7 @@ import ( chunktest "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storer/internal" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) @@ -25,7 +26,7 @@ type pinningCollection struct { dupChunks []swarm.Chunk } -func newTestStorage(t *testing.T) internal.Storage { +func newTestStorage(t *testing.T) transaction.Storage { t.Helper() storg := internal.NewInmemStorage() return storg @@ -70,7 +71,7 @@ func TestPinStore(t *testing.T) { var putter internal.PutterCloserWithReference var err error - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { putter, err = pinstore.NewCollection(s.IndexStore()) return err }) @@ -79,21 +80,21 @@ func TestPinStore(t *testing.T) { } for _, ch := range append(tc.uniqueChunks, tc.root) { - if err := st.Run(func(s internal.Store) error { + if err := st.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, ch) }); err != nil { t.Fatal(err) } } for _, ch := range tc.dupChunks { - if err := st.Run(func(s internal.Store) error { + if err := st.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, ch) }); err != nil { t.Fatal(err) } } - if err := st.Run(func(s internal.Store) error { + if err := st.Run(func(s transaction.Store) error { return putter.Close(s.IndexStore(), tc.root.Address()) }); err != nil { t.Fatal(err) @@ -263,7 +264,7 @@ func TestPinStore(t *testing.T) { putter internal.PutterCloserWithReference err error ) - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { putter, err = pinstore.NewCollection(s.IndexStore()) return err }) @@ -271,21 +272,21 @@ func TestPinStore(t *testing.T) { t.Fatal(err) } - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, root) }) if err != nil { t.Fatal(err) } - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return putter.Close(s.IndexStore(), root.Address()) }) if err != nil { t.Fatal(err) } - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, chunktest.GenerateTestRandomChunk()) }) if !errors.Is(err, pinstore.ErrPutterAlreadyClosed) { @@ -300,7 +301,7 @@ func TestPinStore(t *testing.T) { putter internal.PutterCloserWithReference err error ) - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { putter, err = pinstore.NewCollection(s.IndexStore()) return err }) @@ -308,14 +309,14 @@ func TestPinStore(t *testing.T) { t.Fatal(err) } - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, root) }) if err != nil { t.Fatal(err) } - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return putter.Close(s.IndexStore(), swarm.ZeroAddress) }) if !errors.Is(err, pinstore.ErrCollectionRootAddressIsZero) { @@ -337,7 +338,7 @@ func TestCleanup(t *testing.T) { putter internal.PutterCloserWithReference err error ) - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { putter, err = pinstore.NewCollection(s.IndexStore()) return err }) @@ -346,7 +347,7 @@ func TestCleanup(t *testing.T) { } for _, ch := range chunks { - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, ch) }) if err != nil { @@ -380,7 +381,7 @@ func TestCleanup(t *testing.T) { putter internal.PutterCloserWithReference err error ) - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { putter, err = pinstore.NewCollection(s.IndexStore()) return err }) @@ -389,7 +390,7 @@ func TestCleanup(t *testing.T) { } for _, ch := range chunks { - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, ch) }) if err != nil { diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 1f54fe80a52..28b0e55ab6b 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -17,9 +17,9 @@ import ( "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" "github.com/ethersphere/bee/pkg/storer/internal/stampindex" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" "resenje.org/multex" @@ -37,12 +37,12 @@ type Reserve struct { radius atomic.Uint32 multx *multex.Multex - st internal.Storage + st transaction.Storage } func New( baseAddr swarm.Address, - st internal.Storage, + st transaction.Storage, capacity int, radiusSetter topology.SetStorageRadiuser, logger log.Logger, @@ -57,7 +57,7 @@ func New( multx: multex.New(), } - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { rItem := &radiusItem{} err := s.IndexStore().Get(rItem) if err != nil && !errors.Is(err, storage.ErrNotFound) { @@ -292,7 +292,7 @@ func (r *Reserve) EvictBatchBin( end = len(evicted) } - err := r.st.Run(func(s internal.Store) error { + err := r.st.Run(func(s transaction.Store) error { for _, item := range evicted[i:end] { err = r.removeChunkWithItem(ctx, s, item) if err != nil { @@ -312,7 +312,7 @@ func (r *Reserve) EvictBatchBin( func (r *Reserve) removeChunk( ctx context.Context, - trx internal.Store, + trx transaction.Store, chunkAddress swarm.Address, batchID []byte, ) error { @@ -330,7 +330,7 @@ func (r *Reserve) removeChunk( func (r *Reserve) removeChunkWithItem( ctx context.Context, - trx internal.Store, + trx transaction.Store, item *BatchRadiusItem, ) error { @@ -469,7 +469,7 @@ func (r *Reserve) EvictionTarget() int { func (r *Reserve) SetRadius(rad uint8) error { r.radius.Store(uint32(rad)) r.radiusSetter.SetStorageRadius(rad) - return r.st.Run(func(s internal.Store) error { + return r.st.Run(func(s transaction.Store) error { return s.IndexStore().Put(&radiusItem{Radius: rad}) }) } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index e9d580c155c..4220a97516e 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -22,6 +22,7 @@ import ( "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" "github.com/ethersphere/bee/pkg/storer/internal/reserve" "github.com/ethersphere/bee/pkg/storer/internal/stampindex" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" kademlia "github.com/ethersphere/bee/pkg/topology/mock" ) @@ -429,7 +430,7 @@ func checkStore(t *testing.T, s storage.Reader, k storage.Key, gone bool) { } } -func checkChunk(t *testing.T, s internal.ReadOnlyStore, ch swarm.Chunk, gone bool) { +func checkChunk(t *testing.T, s transaction.ReadOnlyStore, ch swarm.Chunk, gone bool) { t.Helper() h, err := s.ChunkStore().Has(context.Background(), ch.Address()) if err != nil { diff --git a/pkg/storer/internal/stampindex/stampindex_test.go b/pkg/storer/internal/stampindex/stampindex_test.go index 79122ea96ce..009c98085bc 100644 --- a/pkg/storer/internal/stampindex/stampindex_test.go +++ b/pkg/storer/internal/stampindex/stampindex_test.go @@ -14,13 +14,14 @@ import ( chunktest "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/stampindex" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" ) // newTestStorage is a helper function that creates a new storage. -func newTestStorage(t *testing.T) internal.Storage { +func newTestStorage(t *testing.T) transaction.Storage { t.Helper() inmemStorage := internal.NewInmemStorage() return inmemStorage @@ -122,7 +123,7 @@ func TestStoreLoadDelete(t *testing.T) { t.Run(ns, func(t *testing.T) { t.Run("store new stamp index", func(t *testing.T) { - err := ts.Run(func(s internal.Store) error { + err := ts.Run(func(s transaction.Store) error { return stampindex.Store(s.IndexStore(), ns, chunk) }) @@ -176,7 +177,7 @@ func TestStoreLoadDelete(t *testing.T) { t.Run("delete stored stamp index", func(t *testing.T) { - err := ts.Run(func(s internal.Store) error { + err := ts.Run(func(s transaction.Store) error { return stampindex.Delete(s.IndexStore(), ns, chunk) }) if err != nil { diff --git a/pkg/storer/internal/transaction/metrics.go b/pkg/storer/internal/transaction/metrics.go new file mode 100644 index 00000000000..e3cc263715d --- /dev/null +++ b/pkg/storer/internal/transaction/metrics.go @@ -0,0 +1,41 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction + +import ( + m "github.com/ethersphere/bee/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type metrics struct { + CommitCalls *prometheus.CounterVec + CommitDuration *prometheus.HistogramVec +} + +// newMetrics is a convenient constructor for creating new metrics. +func newMetrics() metrics { + const subsystem = "transaction" + + return metrics{ + CommitCalls: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "commit_calls", + Help: "Number of commit calls.", + }, + []string{"status"}, + ), + CommitDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "commit_duration", + Help: "The duration each commit call took.", + }, + []string{"status"}, + ), + } +} diff --git a/pkg/storer/internal/transaction.go b/pkg/storer/internal/transaction/transaction.go similarity index 80% rename from pkg/storer/internal/transaction.go rename to pkg/storer/internal/transaction/transaction.go index bcfc4bd0725..8b385caa398 100644 --- a/pkg/storer/internal/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -2,22 +2,23 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package internal +package transaction import ( "context" "errors" "fmt" "sync" + "time" + m "github.com/ethersphere/bee/pkg/metrics" "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" "github.com/ethersphere/bee/pkg/swarm" + "github.com/prometheus/client_golang/prometheus" ) -// TODO(esad): metrics - /* The rules of the transction is as follows: @@ -55,10 +56,12 @@ type store struct { bstore storage.BatchedStore chunkStoreMtx *sync.Mutex + + metrics metrics } func NewStorage(sharky *sharky.Store, bstore storage.BatchedStore) Storage { - return &store{sharky, bstore, &sync.Mutex{}} + return &store{sharky, bstore, &sync.Mutex{}, newMetrics()} } type transaction struct { @@ -68,6 +71,7 @@ type transaction struct { sharkyTrx *sharkyTrx chunkStoreMtx *sync.Mutex cleanup bool + metrics metrics } type chunkStoreTrx struct { @@ -91,39 +95,25 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn) } -type indexTrx struct { - store storage.Reader - batch storage.Batch -} - -func (s *indexTrx) Get(i storage.Item) error { return s.store.Get(i) } -func (s *indexTrx) Has(k storage.Key) (bool, error) { return s.store.Has(k) } -func (s *indexTrx) GetSize(k storage.Key) (int, error) { return s.store.GetSize(k) } -func (s *indexTrx) Iterate(q storage.Query, f storage.IterateFn) error { - return s.store.Iterate(q, f) -} -func (s *indexTrx) Count(k storage.Key) (int, error) { return s.store.Count(k) } -func (s *indexTrx) Put(i storage.Item) error { return s.batch.Put(i) } -func (s *indexTrx) Delete(i storage.Item) error { return s.batch.Delete(i) } - // NewTransaction returns a new storage transaction. // Commit must be called to persist data to the disk. // The callback function must be the final call of the transaction whether or not any errors // were returned from the storage ops or commit. Safest option is to do a defer call immediately after // creating the transaction. // Calls made to the transaction are NOT thread-safe. -func (x *store) NewTransaction() (Transaction, func()) { +func (s *store) NewTransaction() (Transaction, func()) { - b, _ := x.bstore.Batch(context.TODO()) - indexTrx := &indexTrx{x.bstore, b} - sharyTrx := &sharkyTrx{sharky: x.sharky} + b, _ := s.bstore.Batch(context.TODO()) + indexTrx := &indexTrx{s.bstore, b} + sharyTrx := &sharkyTrx{sharky: s.sharky} t := &transaction{ batch: b, indexstore: indexTrx, chunkStore: &chunkStoreTrx{indexTrx, sharyTrx}, sharkyTrx: sharyTrx, - chunkStoreMtx: x.chunkStoreMtx, + chunkStoreMtx: s.chunkStoreMtx, + metrics: s.metrics, } return t, func() { @@ -143,14 +133,14 @@ type readOnly struct { chunkStore *chunkStoreTrx } -func (x *store) ReadOnly() ReadOnlyStore { - indexStore := &indexTrx{store: x.bstore} - sharyTrx := &sharkyTrx{sharky: x.sharky} +func (s *store) ReadOnly() ReadOnlyStore { + indexStore := &indexTrx{store: s.bstore} + sharyTrx := &sharkyTrx{sharky: s.sharky} return &readOnly{indexStore, &chunkStoreTrx{indexStore, sharyTrx}} } -func (x *store) Run(f func(Store) error) error { - trx, done := x.NewTransaction() +func (s *store) Run(f func(Store) error) error { + trx, done := s.NewTransaction() defer done() err := f(trx) @@ -160,8 +150,13 @@ func (x *store) Run(f func(Store) error) error { return trx.Commit() } -func (x *store) Close() error { - return errors.Join(x.bstore.Close(), x.sharky.Close()) +// Metrics returns set of prometheus collectors. +func (s *store) Metrics() []prometheus.Collector { + return m.PrometheusCollectorsFromFields(s.metrics) +} + +func (s *store) Close() error { + return errors.Join(s.bstore.Close(), s.sharky.Close()) } func (t *readOnly) IndexStore() storage.Reader { @@ -172,6 +167,21 @@ func (t *readOnly) ChunkStore() storage.ReadOnlyChunkStore { return t.chunkStore } +type indexTrx struct { + store storage.Reader + batch storage.Batch +} + +func (s *indexTrx) Get(i storage.Item) error { return s.store.Get(i) } +func (s *indexTrx) Has(k storage.Key) (bool, error) { return s.store.Has(k) } +func (s *indexTrx) GetSize(k storage.Key) (int, error) { return s.store.GetSize(k) } +func (s *indexTrx) Iterate(q storage.Query, f storage.IterateFn) error { + return s.store.Iterate(q, f) +} +func (s *indexTrx) Count(k storage.Key) (int, error) { return s.store.Count(k) } +func (s *indexTrx) Put(i storage.Item) error { return s.batch.Put(i) } +func (s *indexTrx) Delete(i storage.Item) error { return s.batch.Delete(i) } + // IndexStore gives acces to the index store of the transaction. // Note that no writes are persisted to the disk until the commit is called. // Reads return data from the disk and not what has been written to the transaction before the commit call. @@ -191,13 +201,20 @@ func (t *transaction) ChunkStore() storage.ChunkStore { return t.chunkStore } -func (t *transaction) Commit() error { +func (t *transaction) Commit() (err error) { - defer func() { - t.sharkyTrx.writtenLocs = nil - }() + defer func(ti time.Time) { + t.sharkyTrx.writtenLocs = nil // clear written locs so that the done callback does not remove them + if err != nil { + t.metrics.CommitCalls.WithLabelValues("failure").Inc() + t.metrics.CommitDuration.WithLabelValues("failure").Observe(float64(time.Since(ti))) + } else { + t.metrics.CommitCalls.WithLabelValues("success").Inc() + t.metrics.CommitDuration.WithLabelValues("success").Observe(float64(time.Since(ti))) + } + }(time.Now()) - err := t.batch.Commit() + err = t.batch.Commit() if err != nil { for _, l := range t.sharkyTrx.writtenLocs { if rerr := t.sharkyTrx.sharky.Release(context.TODO(), l); rerr != nil { diff --git a/pkg/storer/internal/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go similarity index 96% rename from pkg/storer/internal/transaction_test.go rename to pkg/storer/internal/transaction/transaction_test.go index 3378686e234..3dd793854a5 100644 --- a/pkg/storer/internal/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package internal_test +package transaction_test import ( "context" @@ -15,8 +15,8 @@ import ( "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/leveldbstore" test "github.com/ethersphere/bee/pkg/storage/testing" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/cache" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" "github.com/stretchr/testify/assert" ) @@ -38,7 +38,7 @@ func Test_TransactionStorage(t *testing.T) { store, err := leveldbstore.New("", nil) assert.NoError(t, err) - st := internal.NewStorage(sharkyStore, store) + st := transaction.NewStorage(sharkyStore, store) t.Cleanup(func() { assert.NoError(t, st.Close()) }) @@ -106,7 +106,7 @@ func Test_TransactionStorage(t *testing.T) { ch1 := test.GenerateTestRandomChunk() ch2 := test.GenerateTestRandomChunk() - _ = st.Run(func(s internal.Store) error { + _ = st.Run(func(s transaction.Store) error { assert.NoError(t, s.IndexStore().Put(&cache.CacheEntryItem{Address: ch1.Address(), AccessTimestamp: 1})) assert.NoError(t, s.ChunkStore().Put(context.Background(), ch1)) assert.NoError(t, s.IndexStore().Put(&cache.CacheEntryItem{Address: ch2.Address(), AccessTimestamp: 1})) @@ -132,7 +132,7 @@ func Test_TransactionStorage(t *testing.T) { assert.Equal(t, ch1.Data(), ch2_get.Data()) assert.Equal(t, ch1.Address(), ch2_get.Address()) - _ = st.Run(func(s internal.Store) error { + _ = st.Run(func(s transaction.Store) error { assert.NoError(t, s.IndexStore().Delete(&cache.CacheEntryItem{Address: ch1.Address(), AccessTimestamp: 1})) assert.NoError(t, s.ChunkStore().Delete(context.Background(), ch1.Address())) assert.NoError(t, s.IndexStore().Delete(&cache.CacheEntryItem{Address: ch2.Address(), AccessTimestamp: 1})) diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index 09372f9b689..e84b7ce175a 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -18,6 +18,7 @@ import ( "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstamp" "github.com/ethersphere/bee/pkg/storer/internal/stampindex" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) @@ -403,7 +404,7 @@ func NewPutter(s storage.IndexStore, tagID uint64) (internal.PutterCloserWithRef // - pushItem entry to make it available for PushSubscriber // - add chunk to the chunkstore till it is synced // The user of the putter MUST mutex lock the call to prevent data-races across multiple upload sessions. -func (u *uploadPutter) Put(ctx context.Context, st internal.Store, chunk swarm.Chunk) error { +func (u *uploadPutter) Put(ctx context.Context, st transaction.Store, chunk swarm.Chunk) error { if u.closed { return errPutterAlreadyClosed } @@ -510,7 +511,7 @@ func (u *uploadPutter) Close(s storage.IndexStore, addr swarm.Address) error { return nil } -func (u *uploadPutter) Cleanup(st internal.Storage) error { +func (u *uploadPutter) Cleanup(st transaction.Storage) error { if u.closed { return nil } @@ -544,7 +545,7 @@ func (u *uploadPutter) Cleanup(st internal.Storage) error { batchCnt := 1000 for i := 0; i < len(itemsToDelete); i += batchCnt { - _ = st.Run(func(s internal.Store) error { + _ = st.Run(func(s transaction.Store) error { end := i + batchCnt if end > len(itemsToDelete) { end = len(itemsToDelete) @@ -557,13 +558,13 @@ func (u *uploadPutter) Cleanup(st internal.Storage) error { }) } - return st.Run(func(s internal.Store) error { + return st.Run(func(s transaction.Store) error { return s.IndexStore().Delete(&dirtyTagItem{TagID: u.tagID}) }) } // Remove removes all the state associated with the given address and batchID. -func remove(st internal.Store, address swarm.Address, batchID []byte) error { +func remove(st transaction.Store, address swarm.Address, batchID []byte) error { ui := &uploadItem{ Address: address, BatchID: batchID, @@ -599,7 +600,7 @@ func remove(st internal.Store, address swarm.Address, batchID []byte) error { } // CleanupDirty does a best-effort cleanup of dirty tags. This is called on startup. -func CleanupDirty(st internal.Storage) error { +func CleanupDirty(st transaction.Storage) error { dirtyTags := make([]*dirtyTagItem, 0) err := st.ReadOnly().IndexStore().Iterate( @@ -626,7 +627,7 @@ func CleanupDirty(st internal.Storage) error { // Report is the implementation of the PushReporter interface. func Report( ctx context.Context, - trx internal.Store, + trx transaction.Store, chunk swarm.Chunk, state storage.ChunkState, ) error { @@ -796,7 +797,7 @@ func ListAllTags(st storage.Reader) ([]TagItem, error) { return tags, nil } -func Iterate(ctx context.Context, s internal.ReadOnlyStore, consumerFn func(chunk swarm.Chunk) (bool, error)) error { +func Iterate(ctx context.Context, s transaction.ReadOnlyStore, consumerFn func(chunk swarm.Chunk) (bool, error)) error { return s.IndexStore().Iterate(storage.Query{ Factory: func() storage.Item { return &pushItem{} }, }, func(r storage.Result) (bool, error) { diff --git a/pkg/storer/internal/upload/uploadstore_test.go b/pkg/storer/internal/upload/uploadstore_test.go index 126f92f65be..a4aa1849adb 100644 --- a/pkg/storer/internal/upload/uploadstore_test.go +++ b/pkg/storer/internal/upload/uploadstore_test.go @@ -21,6 +21,7 @@ import ( "github.com/ethersphere/bee/pkg/storage/storagetest" chunktest "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/storer/internal/upload" "github.com/ethersphere/bee/pkg/swarm" "github.com/google/go-cmp/cmp" @@ -430,7 +431,7 @@ func TestItemDirtyTagItem(t *testing.T) { } } -func newTestStorage(t *testing.T) internal.Storage { +func newTestStorage(t *testing.T) transaction.Storage { t.Helper() storg := internal.NewInmemStorage() @@ -458,7 +459,7 @@ func TestChunkPutter(t *testing.T) { for _, chunk := range chunktest.GenerateTestRandomChunks(10) { t.Run(fmt.Sprintf("chunk %s", chunk.Address()), func(t *testing.T) { t.Run("put new chunk", func(t *testing.T) { - err := ts.Run(func(s internal.Store) error { + err := ts.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, chunk) }) if err != nil { @@ -467,7 +468,7 @@ func TestChunkPutter(t *testing.T) { }) t.Run("put existing chunk", func(t *testing.T) { - err := ts.Run(func(s internal.Store) error { + err := ts.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, chunk) }) if err != nil { @@ -553,7 +554,7 @@ func TestChunkPutter(t *testing.T) { t.Run("close with reference", func(t *testing.T) { addr := swarm.RandAddress(t) - err := ts.Run(func(s internal.Store) error { + err := ts.Run(func(s transaction.Store) error { return putter.Close(s.IndexStore(), addr) }) if err != nil { @@ -562,7 +563,7 @@ func TestChunkPutter(t *testing.T) { var ti upload.TagItem - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { ti, err = upload.TagInfo(s.IndexStore(), tag.TagID) return err }) @@ -583,7 +584,7 @@ func TestChunkPutter(t *testing.T) { }) t.Run("error after close", func(t *testing.T) { - err := ts.Run(func(s internal.Store) error { + err := ts.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, chunktest.GenerateTestRandomChunk()) }) if !errors.Is(err, upload.ErrPutterAlreadyClosed) { @@ -595,7 +596,7 @@ func TestChunkPutter(t *testing.T) { var putter internal.PutterCloserWithReference - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) return err }) @@ -604,7 +605,7 @@ func TestChunkPutter(t *testing.T) { } for _, chunk := range chunktest.GenerateTestRandomChunks(5) { - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, chunk) }); err != nil { t.Fatalf("Put(...): unexpected error: %v", err) @@ -613,7 +614,7 @@ func TestChunkPutter(t *testing.T) { // close with different address addr := swarm.RandAddress(t) - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return putter.Close(s.IndexStore(), addr) }); err != nil { t.Fatalf("Close(...): unexpected error %v", err) @@ -648,14 +649,14 @@ func TestChunkReporter(t *testing.T) { putter internal.PutterCloserWithReference err error ) - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }); err != nil { t.Fatalf("failed creating tag: %v", err) } - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) return err }); err != nil { @@ -665,7 +666,7 @@ func TestChunkReporter(t *testing.T) { for idx, chunk := range chunktest.GenerateTestRandomChunks(10) { t.Run(fmt.Sprintf("chunk %s", chunk.Address()), func(t *testing.T) { - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, chunk) }); err != nil { t.Fatalf("Put(...): unexpected error: %v", err) @@ -673,7 +674,7 @@ func TestChunkReporter(t *testing.T) { report := func(ch swarm.Chunk, state int) { t.Helper() - if err := ts.Run(func(s internal.Store) error { + if err := ts.Run(func(s transaction.Store) error { return upload.Report(context.Background(), s, ch, state) }); err != nil { t.Fatalf("Report(...): unexpected error: %v", err) @@ -779,13 +780,13 @@ func TestChunkReporter(t *testing.T) { t.Run("close with reference", func(t *testing.T) { addr := swarm.RandAddress(t) - err := ts.Run(func(s internal.Store) error { return putter.Close(s.IndexStore(), addr) }) + err := ts.Run(func(s transaction.Store) error { return putter.Close(s.IndexStore(), addr) }) if err != nil { t.Fatalf("Close(...): unexpected error %v", err) } var ti upload.TagItem - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { ti, err = upload.TagInfo(s.IndexStore(), tag.TagID) return err }) @@ -816,7 +817,7 @@ func TestStampIndexHandling(t *testing.T) { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -825,7 +826,7 @@ func TestStampIndexHandling(t *testing.T) { } var putter internal.PutterCloserWithReference - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) return err }) @@ -914,7 +915,7 @@ func TestNextTagID(t *testing.T) { for i := 1; i < 4; i++ { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -947,7 +948,7 @@ func TestListTags(t *testing.T) { for i := range want { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -986,7 +987,7 @@ func TestIterate(t *testing.T) { t.Run("iterates chunks", func(t *testing.T) { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -995,7 +996,7 @@ func TestIterate(t *testing.T) { } var putter internal.PutterCloserWithReference - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) return err }) @@ -1030,7 +1031,7 @@ func TestIterate(t *testing.T) { t.Fatalf("expected to iterate 0 chunks, got: %v", count) } - err = ts.Run(func(s internal.Store) error { return putter.Close(s.IndexStore(), swarm.ZeroAddress) }) + err = ts.Run(func(s transaction.Store) error { return putter.Close(s.IndexStore(), swarm.ZeroAddress) }) if err != nil { t.Fatalf("Close(...) error: %v", err) } @@ -1059,7 +1060,7 @@ func TestDeleteTag(t *testing.T) { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -1067,7 +1068,7 @@ func TestDeleteTag(t *testing.T) { t.Fatalf("failed creating tag: %v", err) } - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { return upload.DeleteTag(s.IndexStore(), tag.TagID) }) if err != nil { @@ -1087,7 +1088,7 @@ func TestBatchIDForChunk(t *testing.T) { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -1096,7 +1097,7 @@ func TestBatchIDForChunk(t *testing.T) { } var putter internal.PutterCloserWithReference - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) return err }) @@ -1129,7 +1130,7 @@ func TestCleanup(t *testing.T) { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -1138,7 +1139,7 @@ func TestCleanup(t *testing.T) { } var putter internal.PutterCloserWithReference - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) return err }) @@ -1178,7 +1179,7 @@ func TestCleanup(t *testing.T) { var tag upload.TagItem var err error - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { tag, err = upload.NextTag(s.IndexStore()) return err }) @@ -1187,7 +1188,7 @@ func TestCleanup(t *testing.T) { } var putter internal.PutterCloserWithReference - err = ts.Run(func(s internal.Store) error { + err = ts.Run(func(s transaction.Store) error { putter, err = upload.NewPutter(s.IndexStore(), tag.TagID) return err }) @@ -1221,9 +1222,9 @@ func TestCleanup(t *testing.T) { }) } -func put(t *testing.T, ts internal.Storage, putter internal.PutterCloserWithReference, ch swarm.Chunk) error { +func put(t *testing.T, ts transaction.Storage, putter internal.PutterCloserWithReference, ch swarm.Chunk) error { t.Helper() - return ts.Run(func(s internal.Store) error { + return ts.Run(func(s transaction.Store) error { return putter.Put(context.Background(), s, ch) }) } diff --git a/pkg/storer/migration/all_steps.go b/pkg/storer/migration/all_steps.go index d43696067e7..59f5a4b28e2 100644 --- a/pkg/storer/migration/all_steps.go +++ b/pkg/storer/migration/all_steps.go @@ -7,15 +7,15 @@ package migration import ( "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/migration" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/reserve" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" ) // AfterInitSteps lists all migration steps for localstore IndexStore after the localstore is intiated. func AfterInitSteps( sharkyPath string, sharkyNoOfShards int, - st internal.Storage, + st transaction.Storage, ) migration.Steps { return map[uint64]migration.StepFn{ 1: step_01, diff --git a/pkg/storer/migration/all_steps_test.go b/pkg/storer/migration/all_steps_test.go index 0dd349f891a..29e0a901b8e 100644 --- a/pkg/storer/migration/all_steps_test.go +++ b/pkg/storer/migration/all_steps_test.go @@ -12,6 +12,7 @@ import ( "github.com/ethersphere/bee/pkg/storage/inmemstore" "github.com/ethersphere/bee/pkg/storage/migration" "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" localmigration "github.com/ethersphere/bee/pkg/storer/migration" ) @@ -33,7 +34,7 @@ func TestPreSteps(t *testing.T) { t.Parallel() store := internal.NewInmemStorage() - err := store.Run(func(s internal.Store) error { + err := store.Run(func(s transaction.Store) error { return migration.Migrate(s.IndexStore(), "migration", localmigration.AfterInitSteps("", 4, store)) }) assert.NoError(t, err) diff --git a/pkg/storer/migration/step_02.go b/pkg/storer/migration/step_02.go index 6e2391d4de0..e30cd4b6416 100644 --- a/pkg/storer/migration/step_02.go +++ b/pkg/storer/migration/step_02.go @@ -8,15 +8,15 @@ import ( "time" storage "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/cache" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) // step_02 migrates the cache to the new format. // the old cacheEntry item has the same key, but the value is different. So only // a Put is needed. -func step_02(st internal.Storage) func() error { +func step_02(st transaction.Storage) func() error { return func() error { diff --git a/pkg/storer/migration/step_02_test.go b/pkg/storer/migration/step_02_test.go index 050c01c62f8..65eb3109444 100644 --- a/pkg/storer/migration/step_02_test.go +++ b/pkg/storer/migration/step_02_test.go @@ -13,6 +13,7 @@ import ( storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/cache" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" localmigration "github.com/ethersphere/bee/pkg/storer/migration" "github.com/ethersphere/bee/pkg/swarm" ) @@ -56,7 +57,7 @@ func Test_Step_02(t *testing.T) { for i := 0; i < 10; i++ { entry := &testEntry{address: swarm.RandAddress(t)} addrs = append(addrs, entry) - err := store.Run(func(s internal.Store) error { + err := store.Run(func(s transaction.Store) error { return s.IndexStore().Put(entry) }) assert.NoError(t, err) diff --git a/pkg/storer/migration/step_03.go b/pkg/storer/migration/step_03.go index 60303bbc192..384883a5020 100644 --- a/pkg/storer/migration/step_03.go +++ b/pkg/storer/migration/step_03.go @@ -11,15 +11,15 @@ import ( "github.com/ethersphere/bee/pkg/log" storage "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/reserve" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) // step_03 is a migration step that removes all BinItem entries and migrates // ChunkBinItem and BatchRadiusItem entries to use a new BinID field. func step_03( - st internal.Storage, + st transaction.Storage, chunkType func(swarm.Chunk) swarm.ChunkType, ) func() error { return func() error { @@ -35,7 +35,7 @@ func step_03( // STEP 1 - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { for i := uint8(0); i < swarm.MaxBins; i++ { err := s.IndexStore().Delete(&reserve.BinItem{Bin: i}) if err != nil { @@ -72,7 +72,7 @@ func step_03( end = len(chunkBinItems) } - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { for _, item := range chunkBinItems[i:end] { err := s.IndexStore().Delete(item) if err != nil { @@ -116,7 +116,7 @@ func step_03( end = len(batchRadiusItems) } - err := st.Run(func(s internal.Store) error { + err := st.Run(func(s transaction.Store) error { for _, item := range batchRadiusItems[i:end] { chunk, err := s.ChunkStore().Get(context.Background(), item.Address) if err != nil && !errors.Is(err, storage.ErrNotFound) { @@ -133,7 +133,7 @@ func step_03( } else { var newBinID uint64 - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { newBinID, err = rs.IncBinID(s.IndexStore(), item.Bin) return err }) diff --git a/pkg/storer/migration/step_03_test.go b/pkg/storer/migration/step_03_test.go index 8b2ada23a7b..90a31e21ae4 100644 --- a/pkg/storer/migration/step_03_test.go +++ b/pkg/storer/migration/step_03_test.go @@ -13,6 +13,7 @@ import ( chunktest "github.com/ethersphere/bee/pkg/storage/testing" "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/reserve" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" localmigration "github.com/ethersphere/bee/pkg/storer/migration" "github.com/ethersphere/bee/pkg/swarm" "github.com/stretchr/testify/assert" @@ -31,7 +32,7 @@ func Test_Step_03(t *testing.T) { var chunksPerPO uint64 = 2 for i := uint8(0); i < swarm.MaxBins; i++ { - err := store.Run(func(s internal.Store) error { + err := store.Run(func(s transaction.Store) error { return s.IndexStore().Put(&reserve.BinItem{Bin: i, BinID: 10}) }) assert.NoError(t, err) @@ -48,7 +49,7 @@ func Test_Step_03(t *testing.T) { BatchID: ch.Stamp().BatchID(), ChunkType: swarm.ChunkTypeContentAddressed, } - err := store.Run(func(s internal.Store) error { + err := store.Run(func(s transaction.Store) error { return s.IndexStore().Put(cb) }) if err != nil { @@ -61,7 +62,7 @@ func Test_Step_03(t *testing.T) { Address: ch.Address(), BinID: 0, } - err = store.Run(func(s internal.Store) error { + err = store.Run(func(s transaction.Store) error { return s.IndexStore().Put(br) }) if err != nil { @@ -73,7 +74,7 @@ func Test_Step_03(t *testing.T) { continue } - err = store.Run(func(s internal.Store) error { + err = store.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.Background(), ch) }) if err != nil { diff --git a/pkg/storer/migration/step_04.go b/pkg/storer/migration/step_04.go index 2c8c5cbc5d2..fc84bda4eb1 100644 --- a/pkg/storer/migration/step_04.go +++ b/pkg/storer/migration/step_04.go @@ -10,8 +10,8 @@ import ( "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/sharky" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) @@ -20,7 +20,7 @@ import ( func step_04( sharkyBasePath string, sharkyNoOfShards int, - st internal.Storage, + st transaction.Storage, ) func() error { return func() error { // for in-mem store, skip this step diff --git a/pkg/storer/migration/step_04_test.go b/pkg/storer/migration/step_04_test.go index 7656f1eb59e..5411a8a87ac 100644 --- a/pkg/storer/migration/step_04_test.go +++ b/pkg/storer/migration/step_04_test.go @@ -14,8 +14,8 @@ import ( "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/storage/inmemstore" chunktest "github.com/ethersphere/bee/pkg/storage/testing" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" localmigration "github.com/ethersphere/bee/pkg/storer/migration" "github.com/ethersphere/bee/pkg/swarm" "github.com/stretchr/testify/assert" @@ -38,21 +38,21 @@ func Test_Step_04(t *testing.T) { store := inmemstore.New() - storage := internal.NewStorage(sharkyStore, store) + storage := transaction.NewStorage(sharkyStore, store) stepFn := localmigration.Step_04(sharkyDir, 1, storage) chunks := chunktest.GenerateTestRandomChunks(10) for _, ch := range chunks { - err = storage.Run(func(s internal.Store) error { + err = storage.Run(func(s transaction.Store) error { return s.ChunkStore().Put(context.Background(), ch) }) assert.NoError(t, err) } for _, ch := range chunks[:2] { - err = storage.Run(func(s internal.Store) error { + err = storage.Run(func(s transaction.Store) error { return s.IndexStore().Delete(&chunkstore.RetrievalIndexItem{Address: ch.Address()}) }) assert.NoError(t, err) @@ -66,7 +66,7 @@ func Test_Step_04(t *testing.T) { sharkyStore, err = sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize) assert.NoError(t, err) - store2 := internal.NewStorage(sharkyStore, store) + store2 := transaction.NewStorage(sharkyStore, store) // check that the chunks are still there for _, ch := range chunks[2:] { diff --git a/pkg/storer/pinstore.go b/pkg/storer/pinstore.go index ff11cdb4d21..a19509b3c7c 100644 --- a/pkg/storer/pinstore.go +++ b/pkg/storer/pinstore.go @@ -12,6 +12,7 @@ import ( storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storer/internal" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) @@ -21,7 +22,7 @@ func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) { pinningPutter internal.PutterCloserWithReference err error ) - err = db.storage.Run(func(store internal.Store) error { + err = db.storage.Run(func(store transaction.Store) error { pinningPutter, err = pinstore.NewCollection(store.IndexStore()) if err != nil { return fmt.Errorf("pinstore.NewCollection: %w", err) @@ -36,7 +37,7 @@ func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) { Putter: putterWithMetrics{ storage.PutterFunc( func(ctx context.Context, chunk swarm.Chunk) error { - return db.storage.Run(func(s internal.Store) error { + return db.storage.Run(func(s transaction.Store) error { unlock := db.Lock(uploadsLock) defer unlock() return pinningPutter.Put(ctx, s, chunk) @@ -49,7 +50,7 @@ func (db *DB) NewCollection(ctx context.Context) (PutterSession, error) { done: func(address swarm.Address) error { unlock := db.Lock(uploadsLock) defer unlock() - return db.storage.Run(func(s internal.Store) error { + return db.storage.Run(func(s transaction.Store) error { return pinningPutter.Close(s.IndexStore(), address) }) }, diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 690736e7302..d6e2185cf8c 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -18,8 +18,8 @@ import ( "github.com/ethersphere/bee/pkg/postage" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/storageutil" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/reserve" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/swarm" ) @@ -115,7 +115,7 @@ func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) { evictBatches := make(map[string]bool) - err := db.storage.Run(func(t internal.Store) error { + err := db.storage.Run(func(t transaction.Store) error { return db.reserve.IterateChunksItems(0, func(ci reserve.ChunkItem) (bool, error) { if ci.Bin >= radius { count++ @@ -232,7 +232,7 @@ func (db *DB) evictExpiredBatches(ctx context.Context) error { if evicted > 0 { db.logger.Debug("evicted expired batch", "batch_id", hex.EncodeToString(batchID), "total_evicted", evicted) } - err = db.storage.Run(func(st internal.Store) error { + err = db.storage.Run(func(st transaction.Store) error { return st.IndexStore().Delete(&expiredBatchItem{BatchID: batchID}) }) if err != nil { @@ -297,7 +297,7 @@ func (db *DB) EvictBatch(ctx context.Context, batchID []byte) error { return nil } - err := db.storage.Run(func(tx internal.Store) error { + err := db.storage.Run(func(tx transaction.Store) error { return tx.IndexStore().Put(&expiredBatchItem{BatchID: batchID}) }) if err != nil { diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 9c710901410..9f0cb489241 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -27,11 +27,11 @@ import ( "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/leveldbstore" "github.com/ethersphere/bee/pkg/storage/migration" - "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/storer/internal/cache" "github.com/ethersphere/bee/pkg/storer/internal/events" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" "github.com/ethersphere/bee/pkg/storer/internal/reserve" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/storer/internal/upload" localmigration "github.com/ethersphere/bee/pkg/storer/migration" "github.com/ethersphere/bee/pkg/swarm" @@ -211,7 +211,7 @@ func closer(closers ...io.Closer) io.Closer { }) } -func initInmemRepository() (internal.Storage, io.Closer, error) { +func initInmemRepository() (transaction.Storage, io.Closer, error) { store, err := leveldbstore.New("", nil) if err != nil { return nil, nil, fmt.Errorf("failed creating inmem levelDB index store: %w", err) @@ -226,7 +226,7 @@ func initInmemRepository() (internal.Storage, io.Closer, error) { return nil, nil, fmt.Errorf("failed creating inmem sharky instance: %w", err) } - return internal.NewStorage(sharky, store), closer(store, sharky), nil + return transaction.NewStorage(sharky, store), closer(store, sharky), nil } // loggerName is the tree path name of the logger for this package. @@ -273,7 +273,7 @@ func initDiskRepository( ctx context.Context, basePath string, opts *Options, -) (internal.Storage, io.Closer, error) { +) (transaction.Storage, io.Closer, error) { store, err := initStore(basePath, opts) if err != nil { @@ -352,7 +352,7 @@ func initDiskRepository( return nil, nil, fmt.Errorf("failed creating sharky instance: %w", err) } - return internal.NewStorage(sharky, store), closer(store, sharky, recoveryCloser), nil + return transaction.NewStorage(sharky, store), closer(store, sharky, recoveryCloser), nil } const lockKeyNewSession string = "new_session" @@ -408,7 +408,7 @@ type DB struct { tracer *tracing.Tracer metrics metrics - storage internal.Storage + storage transaction.Storage lock *multex.Multex cacheObj *cache.Cache retrieval retrieval.Interface @@ -440,7 +440,7 @@ type workerOpts struct { // component stores. func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { var ( - st internal.Storage + st transaction.Storage err error dbCloser io.Closer ) @@ -473,7 +473,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { sharkyBasePath = path.Join(dirPath, sharkyPath) } - err = st.Run(func(s internal.Store) error { + err = st.Run(func(s transaction.Store) error { return migration.Migrate( s.IndexStore(), "migration", @@ -561,9 +561,9 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { // Metrics returns set of prometheus collectors. func (db *DB) Metrics() []prometheus.Collector { collectors := m.PrometheusCollectorsFromFields(db.metrics) - // if v, ok := db.repo.(m.Collector); ok { - // collectors = append(collectors, v.Metrics()...) - // } + if v, ok := db.storage.(m.Collector); ok { + collectors = append(collectors, v.Metrics()...) + } return collectors } diff --git a/pkg/storer/storer_test.go b/pkg/storer/storer_test.go index d6520045849..2eaddbfe08e 100644 --- a/pkg/storer/storer_test.go +++ b/pkg/storer/storer_test.go @@ -19,6 +19,7 @@ import ( "github.com/ethersphere/bee/pkg/storer" "github.com/ethersphere/bee/pkg/storer/internal" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/storer/internal/upload" localmigration "github.com/ethersphere/bee/pkg/storer/migration" "github.com/ethersphere/bee/pkg/swarm" @@ -28,7 +29,7 @@ import ( func verifyChunks( t *testing.T, - st internal.Storage, + st transaction.Storage, chunks []swarm.Chunk, has bool, ) { @@ -49,7 +50,7 @@ func verifyChunks( func verifySessionInfo( t *testing.T, - st internal.Storage, + st transaction.Storage, sessionID uint64, chunks []swarm.Chunk, has bool, @@ -75,7 +76,7 @@ func verifySessionInfo( func verifyPinCollection( t *testing.T, - st internal.Storage, + st transaction.Storage, root swarm.Chunk, chunks []swarm.Chunk, has bool, diff --git a/pkg/storer/uploadstore.go b/pkg/storer/uploadstore.go index 0a20513878b..5688c81e16e 100644 --- a/pkg/storer/uploadstore.go +++ b/pkg/storer/uploadstore.go @@ -13,6 +13,7 @@ import ( storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storer/internal" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" "github.com/ethersphere/bee/pkg/storer/internal/upload" "github.com/ethersphere/bee/pkg/swarm" ) @@ -26,7 +27,7 @@ func (db *DB) Report(ctx context.Context, chunk swarm.Chunk, state storage.Chunk unlock := db.Lock(uploadsLock) defer unlock() - err := db.storage.Run(func(s internal.Store) error { + err := db.storage.Run(func(s transaction.Store) error { return upload.Report(ctx, s, chunk, state) }) if err != nil { @@ -48,7 +49,7 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession err error ) - err = db.storage.Run(func(s internal.Store) error { + err = db.storage.Run(func(s transaction.Store) error { uploadPutter, err = upload.NewPutter(s.IndexStore(), tagID) if err != nil { return fmt.Errorf("upload.NewPutter: %w", err) @@ -72,7 +73,7 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error { unlock := db.Lock(uploadsLock) defer unlock() - return db.storage.Run(func(s internal.Store) error { + return db.storage.Run(func(s transaction.Store) error { return errors.Join( uploadPutter.Put(ctx, s, chunk), func() error { @@ -91,7 +92,7 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession defer db.events.Trigger(subscribePushEventKey) unlock := db.Lock(uploadsLock) defer unlock() - return db.storage.Run(func(s internal.Store) error { + return db.storage.Run(func(s transaction.Store) error { return errors.Join( uploadPutter.Close(s.IndexStore(), address), func() error { @@ -142,7 +143,7 @@ func (db *DB) Session(tagID uint64) (SessionInfo, error) { // DeleteSession is the implementation of the UploadStore.DeleteSession method. func (db *DB) DeleteSession(tagID uint64) error { - return db.storage.Run(func(s internal.Store) error { + return db.storage.Run(func(s transaction.Store) error { return upload.DeleteTag(s.IndexStore(), tagID) }) }