From 8847084d873ada55a0c04752a0a6209b94c9c4c5 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 23 Jan 2024 17:58:38 +0300 Subject: [PATCH] fix: rebase --- pkg/storer/epoch_migration.go | 490 -------------------- pkg/storer/epoch_migration_test.go | 333 ------------- pkg/storer/internal/reserve/reserve.go | 5 +- pkg/storer/internal/reserve/reserve_test.go | 40 -- 4 files changed, 1 insertion(+), 867 deletions(-) delete mode 100644 pkg/storer/epoch_migration.go delete mode 100644 pkg/storer/epoch_migration_test.go diff --git a/pkg/storer/epoch_migration.go b/pkg/storer/epoch_migration.go deleted file mode 100644 index 873160564e1..00000000000 --- a/pkg/storer/epoch_migration.go +++ /dev/null @@ -1,490 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package storer - -import ( - "context" - "encoding/binary" - "encoding/json" - "fmt" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/ethersphere/bee/pkg/log" - "github.com/ethersphere/bee/pkg/postage" - "github.com/ethersphere/bee/pkg/sharky" - "github.com/ethersphere/bee/pkg/shed" - "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storer/internal" - "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" - pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" - "github.com/ethersphere/bee/pkg/swarm" - "github.com/ethersphere/bee/pkg/traversal" - "golang.org/x/sync/errgroup" -) - -// epochKey implements storage.Item and is used to store the epoch in the -// store. It is used to check if the epoch migration has already been -// performed. -type epochKey struct{} - -func (epochKey) Namespace() string { return "localstore" } - -func (epochKey) ID() string { return "epoch" } - -// this is a key-only item, so we don't need to marshal/unmarshal -func (epochKey) Marshal() ([]byte, error) { return nil, nil } - -func (epochKey) Unmarshal([]byte) error { return nil } - -func (epochKey) Clone() storage.Item { return epochKey{} } - -func (epochKey) String() string { return "localstore-epoch" } - -var ( - _ internal.Storage = (*putOpStorage)(nil) - _ chunkstore.Sharky = (*putOpStorage)(nil) -) - -// putOpStorage implements the internal.Storage interface which is used by -// the internal component stores to store chunks. It also implements the sharky interface -// which uses the recovery mechanism to recover chunks without moving them. -type putOpStorage struct { - chunkstore.Sharky - - store storage.BatchedStore - location sharky.Location - recovery sharkyRecover -} - -func (p *putOpStorage) IndexStore() storage.BatchedStore { return p.store } - -func (p *putOpStorage) ChunkStore() storage.ChunkStore { - return chunkstore.New(p.store, p) -} - -// Write implements the sharky.Store interface. It uses the sharky recovery mechanism -// to recover chunks without moving them. The location returned is the same as the -// one passed in. This is present in the old localstore indexes. -func (p *putOpStorage) Write(_ context.Context, _ []byte) (sharky.Location, error) { - return p.location, p.recovery.Add(p.location) -} - -type reservePutter interface { - Put(context.Context, internal.Storage, swarm.Chunk) error - Size() int -} - -type sharkyRecover interface { - Add(sharky.Location) error - Read(context.Context, sharky.Location, []byte) error -} - -// epochMigration performs the initial migration if it hasnt been done already. 
It -// reads the old indexes and writes them in the new format. It only migrates the -// reserve and pinned chunks. It also creates the new epoch key in the store to -// indicate that the migration has been performed. Due to a bug in the old localstore -// pinned chunks are not always present in the pinned index. So we do a best-effort -// migration of the pinning index. If the migration fails, the user can re-pin -// the chunks using the stewardship endpoint if the stamps used to upload them are -// still valid. -func epochMigration( - ctx context.Context, - path string, - stateStore storage.StateStorer, - store storage.BatchedStore, - reserve reservePutter, - recovery sharkyRecover, - logger log.Logger, -) error { - has, err := store.Has(epochKey{}) - if err != nil { - return fmt.Errorf("has epoch key: %w", err) - } - - if has { - return nil - } - - logger.Debug("started", "path", path, "start_time", time.Now()) - - dbshed, err := shed.NewDB(path, nil) - if err != nil { - return fmt.Errorf("shed.NewDB: %w", err) - } - - defer func() { - if dbshed != nil { - dbshed.Close() - } - }() - - pullIndex, retrievalDataIndex, err := initShedIndexes(dbshed, swarm.ZeroAddress) - if err != nil { - return fmt.Errorf("initShedIndexes: %w", err) - } - - chunkCount, err := retrievalDataIndex.Count() - if err != nil { - return fmt.Errorf("retrievalDataIndex count: %w", err) - } - - pullIdxCnt, _ := pullIndex.Count() - - logger.Debug("index counts", "retrieval index", chunkCount, "pull index", pullIdxCnt) - - e := &epochMigrator{ - stateStore: stateStore, - store: store, - recovery: recovery, - reserve: reserve, - pullIndex: pullIndex, - retrievalDataIndex: retrievalDataIndex, - logger: logger, - } - - if e.reserve != nil && chunkCount > 0 { - err = e.migrateReserve(ctx) - if err != nil { - return err - } - } - - if e.stateStore != nil && chunkCount > 0 { - err = e.migratePinning(ctx) - if err != nil { - return err - } - } - - dbshed.Close() - dbshed = nil - - matches, err := filepath.Glob(filepath.Join(path, "*")) - if err != nil { - return err - } - - for _, m := range matches { - if !strings.Contains(m, indexPath) && !strings.Contains(m, sharkyPath) { - err = os.Remove(m) - if err != nil { - return err - } - } - } - - return store.Put(epochKey{}) -} - -func initShedIndexes(dbshed *shed.DB, baseAddress swarm.Address) (pullIndex shed.Index, retrievalDataIndex shed.Index, err error) { - // pull index allows history and live syncing per po bin - pullIndex, err = dbshed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{ - EncodeKey: func(fields shed.Item) (key []byte, err error) { - key = make([]byte, 9) - key[0] = swarm.Proximity(baseAddress.Bytes(), fields.Address) - binary.BigEndian.PutUint64(key[1:9], fields.BinID) - return key, nil - }, - DecodeKey: func(key []byte) (e shed.Item, err error) { - e.BinID = binary.BigEndian.Uint64(key[1:9]) - return e, nil - }, - EncodeValue: func(fields shed.Item) (value []byte, err error) { - value = make([]byte, 64) // 32 bytes address, 32 bytes batch id - copy(value, fields.Address) - copy(value[32:], fields.BatchID) - return value, nil - }, - DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - e.Address = value[:32] - e.BatchID = value[32:64] - return e, nil - }, - }) - if err != nil { - return shed.Index{}, shed.Index{}, err - } - - // Index storing actual chunk address, data and bin id. 
- headerSize := 16 + postage.StampSize - retrievalDataIndex, err = dbshed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{ - EncodeKey: func(fields shed.Item) (key []byte, err error) { - return fields.Address, nil - }, - DecodeKey: func(key []byte) (e shed.Item, err error) { - e.Address = key - return e, nil - }, - EncodeValue: func(fields shed.Item) (value []byte, err error) { - b := make([]byte, headerSize) - binary.BigEndian.PutUint64(b[:8], fields.BinID) - binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) - stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary() - if err != nil { - return nil, err - } - copy(b[16:], stamp) - value = append(b, fields.Location...) - return value, nil - }, - DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) - e.BinID = binary.BigEndian.Uint64(value[:8]) - stamp := new(postage.Stamp) - if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil { - return e, err - } - e.BatchID = stamp.BatchID() - e.Index = stamp.Index() - e.Timestamp = stamp.Timestamp() - e.Sig = stamp.Sig() - e.Location = value[headerSize:] - return e, nil - }, - }) - if err != nil { - return shed.Index{}, shed.Index{}, err - } - - return pullIndex, retrievalDataIndex, nil -} - -// epochMigrator is a helper struct for migrating epoch data. It is used to house -// the main logic of the migration so that it can be tested. Also it houses the -// dependencies of the migration logic. -type epochMigrator struct { - stateStore storage.StateStorer - store storage.BatchedStore - recovery sharkyRecover - reserve reservePutter - pullIndex shed.Index - retrievalDataIndex shed.Index - logger log.Logger -} - -func (e *epochMigrator) migrateReserve(ctx context.Context) error { - type putOp struct { - pIdx shed.Item - chunk swarm.Chunk - loc sharky.Location - } - - e.logger.Debug("migrating reserve contents") - - opChan := make(chan putOp, 4) - eg, egCtx := errgroup.WithContext(ctx) - - for i := 0; i < 4; i++ { - eg.Go(func() error { - for { - select { - case <-egCtx.Done(): - return egCtx.Err() - case op, more := <-opChan: - if !more { - return nil - } - pStorage := &putOpStorage{ - store: e.store, - location: op.loc, - recovery: e.recovery, - } - - err := e.reserve.Put(egCtx, pStorage, op.chunk) - if err != nil { - return err - } - } - } - }) - } - - err := func() error { - defer close(opChan) - - return e.pullIndex.Iterate(func(i shed.Item) (stop bool, err error) { - addr := swarm.NewAddress(i.Address) - - item, err := e.retrievalDataIndex.Get(i) - if err != nil { - e.logger.Debug("retrieval data index read failed", "chunk_address", addr, "error", err) - return false, nil //continue - } - - l, err := sharky.LocationFromBinary(item.Location) - if err != nil { - e.logger.Debug("location from binary failed", "chunk_address", addr, "error", err) - return false, err - } - - chData := make([]byte, l.Length) - err = e.recovery.Read(ctx, l, chData) - if err != nil { - e.logger.Debug("reading location failed", "chunk_address", addr, "error", err) - return false, nil // continue - } - - ch := swarm.NewChunk(addr, chData). 
- WithStamp(postage.NewStamp(item.BatchID, item.Index, item.Timestamp, item.Sig)) - - select { - case <-egCtx.Done(): - return true, egCtx.Err() - case opChan <- putOp{pIdx: i, chunk: ch, loc: l}: - } - return false, nil - }, nil) - }() - if err != nil { - return err - } - - if err := eg.Wait(); err != nil { - return err - } - - e.logger.Debug("migrating reserve contents done", "reserve_size", e.reserve.Size()) - - return nil -} - -const pinStorePrefix = "root-pin" - -func (e *epochMigrator) migratePinning(ctx context.Context) error { - pinChan := make(chan swarm.Address, 4) - eg, egCtx := errgroup.WithContext(ctx) - - pStorage := &putOpStorage{ - store: e.store, - recovery: e.recovery, - } - var mu sync.Mutex // used to protect pStorage.location - - traverser := traversal.New( - storage.GetterFunc(func(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) { - i := shed.Item{ - Address: addr.Bytes(), - } - item, err := e.retrievalDataIndex.Get(i) - if err != nil { - return nil, err - } - - l, err := sharky.LocationFromBinary(item.Location) - if err != nil { - return nil, err - } - - chData := make([]byte, l.Length) - err = e.recovery.Read(ctx, l, chData) - if err != nil { - return nil, err - } - - return swarm.NewChunk(addr, chData), nil - }), - ) - - e.logger.Debug("migrating pinning collections, if all the chunks in the collection" + - " are not found locally, the collection will not be migrated. Users will have to" + - " re-pin the content using the stewardship API. The migration will print out the failed" + - " collections at the end.") - - for i := 0; i < 4; i++ { - eg.Go(func() error { - for { - select { - case <-egCtx.Done(): - return egCtx.Err() - case addr, more := <-pinChan: - if !more { - return nil - } - - pinningPutter, err := pinstore.NewCollection(pStorage) - if err != nil { - return err - } - - traverserFn := func(chAddr swarm.Address) error { - item, err := e.retrievalDataIndex.Get(shed.Item{Address: chAddr.Bytes()}) - if err != nil { - return err - } - - l, err := sharky.LocationFromBinary(item.Location) - if err != nil { - return err - } - ch := swarm.NewChunk(chAddr, nil) - - mu.Lock() - pStorage.location = l - err = pinningPutter.Put(egCtx, pStorage, pStorage.IndexStore(), ch) - if err != nil { - mu.Unlock() - return err - } - mu.Unlock() - - return nil - } - - err = func() error { - if err := traverser.Traverse(egCtx, addr, traverserFn); err != nil { - return err - } - - if err := pinningPutter.Close(pStorage, pStorage.IndexStore(), addr); err != nil { - return err - } - return nil - }() - - _ = e.stateStore.Delete(fmt.Sprintf("%s-%s", pinStorePrefix, addr)) - - // do not fail the entire migration if the collection is not migrated - if err != nil { - e.logger.Debug("pinning collection migration failed", "collection_root_address", addr, "error", err) - } else { - e.logger.Debug("pinning collection migration successful", "collection_root_address", addr) - } - } - } - }) - } - - err := func() error { - defer close(pinChan) - - return e.stateStore.Iterate(pinStorePrefix, func(key, value []byte) (stop bool, err error) { - var ref swarm.Address - if err := json.Unmarshal(value, &ref); err != nil { - return true, fmt.Errorf("pinning: unmarshal pin reference: %w", err) - } - select { - case <-egCtx.Done(): - return true, egCtx.Err() - case pinChan <- ref: - } - return false, nil - }) - }() - if err != nil { - return err - } - - if err := eg.Wait(); err != nil { - return err - } - - e.logger.Debug("migrating pinning collections done") - - return nil -} diff 
--git a/pkg/storer/epoch_migration_test.go b/pkg/storer/epoch_migration_test.go deleted file mode 100644 index 6978cfe0536..00000000000 --- a/pkg/storer/epoch_migration_test.go +++ /dev/null @@ -1,333 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package storer_test - -import ( - "bytes" - "context" - "crypto/rand" - "fmt" - "io" - "io/fs" - "os" - "path" - "path/filepath" - "strings" - "sync" - "testing" - - "github.com/ethersphere/bee/pkg/file/splitter" - "github.com/ethersphere/bee/pkg/log" - postagetesting "github.com/ethersphere/bee/pkg/postage/testing" - "github.com/ethersphere/bee/pkg/sharky" - "github.com/ethersphere/bee/pkg/shed" - mockstatestore "github.com/ethersphere/bee/pkg/statestore/mock" - storage "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storage/inmemstore" - chunktest "github.com/ethersphere/bee/pkg/storage/testing" - storer "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/storer/internal" - pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" - "github.com/ethersphere/bee/pkg/swarm" -) - -type dirFS struct { - basedir string -} - -func (d *dirFS) Open(path string) (fs.File, error) { - return os.OpenFile(filepath.Join(d.basedir, path), os.O_RDWR|os.O_CREATE, 0644) -} - -func createOldDataDir(t *testing.T, dataPath string, baseAddress swarm.Address, stateStore storage.StateStorer) { - t.Helper() - - binIDs := map[uint8]int{} - - assignBinID := func(addr swarm.Address) int { - po := swarm.Proximity(baseAddress.Bytes(), addr.Bytes()) - if _, ok := binIDs[po]; !ok { - binIDs[po] = 1 - return 1 - } - binIDs[po]++ - return binIDs[po] - } - - err := os.Mkdir(filepath.Join(dataPath, "sharky"), 0777) - if err != nil { - t.Fatal(err) - } - - sharkyStore, err := sharky.New(&dirFS{basedir: filepath.Join(dataPath, "sharky")}, 2, swarm.SocMaxChunkSize) - if err != nil { - t.Fatal(err) - } - defer sharkyStore.Close() - - shedDB, err := shed.NewDB(dataPath, nil) - if err != nil { - t.Fatal(err) - } - defer shedDB.Close() - - pIdx, rIdx, err := storer.InitShedIndexes(shedDB, baseAddress) - if err != nil { - t.Fatal(err) - } - - reserveChunks := chunktest.GenerateTestRandomChunks(10) - - for _, c := range reserveChunks { - loc, err := sharkyStore.Write(context.Background(), c.Data()) - if err != nil { - t.Fatal(err) - } - - locBuf, err := loc.MarshalBinary() - if err != nil { - t.Fatal(err) - } - - binID := assignBinID(c.Address()) - - err = pIdx.Put(shed.Item{ - Address: c.Address().Bytes(), - BinID: uint64(binID), - BatchID: c.Stamp().BatchID(), - }) - if err != nil { - t.Fatal(err) - } - - err = rIdx.Put(shed.Item{ - Address: c.Address().Bytes(), - BinID: uint64(binID), - BatchID: c.Stamp().BatchID(), - Index: c.Stamp().Index(), - Timestamp: c.Stamp().Timestamp(), - Sig: c.Stamp().Sig(), - Location: locBuf, - }) - - if err != nil { - t.Fatal(err) - } - } - - // create a pinning collection - writer := splitter.NewSimpleSplitter( - storage.PutterFunc( - func(ctx context.Context, chunk swarm.Chunk) error { - c := chunk.WithStamp(postagetesting.MustNewStamp()) - - loc, err := sharkyStore.Write(context.Background(), c.Data()) - if err != nil { - return err - } - - locBuf, err := loc.MarshalBinary() - if err != nil { - return err - } - - return rIdx.Put(shed.Item{ - Address: c.Address().Bytes(), - BatchID: c.Stamp().BatchID(), - Index: c.Stamp().Index(), - Timestamp: c.Stamp().Timestamp(), - 
Sig: c.Stamp().Sig(), - Location: locBuf, - }) - }, - ), - ) - - randData := make([]byte, 4096*20) - _, err = rand.Read(randData) - if err != nil { - t.Fatal(err) - } - - root, err := writer.Split(context.Background(), io.NopCloser(bytes.NewBuffer(randData)), 4096*20, false) - if err != nil { - t.Fatal(err) - } - - err = stateStore.Put(fmt.Sprintf("root-pin-%s", root.String()), root) - if err != nil { - t.Fatal(err) - } -} - -type testSharkyRecovery struct { - *sharky.Recovery - mtx sync.Mutex - addCalls int -} - -func (t *testSharkyRecovery) Add(loc sharky.Location) error { - t.mtx.Lock() - t.addCalls++ - t.mtx.Unlock() - return t.Recovery.Add(loc) -} - -type testReservePutter struct { - mtx sync.Mutex - size int - calls int -} - -func (t *testReservePutter) Put(ctx context.Context, st internal.Storage, ch swarm.Chunk) error { - t.mtx.Lock() - t.calls++ - t.mtx.Unlock() - return st.ChunkStore().Put(ctx, ch) -} - -func (t *testReservePutter) AddSize(size int) { - t.mtx.Lock() - t.size += size - t.mtx.Unlock() -} - -func (t *testReservePutter) Size() int { - t.mtx.Lock() - defer t.mtx.Unlock() - return t.size -} - -// TestEpochMigration_FLAKY is flaky on windows. -func TestEpochMigration_FLAKY(t *testing.T) { - t.Parallel() - t.Skip("will be removed") - - var ( - dataPath = t.TempDir() - baseAddress = swarm.RandAddress(t) - stateStore = mockstatestore.NewStateStore() - reserve = &testReservePutter{} - logBytes = bytes.NewBuffer(nil) - logger = log.NewLogger("test", log.WithSink(logBytes)) - indexStore = inmemstore.New() - ) - - createOldDataDir(t, dataPath, baseAddress, stateStore) - - r, err := sharky.NewRecovery(path.Join(dataPath, "sharky"), 2, swarm.SocMaxChunkSize) - if err != nil { - t.Fatal(err) - } - - sharkyRecovery := &testSharkyRecovery{Recovery: r} - - err = storer.EpochMigration( - context.Background(), - dataPath, - stateStore, - indexStore, - reserve, - sharkyRecovery, - logger, - ) - if err != nil { - t.Fatal(err) - } - - if !strings.Contains(logBytes.String(), "migrating pinning collections done") { - t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String()) - } - - if !strings.Contains(logBytes.String(), "migrating reserve contents done") { - t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String()) - } - - if sharkyRecovery.addCalls != 31 { - t.Fatalf("expected 31 add calls, got %d", sharkyRecovery.addCalls) - } - - if reserve.calls != 10 { - t.Fatalf("expected 10 reserve calls, got %d", reserve.calls) - } - - if reserve.size != 10 { - t.Fatalf("expected 10 reserve size, got %d", reserve.size) - } - - pins, err := pinstore.Pins(indexStore) - if err != nil { - t.Fatal(err) - } - - if len(pins) != 1 { - t.Fatalf("expected 1 pin, got %d", len(pins)) - } - - if !strings.Contains(logBytes.String(), pins[0].String()) { - t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String()) - } -} - -func TestEpochMigrationLightNode(t *testing.T) { - t.Parallel() - - var ( - dataPath = t.TempDir() - baseAddress = swarm.RandAddress(t) - stateStore = mockstatestore.NewStateStore() - reserve storer.ReservePutter - logBytes = bytes.NewBuffer(nil) - logger = log.NewLogger("test", log.WithSink(logBytes)) - indexStore = inmemstore.New() - ) - - createOldDataDir(t, dataPath, baseAddress, stateStore) - - r, err := sharky.NewRecovery(path.Join(dataPath, "sharky"), 2, swarm.SocMaxChunkSize) - if err != nil { - t.Fatal(err) - } - - sharkyRecovery := &testSharkyRecovery{Recovery: r} - - err = 
storer.EpochMigration( - context.Background(), - dataPath, - stateStore, - indexStore, - reserve, - sharkyRecovery, - logger, - ) - if err != nil { - t.Fatal(err) - } - - if !strings.Contains(logBytes.String(), "migrating pinning collections done") { - t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String()) - } - - if strings.Contains(logBytes.String(), "migrating reserve contents done") { - t.Fatalf("expected log to not contain 'migrating reserve contents done', got %s", logBytes.String()) - } - - if sharkyRecovery.addCalls != 21 { - t.Fatalf("expected 31 add calls, got %d", sharkyRecovery.addCalls) - } - - pins, err := pinstore.Pins(indexStore) - if err != nil { - t.Fatal(err) - } - - if len(pins) != 1 { - t.Fatalf("expected 1 pin, got %d", len(pins)) - } - - if !strings.ContainsAny(logBytes.String(), pins[0].String()) { - t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String()) - } -} diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 3f3a4cbf173..1f54fe80a52 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -53,7 +53,7 @@ func New( st: st, capacity: capacity, radiusSetter: radiusSetter, - logger: logger.WithName(loggerName).Register(), + logger: logger.WithName(reserveNamespace).Register(), multx: multex.New(), } @@ -92,9 +92,6 @@ func New( // Put stores a new chunk in the reserve and returns if the reserve size should increase. func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { - unlock := r.lock(chunk.Address(), chunk.Stamp().BatchID()) - defer unlock() - po := swarm.Proximity(r.baseAddr.Bytes(), chunk.Address().Bytes()) // batchID lock, Put vs Eviction diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 2efc461456d..e9d580c155c 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -302,46 +302,6 @@ func TestEvictMaxCount(t *testing.T) { } } -func TestEvictMaxCount(t *testing.T) { - t.Parallel() - - baseAddr := swarm.RandAddress(t) - - ts, closer := internal.NewInmemStorage() - t.Cleanup(func() { - if err := closer(); err != nil { - t.Errorf("failed closing the storage: %v", err) - } - }) - - r, err := reserve.New(baseAddr, ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop) - if err != nil { - t.Fatal(err) - } - - batch := postagetesting.MustNewBatch() - - for i := 0; i < 50; i++ { - ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewBatchStamp(batch.ID)) - err := r.Put(context.Background(), ts, ch) - if err != nil { - t.Fatal(err) - } - } - - evicted, err := r.EvictBatchBin(context.Background(), ts, batch.ID, 10, 1) - if err != nil { - t.Fatal(err) - } - if evicted != 10 { - t.Fatalf("wanted evicted count 10, got %d", evicted) - } - - if r.Size() != 40 { - t.Fatalf("wanted size 40, got %d", r.Size()) - } -} - func TestIterate(t *testing.T) { t.Parallel()
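
The removed `epochKey` type is a key-only `storage.Item`: its presence in the index store is the entire payload, which is how `epochMigration` knows it has already run and can return early. A minimal, self-contained sketch of that marker-item pattern follows; the `Item` interface, `migrationDone` type and toy `store` are simplified stand-ins for illustration, not the real `pkg/storage` API.

```go
// Marker-item sketch: presence of the key is the only state.
package main

import "fmt"

// Item is a reduced stand-in for the store item interface shown in the
// deleted file (Namespace/ID plus no-op Marshal/Unmarshal).
type Item interface {
	Namespace() string
	ID() string
	Marshal() ([]byte, error)
	Unmarshal([]byte) error
}

type migrationDone struct{}

func (migrationDone) Namespace() string        { return "localstore" }
func (migrationDone) ID() string               { return "epoch" }
func (migrationDone) Marshal() ([]byte, error) { return nil, nil } // key-only item
func (migrationDone) Unmarshal([]byte) error   { return nil }

// store is a toy in-memory index keyed by namespace/id.
type store map[string]struct{}

func key(i Item) string         { return i.Namespace() + "/" + i.ID() }
func (s store) Has(i Item) bool { _, ok := s[key(i)]; return ok }
func (s store) Put(i Item)      { s[key(i)] = struct{}{} }

// migrate runs the body at most once; the marker short-circuits later calls.
func migrate(s store) {
	if s.Has(migrationDone{}) {
		fmt.Println("already migrated, skipping")
		return
	}
	fmt.Println("running migration")
	s.Put(migrationDone{})
}

func main() {
	s := store{}
	migrate(s) // running migration
	migrate(s) // already migrated, skipping
}
```

Because the item carries no value, `Marshal` and `Unmarshal` are deliberate no-ops, exactly as in the deleted file.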
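The deleted `putOpStorage` satisfies the sharky `Write` signature without ever writing: it returns the location captured from the old retrieval index and registers it with the recovery instance, so chunks already on disk are re-indexed in place rather than copied. A rough sketch of that trick under local stand-in types (the real code uses `sharky.Location` and the recovery `Add` hook):

```go
// Re-index-in-place sketch: Write ignores the payload and returns a location
// that was recorded by the old localstore, registering it for recovery.
package main

import (
	"context"
	"fmt"
)

// location and recovery are local stand-ins, not the sharky package types.
type location struct {
	Shard  uint8
	Slot   uint32
	Length uint16
}

type recovery interface {
	Add(location) error
}

type recoverySet map[location]struct{}

func (r recoverySet) Add(l location) error {
	r[l] = struct{}{}
	return nil
}

// fixedWriter satisfies a sharky-like Write signature without moving data:
// the caller gets back the pre-existing location of the chunk.
type fixedWriter struct {
	loc location
	rec recovery
}

func (w *fixedWriter) Write(_ context.Context, _ []byte) (location, error) {
	return w.loc, w.rec.Add(w.loc)
}

func main() {
	rec := recoverySet{}
	w := &fixedWriter{loc: location{Shard: 1, Slot: 7, Length: 4096}, rec: rec}
	loc, err := w.Write(context.Background(), []byte("ignored payload"))
	fmt.Println(loc, err, len(rec)) // {1 7 4096} <nil> 1
}
```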
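The pull index rebuilt by `initShedIndexes` packs its key as one proximity-order byte followed by the bin ID as a big-endian `uint64`, so the store's lexicographic key ordering doubles as numeric ordering within each bin. A short sketch of that layout, with hypothetical `encodePullKey`/`decodePullKey` helpers:

```go
// PO|BinID key layout sketch: 1 proximity byte + 8-byte big-endian bin ID.
package main

import (
	"encoding/binary"
	"fmt"
)

// encodePullKey and decodePullKey are illustrative names only.
func encodePullKey(po uint8, binID uint64) []byte {
	key := make([]byte, 9)
	key[0] = po
	binary.BigEndian.PutUint64(key[1:9], binID)
	return key
}

func decodePullKey(key []byte) (po uint8, binID uint64) {
	return key[0], binary.BigEndian.Uint64(key[1:9])
}

func main() {
	k := encodePullKey(5, 42)
	po, bin := decodePullKey(k)
	// Big-endian integers sort numerically under lexicographic byte order,
	// which is what lets the index iterate bin IDs in ascending order.
	fmt.Printf("key=%x po=%d binID=%d\n", k, po, bin)
}
```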
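`migrateReserve` and `migratePinning` share the same concurrency shape: four workers inside an `errgroup` drain a small buffered channel while the producer iterates the old index and closes the channel when it is finished, so a failure on either side cancels the other through the group context. A stripped-down sketch of that shape, assuming only `golang.org/x/sync/errgroup`; the `op` type and `handle` function are placeholders:

```go
// Producer/worker sketch mirroring the layout of the deleted migration code.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type op struct{ id int }

func handle(_ context.Context, o op) error {
	fmt.Println("migrated", o.id)
	return nil
}

func migrateAll(ctx context.Context, n int) error {
	opChan := make(chan op, 4)
	eg, egCtx := errgroup.WithContext(ctx)

	// Workers: exit when the channel is closed or the group context is canceled.
	for i := 0; i < 4; i++ {
		eg.Go(func() error {
			for {
				select {
				case <-egCtx.Done():
					return egCtx.Err()
				case o, more := <-opChan:
					if !more {
						return nil
					}
					if err := handle(egCtx, o); err != nil {
						return err
					}
				}
			}
		})
	}

	// Producer: closing the channel (even on error) releases the workers.
	err := func() error {
		defer close(opChan)
		for i := 0; i < n; i++ {
			select {
			case <-egCtx.Done():
				return egCtx.Err()
			case opChan <- op{id: i}:
			}
		}
		return nil
	}()
	if err != nil {
		return err
	}
	return eg.Wait()
}

func main() {
	if err := migrateAll(context.Background(), 10); err != nil {
		fmt.Println("error:", err)
	}
}
```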
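Outside the file deletions, the only behavioural change is in `Reserve.Put`, which no longer takes the chunk-address/batch-ID lock at the top of the function; the surviving `// batchID lock, Put vs Eviction` comment suggests the lock is now taken closer to the code it actually guards. For context, a generic keyed-lock sketch of the kind the `multex` package provides; `keyLock` below is a simplified illustration, not the actual `multex` API:

```go
// Keyed-lock sketch: callers locking the same key serialize; different keys
// do not contend. A real implementation would also reap unused entries.
package main

import (
	"fmt"
	"sync"
)

type keyLock struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyLock() *keyLock {
	return &keyLock{locks: make(map[string]*sync.Mutex)}
}

// Lock acquires the mutex associated with key, creating it on first use,
// and returns the matching unlock function.
func (k *keyLock) Lock(key string) (unlock func()) {
	k.mu.Lock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.Mutex{}
		k.locks[key] = m
	}
	k.mu.Unlock()
	m.Lock()
	return m.Unlock
}

func main() {
	kl := newKeyLock()
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			unlock := kl.Lock("batch-1") // same batch ID: writes serialize
			counter++
			unlock()
		}()
	}
	wg.Wait()
	fmt.Println(counter) // 100
}
```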