diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go index d18675ef33b..5b80b2663c8 100644 --- a/pkg/storer/compact_test.go +++ b/pkg/storer/compact_test.go @@ -55,13 +55,13 @@ func TestCompact(t *testing.T) { } } + c, unsub := st.Events().Subscribe("batchExpiryDone") + t.Cleanup(unsub) + err = st.EvictBatch(ctx, evictBatch.ID) if err != nil { t.Fatal(err) } - - c, unsub := st.Events().Subscribe("batchExpiryDone") - t.Cleanup(unsub) <-c time.Sleep(time.Second) diff --git a/pkg/storer/epoch_migration.go b/pkg/storer/epoch_migration.go new file mode 100644 index 00000000000..873160564e1 --- /dev/null +++ b/pkg/storer/epoch_migration.go @@ -0,0 +1,490 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/ethersphere/bee/pkg/log" + "github.com/ethersphere/bee/pkg/postage" + "github.com/ethersphere/bee/pkg/sharky" + "github.com/ethersphere/bee/pkg/shed" + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" + pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/pkg/swarm" + "github.com/ethersphere/bee/pkg/traversal" + "golang.org/x/sync/errgroup" +) + +// epochKey implements storage.Item and is used to store the epoch in the +// store. It is used to check if the epoch migration has already been +// performed. +type epochKey struct{} + +func (epochKey) Namespace() string { return "localstore" } + +func (epochKey) ID() string { return "epoch" } + +// this is a key-only item, so we don't need to marshal/unmarshal +func (epochKey) Marshal() ([]byte, error) { return nil, nil } + +func (epochKey) Unmarshal([]byte) error { return nil } + +func (epochKey) Clone() storage.Item { return epochKey{} } + +func (epochKey) String() string { return "localstore-epoch" } + +var ( + _ internal.Storage = (*putOpStorage)(nil) + _ chunkstore.Sharky = (*putOpStorage)(nil) +) + +// putOpStorage implements the internal.Storage interface which is used by +// the internal component stores to store chunks. It also implements the sharky interface +// which uses the recovery mechanism to recover chunks without moving them. +type putOpStorage struct { + chunkstore.Sharky + + store storage.BatchedStore + location sharky.Location + recovery sharkyRecover +} + +func (p *putOpStorage) IndexStore() storage.BatchedStore { return p.store } + +func (p *putOpStorage) ChunkStore() storage.ChunkStore { + return chunkstore.New(p.store, p) +} + +// Write implements the sharky.Store interface. It uses the sharky recovery mechanism +// to recover chunks without moving them. The location returned is the same as the +// one passed in. This is present in the old localstore indexes. +func (p *putOpStorage) Write(_ context.Context, _ []byte) (sharky.Location, error) { + return p.location, p.recovery.Add(p.location) +} + +type reservePutter interface { + Put(context.Context, internal.Storage, swarm.Chunk) error + Size() int +} + +type sharkyRecover interface { + Add(sharky.Location) error + Read(context.Context, sharky.Location, []byte) error +} + +// epochMigration performs the initial migration if it hasnt been done already. It +// reads the old indexes and writes them in the new format. 
It only migrates the +// reserve and pinned chunks. It also creates the new epoch key in the store to +// indicate that the migration has been performed. Due to a bug in the old localstore +// pinned chunks are not always present in the pinned index. So we do a best-effort +// migration of the pinning index. If the migration fails, the user can re-pin +// the chunks using the stewardship endpoint if the stamps used to upload them are +// still valid. +func epochMigration( + ctx context.Context, + path string, + stateStore storage.StateStorer, + store storage.BatchedStore, + reserve reservePutter, + recovery sharkyRecover, + logger log.Logger, +) error { + has, err := store.Has(epochKey{}) + if err != nil { + return fmt.Errorf("has epoch key: %w", err) + } + + if has { + return nil + } + + logger.Debug("started", "path", path, "start_time", time.Now()) + + dbshed, err := shed.NewDB(path, nil) + if err != nil { + return fmt.Errorf("shed.NewDB: %w", err) + } + + defer func() { + if dbshed != nil { + dbshed.Close() + } + }() + + pullIndex, retrievalDataIndex, err := initShedIndexes(dbshed, swarm.ZeroAddress) + if err != nil { + return fmt.Errorf("initShedIndexes: %w", err) + } + + chunkCount, err := retrievalDataIndex.Count() + if err != nil { + return fmt.Errorf("retrievalDataIndex count: %w", err) + } + + pullIdxCnt, _ := pullIndex.Count() + + logger.Debug("index counts", "retrieval index", chunkCount, "pull index", pullIdxCnt) + + e := &epochMigrator{ + stateStore: stateStore, + store: store, + recovery: recovery, + reserve: reserve, + pullIndex: pullIndex, + retrievalDataIndex: retrievalDataIndex, + logger: logger, + } + + if e.reserve != nil && chunkCount > 0 { + err = e.migrateReserve(ctx) + if err != nil { + return err + } + } + + if e.stateStore != nil && chunkCount > 0 { + err = e.migratePinning(ctx) + if err != nil { + return err + } + } + + dbshed.Close() + dbshed = nil + + matches, err := filepath.Glob(filepath.Join(path, "*")) + if err != nil { + return err + } + + for _, m := range matches { + if !strings.Contains(m, indexPath) && !strings.Contains(m, sharkyPath) { + err = os.Remove(m) + if err != nil { + return err + } + } + } + + return store.Put(epochKey{}) +} + +func initShedIndexes(dbshed *shed.DB, baseAddress swarm.Address) (pullIndex shed.Index, retrievalDataIndex shed.Index, err error) { + // pull index allows history and live syncing per po bin + pullIndex, err = dbshed.NewIndex("PO|BinID->Hash", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + key = make([]byte, 9) + key[0] = swarm.Proximity(baseAddress.Bytes(), fields.Address) + binary.BigEndian.PutUint64(key[1:9], fields.BinID) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.BinID = binary.BigEndian.Uint64(key[1:9]) + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + value = make([]byte, 64) // 32 bytes address, 32 bytes batch id + copy(value, fields.Address) + copy(value[32:], fields.BatchID) + return value, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.Address = value[:32] + e.BatchID = value[32:64] + return e, nil + }, + }) + if err != nil { + return shed.Index{}, shed.Index{}, err + } + + // Index storing actual chunk address, data and bin id. 
+ headerSize := 16 + postage.StampSize + retrievalDataIndex, err = dbshed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{ + EncodeKey: func(fields shed.Item) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.Item, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.Item) (value []byte, err error) { + b := make([]byte, headerSize) + binary.BigEndian.PutUint64(b[:8], fields.BinID) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary() + if err != nil { + return nil, err + } + copy(b[16:], stamp) + value = append(b, fields.Location...) + return value, nil + }, + DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.BinID = binary.BigEndian.Uint64(value[:8]) + stamp := new(postage.Stamp) + if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil { + return e, err + } + e.BatchID = stamp.BatchID() + e.Index = stamp.Index() + e.Timestamp = stamp.Timestamp() + e.Sig = stamp.Sig() + e.Location = value[headerSize:] + return e, nil + }, + }) + if err != nil { + return shed.Index{}, shed.Index{}, err + } + + return pullIndex, retrievalDataIndex, nil +} + +// epochMigrator is a helper struct for migrating epoch data. It is used to house +// the main logic of the migration so that it can be tested. Also it houses the +// dependencies of the migration logic. +type epochMigrator struct { + stateStore storage.StateStorer + store storage.BatchedStore + recovery sharkyRecover + reserve reservePutter + pullIndex shed.Index + retrievalDataIndex shed.Index + logger log.Logger +} + +func (e *epochMigrator) migrateReserve(ctx context.Context) error { + type putOp struct { + pIdx shed.Item + chunk swarm.Chunk + loc sharky.Location + } + + e.logger.Debug("migrating reserve contents") + + opChan := make(chan putOp, 4) + eg, egCtx := errgroup.WithContext(ctx) + + for i := 0; i < 4; i++ { + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case op, more := <-opChan: + if !more { + return nil + } + pStorage := &putOpStorage{ + store: e.store, + location: op.loc, + recovery: e.recovery, + } + + err := e.reserve.Put(egCtx, pStorage, op.chunk) + if err != nil { + return err + } + } + } + }) + } + + err := func() error { + defer close(opChan) + + return e.pullIndex.Iterate(func(i shed.Item) (stop bool, err error) { + addr := swarm.NewAddress(i.Address) + + item, err := e.retrievalDataIndex.Get(i) + if err != nil { + e.logger.Debug("retrieval data index read failed", "chunk_address", addr, "error", err) + return false, nil //continue + } + + l, err := sharky.LocationFromBinary(item.Location) + if err != nil { + e.logger.Debug("location from binary failed", "chunk_address", addr, "error", err) + return false, err + } + + chData := make([]byte, l.Length) + err = e.recovery.Read(ctx, l, chData) + if err != nil { + e.logger.Debug("reading location failed", "chunk_address", addr, "error", err) + return false, nil // continue + } + + ch := swarm.NewChunk(addr, chData). 
+ WithStamp(postage.NewStamp(item.BatchID, item.Index, item.Timestamp, item.Sig)) + + select { + case <-egCtx.Done(): + return true, egCtx.Err() + case opChan <- putOp{pIdx: i, chunk: ch, loc: l}: + } + return false, nil + }, nil) + }() + if err != nil { + return err + } + + if err := eg.Wait(); err != nil { + return err + } + + e.logger.Debug("migrating reserve contents done", "reserve_size", e.reserve.Size()) + + return nil +} + +const pinStorePrefix = "root-pin" + +func (e *epochMigrator) migratePinning(ctx context.Context) error { + pinChan := make(chan swarm.Address, 4) + eg, egCtx := errgroup.WithContext(ctx) + + pStorage := &putOpStorage{ + store: e.store, + recovery: e.recovery, + } + var mu sync.Mutex // used to protect pStorage.location + + traverser := traversal.New( + storage.GetterFunc(func(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) { + i := shed.Item{ + Address: addr.Bytes(), + } + item, err := e.retrievalDataIndex.Get(i) + if err != nil { + return nil, err + } + + l, err := sharky.LocationFromBinary(item.Location) + if err != nil { + return nil, err + } + + chData := make([]byte, l.Length) + err = e.recovery.Read(ctx, l, chData) + if err != nil { + return nil, err + } + + return swarm.NewChunk(addr, chData), nil + }), + ) + + e.logger.Debug("migrating pinning collections, if all the chunks in the collection" + + " are not found locally, the collection will not be migrated. Users will have to" + + " re-pin the content using the stewardship API. The migration will print out the failed" + + " collections at the end.") + + for i := 0; i < 4; i++ { + eg.Go(func() error { + for { + select { + case <-egCtx.Done(): + return egCtx.Err() + case addr, more := <-pinChan: + if !more { + return nil + } + + pinningPutter, err := pinstore.NewCollection(pStorage) + if err != nil { + return err + } + + traverserFn := func(chAddr swarm.Address) error { + item, err := e.retrievalDataIndex.Get(shed.Item{Address: chAddr.Bytes()}) + if err != nil { + return err + } + + l, err := sharky.LocationFromBinary(item.Location) + if err != nil { + return err + } + ch := swarm.NewChunk(chAddr, nil) + + mu.Lock() + pStorage.location = l + err = pinningPutter.Put(egCtx, pStorage, pStorage.IndexStore(), ch) + if err != nil { + mu.Unlock() + return err + } + mu.Unlock() + + return nil + } + + err = func() error { + if err := traverser.Traverse(egCtx, addr, traverserFn); err != nil { + return err + } + + if err := pinningPutter.Close(pStorage, pStorage.IndexStore(), addr); err != nil { + return err + } + return nil + }() + + _ = e.stateStore.Delete(fmt.Sprintf("%s-%s", pinStorePrefix, addr)) + + // do not fail the entire migration if the collection is not migrated + if err != nil { + e.logger.Debug("pinning collection migration failed", "collection_root_address", addr, "error", err) + } else { + e.logger.Debug("pinning collection migration successful", "collection_root_address", addr) + } + } + } + }) + } + + err := func() error { + defer close(pinChan) + + return e.stateStore.Iterate(pinStorePrefix, func(key, value []byte) (stop bool, err error) { + var ref swarm.Address + if err := json.Unmarshal(value, &ref); err != nil { + return true, fmt.Errorf("pinning: unmarshal pin reference: %w", err) + } + select { + case <-egCtx.Done(): + return true, egCtx.Err() + case pinChan <- ref: + } + return false, nil + }) + }() + if err != nil { + return err + } + + if err := eg.Wait(); err != nil { + return err + } + + e.logger.Debug("migrating pinning collections done") + + return nil +} diff 
--git a/pkg/storer/epoch_migration_test.go b/pkg/storer/epoch_migration_test.go new file mode 100644 index 00000000000..6978cfe0536 --- /dev/null +++ b/pkg/storer/epoch_migration_test.go @@ -0,0 +1,333 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer_test + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + "io" + "io/fs" + "os" + "path" + "path/filepath" + "strings" + "sync" + "testing" + + "github.com/ethersphere/bee/pkg/file/splitter" + "github.com/ethersphere/bee/pkg/log" + postagetesting "github.com/ethersphere/bee/pkg/postage/testing" + "github.com/ethersphere/bee/pkg/sharky" + "github.com/ethersphere/bee/pkg/shed" + mockstatestore "github.com/ethersphere/bee/pkg/statestore/mock" + storage "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storage/inmemstore" + chunktest "github.com/ethersphere/bee/pkg/storage/testing" + storer "github.com/ethersphere/bee/pkg/storer" + "github.com/ethersphere/bee/pkg/storer/internal" + pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/pkg/swarm" +) + +type dirFS struct { + basedir string +} + +func (d *dirFS) Open(path string) (fs.File, error) { + return os.OpenFile(filepath.Join(d.basedir, path), os.O_RDWR|os.O_CREATE, 0644) +} + +func createOldDataDir(t *testing.T, dataPath string, baseAddress swarm.Address, stateStore storage.StateStorer) { + t.Helper() + + binIDs := map[uint8]int{} + + assignBinID := func(addr swarm.Address) int { + po := swarm.Proximity(baseAddress.Bytes(), addr.Bytes()) + if _, ok := binIDs[po]; !ok { + binIDs[po] = 1 + return 1 + } + binIDs[po]++ + return binIDs[po] + } + + err := os.Mkdir(filepath.Join(dataPath, "sharky"), 0777) + if err != nil { + t.Fatal(err) + } + + sharkyStore, err := sharky.New(&dirFS{basedir: filepath.Join(dataPath, "sharky")}, 2, swarm.SocMaxChunkSize) + if err != nil { + t.Fatal(err) + } + defer sharkyStore.Close() + + shedDB, err := shed.NewDB(dataPath, nil) + if err != nil { + t.Fatal(err) + } + defer shedDB.Close() + + pIdx, rIdx, err := storer.InitShedIndexes(shedDB, baseAddress) + if err != nil { + t.Fatal(err) + } + + reserveChunks := chunktest.GenerateTestRandomChunks(10) + + for _, c := range reserveChunks { + loc, err := sharkyStore.Write(context.Background(), c.Data()) + if err != nil { + t.Fatal(err) + } + + locBuf, err := loc.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + binID := assignBinID(c.Address()) + + err = pIdx.Put(shed.Item{ + Address: c.Address().Bytes(), + BinID: uint64(binID), + BatchID: c.Stamp().BatchID(), + }) + if err != nil { + t.Fatal(err) + } + + err = rIdx.Put(shed.Item{ + Address: c.Address().Bytes(), + BinID: uint64(binID), + BatchID: c.Stamp().BatchID(), + Index: c.Stamp().Index(), + Timestamp: c.Stamp().Timestamp(), + Sig: c.Stamp().Sig(), + Location: locBuf, + }) + + if err != nil { + t.Fatal(err) + } + } + + // create a pinning collection + writer := splitter.NewSimpleSplitter( + storage.PutterFunc( + func(ctx context.Context, chunk swarm.Chunk) error { + c := chunk.WithStamp(postagetesting.MustNewStamp()) + + loc, err := sharkyStore.Write(context.Background(), c.Data()) + if err != nil { + return err + } + + locBuf, err := loc.MarshalBinary() + if err != nil { + return err + } + + return rIdx.Put(shed.Item{ + Address: c.Address().Bytes(), + BatchID: c.Stamp().BatchID(), + Index: c.Stamp().Index(), + Timestamp: c.Stamp().Timestamp(), + Sig: 
c.Stamp().Sig(), + Location: locBuf, + }) + }, + ), + ) + + randData := make([]byte, 4096*20) + _, err = rand.Read(randData) + if err != nil { + t.Fatal(err) + } + + root, err := writer.Split(context.Background(), io.NopCloser(bytes.NewBuffer(randData)), 4096*20, false) + if err != nil { + t.Fatal(err) + } + + err = stateStore.Put(fmt.Sprintf("root-pin-%s", root.String()), root) + if err != nil { + t.Fatal(err) + } +} + +type testSharkyRecovery struct { + *sharky.Recovery + mtx sync.Mutex + addCalls int +} + +func (t *testSharkyRecovery) Add(loc sharky.Location) error { + t.mtx.Lock() + t.addCalls++ + t.mtx.Unlock() + return t.Recovery.Add(loc) +} + +type testReservePutter struct { + mtx sync.Mutex + size int + calls int +} + +func (t *testReservePutter) Put(ctx context.Context, st internal.Storage, ch swarm.Chunk) error { + t.mtx.Lock() + t.calls++ + t.mtx.Unlock() + return st.ChunkStore().Put(ctx, ch) +} + +func (t *testReservePutter) AddSize(size int) { + t.mtx.Lock() + t.size += size + t.mtx.Unlock() +} + +func (t *testReservePutter) Size() int { + t.mtx.Lock() + defer t.mtx.Unlock() + return t.size +} + +// TestEpochMigration_FLAKY is flaky on windows. +func TestEpochMigration_FLAKY(t *testing.T) { + t.Parallel() + t.Skip("will be removed") + + var ( + dataPath = t.TempDir() + baseAddress = swarm.RandAddress(t) + stateStore = mockstatestore.NewStateStore() + reserve = &testReservePutter{} + logBytes = bytes.NewBuffer(nil) + logger = log.NewLogger("test", log.WithSink(logBytes)) + indexStore = inmemstore.New() + ) + + createOldDataDir(t, dataPath, baseAddress, stateStore) + + r, err := sharky.NewRecovery(path.Join(dataPath, "sharky"), 2, swarm.SocMaxChunkSize) + if err != nil { + t.Fatal(err) + } + + sharkyRecovery := &testSharkyRecovery{Recovery: r} + + err = storer.EpochMigration( + context.Background(), + dataPath, + stateStore, + indexStore, + reserve, + sharkyRecovery, + logger, + ) + if err != nil { + t.Fatal(err) + } + + if !strings.Contains(logBytes.String(), "migrating pinning collections done") { + t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String()) + } + + if !strings.Contains(logBytes.String(), "migrating reserve contents done") { + t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String()) + } + + if sharkyRecovery.addCalls != 31 { + t.Fatalf("expected 31 add calls, got %d", sharkyRecovery.addCalls) + } + + if reserve.calls != 10 { + t.Fatalf("expected 10 reserve calls, got %d", reserve.calls) + } + + if reserve.size != 10 { + t.Fatalf("expected 10 reserve size, got %d", reserve.size) + } + + pins, err := pinstore.Pins(indexStore) + if err != nil { + t.Fatal(err) + } + + if len(pins) != 1 { + t.Fatalf("expected 1 pin, got %d", len(pins)) + } + + if !strings.Contains(logBytes.String(), pins[0].String()) { + t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String()) + } +} + +func TestEpochMigrationLightNode(t *testing.T) { + t.Parallel() + + var ( + dataPath = t.TempDir() + baseAddress = swarm.RandAddress(t) + stateStore = mockstatestore.NewStateStore() + reserve storer.ReservePutter + logBytes = bytes.NewBuffer(nil) + logger = log.NewLogger("test", log.WithSink(logBytes)) + indexStore = inmemstore.New() + ) + + createOldDataDir(t, dataPath, baseAddress, stateStore) + + r, err := sharky.NewRecovery(path.Join(dataPath, "sharky"), 2, swarm.SocMaxChunkSize) + if err != nil { + t.Fatal(err) + } + + sharkyRecovery := &testSharkyRecovery{Recovery: r} + + err = 
storer.EpochMigration( + context.Background(), + dataPath, + stateStore, + indexStore, + reserve, + sharkyRecovery, + logger, + ) + if err != nil { + t.Fatal(err) + } + + if !strings.Contains(logBytes.String(), "migrating pinning collections done") { + t.Fatalf("expected log to contain 'migrating pinning collections done', got %s", logBytes.String()) + } + + if strings.Contains(logBytes.String(), "migrating reserve contents done") { + t.Fatalf("expected log to not contain 'migrating reserve contents done', got %s", logBytes.String()) + } + + if sharkyRecovery.addCalls != 21 { + t.Fatalf("expected 31 add calls, got %d", sharkyRecovery.addCalls) + } + + pins, err := pinstore.Pins(indexStore) + if err != nil { + t.Fatal(err) + } + + if len(pins) != 1 { + t.Fatalf("expected 1 pin, got %d", len(pins)) + } + + if !strings.ContainsAny(logBytes.String(), pins[0].String()) { + t.Fatalf("expected log to contain root pin reference, got %s", logBytes.String()) + } +} diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index 83b6a4c236c..97ca16a70e0 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -139,13 +139,11 @@ func (i item) String() string { } // Load returns first found swarm.Stamp related to the given address. -// The storage.ErrNoStampsForChunk is returned if no record is found. func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp, error) { return LoadWithBatchID(s, namespace, addr, nil) } // LoadWithBatchID returns swarm.Stamp related to the given address and batchID. -// The storage.ErrNoStampsForChunk is returned if no record is found. func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) { var stamp swarm.Stamp diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index c640052609e..c60262adea7 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -21,18 +21,11 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/topology" + "resenje.org/multex" ) -// loggerName is the tree path name of the logger for this package. -const loggerName = "reserve" const reserveNamespace = "reserve" -/* - pull by bin - binID - evict by bin - batchID - sample by bin -*/ - type Reserve struct { baseAddr swarm.Address radiusSetter topology.SetStorageRadiuser @@ -41,9 +34,9 @@ type Reserve struct { capacity int size atomic.Int64 radius atomic.Uint32 - cacheCb func(context.Context, internal.Storage, ...swarm.Address) error binMtx sync.Mutex + mutx *multex.Multex } func New( @@ -52,15 +45,14 @@ func New( capacity int, radiusSetter topology.SetStorageRadiuser, logger log.Logger, - cb func(context.Context, internal.Storage, ...swarm.Address) error, ) (*Reserve, error) { rs := &Reserve{ baseAddr: baseAddr, capacity: capacity, radiusSetter: radiusSetter, - logger: logger.WithName(loggerName).Register(), - cacheCb: cb, + logger: logger.WithName(reserveNamespace).Register(), + mutx: multex.New(), } rItem := &radiusItem{} @@ -93,10 +85,13 @@ func New( } // Put stores a new chunk in the reserve and returns if the reserve size should increase. 
-func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.Chunk) (bool, error) { +func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.Chunk) error { indexStore := store.IndexStore() chunkStore := store.ChunkStore() + unlock := r.lock(chunk.Address(), chunk.Stamp().BatchID()) + defer unlock() + po := swarm.Proximity(r.baseAddr.Bytes(), chunk.Address().Bytes()) has, err := indexStore.Has(&BatchRadiusItem{ @@ -105,40 +100,40 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C BatchID: chunk.Stamp().BatchID(), }) if err != nil { - return false, err + return err } if has { - return false, nil + return nil } storeBatch, err := indexStore.Batch(ctx) if err != nil { - return false, err + return err } newStampIndex := true item, loaded, err := stampindex.LoadOrStore(indexStore, storeBatch, reserveNamespace, chunk) if err != nil { - return false, fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) + return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } if loaded { prev := binary.BigEndian.Uint64(item.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { - return false, fmt.Errorf("overwrite prev %d cur %d :%w", prev, curr, storage.ErrOverwriteNewerChunk) + return fmt.Errorf("overwrite prev %d cur %d batch %s :%w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } // An older and different chunk with the same batchID and stamp index has been previously // saved to the reserve. We must do the below before saving the new chunk: - // 1. Delete the old chunk from the chunkstore - // 2. Delete the old chunk's stamp data - // 3. Delete ALL old chunk related items from the reserve - // 4. Update the stamp index + // 1. Delete the old chunk from the chunkstore. + // 2. Delete the old chunk's stamp data. + // 3. Delete ALL old chunk related items from the reserve. + // 4. Update the stamp index. 
newStampIndex = false - err = r.DeleteChunk(ctx, store, storeBatch, item.ChunkAddress, chunk.Stamp().BatchID()) + err := r.removeChunk(ctx, store, storeBatch, item.ChunkAddress, chunk.Stamp().BatchID()) if err != nil { - return false, fmt.Errorf("failed removing older chunk: %w", err) + return fmt.Errorf("failed removing older chunk: %w", err) } r.logger.Debug( @@ -150,18 +145,18 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C err = stampindex.Store(storeBatch, reserveNamespace, chunk) if err != nil { - return false, fmt.Errorf("failed updating stamp index: %w", err) + return fmt.Errorf("failed updating stamp index: %w", err) } } err = chunkstamp.Store(storeBatch, reserveNamespace, chunk) if err != nil { - return false, err + return err } binID, err := r.IncBinID(indexStore, po) if err != nil { - return false, err + return err } err = storeBatch.Put(&BatchRadiusItem{ @@ -171,7 +166,7 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C BatchID: chunk.Stamp().BatchID(), }) if err != nil { - return false, err + return err } err = storeBatch.Put(&ChunkBinItem{ @@ -182,15 +177,24 @@ func (r *Reserve) Put(ctx context.Context, store internal.Storage, chunk swarm.C ChunkType: ChunkType(chunk), }) if err != nil { - return false, err + return err } err = chunkStore.Put(ctx, chunk) if err != nil { - return false, err + return err + } + + err = storeBatch.Commit() + if err != nil { + return err } - return newStampIndex, storeBatch.Commit() + if newStampIndex { + r.size.Add(1) + } + + return nil } func (r *Reserve) Has(store storage.Store, addr swarm.Address, batchID []byte) (bool, error) { @@ -199,6 +203,10 @@ func (r *Reserve) Has(store storage.Store, addr swarm.Address, batchID []byte) ( } func (r *Reserve) Get(ctx context.Context, storage internal.Storage, addr swarm.Address, batchID []byte) (swarm.Chunk, error) { + + unlock := r.lock(addr, batchID) + defer unlock() + item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr} err := storage.IndexStore().Get(item) if err != nil { @@ -306,12 +314,20 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb func (r *Reserve) EvictBatchBin( ctx context.Context, txExecutor internal.TxExecutor, - bin uint8, batchID []byte, + count int, + bin uint8, ) (int, error) { + unlock := r.lock(swarm.ZeroAddress, batchID) + defer unlock() + var evicted []*BatchRadiusItem + if count <= 0 { + return 0, nil + } + err := txExecutor.Execute(ctx, func(store internal.Storage) error { return store.IndexStore().Iterate(storage.Query{ Factory: func() storage.Item { return &BatchRadiusItem{} }, @@ -322,6 +338,9 @@ func (r *Reserve) EvictBatchBin( return true, nil } evicted = append(evicted, batchRadius) + if len(evicted) == count { + return true, nil + } return false, nil }) }) @@ -331,6 +350,9 @@ func (r *Reserve) EvictBatchBin( batchCnt := 1_000 evictionCompleted := 0 + defer func() { + r.size.Add(-int64(evictionCompleted)) + }() for i := 0; i < len(evicted); i += batchCnt { end := i + batchCnt @@ -347,7 +369,7 @@ func (r *Reserve) EvictBatchBin( } for _, item := range evicted[i:end] { - err = removeChunk(ctx, store, batch, item) + err = removeChunkWithItem(ctx, store, batch, item) if err != nil { return err } @@ -356,11 +378,6 @@ func (r *Reserve) EvictBatchBin( if err := batch.Commit(); err != nil { return err } - - if err := r.cacheCb(ctx, store, moveToCache...); err != nil { - r.logger.Error(err, "evict and move to cache") - 
} - return nil }) if err != nil { @@ -372,7 +389,7 @@ func (r *Reserve) EvictBatchBin( return evictionCompleted, nil } -func (r *Reserve) DeleteChunk( +func (r *Reserve) removeChunk( ctx context.Context, store internal.Storage, batch storage.Writer, @@ -388,18 +405,10 @@ func (r *Reserve) DeleteChunk( if err != nil { return err } - err = removeChunk(ctx, store, batch, item) - if err != nil { - return err - } - if err := r.cacheCb(ctx, store, item.Address); err != nil { - r.logger.Error(err, "delete and move to cache") - return err - } - return nil + return removeChunkWithItem(ctx, store, batch, item) } -func removeChunk( +func removeChunkWithItem( ctx context.Context, store internal.Storage, batch storage.Writer, @@ -423,15 +432,31 @@ func removeChunk( } return errors.Join(errs, - batch.Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}), batch.Delete(item), + batch.Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}), + store.ChunkStore().Delete(ctx, item.Address), ) } +func (r *Reserve) lock(addr swarm.Address, batchID []byte) func() { + r.mutx.Lock(addr.ByteString()) + r.mutx.Lock(string(batchID)) + return func() { + r.mutx.Unlock(addr.ByteString()) + r.mutx.Unlock(string(batchID)) + } +} + func (r *Reserve) Radius() uint8 { return uint8(r.radius.Load()) } +func (r *Reserve) SetRadius(store storage.Store, rad uint8) error { + r.radius.Store(uint32(rad)) + r.radiusSetter.SetStorageRadius(rad) + return store.Put(&radiusItem{Radius: rad}) +} + func (r *Reserve) Size() int { return int(r.size.Load()) } @@ -440,10 +465,6 @@ func (r *Reserve) Capacity() int { return r.capacity } -func (r *Reserve) AddSize(diff int) { - r.size.Add(int64(diff)) -} - func (r *Reserve) IsWithinCapacity() bool { return int(r.size.Load()) <= r.capacity } @@ -455,12 +476,6 @@ func (r *Reserve) EvictionTarget() int { return int(r.size.Load()) - r.capacity } -func (r *Reserve) SetRadius(store storage.Store, rad uint8) error { - r.radius.Store(uint32(rad)) - r.radiusSetter.SetStorageRadius(rad) - return store.Put(&radiusItem{Radius: rad}) -} - func (r *Reserve) LastBinIDs(store storage.Store) ([]uint64, uint64, error) { r.binMtx.Lock() defer r.binMtx.Unlock() diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 56a1ba4ee11..1a1b9074f29 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "errors" + "math" "math/rand" "testing" "time" @@ -25,10 +26,6 @@ import ( kademlia "github.com/ethersphere/bee/v2/pkg/topology/mock" ) -func noopCacher(_ context.Context, _ internal.Storage, _ ...swarm.Address) error { - return nil -} - func TestReserve(t *testing.T) { t.Parallel() @@ -46,7 +43,6 @@ func TestReserve(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - noopCacher, ) if err != nil { t.Fatal(err) @@ -55,13 +51,10 @@ func TestReserve(t *testing.T) { for b := 0; b < 2; b++ { for i := 1; i < 51; i++ { ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b) - c, err := r.Put(context.Background(), ts, ch) + err := r.Put(context.Background(), ts, ch) if err != nil { t.Fatal(err) } - if !c { - t.Fatal("entered unique chunk") - } checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: uint8(b), BatchID: ch.Stamp().BatchID(), Address: ch.Address()}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: uint8(b), BinID: uint64(i)}, false) checkChunk(t, ts, ch, false) @@ -103,7 +96,6 @@ func TestReserveChunkType(t *testing.T) { 
ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - noopCacher, ) if err != nil { t.Fatal(err) @@ -119,7 +111,7 @@ func TestReserveChunkType(t *testing.T) { ch = chunk.GenerateTestRandomSoChunk(t, ch) storedChunksSO++ } - if _, err := r.Put(ctx, ts, ch); err != nil { + if err := r.Put(ctx, ts, ch); err != nil { t.Errorf("unexpected error: %v", err) } } @@ -166,15 +158,6 @@ func TestReplaceOldIndex(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - func(ctx context.Context, st internal.Storage, addrs ...swarm.Address) error { - for _, addr := range addrs { - err := st.ChunkStore().Delete(ctx, addr) - if err != nil { - return err - } - } - return nil - }, ) if err != nil { t.Fatal(err) @@ -184,12 +167,12 @@ func TestReplaceOldIndex(t *testing.T) { ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) ch2 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) - _, err = r.Put(context.Background(), ts, ch1) + err = r.Put(context.Background(), ts, ch1) if err != nil { t.Fatal(err) } - _, err = r.Put(context.Background(), ts, ch2) + err = r.Put(context.Background(), ts, ch2) if err != nil { t.Fatal(err) } @@ -235,15 +218,6 @@ func TestEvict(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - func(ctx context.Context, st internal.Storage, addrs ...swarm.Address) error { - for _, addr := range addrs { - err := st.ChunkStore().Delete(ctx, addr) - if err != nil { - return err - } - } - return nil - }, ) if err != nil { t.Fatal(err) @@ -253,19 +227,16 @@ func TestEvict(t *testing.T) { for b := 0; b < 3; b++ { ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b).WithStamp(postagetesting.MustNewBatchStamp(batches[b].ID)) chunks = append(chunks, ch) - c, err := r.Put(context.Background(), ts, ch) + err := r.Put(context.Background(), ts, ch) if err != nil { t.Fatal(err) } - if !c { - t.Fatal("entered unique chunk") - } } } totalEvicted := 0 for i := 0; i < 3; i++ { - evicted, err := r.EvictBatchBin(context.Background(), ts, uint8(i), evictBatch.ID) + evicted, err := r.EvictBatchBin(context.Background(), ts, evictBatch.ID, math.MaxInt, uint8(i)) if err != nil { t.Fatal(err) } @@ -300,6 +271,46 @@ func TestEvict(t *testing.T) { } } +func TestEvictMaxCount(t *testing.T) { + t.Parallel() + + baseAddr := swarm.RandAddress(t) + + ts, closer := internal.NewInmemStorage() + t.Cleanup(func() { + if err := closer(); err != nil { + t.Errorf("failed closing the storage: %v", err) + } + }) + + r, err := reserve.New(baseAddr, ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop) + if err != nil { + t.Fatal(err) + } + + batch := postagetesting.MustNewBatch() + + for i := 0; i < 50; i++ { + ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewBatchStamp(batch.ID)) + err := r.Put(context.Background(), ts, ch) + if err != nil { + t.Fatal(err) + } + } + + evicted, err := r.EvictBatchBin(context.Background(), ts, batch.ID, 10, 1) + if err != nil { + t.Fatal(err) + } + if evicted != 10 { + t.Fatalf("wanted evicted count 10, got %d", evicted) + } + + if r.Size() != 40 { + t.Fatalf("wanted size 40, got %d", r.Size()) + } +} + func TestIterate(t *testing.T) { t.Parallel() @@ -320,7 +331,6 @@ func TestIterate(t *testing.T) { ts.IndexStore(), 0, kademlia.NewTopologyDriver(), log.Noop, - noopCacher, ) if err != nil { t.Fatal(err) @@ -329,13 +339,10 @@ func TestIterate(t *testing.T) { for b := 0; b < 3; b++ { for i := 0; i 
< 10; i++ { ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b) - c, err := r.Put(context.Background(), ts, ch) + err := r.Put(context.Background(), ts, ch) if err != nil { t.Fatal(err) } - if !c { - t.Fatal("entered unique chunk") - } } } diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 589a0f853a1..921b37f633c 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -9,6 +9,7 @@ import ( "encoding/hex" "errors" "fmt" + "math" "slices" "sync" "sync/atomic" @@ -30,10 +31,6 @@ const ( batchExpiryDone = "batchExpiryDone" ) -func reserveUpdateBatchLockKey(batchID []byte) string { - return fmt.Sprintf("%s%s", reserveUpdateLockKey, string(batchID)) -} - var errMaxRadius = errors.New("max radius reached") var reserveSizeWithinRadius atomic.Uint64 @@ -228,7 +225,7 @@ func (db *DB) evictExpiredBatches(ctx context.Context) error { } for _, batchID := range batches { - evicted, err := db.evictBatch(ctx, batchID, swarm.MaxBins) + evicted, err := db.evictBatch(ctx, batchID, math.MaxInt, swarm.MaxBins) if err != nil { return err } @@ -312,25 +309,11 @@ func (db *DB) ReservePutter() storage.Putter { return putterWithMetrics{ storage.PutterFunc( func(ctx context.Context, chunk swarm.Chunk) (err error) { - - var ( - newIndex bool - ) - lockKey := reserveUpdateBatchLockKey(chunk.Stamp().BatchID()) - db.lock.Lock(lockKey) err = db.Execute(ctx, func(tx internal.Storage) error { - newIndex, err = db.reserve.Put(ctx, tx, chunk) - if err != nil { - return fmt.Errorf("reserve: putter.Put: %w", err) - } - return nil + return db.reserve.Put(ctx, tx, chunk) }) - db.lock.Unlock(lockKey) if err != nil { - return err - } - if newIndex { - db.reserve.AddSize(1) + return fmt.Errorf("reserve: putter.Put: %w", err) } db.reserveBinEvents.Trigger(string(db.po(chunk.Address()))) if !db.reserve.IsWithinCapacity() { @@ -348,11 +331,11 @@ func (db *DB) ReservePutter() storage.Putter { func (db *DB) evictBatch( ctx context.Context, batchID []byte, + evictCount int, upToBin uint8, ) (evicted int, err error) { dur := captureDuration(time.Now()) defer func() { - db.reserve.AddSize(-evicted) db.metrics.ReserveSize.Set(float64(db.reserve.Size())) db.metrics.MethodCallsDuration.WithLabelValues("reserve", "EvictBatch").Observe(dur()) if err == nil { @@ -374,11 +357,7 @@ func (db *DB) evictBatch( ) }() - lockKey := reserveUpdateBatchLockKey(batchID) - db.lock.Lock(lockKey) - defer db.lock.Unlock(lockKey) - - return db.reserve.EvictBatchBin(ctx, db, upToBin, batchID) + return db.reserve.EvictBatchBin(ctx, db, batchID, evictCount, upToBin) } func (db *DB) unreserve(ctx context.Context) (err error) { @@ -425,7 +404,7 @@ func (db *DB) unreserve(ctx context.Context) (err error) { default: } - binEvicted, err := db.evictBatch(ctx, b, radius) + binEvicted, err := db.evictBatch(ctx, b, target-totalEvicted, radius) // eviction happens in batches, so we need to keep track of the total // number of chunks evicted even if there was an error totalEvicted += binEvicted diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index 9537f230c49..497f49773a8 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -193,19 +193,14 @@ func TestEvictBatch(t *testing.T) { } } + c, unsub := st.Events().Subscribe("batchExpiryDone") + t.Cleanup(unsub) + err = st.EvictBatch(ctx, evictBatch.ID) if err != nil { t.Fatal(err) } - - c, unsub := st.Events().Subscribe("batchExpiryDone") - t.Cleanup(unsub) - gotUnreserveSignal := make(chan struct{}) - go func() { - defer close(gotUnreserveSignal) - <-c - }() - 
<-gotUnreserveSignal + <-c reserve := st.Reserve() @@ -219,7 +214,7 @@ func TestEvictBatch(t *testing.T) { if has { t.Fatal("store should NOT have chunk") } - checkSaved(t, st, ch, false, true) + checkSaved(t, st, ch, false, false) } else if !has { t.Fatal("store should have chunk") checkSaved(t, st, ch, true, true) @@ -310,7 +305,7 @@ func TestUnreserveCap(t *testing.T) { if has { t.Fatal("store should NOT have chunk at PO", po) } - checkSaved(t, storer, ch, false, true) + checkSaved(t, storer, ch, false, false) } else if !has { t.Fatal("store should have chunk at PO", po) } else { diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index c6987a54554..c016062a6cf 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -572,7 +572,6 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { opts.ReserveCapacity, opts.RadiusSetter, logger, - db.CacheShallowCopy, ) if err != nil { return nil, err
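
Note on the migration concurrency model: migrateReserve and migratePinning in the new epoch_migration.go share the same bounded fan-out — a single producer iterates the old shed index and feeds a small buffered channel, four errgroup workers drain it, and closing the channel after iteration lets the workers exit cleanly while the errgroup context cancels everything on the first error. Below is a minimal, self-contained sketch of that pattern (illustrative only; the op type, processOp helper, and migrate function are placeholders and are not part of this patch):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type op struct{ id int }

// processOp stands in for the per-item work (reserve.Put / pinningPutter.Put).
func processOp(ctx context.Context, o op) error {
	fmt.Println("migrated", o.id)
	return nil
}

func migrate(ctx context.Context, ops []op) error {
	opChan := make(chan op, 4)
	eg, egCtx := errgroup.WithContext(ctx)

	// fixed worker pool, mirroring the four workers used by the migration code
	for i := 0; i < 4; i++ {
		eg.Go(func() error {
			for {
				select {
				case <-egCtx.Done():
					return egCtx.Err()
				case o, more := <-opChan:
					if !more {
						return nil
					}
					if err := processOp(egCtx, o); err != nil {
						return err
					}
				}
			}
		})
	}

	// producer: the deferred close guarantees the workers' "channel closed"
	// branch is reached even if iteration stops early or the context is done
	err := func() error {
		defer close(opChan)
		for _, o := range ops {
			select {
			case <-egCtx.Done():
				return egCtx.Err()
			case opChan <- o:
			}
		}
		return nil
	}()
	if err != nil {
		return err
	}

	return eg.Wait()
}

func main() {
	if err := migrate(context.Background(), []op{{1}, {2}, {3}}); err != nil {
		fmt.Println("migration failed:", err)
	}
}

Closing the channel only from the producer (via defer) and selecting on the errgroup context in both producer and workers is what keeps the shutdown paths symmetric: a failed worker cancels egCtx, the producer stops feeding, and eg.Wait surfaces the first error.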