Skip to content

Commit

Permalink
fix: removed cache and fixed shallow copy
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Feb 4, 2024
1 parent 729ad19 commit 75cfe60
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 55 deletions.
40 changes: 23 additions & 17 deletions pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,41 +195,47 @@ func (c *Cache) ShallowCopy(
c.removeLock.Lock()
defer c.removeLock.Unlock()

entriesToAdd := make([]*cacheEntry, 0, len(addrs))

defer func() {
if err != nil {
err = store.Run(ctx, func(s transaction.Store) error {
var rerr error
for _, addr := range addrs {
rerr = errors.Join(rerr, s.ChunkStore().Delete(context.Background(), addr))
for _, entry := range entriesToAdd {
rerr = errors.Join(rerr, s.ChunkStore().Delete(context.Background(), entry.Address))
}
return rerr
})
}
}()

//consider only the amount that can fit, the rest should be deleted from the chunkstore.
if len(addrs) > c.capacity {
_ = store.Run(ctx, func(s transaction.Store) error {
for _, addr := range addrs {
entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()}
if has, _ := s.IndexStore().Has(entry); has {
// Since the caller no longer references the chunk, and if the chunk is already referenced
// by the cache store, then we must decrement the refCnt by one.
// See https://github.com/ethersphere/bee/issues/4530.
_ = s.ChunkStore().Delete(ctx, addr)
continue
}
entriesToAdd = append(entriesToAdd, entry)
}
return nil
})

//consider only the amount that can fit, the rest should be deleted from the chunkstore.
if len(entriesToAdd) > c.capacity {
_ = store.Run(ctx, func(s transaction.Store) error {
for _, addr := range addrs[:len(addrs)-c.capacity] {
_ = s.ChunkStore().Delete(ctx, addr)
for _, entry := range entriesToAdd[:len(entriesToAdd)-c.capacity] {
_ = s.ChunkStore().Delete(ctx, entry.Address)
}
return nil
})
addrs = addrs[len(addrs)-c.capacity:]
entriesToAdd = entriesToAdd[len(entriesToAdd)-c.capacity:]
}

entriesToAdd := make([]*cacheEntry, 0, len(addrs))

err = store.Run(ctx, func(s transaction.Store) error {
for _, addr := range addrs {
entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()}
if has, err := s.IndexStore().Has(entry); err == nil && has {
continue
}
entriesToAdd = append(entriesToAdd, entry)
}

for _, entry := range entriesToAdd {
err = s.IndexStore().Put(entry)
if err != nil {
Expand Down
10 changes: 0 additions & 10 deletions pkg/storer/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,7 @@ func TestCache(t *testing.T) {
t.Fatal(err)
}
}
// return error for state update, which occurs at the end of Get/Put operations
retErr := errors.New("dummy error")
// st.putFn = func(i storage.Item) error {
// if i.Namespace() == "cacheOrderIndex" {
// return retErr
// }

// return st.Run(context.Background(), func(s transaction.Store) error {
// return s.IndexStore().Put(i)
// })
// }

// on error the cache expects the overarching transactions to clean itself up
// and undo any store updates. So here we only want to ensure the state is
Expand Down
10 changes: 3 additions & 7 deletions pkg/storer/internal/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/cache"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -83,7 +81,7 @@ type transaction struct {
func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) {

b := s.bstore.Batch(ctx)
indexTrx := cache.MustWrap(&indexTrx{s.bstore, b}, math.MaxInt)
indexTrx := &indexTrx{s.bstore, b}
sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil}

t := &transaction{
Expand Down Expand Up @@ -185,15 +183,13 @@ func (t *transaction) Commit() (err error) {
}

// IndexStore gives access to the index store of the transaction.
// Write operations are cached, so following Reads returns what was recently written to the transaction.
// Note that writes are not persisted to the disk until the commit is called.
// Note that no writes are persisted to the disk until the commit is called.
func (t *transaction) IndexStore() storage.IndexStore {
return t.indexstore
}

// ChunkStore gives access to the chunkstore of the transaction.
// Write operations are cached, so following Reads returns what was recently written to the transaction.
// Note that writes are not persisted to the disk until the commit is called.
// Note that no writes are persisted to the disk until the commit is called.
func (t *transaction) ChunkStore() storage.ChunkStore {
return t.chunkStore
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/transaction/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func Test_TransactionStorage(t *testing.T) {

has, err := st.ReadOnly().ChunkStore().Has(context.Background(), ch1.Address())
assert.NoError(t, err)
if has {
if !has {
t.Fatal("should NOT have chunk")
}
})
Expand Down
44 changes: 24 additions & 20 deletions pkg/storer/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error {
unlock := db.Lock(uploadsLock)
defer unlock()
return db.storage.Run(ctx, func(s transaction.Store) error {
return errors.Join(
uploadPutter.Put(ctx, s, chunk),
func() error {
if pinningPutter != nil {
return errors.Join(
db.storage.Run(ctx, func(s transaction.Store) error {
return uploadPutter.Put(ctx, s, chunk)
}),
func() error {
if pinningPutter != nil {
return db.storage.Run(ctx, func(s transaction.Store) error {
return pinningPutter.Put(ctx, s, chunk)
}
return nil
}(),
)
})
})
}
return nil
}(),
)
}),
db.metrics,
"uploadstore",
Expand All @@ -92,17 +94,19 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
defer db.events.Trigger(subscribePushEventKey)
unlock := db.Lock(uploadsLock)
defer unlock()
return db.storage.Run(ctx, func(s transaction.Store) error {
return errors.Join(
uploadPutter.Close(s.IndexStore(), address),
func() error {
if pinningPutter != nil {
return errors.Join(
db.storage.Run(ctx, func(s transaction.Store) error {
return uploadPutter.Close(s.IndexStore(), address)
}),
func() error {
if pinningPutter != nil {
return db.storage.Run(ctx, func(s transaction.Store) error {
return pinningPutter.Close(s.IndexStore(), address)
}
return nil
}(),
)
})
})
}
return nil
}(),
)
},
cleanup: func() error {
defer db.events.Trigger(subscribePushEventKey)
Expand Down

0 comments on commit 75cfe60

Please sign in to comment.