From 5b47b14725069c2bb8d0f014e5a0d83f7b72cfa8 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 9 Feb 2024 07:10:21 +0300 Subject: [PATCH] fix: mem cache --- pkg/storer/internal/transaction/mem.go | 65 +++++++++++++++++++ pkg/storer/internal/transaction/mem_test.go | 25 +++++++ .../internal/transaction/transaction.go | 50 +++++++++----- .../internal/transaction/transaction_test.go | 2 +- pkg/storer/uploadstore.go | 41 +++++------- 5 files changed, 141 insertions(+), 42 deletions(-) create mode 100644 pkg/storer/internal/transaction/mem.go create mode 100644 pkg/storer/internal/transaction/mem_test.go diff --git a/pkg/storer/internal/transaction/mem.go b/pkg/storer/internal/transaction/mem.go new file mode 100644 index 00000000000..a8a71c4bb4e --- /dev/null +++ b/pkg/storer/internal/transaction/mem.go @@ -0,0 +1,65 @@ +package transaction + +import ( + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storage/storageutil" +) + +type memcache struct { + storage.IndexStore + data map[string][]byte +} + +func NewMemCache(st storage.IndexStore) *memcache { + return &memcache{st, make(map[string][]byte)} +} + +func (m *memcache) Get(i storage.Item) error { + if val, ok := m.data[key(i)]; ok { + return i.Unmarshal(val) + } + + if err := m.IndexStore.Get(i); err != nil { + return err + } + + m.add(i) + + return nil +} + +func (m *memcache) Has(k storage.Key) (bool, error) { + if _, ok := m.data[key(k)]; ok { + return true, nil + } + return m.IndexStore.Has(k) +} + +func (m *memcache) Put(i storage.Item) error { + m.add(i) + return m.IndexStore.Put(i) +} + +func (m *memcache) Delete(i storage.Item) error { + delete(m.data, key(i)) + return m.IndexStore.Delete(i) +} + +// key returns a string representation of the given key. +func key(key storage.Key) string { + return storageutil.JoinFields(key.Namespace(), key.ID()) +} + +// add caches given item. +func (m *memcache) add(i storage.Item) { + b, err := i.Marshal() + if err != nil { + return + } + + m.data[key(i)] = b +} + +func (m *memcache) Close() error { + return nil +} diff --git a/pkg/storer/internal/transaction/mem_test.go b/pkg/storer/internal/transaction/mem_test.go new file mode 100644 index 00000000000..cf032a45bd0 --- /dev/null +++ b/pkg/storer/internal/transaction/mem_test.go @@ -0,0 +1,25 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package transaction_test + +import ( + "testing" + + "github.com/ethersphere/bee/pkg/storage/leveldbstore" + "github.com/ethersphere/bee/pkg/storage/storagetest" + "github.com/ethersphere/bee/pkg/storer/internal/transaction" +) + +func TestCache(t *testing.T) { + t.Parallel() + + store, err := leveldbstore.New(t.TempDir(), nil) + if err != nil { + t.Fatalf("create store failed: %v", err) + } + + cache := transaction.NewMemCache(store) + storagetest.TestStore(t, cache) +} diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 8884cd0c0a6..e50ada4ee47 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -2,6 +2,23 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +/* +Package transaction provides transaction support for localstore operations. +All writes to the localstore (both indexstore and chunkstore) must be made using a transaction. +The transaction must be commited for the writes to be stored on the disk. +Writes to the transaction are cached in memory so that future Reads return the cached entries, or if not available, entries stored on the disk. + +The rules of the transction is as follows: + +-sharky_write -> write to disk, keep sharky location in memory +-sharky_release -> keep location in memory, do not release from the disk +-indexstore write -> write to batch +-on commit -> if batch_commit succeeds, release sharky_release locations from the disk + -> if batch_commit fails or is not called, release all sharky_write location from the disk, do nothing for sharky_release + +See the transaction method for more details. +*/ + package transaction import ( @@ -19,18 +36,6 @@ import ( "resenje.org/multex" ) -// TODO(esad): remove contexts from sharky and any other storage call - -/* -The rules of the transction is as follows: - --sharky_write -> write to disk, keep sharky location in memory --sharky_release -> keep location in memory, do not release from the disk --store write -> write to batch --on commit -> if batch_commit succeeds, release sharky_release locations from the disk - -> if batch_commit fails or is not called, release all sharky_write location from the disk, do nothing for sharky_release -*/ - type Transaction interface { Store Commit() error @@ -79,18 +84,26 @@ type transaction struct { // were returned from the storage ops or commit. Safest option is to do a defer call immediately after // creating the transaction. // Calls made to the transaction are NOT thread-safe. +// Write operations are stored in memory so that future Read operations return what is currently captured in the transaciton. +// For example, calling chunkstore.Put twice and then chunkstore.Delete once will cause the chunk to be stored with a refCnt of 1. +// This is important for certain operations like storing the same chunk multiple times in the same transaction with the +// expectation that the refCnt in the chunkstore correctly responds to number of Put calls. +// Note that the indexstore iterator returns items based on the snapshot of what's currently stored on the disk, and no +// write operations to the transaction affect what is returned from the iterator. func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) { b := s.bstore.Batch(ctx) - indexTrx := &indexTrx{s.bstore, b, s.metrics} - sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil} + + index := NewMemCache(&indexTrx{s.bstore, b, s.metrics}) + + sharky := &sharkyTrx{s.sharky, s.metrics, nil, nil} t := &transaction{ start: time.Now(), batch: b, - indexstore: indexTrx, - chunkStore: &chunkStoreTrx{indexTrx, sharyTrx, s.chunkLocker, make(map[string]struct{}), s.metrics, false}, - sharkyTrx: sharyTrx, + indexstore: index, + chunkStore: &chunkStoreTrx{index, sharky, s.chunkLocker, make(map[string]struct{}), s.metrics, false}, + sharkyTrx: sharky, metrics: s.metrics, } @@ -119,6 +132,9 @@ func (s *store) ChunkStore() storage.ReadOnlyChunkStore { return &chunkStoreTrx{indexStore, sharyTrx, s.chunkLocker, nil, s.metrics, true} } +// Run creates a new transaction and gives the caller access to the transaction +// in the form of a callback function. After the callback returns, the transaction +// is commited to the disk. See the Transaction method for more details on how transactions operate internally. func (s *store) Run(ctx context.Context, f func(Store) error) error { trx, done := s.NewTransaction(ctx) defer done() diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index f8fdd06af5b..12d6efecc18 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -182,7 +182,7 @@ func Test_TransactionStorage(t *testing.T) { has, err := st.ChunkStore().Has(context.Background(), ch1.Address()) assert.NoError(t, err) - if !has { + if has { t.Fatal("should NOT have chunk") } }) diff --git a/pkg/storer/uploadstore.go b/pkg/storer/uploadstore.go index cbe0965f49d..68c6f158127 100644 --- a/pkg/storer/uploadstore.go +++ b/pkg/storer/uploadstore.go @@ -73,19 +73,15 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error { unlock := db.Lock(uploadsLock) defer unlock() - return errors.Join( - db.storage.Run(ctx, func(s transaction.Store) error { - return uploadPutter.Put(ctx, s, chunk) - }), - func() error { - if pinningPutter != nil { - return db.storage.Run(ctx, func(s transaction.Store) error { + return db.storage.Run(ctx, func(s transaction.Store) error { + return errors.Join(uploadPutter.Put(ctx, s, chunk), + func() error { + if pinningPutter != nil { return pinningPutter.Put(ctx, s, chunk) - }) - } - return nil - }(), - ) + } + return nil + }()) + }) }), db.metrics, "uploadstore", @@ -94,19 +90,16 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession defer db.events.Trigger(subscribePushEventKey) unlock := db.Lock(uploadsLock) defer unlock() - return errors.Join( - db.storage.Run(ctx, func(s transaction.Store) error { - return uploadPutter.Close(s.IndexStore(), address) - }), - func() error { - if pinningPutter != nil { - return db.storage.Run(ctx, func(s transaction.Store) error { + return db.storage.Run(ctx, func(s transaction.Store) error { + return errors.Join( + uploadPutter.Close(s.IndexStore(), address), + func() error { + if pinningPutter != nil { return pinningPutter.Close(s.IndexStore(), address) - }) - } - return nil - }(), - ) + } + return nil + }()) + }) }, cleanup: func() error { defer db.events.Trigger(subscribePushEventKey)