diff --git a/pkg/node/node.go b/pkg/node/node.go index 3abfeb264e8..847e2154d00 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -796,7 +796,7 @@ func NewBee( syncErr.Store(err) return nil, fmt.Errorf("unable to start batch service: %w", err) } else { - err = post.SetExpired() + err = post.SetExpired(ctx) if err != nil { return nil, fmt.Errorf("unable to set expirations: %w", err) } @@ -811,7 +811,7 @@ func NewBee( logger.Error(err, "unable to sync batches") b.syncingStopped.Signal() // trigger shutdown in start.go } else { - err = post.SetExpired() + err = post.SetExpired(ctx) if err != nil { logger.Error(err, "unable to set expirations") } diff --git a/pkg/postage/batchstore/store.go b/pkg/postage/batchstore/store.go index 99069292ea3..658f72dd3d1 100644 --- a/pkg/postage/batchstore/store.go +++ b/pkg/postage/batchstore/store.go @@ -293,6 +293,11 @@ func (s *store) cleanup() error { } for _, b := range evictions { + s.logger.Debug("batch expired", "batch_id", hex.EncodeToString(b.ID)) + err = s.evictFn(b.ID) + if err != nil { + return fmt.Errorf("evict batch %x: %w", b.ID, err) + } err := s.store.Delete(valueKey(b.Value, b.ID)) if err != nil { return fmt.Errorf("delete value key for batch %x: %w", b.ID, err) @@ -301,10 +306,6 @@ func (s *store) cleanup() error { if err != nil { return fmt.Errorf("delete batch %x: %w", b.ID, err) } - err = s.evictFn(b.ID) - if err != nil { - return fmt.Errorf("evict batch %x: %w", b.ID, err) - } if s.batchExpiry != nil { s.batchExpiry.HandleStampExpiry(b.ID) } diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go index 38660c13b48..3b61420939c 100644 --- a/pkg/postage/interface.go +++ b/pkg/postage/interface.go @@ -98,5 +98,5 @@ type BatchEventListener interface { type BatchExpiryHandler interface { HandleStampExpiry([]byte) - SetExpired() error + SetExpired(context.Context) error } diff --git a/pkg/postage/mock/service.go b/pkg/postage/mock/service.go index e137aaaac79..d47e5ddcaf3 100644 --- a/pkg/postage/mock/service.go +++ b/pkg/postage/mock/service.go @@ -6,6 +6,7 @@ package mock import ( "bytes" + "context" "math/big" "sync" @@ -51,7 +52,7 @@ type mockPostage struct { acceptAll bool } -func (m *mockPostage) SetExpired() error { +func (m *mockPostage) SetExpired(ctx context.Context) error { return nil } diff --git a/pkg/postage/service.go b/pkg/postage/service.go index ef63e521d80..5b625a590d0 100644 --- a/pkg/postage/service.go +++ b/pkg/postage/service.go @@ -6,6 +6,7 @@ package postage import ( "bytes" + "context" "encoding/hex" "errors" "fmt" @@ -193,7 +194,57 @@ func (ps *service) HandleStampExpiry(id []byte) { } // SetExpired removes all expired batches from the stamp issuers. -func (ps *service) SetExpired() error { +func (ps *service) SetExpired(ctx context.Context) error { + + ps.logger.Debug("removing expired stamp data from stamperstore. 
this may take a while if the node has not been restarted in a while.") + + deleteItemC := make(chan *StampItem) + go func() { + for item := range deleteItemC { + _ = ps.store.Delete(item) + } + }() + + go func() { + count := 0 + defer func() { + close(deleteItemC) + ps.logger.Debug("removed expired stamps", "count", count) + }() + + err := ps.store.Iterate( + storage.Query{ + Factory: func() storage.Item { + return new(StampItem) + }, + }, func(result storage.Result) (bool, error) { + item := result.Entry.(*StampItem) + exists, err := ps.postageStore.Exists(item.BatchID) + if err != nil { + return false, fmt.Errorf("set expired: checking if batch exists for stamp item %s: %w", hex.EncodeToString(item.BatchID), err) + } + if !exists { + count++ + + select { + case deleteItemC <- item: + case <-ctx.Done(): + return false, ctx.Err() + } + + if count%100_000 == 0 { + ps.logger.Debug("still removing expired stamps from stamperstore", "count", count) + } + } + + return false, nil + }) + + if err != nil { + ps.logger.Warning("removing expired stamp iterator failed", "error", err) + } + }() + ps.lock.Lock() defer ps.lock.Unlock() @@ -210,34 +261,6 @@ func (ps *service) SetExpired() error { } } - var deleteItems []*StampItem - - err := ps.store.Iterate( - storage.Query{ - Factory: func() storage.Item { - return new(StampItem) - }, - }, func(result storage.Result) (bool, error) { - item := result.Entry.(*StampItem) - exists, err := ps.postageStore.Exists(item.BatchID) - if err != nil { - return false, fmt.Errorf("set expired: checking if batch exists for stamp item %s: %w", hex.EncodeToString(item.BatchID), err) - } - if !exists { - deleteItems = append(deleteItems, item) - } - return false, nil - }) - if err != nil { - return err - } - - for _, item := range deleteItems { - if err := ps.store.Delete(item); err != nil { - return fmt.Errorf("set expired: delete stamp for expired batch %s: %w", hex.EncodeToString(item.BatchID), err) - } - } - return ps.reload() } diff --git a/pkg/postage/service_test.go b/pkg/postage/service_test.go index 6c32b95f582..7056b204e83 100644 --- a/pkg/postage/service_test.go +++ b/pkg/postage/service_test.go @@ -6,6 +6,7 @@ package postage_test import ( "bytes" + "context" crand "crypto/rand" "errors" "io" @@ -243,7 +244,7 @@ func TestSetExpired(t *testing.T) { t.Fatalf("expected %v, got %v", postage.ErrNotUsable, err) } - err = ps.SetExpired() + err = ps.SetExpired(context.Background()) if err != nil { t.Fatal(err) } diff --git a/pkg/storer/cachestore.go b/pkg/storer/cachestore.go index 8aa112eaca6..b3d89547de7 100644 --- a/pkg/storer/cachestore.go +++ b/pkg/storer/cachestore.go @@ -8,25 +8,65 @@ import ( "context" "errors" "fmt" + "time" storage "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storer/internal" "github.com/ethersphere/bee/pkg/swarm" ) const ( - cacheAccessLockKey = "cachestoreAccess" + cacheOverCapacity = "cacheOverCapacity" ) +func (db *DB) cacheWorker(ctx context.Context) { + + defer db.inFlight.Done() + + overCapTrigger, overCapUnsub := db.events.Subscribe(cacheOverCapacity) + defer overCapUnsub() + + db.triggerCacheEviction() + + for { + select { + case <-ctx.Done(): + return + case <-overCapTrigger: + + var ( + size = db.cacheObj.Size() + capc = db.cacheObj.Capacity() + ) + if size <= capc { + continue + } + + evict := min(1_000, (size - capc)) + + dur := captureDuration(time.Now()) + err := db.Execute(ctx, func(s internal.Storage) error { + return db.cacheObj.RemoveOldest(ctx, s, s.ChunkStore(), evict) + }) + 
db.metrics.MethodCallsDuration.WithLabelValues("cachestore", "RemoveOldest").Observe(dur()) + if err != nil { + db.metrics.MethodCalls.WithLabelValues("cachestore", "RemoveOldest", "failure").Inc() + db.logger.Warning("cache eviction failure", "error", err) + } else { + db.logger.Debug("cache eviction finished", "evicted", evict, "duration_sec", dur()) + db.metrics.MethodCalls.WithLabelValues("cachestore", "RemoveOldest", "success").Inc() + } + db.triggerCacheEviction() + case <-db.quit: + return + } + } +} + // Lookup is the implementation of the CacheStore.Lookup method. func (db *DB) Lookup() storage.Getter { return getterWithMetrics{ storage.GetterFunc(func(ctx context.Context, address swarm.Address) (swarm.Chunk, error) { - // the cacheObj resets its state on failures and expects the transaction - // rollback to undo all the updates, so we need a lock here to prevent - // concurrent access to the cacheObj. - db.lock.Lock(cacheAccessLockKey) - defer db.lock.Unlock(cacheAccessLockKey) - txnRepo, commit, rollback := db.repo.NewTx(ctx) ch, err := db.cacheObj.Getter(txnRepo).Get(ctx, address) switch { @@ -50,21 +90,43 @@ func (db *DB) Lookup() storage.Getter { func (db *DB) Cache() storage.Putter { return putterWithMetrics{ storage.PutterFunc(func(ctx context.Context, ch swarm.Chunk) error { - // the cacheObj resets its state on failures and expects the transaction - // rollback to undo all the updates, so we need a lock here to prevent - // concurrent access to the cacheObj. - db.lock.Lock(cacheAccessLockKey) - defer db.lock.Unlock(cacheAccessLockKey) - + defer db.triggerCacheEviction() txnRepo, commit, rollback := db.repo.NewTx(ctx) err := db.cacheObj.Putter(txnRepo).Put(ctx, ch) if err != nil { return fmt.Errorf("cache.Put: %w", errors.Join(err, rollback())) } - db.metrics.CacheSize.Set(float64(db.cacheObj.Size())) return errors.Join(err, commit()) }), db.metrics, "cachestore", } } + +// CacheShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore. +func (db *DB) CacheShallowCopy(ctx context.Context, store internal.Storage, addrs ...swarm.Address) error { + defer db.triggerCacheEviction() + dur := captureDuration(time.Now()) + err := db.cacheObj.ShallowCopy(ctx, store, addrs...) 
+ db.metrics.MethodCallsDuration.WithLabelValues("cachestore", "ShallowCopy").Observe(dur()) + if err != nil { + err = fmt.Errorf("cache shallow copy: %w", err) + db.metrics.MethodCalls.WithLabelValues("cachestore", "ShallowCopy", "failure").Inc() + } else { + db.metrics.MethodCalls.WithLabelValues("cachestore", "ShallowCopy", "success").Inc() + } + return err +} + +func (db *DB) triggerCacheEviction() { + + var ( + size = db.cacheObj.Size() + capc = db.cacheObj.Capacity() + ) + db.metrics.CacheSize.Set(float64(size)) + + if size > capc { + db.events.Trigger(cacheOverCapacity) + } +} diff --git a/pkg/storer/cachestore_test.go b/pkg/storer/cachestore_test.go index b479d944ca6..dc2264491e7 100644 --- a/pkg/storer/cachestore_test.go +++ b/pkg/storer/cachestore_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/ethersphere/bee/pkg/spinlock" storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storage/storagetest" chunktesting "github.com/ethersphere/bee/pkg/storage/testing" @@ -99,6 +100,7 @@ func testCacheStore(t *testing.T, newStorer func() (*storer.DB, error)) { lstore.SetRepoStorePutHook(nil) // add chunks beyond capacity and verify the correct chunks are removed // from the cache based on last access order + newChunks := chunktesting.GenerateTestRandomChunks(5) putter := lstore.Cache() for _, ch := range newChunks { @@ -113,8 +115,23 @@ func testCacheStore(t *testing.T, newStorer func() (*storer.DB, error)) { t.Fatal(err) } - if info.Cache.Size != 10 { - t.Fatalf("unexpected cache size: want 10 have %d", info.Cache.Size) + info, err = lstore.DebugInfo(context.Background()) + if err != nil { + t.Fatal(err) + } + + err = spinlock.WaitWithInterval(time.Second*5, time.Second, func() bool { + info, err = lstore.DebugInfo(context.Background()) + if err != nil { + t.Fatal(err) + } + if info.Cache.Size == 10 { + return true + } + return false + }) + if err != nil { + t.Fatal(err) } }) } diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go index 53e40d6d821..354219cbea7 100644 --- a/pkg/storer/internal/cache/cache.go +++ b/pkg/storer/internal/cache/cache.go @@ -27,123 +27,15 @@ const cacheEntrySize = swarm.HashSize + 8 var _ storage.Item = (*cacheEntry)(nil) -var CacheEvictionBatchSize = 1000 - var ( errMarshalCacheEntryInvalidAddress = errors.New("marshal cacheEntry: invalid address") errMarshalCacheEntryInvalidTimestamp = errors.New("marshal cacheEntry: invalid timestamp") errUnmarshalCacheEntryInvalidSize = errors.New("unmarshal cacheEntry: invalid size") ) -type cacheEntry struct { - Address swarm.Address - AccessTimestamp int64 -} - -func (c *cacheEntry) ID() string { return c.Address.ByteString() } - -func (cacheEntry) Namespace() string { return "cacheEntry" } - -func (c *cacheEntry) Marshal() ([]byte, error) { - entryBuf := make([]byte, cacheEntrySize) - if c.Address.IsZero() { - return nil, errMarshalCacheEntryInvalidAddress - } - if c.AccessTimestamp <= 0 { - return nil, errMarshalCacheEntryInvalidTimestamp - } - copy(entryBuf[:swarm.HashSize], c.Address.Bytes()) - binary.LittleEndian.PutUint64(entryBuf[swarm.HashSize:], uint64(c.AccessTimestamp)) - return entryBuf, nil -} - -func (c *cacheEntry) Unmarshal(buf []byte) error { - if len(buf) != cacheEntrySize { - return errUnmarshalCacheEntryInvalidSize - } - newEntry := new(cacheEntry) - newEntry.Address = swarm.NewAddress(append(make([]byte, 0, swarm.HashSize), buf[:swarm.HashSize]...)) - newEntry.AccessTimestamp = 
int64(binary.LittleEndian.Uint64(buf[swarm.HashSize:])) - *c = *newEntry - return nil -} - -func (c *cacheEntry) Clone() storage.Item { - if c == nil { - return nil - } - return &cacheEntry{ - Address: c.Address.Clone(), - AccessTimestamp: c.AccessTimestamp, - } -} - -func (c cacheEntry) String() string { - return fmt.Sprintf( - "cacheEntry { Address: %s AccessTimestamp: %s }", - c.Address, - time.Unix(c.AccessTimestamp, 0).UTC().Format(time.RFC3339), - ) -} - -var _ storage.Item = (*cacheOrderIndex)(nil) - -type cacheOrderIndex struct { - AccessTimestamp int64 - Address swarm.Address -} - -func keyFromID(ts int64, addr swarm.Address) string { - tsStr := fmt.Sprintf("%d", ts) - return tsStr + addr.ByteString() -} - -func idFromKey(key string) (int64, swarm.Address, error) { - ts := key[:len(key)-swarm.HashSize] - addr := key[len(key)-swarm.HashSize:] - n, err := strconv.ParseInt(ts, 10, 64) - if err != nil { - return 0, swarm.ZeroAddress, err - } - return n, swarm.NewAddress([]byte(addr)), nil -} - -func (c *cacheOrderIndex) ID() string { - return keyFromID(c.AccessTimestamp, c.Address) -} - -func (cacheOrderIndex) Namespace() string { return "cacheOrderIndex" } - -func (cacheOrderIndex) Marshal() ([]byte, error) { - return nil, nil -} - -func (cacheOrderIndex) Unmarshal(_ []byte) error { - return nil -} - -func (c *cacheOrderIndex) Clone() storage.Item { - if c == nil { - return nil - } - return &cacheOrderIndex{ - AccessTimestamp: c.AccessTimestamp, - Address: c.Address.Clone(), - } -} - -func (c cacheOrderIndex) String() string { - return fmt.Sprintf( - "cacheOrderIndex { AccessTimestamp: %d Address: %s }", - c.AccessTimestamp, - c.Address.ByteString(), - ) -} - // Cache is the part of the localstore which keeps track of the chunks that are not // part of the reserve but are potentially useful to store for obtaining bandwidth -// incentives. In order to avoid GC we will only keep track of a fixed no. of chunks -// as part of the cache and evict a chunk as soon as we go above capacity. +// incentives. type Cache struct { size atomic.Int64 capacity int @@ -158,84 +50,12 @@ func New(ctx context.Context, store internal.Storage, capacity uint64) (*Cache, return nil, fmt.Errorf("failed counting cache entries: %w", err) } - if count > int(capacity) { - err := removeOldest( - ctx, - store.IndexStore(), - store.IndexStore(), - store.ChunkStore(), - count-int(capacity), - ) - if err != nil { - return nil, fmt.Errorf("failed removing oldest cache entries: %w", err) - } - count = int(capacity) - } - c := &Cache{capacity: int(capacity)} c.size.Store(int64(count)) return c, nil } -// removeOldest removes the oldest cache entries from the store. The count -// specifies the number of entries to remove. 
-func removeOldest( - ctx context.Context, - store storage.Reader, - writer storage.Writer, - chStore storage.ChunkStore, - count int, -) error { - if count <= 0 { - return nil - } - - evictItems := make([]*cacheEntry, 0, count) - err := store.Iterate( - storage.Query{ - Factory: func() storage.Item { return &cacheOrderIndex{} }, - ItemProperty: storage.QueryItemID, - }, - func(res storage.Result) (bool, error) { - accessTime, addr, err := idFromKey(res.ID) - if err != nil { - return false, fmt.Errorf("failed to parse cache order index %s: %w", res.ID, err) - } - entry := &cacheEntry{ - Address: addr, - AccessTimestamp: accessTime, - } - evictItems = append(evictItems, entry) - count-- - return count == 0, nil - }, - ) - if err != nil { - return fmt.Errorf("failed iterating over cache order index: %w", err) - } - - for _, entry := range evictItems { - err = writer.Delete(entry) - if err != nil { - return fmt.Errorf("failed deleting cache entry %s: %w", entry, err) - } - err = writer.Delete(&cacheOrderIndex{ - Address: entry.Address, - AccessTimestamp: entry.AccessTimestamp, - }) - if err != nil { - return fmt.Errorf("failed deleting cache order index %s: %w", entry.Address, err) - } - err = chStore.Delete(ctx, entry.Address) - if err != nil { - return fmt.Errorf("failed deleting chunk %s from chunkstore: %w", entry.Address, err) - } - } - - return nil -} - // Size returns the current size of the cache. func (c *Cache) Size() uint64 { return uint64(c.size.Load()) @@ -248,6 +68,7 @@ func (c *Cache) Capacity() uint64 { return uint64(c.capacity) } // chunkstore and also adds a Cache entry for the chunk. func (c *Cache) Putter(store internal.Storage) storage.Putter { return storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error { + newEntry := &cacheEntry{Address: chunk.Address()} found, err := store.IndexStore().Has(newEntry) if err != nil { @@ -278,21 +99,6 @@ func (c *Cache) Putter(store internal.Storage) storage.Putter { return fmt.Errorf("failed adding cache order index: %w", err) } - evicted := false - if c.size.Load() >= int64(c.capacity) { - evicted = true - err := removeOldest( - ctx, - store.IndexStore(), - batch, - store.ChunkStore(), - CacheEvictionBatchSize, - ) - if err != nil { - return fmt.Errorf("failed removing oldest cache entries: %w", err) - } - } - if err := batch.Commit(); err != nil { return fmt.Errorf("batch commit: %w", err) } @@ -302,9 +108,6 @@ func (c *Cache) Putter(store internal.Storage) storage.Putter { return fmt.Errorf("failed adding chunk to chunkstore: %w", err) } - if evicted { - c.size.Add(-int64(CacheEvictionBatchSize)) - } c.size.Add(1) return nil @@ -317,6 +120,7 @@ func (c *Cache) Putter(store internal.Storage) storage.Putter { // of this getter to rollback the operation. func (c *Cache) Getter(store internal.Storage) storage.Getter { return storage.GetterFunc(func(ctx context.Context, address swarm.Address) (swarm.Chunk, error) { + ch, err := store.ChunkStore().Get(ctx, address) if err != nil { return nil, err @@ -369,10 +173,8 @@ func (c *Cache) Getter(store internal.Storage) storage.Getter { }) } -// MoveFromReserve moves the chunks from the reserve to the cache. This is -// called when the reserve is full and we need to perform eviction. -// It avoids the need to delete the chunk and re-add it to the cache. -func (c *Cache) MoveFromReserve( +// ShallowCopy creates cache entries with the expectation that the chunk already exists in the chunkstore. 
+func (c *Cache) ShallowCopy( ctx context.Context, store internal.Storage, addrs ...swarm.Address, @@ -386,18 +188,20 @@ func (c *Cache) MoveFromReserve( } }() - batch, err := store.IndexStore().Batch(ctx) - if err != nil { - return fmt.Errorf("failed creating batch: %w", err) + //consider only the amount that can fit, the rest should be deleted from the chunkstore. + if len(addrs) > c.capacity { + for _, addr := range addrs[:len(addrs)-c.capacity] { + _ = store.ChunkStore().Delete(ctx, addr) + } + addrs = addrs[len(addrs)-c.capacity:] } entriesToAdd := make([]*cacheEntry, 0, len(addrs)) for _, addr := range addrs { - entry := &cacheEntry{Address: addr} + entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()} if has, err := store.IndexStore().Has(entry); err == nil && has { continue } - entry.AccessTimestamp = now().UnixNano() entriesToAdd = append(entriesToAdd, entry) } @@ -405,30 +209,9 @@ func (c *Cache) MoveFromReserve( return nil } - //consider only the amount that can fit, the rest should be deleted from the chunkstore. - if len(entriesToAdd) > c.capacity { - for _, e := range entriesToAdd[:len(entriesToAdd)-c.capacity] { - _ = store.ChunkStore().Delete(ctx, e.Address) - } - entriesToAdd = entriesToAdd[len(entriesToAdd)-c.capacity:] - } - - var entriesToRemove int - if c.size.Load()+int64(len(entriesToAdd)) > int64(c.capacity) { - entriesToRemove = int(c.size.Load() + int64(len(entriesToAdd)) - int64(c.capacity)) - } - - if entriesToRemove > 0 { - err := removeOldest( - ctx, - store.IndexStore(), - batch, - store.ChunkStore(), - entriesToRemove, - ) - if err != nil { - return fmt.Errorf("failed removing oldest cache entries: %w", err) - } + batch, err := store.IndexStore().Batch(ctx) + if err != nil { + return fmt.Errorf("failed creating batch: %w", err) } for _, entry := range entriesToAdd { @@ -449,7 +232,190 @@ func (c *Cache) MoveFromReserve( return fmt.Errorf("batch commit: %w", err) } - c.size.Add(int64(len(entriesToAdd)) - int64(entriesToRemove)) + c.size.Add(int64(len(entriesToAdd))) + + return nil +} + +// RemoveOldest removes the oldest cache entries from the store. The count +// specifies the number of entries to remove. 
+func (c *Cache) RemoveOldest( + ctx context.Context, + store internal.Storage, + chStore storage.ChunkStore, + count uint64, +) error { + if count <= 0 { + return nil + } + + evictItems := make([]*cacheEntry, 0, count) + err := store.IndexStore().Iterate( + storage.Query{ + Factory: func() storage.Item { return &cacheOrderIndex{} }, + ItemProperty: storage.QueryItemID, + }, + func(res storage.Result) (bool, error) { + accessTime, addr, err := idFromKey(res.ID) + if err != nil { + return false, fmt.Errorf("failed to parse cache order index %s: %w", res.ID, err) + } + entry := &cacheEntry{ + Address: addr, + AccessTimestamp: accessTime, + } + evictItems = append(evictItems, entry) + count-- + return count == 0, nil + }, + ) + if err != nil { + return fmt.Errorf("failed iterating over cache order index: %w", err) + } + + batchCnt := 1_000 + + for i := 0; i < len(evictItems); i += batchCnt { + end := i + batchCnt + if end > len(evictItems) { + end = len(evictItems) + } + + batch, err := store.IndexStore().Batch(ctx) + if err != nil { + return fmt.Errorf("failed creating batch: %w", err) + } + + for _, entry := range evictItems[i:end] { + err = batch.Delete(entry) + if err != nil { + return fmt.Errorf("failed deleting cache entry %s: %w", entry, err) + } + err = batch.Delete(&cacheOrderIndex{ + Address: entry.Address, + AccessTimestamp: entry.AccessTimestamp, + }) + if err != nil { + return fmt.Errorf("failed deleting cache order index %s: %w", entry.Address, err) + } + err = chStore.Delete(ctx, entry.Address) + if err != nil { + return fmt.Errorf("failed deleting chunk %s from chunkstore: %w", entry.Address, err) + } + } + + err = batch.Commit() + if err != nil { + return err + } + + c.size.Add(-int64(len(evictItems))) + } + + return nil +} + +type cacheEntry struct { + Address swarm.Address + AccessTimestamp int64 +} + +func (c *cacheEntry) ID() string { return c.Address.ByteString() } + +func (cacheEntry) Namespace() string { return "cacheEntry" } + +func (c *cacheEntry) Marshal() ([]byte, error) { + entryBuf := make([]byte, cacheEntrySize) + if c.Address.IsZero() { + return nil, errMarshalCacheEntryInvalidAddress + } + if c.AccessTimestamp <= 0 { + return nil, errMarshalCacheEntryInvalidTimestamp + } + copy(entryBuf[:swarm.HashSize], c.Address.Bytes()) + binary.LittleEndian.PutUint64(entryBuf[swarm.HashSize:], uint64(c.AccessTimestamp)) + return entryBuf, nil +} + +func (c *cacheEntry) Unmarshal(buf []byte) error { + if len(buf) != cacheEntrySize { + return errUnmarshalCacheEntryInvalidSize + } + newEntry := new(cacheEntry) + newEntry.Address = swarm.NewAddress(append(make([]byte, 0, swarm.HashSize), buf[:swarm.HashSize]...)) + newEntry.AccessTimestamp = int64(binary.LittleEndian.Uint64(buf[swarm.HashSize:])) + *c = *newEntry + return nil +} +func (c *cacheEntry) Clone() storage.Item { + if c == nil { + return nil + } + return &cacheEntry{ + Address: c.Address.Clone(), + AccessTimestamp: c.AccessTimestamp, + } +} + +func (c cacheEntry) String() string { + return fmt.Sprintf( + "cacheEntry { Address: %s AccessTimestamp: %s }", + c.Address, + time.Unix(c.AccessTimestamp, 0).UTC().Format(time.RFC3339), + ) +} + +var _ storage.Item = (*cacheOrderIndex)(nil) + +type cacheOrderIndex struct { + AccessTimestamp int64 + Address swarm.Address +} + +func keyFromID(ts int64, addr swarm.Address) string { + tsStr := fmt.Sprintf("%d", ts) + return tsStr + addr.ByteString() +} + +func idFromKey(key string) (int64, swarm.Address, error) { + ts := key[:len(key)-swarm.HashSize] + addr := 
key[len(key)-swarm.HashSize:] + n, err := strconv.ParseInt(ts, 10, 64) + if err != nil { + return 0, swarm.ZeroAddress, err + } + return n, swarm.NewAddress([]byte(addr)), nil +} + +func (c *cacheOrderIndex) ID() string { + return keyFromID(c.AccessTimestamp, c.Address) +} + +func (cacheOrderIndex) Namespace() string { return "cacheOrderIndex" } + +func (cacheOrderIndex) Marshal() ([]byte, error) { + return nil, nil +} + +func (cacheOrderIndex) Unmarshal(_ []byte) error { return nil } + +func (c *cacheOrderIndex) Clone() storage.Item { + if c == nil { + return nil + } + return &cacheOrderIndex{ + AccessTimestamp: c.AccessTimestamp, + Address: c.Address.Clone(), + } +} + +func (c cacheOrderIndex) String() string { + return fmt.Sprintf( + "cacheOrderIndex { AccessTimestamp: %d Address: %s }", + c.AccessTimestamp, + c.Address.ByteString(), + ) +} diff --git a/pkg/storer/internal/cache/cache_test.go b/pkg/storer/internal/cache/cache_test.go index e43d47c3d1b..6450dc71198 100644 --- a/pkg/storer/internal/cache/cache_test.go +++ b/pkg/storer/internal/cache/cache_test.go @@ -158,12 +158,9 @@ func (t *timeProvider) Now() func() time.Time { func TestMain(m *testing.M) { p := &timeProvider{t: time.Now().UnixNano()} done := cache.ReplaceTimeNow(p.Now()) - old := cache.CacheEvictionBatchSize defer func() { done() - cache.CacheEvictionBatchSize = old }() - cache.CacheEvictionBatchSize = 1 code := m.Run() os.Exit(code) } @@ -212,35 +209,6 @@ func TestCache(t *testing.T) { verifyCacheState(t, st.IndexStore(), c2, chunks[0].Address(), chunks[len(chunks)-1].Address(), uint64(len(chunks))) verifyCacheOrder(t, c2, st.IndexStore(), chunks...) }) - - chunks2 := chunktest.GenerateTestRandomChunks(10) - - t.Run("add over capacity", func(t *testing.T) { - for idx, ch := range chunks2 { - err := c.Putter(st).Put(context.TODO(), ch) - if err != nil { - t.Fatal(err) - } - if idx == len(chunks)-1 { - verifyCacheState(t, st.IndexStore(), c, chunks2[0].Address(), chunks2[idx].Address(), 10) - verifyCacheOrder(t, c, st.IndexStore(), chunks2...) - } else { - verifyCacheState(t, st.IndexStore(), c, chunks[idx+1].Address(), chunks2[idx].Address(), 10) - verifyCacheOrder(t, c, st.IndexStore(), append(chunks[idx+1:], chunks2[:idx+1]...)...) - } - } - verifyChunksDeleted(t, st.ChunkStore(), chunks...) - }) - - t.Run("new with lower capacity", func(t *testing.T) { - c2, err := cache.New(context.TODO(), st, 5) - if err != nil { - t.Fatal(err) - } - verifyCacheState(t, st.IndexStore(), c2, chunks2[5].Address(), chunks2[len(chunks)-1].Address(), 5) - verifyCacheOrder(t, c2, st.IndexStore(), chunks2[5:]...) - verifyChunksDeleted(t, st.ChunkStore(), chunks[:5]...) - }) }) t.Run("getter", func(t *testing.T) { @@ -379,7 +347,7 @@ func TestCache(t *testing.T) { }) } -func TestMoveFromReserve(t *testing.T) { +func TestShallowCopy(t *testing.T) { t.Parallel() st := newTestStorage(t) @@ -388,109 +356,103 @@ func TestMoveFromReserve(t *testing.T) { t.Fatal(err) } - t.Run("move from reserve", func(t *testing.T) { - chunks := chunktest.GenerateTestRandomChunks(10) - chunksToMove := make([]swarm.Address, 0, 10) + chunks := chunktest.GenerateTestRandomChunks(10) + chunksToMove := make([]swarm.Address, 0, 10) - // add the chunks to chunkstore. This simulates the reserve already populating - // the chunkstore with chunks. 
- for _, ch := range chunks { - err := st.ChunkStore().Put(context.Background(), ch) - if err != nil { - t.Fatal(err) - } - chunksToMove = append(chunksToMove, ch.Address()) - } - - err = c.MoveFromReserve(context.Background(), st, chunksToMove...) + // add the chunks to chunkstore. This simulates the reserve already populating + // the chunkstore with chunks. + for _, ch := range chunks { + err := st.ChunkStore().Put(context.Background(), ch) if err != nil { t.Fatal(err) } + chunksToMove = append(chunksToMove, ch.Address()) + } - verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[9].Address(), 10) - verifyCacheOrder(t, c, st.IndexStore(), chunks...) + err = c.ShallowCopy(context.Background(), st, chunksToMove...) + if err != nil { + t.Fatal(err) + } - // move again, should be no-op - err = c.MoveFromReserve(context.Background(), st, chunksToMove...) - if err != nil { - t.Fatal(err) - } + verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[9].Address(), 10) + verifyCacheOrder(t, c, st.IndexStore(), chunks...) - verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[9].Address(), 10) - verifyCacheOrder(t, c, st.IndexStore(), chunks...) - }) + // move again, should be no-op + err = c.ShallowCopy(context.Background(), st, chunksToMove...) + if err != nil { + t.Fatal(err) + } - t.Run("move from reserve new chunks", func(t *testing.T) { - chunks := chunktest.GenerateTestRandomChunks(10) - chunksToMove := make([]swarm.Address, 0, 10) + verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[9].Address(), 10) + verifyCacheOrder(t, c, st.IndexStore(), chunks...) - // add the chunks to chunkstore. This simulates the reserve already populating - // the chunkstore with chunks. - for _, ch := range chunks { - err := st.ChunkStore().Put(context.Background(), ch) - if err != nil { - t.Fatal(err) - } - chunksToMove = append(chunksToMove, ch.Address()) - } + chunks1 := chunktest.GenerateTestRandomChunks(10) + chunksToMove1 := make([]swarm.Address, 0, 10) - // move new chunks - err = c.MoveFromReserve(context.Background(), st, chunksToMove...) + // add the chunks to chunkstore. This simulates the reserve already populating + // the chunkstore with chunks. + for _, ch := range chunks1 { + err := st.ChunkStore().Put(context.Background(), ch) if err != nil { t.Fatal(err) } + chunksToMove1 = append(chunksToMove1, ch.Address()) + } - verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks[9].Address(), 10) - verifyCacheOrder(t, c, st.IndexStore(), chunks...) - - chunks2 := chunktest.GenerateTestRandomChunks(5) - chunksToMove2 := make([]swarm.Address, 0, 5) + // move new chunks + err = c.ShallowCopy(context.Background(), st, chunksToMove1...) + if err != nil { + t.Fatal(err) + } - // add the chunks to chunkstore. This simulates the reserve already populating - // the chunkstore with chunks. - for _, ch := range chunks2 { - err := st.ChunkStore().Put(context.Background(), ch) - if err != nil { - t.Fatal(err) - } - chunksToMove2 = append(chunksToMove2, ch.Address()) - } + verifyCacheState(t, st.IndexStore(), c, chunks[0].Address(), chunks1[9].Address(), 20) + verifyCacheOrder(t, c, st.IndexStore(), append(chunks, chunks1...)...) - // move new chunks - err = c.MoveFromReserve(context.Background(), st, chunksToMove2...) - if err != nil { - t.Fatal(err) - } + err = c.RemoveOldest(context.Background(), st, st.ChunkStore(), 10) + if err != nil { + t.Fatal(err) + } - cacheChunks := append(chunks[5:], chunks2...) 
+ verifyChunksDeleted(t, st.ChunkStore(), chunks...) +} - verifyCacheState(t, st.IndexStore(), c, cacheChunks[0].Address(), cacheChunks[9].Address(), 10) - verifyCacheOrder(t, c, st.IndexStore(), cacheChunks...) - }) +func TestShallowCopyOverCap(t *testing.T) { + t.Parallel() - t.Run("move from reserve over capacity", func(t *testing.T) { - chunks := chunktest.GenerateTestRandomChunks(15) - chunksToMove := make([]swarm.Address, 0, 15) + st := newTestStorage(t) + c, err := cache.New(context.Background(), st, 10) + if err != nil { + t.Fatal(err) + } - // add the chunks to chunkstore. This simulates the reserve already populating - // the chunkstore with chunks. - for _, ch := range chunks { - err := st.ChunkStore().Put(context.Background(), ch) - if err != nil { - t.Fatal(err) - } - chunksToMove = append(chunksToMove, ch.Address()) - } + chunks := chunktest.GenerateTestRandomChunks(15) + chunksToMove := make([]swarm.Address, 0, 15) - // move new chunks - err = c.MoveFromReserve(context.Background(), st, chunksToMove...) + // add the chunks to chunkstore. This simulates the reserve already populating + // the chunkstore with chunks. + for _, ch := range chunks { + err := st.ChunkStore().Put(context.Background(), ch) if err != nil { t.Fatal(err) } + chunksToMove = append(chunksToMove, ch.Address()) + } - verifyCacheState(t, st.IndexStore(), c, chunks[5].Address(), chunks[14].Address(), 10) - verifyCacheOrder(t, c, st.IndexStore(), chunks[5:15]...) - }) + // move new chunks + err = c.ShallowCopy(context.Background(), st, chunksToMove...) + if err != nil { + t.Fatal(err) + } + + verifyCacheState(t, st.IndexStore(), c, chunks[5].Address(), chunks[14].Address(), 10) + verifyCacheOrder(t, c, st.IndexStore(), chunks[5:15]...) + + err = c.RemoveOldest(context.Background(), st, st.ChunkStore(), 5) + if err != nil { + t.Fatal(err) + } + + verifyChunksDeleted(t, st.ChunkStore(), chunks[5:10]...) } func verifyCacheState( diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 4e04df245ae..b48f5075e8c 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -336,7 +336,7 @@ func (r *Reserve) EvictBatchBin( return 0, err } - batchCnt := 1000 + batchCnt := 1_000 evictionCompleted := 0 for i := 0; i < len(evicted); i += batchCnt { @@ -364,14 +364,9 @@ func (r *Reserve) EvictBatchBin( return err } - go func(addrs []swarm.Address) { - _ = txExecutor.Execute(ctx, func(store internal.Storage) error { - if err := r.cacheCb(ctx, store, addrs...); err != nil { - r.logger.Error(err, "evict and move to cache") - } - return nil - }) - }(moveToCache) + if err := r.cacheCb(ctx, store, moveToCache...); err != nil { + r.logger.Error(err, "evict and move to cache") + } return nil }) @@ -411,20 +406,6 @@ func (r *Reserve) DeleteChunk( return nil } -// CleanupBinIndex removes the bin index entry for the chunk. This is called mainly -// to cleanup the bin index if other indexes are missing during reserve cleanup. 
-func (r *Reserve) CleanupBinIndex( - ctx context.Context, - batch storage.Batch, - chunkAddress swarm.Address, - binID uint64, -) { - _ = batch.Delete(&ChunkBinItem{ - Bin: swarm.Proximity(r.baseAddr.Bytes(), chunkAddress.Bytes()), - BinID: binID, - }) -} - func removeChunk( ctx context.Context, store internal.Storage, diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index d1abdbfd10b..01762ae8bd2 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -196,47 +196,6 @@ func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) { } } -func (db *DB) getExpiredBatches() ([][]byte, error) { - var batchesToEvict [][]byte - err := db.repo.IndexStore().Iterate(storage.Query{ - Factory: func() storage.Item { return new(expiredBatchItem) }, - ItemProperty: storage.QueryItemID, - }, func(result storage.Result) (bool, error) { - batchesToEvict = append(batchesToEvict, []byte(result.ID)) - return false, nil - }) - if err != nil { - return nil, err - } - return batchesToEvict, nil -} - -func (db *DB) evictExpiredBatches(ctx context.Context) error { - - batches, err := db.getExpiredBatches() - if err != nil { - return err - } - - for _, batchID := range batches { - evicted, err := db.evictBatch(ctx, batchID, swarm.MaxBins) - if err != nil { - return err - } - if evicted > 0 { - db.logger.Debug("evicted expired batch", "batch_id", hex.EncodeToString(batchID), "total_evicted", evicted) - } - err = db.Execute(ctx, func(tx internal.Storage) error { - return tx.IndexStore().Delete(&expiredBatchItem{BatchID: batchID}) - }) - if err != nil { - return err - } - } - - return nil -} - func (db *DB) evictionWorker(ctx context.Context) { defer db.inFlight.Done() @@ -272,6 +231,65 @@ func (db *DB) evictionWorker(ctx context.Context) { } } +func (db *DB) evictExpiredBatches(ctx context.Context) error { + + batches, err := db.getExpiredBatches() + if err != nil { + return err + } + + for _, batchID := range batches { + evicted, err := db.evictBatch(ctx, batchID, swarm.MaxBins) + if err != nil { + return err + } + if evicted > 0 { + db.logger.Debug("evicted expired batch", "batch_id", hex.EncodeToString(batchID), "total_evicted", evicted) + } + err = db.Execute(ctx, func(tx internal.Storage) error { + return tx.IndexStore().Delete(&expiredBatchItem{BatchID: batchID}) + }) + if err != nil { + return err + } + } + + return nil +} + +func (db *DB) getExpiredBatches() ([][]byte, error) { + var batchesToEvict [][]byte + err := db.repo.IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return new(expiredBatchItem) }, + ItemProperty: storage.QueryItemID, + }, func(result storage.Result) (bool, error) { + batchesToEvict = append(batchesToEvict, []byte(result.ID)) + return false, nil + }) + if err != nil { + return nil, err + } + return batchesToEvict, nil +} + +// EvictBatch evicts all chunks belonging to a batch from the reserve. 
+func (db *DB) EvictBatch(ctx context.Context, batchID []byte) error { + if db.reserve == nil { + // if reserve is not configured, do nothing + return nil + } + + err := db.Execute(ctx, func(tx internal.Storage) error { + return tx.IndexStore().Put(&expiredBatchItem{BatchID: batchID}) + }) + if err != nil { + return fmt.Errorf("save expired batch: %w", err) + } + + db.events.Trigger(batchExpiry) + return nil +} + func (db *DB) ReserveGet(ctx context.Context, addr swarm.Address, batchID []byte) (chunk swarm.Chunk, err error) { dur := captureDuration(time.Now()) defer func() { @@ -338,24 +356,6 @@ func (db *DB) ReservePutter() storage.Putter { } } -// EvictBatch evicts all chunks belonging to a batch from the reserve. -func (db *DB) EvictBatch(ctx context.Context, batchID []byte) error { - if db.reserve == nil { - // if reserve is not configured, do nothing - return nil - } - - err := db.Execute(ctx, func(tx internal.Storage) error { - return tx.IndexStore().Put(&expiredBatchItem{BatchID: batchID}) - }) - if err != nil { - return fmt.Errorf("save expired batch: %w", err) - } - - db.events.Trigger(batchExpiry) - return nil -} - func (db *DB) evictBatch( ctx context.Context, batchID []byte, diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 771889a9689..b5d588162c7 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -617,14 +617,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { opts.ReserveCapacity, opts.RadiusSetter, logger, - func(ctx context.Context, store internal.Storage, addrs ...swarm.Address) error { - defer func() { db.metrics.CacheSize.Set(float64(db.cacheObj.Size())) }() - - db.lock.Lock(cacheAccessLockKey) - defer db.lock.Unlock(cacheAccessLockKey) - - return cacheObj.MoveFromReserve(ctx, store, addrs...) - }, + db.CacheShallowCopy, ) if err != nil { return nil, err @@ -646,6 +639,9 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { return nil, err } + db.inFlight.Add(1) + go db.cacheWorker(ctx) + return db, nil } @@ -664,7 +660,7 @@ func (db *DB) Close() error { bgReserveWorkersClosed := make(chan struct{}) go func() { defer close(bgReserveWorkersClosed) - if !syncutil.WaitWithTimeout(&db.inFlight, 2*time.Second) { + if !syncutil.WaitWithTimeout(&db.inFlight, 5*time.Second) { db.logger.Warning("db shutting down with running goroutines") } }() @@ -672,7 +668,7 @@ func (db *DB) Close() error { bgCacheWorkersClosed := make(chan struct{}) go func() { defer close(bgCacheWorkersClosed) - if !syncutil.WaitWithTimeout(&db.cacheLimiter.wg, 2*time.Second) { + if !syncutil.WaitWithTimeout(&db.cacheLimiter.wg, 5*time.Second) { db.logger.Warning("cache goroutines still running after the wait timeout; force closing") db.cacheLimiter.cancel() } diff --git a/pkg/storer/storer_test.go b/pkg/storer/storer_test.go index a126253a5ce..ccc491912b6 100644 --- a/pkg/storer/storer_test.go +++ b/pkg/storer/storer_test.go @@ -18,7 +18,6 @@ import ( "github.com/ethersphere/bee/pkg/storage/inmemchunkstore" "github.com/ethersphere/bee/pkg/storage/migration" "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/storer/internal/cache" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" "github.com/ethersphere/bee/pkg/storer/internal/upload" localmigration "github.com/ethersphere/bee/pkg/storer/migration" @@ -98,12 +97,9 @@ func verifyPinCollection( // TestMain exists to adjust the time.Now function to a fixed value. 
func TestMain(m *testing.M) { storer.ReplaceSharkyShardLimit(4) - old := cache.CacheEvictionBatchSize defer func() { - cache.CacheEvictionBatchSize = old storer.ReplaceSharkyShardLimit(32) }() - cache.CacheEvictionBatchSize = 1 code := m.Run() os.Exit(code) }
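Note on the new eviction flow: with this patch, cache writers (Cache, CacheShallowCopy and the reserve eviction path) no longer evict inline under the cacheAccessLockKey lock; they only call triggerCacheEviction, and the single cacheWorker goroutine drains the cache back to capacity in bounded batches. Below is a minimal, self-contained sketch of that trigger-and-drain pattern; the names and types (lruCache, trigger, the demo in main) are illustrative stand-ins, not the actual storer/cache APIs.

// Illustrative stand-ins only: not the real storer/cache types.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type lruCache struct {
	mu       sync.Mutex
	entries  []string // oldest first
	capacity int
}

func (c *lruCache) Put(addr string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries = append(c.entries, addr)
}

func (c *lruCache) Size() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.entries)
}

func (c *lruCache) Capacity() int { return c.capacity }

// removeOldest drops up to n of the oldest entries and reports how many went.
func (c *lruCache) removeOldest(n int) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	if n > len(c.entries) {
		n = len(c.entries)
	}
	c.entries = c.entries[n:]
	return n
}

// trigger signals the worker without blocking; the buffered channel of size
// one collapses repeated triggers into a single pending event.
func trigger(overCap chan struct{}) {
	select {
	case overCap <- struct{}{}:
	default:
	}
}

// cacheWorker evicts the oldest entries in bounded batches and re-triggers
// itself until the cache is back under capacity.
func cacheWorker(ctx context.Context, c *lruCache, overCap chan struct{}) {
	const batch = 1_000
	for {
		select {
		case <-ctx.Done():
			return
		case <-overCap:
			size, capc := c.Size(), c.Capacity()
			if size <= capc {
				continue
			}
			evict := size - capc
			if evict > batch {
				evict = batch
			}
			fmt.Println("evicted", c.removeOldest(evict))
			if c.Size() > c.Capacity() {
				trigger(overCap)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := &lruCache{capacity: 5}
	overCap := make(chan struct{}, 1)
	go cacheWorker(ctx, c, overCap)

	// Writers only signal; eviction happens asynchronously in the worker.
	for i := 0; i < 12; i++ {
		c.Put(fmt.Sprintf("chunk-%d", i))
		if c.Size() > c.Capacity() {
			trigger(overCap)
		}
	}
	time.Sleep(100 * time.Millisecond) // give the worker time in this demo
	fmt.Println("final size:", c.Size())
}

Collapsing repeated triggers into a single pending event keeps the write path non-blocking while still guaranteeing the worker eventually observes the over-capacity condition.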
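The reworked SetExpired in pkg/postage/service.go follows a similar shape: instead of buffering every expired StampItem in memory and deleting afterwards, the iterator streams candidates to a deleter goroutine over a channel and stops early on context cancellation. A minimal sketch of that producer/consumer pattern follows, with stand-in store and item types; the real method additionally runs the whole iteration in a background goroutine and logs progress every 100,000 deletions.

// Illustrative stand-ins only: not the real postage/storage types.
package main

import (
	"context"
	"fmt"
)

type stampItem struct{ batchID string }

// iterate walks a fixed set of items, calling fn for each until fn returns
// true (stop) or an error; it stands in for storage.Store.Iterate.
func iterate(items []stampItem, fn func(stampItem) (bool, error)) error {
	for _, it := range items {
		stop, err := fn(it)
		if err != nil {
			return err
		}
		if stop {
			return nil
		}
	}
	return nil
}

func setExpired(ctx context.Context, items []stampItem, batchExists func(string) bool, del func(stampItem)) error {
	deleteC := make(chan stampItem)
	done := make(chan struct{})

	// Deleter: consumes items as they are found, so memory use stays bounded.
	go func() {
		defer close(done)
		for it := range deleteC {
			del(it)
		}
	}()

	count := 0
	err := iterate(items, func(it stampItem) (bool, error) {
		if batchExists(it.batchID) {
			return false, nil // batch still alive, keep its stamps
		}
		count++
		select {
		case deleteC <- it:
		case <-ctx.Done():
			return true, ctx.Err() // stop iterating on shutdown
		}
		return false, nil
	})

	close(deleteC)
	<-done
	fmt.Println("removed expired stamps:", count)
	return err
}

func main() {
	items := []stampItem{{"b1"}, {"b2"}, {"b3"}}
	alive := map[string]bool{"b2": true}

	err := setExpired(context.Background(), items,
		func(id string) bool { return alive[id] },
		func(it stampItem) { fmt.Println("deleted stamp for", it.batchID) },
	)
	if err != nil {
		fmt.Println("set expired:", err)
	}
}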
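Both (*Cache).RemoveOldest and (*Reserve).EvictBatchBin now commit index deletions in groups of at most 1,000 (batchCnt := 1_000), so a large eviction never becomes a single oversized write batch. A condensed sketch of that loop, using a hypothetical batch type in place of storage.Batch:

// Illustrative stand-in only: not the real storage.Batch API.
package main

import "fmt"

// batch stages deletes and applies them on Commit.
type batch struct{ staged []string }

func (b *batch) Delete(key string) error { b.staged = append(b.staged, key); return nil }
func (b *batch) Commit() error {
	fmt.Printf("committed %d deletes\n", len(b.staged))
	return nil
}

// deleteInBatches mirrors the eviction loops: at most batchCnt staged deletes
// per committed batch.
func deleteInBatches(keys []string) error {
	const batchCnt = 1_000
	for i := 0; i < len(keys); i += batchCnt {
		end := i + batchCnt
		if end > len(keys) {
			end = len(keys)
		}
		b := &batch{}
		for _, k := range keys[i:end] {
			if err := b.Delete(k); err != nil {
				return err
			}
		}
		if err := b.Commit(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	keys := make([]string, 2_500)
	for i := range keys {
		keys[i] = fmt.Sprintf("key-%d", i)
	}
	if err := deleteInBatches(keys); err != nil {
		fmt.Println("delete:", err)
	}
}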