Skip to content

Commit

Permalink
fix: cache store
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Oct 27, 2023
1 parent 60149ab commit aa11e12
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 431 deletions.
9 changes: 5 additions & 4 deletions pkg/postage/batchstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ func (s *store) cleanup() error {
}

for _, b := range evictions {
s.logger.Debug("batch expired", "batch_id", hex.EncodeToString(b.ID))
err = s.evictFn(b.ID)
if err != nil {
return fmt.Errorf("evict batch %x: %w", b.ID, err)
}
err := s.store.Delete(valueKey(b.Value, b.ID))
if err != nil {
return fmt.Errorf("delete value key for batch %x: %w", b.ID, err)
Expand All @@ -301,10 +306,6 @@ func (s *store) cleanup() error {
if err != nil {
return fmt.Errorf("delete batch %x: %w", b.ID, err)
}
err = s.evictFn(b.ID)
if err != nil {
return fmt.Errorf("evict batch %x: %w", b.ID, err)
}
if s.batchExpiry != nil {
s.batchExpiry.HandleStampExpiry(b.ID)
}
Expand Down
70 changes: 69 additions & 1 deletion pkg/storer/cachestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,54 @@ import (
"context"
"errors"
"fmt"
"time"

storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/swarm"
)

const (
cacheAccessLockKey = "cachestoreAccess"
cacheOverCapacity = "cacheOverCapacity"
)

func (db *DB) cacheWorker(ctx context.Context) {

defer db.inFlight.Done()

overCapTrigger, overCapUnsub := db.events.Subscribe(cacheOverCapacity)
defer overCapUnsub()

db.triggerCacheEviction()

for {
select {
case <-overCapTrigger:

var (
size = db.cacheObj.Size()
capc = db.cacheObj.Capacity()
)

if size <= capc {
continue
}

newSize := size - uint64((float64(capc) * 0.90)) // evict until cache size is 90% of capacity

err := db.Execute(ctx, func(s internal.Storage) error {
return db.cacheObj.RemoveOldest(ctx, s, s.ChunkStore(), newSize)
})
if err != nil {
db.logger.Warning("cache eviction failure", "error", err)
}
case <-ctx.Done():
return
}
}
}

// Lookup is the implementation of the CacheStore.Lookup method.
func (db *DB) Lookup() storage.Getter {
return getterWithMetrics{
Expand Down Expand Up @@ -50,6 +89,7 @@ func (db *DB) Lookup() storage.Getter {
func (db *DB) Cache() storage.Putter {
return putterWithMetrics{
storage.PutterFunc(func(ctx context.Context, ch swarm.Chunk) error {
defer db.triggerCacheEviction()
// the cacheObj resets its state on failures and expects the transaction
// rollback to undo all the updates, so we need a lock here to prevent
// concurrent access to the cacheObj.
Expand All @@ -61,10 +101,38 @@ func (db *DB) Cache() storage.Putter {
if err != nil {
return fmt.Errorf("cache.Put: %w", errors.Join(err, rollback()))
}
db.metrics.CacheSize.Set(float64(db.cacheObj.Size()))
return errors.Join(err, commit())
}),
db.metrics,
"cachestore",
}
}

// CacheShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore.
func (db *DB) CacheShallowCopy(ctx context.Context, store internal.Storage, addrs ...swarm.Address) error {
defer db.triggerCacheEviction()
dur := captureDuration(time.Now())
err := db.cacheObj.ShallowCopy(ctx, store, addrs...)
db.metrics.MethodCallsDuration.WithLabelValues("cachestore", "ShallowCopy").Observe(dur())
if err != nil {
err = fmt.Errorf("cache shallow copy: %w", err)
db.metrics.MethodCalls.WithLabelValues("cachestore", "ShallowCopy", "failure").Inc()
} else {
db.metrics.MethodCalls.WithLabelValues("cachestore", "ShallowCopy", "success").Inc()
}
return err
}

func (db *DB) triggerCacheEviction() {

var (
size = db.cacheObj.Size()
capc = db.cacheObj.Capacity()
)

db.metrics.CacheSize.Set(float64(size))

if size > capc {
db.events.Trigger(cacheOverCapacity)
}
}
Loading

0 comments on commit aa11e12

Please sign in to comment.