From 75cfe607da2da3030802a052d147a96474e5e87d Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sun, 4 Feb 2024 08:59:59 +0300 Subject: [PATCH] fix: removed cache and fixed shallow copy --- pkg/storer/internal/cache/cache.go | 40 ++++++++++------- pkg/storer/internal/cache/cache_test.go | 10 ----- .../internal/transaction/transaction.go | 10 ++--- .../internal/transaction/transaction_test.go | 2 +- pkg/storer/uploadstore.go | 44 ++++++++++--------- 5 files changed, 51 insertions(+), 55 deletions(-) diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go index de91fc65972..f6f89690b80 100644 --- a/pkg/storer/internal/cache/cache.go +++ b/pkg/storer/internal/cache/cache.go @@ -195,41 +195,47 @@ func (c *Cache) ShallowCopy( c.removeLock.Lock() defer c.removeLock.Unlock() + entriesToAdd := make([]*cacheEntry, 0, len(addrs)) + defer func() { if err != nil { err = store.Run(ctx, func(s transaction.Store) error { var rerr error - for _, addr := range addrs { - rerr = errors.Join(rerr, s.ChunkStore().Delete(context.Background(), addr)) + for _, entry := range entriesToAdd { + rerr = errors.Join(rerr, s.ChunkStore().Delete(context.Background(), entry.Address)) } return rerr }) } }() - //consider only the amount that can fit, the rest should be deleted from the chunkstore. - if len(addrs) > c.capacity { + _ = store.Run(ctx, func(s transaction.Store) error { + for _, addr := range addrs { + entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()} + if has, _ := s.IndexStore().Has(entry); has { + // Since the caller no longer references the chunk, and if the chunk is already referenced + // by the cache store, then we must decrement the refCnt by one. + // See https://github.com/ethersphere/bee/issues/4530. + _ = s.ChunkStore().Delete(ctx, addr) + continue + } + entriesToAdd = append(entriesToAdd, entry) + } + return nil + }) + //consider only the amount that can fit, the rest should be deleted from the chunkstore. + if len(entriesToAdd) > c.capacity { _ = store.Run(ctx, func(s transaction.Store) error { - for _, addr := range addrs[:len(addrs)-c.capacity] { - _ = s.ChunkStore().Delete(ctx, addr) + for _, entry := range entriesToAdd[:len(entriesToAdd)-c.capacity] { + _ = s.ChunkStore().Delete(ctx, entry.Address) } return nil }) - addrs = addrs[len(addrs)-c.capacity:] + entriesToAdd = entriesToAdd[len(entriesToAdd)-c.capacity:] } - entriesToAdd := make([]*cacheEntry, 0, len(addrs)) - err = store.Run(ctx, func(s transaction.Store) error { - for _, addr := range addrs { - entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()} - if has, err := s.IndexStore().Has(entry); err == nil && has { - continue - } - entriesToAdd = append(entriesToAdd, entry) - } - for _, entry := range entriesToAdd { err = s.IndexStore().Put(entry) if err != nil { diff --git a/pkg/storer/internal/cache/cache_test.go b/pkg/storer/internal/cache/cache_test.go index 221bb88823d..3ddc6b16381 100644 --- a/pkg/storer/internal/cache/cache_test.go +++ b/pkg/storer/internal/cache/cache_test.go @@ -268,17 +268,7 @@ func TestCache(t *testing.T) { t.Fatal(err) } } - // return error for state update, which occurs at the end of Get/Put operations retErr := errors.New("dummy error") - // st.putFn = func(i storage.Item) error { - // if i.Namespace() == "cacheOrderIndex" { - // return retErr - // } - - // return st.Run(context.Background(), func(s transaction.Store) error { - // return s.IndexStore().Put(i) - // }) - // } // on error the cache expects the overarching transactions to clean itself up // and undo any store updates. So here we only want to ensure the state is diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 661e23d6460..87effbd7b98 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -8,13 +8,11 @@ import ( "context" "errors" "fmt" - "math" "time" m "github.com/ethersphere/bee/pkg/metrics" "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storage/cache" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" "github.com/ethersphere/bee/pkg/swarm" "github.com/prometheus/client_golang/prometheus" @@ -83,7 +81,7 @@ type transaction struct { func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) { b := s.bstore.Batch(ctx) - indexTrx := cache.MustWrap(&indexTrx{s.bstore, b}, math.MaxInt) + indexTrx := &indexTrx{s.bstore, b} sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil} t := &transaction{ @@ -185,15 +183,13 @@ func (t *transaction) Commit() (err error) { } // IndexStore gives acces to the index store of the transaction. -// Write operations are cached, so following Reads returns what was recently written to the transaction. -// Note that writes are not persisted to the disk until the commit is called. +// Note that no writes are persisted to the disk until the commit is called. func (t *transaction) IndexStore() storage.IndexStore { return t.indexstore } // ChunkStore gives acces to the chunkstore of the transaction. -// Write operations are cached, so following Reads returns what was recently written to the transaction. -// Note that writes are not persisted to the disk until the commit is called. +// Note that no writes are persisted to the disk until the commit is called. func (t *transaction) ChunkStore() storage.ChunkStore { return t.chunkStore } diff --git a/pkg/storer/internal/transaction/transaction_test.go b/pkg/storer/internal/transaction/transaction_test.go index a0b1d8f5650..016a826b088 100644 --- a/pkg/storer/internal/transaction/transaction_test.go +++ b/pkg/storer/internal/transaction/transaction_test.go @@ -182,7 +182,7 @@ func Test_TransactionStorage(t *testing.T) { has, err := st.ReadOnly().ChunkStore().Has(context.Background(), ch1.Address()) assert.NoError(t, err) - if has { + if !has { t.Fatal("should NOT have chunk") } }) diff --git a/pkg/storer/uploadstore.go b/pkg/storer/uploadstore.go index d993626a708..c80e736c80f 100644 --- a/pkg/storer/uploadstore.go +++ b/pkg/storer/uploadstore.go @@ -73,17 +73,19 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error { unlock := db.Lock(uploadsLock) defer unlock() - return db.storage.Run(ctx, func(s transaction.Store) error { - return errors.Join( - uploadPutter.Put(ctx, s, chunk), - func() error { - if pinningPutter != nil { + return errors.Join( + db.storage.Run(ctx, func(s transaction.Store) error { + return uploadPutter.Put(ctx, s, chunk) + }), + func() error { + if pinningPutter != nil { + return db.storage.Run(ctx, func(s transaction.Store) error { return pinningPutter.Put(ctx, s, chunk) - } - return nil - }(), - ) - }) + }) + } + return nil + }(), + ) }), db.metrics, "uploadstore", @@ -92,17 +94,19 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession defer db.events.Trigger(subscribePushEventKey) unlock := db.Lock(uploadsLock) defer unlock() - return db.storage.Run(ctx, func(s transaction.Store) error { - return errors.Join( - uploadPutter.Close(s.IndexStore(), address), - func() error { - if pinningPutter != nil { + return errors.Join( + db.storage.Run(ctx, func(s transaction.Store) error { + return uploadPutter.Close(s.IndexStore(), address) + }), + func() error { + if pinningPutter != nil { + return db.storage.Run(ctx, func(s transaction.Store) error { return pinningPutter.Close(s.IndexStore(), address) - } - return nil - }(), - ) - }) + }) + } + return nil + }(), + ) }, cleanup: func() error { defer db.events.Trigger(subscribePushEventKey)