Skip to content

Commit

Permalink
fix: mem cache
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Feb 14, 2024
1 parent e26d5de commit 5b47b14
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 42 deletions.
65 changes: 65 additions & 0 deletions pkg/storer/internal/transaction/mem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package transaction

import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/storageutil"
)

type memcache struct {
storage.IndexStore
data map[string][]byte
}

func NewMemCache(st storage.IndexStore) *memcache {
return &memcache{st, make(map[string][]byte)}
}

func (m *memcache) Get(i storage.Item) error {
if val, ok := m.data[key(i)]; ok {
return i.Unmarshal(val)
}

if err := m.IndexStore.Get(i); err != nil {
return err
}

m.add(i)

return nil
}

func (m *memcache) Has(k storage.Key) (bool, error) {
if _, ok := m.data[key(k)]; ok {
return true, nil
}
return m.IndexStore.Has(k)
}

func (m *memcache) Put(i storage.Item) error {
m.add(i)
return m.IndexStore.Put(i)
}

func (m *memcache) Delete(i storage.Item) error {
delete(m.data, key(i))
return m.IndexStore.Delete(i)
}

// key returns a string representation of the given key.
func key(key storage.Key) string {
return storageutil.JoinFields(key.Namespace(), key.ID())
}

// add caches given item.
func (m *memcache) add(i storage.Item) {
b, err := i.Marshal()
if err != nil {
return
}

m.data[key(i)] = b
}

func (m *memcache) Close() error {
return nil
}
25 changes: 25 additions & 0 deletions pkg/storer/internal/transaction/mem_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package transaction_test

import (
"testing"

"github.com/ethersphere/bee/pkg/storage/leveldbstore"
"github.com/ethersphere/bee/pkg/storage/storagetest"
"github.com/ethersphere/bee/pkg/storer/internal/transaction"
)

func TestCache(t *testing.T) {
t.Parallel()

store, err := leveldbstore.New(t.TempDir(), nil)
if err != nil {
t.Fatalf("create store failed: %v", err)
}

cache := transaction.NewMemCache(store)
storagetest.TestStore(t, cache)
}
50 changes: 33 additions & 17 deletions pkg/storer/internal/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

/*
Package transaction provides transaction support for localstore operations.
All writes to the localstore (both indexstore and chunkstore) must be made using a transaction.
The transaction must be commited for the writes to be stored on the disk.
Writes to the transaction are cached in memory so that future Reads return the cached entries, or if not available, entries stored on the disk.
The rules of the transction is as follows:
-sharky_write -> write to disk, keep sharky location in memory
-sharky_release -> keep location in memory, do not release from the disk
-indexstore write -> write to batch
-on commit -> if batch_commit succeeds, release sharky_release locations from the disk
-> if batch_commit fails or is not called, release all sharky_write location from the disk, do nothing for sharky_release
See the transaction method for more details.
*/

package transaction

import (
Expand All @@ -19,18 +36,6 @@ import (
"resenje.org/multex"
)

// TODO(esad): remove contexts from sharky and any other storage call

/*
The rules of the transction is as follows:
-sharky_write -> write to disk, keep sharky location in memory
-sharky_release -> keep location in memory, do not release from the disk
-store write -> write to batch
-on commit -> if batch_commit succeeds, release sharky_release locations from the disk
-> if batch_commit fails or is not called, release all sharky_write location from the disk, do nothing for sharky_release
*/

type Transaction interface {
Store
Commit() error
Expand Down Expand Up @@ -79,18 +84,26 @@ type transaction struct {
// were returned from the storage ops or commit. Safest option is to do a defer call immediately after
// creating the transaction.
// Calls made to the transaction are NOT thread-safe.
// Write operations are stored in memory so that future Read operations return what is currently captured in the transaciton.
// For example, calling chunkstore.Put twice and then chunkstore.Delete once will cause the chunk to be stored with a refCnt of 1.
// This is important for certain operations like storing the same chunk multiple times in the same transaction with the
// expectation that the refCnt in the chunkstore correctly responds to number of Put calls.
// Note that the indexstore iterator returns items based on the snapshot of what's currently stored on the disk, and no
// write operations to the transaction affect what is returned from the iterator.
func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) {

b := s.bstore.Batch(ctx)
indexTrx := &indexTrx{s.bstore, b, s.metrics}
sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil}

index := NewMemCache(&indexTrx{s.bstore, b, s.metrics})

sharky := &sharkyTrx{s.sharky, s.metrics, nil, nil}

t := &transaction{
start: time.Now(),
batch: b,
indexstore: indexTrx,
chunkStore: &chunkStoreTrx{indexTrx, sharyTrx, s.chunkLocker, make(map[string]struct{}), s.metrics, false},
sharkyTrx: sharyTrx,
indexstore: index,
chunkStore: &chunkStoreTrx{index, sharky, s.chunkLocker, make(map[string]struct{}), s.metrics, false},
sharkyTrx: sharky,
metrics: s.metrics,
}

Expand Down Expand Up @@ -119,6 +132,9 @@ func (s *store) ChunkStore() storage.ReadOnlyChunkStore {
return &chunkStoreTrx{indexStore, sharyTrx, s.chunkLocker, nil, s.metrics, true}
}

// Run creates a new transaction and gives the caller access to the transaction
// in the form of a callback function. After the callback returns, the transaction
// is commited to the disk. See the Transaction method for more details on how transactions operate internally.
func (s *store) Run(ctx context.Context, f func(Store) error) error {
trx, done := s.NewTransaction(ctx)
defer done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/transaction/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func Test_TransactionStorage(t *testing.T) {

has, err := st.ChunkStore().Has(context.Background(), ch1.Address())
assert.NoError(t, err)
if !has {
if has {
t.Fatal("should NOT have chunk")
}
})
Expand Down
41 changes: 17 additions & 24 deletions pkg/storer/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,15 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error {
unlock := db.Lock(uploadsLock)
defer unlock()
return errors.Join(
db.storage.Run(ctx, func(s transaction.Store) error {
return uploadPutter.Put(ctx, s, chunk)
}),
func() error {
if pinningPutter != nil {
return db.storage.Run(ctx, func(s transaction.Store) error {
return db.storage.Run(ctx, func(s transaction.Store) error {
return errors.Join(uploadPutter.Put(ctx, s, chunk),
func() error {
if pinningPutter != nil {
return pinningPutter.Put(ctx, s, chunk)
})
}
return nil
}(),
)
}
return nil
}())
})
}),
db.metrics,
"uploadstore",
Expand All @@ -94,19 +90,16 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
defer db.events.Trigger(subscribePushEventKey)
unlock := db.Lock(uploadsLock)
defer unlock()
return errors.Join(
db.storage.Run(ctx, func(s transaction.Store) error {
return uploadPutter.Close(s.IndexStore(), address)
}),
func() error {
if pinningPutter != nil {
return db.storage.Run(ctx, func(s transaction.Store) error {
return db.storage.Run(ctx, func(s transaction.Store) error {
return errors.Join(
uploadPutter.Close(s.IndexStore(), address),
func() error {
if pinningPutter != nil {
return pinningPutter.Close(s.IndexStore(), address)
})
}
return nil
}(),
)
}
return nil
}())
})
},
cleanup: func() error {
defer db.events.Trigger(subscribePushEventKey)
Expand Down

0 comments on commit 5b47b14

Please sign in to comment.