Skip to content

Commit

Permalink
chore: metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Jan 23, 2024
1 parent be41e7c commit e295f79
Show file tree
Hide file tree
Showing 30 changed files with 283 additions and 208 deletions.
4 changes: 2 additions & 2 deletions pkg/storer/cachestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/pkg/swarm"
)

Expand Down Expand Up @@ -97,7 +97,7 @@ func (db *DB) Cache() storage.Putter {
}

// CacheShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore.
func (db *DB) CacheShallowCopy(ctx context.Context, store internal.Storage, addrs ...swarm.Address) error {
func (db *DB) CacheShallowCopy(ctx context.Context, store transaction.Storage, addrs ...swarm.Address) error {
defer db.triggerCacheEviction()
dur := captureDuration(time.Now())
err := db.cacheObj.ShallowCopy(ctx, store, addrs...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storer/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
package storer

import (
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/events"
"github.com/ethersphere/bee/pkg/storer/internal/reserve"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
)

func (db *DB) Reserve() *reserve.Reserve {
return db.reserve
}

func (db *DB) Storage() internal.Storage {
func (db *DB) Storage() transaction.Storage {
return db.storage
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"time"

storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/pkg/swarm"
"resenje.org/multex"
)
Expand Down Expand Up @@ -72,7 +72,7 @@ func (c *Cache) Capacity() uint64 { return uint64(c.capacity) }

// Putter returns a Storage.Putter instance which adds the chunk to the underlying
// chunkstore and also adds a Cache entry for the chunk.
func (c *Cache) Putter(store internal.Storage) storage.Putter {
func (c *Cache) Putter(store transaction.Storage) storage.Putter {
return storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error {

c.chunkLock.Lock(chunk.Address().ByteString())
Expand Down Expand Up @@ -127,7 +127,7 @@ func (c *Cache) Putter(store internal.Storage) storage.Putter {
// part of cache it will update the cache indexes. If the operation to update the
// cache indexes fail, we need to fail the operation as this should signal the user
// of this getter to rollback the operation.
func (c *Cache) Getter(store internal.Storage) storage.Getter {
func (c *Cache) Getter(store transaction.Storage) storage.Getter {
return storage.GetterFunc(func(ctx context.Context, address swarm.Address) (swarm.Chunk, error) {

c.chunkLock.Lock(address.ByteString())
Expand Down Expand Up @@ -188,7 +188,7 @@ func (c *Cache) Getter(store internal.Storage) storage.Getter {
// ShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore.
func (c *Cache) ShallowCopy(
ctx context.Context,
store internal.Storage,
store transaction.Storage,
addrs ...swarm.Address,
) (err error) {

Expand All @@ -197,7 +197,7 @@ func (c *Cache) ShallowCopy(

defer func() {
if err != nil {
err = store.Run(func(s internal.Store) error {
err = store.Run(func(s transaction.Store) error {
var rerr error
for _, addr := range addrs {
rerr = errors.Join(rerr, s.ChunkStore().Delete(context.Background(), addr))
Expand All @@ -210,7 +210,7 @@ func (c *Cache) ShallowCopy(
//consider only the amount that can fit, the rest should be deleted from the chunkstore.
if len(addrs) > c.capacity {

_ = store.Run(func(s internal.Store) error {
_ = store.Run(func(s transaction.Store) error {
for _, addr := range addrs[:len(addrs)-c.capacity] {
_ = s.ChunkStore().Delete(ctx, addr)
}
Expand All @@ -221,7 +221,7 @@ func (c *Cache) ShallowCopy(

entriesToAdd := make([]*cacheEntry, 0, len(addrs))

err = store.Run(func(s internal.Store) error {
err = store.Run(func(s transaction.Store) error {
for _, addr := range addrs {
entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()}
if has, err := s.IndexStore().Has(entry); err == nil && has {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (c *Cache) ShallowCopy(
// specifies the number of entries to remove.
func (c *Cache) RemoveOldest(
ctx context.Context,
st internal.Storage,
st transaction.Storage,
count uint64,
) error {
if count <= 0 {
Expand Down Expand Up @@ -299,7 +299,7 @@ func (c *Cache) RemoveOldest(
end = len(evictItems)
}

err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
for _, entry := range evictItems[i:end] {
err = s.IndexStore().Delete(entry)
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/storer/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
chunktest "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/cache"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/google/go-cmp/cmp"
)
Expand Down Expand Up @@ -86,7 +87,7 @@ func TestCacheEntryItem(t *testing.T) {
}
}

func newTestStorage(t *testing.T) internal.Storage {
func newTestStorage(t *testing.T) transaction.Storage {
t.Helper()
return internal.NewInmemStorage()
}
Expand Down Expand Up @@ -230,7 +231,7 @@ func TestCache(t *testing.T) {

for i := 0; i < 5; i++ {
extraChunk := chunktest.GenerateTestRandomChunk()
err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Put(context.TODO(), extraChunk)
})
if err != nil {
Expand Down Expand Up @@ -274,7 +275,7 @@ func TestCache(t *testing.T) {
// return retErr
// }

// return st.Run(func(s internal.Store) error {
// return st.Run(func(s transaction.Store) error {
// return s.IndexStore().Put(i)
// })
// }
Expand Down Expand Up @@ -321,7 +322,7 @@ func TestShallowCopy(t *testing.T) {
// the chunkstore with chunks.
for _, ch := range chunks {

err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Put(context.Background(), ch)
})
if err != nil {
Expand Down Expand Up @@ -353,7 +354,7 @@ func TestShallowCopy(t *testing.T) {
// add the chunks to chunkstore. This simulates the reserve already populating
// the chunkstore with chunks.
for _, ch := range chunks1 {
err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Put(context.Background(), ch)
})
if err != nil {
Expand Down Expand Up @@ -395,7 +396,7 @@ func TestShallowCopyOverCap(t *testing.T) {
// the chunkstore with chunks.
for _, ch := range chunks {

err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Put(context.Background(), ch)
})
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/storer/internal/chunkstamp/chunkstamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
chunktest "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstamp"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/google/go-cmp/cmp"
)
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestStoreLoadDelete(t *testing.T) {
t.Fatalf("Get(...): unexpected error: have: %v; want: %v", err, storage.ErrNotFound)
}

if err := ts.Run(func(s internal.Store) error {
if err := ts.Run(func(s transaction.Store) error {
return chunkstamp.Store(s.IndexStore(), ns, chunk)
}); err != nil {
t.Fatalf("Store(...): unexpected error: %v", err)
Expand Down Expand Up @@ -199,13 +200,13 @@ func TestStoreLoadDelete(t *testing.T) {

t.Run("delete stored stamp", func(t *testing.T) {
if i%2 == 0 {
if err := ts.Run(func(s internal.Store) error {
if err := ts.Run(func(s transaction.Store) error {
return chunkstamp.Delete(s.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID())
}); err != nil {
t.Fatalf("Delete(...): unexpected error: %v", err)
}
} else {
if err := ts.Run(func(s internal.Store) error {
if err := ts.Run(func(s transaction.Store) error {
return chunkstamp.DeleteWithStamp(s.IndexStore(), ns, chunk.Address(), chunk.Stamp())
}); err != nil {
t.Fatalf("DeleteWithStamp(...): unexpected error: %v", err)
Expand All @@ -223,13 +224,13 @@ func TestStoreLoadDelete(t *testing.T) {

t.Run("delete all stored stamp index", func(t *testing.T) {

if err := ts.Run(func(s internal.Store) error {
if err := ts.Run(func(s transaction.Store) error {
return chunkstamp.Store(s.IndexStore(), ns, chunk)
}); err != nil {
t.Fatalf("Store(...): unexpected error: %v", err)
}

if err := ts.Run(func(s internal.Store) error {
if err := ts.Run(func(s transaction.Store) error {
return chunkstamp.DeleteAll(s.IndexStore(), ns, chunk.Address())
}); err != nil {
t.Fatalf("DeleteAll(...): unexpected error: %v", err)
Expand Down
22 changes: 11 additions & 11 deletions pkg/storer/internal/chunkstore/chunkstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/ethersphere/bee/pkg/storage/inmemstore"
"github.com/ethersphere/bee/pkg/storage/storagetest"
chunktest "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestChunkStore(t *testing.T) {
t.Fatal(err)
}

st := internal.NewStorage(sharky, store)
st := transaction.NewStorage(sharky, store)

t.Cleanup(func() {
if err := store.Close(); err != nil {
Expand All @@ -131,7 +131,7 @@ func TestChunkStore(t *testing.T) {

t.Run("put chunks", func(t *testing.T) {
for _, ch := range testChunks {
err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Put(context.TODO(), ch)
})
if err != nil {
Expand All @@ -144,7 +144,7 @@ func TestChunkStore(t *testing.T) {
for idx, ch := range testChunks {
// only put duplicates for odd numbered indexes
if idx%2 != 0 {
err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Put(context.TODO(), ch)
})
if err != nil {
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestChunkStore(t *testing.T) {
for idx, ch := range testChunks {
// Delete all even numbered indexes along with 0
if idx%2 == 0 {
err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Delete(context.TODO(), ch.Address())
})
if err != nil {
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestChunkStore(t *testing.T) {
t.Run("delete duplicate chunks", func(t *testing.T) {
for idx, ch := range testChunks {
if idx%2 != 0 {
err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Delete(context.TODO(), ch.Address())
})
if err != nil {
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestChunkStore(t *testing.T) {
t.Run("delete duplicate chunks again", func(t *testing.T) {
for idx, ch := range testChunks {
if idx%2 != 0 {
err := st.Run(func(s internal.Store) error {
err := st.Run(func(s transaction.Store) error {
return s.ChunkStore().Delete(context.TODO(), ch.Address())
})
if err != nil {
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestIterateLocations(t *testing.T) {
ctx := context.Background()

for _, ch := range testChunks {
assert.NoError(t, st.Run(func(s internal.Store) error { return s.ChunkStore().Put(ctx, ch) }))
assert.NoError(t, st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(ctx, ch) }))
}

readCount := 0
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestIterateLocations_Stop(t *testing.T) {
defer cancel()

for _, ch := range testChunks {
assert.NoError(t, st.Run(func(s internal.Store) error { return s.ChunkStore().Put(ctx, ch) }))
assert.NoError(t, st.Run(func(s transaction.Store) error { return s.ChunkStore().Put(ctx, ch) }))
}

readCount := 0
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestIterateLocations_Stop(t *testing.T) {
}

type chunkStore struct {
internal.Storage
transaction.Storage
sharky *sharky.Store
}

Expand All @@ -438,5 +438,5 @@ func makeStorage(t *testing.T) *chunkStore {
assert.NoError(t, sharky.Close())
})

return &chunkStore{internal.NewStorage(sharky, store), sharky}
return &chunkStore{transaction.NewStorage(sharky, store), sharky}
}
17 changes: 10 additions & 7 deletions pkg/storer/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
"github.com/ethersphere/bee/pkg/storage/inmemstore"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/pkg/swarm"
)

// PutterCloserWithReference provides a Putter which can be closed with a root
// swarm reference associated with this session.
type PutterCloserWithReference interface {
Put(context.Context, Store, swarm.Chunk) error
Put(context.Context, transaction.Store, swarm.Chunk) error
Close(storage.IndexStore, swarm.Address) error
Cleanup(Storage) error
Cleanup(transaction.Storage) error
}

var emptyAddr = make([]byte, swarm.HashSize)
Expand All @@ -46,7 +47,7 @@ func AddressBytesOrZero(addr swarm.Address) []byte {

// NewInmemStorage constructs a inmem Storage implementation which can be used
// for the tests in the internal packages.
func NewInmemStorage() Storage {
func NewInmemStorage() transaction.Storage {
ts := &inmemStorage{
indexStore: inmemstore.New(),
chunkStore: inmemchunkstore.New(),
Expand All @@ -60,7 +61,7 @@ type inmemStorage struct {
chunkStore storage.ChunkStore
}

func (t *inmemStorage) NewTransaction() (Transaction, func()) {
func (t *inmemStorage) NewTransaction() (transaction.Transaction, func()) {
return &inmemTrx{t.indexStore, t.chunkStore}, func() {}
}

Expand All @@ -81,9 +82,11 @@ func (t *inmemTrx) IndexStore() storage.IndexStore { return t.indexStore }
func (t *inmemTrx) ChunkStore() storage.ChunkStore { return t.chunkStore }
func (t *inmemTrx) Commit() error { return nil }

func (t *inmemStorage) ReadOnly() ReadOnlyStore { return &inmemReadOnly{t.indexStore, t.chunkStore} }
func (t *inmemStorage) Close() error { return nil }
func (t *inmemStorage) Run(f func(s Store) error) error {
func (t *inmemStorage) ReadOnly() transaction.ReadOnlyStore {
return &inmemReadOnly{t.indexStore, t.chunkStore}
}
func (t *inmemStorage) Close() error { return nil }
func (t *inmemStorage) Run(f func(s transaction.Store) error) error {
trx, done := t.NewTransaction()
defer done()
return f(trx)
Expand Down
Loading

0 comments on commit e295f79

Please sign in to comment.