From ff3eb1d09a2de68fcda448f70bc67b68fc5a6873 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 23 Jan 2024 17:31:26 +0300 Subject: [PATCH] fix(reserve): evict just enough chunks to reach the capacity (#4549) --- pkg/storer/compact_test.go | 6 +- pkg/storer/epoch_migration.go | 9 +- pkg/storer/epoch_migration_test.go | 5 +- pkg/storer/internal/chunkstamp/chunkstamp.go | 2 - pkg/storer/internal/reserve/reserve.go | 135 ++++++++++--------- pkg/storer/internal/reserve/reserve_test.go | 89 ++++++------ pkg/storer/reserve.go | 35 +---- pkg/storer/reserve_test.go | 17 +-- pkg/storer/storer.go | 4 - 9 files changed, 145 insertions(+), 157 deletions(-) diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index 3606e7c8c9d..4e4f883f8c1 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -55,13 +55,13 @@ func TestCompact(t *testing.T) { } } + c, unsub := st.Events().Subscribe("batchExpiryDone") + t.Cleanup(unsub) + err = st.EvictBatch(ctx, evictBatch.ID) if err != nil { t.Fatal(err) } - - c, unsub := st.Events().Subscribe("batchExpiryDone") - t.Cleanup(unsub) <-c time.Sleep(time.Second) diff --git a/pkg/storer/epoch_migration.go b/pkg/storer/epoch_migration.go index f93ecafa486..873160564e1 100644 --- a/pkg/storer/epoch_migration.go +++ b/pkg/storer/epoch_migration.go @@ -76,8 +76,7 @@ func (p *putOpStorage) Write(_ context.Context, _ []byte) (sharky.Location, erro } type reservePutter interface { - Put(context.Context, internal.Storage, swarm.Chunk) (bool, error) - AddSize(int) + Put(context.Context, internal.Storage, swarm.Chunk) error Size() int } @@ -297,11 +296,9 @@ func (e *epochMigrator) migrateReserve(ctx context.Context) error { recovery: e.recovery, } - switch newIdx, err := e.reserve.Put(egCtx, pStorage, op.chunk); { - case err != nil: + err := e.reserve.Put(egCtx, pStorage, op.chunk) + if err != nil { return err - case newIdx: - e.reserve.AddSize(1) } } } diff --git a/pkg/storer/epoch_migration_test.go b/pkg/storer/epoch_migration_test.go index b51451466f2..6978cfe0536 100644 --- a/pkg/storer/epoch_migration_test.go +++ b/pkg/storer/epoch_migration_test.go @@ -181,11 +181,11 @@ type testReservePutter struct { calls int } -func (t *testReservePutter) Put(ctx context.Context, st internal.Storage, ch swarm.Chunk) (bool, error) { +func (t *testReservePutter) Put(ctx context.Context, st internal.Storage, ch swarm.Chunk) error { t.mtx.Lock() t.calls++ t.mtx.Unlock() - return true, st.ChunkStore().Put(ctx, ch) + return st.ChunkStore().Put(ctx, ch) } func (t *testReservePutter) AddSize(size int) { @@ -203,6 +203,7 @@ func (t *testReservePutter) Size() int { // TestEpochMigration_FLAKY is flaky on windows. func TestEpochMigration_FLAKY(t *testing.T) { t.Parallel() + t.Skip("will be removed") var ( dataPath = t.TempDir() diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index 4e8e8c91f22..394ab386b73 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -139,13 +139,11 @@ func (i item) String() string { } // Load returns first found swarm.Stamp related to the given address. -// The storage.ErrNoStampsForChunk is returned if no record is found. func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp, error) { return LoadWithBatchID(s, namespace, addr, nil) } // LoadWithBatchID returns swarm.Stamp related to the given address and batchID. 
-// The storage.ErrNoStampsForChunk is returned if no record is found. func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) { var stamp swarm.Stamp diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 47c843e48b1..69b1abeccbb 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -21,18 +21,11 @@ import ( "github.com/ethersphere/bee/pkg/storer/internal/stampindex" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/topology" + "resenje.org/multex" ) -// loggerName is the tree path name of the logger for this package. -const loggerName = "reserve" const reserveNamespace = "reserve" -/* - pull by bin - binID - evict by bin - batchID - sample by bin -*/ - type Reserve struct { baseAddr swarm.Address radiusSetter topology.SetStorageRadiuser @@ -41,9 +34,9 @@ type Reserve struct { capacity int size atomic.Int64 radius atomic.Uint32 - cacheCb func(context.Context, internal.Storage, ...swarm.Address) error binMtx sync.Mutex + mutx *multex.Multex } func New( @@ -52,15 +45,14 @@ func New( capacity int, radiusSetter topology.SetStorageRadiuser, logger log.Logger, - cb func(context.Context, internal.Storage, ...swarm.Address) error, ) (*Reserve, error) { rs := &Reserve{ baseAddr: baseAddr, capacity: capacity, radiusSetter: radiusSetter, - logger: logger.WithName(loggerName).Register(), - cacheCb: cb, + logger: logger.WithName(reserveNamespace).Register(), + mutx: multex.New(), } rItem := &radiusItem{} @@ -93,10 +85,13 @@ func New( } // Put stores a new chunk in the reserve and returns if the reserve size should increase. -func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.Chunk) (bool, error) { +func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.Chunk) error { indexStore := store.IndexStore() chunkStore := store.ChunkStore() + unlock := r.lock(chunk.Address(), chunk.Stamp().BatchID()) + defer unlock() + po := swarm.Proximity(r.baseAddr.Bytes(), chunk.Address().Bytes()) has, err := indexStore.Has(&BatchRadiusItem{ @@ -105,40 +100,40 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C BatchID: chunk.Stamp().BatchID(), }) if err != nil { - return false, err + return err } if has { - return false, nil + return nil } storeBatch, err := indexStore.Batch(ctx) if err != nil { - return false, err + return err } newStampIndex := true item, loaded, err := stampindex.LoadOrStore(indexStore, storeBatch, reserveNamespace, chunk) if err != nil { - return false, fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) + return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } if loaded { prev := binary.BigEndian.Uint64(item.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { - return false, fmt.Errorf("overwrite prev %d cur %d :%w", prev, curr, storage.ErrOverwriteNewerChunk) + return fmt.Errorf("overwrite prev %d cur %d batch %s :%w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } // An older and different chunk with the same batchID and stamp index has been previously // saved to the reserve. We must do the below before saving the new chunk: - // 1. Delete the old chunk from the chunkstore - // 2. Delete the old chunk's stamp data - // 3. Delete ALL old chunk related items from the reserve - // 4. 
Update the stamp index + // 1. Delete the old chunk from the chunkstore. + // 2. Delete the old chunk's stamp data. + // 3. Delete ALL old chunk related items from the reserve. + // 4. Update the stamp index. newStampIndex = false - err = r.DeleteChunk(ctx, store, storeBatch, item.ChunkAddress, chunk.Stamp().BatchID()) + err := r.removeChunk(ctx, store, storeBatch, item.ChunkAddress, chunk.Stamp().BatchID()) if err != nil { - return false, fmt.Errorf("failed removing older chunk: %w", err) + return fmt.Errorf("failed removing older chunk: %w", err) } r.logger.Debug( @@ -150,18 +145,18 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C err = stampindex.Store(storeBatch, reserveNamespace, chunk) if err != nil { - return false, fmt.Errorf("failed updating stamp index: %w", err) + return fmt.Errorf("failed updating stamp index: %w", err) } } err = chunkstamp.Store(storeBatch, reserveNamespace, chunk) if err != nil { - return false, err + return err } binID, err := r.IncBinID(indexStore, po) if err != nil { - return false, err + return err } err = storeBatch.Put(&BatchRadiusItem{ @@ -171,7 +166,7 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C BatchID: chunk.Stamp().BatchID(), }) if err != nil { - return false, err + return err } err = storeBatch.Put(&ChunkBinItem{ @@ -182,15 +177,24 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C ChunkType: ChunkType(chunk), }) if err != nil { - return false, err + return err } err = chunkStore.Put(ctx, chunk) if err != nil { - return false, err + return err + } + + err = storeBatch.Commit() + if err != nil { + return err } - return newStampIndex, storeBatch.Commit() + if newStampIndex { + r.size.Add(1) + } + + return nil } func (r *Reserve) Has(store storage.Store, addr swarm.Address, batchID []byte) (bool, error) { @@ -199,6 +203,10 @@ func (r *Reserve) Has(store storage.Store, addr swarm.Address, batchID []byte) ( } func (r *Reserve) Get(ctx context.Context, storage internal.Storage, addr swarm.Address, batchID []byte) (swarm.Chunk, error) { + + unlock := r.lock(addr, batchID) + defer unlock() + item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr} err := storage.IndexStore().Get(item) if err != nil { @@ -306,12 +314,20 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb func (r *Reserve) EvictBatchBin( ctx context.Context, txExecutor internal.TxExecutor, - bin uint8, batchID []byte, + count int, + bin uint8, ) (int, error) { + unlock := r.lock(swarm.ZeroAddress, batchID) + defer unlock() + var evicted []*BatchRadiusItem + if count <= 0 { + return 0, nil + } + err := txExecutor.Execute(ctx, func(store internal.Storage) error { return store.IndexStore().Iterate(storage.Query{ Factory: func() storage.Item { return &BatchRadiusItem{} }, @@ -322,6 +338,9 @@ func (r *Reserve) EvictBatchBin( return true, nil } evicted = append(evicted, batchRadius) + if len(evicted) == count { + return true, nil + } return false, nil }) }) @@ -331,6 +350,9 @@ func (r *Reserve) EvictBatchBin( batchCnt := 1_000 evictionCompleted := 0 + defer func() { + r.size.Add(-int64(evictionCompleted)) + }() for i := 0; i < len(evicted); i += batchCnt { end := i + batchCnt @@ -347,7 +369,7 @@ func (r *Reserve) EvictBatchBin( } for _, item := range evicted[i:end] { - err = removeChunk(ctx, store, batch, item) + err = removeChunkWithItem(ctx, store, batch, item) if err != nil { return err } @@ -356,11 
+378,6 @@ func (r *Reserve) EvictBatchBin( if err := batch.Commit(); err != nil { return err } - - if err := r.cacheCb(ctx, store, moveToCache...); err != nil { - r.logger.Error(err, "evict and move to cache") - } - return nil }) if err != nil { @@ -372,7 +389,7 @@ func (r *Reserve) EvictBatchBin( return evictionCompleted, nil } -func (r *Reserve) DeleteChunk( +func (r *Reserve) removeChunk( ctx context.Context, store internal.Storage, batch storage.Writer, @@ -388,18 +405,10 @@ func (r *Reserve) DeleteChunk( if err != nil { return err } - err = removeChunk(ctx, store, batch, item) - if err != nil { - return err - } - if err := r.cacheCb(ctx, store, item.Address); err != nil { - r.logger.Error(err, "delete and move to cache") - return err - } - return nil + return removeChunkWithItem(ctx, store, batch, item) } -func removeChunk( +func removeChunkWithItem( ctx context.Context, store internal.Storage, batch storage.Writer, @@ -423,15 +432,31 @@ func removeChunk( } return errors.Join(errs, - batch.Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}), batch.Delete(item), + batch.Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}), + store.ChunkStore().Delete(ctx, item.Address), ) } +func (r *Reserve) lock(addr swarm.Address, batchID []byte) func() { + r.mutx.Lock(addr.ByteString()) + r.mutx.Lock(string(batchID)) + return func() { + r.mutx.Unlock(addr.ByteString()) + r.mutx.Unlock(string(batchID)) + } +} + func (r *Reserve) Radius() uint8 { return uint8(r.radius.Load()) } +func (r *Reserve) SetRadius(store storage.Store, rad uint8) error { + r.radius.Store(uint32(rad)) + r.radiusSetter.SetStorageRadius(rad) + return store.Put(&radiusItem{Radius: rad}) +} + func (r *Reserve) Size() int { return int(r.size.Load()) } @@ -440,10 +465,6 @@ func (r *Reserve) Capacity() int { return r.capacity } -func (r *Reserve) AddSize(diff int) { - r.size.Add(int64(diff)) -} - func (r *Reserve) IsWithinCapacity() bool { return int(r.size.Load()) <= r.capacity } @@ -455,12 +476,6 @@ func (r *Reserve) EvictionTarget() int { return int(r.size.Load()) - r.capacity } -func (r *Reserve) SetRadius(store storage.Store, rad uint8) error { - r.radius.Store(uint32(rad)) - r.radiusSetter.SetStorageRadius(rad) - return store.Put(&radiusItem{Radius: rad}) -} - func (r *Reserve) LastBinIDs(store storage.Store) ([]uint64, uint64, error) { r.binMtx.Lock() defer r.binMtx.Unlock() diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 596552ad4dc..e2589407cfc 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "errors" + "math" "math/rand" "testing" "time" @@ -25,10 +26,6 @@ import ( kademlia "github.com/ethersphere/bee/pkg/topology/mock" ) -func noopCacher(_ context.Context, _ internal.Storage, _ ...swarm.Address) error { - return nil -} - func TestReserve(t *testing.T) { t.Parallel() @@ -46,7 +43,6 @@ func TestReserve(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - noopCacher, ) if err != nil { t.Fatal(err) @@ -55,13 +51,10 @@ func TestReserve(t *testing.T) { for b := 0; b < 2; b++ { for i := 1; i < 51; i++ { ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b) - c, err := r.Put(context.Background(), ts, ch) + err := r.Put(context.Background(), ts, ch) if err != nil { t.Fatal(err) } - if !c { - t.Fatal("entered unique chunk") - } checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: uint8(b), BatchID: ch.Stamp().BatchID(), Address: 
ch.Address()}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: uint8(b), BinID: uint64(i)}, false) checkChunk(t, ts, ch, false) @@ -103,7 +96,6 @@ func TestReserveChunkType(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - noopCacher, ) if err != nil { t.Fatal(err) @@ -119,7 +111,7 @@ func TestReserveChunkType(t *testing.T) { ch = chunk.GenerateTestRandomSoChunk(t, ch) storedChunksSO++ } - if _, err := r.Put(ctx, ts, ch); err != nil { + if err := r.Put(ctx, ts, ch); err != nil { t.Errorf("unexpected error: %v", err) } } @@ -166,15 +158,6 @@ func TestReplaceOldIndex(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - func(ctx context.Context, st internal.Storage, addrs ...swarm.Address) error { - for _, addr := range addrs { - err := st.ChunkStore().Delete(ctx, addr) - if err != nil { - return err - } - } - return nil - }, ) if err != nil { t.Fatal(err) @@ -184,12 +167,12 @@ func TestReplaceOldIndex(t *testing.T) { ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) ch2 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) - _, err = r.Put(context.Background(), ts, ch1) + err = r.Put(context.Background(), ts, ch1) if err != nil { t.Fatal(err) } - _, err = r.Put(context.Background(), ts, ch2) + err = r.Put(context.Background(), ts, ch2) if err != nil { t.Fatal(err) } @@ -235,15 +218,6 @@ func TestEvict(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - func(ctx context.Context, st internal.Storage, addrs ...swarm.Address) error { - for _, addr := range addrs { - err := st.ChunkStore().Delete(ctx, addr) - if err != nil { - return err - } - } - return nil - }, ) if err != nil { t.Fatal(err) @@ -253,19 +227,16 @@ func TestEvict(t *testing.T) { for b := 0; b < 3; b++ { ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b).WithStamp(postagetesting.MustNewBatchStamp(batches[b].ID)) chunks = append(chunks, ch) - c, err := r.Put(context.Background(), ts, ch) + err := r.Put(context.Background(), ts, ch) if err != nil { t.Fatal(err) } - if !c { - t.Fatal("entered unique chunk") - } } } totalEvicted := 0 for i := 0; i < 3; i++ { - evicted, err := r.EvictBatchBin(context.Background(), ts, uint8(i), evictBatch.ID) + evicted, err := r.EvictBatchBin(context.Background(), ts, evictBatch.ID, math.MaxInt, uint8(i)) if err != nil { t.Fatal(err) } @@ -300,6 +271,46 @@ func TestEvict(t *testing.T) { } } +func TestEvictMaxCount(t *testing.T) { + t.Parallel() + + baseAddr := swarm.RandAddress(t) + + ts, closer := internal.NewInmemStorage() + t.Cleanup(func() { + if err := closer(); err != nil { + t.Errorf("failed closing the storage: %v", err) + } + }) + + r, err := reserve.New(baseAddr, ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop) + if err != nil { + t.Fatal(err) + } + + batch := postagetesting.MustNewBatch() + + for i := 0; i < 50; i++ { + ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewBatchStamp(batch.ID)) + err := r.Put(context.Background(), ts, ch) + if err != nil { + t.Fatal(err) + } + } + + evicted, err := r.EvictBatchBin(context.Background(), ts, batch.ID, 10, 1) + if err != nil { + t.Fatal(err) + } + if evicted != 10 { + t.Fatalf("wanted evicted count 10, got %d", evicted) + } + + if r.Size() != 40 { + t.Fatalf("wanted size 40, got %d", r.Size()) + } +} + func TestIterate(t *testing.T) { t.Parallel() @@ -320,7 +331,6 @@ func TestIterate(t 
*testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - noopCacher, ) if err != nil { t.Fatal(err) @@ -329,13 +339,10 @@ func TestIterate(t *testing.T) { for b := 0; b < 3; b++ { for i := 0; i < 10; i++ { ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b) - c, err := r.Put(context.Background(), ts, ch) + err := r.Put(context.Background(), ts, ch) if err != nil { t.Fatal(err) } - if !c { - t.Fatal("entered unique chunk") - } } } diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 66a8fd0c464..38060e842f8 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -9,6 +9,7 @@ import ( "encoding/hex" "errors" "fmt" + "math" "slices" "sync" "sync/atomic" @@ -30,10 +31,6 @@ const ( batchExpiryDone = "batchExpiryDone" ) -func reserveUpdateBatchLockKey(batchID []byte) string { - return fmt.Sprintf("%s%s", reserveUpdateLockKey, string(batchID)) -} - var errMaxRadius = errors.New("max radius reached") type Syncer interface { @@ -226,7 +223,7 @@ func (db *DB) evictExpiredBatches(ctx context.Context) error { } for _, batchID := range batches { - evicted, err := db.evictBatch(ctx, batchID, swarm.MaxBins) + evicted, err := db.evictBatch(ctx, batchID, math.MaxInt, swarm.MaxBins) if err != nil { return err } @@ -310,25 +307,11 @@ func (db *DB) ReservePutter() storage.Putter { return putterWithMetrics{ storage.PutterFunc( func(ctx context.Context, chunk swarm.Chunk) (err error) { - - var ( - newIndex bool - ) - lockKey := reserveUpdateBatchLockKey(chunk.Stamp().BatchID()) - db.lock.Lock(lockKey) err = db.Execute(ctx, func(tx internal.Storage) error { - newIndex, err = db.reserve.Put(ctx, tx, chunk) - if err != nil { - return fmt.Errorf("reserve: putter.Put: %w", err) - } - return nil + return db.reserve.Put(ctx, tx, chunk) }) - db.lock.Unlock(lockKey) if err != nil { - return err - } - if newIndex { - db.reserve.AddSize(1) + return fmt.Errorf("reserve: putter.Put: %w", err) } db.reserveBinEvents.Trigger(string(db.po(chunk.Address()))) if !db.reserve.IsWithinCapacity() { @@ -346,11 +329,11 @@ func (db *DB) ReservePutter() storage.Putter { func (db *DB) evictBatch( ctx context.Context, batchID []byte, + evictCount int, upToBin uint8, ) (evicted int, err error) { dur := captureDuration(time.Now()) defer func() { - db.reserve.AddSize(-evicted) db.metrics.ReserveSize.Set(float64(db.reserve.Size())) db.metrics.MethodCallsDuration.WithLabelValues("reserve", "EvictBatch").Observe(dur()) if err == nil { @@ -372,11 +355,7 @@ func (db *DB) evictBatch( ) }() - lockKey := reserveUpdateBatchLockKey(batchID) - db.lock.Lock(lockKey) - defer db.lock.Unlock(lockKey) - - return db.reserve.EvictBatchBin(ctx, db, upToBin, batchID) + return db.reserve.EvictBatchBin(ctx, db, batchID, evictCount, upToBin) } func (db *DB) unreserve(ctx context.Context) (err error) { @@ -423,7 +402,7 @@ func (db *DB) unreserve(ctx context.Context) (err error) { default: } - binEvicted, err := db.evictBatch(ctx, b, radius) + binEvicted, err := db.evictBatch(ctx, b, target-totalEvicted, radius) // eviction happens in batches, so we need to keep track of the total // number of chunks evicted even if there was an error totalEvicted += binEvicted diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index aa5f79a6737..26e3f3cabcf 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -193,19 +193,14 @@ func TestEvictBatch(t *testing.T) { } } + c, unsub := st.Events().Subscribe("batchExpiryDone") + t.Cleanup(unsub) + err = st.EvictBatch(ctx, evictBatch.ID) if err != nil { 
t.Fatal(err) } - - c, unsub := st.Events().Subscribe("batchExpiryDone") - t.Cleanup(unsub) - gotUnreserveSignal := make(chan struct{}) - go func() { - defer close(gotUnreserveSignal) - <-c - }() - <-gotUnreserveSignal + <-c reserve := st.Reserve() @@ -219,7 +214,7 @@ func TestEvictBatch(t *testing.T) { if has { t.Fatal("store should NOT have chunk") } - checkSaved(t, st, ch, false, true) + checkSaved(t, st, ch, false, false) } else if !has { t.Fatal("store should have chunk") checkSaved(t, st, ch, true, true) @@ -310,7 +305,7 @@ func TestUnreserveCap(t *testing.T) { if has { t.Fatal("store should NOT have chunk at PO", po) } - checkSaved(t, storer, ch, false, true) + checkSaved(t, storer, ch, false, false) } else if !has { t.Fatal("store should have chunk at PO", po) } else { diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index e7d973ea0b7..12057f95c43 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -415,9 +415,6 @@ func performEpochMigration(ctx context.Context, basePath string, opts *Options) opts.ReserveCapacity, noopRadiusSetter{}, logger, - func(_ context.Context, _ internal.Storage, _ ...swarm.Address) error { - return nil - }, ) if err != nil { return err @@ -622,7 +619,6 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { opts.ReserveCapacity, opts.RadiusSetter, logger, - db.CacheShallowCopy, ) if err != nil { return nil, err
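
Note on the core of the change: the patch threads an eviction count from the storer's unreserve loop down to Reserve.EvictBatchBin, so a capacity-pressure pass removes only size - capacity chunks rather than every chunk of a batch in a bin. The Go sketch below illustrates that calling pattern only; evictionTarget, unreserve, and evictBatchBin are illustrative stand-ins for the real methods, not the package's API.

```go
package main

import "fmt"

// evictionTarget mirrors the idea behind Reserve.EvictionTarget: how many
// chunks must be evicted so the reserve is back within capacity.
func evictionTarget(size, capacity int) int {
	if size <= capacity {
		return 0
	}
	return size - capacity
}

// unreserve sketches the calling pattern of the unreserve loop after this
// patch: each per-bin eviction is capped at the remaining target, so the
// reserve evicts just enough chunks and then stops.
func unreserve(bins []int, target int, evictBatchBin func(bin, count int) int) int {
	totalEvicted := 0
	for bin := 0; bin < len(bins) && totalEvicted < target; bin++ {
		totalEvicted += evictBatchBin(bin, target-totalEvicted)
	}
	return totalEvicted
}

func main() {
	size, capacity := 135, 100
	target := evictionTarget(size, capacity) // 35

	bins := []int{20, 50, 30} // chunks held per bin for one batch (illustrative)
	evicted := unreserve(bins, target, func(bin, count int) int {
		if count > bins[bin] {
			count = bins[bin] // a bin cannot yield more than it holds
		}
		fmt.Printf("bin %d: evicting %d chunks\n", bin, count)
		return count
	})

	fmt.Printf("target %d, evicted %d\n", target, evicted) // target 35, evicted 35
}
```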
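The other structural change is that Reserve now guards its own critical sections: a multex keyed by chunk address and batch ID replaces the reserveUpdateBatchLockKey locking that previously lived in pkg/storer/reserve.go. The sketch below assumes the resenje.org/multex API as it appears in the diff (New, Lock(string), Unlock(string)); reserveLocker and the key strings are illustrative only.

```go
package main

import (
	"fmt"

	"resenje.org/multex"
)

// reserveLocker sketches the per-key locking the patch adds to Reserve:
// Put, Get and EvictBatchBin serialize on the chunk address and the batch ID
// rather than on a single DB-level lock key.
type reserveLocker struct {
	mu *multex.Multex
}

// lock acquires both keys and returns a release function, mirroring the
// unlock-callback shape of Reserve.lock in the patch.
func (l *reserveLocker) lock(addr, batchID string) func() {
	l.mu.Lock(addr)
	l.mu.Lock(batchID)
	return func() {
		l.mu.Unlock(addr)
		l.mu.Unlock(batchID)
	}
}

func main() {
	l := &reserveLocker{mu: multex.New()}

	unlock := l.lock("chunk-address", "batch-id")
	// ... index, stamp and chunkstore writes would happen here ...
	unlock()

	fmt.Println("released per-address and per-batch locks")
}
```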