Commit

fix: cache and stamper store (#4433)
istae authored Oct 31, 2023
1 parent f780811 commit f32ff23
Showing 14 changed files with 500 additions and 494 deletions.
4 changes: 2 additions & 2 deletions pkg/node/node.go
@@ -796,7 +796,7 @@ func NewBee(
syncErr.Store(err)
return nil, fmt.Errorf("unable to start batch service: %w", err)
} else {
err = post.SetExpired()
err = post.SetExpired(ctx)
if err != nil {
return nil, fmt.Errorf("unable to set expirations: %w", err)
}
@@ -811,7 +811,7 @@ func NewBee(
logger.Error(err, "unable to sync batches")
b.syncingStopped.Signal() // trigger shutdown in start.go
} else {
err = post.SetExpired()
err = post.SetExpired(ctx)
if err != nil {
logger.Error(err, "unable to set expirations")
}
9 changes: 5 additions & 4 deletions pkg/postage/batchstore/store.go
@@ -293,6 +293,11 @@ func (s *store) cleanup() error {
}

for _, b := range evictions {
s.logger.Debug("batch expired", "batch_id", hex.EncodeToString(b.ID))
err = s.evictFn(b.ID)
if err != nil {
return fmt.Errorf("evict batch %x: %w", b.ID, err)
}
err := s.store.Delete(valueKey(b.Value, b.ID))
if err != nil {
return fmt.Errorf("delete value key for batch %x: %w", b.ID, err)
@@ -301,10 +306,6 @@ func (s *store) cleanup() error {
if err != nil {
return fmt.Errorf("delete batch %x: %w", b.ID, err)
}
err = s.evictFn(b.ID)
if err != nil {
return fmt.Errorf("evict batch %x: %w", b.ID, err)
}
if s.batchExpiry != nil {
s.batchExpiry.HandleStampExpiry(b.ID)
}
2 changes: 1 addition & 1 deletion pkg/postage/interface.go
@@ -98,5 +98,5 @@ type BatchEventListener interface {

type BatchExpiryHandler interface {
HandleStampExpiry([]byte)
SetExpired() error
SetExpired(context.Context) error
}
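SetExpired now takes a context.Context, so the expiry sweep can be cancelled or time-bounded by the caller. A minimal usage sketch, assuming the standard context and time imports plus github.com/ethersphere/bee/pkg/postage; the handler variable and the ten-minute deadline are illustrative and not part of this commit:

func sweepExpired(ctx context.Context, handler postage.BatchExpiryHandler) error {
	// Bound the sweep so a very large stamperstore cannot block forever.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	return handler.SetExpired(ctx)
}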
3 changes: 2 additions & 1 deletion pkg/postage/mock/service.go
@@ -6,6 +6,7 @@ package mock

import (
"bytes"
"context"
"math/big"
"sync"

@@ -51,7 +52,7 @@ type mockPostage struct {
acceptAll bool
}

func (m *mockPostage) SetExpired() error {
func (m *mockPostage) SetExpired(ctx context.Context) error {
return nil
}

81 changes: 52 additions & 29 deletions pkg/postage/service.go
@@ -6,6 +6,7 @@ package postage

import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
@@ -193,7 +194,57 @@ func (ps *service) HandleStampExpiry(id []byte) {
}

// SetExpired removes all expired batches from the stamp issuers.
func (ps *service) SetExpired() error {
func (ps *service) SetExpired(ctx context.Context) error {

ps.logger.Debug("removing expired stamp data from stamperstore. this may take a while if the node has not been restarted in a while.")

deleteItemC := make(chan *StampItem)
go func() {
for item := range deleteItemC {
_ = ps.store.Delete(item)
}
}()

go func() {
count := 0
defer func() {
close(deleteItemC)
ps.logger.Debug("removed expired stamps", "count", count)
}()

err := ps.store.Iterate(
storage.Query{
Factory: func() storage.Item {
return new(StampItem)
},
}, func(result storage.Result) (bool, error) {
item := result.Entry.(*StampItem)
exists, err := ps.postageStore.Exists(item.BatchID)
if err != nil {
return false, fmt.Errorf("set expired: checking if batch exists for stamp item %s: %w", hex.EncodeToString(item.BatchID), err)
}
if !exists {
count++

select {
case deleteItemC <- item:
case <-ctx.Done():
return false, ctx.Err()
}

if count%100_000 == 0 {
ps.logger.Debug("still removing expired stamps from stamperstore", "count", count)
}
}

return false, nil
})

if err != nil {
ps.logger.Warning("removing expired stamp iterator failed", "error", err)
}
}()

ps.lock.Lock()
defer ps.lock.Unlock()

Expand All @@ -210,34 +261,6 @@ func (ps *service) SetExpired() error {
}
}

var deleteItems []*StampItem

err := ps.store.Iterate(
storage.Query{
Factory: func() storage.Item {
return new(StampItem)
},
}, func(result storage.Result) (bool, error) {
item := result.Entry.(*StampItem)
exists, err := ps.postageStore.Exists(item.BatchID)
if err != nil {
return false, fmt.Errorf("set expired: checking if batch exists for stamp item %s: %w", hex.EncodeToString(item.BatchID), err)
}
if !exists {
deleteItems = append(deleteItems, item)
}
return false, nil
})
if err != nil {
return err
}

for _, item := range deleteItems {
if err := ps.store.Delete(item); err != nil {
return fmt.Errorf("set expired: delete stamp for expired batch %s: %w", hex.EncodeToString(item.BatchID), err)
}
}

return ps.reload()
}

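The rewritten SetExpired above streams expired stamp items to a background deleter over a channel instead of collecting them all in memory first, and it stops iterating when the context is cancelled. A simplified, synchronous sketch of the same producer/consumer shape, with placeholder string items and function parameters standing in for the store iteration and delete calls (the real code runs both goroutines in the background and then reloads the issuers):

func deleteExpired(ctx context.Context, iterate func(fn func(item string) (stop bool, err error)) error, del func(item string) error) error {
	itemC := make(chan string)
	done := make(chan struct{})

	// Consumer: drain the channel and delete items as they arrive.
	go func() {
		defer close(done)
		for item := range itemC {
			_ = del(item) // best-effort delete, mirroring the commit
		}
	}()

	// Producer: walk the store and hand expired items to the consumer,
	// bailing out if the caller cancels the context.
	err := iterate(func(item string) (bool, error) {
		select {
		case itemC <- item:
			return false, nil // false: continue iterating
		case <-ctx.Done():
			return false, ctx.Err()
		}
	})

	close(itemC)
	<-done
	return err
}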
3 changes: 2 additions & 1 deletion pkg/postage/service_test.go
@@ -6,6 +6,7 @@ package postage_test

import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"io"
@@ -243,7 +244,7 @@ func TestSetExpired(t *testing.T) {
t.Fatalf("expected %v, got %v", postage.ErrNotUsable, err)
}

err = ps.SetExpired()
err = ps.SetExpired(context.Background())
if err != nil {
t.Fatal(err)
}
90 changes: 76 additions & 14 deletions pkg/storer/cachestore.go
@@ -8,25 +8,65 @@ import (
"context"
"errors"
"fmt"
"time"

storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/swarm"
)

const (
cacheAccessLockKey = "cachestoreAccess"
cacheOverCapacity = "cacheOverCapacity"
)

func (db *DB) cacheWorker(ctx context.Context) {

defer db.inFlight.Done()

overCapTrigger, overCapUnsub := db.events.Subscribe(cacheOverCapacity)
defer overCapUnsub()

db.triggerCacheEviction()

for {
select {
case <-ctx.Done():
return
case <-overCapTrigger:

var (
size = db.cacheObj.Size()
capc = db.cacheObj.Capacity()
)
if size <= capc {
continue
}

evict := min(1_000, (size - capc))

dur := captureDuration(time.Now())
err := db.Execute(ctx, func(s internal.Storage) error {
return db.cacheObj.RemoveOldest(ctx, s, s.ChunkStore(), evict)
})
db.metrics.MethodCallsDuration.WithLabelValues("cachestore", "RemoveOldest").Observe(dur())
if err != nil {
db.metrics.MethodCalls.WithLabelValues("cachestore", "RemoveOldest", "failure").Inc()
db.logger.Warning("cache eviction failure", "error", err)
} else {
db.logger.Debug("cache eviction finished", "evicted", evict, "duration_sec", dur())
db.metrics.MethodCalls.WithLabelValues("cachestore", "RemoveOldest", "success").Inc()
}
db.triggerCacheEviction()
case <-db.quit:
return
}
}
}

// Lookup is the implementation of the CacheStore.Lookup method.
func (db *DB) Lookup() storage.Getter {
return getterWithMetrics{
storage.GetterFunc(func(ctx context.Context, address swarm.Address) (swarm.Chunk, error) {
// the cacheObj resets its state on failures and expects the transaction
// rollback to undo all the updates, so we need a lock here to prevent
// concurrent access to the cacheObj.
db.lock.Lock(cacheAccessLockKey)
defer db.lock.Unlock(cacheAccessLockKey)

txnRepo, commit, rollback := db.repo.NewTx(ctx)
ch, err := db.cacheObj.Getter(txnRepo).Get(ctx, address)
switch {
@@ -50,21 +90,43 @@ func (db *DB) Lookup() storage.Getter {
func (db *DB) Cache() storage.Putter {
return putterWithMetrics{
storage.PutterFunc(func(ctx context.Context, ch swarm.Chunk) error {
// the cacheObj resets its state on failures and expects the transaction
// rollback to undo all the updates, so we need a lock here to prevent
// concurrent access to the cacheObj.
db.lock.Lock(cacheAccessLockKey)
defer db.lock.Unlock(cacheAccessLockKey)

defer db.triggerCacheEviction()
txnRepo, commit, rollback := db.repo.NewTx(ctx)
err := db.cacheObj.Putter(txnRepo).Put(ctx, ch)
if err != nil {
return fmt.Errorf("cache.Put: %w", errors.Join(err, rollback()))
}
db.metrics.CacheSize.Set(float64(db.cacheObj.Size()))
return errors.Join(err, commit())
}),
db.metrics,
"cachestore",
}
}

// CacheShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore.
func (db *DB) CacheShallowCopy(ctx context.Context, store internal.Storage, addrs ...swarm.Address) error {
defer db.triggerCacheEviction()
dur := captureDuration(time.Now())
err := db.cacheObj.ShallowCopy(ctx, store, addrs...)
db.metrics.MethodCallsDuration.WithLabelValues("cachestore", "ShallowCopy").Observe(dur())
if err != nil {
err = fmt.Errorf("cache shallow copy: %w", err)
db.metrics.MethodCalls.WithLabelValues("cachestore", "ShallowCopy", "failure").Inc()
} else {
db.metrics.MethodCalls.WithLabelValues("cachestore", "ShallowCopy", "success").Inc()
}
return err
}

func (db *DB) triggerCacheEviction() {

var (
size = db.cacheObj.Size()
capc = db.cacheObj.Capacity()
)
db.metrics.CacheSize.Set(float64(size))

if size > capc {
db.events.Trigger(cacheOverCapacity)
}
}
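Cache eviction is now event-driven: Cache, CacheShallowCopy and triggerCacheEviction publish a cacheOverCapacity event whenever the cache grows past its capacity, and cacheWorker consumes those events and removes the oldest entries in batches of at most 1,000 chunks. A stripped-down sketch of that trigger/worker loop, with plain function parameters in place of the cache object, metrics and storage transaction; note the real worker re-fires the event after each pass rather than looping inline:

func evictionWorker(ctx context.Context, trigger <-chan struct{}, size, capacity func() int, removeOldest func(ctx context.Context, n int) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-trigger:
			// Evict in bounded batches until the cache is back under capacity.
			for size() > capacity() {
				n := size() - capacity()
				if n > 1_000 {
					n = 1_000
				}
				if err := removeOldest(ctx, n); err != nil {
					break // the real worker logs the error and waits for the next trigger
				}
			}
		}
	}
}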
21 changes: 19 additions & 2 deletions pkg/storer/cachestore_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/ethersphere/bee/pkg/spinlock"
storage "github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/storagetest"
chunktesting "github.com/ethersphere/bee/pkg/storage/testing"
@@ -99,6 +100,7 @@ func testCacheStore(t *testing.T, newStorer func() (*storer.DB, error)) {
lstore.SetRepoStorePutHook(nil)
// add chunks beyond capacity and verify the correct chunks are removed
// from the cache based on last access order

newChunks := chunktesting.GenerateTestRandomChunks(5)
putter := lstore.Cache()
for _, ch := range newChunks {
@@ -113,8 +115,23 @@ func testCacheStore(t *testing.T, newStorer func() (*storer.DB, error)) {
t.Fatal(err)
}

if info.Cache.Size != 10 {
t.Fatalf("unexpected cache size: want 10 have %d", info.Cache.Size)
info, err = lstore.DebugInfo(context.Background())
if err != nil {
t.Fatal(err)
}

err = spinlock.WaitWithInterval(time.Second*5, time.Second, func() bool {
info, err = lstore.DebugInfo(context.Background())
if err != nil {
t.Fatal(err)
}
if info.Cache.Size == 10 {
return true
}
return false
})
if err != nil {
t.Fatal(err)
}
})
}
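Because eviction now happens asynchronously in cacheWorker, the test above no longer asserts the cache size immediately; it polls with spinlock.WaitWithInterval until the expected size is observed or a five-second timeout expires. Purely to illustrate the pattern, an equivalent stand-alone polling helper (not part of the commit, using only time and errors from the standard library) could look like this:

// waitFor polls cond at the given interval until it returns true or the timeout elapses.
func waitFor(timeout, interval time.Duration, cond func() bool) error {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for condition")
		}
		time.Sleep(interval)
	}
}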
(6 more changed files not shown.)
