Skip to content

Commit

Permalink
fix(cache): added mutex protection at chunk and eviction levels (#4546)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Jan 22, 2024
1 parent 2cdfadf commit 657f7ab
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/swarm"
"resenje.org/multex"
)

var now = time.Now
Expand All @@ -37,8 +39,10 @@ var (
// part of the reserve but are potentially useful to store for obtaining bandwidth
// incentives.
type Cache struct {
size atomic.Int64
capacity int
size atomic.Int64
capacity int
chunkLock *multex.Multex // protects storage ops at chunk level
removeLock sync.RWMutex // blocks Get and Put ops while cache items are being evicted.
}

// New creates a new Cache component with the specified capacity. The store is used
Expand All @@ -52,6 +56,7 @@ func New(ctx context.Context, store internal.Storage, capacity uint64) (*Cache,

c := &Cache{capacity: int(capacity)}
c.size.Store(int64(count))
c.chunkLock = multex.New()

return c, nil
}
Expand All @@ -69,6 +74,11 @@ func (c *Cache) Capacity() uint64 { return uint64(c.capacity) }
func (c *Cache) Putter(store internal.Storage) storage.Putter {
return storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error {

c.chunkLock.Lock(chunk.Address().ByteString())
defer c.chunkLock.Unlock(chunk.Address().ByteString())
c.removeLock.RLock()
defer c.removeLock.RUnlock()

newEntry := &cacheEntry{Address: chunk.Address()}
found, err := store.IndexStore().Has(newEntry)
if err != nil {
Expand Down Expand Up @@ -126,6 +136,11 @@ func (c *Cache) Getter(store internal.Storage) storage.Getter {
return nil, err
}

c.chunkLock.Lock(address.ByteString())
defer c.chunkLock.Unlock(address.ByteString())
c.removeLock.RLock()
defer c.removeLock.RUnlock()

// check if there is an entry in Cache. As this is the download path, we do
// a best-effort operation. So in case of any error we return the chunk.
entry := &cacheEntry{Address: address}
Expand Down Expand Up @@ -180,6 +195,9 @@ func (c *Cache) ShallowCopy(
addrs ...swarm.Address,
) (err error) {

c.removeLock.Lock()
defer c.removeLock.Unlock()

defer func() {
if err != nil {
for _, addr := range addrs {
Expand Down Expand Up @@ -249,6 +267,9 @@ func (c *Cache) RemoveOldest(
return nil
}

c.removeLock.Lock()
defer c.removeLock.Unlock()

evictItems := make([]*cacheEntry, 0, count)
err := store.IndexStore().Iterate(
storage.Query{
Expand Down

0 comments on commit 657f7ab

Please sign in to comment.