diff --git a/pkg/storer/internal/reserve/olditems.go b/pkg/storer/internal/reserve/olditems.go new file mode 100644 index 00000000000..37bcdd9cc75 --- /dev/null +++ b/pkg/storer/internal/reserve/olditems.go @@ -0,0 +1,180 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package reserve + +import ( + "encoding/binary" + "path" + + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// BatchRadiusItemV1 allows iteration of the chunks with respect to bin and batchID. +// Used for batch evictions of certain bins. +type BatchRadiusItemV1 struct { + Bin uint8 + BatchID []byte + Address swarm.Address + BinID uint64 +} + +func (b *BatchRadiusItemV1) Namespace() string { + return "batchRadius" +} + +func (b *BatchRadiusItemV1) ID() string { + return string(b.BatchID) + string(b.Bin) + b.Address.ByteString() +} + +func (b *BatchRadiusItemV1) String() string { + return path.Join(b.Namespace(), b.ID()) +} + +func (b *BatchRadiusItemV1) Clone() storage.Item { + if b == nil { + return nil + } + return &BatchRadiusItemV1{ + Bin: b.Bin, + BatchID: copyBytes(b.BatchID), + Address: b.Address.Clone(), + BinID: b.BinID, + } +} + +const batchRadiusItemSizeV1 = 1 + swarm.HashSize + swarm.HashSize + 8 + +func (b *BatchRadiusItemV1) Marshal() ([]byte, error) { + + if b.Address.IsZero() { + return nil, errMarshalInvalidAddress + } + + buf := make([]byte, batchRadiusItemSizeV1) + + i := 0 + + buf[i] = b.Bin + i += 1 + + copy(buf[i:i+swarm.HashSize], b.BatchID) + i += swarm.HashSize + + copy(buf[i:i+swarm.HashSize], b.Address.Bytes()) + i += swarm.HashSize + + binary.BigEndian.PutUint64(buf[i:i+8], b.BinID) + + return buf, nil +} + +func (b *BatchRadiusItemV1) Unmarshal(buf []byte) error { + + if len(buf) != batchRadiusItemSizeV1 { + return errUnmarshalInvalidSize + } + + i := 0 + b.Bin = buf[i] + i += 1 + + b.BatchID = copyBytes(buf[i : i+swarm.HashSize]) + i += swarm.HashSize + + b.Address = swarm.NewAddress(buf[i : i+swarm.HashSize]).Clone() + i += swarm.HashSize + + b.BinID = binary.BigEndian.Uint64(buf[i : i+8]) + + return nil +} + +// ChunkBinItemV1 allows for iterating on ranges of bin and binIDs for chunks. +// BinIDs come in handy when syncing the reserve contents with other peers. +type ChunkBinItemV1 struct { + Bin uint8 + BinID uint64 + Address swarm.Address + BatchID []byte + ChunkType swarm.ChunkType +} + +func (c *ChunkBinItemV1) Namespace() string { + return "chunkBin" +} + +func (c *ChunkBinItemV1) ID() string { + return binIDToString(c.Bin, c.BinID) +} + +func (c *ChunkBinItemV1) String() string { + return path.Join(c.Namespace(), c.ID()) +} + +func (c *ChunkBinItemV1) Clone() storage.Item { + if c == nil { + return nil + } + return &ChunkBinItemV1{ + Bin: c.Bin, + BinID: c.BinID, + Address: c.Address.Clone(), + BatchID: copyBytes(c.BatchID), + ChunkType: c.ChunkType, + } +} + +const chunkBinItemSizeV1 = 1 + 8 + swarm.HashSize + swarm.HashSize + 1 + +func (c *ChunkBinItemV1) Marshal() ([]byte, error) { + + if c.Address.IsZero() { + return nil, errMarshalInvalidAddress + } + + buf := make([]byte, chunkBinItemSizeV1) + i := 0 + + buf[i] = c.Bin + i += 1 + + binary.BigEndian.PutUint64(buf[i:i+8], c.BinID) + i += 8 + + copy(buf[i:i+swarm.HashSize], c.Address.Bytes()) + i += swarm.HashSize + + copy(buf[i:i+swarm.HashSize], c.BatchID) + i += swarm.HashSize + + buf[i] = uint8(c.ChunkType) + + return buf, nil +} + +func (c *ChunkBinItemV1) Unmarshal(buf []byte) error { + + if len(buf) != chunkBinItemSizeV1 { + return errUnmarshalInvalidSize + } + + i := 0 + c.Bin = buf[i] + i += 1 + + c.BinID = binary.BigEndian.Uint64(buf[i : i+8]) + i += 8 + + c.Address = swarm.NewAddress(buf[i : i+swarm.HashSize]).Clone() + i += swarm.HashSize + + c.BatchID = copyBytes(buf[i : i+swarm.HashSize]) + i += swarm.HashSize + + c.ChunkType = swarm.ChunkType(buf[i]) + + return nil +} diff --git a/pkg/storer/internal/stampindex/export_test.go b/pkg/storer/internal/stampindex/export_test.go index df0852da459..aa139a56282 100644 --- a/pkg/storer/internal/stampindex/export_test.go +++ b/pkg/storer/internal/stampindex/export_test.go @@ -14,16 +14,15 @@ var ( ) // NewItemWithValues creates a new Item with given values and fixed keys. -func NewItemWithValues(batchTimestamp []byte, chunkAddress swarm.Address, chunkIsImmutable bool) *Item { +func NewItemWithValues(batchTimestamp []byte, chunkAddress swarm.Address) *Item { return &Item{ namespace: []byte("test_namespace"), BatchID: []byte{swarm.HashSize - 1: 9}, StampIndex: []byte{swarm.StampIndexSize - 1: 9}, StampHash: swarm.EmptyAddress.Bytes(), - StampTimestamp: batchTimestamp, - ChunkAddress: chunkAddress, - ChunkIsImmutable: chunkIsImmutable, + StampTimestamp: batchTimestamp, + ChunkAddress: chunkAddress, } } diff --git a/pkg/storer/internal/stampindex/oldstampindex.go b/pkg/storer/internal/stampindex/oldstampindex.go new file mode 100644 index 00000000000..f557d848a02 --- /dev/null +++ b/pkg/storer/internal/stampindex/oldstampindex.go @@ -0,0 +1,115 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package stampindex + +import ( + "encoding/binary" + "fmt" + + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage/storageutil" + "github.com/ethersphere/bee/v2/pkg/storer/internal" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// ItemV1 is an store.Item that represents data relevant to stamp. +type ItemV1 struct { + // Keys. + namespace []byte // The namespace of other related item. + BatchID []byte + StampIndex []byte + + // Values. + StampTimestamp []byte + ChunkAddress swarm.Address + ChunkIsImmutable bool +} + +// ID implements the storage.Item interface. +func (i ItemV1) ID() string { + return fmt.Sprintf("%s/%s/%s", string(i.namespace), string(i.BatchID), string(i.StampIndex)) +} + +// Namespace implements the storage.Item interface. +func (i ItemV1) Namespace() string { + return "stampIndex" +} + +// Marshal implements the storage.Item interface. +func (i ItemV1) Marshal() ([]byte, error) { + switch { + case len(i.namespace) == 0: + return nil, errStampItemMarshalNamespaceInvalid + case len(i.BatchID) != swarm.HashSize: + return nil, errStampItemMarshalBatchIDInvalid + case len(i.StampIndex) != swarm.StampIndexSize: + return nil, errStampItemMarshalBatchIndexInvalid + } + + buf := make([]byte, 8+len(i.namespace)+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize) + + l := 0 + binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.namespace))) + l += 8 + copy(buf[l:l+len(i.namespace)], i.namespace) + l += len(i.namespace) + copy(buf[l:l+swarm.HashSize], i.BatchID) + l += swarm.HashSize + copy(buf[l:l+swarm.StampIndexSize], i.StampIndex) + l += swarm.StampIndexSize + copy(buf[l:l+swarm.StampTimestampSize], i.StampTimestamp) + l += swarm.StampTimestampSize + copy(buf[l:l+swarm.HashSize], internal.AddressBytesOrZero(i.ChunkAddress)) + return buf, nil +} + +// Unmarshal implements the storage.Item interface. +func (i *ItemV1) Unmarshal(bytes []byte) error { + if len(bytes) < 8 { + return errStampItemUnmarshalInvalidSize + } + nsLen := int(binary.LittleEndian.Uint64(bytes)) + if len(bytes) != 8+nsLen+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize { + return errStampItemUnmarshalInvalidSize + } + + ni := new(ItemV1) + l := 8 + ni.namespace = append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...) + l += nsLen + ni.BatchID = append(make([]byte, 0, swarm.HashSize), bytes[l:l+swarm.HashSize]...) + l += swarm.HashSize + ni.StampIndex = append(make([]byte, 0, swarm.StampIndexSize), bytes[l:l+swarm.StampIndexSize]...) + l += swarm.StampIndexSize + ni.StampTimestamp = append(make([]byte, 0, swarm.StampTimestampSize), bytes[l:l+swarm.StampTimestampSize]...) + l += swarm.StampTimestampSize + ni.ChunkAddress = internal.AddressOrZero(bytes[l : l+swarm.HashSize]) + *i = *ni + return nil +} + +// Clone implements the storage.Item interface. +func (i *ItemV1) Clone() storage.Item { + if i == nil { + return nil + } + return &ItemV1{ + namespace: append([]byte(nil), i.namespace...), + BatchID: append([]byte(nil), i.BatchID...), + StampIndex: append([]byte(nil), i.StampIndex...), + StampTimestamp: append([]byte(nil), i.StampTimestamp...), + ChunkAddress: i.ChunkAddress.Clone(), + ChunkIsImmutable: i.ChunkIsImmutable, + } +} + +// String implements the fmt.Stringer interface. +func (i ItemV1) String() string { + return storageutil.JoinFields(i.Namespace(), i.ID()) +} + +func (i *ItemV1) SetNamespace(ns []byte) { + i.namespace = ns +} diff --git a/pkg/storer/internal/stampindex/stampindex.go b/pkg/storer/internal/stampindex/stampindex.go index af41f2732ac..bbb81763cf6 100644 --- a/pkg/storer/internal/stampindex/stampindex.go +++ b/pkg/storer/internal/stampindex/stampindex.go @@ -29,9 +29,6 @@ var ( // to unmarshal buffer with smaller size then is the size // of the Item fields. errStampItemUnmarshalInvalidSize = errors.New("unmarshal stampindex.Item: invalid size") - // errStampItemUnmarshalChunkImmutableInvalid is returned when trying - // to unmarshal buffer with invalid ChunkIsImmutable value. - errStampItemUnmarshalChunkImmutableInvalid = errors.New("unmarshal stampindex.Item: chunk immutable is invalid") ) var _ storage.Item = (*Item)(nil) @@ -45,9 +42,8 @@ type Item struct { StampHash []byte // Values. - StampTimestamp []byte - ChunkAddress swarm.Address - ChunkIsImmutable bool + StampTimestamp []byte + ChunkAddress swarm.Address } // ID implements the storage.Item interface. @@ -79,7 +75,7 @@ func (i Item) Marshal() ([]byte, error) { return nil, errStampItemMarshalBatchIndexInvalid } - buf := make([]byte, 8+len(i.namespace)+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize+1+swarm.HashSize) + buf := make([]byte, 8+len(i.namespace)+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize+swarm.HashSize) l := 0 binary.LittleEndian.PutUint64(buf[l:l+8], uint64(len(i.namespace))) @@ -94,11 +90,6 @@ func (i Item) Marshal() ([]byte, error) { l += swarm.StampTimestampSize copy(buf[l:l+swarm.HashSize], internal.AddressBytesOrZero(i.ChunkAddress)) l += swarm.HashSize - buf[l] = '0' - if i.ChunkIsImmutable { - buf[l] = '1' - } - l += 1 copy(buf[l:l+swarm.HashSize], i.StampHash) return buf, nil } @@ -109,7 +100,7 @@ func (i *Item) Unmarshal(bytes []byte) error { return errStampItemUnmarshalInvalidSize } nsLen := int(binary.LittleEndian.Uint64(bytes)) - if len(bytes) != 8+nsLen+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize+swarm.HashSize+1 { + if len(bytes) != 8+nsLen+swarm.HashSize+swarm.StampIndexSize+swarm.StampTimestampSize+swarm.HashSize+swarm.HashSize { return errStampItemUnmarshalInvalidSize } @@ -125,15 +116,6 @@ func (i *Item) Unmarshal(bytes []byte) error { l += swarm.StampTimestampSize ni.ChunkAddress = internal.AddressOrZero(bytes[l : l+swarm.HashSize]) l += swarm.HashSize - switch bytes[l] { - case '0': - ni.ChunkIsImmutable = false - case '1': - ni.ChunkIsImmutable = true - default: - return errStampItemUnmarshalChunkImmutableInvalid - } - l += 1 ni.StampHash = append(make([]byte, 0, swarm.HashSize), bytes[l:l+swarm.HashSize]...) *i = *ni return nil @@ -145,13 +127,12 @@ func (i *Item) Clone() storage.Item { return nil } return &Item{ - namespace: append([]byte(nil), i.namespace...), - BatchID: append([]byte(nil), i.BatchID...), - StampIndex: append([]byte(nil), i.StampIndex...), - StampHash: append([]byte(nil), i.StampHash...), - StampTimestamp: append([]byte(nil), i.StampTimestamp...), - ChunkAddress: i.ChunkAddress.Clone(), - ChunkIsImmutable: i.ChunkIsImmutable, + namespace: append([]byte(nil), i.namespace...), + BatchID: append([]byte(nil), i.BatchID...), + StampIndex: append([]byte(nil), i.StampIndex...), + StampHash: append([]byte(nil), i.StampHash...), + StampTimestamp: append([]byte(nil), i.StampTimestamp...), + ChunkAddress: i.ChunkAddress.Clone(), } } @@ -176,13 +157,12 @@ func LoadOrStore( return nil, false, err } return &Item{ - namespace: []byte(namespace), - BatchID: chunk.Stamp().BatchID(), - StampIndex: chunk.Stamp().Index(), - StampHash: stampHash, - StampTimestamp: chunk.Stamp().Timestamp(), - ChunkAddress: chunk.Address(), - ChunkIsImmutable: chunk.Immutable(), + namespace: []byte(namespace), + BatchID: chunk.Stamp().BatchID(), + StampIndex: chunk.Stamp().Index(), + StampHash: stampHash, + StampTimestamp: chunk.Stamp().Timestamp(), + ChunkAddress: chunk.Address(), }, false, Store(s, namespace, chunk) } return nil, false, err @@ -213,13 +193,12 @@ func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error { return err } item := &Item{ - namespace: []byte(namespace), - BatchID: chunk.Stamp().BatchID(), - StampIndex: chunk.Stamp().Index(), - StampHash: stampHash, - StampTimestamp: chunk.Stamp().Timestamp(), - ChunkAddress: chunk.Address(), - ChunkIsImmutable: chunk.Immutable(), + namespace: []byte(namespace), + BatchID: chunk.Stamp().BatchID(), + StampIndex: chunk.Stamp().Index(), + StampHash: stampHash, + StampTimestamp: chunk.Stamp().Timestamp(), + ChunkAddress: chunk.Address(), } if err := s.Put(item); err != nil { return fmt.Errorf("failed to put stampindex.Item %s: %w", item, err) diff --git a/pkg/storer/internal/stampindex/stampindex_test.go b/pkg/storer/internal/stampindex/stampindex_test.go index 3b6c66fc280..7b951924bf4 100644 --- a/pkg/storer/internal/stampindex/stampindex_test.go +++ b/pkg/storer/internal/stampindex/stampindex_test.go @@ -62,22 +62,14 @@ func TestStampIndexItem(t *testing.T) { }, { name: "valid values", test: &storagetest.ItemMarshalAndUnmarshalTest{ - Item: stampindex.NewItemWithValues( - []byte{swarm.StampTimestampSize - 1: 9}, - swarm.RandAddress(t), - false, - ), + Item: stampindex.NewItemWithValues([]byte{swarm.StampTimestampSize - 1: 9}, swarm.RandAddress(t)), Factory: func() storage.Item { return new(stampindex.Item) }, CmpOpts: []cmp.Option{cmp.AllowUnexported(stampindex.Item{})}, }, }, { name: "max values", test: &storagetest.ItemMarshalAndUnmarshalTest{ - Item: stampindex.NewItemWithValues( - storagetest.MaxBatchTimestampBytes[:], - swarm.NewAddress(storagetest.MaxAddressBytes[:]), - true, - ), + Item: stampindex.NewItemWithValues(storagetest.MaxBatchTimestampBytes[:], swarm.NewAddress(storagetest.MaxAddressBytes[:])), Factory: func() storage.Item { return new(stampindex.Item) }, CmpOpts: []cmp.Option{cmp.AllowUnexported(stampindex.Item{})}, }, @@ -140,7 +132,6 @@ func TestStoreLoadDelete(t *testing.T) { want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) want.StampTimestamp = chunk.Stamp().Timestamp() want.ChunkAddress = chunk.Address() - want.ChunkIsImmutable = chunk.Immutable() have := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) err = ts.IndexStore().Get(have) @@ -161,7 +152,6 @@ func TestStoreLoadDelete(t *testing.T) { want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) want.StampTimestamp = chunk.Stamp().Timestamp() want.ChunkAddress = chunk.Address() - want.ChunkIsImmutable = chunk.Immutable() have, err := stampindex.Load(ts.IndexStore(), ns, chunk) if err != nil { @@ -229,7 +219,6 @@ func TestLoadOrStore(t *testing.T) { want := stampindex.NewItemWithKeys(ns, chunk.Stamp().BatchID(), chunk.Stamp().Index(), stampHash) want.StampTimestamp = chunk.Stamp().Timestamp() want.ChunkAddress = chunk.Address() - want.ChunkIsImmutable = chunk.Immutable() trx, done := ts.NewTransaction(context.Background()) diff --git a/pkg/storer/migration/reserveRepair.go b/pkg/storer/migration/reserveRepair.go index 8bee84e57d6..546915e1898 100644 --- a/pkg/storer/migration/reserveRepair.go +++ b/pkg/storer/migration/reserveRepair.go @@ -5,6 +5,7 @@ package migration import ( + "bytes" "context" "errors" "fmt" @@ -14,6 +15,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp" "github.com/ethersphere/bee/v2/pkg/storer/internal/reserve" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -196,6 +198,18 @@ func ReserveRepairer( } item.BinID = newID(int(item.Bin)) + if bytes.Equal(item.StampHash, swarm.EmptyAddress.Bytes()) { + stamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), "reserve", item.Address, item.BatchID) + if err != nil { + return err + } + stampHash, err := stamp.Hash() + if err != nil { + return err + } + item.StampHash = stampHash + } + err = s.IndexStore().Put(item) if err != nil { return err diff --git a/pkg/storer/migration/reserveRepair_test.go b/pkg/storer/migration/reserveRepair_test.go index 85e20f226ab..884664c8787 100644 --- a/pkg/storer/migration/reserveRepair_test.go +++ b/pkg/storer/migration/reserveRepair_test.go @@ -13,6 +13,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/storage" chunktest "github.com/ethersphere/bee/v2/pkg/storage/testing" "github.com/ethersphere/bee/v2/pkg/storer/internal" + "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp" "github.com/ethersphere/bee/v2/pkg/storer/internal/reserve" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" localmigration "github.com/ethersphere/bee/v2/pkg/storer/migration" @@ -82,6 +83,10 @@ func TestReserveRepair(t *testing.T) { } err = store.Run(context.Background(), func(s transaction.Store) error { + err := chunkstamp.Store(s.IndexStore(), "reserve", ch) + if err != nil { + return err + } return s.ChunkStore().Put(context.Background(), ch) }) if err != nil { diff --git a/pkg/storer/migration/step_06.go b/pkg/storer/migration/step_06.go index 18a35efdfbf..d68d6a80b47 100644 --- a/pkg/storer/migration/step_06.go +++ b/pkg/storer/migration/step_06.go @@ -6,19 +6,19 @@ package migration import ( "context" - "encoding/binary" - "errors" "fmt" "os" + "runtime" + "sync/atomic" + "time" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/storage" - "github.com/ethersphere/bee/v2/pkg/storer/internal" "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstamp" "github.com/ethersphere/bee/v2/pkg/storer/internal/reserve" "github.com/ethersphere/bee/v2/pkg/storer/internal/stampindex" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" - "github.com/ethersphere/bee/v2/pkg/swarm" + "golang.org/x/sync/errgroup" ) // step_06 is a migration step that adds a stampHash to all BatchRadiusItems, ChunkBinItems and StampIndexItems. @@ -27,30 +27,64 @@ func step_06(st transaction.Storage) func() error { logger := log.NewLogger("migration-step-06", log.WithSink(os.Stdout)) logger.Info("start adding stampHash to BatchRadiusItems, ChunkBinItems and StampIndexItems") - err := addStampHashToBatchRadiusAndChunkBinItems(st) + seenCount, doneCount, err := addStampHash(logger, st) if err != nil { - return fmt.Errorf("batch radius and chunk bin item migration: %w", err) + return fmt.Errorf("add stamp hash migration: %w", err) } - logger.Info("done migrating batch radius and chunk bin items") - err = addStampHashToStampIndexItems(st) - if err != nil { - return fmt.Errorf("stamp index migration: %w", err) - } - logger.Info("done migrating stamp index items") - logger.Info("finished migrating items") + logger.Info("finished migrating items", "seen", seenCount, "migrated", doneCount) return nil } } -func addStampHashToBatchRadiusAndChunkBinItems(st transaction.Storage) error { - itemC := make(chan *OldBatchRadiusItem) +func addStampHash(logger log.Logger, st transaction.Storage) (int64, int64, error) { + itemC := make(chan *reserve.BatchRadiusItemV1) errC := make(chan error) + doneC := make(chan any) + defer close(doneC) + defer close(errC) + + var eg errgroup.Group + p := runtime.NumCPU() + eg.SetLimit(p) + + var doneCount atomic.Int64 + var seenCount int64 + + go func() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + logger.Info("still migrating items...") + case <-doneC: + return + } + } + }() go func() { - for oldBatchRadiusItem := range itemC { + _ = st.IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return new(reserve.BatchRadiusItemV1) }, + }, func(result storage.Result) (bool, error) { + seenCount++ + item := result.Entry.(*reserve.BatchRadiusItemV1) + select { + case itemC <- item: + case err := <-errC: + return true, err + } + return false, nil + }) + close(itemC) + }() + + for item := range itemC { + batchRadiusItemV1 := item + eg.Go(func() error { err := st.Run(context.Background(), func(s transaction.Store) error { idxStore := s.IndexStore() - stamp, err := chunkstamp.LoadWithBatchID(idxStore, "reserve", oldBatchRadiusItem.Address, oldBatchRadiusItem.BatchID) + stamp, err := chunkstamp.LoadWithBatchID(idxStore, "reserve", batchRadiusItemV1.Address, batchRadiusItemV1.BatchID) if err != nil { return err } @@ -60,212 +94,71 @@ func addStampHashToBatchRadiusAndChunkBinItems(st transaction.Storage) error { } // Since the ID format has changed, we should delete the old item and put a new one with the new ID format. - err = idxStore.Delete(oldBatchRadiusItem) + err = idxStore.Delete(batchRadiusItemV1) if err != nil { return err } err = idxStore.Put(&reserve.BatchRadiusItem{ - Bin: oldBatchRadiusItem.Bin, - BatchID: oldBatchRadiusItem.BatchID, + Bin: batchRadiusItemV1.Bin, + BatchID: batchRadiusItemV1.BatchID, StampHash: stampHash, - Address: oldBatchRadiusItem.Address, - BinID: oldBatchRadiusItem.BinID, + Address: batchRadiusItemV1.Address, + BinID: batchRadiusItemV1.BinID, }) if err != nil { return err } - oldChunkBinItem := &OldChunkBinItem{&reserve.ChunkBinItem{ - Bin: oldBatchRadiusItem.Bin, - BinID: oldBatchRadiusItem.BinID, - }} - err = idxStore.Get(oldChunkBinItem) + chunkBinItemV1 := &reserve.ChunkBinItemV1{ + Bin: batchRadiusItemV1.Bin, + BinID: batchRadiusItemV1.BinID, + } + err = idxStore.Get(chunkBinItemV1) if err != nil { return err } // same id. Will replace. - return idxStore.Put(&reserve.ChunkBinItem{ - Bin: oldChunkBinItem.Bin, - BinID: oldChunkBinItem.BinID, - Address: oldChunkBinItem.Address, - BatchID: oldChunkBinItem.BatchID, + err = idxStore.Put(&reserve.ChunkBinItem{ + Bin: chunkBinItemV1.Bin, + BinID: chunkBinItemV1.BinID, + Address: chunkBinItemV1.Address, + BatchID: chunkBinItemV1.BatchID, StampHash: stampHash, - ChunkType: oldChunkBinItem.ChunkType, + ChunkType: chunkBinItemV1.ChunkType, }) - }) - if err != nil { - errC <- err - return - } - } - close(errC) - }() - - err := st.IndexStore().Iterate(storage.Query{ - Factory: func() storage.Item { return &OldBatchRadiusItem{&reserve.BatchRadiusItem{}} }, - }, func(result storage.Result) (bool, error) { - item := result.Entry.(*OldBatchRadiusItem) - select { - case itemC <- item: - case err := <-errC: - return true, err - } - return false, nil - }) - close(itemC) - if err != nil { - return err - } - - return <-errC -} - -func addStampHashToStampIndexItems(st transaction.Storage) error { - itemC := make(chan *OldStampIndexItem) - errC := make(chan error) - - go func() { - for oldStampIndexItem := range itemC { - err := st.Run(context.Background(), func(s transaction.Store) error { - idxStore := s.IndexStore() - stamp, err := chunkstamp.LoadWithBatchID(idxStore, string(oldStampIndexItem.GetNamespace()), oldStampIndexItem.ChunkAddress, oldStampIndexItem.BatchID) - if err != nil { - return err - } - stampHash, err := stamp.Hash() if err != nil { return err } // same id. Will replace. - item := &stampindex.Item{ - BatchID: oldStampIndexItem.BatchID, - StampIndex: oldStampIndexItem.StampIndex, - StampHash: stampHash, - StampTimestamp: oldStampIndexItem.StampIndex, - ChunkAddress: oldStampIndexItem.ChunkAddress, - ChunkIsImmutable: oldStampIndexItem.ChunkIsImmutable, + stampIndexItem := &stampindex.Item{ + BatchID: chunkBinItemV1.BatchID, + StampIndex: stamp.Index(), + StampHash: stampHash, + StampTimestamp: stamp.Timestamp(), + ChunkAddress: chunkBinItemV1.Address, } - item.SetNamespace(oldStampIndexItem.GetNamespace()) - return idxStore.Put(item) + stampIndexItem.SetNamespace([]byte("reserve")) + err = idxStore.Put(stampIndexItem) + if err != nil { + return err + } + doneCount.Add(1) + return nil }) if err != nil { errC <- err - return + return err } - } - close(errC) - }() - - err := st.IndexStore().Iterate(storage.Query{ - Factory: func() storage.Item { return &OldStampIndexItem{&stampindex.Item{}} }, - }, func(result storage.Result) (bool, error) { - item := result.Entry.(*OldStampIndexItem) - select { - case itemC <- item: - case err := <-errC: - return true, err - } - return false, nil - }) - close(itemC) - if err != nil { - return err + return nil + }) } - return <-errC -} - -type OldChunkBinItem struct { - *reserve.ChunkBinItem -} - -// Unmarshal unmarshals the old chunk bin item that does not include a stamp hash. -func (c *OldChunkBinItem) Unmarshal(buf []byte) error { - i := 0 - c.Bin = buf[i] - i += 1 - - c.BinID = binary.BigEndian.Uint64(buf[i : i+8]) - i += 8 - - c.Address = swarm.NewAddress(buf[i : i+swarm.HashSize]).Clone() - i += swarm.HashSize - - c.BatchID = copyBytes(buf[i : i+swarm.HashSize]) - i += swarm.HashSize - - c.ChunkType = swarm.ChunkType(buf[i]) - c.StampHash = swarm.EmptyAddress.Bytes() - - return nil -} - -type OldBatchRadiusItem struct { - *reserve.BatchRadiusItem -} - -// ID returns the old ID format for BatchRadiusItem ID. (batchId/bin/ChunkAddr). -func (b *OldBatchRadiusItem) ID() string { - return string(b.BatchID) + string(b.Bin) + b.Address.ByteString() -} - -// Unmarshal unmarshals the old batch radius item that does not include a stamp hash. -func (b *OldBatchRadiusItem) Unmarshal(buf []byte) error { - i := 0 - b.Bin = buf[i] - i += 1 - - b.BatchID = copyBytes(buf[i : i+swarm.HashSize]) - i += swarm.HashSize - - b.Address = swarm.NewAddress(buf[i : i+swarm.HashSize]).Clone() - i += swarm.HashSize - - b.BinID = binary.BigEndian.Uint64(buf[i : i+8]) - b.StampHash = swarm.EmptyAddress.Bytes() - return nil -} - -type OldStampIndexItem struct { - *stampindex.Item -} - -// Unmarshal unmarhsals the old stamp index item that does not include a stamp hash. -func (i *OldStampIndexItem) Unmarshal(bytes []byte) error { - nsLen := int(binary.LittleEndian.Uint64(bytes)) - ni := &OldStampIndexItem{&stampindex.Item{}} - l := 8 - namespace := append(make([]byte, 0, nsLen), bytes[l:l+nsLen]...) - l += nsLen - ni.BatchID = append(make([]byte, 0, swarm.HashSize), bytes[l:l+swarm.HashSize]...) - l += swarm.HashSize - ni.StampIndex = append(make([]byte, 0, swarm.StampIndexSize), bytes[l:l+swarm.StampIndexSize]...) - l += swarm.StampIndexSize - ni.StampTimestamp = append(make([]byte, 0, swarm.StampTimestampSize), bytes[l:l+swarm.StampTimestampSize]...) - l += swarm.StampTimestampSize - ni.ChunkAddress = internal.AddressOrZero(bytes[l : l+swarm.HashSize]) - l += swarm.HashSize - switch bytes[l] { - case '0': - ni.ChunkIsImmutable = false - case '1': - ni.ChunkIsImmutable = true - default: - return errors.New("immutable invalid") + err := eg.Wait() + if err != nil { + return 0, 0, err } - ni.StampHash = swarm.EmptyAddress.Bytes() - *i = *ni - i.SetNamespace(namespace) - return nil -} -func copyBytes(src []byte) []byte { - if src == nil { - return nil - } - dst := make([]byte, len(src)) - copy(dst, src) - return dst + return seenCount, doneCount.Load(), nil } diff --git a/pkg/storer/migration/step_06_test.go b/pkg/storer/migration/step_06_test.go index 1c9a02b0678..ec9211f9d48 100644 --- a/pkg/storer/migration/step_06_test.go +++ b/pkg/storer/migration/step_06_test.go @@ -5,7 +5,6 @@ package migration_test import ( - "bytes" "context" "testing" @@ -23,6 +22,11 @@ import ( "github.com/stretchr/testify/require" ) +type oldAndNewItem[K storage.Item, V storage.Item] struct { + old K + new V +} + func Test_Step_06(t *testing.T) { t.Parallel() @@ -42,38 +46,43 @@ func Test_Step_06(t *testing.T) { chunks := chunktest.GenerateTestRandomChunks(10) ctx := context.Background() - err = store.Run(ctx, func(s transaction.Store) error { - for i, ch := range chunks { - err := s.IndexStore().Put(&localmigration.OldBatchRadiusItem{ - BatchRadiusItem: &reserve.BatchRadiusItem{ - Bin: uint8(i), - BatchID: ch.Stamp().BatchID(), - Address: ch.Address(), - }, - }) + batchRadiusItems := make(map[string]oldAndNewItem[*reserve.BatchRadiusItemV1, *reserve.BatchRadiusItem]) + chunkBinItems := make(map[string]oldAndNewItem[*reserve.ChunkBinItemV1, *reserve.ChunkBinItem]) + stampIndexItems := make(map[string]oldAndNewItem[*stampindex.ItemV1, *stampindex.Item]) + + for i, ch := range chunks { + err = store.Run(ctx, func(s transaction.Store) error { + b := &reserve.BatchRadiusItemV1{ + Bin: uint8(i), + BatchID: ch.Stamp().BatchID(), + Address: ch.Address(), + BinID: uint64(i), + } + err := s.IndexStore().Put(b) if err != nil { return err } + batchRadiusItems[string(b.BatchID)+string(b.Bin)+b.Address.ByteString()] = oldAndNewItem[*reserve.BatchRadiusItemV1, *reserve.BatchRadiusItem]{old: b, new: nil} - err = s.IndexStore().Put(&localmigration.OldChunkBinItem{ - ChunkBinItem: &reserve.ChunkBinItem{ - Bin: uint8(i), - Address: ch.Address(), - BatchID: ch.Stamp().BatchID(), - }, - }) + c := &reserve.ChunkBinItemV1{ + Bin: uint8(i), + BinID: uint64(i), + Address: ch.Address(), + BatchID: ch.Stamp().BatchID(), + ChunkType: swarm.ChunkTypeSingleOwner, + } + err = s.IndexStore().Put(c) if err != nil { return err } + chunkBinItems[c.ID()] = oldAndNewItem[*reserve.ChunkBinItemV1, *reserve.ChunkBinItem]{old: c, new: nil} - sIdxItem := &localmigration.OldStampIndexItem{ - Item: &stampindex.Item{ - BatchID: ch.Stamp().BatchID(), - StampIndex: ch.Stamp().Index(), - StampTimestamp: ch.Stamp().Timestamp(), - ChunkAddress: ch.Address(), - ChunkIsImmutable: false, - }, + sIdxItem := &stampindex.ItemV1{ + BatchID: ch.Stamp().BatchID(), + StampIndex: ch.Stamp().Index(), + StampTimestamp: ch.Stamp().Timestamp(), + ChunkAddress: ch.Address(), + ChunkIsImmutable: true, } sIdxItem.SetNamespace([]byte("reserve")) err = s.IndexStore().Put(sIdxItem) @@ -81,55 +90,98 @@ func Test_Step_06(t *testing.T) { return err } - err = chunkstamp.Store(s.IndexStore(), "reserve", ch) - if err != nil { - return err - } - } - return nil - }) - require.NoError(t, err) - checkItems(t, store.IndexStore(), false, 10, &localmigration.OldBatchRadiusItem{BatchRadiusItem: &reserve.BatchRadiusItem{}}) - checkItems(t, store.IndexStore(), false, 10, &localmigration.OldChunkBinItem{ChunkBinItem: &reserve.ChunkBinItem{}}) - checkItems(t, store.IndexStore(), false, 10, &localmigration.OldStampIndexItem{Item: &stampindex.Item{}}) + stampIndexItems[sIdxItem.ID()] = oldAndNewItem[*stampindex.ItemV1, *stampindex.Item]{old: sIdxItem, new: nil} + return chunkstamp.Store(s.IndexStore(), "reserve", ch) + }) + require.NoError(t, err) + } + + require.NoError(t, err) err = localmigration.Step_06(store)() require.NoError(t, err) - checkItems(t, store.IndexStore(), true, 10, &reserve.BatchRadiusItem{}) - checkItems(t, store.IndexStore(), true, 10, &reserve.ChunkBinItem{}) - checkItems(t, store.IndexStore(), true, 10, &stampindex.Item{}) + checkBatchRadiusItems(t, store.IndexStore(), len(chunks), batchRadiusItems) + checkChunkBinItems(t, store.IndexStore(), len(chunks), chunkBinItems) + checkStampIndex(t, store.IndexStore(), len(chunks), stampIndexItems) +} + +func checkBatchRadiusItems(t *testing.T, s storage.Reader, wantCount int, m map[string]oldAndNewItem[*reserve.BatchRadiusItemV1, *reserve.BatchRadiusItem]) { + t.Helper() + count := 0 + + err := s.Iterate(storage.Query{ + Factory: func() storage.Item { return new(reserve.BatchRadiusItem) }, + }, func(result storage.Result) (bool, error) { + count++ + b := result.Entry.(*reserve.BatchRadiusItem) + id := string(b.BatchID) + string(b.Bin) + b.Address.ByteString() + found, ok := m[id] + require.True(t, ok) + found.new = b + m[id] = found + return false, nil + }) + require.NoError(t, err) + assert.Equal(t, wantCount, count) + + for _, v := range m { + assert.Equal(t, v.old.Bin, v.new.Bin) + assert.Equal(t, v.old.BatchID, v.new.BatchID) + assert.Equal(t, v.old.Address, v.new.Address) + assert.Equal(t, v.old.BinID, v.new.BinID) + assert.NotEqual(t, swarm.EmptyAddress.Bytes(), v.new.StampHash) + } +} + +func checkChunkBinItems(t *testing.T, s storage.Reader, wantCount int, m map[string]oldAndNewItem[*reserve.ChunkBinItemV1, *reserve.ChunkBinItem]) { + t.Helper() + count := 0 + err := s.Iterate(storage.Query{ + Factory: func() storage.Item { return new(reserve.ChunkBinItem) }, + }, func(result storage.Result) (bool, error) { + count++ + b := result.Entry.(*reserve.ChunkBinItem) + found, ok := m[b.ID()] + require.True(t, ok) + found.new = b + m[b.ID()] = found + return false, nil + }) + require.NoError(t, err) + assert.Equal(t, wantCount, count) + for _, v := range m { + assert.Equal(t, v.old.Bin, v.new.Bin) + assert.Equal(t, v.old.BatchID, v.new.BatchID) + assert.Equal(t, v.old.Address, v.new.Address) + assert.Equal(t, v.old.BinID, v.new.BinID) + assert.Equal(t, v.old.ChunkType, v.new.ChunkType) + assert.NotEqual(t, swarm.EmptyAddress.Bytes(), v.new.StampHash) + } } -func checkItems(t *testing.T, s storage.Reader, wantStampHash bool, wantCount int, fact storage.Item) { +func checkStampIndex(t *testing.T, s storage.Reader, wantCount int, m map[string]oldAndNewItem[*stampindex.ItemV1, *stampindex.Item]) { t.Helper() count := 0 err := s.Iterate(storage.Query{ - Factory: func() storage.Item { return fact }, + Factory: func() storage.Item { return new(stampindex.Item) }, }, func(result storage.Result) (bool, error) { - var stampHash []byte - switch result.Entry.(type) { - case *localmigration.OldBatchRadiusItem: - stampHash = result.Entry.(*localmigration.OldBatchRadiusItem).StampHash - case *localmigration.OldChunkBinItem: - stampHash = result.Entry.(*localmigration.OldChunkBinItem).StampHash - case *localmigration.OldStampIndexItem: - stampHash = result.Entry.(*localmigration.OldStampIndexItem).StampHash - case *reserve.ChunkBinItem: - stampHash = result.Entry.(*reserve.ChunkBinItem).StampHash - case *reserve.BatchRadiusItem: - stampHash = result.Entry.(*reserve.BatchRadiusItem).StampHash - case *stampindex.Item: - stampHash = result.Entry.(*stampindex.Item).StampHash - } - eq := bytes.Equal(stampHash, swarm.EmptyAddress.Bytes()) - assert.Equal(t, wantStampHash, !eq) - if wantStampHash == eq { - return false, nil - } count++ + b := result.Entry.(*stampindex.Item) + found, ok := m[b.ID()] + require.True(t, ok) + found.new = b + m[b.ID()] = found return false, nil }) require.NoError(t, err) assert.Equal(t, wantCount, count) + for _, v := range m { + assert.Equal(t, v.old.Namespace(), v.new.Namespace()) + assert.Equal(t, v.old.BatchID, v.new.BatchID) + assert.Equal(t, v.old.StampIndex, v.new.StampIndex) + assert.Equal(t, v.old.StampTimestamp, v.new.StampTimestamp) + assert.Equal(t, v.old.ChunkAddress, v.new.ChunkAddress) + assert.NotEqual(t, swarm.EmptyAddress.Bytes(), v.new.StampHash) + } }