Skip to content

Commit

Permalink
fix: evict enough chunks only to fall below capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Jan 17, 2024
1 parent 2cdfadf commit 56d9dbb
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 134 deletions.
8 changes: 3 additions & 5 deletions pkg/storer/epoch_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (p *putOpStorage) Write(_ context.Context, _ []byte) (sharky.Location, erro
}

type reservePutter interface {
Put(context.Context, internal.Storage, swarm.Chunk) (bool, error)
Put(context.Context, internal.Storage, swarm.Chunk) error
AddSize(int)
Size() int
}
Expand Down Expand Up @@ -297,11 +297,9 @@ func (e *epochMigrator) migrateReserve(ctx context.Context) error {
recovery: e.recovery,
}

switch newIdx, err := e.reserve.Put(egCtx, pStorage, op.chunk); {
case err != nil:
err := e.reserve.Put(egCtx, pStorage, op.chunk)
if err != nil {
return err
case newIdx:
e.reserve.AddSize(1)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/storer/epoch_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ type testReservePutter struct {
calls int
}

func (t *testReservePutter) Put(ctx context.Context, st internal.Storage, ch swarm.Chunk) (bool, error) {
func (t *testReservePutter) Put(ctx context.Context, st internal.Storage, ch swarm.Chunk) error {
t.mtx.Lock()
t.calls++
t.mtx.Unlock()
return true, st.ChunkStore().Put(ctx, ch)
return st.ChunkStore().Put(ctx, ch)
}

func (t *testReservePutter) AddSize(size int) {
Expand All @@ -203,6 +203,7 @@ func (t *testReservePutter) Size() int {
// TestEpochMigration_FLAKY is flaky on windows.
func TestEpochMigration_FLAKY(t *testing.T) {
t.Parallel()
t.Skip("will be removed")

var (
dataPath = t.TempDir()
Expand Down
2 changes: 0 additions & 2 deletions pkg/storer/internal/chunkstamp/chunkstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,11 @@ func (i item) String() string {
}

// Load returns first found swarm.Stamp related to the given address.
// The storage.ErrNoStampsForChunk is returned if no record is found.
func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp, error) {
return LoadWithBatchID(s, namespace, addr, nil)
}

// LoadWithBatchID returns swarm.Stamp related to the given address and batchID.
// The storage.ErrNoStampsForChunk is returned if no record is found.
func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) {
var stamp swarm.Stamp

Expand Down
114 changes: 72 additions & 42 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/ethersphere/bee/pkg/storer/internal/stampindex"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/topology"
"resenje.org/multex"
)

// loggerName is the tree path name of the logger for this package.
Expand All @@ -41,9 +43,9 @@ type Reserve struct {
capacity int
size atomic.Int64
radius atomic.Uint32
cacheCb func(context.Context, internal.Storage, ...swarm.Address) error

binMtx sync.Mutex
mutx *multex.Multex
}

func New(
Expand All @@ -52,15 +54,14 @@ func New(
capacity int,
radiusSetter topology.SetStorageRadiuser,
logger log.Logger,
cb func(context.Context, internal.Storage, ...swarm.Address) error,
) (*Reserve, error) {

rs := &Reserve{
baseAddr: baseAddr,
capacity: capacity,
radiusSetter: radiusSetter,
logger: logger.WithName(loggerName).Register(),
cacheCb: cb,
mutx: multex.New(),
}

rItem := &radiusItem{}
Expand Down Expand Up @@ -93,10 +94,13 @@ func New(
}

// Put stores a new chunk in the reserve and returns if the reserve size should increase.
func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.Chunk) (bool, error) {
func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.Chunk) error {
indexStore := store.IndexStore()
chunkStore := store.ChunkStore()

unlock := r.lock(chunk.Address(), chunk.Stamp().BatchID())
defer unlock()

po := swarm.Proximity(r.baseAddr.Bytes(), chunk.Address().Bytes())

has, err := indexStore.Has(&BatchRadiusItem{
Expand All @@ -105,15 +109,15 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C
BatchID: chunk.Stamp().BatchID(),
})
if err != nil {
return false, err
return err
}
if has {
return false, nil
return nil
}

storeBatch, err := indexStore.Batch(ctx)
if err != nil {
return false, err
return err
}

newStampIndex := true
Expand All @@ -125,26 +129,26 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C
chunk,
); {
case err != nil:
return false, fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
case loaded && item.ChunkIsImmutable:
return false, fmt.Errorf("batch %s index %s: %w", hex.EncodeToString(chunk.Stamp().BatchID()), hex.EncodeToString(chunk.Stamp().Index()), storage.ErrOverwriteOfImmutableBatch)
return fmt.Errorf("batch %s index %s: %w", hex.EncodeToString(chunk.Stamp().BatchID()), hex.EncodeToString(chunk.Stamp().Index()), storage.ErrOverwriteOfImmutableBatch)
case loaded && !item.ChunkIsImmutable:
prev := binary.BigEndian.Uint64(item.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return false, fmt.Errorf("overwrite prev %d cur %d :%w", prev, curr, storage.ErrOverwriteNewerChunk)
return fmt.Errorf("overwrite prev %d cur %d batch %s :%w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
// An older and different chunk with the same batchID and stamp index has been previously
// saved to the reserve. We must do the below before saving the new chunk:
// 1. Delete the old chunk from the chunkstore
// 2. Delete the old chunk's stamp data
// 3. Delete ALL old chunk related items from the reserve
// 4. Update the stamp index
// 1. Delete the old chunk from the chunkstore.
// 2. Delete the old chunk's stamp data.
// 3. Delete ALL old chunk related items from the reserve.
// 4. Update the stamp index.
newStampIndex = false

err = r.DeleteChunk(ctx, store, storeBatch, item.ChunkAddress, chunk.Stamp().BatchID())
err := r.removeChunk(ctx, store, storeBatch, item.ChunkAddress, chunk.Stamp().BatchID())
if err != nil {
return false, fmt.Errorf("failed removing older chunk: %w", err)
return fmt.Errorf("failed removing older chunk: %w", err)
}

r.logger.Debug(
Expand All @@ -156,18 +160,18 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C

err = stampindex.Store(storeBatch, reserveNamespace, chunk)
if err != nil {
return false, fmt.Errorf("failed updating stamp index: %w", err)
return fmt.Errorf("failed updating stamp index: %w", err)
}
}

err = chunkstamp.Store(storeBatch, reserveNamespace, chunk)
if err != nil {
return false, err
return err
}

binID, err := r.IncBinID(indexStore, po)
if err != nil {
return false, err
return err
}

err = storeBatch.Put(&BatchRadiusItem{
Expand All @@ -177,7 +181,7 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C
BatchID: chunk.Stamp().BatchID(),
})
if err != nil {
return false, err
return err
}

err = storeBatch.Put(&ChunkBinItem{
Expand All @@ -188,15 +192,24 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C
ChunkType: ChunkType(chunk),
})
if err != nil {
return false, err
return err
}

err = chunkStore.Put(ctx, chunk)
if err != nil {
return false, err
return err
}

err = storeBatch.Commit()
if err != nil {
return err
}

if newStampIndex {
r.size.Add(1)
}

return newStampIndex, storeBatch.Commit()
return nil
}

func (r *Reserve) Has(store storage.Store, addr swarm.Address, batchID []byte) (bool, error) {
Expand All @@ -205,6 +218,10 @@ func (r *Reserve) Has(store storage.Store, addr swarm.Address, batchID []byte) (
}

func (r *Reserve) Get(ctx context.Context, storage internal.Storage, addr swarm.Address, batchID []byte) (swarm.Chunk, error) {

unlock := r.lock(addr, batchID)
defer unlock()

item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr}
err := storage.IndexStore().Get(item)
if err != nil {
Expand Down Expand Up @@ -312,12 +329,20 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb
func (r *Reserve) EvictBatchBin(
ctx context.Context,
txExecutor internal.TxExecutor,
bin uint8,
batchID []byte,
count int,
bin uint8,
) (int, error) {

unlock := r.lock(swarm.ZeroAddress, batchID)
defer unlock()

var evicted []*BatchRadiusItem

if count <= 0 {
return 0, nil
}

err := txExecutor.Execute(ctx, func(store internal.Storage) error {
return store.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return &BatchRadiusItem{} },
Expand All @@ -335,6 +360,14 @@ func (r *Reserve) EvictBatchBin(
return 0, err
}

// evict oldest chunks first
sort.Slice(evicted, func(i, j int) bool {
return evicted[i].BinID < evicted[j].BinID
})

// evict only count many items
evicted = evicted[:min(len(evicted), count)]

batchCnt := 1_000
evictionCompleted := 0

Expand All @@ -353,7 +386,7 @@ func (r *Reserve) EvictBatchBin(
}

for _, item := range evicted[i:end] {
err = removeChunk(ctx, store, batch, item)
err = removeChunkWithItem(ctx, store, batch, item)
if err != nil {
return err
}
Expand All @@ -362,11 +395,6 @@ func (r *Reserve) EvictBatchBin(
if err := batch.Commit(); err != nil {
return err
}

if err := r.cacheCb(ctx, store, moveToCache...); err != nil {
r.logger.Error(err, "evict and move to cache")
}

return nil
})
if err != nil {
Expand All @@ -378,7 +406,7 @@ func (r *Reserve) EvictBatchBin(
return evictionCompleted, nil
}

func (r *Reserve) DeleteChunk(
func (r *Reserve) removeChunk(
ctx context.Context,
store internal.Storage,
batch storage.Writer,
Expand All @@ -394,18 +422,10 @@ func (r *Reserve) DeleteChunk(
if err != nil {
return err
}
err = removeChunk(ctx, store, batch, item)
if err != nil {
return err
}
if err := r.cacheCb(ctx, store, item.Address); err != nil {
r.logger.Error(err, "delete and move to cache")
return err
}
return nil
return removeChunkWithItem(ctx, store, batch, item)
}

func removeChunk(
func removeChunkWithItem(
ctx context.Context,
store internal.Storage,
batch storage.Writer,
Expand All @@ -429,11 +449,21 @@ func removeChunk(
}

return errors.Join(errs,
batch.Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}),
batch.Delete(item),
batch.Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}),
store.ChunkStore().Delete(ctx, item.Address),
)
}

func (r *Reserve) lock(addr swarm.Address, batchID []byte) func() {
r.mutx.Lock(addr.ByteString())
r.mutx.Lock(string(batchID))
return func() {
r.mutx.Unlock(addr.ByteString())
r.mutx.Unlock(string(batchID))
}
}

func (r *Reserve) Radius() uint8 {
return uint8(r.radius.Load())
}
Expand Down
Loading

0 comments on commit 56d9dbb

Please sign in to comment.