diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index 24d7c4e0999..2471d94edde 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -25,7 +25,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/file/splitter" filetest "github.com/ethersphere/bee/v2/pkg/file/testing" "github.com/ethersphere/bee/v2/pkg/log" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" testingc "github.com/ethersphere/bee/v2/pkg/storage/testing" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" @@ -1411,6 +1411,14 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error { return nil } +func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error { + c.mu.Lock() + defer c.mu.Unlock() + c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp()) + return nil + +} + func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/soc/testing/soc.go b/pkg/soc/testing/soc.go index e5325f464b1..787c8737b4b 100644 --- a/pkg/soc/testing/soc.go +++ b/pkg/soc/testing/soc.go @@ -32,6 +32,39 @@ func (ms MockSOC) Chunk() swarm.Chunk { return swarm.NewChunk(ms.Address(), append(ms.ID, append(ms.Signature, ms.WrappedChunk.Data()...)...)) } +// GenerateMockSocWithSigner generates a valid mocked SOC from given data and signer. +func GenerateMockSocWithSigner(t *testing.T, data []byte, signer crypto.Signer) *MockSOC { + t.Helper() + + owner, err := signer.EthereumAddress() + if err != nil { + t.Fatal(err) + } + ch, err := cac.New(data) + if err != nil { + t.Fatal(err) + } + + id := make([]byte, swarm.HashSize) + hasher := swarm.NewHasher() + _, err = hasher.Write(append(id, ch.Address().Bytes()...)) + if err != nil { + t.Fatal(err) + } + + signature, err := signer.Sign(hasher.Sum(nil)) + if err != nil { + t.Fatal(err) + } + + return &MockSOC{ + ID: id, + Owner: owner.Bytes(), + Signature: signature, + WrappedChunk: ch, + } +} + // GenerateMockSOC generates a valid mocked SOC from given data. func GenerateMockSOC(t *testing.T, data []byte) *MockSOC { t.Helper() diff --git a/pkg/storage/chunkstore.go b/pkg/storage/chunkstore.go index c601dab317f..72f8b9ba784 100644 --- a/pkg/storage/chunkstore.go +++ b/pkg/storage/chunkstore.go @@ -39,6 +39,12 @@ type Hasser interface { Has(context.Context, swarm.Address) (bool, error) } +// Replacer is the interface that wraps the basic Replace method. +type Replacer interface { + // Replace a chunk in the store. + Replace(context.Context, swarm.Chunk) error +} + // PutterFunc type is an adapter to allow the use of // ChunkStore as Putter interface. If f is a function // with the appropriate signature, PutterFunc(f) is a @@ -70,6 +76,7 @@ type ChunkStore interface { Putter Deleter Hasser + Replacer // Iterate over chunks in no particular order. Iterate(context.Context, IterateChunkFn) error diff --git a/pkg/storage/inmemchunkstore/inmemchunkstore.go b/pkg/storage/inmemchunkstore/inmemchunkstore.go index f1fd629db41..7d49e63c279 100644 --- a/pkg/storage/inmemchunkstore/inmemchunkstore.go +++ b/pkg/storage/inmemchunkstore/inmemchunkstore.go @@ -8,7 +8,7 @@ import ( "context" "sync" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -77,6 +77,16 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error { return nil } +func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error { + c.mu.Lock() + defer c.mu.Unlock() + + chunkCount := c.chunks[ch.Address().ByteString()] + chunkCount.chunk = ch + c.chunks[ch.Address().ByteString()] = chunkCount + return nil +} + func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index fc7b0809533..1f84b8c7a3c 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -11,7 +11,7 @@ import ( "fmt" "github.com/ethersphere/bee/v2/pkg/postage" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/storageutil" "github.com/ethersphere/bee/v2/pkg/swarm" ) diff --git a/pkg/storer/internal/chunkstore/chunkstore.go b/pkg/storer/internal/chunkstore/chunkstore.go index 89941192b47..67fee1e6d77 100644 --- a/pkg/storer/internal/chunkstore/chunkstore.go +++ b/pkg/storer/internal/chunkstore/chunkstore.go @@ -94,6 +94,27 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm. return s.Put(rIdx) } +func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error { + rIdx := &RetrievalIndexItem{Address: ch.Address()} + err := s.Get(rIdx) + if err != nil { + return fmt.Errorf("chunk store: failed to read retrievalIndex for address %s: %w", ch.Address(), err) + } + + err = sh.Release(ctx, rIdx.Location) + if err != nil { + return fmt.Errorf("chunkstore: failed to release sharky location: %w", err) + } + + loc, err := sh.Write(ctx, ch.Data()) + if err != nil { + return fmt.Errorf("chunk store: write to sharky failed: %w", err) + } + rIdx.Location = loc + rIdx.Timestamp = uint64(time.Now().Unix()) + return s.Put(rIdx) +} + func Delete(ctx context.Context, s storage.IndexStore, sh storage.Sharky, addr swarm.Address) error { rIdx := &RetrievalIndexItem{Address: addr} err := s.Get(rIdx) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 2910e60cccd..aa0aa1463be 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -122,6 +122,106 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { if err != nil { return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } + + sameAddressOldChunkStamp, err := chunkstamp.Load(s.IndexStore(), reserveNamespace, chunk.Address()) + if err != nil && !errors.Is(err, storage.ErrNotFound) { + return err + } + + // same address + if sameAddressOldChunkStamp != nil { + sameAddressOldStampIndex, err := stampindex.LoadWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldChunkStamp) + if err != nil { + return err + } + prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) + curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) + if prev >= curr { + return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + } + if loadedStamp { + prev := binary.BigEndian.Uint64(oldItem.StampTimestamp) + if prev >= curr { + return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + } + + if !chunk.Address().Equal(oldItem.ChunkAddress) { + r.logger.Warning( + "replacing chunk stamp index", + "old_chunk", oldItem.ChunkAddress, + "new_chunk", chunk.Address(), + "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), + ) + err = r.removeChunk(ctx, s, oldItem.ChunkAddress, oldItem.BatchID, oldItem.StampHash) + if err != nil { + return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err) + } + } + } + + oldBatchRadiusItem := &BatchRadiusItem{ + Bin: bin, + Address: chunk.Address(), + BatchID: sameAddressOldStampIndex.BatchID, + StampHash: sameAddressOldStampIndex.StampHash, + } + // load item to get the binID + err = s.IndexStore().Get(oldBatchRadiusItem) + if err != nil { + return err + } + + err = r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldChunkStamp) + if err != nil { + return err + } + + err = stampindex.Store(s.IndexStore(), reserveNamespace, chunk) + if err != nil { + return err + } + + err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk) + if err != nil { + return err + } + + binID, err := r.IncBinID(s.IndexStore(), bin) + if err != nil { + return err + } + + err = s.IndexStore().Put(&BatchRadiusItem{ + Bin: bin, + BinID: binID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + StampHash: stampHash, + }) + if err != nil { + return err + } + + err = s.IndexStore().Put(&ChunkBinItem{ + Bin: bin, + BinID: binID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + ChunkType: storage.ChunkType(chunk), + StampHash: stampHash, + }) + if err != nil { + return err + } + + if storage.ChunkType(chunk) != swarm.ChunkTypeSingleOwner { + return nil + } + r.logger.Warning("replacing soc in chunkstore", "address", chunk.Address()) + return s.ChunkStore().Replace(ctx, chunk) + } + + // different address if loadedStamp { prev := binary.BigEndian.Uint64(oldItem.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) @@ -199,6 +299,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { }) } +func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error { + return errors.Join( + s.IndexStore().Delete(oldBatchRadiusItem), + s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), + stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(oldBatchRadiusItem.Address, nil).WithStamp(sameAddressOldChunkStamp)), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldChunkStamp), + ) +} + func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) { item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr, StampHash: stampHash} return r.st.IndexStore().Has(item) diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index f249bca8498..e25f06d618a 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -13,9 +13,11 @@ import ( "testing" "time" + "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/postage" postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing" + soctesting "github.com/ethersphere/bee/v2/pkg/soc/testing" "github.com/ethersphere/bee/v2/pkg/storage" chunk "github.com/ethersphere/bee/v2/pkg/storage/testing" "github.com/ethersphere/bee/v2/pkg/storer/internal" @@ -135,6 +137,142 @@ func TestReserveChunkType(t *testing.T) { } } +func TestSameChunkAddress(t *testing.T) { + t.Parallel() + + ctx := context.Background() + baseAddr := swarm.RandAddress(t) + + ts := internal.NewInmemStorage() + + r, err := reserve.New( + baseAddr, + ts, + 0, kademlia.NewTopologyDriver(), + log.Noop, + ) + if err != nil { + t.Fatal(err) + } + + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + + t.Run("same stamp index and older timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + err = r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + err = r.Put(ctx, ch2) + if !errors.Is(err, storage.ErrOverwriteNewerChunk) { + t.Fatal("expected error") + } + }) + + t.Run("different stamp index and older timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 2)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0)) + err = r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + err = r.Put(ctx, ch2) + if !errors.Is(err, storage.ErrOverwriteNewerChunk) { + t.Fatal("expected error") + } + }) + + replace := func(t *testing.T, ch1 swarm.Chunk, ch2 swarm.Chunk, ch1BinID, ch2BinID uint64) { + t.Helper() + + err := r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + + err = r.Put(ctx, ch2) + if err != nil { + t.Fatal(err) + } + + ch1StampHash, err := ch1.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + ch2StampHash, err := ch2.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: ch1BinID, StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: ch2BinID, StampHash: ch2StampHash}, false) + ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(ch.Data(), ch2.Data()) { + t.Fatalf("expected chunk data to be updated") + } + } + + t.Run("same stamp index and newer timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 3)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 4)) + replace(t, ch1, ch2, 3, 4) + }) + + t.Run("different stamp index and newer timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 5)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 6)) + replace(t, ch1, ch2, 5, 6) + }) + + t.Run("not a soc and newer timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 7)) + ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 8)) + err := r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + + err = r.Put(ctx, ch2) + if err != nil { + t.Fatal(err) + } + + ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(ch.Data(), ch1.Data()) { + t.Fatalf("expected chunk data to not be updated") + } + }) +} + func TestReplaceOldIndex(t *testing.T) { t.Parallel() diff --git a/pkg/storer/internal/stampindex/stampindex.go b/pkg/storer/internal/stampindex/stampindex.go index bbb81763cf6..3ead7c8d861 100644 --- a/pkg/storer/internal/stampindex/stampindex.go +++ b/pkg/storer/internal/stampindex/stampindex.go @@ -185,6 +185,12 @@ func Load(s storage.Reader, namespace string, chunk swarm.Chunk) (*Item, error) return item, nil } +// LoadWithStamp returns stamp index record related to the given namespace and stamp. +func LoadWithStamp(s storage.Reader, namespace string, stamp swarm.Stamp) (*Item, error) { + ch := swarm.NewChunk(swarm.EmptyAddress, nil).WithStamp(stamp) + return Load(s, namespace, ch) +} + // Store creates new or updated an existing stamp index // record related to the given namespace and chunk. func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error { diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 8a506a0d528..57ad04c24e5 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -242,6 +242,13 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn) } +func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk) (err error) { + defer handleMetric("chunkstore_replace", c.metrics)(&err) + unlock := c.lock(ch.Address()) + defer unlock() + return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch) +} + func (c *chunkStoreTrx) lock(addr swarm.Address) func() { // directly lock if c.readOnly {