From ab76c0b94fc51c049c6b8fd75ad26be989175d6d Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 11 Sep 2024 15:00:15 +0300 Subject: [PATCH] fix: reset reserve only when neighborhood hopping (#4807) --- pkg/node/node.go | 23 ++-- pkg/storer/internal/chunkstamp/chunkstamp.go | 126 +++++++++--------- pkg/storer/internal/chunkstamp/export_test.go | 6 +- pkg/storer/internal/reserve/reserve.go | 113 +++++++++++++--- pkg/storer/internal/reserve/reserve_test.go | 113 +++++++++++++++- pkg/storer/internal/stampindex/export_test.go | 6 +- .../internal/stampindex/oldstampindex.go | 2 +- pkg/storer/internal/stampindex/stampindex.go | 86 +++++------- .../internal/stampindex/stampindex_test.go | 103 +------------- pkg/storer/internal/upload/uploadstore.go | 10 +- pkg/storer/migration/step_06.go | 4 +- pkg/storer/reserve_test.go | 4 +- pkg/storer/storer.go | 5 + 13 files changed, 339 insertions(+), 262 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index cb9c1708feb..a84df04eca9 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -10,6 +10,7 @@ package node import ( "context" "crypto/ecdsa" + "encoding/hex" "errors" "fmt" "io" @@ -277,7 +278,7 @@ func NewBee( } } - var changedOverlay bool + var changedOverlay, resetReserve bool if targetNeighborhood != "" { neighborhood, err := swarm.ParseBitStrAddress(targetNeighborhood) if err != nil { @@ -293,21 +294,19 @@ func NewBee( } if nonceExists { - logger.Info("Override nonce %x to %x and clean state for neighborhood %s", nonce, newNonce, targetNeighborhood) + logger.Info("Override nonce and clean state for neighborhood", "old_none", hex.EncodeToString(nonce), "new_nonce", hex.EncodeToString(newNonce)) logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...") time.Sleep(10 * time.Second) - dirsToNuke := []string{ioutil.DataPathLocalstore, ioutil.DataPathKademlia} - for _, dir := range dirsToNuke { - err := ioutil.RemoveContent(filepath.Join(o.DataDir, dir)) - if err != nil { - return nil, fmt.Errorf("delete %s: %w", dir, err) - } + err := ioutil.RemoveContent(filepath.Join(o.DataDir, ioutil.DataPathKademlia)) + if err != nil { + return nil, fmt.Errorf("delete %s: %w", ioutil.DataPathKademlia, err) } if err := stateStore.ClearForHopping(); err != nil { return nil, fmt.Errorf("clearing stateStore %w", err) } + resetReserve = true } swarmAddress = newSwarmAddress @@ -736,6 +735,14 @@ func NewBee( b.localstoreCloser = localStore evictFn = func(id []byte) error { return localStore.EvictBatch(context.Background(), id) } + if resetReserve { + logger.Warning("resetting the reserve") + err := localStore.ResetReserve(ctx) + if err != nil { + return nil, fmt.Errorf("reset reserve: %w", err) + } + } + actLogic := accesscontrol.NewLogic(session) accesscontrol := accesscontrol.NewController(actLogic) b.accesscontrolCloser = accesscontrol diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index 1f84b8c7a3c..2f3857d2ed3 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -17,8 +17,8 @@ import ( ) var ( - // errMarshalInvalidChunkStampItemNamespace is returned during marshaling if the namespace is not set. - errMarshalInvalidChunkStampItemNamespace = errors.New("marshal chunkstamp.Item: invalid namespace") + // errMarshalInvalidChunkStampItemScope is returned during marshaling if the scope is not set. + errMarshalInvalidChunkStampItemScope = errors.New("marshal chunkstamp.Item: invalid scope") // errMarshalInvalidChunkStampAddress is returned during marshaling if the address is zero. errMarshalInvalidChunkStampItemAddress = errors.New("marshal chunkstamp.item: invalid address") // errUnmarshalInvalidChunkStampAddress is returned during unmarshaling if the address is not set. @@ -29,55 +29,59 @@ var ( errUnmarshalInvalidChunkStampItemSize = errors.New("unmarshal chunkstamp.item: invalid size") ) -var _ storage.Item = (*item)(nil) +var _ storage.Item = (*Item)(nil) -// item is the index used to represent a stamp for a chunk. +// Item is the index used to represent a stamp for a chunk. // -// Going ahead we will support multiple stamps on chunks. This item will allow +// Going ahead we will support multiple stamps on chunks. This Item will allow // mapping multiple stamps to a single address. For this reason, the address is // part of the Namespace and can be used to iterate on all the stamps for this // address. -type item struct { - namespace []byte // The namespace of other related item. - address swarm.Address - stamp swarm.Stamp +type Item struct { + scope []byte // The scope of other related item. + address swarm.Address + stamp swarm.Stamp } // ID implements the storage.Item interface. -func (i *item) ID() string { +func (i *Item) ID() string { return storageutil.JoinFields(string(i.stamp.BatchID()), string(i.stamp.Index())) } // Namespace implements the storage.Item interface. -func (i *item) Namespace() string { - return storageutil.JoinFields("chunkStamp", string(i.namespace), i.address.ByteString()) +func (i *Item) Namespace() string { + return storageutil.JoinFields("chunkStamp", string(i.scope), i.address.ByteString()) +} + +func (i *Item) SetScope(ns []byte) { + i.scope = ns } // Marshal implements the storage.Item interface. // address is not part of the payload which is stored, as address is part of the // prefix, hence already known before querying this object. This will be reused // during unmarshaling. -func (i *item) Marshal() ([]byte, error) { +func (i *Item) Marshal() ([]byte, error) { // The address is not part of the payload, but it is used to create the - // namespace so it is better if we check that the address is correctly + // scope so it is better if we check that the address is correctly // set here before it is stored in the underlying storage. switch { - case len(i.namespace) == 0: - return nil, errMarshalInvalidChunkStampItemNamespace + case len(i.scope) == 0: + return nil, errMarshalInvalidChunkStampItemScope case i.address.IsZero(): return nil, errMarshalInvalidChunkStampItemAddress case i.stamp == nil: return nil, errMarshalInvalidChunkStampItemStamp } - buf := make([]byte, 8+len(i.namespace)+postage.StampSize) + buf := make([]byte, 8+len(i.scope)+postage.StampSize) l := 0 - binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.namespace))) + binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.scope))) l += 8 - copy(buf[l:l+len(i.namespace)], i.namespace) - l += len(i.namespace) + copy(buf[l:l+len(i.scope)], i.scope) + l += len(i.scope) data, err := i.stamp.MarshalBinary() if err != nil { return nil, fmt.Errorf("unable to marshal chunkstamp.item: %w", err) @@ -88,7 +92,7 @@ func (i *item) Marshal() ([]byte, error) { } // Unmarshal implements the storage.Item interface. -func (i *item) Unmarshal(bytes []byte) error { +func (i *Item) Unmarshal(bytes []byte) error { if len(bytes) < 8 { return errUnmarshalInvalidChunkStampItemSize } @@ -102,9 +106,9 @@ func (i *item) Unmarshal(bytes []byte) error { return errUnmarshalInvalidChunkStampItemAddress } - ni := &item{address: i.address.Clone()} + ni := &Item{address: i.address.Clone()} l := 8 - ni.namespace = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...) + ni.scope = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...) l += nsLen stamp := new(postage.Stamp) if err := stamp.UnmarshalBinary(bytes[l : l+postage.StampSize]); err != nil { @@ -119,13 +123,13 @@ func (i *item) Unmarshal(bytes []byte) error { } // Clone implements the storage.Item interface. -func (i *item) Clone() storage.Item { +func (i *Item) Clone() storage.Item { if i == nil { return nil } - clone := &item{ - namespace: append([]byte(nil), i.namespace...), - address: i.address.Clone(), + clone := &Item{ + scope: append([]byte(nil), i.scope...), + address: i.address.Clone(), } if i.stamp != nil { clone.stamp = i.stamp.Clone() @@ -134,31 +138,31 @@ func (i *item) Clone() storage.Item { } // String implements the storage.Item interface. -func (i item) String() string { +func (i Item) String() string { return storageutil.JoinFields(i.Namespace(), i.ID()) } // Load returns first found swarm.Stamp related to the given address. -func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp, error) { - return LoadWithBatchID(s, namespace, addr, nil) +func Load(s storage.Reader, scope string, addr swarm.Address) (swarm.Stamp, error) { + return LoadWithBatchID(s, scope, addr, nil) } // LoadWithBatchID returns swarm.Stamp related to the given address and batchID. -func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) { +func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) { var stamp swarm.Stamp found := false err := s.Iterate( storage.Query{ Factory: func() storage.Item { - return &item{ - namespace: []byte(namespace), - address: addr, + return &Item{ + scope: []byte(scope), + address: addr, } }, }, func(res storage.Result) (bool, error) { - item := res.Entry.(*item) + item := res.Entry.(*Item) if batchID == nil || bytes.Equal(batchID, item.stamp.BatchID()) { stamp = item.stamp found = true @@ -178,12 +182,12 @@ func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, bat } // Store creates new or updated an existing stamp index -// record related to the given namespace and chunk. -func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error { - item := &item{ - namespace: []byte(namespace), - address: chunk.Address(), - stamp: chunk.Stamp(), +// record related to the given scope and chunk. +func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error { + item := &Item{ + scope: []byte(scope), + address: chunk.Address(), + stamp: chunk.Stamp(), } if err := s.Put(item); err != nil { return fmt.Errorf("unable to put chunkstamp.item %s: %w", item, err) @@ -192,19 +196,19 @@ func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error { } // DeleteAll removes all swarm.Stamp related to the given address. -func DeleteAll(s storage.IndexStore, namespace string, addr swarm.Address) error { +func DeleteAll(s storage.IndexStore, scope string, addr swarm.Address) error { var stamps []swarm.Stamp err := s.Iterate( storage.Query{ Factory: func() storage.Item { - return &item{ - namespace: []byte(namespace), - address: addr, + return &Item{ + scope: []byte(scope), + address: addr, } }, }, func(res storage.Result) (bool, error) { - stamps = append(stamps, res.Entry.(*item).stamp) + stamps = append(stamps, res.Entry.(*Item).stamp) return false, nil }, ) @@ -216,10 +220,10 @@ func DeleteAll(s storage.IndexStore, namespace string, addr swarm.Address) error for _, stamp := range stamps { errs = errors.Join( errs, - s.Delete(&item{ - namespace: []byte(namespace), - address: addr, - stamp: stamp, + s.Delete(&Item{ + scope: []byte(scope), + address: addr, + stamp: stamp, }), ) } @@ -227,30 +231,30 @@ func DeleteAll(s storage.IndexStore, namespace string, addr swarm.Address) error } // Delete removes a stamp associated with an chunk and batchID. -func Delete(s storage.IndexStore, namespace string, addr swarm.Address, batchId []byte) error { - stamp, err := LoadWithBatchID(s, namespace, addr, batchId) +func Delete(s storage.IndexStore, scope string, addr swarm.Address, batchId []byte) error { + stamp, err := LoadWithBatchID(s, scope, addr, batchId) if err != nil { if errors.Is(err, storage.ErrNotFound) { return nil } return err } - return s.Delete(&item{ - namespace: []byte(namespace), - address: addr, - stamp: stamp, + return s.Delete(&Item{ + scope: []byte(scope), + address: addr, + stamp: stamp, }) } func DeleteWithStamp( writer storage.Writer, - namespace string, + scope string, addr swarm.Address, stamp swarm.Stamp, ) error { - return writer.Delete(&item{ - namespace: []byte(namespace), - address: addr, - stamp: stamp, + return writer.Delete(&Item{ + scope: []byte(scope), + address: addr, + stamp: stamp, }) } diff --git a/pkg/storer/internal/chunkstamp/export_test.go b/pkg/storer/internal/chunkstamp/export_test.go index 77e54330e66..008cbf20e29 100644 --- a/pkg/storer/internal/chunkstamp/export_test.go +++ b/pkg/storer/internal/chunkstamp/export_test.go @@ -6,10 +6,8 @@ package chunkstamp import "github.com/ethersphere/bee/v2/pkg/swarm" -type Item = item - func (i *Item) WithNamespace(ns string) *Item { - i.namespace = []byte(ns) + i.scope = []byte(ns) return i } @@ -24,7 +22,7 @@ func (i *Item) WithStamp(stamp swarm.Stamp) *Item { } var ( - ErrMarshalInvalidChunkStampItemNamespace = errMarshalInvalidChunkStampItemNamespace + ErrMarshalInvalidChunkStampItemNamespace = errMarshalInvalidChunkStampItemScope ErrMarshalInvalidChunkStampItemAddress = errMarshalInvalidChunkStampItemAddress ErrUnmarshalInvalidChunkStampItemAddress = errUnmarshalInvalidChunkStampItemAddress ErrMarshalInvalidChunkStampItemStamp = errMarshalInvalidChunkStampItemStamp diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 8a26492114f..4e7bfed506a 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -16,6 +16,7 @@ import ( "time" "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp" "github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex" @@ -26,7 +27,7 @@ import ( "resenje.org/multex" ) -const reserveNamespace = "reserve" +const reserveScope = "reserve" type Reserve struct { baseAddr swarm.Address @@ -54,7 +55,7 @@ func New( st: st, capacity: capacity, radiusSetter: radiusSetter, - logger: logger.WithName(reserveNamespace).Register(), + logger: logger.WithName(reserveScope).Register(), multx: multex.New(), } @@ -128,19 +129,19 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { err = r.st.Run(ctx, func(s transaction.Store) error { - oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk) + oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) if err != nil { return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } - sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID()) + sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID()) if err != nil && !errors.Is(err, storage.ErrNotFound) { return err } // same chunk address, same batch if sameAddressOldStamp != nil { - sameAddressOldStampIndex, err := stampindex.LoadWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldStamp) + sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) if err != nil { return err } @@ -188,8 +189,8 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { err = errors.Join( s.IndexStore().Delete(oldBatchRadiusItem), s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), - stampindex.DeleteWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldStamp), - chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldStamp), + stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp), ) if err != nil { return err @@ -201,8 +202,8 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } err = errors.Join( - stampindex.Store(s.IndexStore(), reserveNamespace, chunk), - chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk), + stampindex.Store(s.IndexStore(), reserveScope, chunk), + chunkstamp.Store(s.IndexStore(), reserveScope, chunk), s.IndexStore().Put(&BatchRadiusItem{ Bin: bin, BinID: binID, @@ -227,7 +228,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return nil } - r.logger.Warning("replacing soc in chunkstore", "address", chunk.Address()) + r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) return s.ChunkStore().Replace(ctx, chunk) } @@ -245,7 +246,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { // 3. Delete ALL old chunk related items from the reserve. // 4. Update the stamp index. - err = r.removeChunk(ctx, s, oldStampIndex.ChunkAddress, chunk.Stamp().BatchID(), oldStampIndex.StampHash) + err = r.removeChunk(ctx, s, oldStampIndex.ChunkAddress, oldStampIndex.BatchID, oldStampIndex.StampHash) if err != nil { return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err) } @@ -258,7 +259,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { ) // replace old stamp index. - err = stampindex.Store(s.IndexStore(), reserveNamespace, chunk) + err = stampindex.Store(s.IndexStore(), reserveScope, chunk) if err != nil { return fmt.Errorf("failed updating stamp index: %w", err) } @@ -270,7 +271,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } err = errors.Join( - chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk), + chunkstamp.Store(s.IndexStore(), reserveScope, chunk), s.IndexStore().Put(&BatchRadiusItem{ Bin: bin, BinID: binID, @@ -325,7 +326,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s return nil, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveNamespace, addr, item.BatchID) + stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, addr, item.BatchID) if err != nil { return nil, err } @@ -429,15 +430,11 @@ func RemoveChunkWithItem( var errs error - stamp, _ := chunkstamp.LoadWithBatchID(trx.IndexStore(), reserveNamespace, item.Address, item.BatchID) + stamp, _ := chunkstamp.LoadWithBatchID(trx.IndexStore(), reserveScope, item.Address, item.BatchID) if stamp != nil { errs = errors.Join( - stampindex.Delete( - trx.IndexStore(), - reserveNamespace, - swarm.NewChunk(item.Address, nil).WithStamp(stamp), - ), - chunkstamp.DeleteWithStamp(trx.IndexStore(), reserveNamespace, item.Address, stamp), + stampindex.Delete(trx.IndexStore(), reserveScope, stamp), + chunkstamp.DeleteWithStamp(trx.IndexStore(), reserveScope, item.Address, stamp), ) } @@ -483,7 +480,7 @@ func (r *Reserve) IterateChunks(startBin uint8, cb func(swarm.Chunk) (bool, erro return false, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveNamespace, item.Address, item.BatchID) + stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, item.Address, item.BatchID) if err != nil { return false, err } @@ -516,6 +513,78 @@ func (r *Reserve) IterateChunksItems(startBin uint8, cb func(*ChunkBinItem) (boo return err } +// Reset removes all the entires in the reserve. Must be done before any calls to the reserve. +func (r *Reserve) Reset(ctx context.Context) error { + + size := r.Size() + + bRitems := make([]*BatchRadiusItem, 0, size) + + err := r.st.IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return &BatchRadiusItem{} }, + }, func(res storage.Result) (bool, error) { + bRitems = append(bRitems, res.Entry.(*BatchRadiusItem)) + return false, nil + }) + if err != nil { + return err + } + + var eg errgroup.Group + eg.SetLimit(runtime.NumCPU()) + + for _, item := range bRitems { + item := item + eg.Go(func() error { + return r.st.Run(ctx, func(s transaction.Store) error { + return errors.Join( + s.ChunkStore().Delete(ctx, item.Address), + s.IndexStore().Delete(item), + s.IndexStore().Delete(&ChunkBinItem{Bin: item.Bin, BinID: item.BinID}), + ) + }) + }) + } + + err = eg.Wait() + if err != nil { + return err + } + bRitems = nil + + sitems := make([]*stampindex.Item, 0, size) + err = r.st.IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return &stampindex.Item{} }, + }, func(res storage.Result) (bool, error) { + sitems = append(sitems, res.Entry.(*stampindex.Item)) + return false, nil + }) + if err != nil { + return err + } + for _, item := range sitems { + item := item + eg.Go(func() error { + return r.st.Run(ctx, func(s transaction.Store) error { + return errors.Join( + s.IndexStore().Delete(item), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, item.ChunkAddress, postage.NewStamp(item.BatchID, item.StampIndex, item.StampTimestamp, nil)), + ) + }) + }) + } + + err = eg.Wait() + if err != nil { + return err + } + sitems = nil + + r.size.Store(0) + + return nil +} + func (r *Reserve) Radius() uint8 { return uint8(r.radius.Load()) } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 3f7807f257d..ad7dd13d019 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -27,6 +27,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/swarm" kademlia "github.com/ethersphere/bee/v2/pkg/topology/mock" + "github.com/stretchr/testify/assert" ) func TestReserve(t *testing.T) { @@ -499,7 +500,7 @@ func TestReplaceOldIndex(t *testing.T) { checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 2, StampHash: ch2StampHash}, false) checkChunk(t, ts, ch2, false) - item, err := stampindex.Load(ts.IndexStore(), "reserve", ch2) + item, err := stampindex.Load(ts.IndexStore(), "reserve", ch2.Stamp()) if err != nil { t.Fatal(err) } @@ -752,6 +753,116 @@ func TestIterate(t *testing.T) { }) } +func TestReset(t *testing.T) { + t.Parallel() + + baseAddr := swarm.RandAddress(t) + + ts := internal.NewInmemStorage() + + r, err := reserve.New( + baseAddr, + ts, + 0, kademlia.NewTopologyDriver(), + log.Noop, + ) + if err != nil { + t.Fatal(err) + } + + var chs []swarm.Chunk + + var ( + bins = 5 + chunksPerBin = 100 + total = bins * chunksPerBin + ) + + for b := 0; b < bins; b++ { + for i := 1; i <= chunksPerBin; i++ { + ch := chunk.GenerateTestRandomChunkAt(t, baseAddr, b) + err := r.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: uint8(b), BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: uint8(b), BinID: uint64(i), StampHash: stampHash}, false) + checkChunk(t, ts, ch, false) + _, err = r.Get(context.Background(), ch.Address(), ch.Stamp().BatchID(), stampHash) + if err != nil { + t.Fatal(err) + } + chs = append(chs, ch) + } + } + + c, err := ts.IndexStore().Count(&reserve.BatchRadiusItem{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, total) + c, err = ts.IndexStore().Count(&reserve.ChunkBinItem{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, total) + c, err = ts.IndexStore().Count(&stampindex.Item{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, total) + + cItem := &chunkstamp.Item{} + cItem.SetScope([]byte("reserve")) + c, err = ts.IndexStore().Count(cItem) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, total) + + err = r.Reset(context.Background()) + if err != nil { + t.Fatal(err) + } + + c, err = ts.IndexStore().Count(&reserve.BatchRadiusItem{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, 0) + c, err = ts.IndexStore().Count(&reserve.ChunkBinItem{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, 0) + c, err = ts.IndexStore().Count(&stampindex.Item{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, 0) + + c, err = ts.IndexStore().Count(cItem) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c, 0) + + for _, c := range chs { + h, err := c.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + _, err = r.Get(context.Background(), c.Address(), c.Stamp().BatchID(), h) + if !errors.Is(err, storage.ErrNotFound) { + t.Fatalf("expected error %v, got %v", storage.ErrNotFound, err) + } + } +} + func checkStore(t *testing.T, s storage.Reader, k storage.Key, gone bool) { t.Helper() h, err := s.Has(k) diff --git a/pkg/storer/internal/stampindex/export_test.go b/pkg/storer/internal/stampindex/export_test.go index aa139a56282..fce1182712d 100644 --- a/pkg/storer/internal/stampindex/export_test.go +++ b/pkg/storer/internal/stampindex/export_test.go @@ -7,7 +7,7 @@ package stampindex import "github.com/ethersphere/bee/v2/pkg/swarm" var ( - ErrStampItemMarshalNamespaceInvalid = errStampItemMarshalNamespaceInvalid + ErrStampItemMarshalNamespaceInvalid = errStampItemMarshalScopeInvalid ErrStampItemMarshalBatchIndexInvalid = errStampItemMarshalBatchIndexInvalid ErrStampItemMarshalBatchIDInvalid = errStampItemMarshalBatchIDInvalid ErrStampItemUnmarshalInvalidSize = errStampItemUnmarshalInvalidSize @@ -16,7 +16,7 @@ var ( // NewItemWithValues creates a new Item with given values and fixed keys. func NewItemWithValues(batchTimestamp []byte, chunkAddress swarm.Address) *Item { return &Item{ - namespace: []byte("test_namespace"), + scope: []byte("test_namespace"), BatchID: []byte{swarm.HashSize - 1: 9}, StampIndex: []byte{swarm.StampIndexSize - 1: 9}, StampHash: swarm.EmptyAddress.Bytes(), @@ -29,7 +29,7 @@ func NewItemWithValues(batchTimestamp []byte, chunkAddress swarm.Address) *Item // NewItemWithKeys creates a new Item with given keys and zero values. func NewItemWithKeys(namespace string, batchID, batchIndex, stampHash []byte) *Item { return &Item{ - namespace: append([]byte(nil), namespace...), + scope: append([]byte(nil), namespace...), BatchID: batchID, StampIndex: batchIndex, StampHash: stampHash, diff --git a/pkg/storer/internal/stampindex/oldstampindex.go b/pkg/storer/internal/stampindex/oldstampindex.go index f557d848a02..de24a33c21b 100644 --- a/pkg/storer/internal/stampindex/oldstampindex.go +++ b/pkg/storer/internal/stampindex/oldstampindex.go @@ -41,7 +41,7 @@ func (i ItemV1) Namespace() string { func (i ItemV1) Marshal() ([]byte, error) { switch { case len(i.namespace) == 0: - return nil, errStampItemMarshalNamespaceInvalid + return nil, errStampItemMarshalScopeInvalid case len(i.BatchID) != swarm.HashSize: return nil, errStampItemMarshalBatchIDInvalid case len(i.StampIndex) != swarm.StampIndexSize: diff --git a/pkg/storer/internal/stampindex/stampindex.go b/pkg/storer/internal/stampindex/stampindex.go index 815cfacd0fb..c8fff8698f9 100644 --- a/pkg/storer/internal/stampindex/stampindex.go +++ b/pkg/storer/internal/stampindex/stampindex.go @@ -16,9 +16,9 @@ import ( ) var ( - // errStampItemMarshalNamespaceInvalid is returned when trying to - // marshal a Item with invalid namespace. - errStampItemMarshalNamespaceInvalid = errors.New("marshal stampindex.Item: namespace is invalid") + // errStampItemMarshalScopeInvalid is returned when trying to + // marshal a Item with invalid scope. + errStampItemMarshalScopeInvalid = errors.New("marshal stampindex.Item: scope is invalid") // errStampItemMarshalBatchIDInvalid is returned when trying to // marshal a Item with invalid batchID. errStampItemMarshalBatchIDInvalid = errors.New("marshal stampindex.Item: batchID is invalid") @@ -36,7 +36,7 @@ var _ storage.Item = (*Item)(nil) // Item is an store.Item that represents data relevant to stamp. type Item struct { // Keys. - namespace []byte // The namespace of other related item. + scope []byte // The scope of other related item. BatchID []byte StampIndex []byte StampHash []byte @@ -48,7 +48,7 @@ type Item struct { // ID implements the storage.Item interface. func (i Item) ID() string { - return fmt.Sprintf("%s/%s/%s", string(i.namespace), string(i.BatchID), string(i.StampIndex)) + return fmt.Sprintf("%s/%s/%s", string(i.scope), string(i.BatchID), string(i.StampIndex)) } // Namespace implements the storage.Item interface. @@ -56,32 +56,32 @@ func (i Item) Namespace() string { return "stampIndex" } -func (i Item) GetNamespace() []byte { - return i.namespace +func (i Item) GetScope() []byte { + return i.scope } -func (i *Item) SetNamespace(ns []byte) { - i.namespace = ns +func (i *Item) SetScope(ns []byte) { + i.scope = ns } // Marshal implements the storage.Item interface. func (i Item) Marshal() ([]byte, error) { switch { - case len(i.namespace) == 0: - return nil, errStampItemMarshalNamespaceInvalid + case len(i.scope) == 0: + return nil, errStampItemMarshalScopeInvalid case len(i.BatchID) != swarm.HashSize: return nil, errStampItemMarshalBatchIDInvalid case len(i.StampIndex) != swarm.StampIndexSize: return nil, errStampItemMarshalBatchIndexInvalid } - buf := make([]byte, 8+len(i.namespace)+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize+swarm.HashSize) + buf := make([]byte, 8+len(i.scope)+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize+swarm.HashSize) l := 0 - binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.namespace))) + binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.scope))) l += 8 - copy(buf[l:l+len(i.namespace)], i.namespace) - l += len(i.namespace) + copy(buf[l:l+len(i.scope)], i.scope) + l += len(i.scope) copy(buf[l:l+swarm.HashSize], i.BatchID) l += swarm.HashSize copy(buf[l:l+swarm.StampIndexSize], i.StampIndex) @@ -106,7 +106,7 @@ func (i *Item) Unmarshal(bytes []byte) error { ni := new(Item) l := 8 - ni.namespace = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...) + ni.scope = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...) l += nsLen ni.BatchID = append(make([]byte, 0, swarm.HashSize), bytes[l:l+swarm.HashSize]...) l += swarm.HashSize @@ -127,7 +127,7 @@ func (i *Item) Clone() storage.Item { return nil } return &Item{ - namespace: append([]byte(nil), i.namespace...), + scope: append([]byte(nil), i.scope...), BatchID: append([]byte(nil), i.BatchID...), StampIndex: append([]byte(nil), i.StampIndex...), StampHash: append([]byte(nil), i.StampHash...), @@ -146,10 +146,10 @@ func (i Item) String() string { // return it. func LoadOrStore( s storage.IndexStore, - namespace string, + scope string, chunk swarm.Chunk, ) (item *Item, loaded bool, err error) { - item, err = Load(s, namespace, chunk) + item, err = Load(s, scope, chunk.Stamp()) if err != nil { if errors.Is(err, storage.ErrNotFound) { stampHash, err := chunk.Stamp().Hash() @@ -157,26 +157,25 @@ func LoadOrStore( return nil, false, err } return &Item{ - namespace: []byte(namespace), + scope: []byte(scope), BatchID: chunk.Stamp().BatchID(), StampIndex: chunk.Stamp().Index(), - StampHash: stampHash, StampTimestamp: chunk.Stamp().Timestamp(), ChunkAddress: chunk.Address(), - }, false, Store(s, namespace, chunk) + StampHash: stampHash, + }, false, Store(s, scope, chunk) } return nil, false, err } return item, true, nil } -// Load returns stamp index record related to the given namespace and chunk. -// The storage.ErrNotFound is returned if no record is found. -func Load(s storage.Reader, namespace string, chunk swarm.Chunk) (*Item, error) { +// Load returns stamp index record related to the given scope and stamp. +func Load(s storage.Reader, scope string, stamp swarm.Stamp) (*Item, error) { item := &Item{ - namespace: []byte(namespace), - BatchID: chunk.Stamp().BatchID(), - StampIndex: chunk.Stamp().Index(), + scope: []byte(scope), + BatchID: stamp.BatchID(), + StampIndex: stamp.Index(), } err := s.Get(item) if err != nil { @@ -185,26 +184,20 @@ func Load(s storage.Reader, namespace string, chunk swarm.Chunk) (*Item, error) return item, nil } -// LoadWithStamp returns stamp index record related to the given namespace and stamp. -func LoadWithStamp(s storage.Reader, namespace string, stamp swarm.Stamp) (*Item, error) { - ch := swarm.NewChunk(swarm.EmptyAddress, nil).WithStamp(stamp) - return Load(s, namespace, ch) -} - // Store creates new or updated an existing stamp index -// record related to the given namespace and chunk. -func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error { +// record related to the given scope and chunk. +func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error { stampHash, err := chunk.Stamp().Hash() if err != nil { return err } item := &Item{ - namespace: []byte(namespace), + scope: []byte(scope), BatchID: chunk.Stamp().BatchID(), StampIndex: chunk.Stamp().Index(), - StampHash: stampHash, StampTimestamp: chunk.Stamp().Timestamp(), ChunkAddress: chunk.Address(), + StampHash: stampHash, } if err := s.Put(item); err != nil { return fmt.Errorf("failed to put stampindex.Item %s: %w", item, err) @@ -213,22 +206,9 @@ func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error { } // Delete removes the related stamp index record from the storage. -func Delete(s storage.Writer, namespace string, chunk swarm.Chunk) error { - item := &Item{ - namespace: []byte(namespace), - BatchID: chunk.Stamp().BatchID(), - StampIndex: chunk.Stamp().Index(), - } - if err := s.Delete(item); err != nil { - return fmt.Errorf("failed to delete stampindex.Item %s: %w", item, err) - } - return nil -} - -// DeleteWithStamp removes the related stamp index record from the storage. -func DeleteWithStamp(s storage.Writer, namespace string, stamp swarm.Stamp) error { +func Delete(s storage.Writer, scope string, stamp swarm.Stamp) error { item := &Item{ - namespace: []byte(namespace), + scope: []byte(scope), BatchID: stamp.BatchID(), StampIndex: stamp.Index(), } diff --git a/pkg/storer/internal/stampindex/stampindex_test.go b/pkg/storer/internal/stampindex/stampindex_test.go index 94eb0493533..a04ff2fc50c 100644 --- a/pkg/storer/internal/stampindex/stampindex_test.go +++ b/pkg/storer/internal/stampindex/stampindex_test.go @@ -106,103 +106,6 @@ func TestStampIndexItem(t *testing.T) { } } -func TestStoreLoadDelete(t *testing.T) { - t.Parallel() - - ts := newTestStorage(t) - chunks := chunktest.GenerateTestRandomChunks(10) - - for i, chunk := range chunks { - ns := fmt.Sprintf("namespace_%d", i) - t.Run(ns, func(t *testing.T) { - t.Run("store new stamp index", func(t *testing.T) { - - err := ts.Run(context.Background(), func(s transaction.Store) error { - return stampindex.Store(s.IndexStore(), ns, chunk) - - }) - if err != nil { - t.Fatalf("Store(...): unexpected error: %v", err) - } - - stampHash, err := chunk.Stamp().Hash() - if err != nil { - t.Fatal(err) - } - want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) - want.StampTimestamp = chunk.Stamp().Timestamp() - want.ChunkAddress = chunk.Address() - - have := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) - err = ts.IndexStore().Get(have) - if err != nil { - t.Fatalf("Get(...): unexpected error: %v", err) - } - - if diff := cmp.Diff(want, have, cmp.AllowUnexported(stampindex.Item{})); diff != "" { - t.Fatalf("Get(...): mismatch (-want +have):\n%s", diff) - } - }) - - t.Run("load stored stamp index", func(t *testing.T) { - stampHash, err := chunk.Stamp().Hash() - if err != nil { - t.Fatal(err) - } - want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) - want.StampTimestamp = chunk.Stamp().Timestamp() - want.ChunkAddress = chunk.Address() - - have, err := stampindex.Load(ts.IndexStore(), ns, chunk) - if err != nil { - t.Fatalf("Load(...): unexpected error: %v", err) - } - - if diff := cmp.Diff(want, have, cmp.AllowUnexported(stampindex.Item{})); diff != "" { - t.Fatalf("Load(...): mismatch (-want +have):\n%s", diff) - } - }) - - t.Run("delete stored stamp index", func(t *testing.T) { - - err := ts.Run(context.Background(), func(s transaction.Store) error { - return stampindex.Delete(s.IndexStore(), ns, chunk) - }) - if err != nil { - t.Fatalf("Delete(...): unexpected error: %v", err) - } - - have, err := stampindex.Load(ts.IndexStore(), ns, chunk) - if have != nil { - t.Fatalf("Load(...): unexpected item %v", have) - } - if !errors.Is(err, storage.ErrNotFound) { - t.Fatalf("Load(...): unexpected error: %v", err) - } - - cnt := 0 - err = ts.IndexStore().Iterate( - storage.Query{ - Factory: func() storage.Item { - return new(stampindex.Item) - }, - }, - func(result storage.Result) (bool, error) { - cnt++ - return false, nil - }, - ) - if err != nil { - t.Fatalf("Store().Iterate(...): unexpected error: %v", err) - } - if want, have := 0, cnt; want != have { - t.Fatalf("Store().Iterate(...): chunk count mismatch:\nwant: %d\nhave: %d", want, have) - } - }) - }) - } -} - func TestStoreLoadDeleteWithStamp(t *testing.T) { t.Parallel() @@ -250,7 +153,7 @@ func TestStoreLoadDeleteWithStamp(t *testing.T) { want.StampTimestamp = chunk.Stamp().Timestamp() want.ChunkAddress = chunk.Address() - have, err := stampindex.Load(ts.IndexStore(), ns, chunk) + have, err := stampindex.Load(ts.IndexStore(), ns, chunk.Stamp()) if err != nil { t.Fatalf("Load(...): unexpected error: %v", err) } @@ -263,13 +166,13 @@ func TestStoreLoadDeleteWithStamp(t *testing.T) { t.Run("delete stored stamp index", func(t *testing.T) { err := ts.Run(context.Background(), func(s transaction.Store) error { - return stampindex.DeleteWithStamp(s.IndexStore(), ns, chunk.Stamp()) + return stampindex.Delete(s.IndexStore(), ns, chunk.Stamp()) }) if err != nil { t.Fatalf("Delete(...): unexpected error: %v", err) } - have, err := stampindex.Load(ts.IndexStore(), ns, chunk) + have, err := stampindex.Load(ts.IndexStore(), ns, chunk.Stamp()) if have != nil { t.Fatalf("Load(...): unexpected item %v", have) } diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index 0a51352d71f..5876ad4b3a8 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -41,7 +41,7 @@ var ( // pushItemSize is the size of a marshaled pushItem. const pushItemSize = 8 + 2*swarm.HashSize + 8 -const chunkStampNamespace = "upload" +const uploadScope = "upload" var _ storage.Item = (*pushItem)(nil) @@ -440,7 +440,7 @@ func (u *uploadPutter) Put(ctx context.Context, st transaction.Store, chunk swar st.IndexStore().Put(ui), st.IndexStore().Put(pi), st.ChunkStore().Put(ctx, chunk), - chunkstamp.Store(st.IndexStore(), chunkStampNamespace, chunk), + chunkstamp.Store(st.IndexStore(), uploadScope, chunk), ) } @@ -516,7 +516,7 @@ func (u *uploadPutter) Cleanup(st transaction.Storage) error { return errors.Join( s.IndexStore().Delete(ui), s.ChunkStore().Delete(context.Background(), item.Address), - chunkstamp.Delete(s.IndexStore(), chunkStampNamespace, item.Address, item.BatchID), + chunkstamp.Delete(s.IndexStore(), uploadScope, item.Address, item.BatchID), s.IndexStore().Delete(item), ) }) @@ -614,7 +614,7 @@ func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state return errors.Join( indexStore.Delete(pi), - chunkstamp.Delete(indexStore, chunkStampNamespace, pi.Address, pi.BatchID), + chunkstamp.Delete(indexStore, uploadScope, pi.Address, pi.BatchID), st.ChunkStore().Delete(ctx, chunk.Address()), indexStore.Delete(ui), ) @@ -727,7 +727,7 @@ func IteratePending(ctx context.Context, s transaction.ReadOnlyStore, consumerFn return true, err } - stamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), chunkStampNamespace, chunk.Address(), pi.BatchID) + stamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), uploadScope, chunk.Address(), pi.BatchID) if err != nil { return true, err } diff --git a/pkg/storer/migration/step_06.go b/pkg/storer/migration/step_06.go index 05237a3dd40..f5a48599578 100644 --- a/pkg/storer/migration/step_06.go +++ b/pkg/storer/migration/step_06.go @@ -164,7 +164,7 @@ func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, erro StampTimestamp: stamp.Timestamp(), ChunkAddress: chunkBinItemV1.Address, } - stampIndexItem.SetNamespace([]byte("reserve")) + stampIndexItem.SetScope([]byte("reserve")) err = idxStore.Put(stampIndexItem) if err != nil { return err @@ -214,7 +214,7 @@ func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, erro return false, fmt.Errorf("stamp item get: %w", err) } - stampIndex, err := stampindex.LoadWithStamp(st.IndexStore(), "reserve", stamp) + stampIndex, err := stampindex.Load(st.IndexStore(), "reserve", stamp) if err != nil { return false, fmt.Errorf("stamp index get: %w", err) } diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index 80c707892fc..ce5d253be96 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -131,7 +131,7 @@ func TestReplaceOldIndex(t *testing.T) { } // Chunk 1 must be missing - item, err := stampindex.Load(storer.Storage().IndexStore(), "reserve", ch_1) + item, err := stampindex.Load(storer.Storage().IndexStore(), "reserve", ch_1.Stamp()) if err != nil { t.Fatal(err) } @@ -683,7 +683,7 @@ func checkSaved(t *testing.T, st *storer.DB, ch swarm.Chunk, stampSaved, chunkSt if !stampSaved { stampWantedErr = storage.ErrNotFound } - _, err := stampindex.Load(st.Storage().IndexStore(), "reserve", ch) + _, err := stampindex.Load(st.Storage().IndexStore(), "reserve", ch.Stamp()) if !errors.Is(err, stampWantedErr) { t.Fatalf("wanted err %s, got err %s", stampWantedErr, err) } diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index a79dfd609f3..bf7e0f0dbc1 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -576,6 +576,11 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { return db, nil } +// Reset removes all entries +func (db *DB) ResetReserve(ctx context.Context) error { + return db.reserve.Reset(ctx) +} + // Metrics returns set of prometheus collectors. func (db *DB) Metrics() []prometheus.Collector { collectors := m.PrometheusCollectorsFromFields(db.metrics)