Skip to content

Commit

Permalink
feat: cache
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Feb 4, 2024
1 parent 83ca2f4 commit 729ad19
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 27 deletions.
6 changes: 3 additions & 3 deletions pkg/statestore/storeadapter/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/ethersphere/bee/pkg/storage/migration"
)

func allSteps(st storage.BatchStore) migration.Steps {
func allSteps(st storage.Store) migration.Steps {
return map[uint64]migration.StepFn{
1: epochMigration(st),
2: deletePrefix(st, "sync_interval"),
Expand All @@ -23,7 +23,7 @@ func allSteps(st storage.BatchStore) migration.Steps {
}
}

func deletePrefix(s storage.BatchStore, prefix string) migration.StepFn {
func deletePrefix(s storage.Store, prefix string) migration.StepFn {
return func() error {
store := &StateStorerAdapter{s}
return store.Iterate(prefix, func(key, val []byte) (stop bool, err error) {
Expand All @@ -32,7 +32,7 @@ func deletePrefix(s storage.BatchStore, prefix string) migration.StepFn {
}
}

func epochMigration(s storage.BatchStore) migration.StepFn {
func epochMigration(s storage.Store) migration.StepFn {

return func() error {

Expand Down
2 changes: 1 addition & 1 deletion pkg/statestore/storeadapter/storeadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *StateStorerAdapter) deleteKeys(keys []string) error {
}

// NewStateStorerAdapter creates a new StateStorerAdapter.
func NewStateStorerAdapter(storage storage.BatchStore) (*StateStorerAdapter, error) {
func NewStateStorerAdapter(storage storage.Store) (*StateStorerAdapter, error) {
err := migration.Migrate(storage, "migration", allSteps(storage))
if err != nil {
return nil, err
Expand Down
21 changes: 13 additions & 8 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ func key(key storage.Key) string {
return storageutil.JoinFields(key.Namespace(), key.ID())
}

var _ storage.BatchStore = (*Cache)(nil)
var _ storage.IndexStore = (*Cache)(nil)

// Cache is a wrapper around a storage.Store that adds a layer
// of in-memory caching for the Get and Has operations.
type Cache struct {
storage.BatchStore
storage.IndexStore

lru *lru.Cache[string, []byte]
metrics metrics
Expand All @@ -29,7 +29,7 @@ type Cache struct {
// Wrap adds a layer of in-memory caching to storage.Reader Get and Has operations.
// It returns an error if the capacity is less than or equal to zero or if the
// given store implements storage.Tx
func Wrap(store storage.BatchStore, capacity int) (*Cache, error) {
func Wrap(store storage.IndexStore, capacity int) (*Cache, error) {
lru, err := lru.New[string, []byte](capacity)
if err != nil {
return nil, err
Expand All @@ -39,7 +39,7 @@ func Wrap(store storage.BatchStore, capacity int) (*Cache, error) {
}

// MustWrap is like Wrap but panics on error.
func MustWrap(store storage.BatchStore, capacity int) *Cache {
func MustWrap(store storage.IndexStore, capacity int) *Cache {
c, err := Wrap(store, capacity)
if err != nil {
panic(err)
Expand All @@ -66,7 +66,7 @@ func (c *Cache) Get(i storage.Item) error {
return i.Unmarshal(val)
}

if err := c.BatchStore.Get(i); err != nil {
if err := c.IndexStore.Get(i); err != nil {
return err
}

Expand All @@ -87,20 +87,25 @@ func (c *Cache) Has(k storage.Key) (bool, error) {
}

c.metrics.CacheMiss.Inc()
return c.BatchStore.Has(k)
return c.IndexStore.Has(k)
}

// Put implements storage.Store interface.
// On a call it also inserts the item into the cache so that the next
// call to Get and Has will be able to retrieve the item from cache.
func (c *Cache) Put(i storage.Item) error {
c.add(i)
return c.BatchStore.Put(i)
return c.IndexStore.Put(i)
}

// Delete implements storage.Store interface.
// On a call it also removes the item from the cache.
func (c *Cache) Delete(i storage.Item) error {
_ = c.lru.Remove(key(i))
return c.BatchStore.Delete(i)
return c.IndexStore.Delete(i)
}

// Close purges every entry from the in-memory LRU cache and always
// returns nil. It does not close the wrapped IndexStore; the caller
// remains responsible for the underlying store's lifecycle.
func (c *Cache) Close() error {
	c.lru.Purge()
	return nil
}
2 changes: 1 addition & 1 deletion pkg/storer/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func testDebugInfo(t *testing.T, newStorer func() (*storer.DB, swarm.Address, er
},
ChunkStore: storer.ChunkStoreStat{
TotalChunks: 10,
SharedSlots: 0,
SharedSlots: 10,
},
Cache: storer.CacheStat{
Capacity: 1000000,
Expand Down
16 changes: 9 additions & 7 deletions pkg/storer/internal/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/cache"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -66,7 +68,7 @@ func NewStorage(sharky *sharky.Store, bstore storage.BatchStore) Storage {

type transaction struct {
batch storage.Batch
indexstore *indexTrx
indexstore storage.IndexStore
chunkStore *chunkStoreTrx
sharkyTrx *sharkyTrx
metrics metrics
Expand All @@ -81,7 +83,7 @@ type transaction struct {
func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) {

b := s.bstore.Batch(ctx)
indexTrx := &indexTrx{s.bstore, b}
indexTrx := cache.MustWrap(&indexTrx{s.bstore, b}, math.MaxInt)
sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil}

t := &transaction{
Expand Down Expand Up @@ -183,21 +185,21 @@ func (t *transaction) Commit() (err error) {
}

// IndexStore gives access to the index store of the transaction.
// Note that no writes are persisted to the disk until the commit is called.
// Reads return data from the disk and not what has been written to the transaction before the commit call.
// Write operations are cached, so subsequent reads return what was recently written to the transaction.
// Note that writes are not persisted to the disk until the commit is called.
func (t *transaction) IndexStore() storage.IndexStore {
	// The returned store is the cache-wrapped batch view created when the
	// transaction was opened, so reads observe this transaction's earlier
	// uncommitted writes; nothing reaches disk until Commit.
	return t.indexstore
}

// ChunkStore gives access to the chunkstore of the transaction.
// Note that no writes are persisted to the disk until the commit is called.
// Reads return data from the disk and not what has been written to the transaction before the commit call.
// Write operations are cached, so subsequent reads return what was recently written to the transaction.
// Note that writes are not persisted to the disk until the commit is called.
func (t *transaction) ChunkStore() storage.ChunkStore {
	// Writes through this store are buffered within the transaction and
	// are persisted only when Commit is called.
	return t.chunkStore
}

type chunkStoreTrx struct {
indexStore *indexTrx
indexStore storage.IndexStore
sharkyTrx *sharkyTrx
globalLocker *multex.Multex
lockedAddrs map[string]struct{}
Expand Down
39 changes: 39 additions & 0 deletions pkg/storer/internal/transaction/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,43 @@ func Test_TransactionStorage(t *testing.T) {
_, err = st.ReadOnly().ChunkStore().Get(context.Background(), ch2.Address())
assert.ErrorIs(t, err, storage.ErrNotFound)
})

t.Run("put-delete-chunk", func(t *testing.T) {
t.Parallel()

ch1 := test.GenerateTestRandomChunk()

_ = st.Run(context.Background(), func(s transaction.Store) error {
assert.NoError(t, s.ChunkStore().Put(context.Background(), ch1))
assert.NoError(t, s.ChunkStore().Put(context.Background(), ch1))
assert.NoError(t, s.ChunkStore().Delete(context.Background(), ch1.Address()))
return nil
})

has, err := st.ReadOnly().ChunkStore().Has(context.Background(), ch1.Address())
assert.NoError(t, err)
if !has {
t.Fatal("should have chunk")
}
})

t.Run("put-delete-chunk-twice", func(t *testing.T) {
t.Parallel()

ch1 := test.GenerateTestRandomChunk()

_ = st.Run(context.Background(), func(s transaction.Store) error {
assert.NoError(t, s.ChunkStore().Put(context.Background(), ch1))
assert.NoError(t, s.ChunkStore().Put(context.Background(), ch1))
assert.NoError(t, s.ChunkStore().Delete(context.Background(), ch1.Address()))
assert.NoError(t, s.ChunkStore().Delete(context.Background(), ch1.Address()))
return nil
})

has, err := st.ReadOnly().ChunkStore().Has(context.Background(), ch1.Address())
assert.NoError(t, err)
if has {
t.Fatal("should NOT have chunk")
}
})
}
22 changes: 15 additions & 7 deletions pkg/storer/uploadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ func TestUploadStore(t *testing.T) {
return storer.New(context.Background(), "", dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second))
})
})
// t.Run("disk", func(t *testing.T) {
// t.Parallel()
t.Run("disk", func(t *testing.T) {
t.Parallel()

// testUploadStore(t, diskStorer(t, dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second)))
// })
testUploadStore(t, diskStorer(t, dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second)))
})
}

func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) {
Expand All @@ -358,7 +358,7 @@ func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) {
t.Fatal(err)
}

putter, err := lstore.Upload(context.Background(), false, session.TagID)
putter, err := lstore.Upload(context.Background(), true, session.TagID)
if err != nil {
t.Fatal(err)
}
Expand All @@ -370,6 +370,13 @@ func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) {
}
}

root := chunktesting.GenerateTestRandomChunk()

err = putter.Done(root.Address())
if err != nil {
t.Fatal(err)
}

t.Run("report", func(t *testing.T) {
t.Run("commit", func(t *testing.T) {
err := lstore.Report(context.Background(), chunks[0], storage.ChunkSynced)
Expand All @@ -379,12 +386,13 @@ func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) {

wantTI := storer.SessionInfo{
TagID: session.TagID,
Split: 0,
Split: 3,
Seen: 0,
Sent: 0,
Synced: 1,
Stored: 0,
StartedAt: session.StartedAt,
Address: root.Address(),
}

gotTI, err := lstore.Session(session.TagID)
Expand All @@ -400,7 +408,7 @@ func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) {
if err != nil {
t.Fatalf("ChunkStore.Has(...): unexpected error: %v", err)
}
if has {
if !has {
t.Fatalf("expected chunk %s to not be found", chunks[0].Address())
}
})
Expand Down

0 comments on commit 729ad19

Please sign in to comment.