From 6ee501996924276ab2b9dd36434d2ac773dad2ca Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 6 Aug 2024 00:51:35 +0100 Subject: [PATCH 01/14] fix: replace chunk --- pkg/storer/internal/reserve/reserve.go | 27 ++++++++++++- pkg/storer/internal/reserve/reserve_test.go | 43 +++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 2910e60cccd..083d484f757 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -116,7 +116,23 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { r.multx.Lock(strconv.Itoa(int(bin))) defer r.multx.Unlock(strconv.Itoa(int(bin))) - return r.st.Run(ctx, func(s transaction.Store) error { + // If this chunk already exists in the chunkstore, we delete it in a separate transaction. + // The oldChunk will be restored if the next tx fails. + oldChunk, err := r.st.ChunkStore().Get(ctx, chunk.Address()) + if err != nil && !errors.Is(err, storage.ErrNotFound) { + return err + } + if oldChunk != nil { + err = r.st.Run(ctx, func(s transaction.Store) error { + r.logger.Warning("deleting chunk from chunkstore", "address", chunk.Address().String()) + return s.ChunkStore().Delete(ctx, chunk.Address()) + }) + if err != nil { + return err + } + } + + err = r.st.Run(ctx, func(s transaction.Store) error { oldItem, loadedStamp, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk) if err != nil { @@ -197,6 +213,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return nil }) + if err != nil { + if oldChunk != nil { + err = errors.Join(err, r.st.Run(ctx, func(s transaction.Store) error { + return s.ChunkStore().Put(ctx, oldChunk) + })) + } + return err + } + return nil } func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) { diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index f249bca8498..895078d34ba 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -135,6 +135,49 @@ func TestReserveChunkType(t *testing.T) { } } +func TestReplaceSameAddress(t *testing.T) { + t.Parallel() + + baseAddr := swarm.RandAddress(t) + ts := internal.NewInmemStorage() + r, err := reserve.New( + baseAddr, + ts, + 0, kademlia.NewTopologyDriver(), + log.Noop, + ) + if err != nil { + t.Fatal(err) + } + + batch := postagetesting.MustNewBatch() + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch2 := swarm.NewChunk(ch1.Address(), []byte("new payload")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + + err = r.Put(context.Background(), ch1) + if err != nil { + t.Fatal(err) + } + + err = r.Put(context.Background(), ch2) + if err != nil { + t.Fatal(err) + } + + stampHash, err := ch2.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + ch3, err := r.Get(context.Background(), ch2.Address(), ch2.Stamp().BatchID(), stampHash) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(ch3.Data(), ch2.Data()) { + t.Fatalf("ch3 and ch2 payloads not equal") + } +} + func TestReplaceOldIndex(t *testing.T) { t.Parallel() From 094755c3c3332d1082e9819538bfd627ccc206fb Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 6 Aug 2024 12:55:17 +0100 Subject: [PATCH 02/14] fix: refactor --- pkg/storage/chunkstore.go | 7 ++++ .../inmemchunkstore/inmemchunkstore.go | 6 +++- pkg/storer/internal/chunkstore/chunkstore.go | 21 ++++++++++++ pkg/storer/internal/reserve/reserve.go | 33 +++++++------------ .../internal/transaction/transaction.go | 7 ++++ 5 files changed, 51 insertions(+), 23 deletions(-) diff --git a/pkg/storage/chunkstore.go b/pkg/storage/chunkstore.go index c601dab317f..72f8b9ba784 100644 --- a/pkg/storage/chunkstore.go +++ b/pkg/storage/chunkstore.go @@ -39,6 +39,12 @@ type Hasser interface { Has(context.Context, swarm.Address) (bool, error) } +// Replacer is the interface that wraps the basic Replace method. +type Replacer interface { + // Replace a chunk in the store. + Replace(context.Context, swarm.Chunk) error +} + // PutterFunc type is an adapter to allow the use of // ChunkStore as Putter interface. If f is a function // with the appropriate signature, PutterFunc(f) is a @@ -70,6 +76,7 @@ type ChunkStore interface { Putter Deleter Hasser + Replacer // Iterate over chunks in no particular order. Iterate(context.Context, IterateChunkFn) error diff --git a/pkg/storage/inmemchunkstore/inmemchunkstore.go b/pkg/storage/inmemchunkstore/inmemchunkstore.go index f1fd629db41..3abff50fd20 100644 --- a/pkg/storage/inmemchunkstore/inmemchunkstore.go +++ b/pkg/storage/inmemchunkstore/inmemchunkstore.go @@ -8,7 +8,7 @@ import ( "context" "sync" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -77,6 +77,10 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error { return nil } +func (c *ChunkStore) Replace(_ context.Context, _ swarm.Chunk) error { + return nil +} + func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/storer/internal/chunkstore/chunkstore.go b/pkg/storer/internal/chunkstore/chunkstore.go index 89941192b47..67fee1e6d77 100644 --- a/pkg/storer/internal/chunkstore/chunkstore.go +++ b/pkg/storer/internal/chunkstore/chunkstore.go @@ -94,6 +94,27 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm. return s.Put(rIdx) } +func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error { + rIdx := &RetrievalIndexItem{Address: ch.Address()} + err := s.Get(rIdx) + if err != nil { + return fmt.Errorf("chunk store: failed to read retrievalIndex for address %s: %w", ch.Address(), err) + } + + err = sh.Release(ctx, rIdx.Location) + if err != nil { + return fmt.Errorf("chunkstore: failed to release sharky location: %w", err) + } + + loc, err := sh.Write(ctx, ch.Data()) + if err != nil { + return fmt.Errorf("chunk store: write to sharky failed: %w", err) + } + rIdx.Location = loc + rIdx.Timestamp = uint64(time.Now().Unix()) + return s.Put(rIdx) +} + func Delete(ctx context.Context, s storage.IndexStore, sh storage.Sharky, addr swarm.Address) error { rIdx := &RetrievalIndexItem{Address: addr} err := s.Get(rIdx) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 083d484f757..e6620d5833f 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -116,22 +116,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { r.multx.Lock(strconv.Itoa(int(bin))) defer r.multx.Unlock(strconv.Itoa(int(bin))) - // If this chunk already exists in the chunkstore, we delete it in a separate transaction. - // The oldChunk will be restored if the next tx fails. - oldChunk, err := r.st.ChunkStore().Get(ctx, chunk.Address()) - if err != nil && !errors.Is(err, storage.ErrNotFound) { - return err - } - if oldChunk != nil { - err = r.st.Run(ctx, func(s transaction.Store) error { - r.logger.Warning("deleting chunk from chunkstore", "address", chunk.Address().String()) - return s.ChunkStore().Delete(ctx, chunk.Address()) - }) - if err != nil { - return err - } - } - err = r.st.Run(ctx, func(s transaction.Store) error { oldItem, loadedStamp, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk) @@ -202,7 +186,17 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } - err = s.ChunkStore().Put(ctx, chunk) + chunkType := storage.ChunkType(chunk) + has, err := s.ChunkStore().Has(ctx, chunk.Address()) + if err != nil { + return err + } + if !has { + err = s.ChunkStore().Put(ctx, chunk) + } else if chunkType == swarm.ChunkTypeSingleOwner { + r.logger.Warning("replacing SOC in chunkstore", "chunk", chunk.Address()) + err = s.ChunkStore().Replace(ctx, chunk) + } if err != nil { return err } @@ -214,11 +208,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return nil }) if err != nil { - if oldChunk != nil { - err = errors.Join(err, r.st.Run(ctx, func(s transaction.Store) error { - return s.ChunkStore().Put(ctx, oldChunk) - })) - } return err } return nil diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 8a506a0d528..57ad04c24e5 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -242,6 +242,13 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn) } +func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk) (err error) { + defer handleMetric("chunkstore_replace", c.metrics)(&err) + unlock := c.lock(ch.Address()) + defer unlock() + return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch) +} + func (c *chunkStoreTrx) lock(addr swarm.Address) func() { // directly lock if c.readOnly { From 10e5c467e7963da026c1458aba532961eddf82d4 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 6 Aug 2024 13:04:59 +0100 Subject: [PATCH 03/14] fix: lint --- pkg/file/joiner/joiner_test.go | 10 ++++- pkg/storer/internal/reserve/reserve.go | 6 +-- pkg/storer/internal/reserve/reserve_test.go | 43 --------------------- 3 files changed, 10 insertions(+), 49 deletions(-) diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index 24d7c4e0999..2471d94edde 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -25,7 +25,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/file/splitter" filetest "github.com/ethersphere/bee/v2/pkg/file/testing" "github.com/ethersphere/bee/v2/pkg/log" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" testingc "github.com/ethersphere/bee/v2/pkg/storage/testing" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" @@ -1411,6 +1411,14 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error { return nil } +func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error { + c.mu.Lock() + defer c.mu.Unlock() + c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp()) + return nil + +} + func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index e6620d5833f..7ba798ec51e 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -116,7 +116,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { r.multx.Lock(strconv.Itoa(int(bin))) defer r.multx.Unlock(strconv.Itoa(int(bin))) - err = r.st.Run(ctx, func(s transaction.Store) error { + return r.st.Run(ctx, func(s transaction.Store) error { oldItem, loadedStamp, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk) if err != nil { @@ -207,10 +207,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return nil }) - if err != nil { - return err - } - return nil } func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) { diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 895078d34ba..f249bca8498 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -135,49 +135,6 @@ func TestReserveChunkType(t *testing.T) { } } -func TestReplaceSameAddress(t *testing.T) { - t.Parallel() - - baseAddr := swarm.RandAddress(t) - ts := internal.NewInmemStorage() - r, err := reserve.New( - baseAddr, - ts, - 0, kademlia.NewTopologyDriver(), - log.Noop, - ) - if err != nil { - t.Fatal(err) - } - - batch := postagetesting.MustNewBatch() - ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) - ch2 := swarm.NewChunk(ch1.Address(), []byte("new payload")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) - - err = r.Put(context.Background(), ch1) - if err != nil { - t.Fatal(err) - } - - err = r.Put(context.Background(), ch2) - if err != nil { - t.Fatal(err) - } - - stampHash, err := ch2.Stamp().Hash() - if err != nil { - t.Fatal(err) - } - ch3, err := r.Get(context.Background(), ch2.Address(), ch2.Stamp().BatchID(), stampHash) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(ch3.Data(), ch2.Data()) { - t.Fatalf("ch3 and ch2 payloads not equal") - } -} - func TestReplaceOldIndex(t *testing.T) { t.Parallel() From 8be8f9b2876d5cf05e7e666df00d6232ec601ab7 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 6 Aug 2024 13:39:50 +0100 Subject: [PATCH 04/14] fix: replace --- pkg/storer/internal/reserve/reserve.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 7ba798ec51e..87405c981b0 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -186,16 +186,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } - chunkType := storage.ChunkType(chunk) has, err := s.ChunkStore().Has(ctx, chunk.Address()) if err != nil { return err } - if !has { - err = s.ChunkStore().Put(ctx, chunk) - } else if chunkType == swarm.ChunkTypeSingleOwner { - r.logger.Warning("replacing SOC in chunkstore", "chunk", chunk.Address()) + if has { + r.logger.Warning("replacing chunk in chunkstore", "chunk", chunk.Address()) err = s.ChunkStore().Replace(ctx, chunk) + } else { + err = s.ChunkStore().Put(ctx, chunk) } if err != nil { return err From 84a1a15a3a8f5f20315b652e7ee3e3a993269b98 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Thu, 8 Aug 2024 04:12:17 +0100 Subject: [PATCH 05/14] chore: refactor and added tests --- pkg/storer/internal/reserve/reserve.go | 26 +++++- pkg/storer/internal/reserve/reserve_test.go | 98 +++++++++++++++++++++ 2 files changed, 123 insertions(+), 1 deletion(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 87405c981b0..706c164b0ec 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -117,17 +117,19 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { defer r.multx.Unlock(strconv.Itoa(int(bin))) return r.st.Run(ctx, func(s transaction.Store) error { - + // index collision (same or different chunk) oldItem, loadedStamp, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk) if err != nil { return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } + if loadedStamp { prev := binary.BigEndian.Uint64(oldItem.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } + // An older (same or different) chunk with the same batchID and stamp index has been previously // saved to the reserve. We must do the below before saving the new chunk: // 1. Delete the old chunk from the chunkstore. @@ -152,6 +154,28 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { if err != nil { return fmt.Errorf("failed updating stamp index: %w", err) } + } else { + // same chunk with different index + oldChunkStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID()) + if err != nil && !errors.Is(err, storage.ErrNotFound) { + return err + } + if oldChunkStamp != nil { + prev := binary.BigEndian.Uint64(oldChunkStamp.Timestamp()) + curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) + if prev >= curr { + return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + } + oldChunkStampHash, err := oldChunkStamp.Hash() + if err != nil { + return err + } + err = r.removeChunk(ctx, s, chunk.Address(), oldChunkStamp.BatchID(), oldChunkStampHash) + if err != nil { + return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err) + } + // stampindex is already saved in LoadOrStore + } } err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk) diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index f249bca8498..9d6ee9fb1a0 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -135,6 +135,104 @@ func TestReserveChunkType(t *testing.T) { } } +func TestSameChunkSameIndex(t *testing.T) { + t.Parallel() + + ctx := context.Background() + baseAddr := swarm.RandAddress(t) + + ts := internal.NewInmemStorage() + + r, err := reserve.New( + baseAddr, + ts, + 0, kademlia.NewTopologyDriver(), + log.Noop, + ) + if err != nil { + t.Fatal(err) + } + + t.Run("same stamp index older timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + err = r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + err = r.Put(ctx, ch2) + if !errors.Is(err, storage.ErrOverwriteNewerChunk) { + t.Fatal("expected error") + } + }) + + t.Run("different stamp index older timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0)) + + err = r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + err = r.Put(ctx, ch2) + if !errors.Is(err, storage.ErrOverwriteNewerChunk) { + t.Fatal("expected error") + } + }) + + replace := func(t *testing.T, ch1, ch2 swarm.Chunk) { + t.Helper() + + err := r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + + err = r.Put(ctx, ch2) + if err != nil { + t.Fatal(err) + } + + ch1StampHash, err := ch1.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + ch2StampHash, err := ch2.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 3, StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 4, StampHash: ch2StampHash}, false) + + ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(ch.Data(), ch2.Data()) { + t.Fatalf("expected chunk data to be updated") + } + } + + t.Run("same stamp index newer timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + replace(t, ch1, ch2) + }) + + t.Run("different stamp index newer timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) + replace(t, ch1, ch2) + }) +} + func TestReplaceOldIndex(t *testing.T) { t.Parallel() From f872409efdcc6912494d2c6b59e6a2cab40386f2 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Thu, 8 Aug 2024 04:12:54 +0100 Subject: [PATCH 06/14] chore: fix --- pkg/storer/internal/reserve/reserve.go | 2 -- pkg/storer/internal/reserve/reserve_test.go | 10 +++++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 706c164b0ec..e05d13964fc 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -122,14 +122,12 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { if err != nil { return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } - if loadedStamp { prev := binary.BigEndian.Uint64(oldItem.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } - // An older (same or different) chunk with the same batchID and stamp index has been previously // saved to the reserve. We must do the below before saving the new chunk: // 1. Delete the old chunk from the chunkstore. diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 9d6ee9fb1a0..12f3c967315 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -135,7 +135,7 @@ func TestReserveChunkType(t *testing.T) { } } -func TestSameChunkSameIndex(t *testing.T) { +func TestSameChunkAddress(t *testing.T) { t.Parallel() ctx := context.Background() @@ -153,7 +153,7 @@ func TestSameChunkSameIndex(t *testing.T) { t.Fatal(err) } - t.Run("same stamp index older timestamp", func(t *testing.T) { + t.Run("same stamp index and older timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) @@ -167,7 +167,7 @@ func TestSameChunkSameIndex(t *testing.T) { } }) - t.Run("different stamp index older timestamp", func(t *testing.T) { + t.Run("different stamp index and older timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0)) @@ -218,14 +218,14 @@ func TestSameChunkSameIndex(t *testing.T) { } } - t.Run("same stamp index newer timestamp", func(t *testing.T) { + t.Run("same stamp index and newer timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) replace(t, ch1, ch2) }) - t.Run("different stamp index newer timestamp", func(t *testing.T) { + t.Run("different stamp index and newer timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) From 65c2e0e360a0d799291d3333c965f0391cbf4154 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Sun, 11 Aug 2024 23:43:35 +0100 Subject: [PATCH 07/14] fix: refactoring --- pkg/soc/testing/soc.go | 33 +++++ .../inmemchunkstore/inmemchunkstore.go | 8 +- pkg/storer/internal/reserve/reserve.go | 122 +++++++++++++----- pkg/storer/internal/reserve/reserve_test.go | 77 ++++++++--- pkg/storer/internal/stampindex/stampindex.go | 6 + 5 files changed, 197 insertions(+), 49 deletions(-) diff --git a/pkg/soc/testing/soc.go b/pkg/soc/testing/soc.go index e5325f464b1..787c8737b4b 100644 --- a/pkg/soc/testing/soc.go +++ b/pkg/soc/testing/soc.go @@ -32,6 +32,39 @@ func (ms MockSOC) Chunk() swarm.Chunk { return swarm.NewChunk(ms.Address(), append(ms.ID, append(ms.Signature, ms.WrappedChunk.Data()...)...)) } +// GenerateMockSocWithSigner generates a valid mocked SOC from given data and signer. +func GenerateMockSocWithSigner(t *testing.T, data []byte, signer crypto.Signer) *MockSOC { + t.Helper() + + owner, err := signer.EthereumAddress() + if err != nil { + t.Fatal(err) + } + ch, err := cac.New(data) + if err != nil { + t.Fatal(err) + } + + id := make([]byte, swarm.HashSize) + hasher := swarm.NewHasher() + _, err = hasher.Write(append(id, ch.Address().Bytes()...)) + if err != nil { + t.Fatal(err) + } + + signature, err := signer.Sign(hasher.Sum(nil)) + if err != nil { + t.Fatal(err) + } + + return &MockSOC{ + ID: id, + Owner: owner.Bytes(), + Signature: signature, + WrappedChunk: ch, + } +} + // GenerateMockSOC generates a valid mocked SOC from given data. func GenerateMockSOC(t *testing.T, data []byte) *MockSOC { t.Helper() diff --git a/pkg/storage/inmemchunkstore/inmemchunkstore.go b/pkg/storage/inmemchunkstore/inmemchunkstore.go index 3abff50fd20..7d49e63c279 100644 --- a/pkg/storage/inmemchunkstore/inmemchunkstore.go +++ b/pkg/storage/inmemchunkstore/inmemchunkstore.go @@ -77,7 +77,13 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error { return nil } -func (c *ChunkStore) Replace(_ context.Context, _ swarm.Chunk) error { +func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error { + c.mu.Lock() + defer c.mu.Unlock() + + chunkCount := c.chunks[ch.Address().ByteString()] + chunkCount.chunk = ch + c.chunks[ch.Address().ByteString()] = chunkCount return nil } diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index e05d13964fc..8d6c17b0040 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -117,11 +117,98 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { defer r.multx.Unlock(strconv.Itoa(int(bin))) return r.st.Run(ctx, func(s transaction.Store) error { - // index collision (same or different chunk) + oldItem, loadedStamp, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk) if err != nil { return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } + + sameAddressOldChunkStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID()) + if err != nil && !errors.Is(err, storage.ErrNotFound) { + return err + } + + // same address + if sameAddressOldChunkStamp != nil { + sameAddressOldStampIndex, err := stampindex.LoadWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldChunkStamp) + if err != nil && !errors.Is(err, storage.ErrNotFound) { + return err + } + prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) + curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) + if prev >= curr { + return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + } + if loadedStamp { + prev := binary.BigEndian.Uint64(oldItem.StampTimestamp) + if prev >= curr { + return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + } + } + + // load item to get the binID + oldBatchRadiusItem := &BatchRadiusItem{ + Bin: bin, + Address: chunk.Address(), + BatchID: sameAddressOldStampIndex.BatchID, + StampHash: sameAddressOldStampIndex.StampHash, + } + err = s.IndexStore().Get(oldBatchRadiusItem) + if err != nil { + return err + } + + err = errors.Join( + s.IndexStore().Delete(oldBatchRadiusItem), + s.IndexStore().Delete(&ChunkBinItem{Bin: bin, BinID: oldBatchRadiusItem.BinID}), + stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(chunk.Address(), nil).WithStamp(sameAddressOldChunkStamp)), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, chunk.Address(), sameAddressOldChunkStamp), + ) + if err != nil { + return err + } + + err = stampindex.Store(s.IndexStore(), reserveNamespace, chunk) + if err != nil { + return err + } + + err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk) + if err != nil { + return err + } + + err = s.IndexStore().Put(&BatchRadiusItem{ + Bin: bin, + BinID: oldBatchRadiusItem.BinID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + StampHash: stampHash, + }) + if err != nil { + return err + } + + err = s.IndexStore().Put(&ChunkBinItem{ + Bin: bin, + BinID: oldBatchRadiusItem.BinID, + Address: chunk.Address(), + BatchID: chunk.Stamp().BatchID(), + ChunkType: storage.ChunkType(chunk), + StampHash: stampHash, + }) + if err != nil { + return err + } + + if storage.ChunkType(chunk) != swarm.ChunkTypeSingleOwner { + return nil + } + r.logger.Warning("replacing soc in chunkstore", "address", chunk.Address()) + return s.ChunkStore().Replace(ctx, chunk) + } + + // different address if loadedStamp { prev := binary.BigEndian.Uint64(oldItem.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) @@ -152,28 +239,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { if err != nil { return fmt.Errorf("failed updating stamp index: %w", err) } - } else { - // same chunk with different index - oldChunkStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID()) - if err != nil && !errors.Is(err, storage.ErrNotFound) { - return err - } - if oldChunkStamp != nil { - prev := binary.BigEndian.Uint64(oldChunkStamp.Timestamp()) - curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - if prev >= curr { - return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) - } - oldChunkStampHash, err := oldChunkStamp.Hash() - if err != nil { - return err - } - err = r.removeChunk(ctx, s, chunk.Address(), oldChunkStamp.BatchID(), oldChunkStampHash) - if err != nil { - return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err) - } - // stampindex is already saved in LoadOrStore - } } err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk) @@ -208,16 +273,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } - has, err := s.ChunkStore().Has(ctx, chunk.Address()) - if err != nil { - return err - } - if has { - r.logger.Warning("replacing chunk in chunkstore", "chunk", chunk.Address()) - err = s.ChunkStore().Replace(ctx, chunk) - } else { - err = s.ChunkStore().Put(ctx, chunk) - } + err = s.ChunkStore().Put(ctx, chunk) if err != nil { return err } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 12f3c967315..f03d8170da6 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -13,9 +13,11 @@ import ( "testing" "time" + "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/postage" postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing" + soctesting "github.com/ethersphere/bee/v2/pkg/soc/testing" "github.com/ethersphere/bee/v2/pkg/storage" chunk "github.com/ethersphere/bee/v2/pkg/storage/testing" "github.com/ethersphere/bee/v2/pkg/storer/internal" @@ -153,10 +155,18 @@ func TestSameChunkAddress(t *testing.T) { t.Fatal(err) } + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + t.Run("same stamp index and older timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() - ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) - ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) err = r.Put(ctx, ch1) if err != nil { t.Fatal(err) @@ -169,9 +179,10 @@ func TestSameChunkAddress(t *testing.T) { t.Run("different stamp index and older timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() - ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) - ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0)) - + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0)) err = r.Put(ctx, ch1) if err != nil { t.Fatal(err) @@ -182,7 +193,7 @@ func TestSameChunkAddress(t *testing.T) { } }) - replace := func(t *testing.T, ch1, ch2 swarm.Chunk) { + replace := func(t *testing.T, ch1 swarm.Chunk, ch2 swarm.Chunk, binID uint64) { t.Helper() err := r.Put(ctx, ch1) @@ -204,10 +215,18 @@ func TestSameChunkAddress(t *testing.T) { if err != nil { t.Fatal(err) } - checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) - checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) - checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 3, StampHash: ch1StampHash}, true) - checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 4, StampHash: ch2StampHash}, false) + + bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) + chunkBinItem := &reserve.ChunkBinItem{Bin: bin, BinID: binID} + err = ts.IndexStore().Get(chunkBinItem) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(chunkBinItem.StampHash, ch2StampHash) { + t.Fatalf("expected chunk bin item to be updated") + } ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) if err != nil { @@ -220,16 +239,44 @@ func TestSameChunkAddress(t *testing.T) { t.Run("same stamp index and newer timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() - ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) - ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) - replace(t, ch1, ch2) + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + replace(t, ch1, ch2, 3) }) t.Run("different stamp index and newer timestamp", func(t *testing.T) { + batch := postagetesting.MustNewBatch() + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) + replace(t, ch1, ch2, 4) + }) + + t.Run("not a soc and newer timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) - ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) - replace(t, ch1, ch2) + ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + err := r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + + err = r.Put(ctx, ch2) + if err != nil { + t.Fatal(err) + } + + ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(ch.Data(), ch1.Data()) { + t.Fatalf("expected chunk data to not be updated") + } }) } diff --git a/pkg/storer/internal/stampindex/stampindex.go b/pkg/storer/internal/stampindex/stampindex.go index bbb81763cf6..3ead7c8d861 100644 --- a/pkg/storer/internal/stampindex/stampindex.go +++ b/pkg/storer/internal/stampindex/stampindex.go @@ -185,6 +185,12 @@ func Load(s storage.Reader, namespace string, chunk swarm.Chunk) (*Item, error) return item, nil } +// LoadWithStamp returns stamp index record related to the given namespace and stamp. +func LoadWithStamp(s storage.Reader, namespace string, stamp swarm.Stamp) (*Item, error) { + ch := swarm.NewChunk(swarm.EmptyAddress, nil).WithStamp(stamp) + return Load(s, namespace, ch) +} + // Store creates new or updated an existing stamp index // record related to the given namespace and chunk. func Store(s storage.IndexStore, namespace string, chunk swarm.Chunk) error { From 212f474745f314d783226babbb66e79ede02ea3c Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 19 Aug 2024 13:35:31 +0100 Subject: [PATCH 08/14] fix: review comments --- pkg/storer/internal/reserve/reserve.go | 41 +++++++++++++++------ pkg/storer/internal/reserve/reserve_test.go | 4 +- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 8d6c17b0040..df9eaf70a4b 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -131,18 +131,28 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { // same address if sameAddressOldChunkStamp != nil { sameAddressOldStampIndex, err := stampindex.LoadWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldChunkStamp) - if err != nil && !errors.Is(err, storage.ErrNotFound) { + if err != nil { return err } prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { - return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } if loadedStamp { prev := binary.BigEndian.Uint64(oldItem.StampTimestamp) if prev >= curr { - return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) + } + + r.logger.Warning( + "replacing chunk stamp index", + "old_chunk", oldItem.ChunkAddress, + "new_chunk", chunk.Address(), + "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), + ) + if !chunk.Address().Equal(oldItem.ChunkAddress) { + err = r.removeChunk(ctx, s, oldItem.ChunkAddress, oldItem.BatchID, oldItem.StampHash) } } @@ -158,12 +168,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } - err = errors.Join( - s.IndexStore().Delete(oldBatchRadiusItem), - s.IndexStore().Delete(&ChunkBinItem{Bin: bin, BinID: oldBatchRadiusItem.BinID}), - stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(chunk.Address(), nil).WithStamp(sameAddressOldChunkStamp)), - chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, chunk.Address(), sameAddressOldChunkStamp), - ) + err = r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldChunkStamp) if err != nil { return err } @@ -178,9 +183,14 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } + binID, err := r.IncBinID(s.IndexStore(), bin) + if err != nil { + return err + } + err = s.IndexStore().Put(&BatchRadiusItem{ Bin: bin, - BinID: oldBatchRadiusItem.BinID, + BinID: binID, Address: chunk.Address(), BatchID: chunk.Stamp().BatchID(), StampHash: stampHash, @@ -191,7 +201,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { err = s.IndexStore().Put(&ChunkBinItem{ Bin: bin, - BinID: oldBatchRadiusItem.BinID, + BinID: binID, Address: chunk.Address(), BatchID: chunk.Stamp().BatchID(), ChunkType: storage.ChunkType(chunk), @@ -286,6 +296,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { }) } +func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error { + return errors.Join( + s.IndexStore().Delete(oldBatchRadiusItem), + s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), + stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(oldBatchRadiusItem.Address, nil).WithStamp(sameAddressOldChunkStamp)), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, oldBatchRadiusItem.Address, sameAddressOldChunkStamp), + ) +} + func (r *Reserve) Has(addr swarm.Address, batchID []byte, stampHash []byte) (bool, error) { item := &BatchRadiusItem{Bin: swarm.Proximity(r.baseAddr.Bytes(), addr.Bytes()), BatchID: batchID, Address: addr, StampHash: stampHash} return r.st.IndexStore().Has(item) diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index f03d8170da6..4222b4bdaaf 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -243,7 +243,7 @@ func TestSameChunkAddress(t *testing.T) { ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) - replace(t, ch1, ch2, 3) + replace(t, ch1, ch2, 4) }) t.Run("different stamp index and newer timestamp", func(t *testing.T) { @@ -252,7 +252,7 @@ func TestSameChunkAddress(t *testing.T) { ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) - replace(t, ch1, ch2, 4) + replace(t, ch1, ch2, 6) }) t.Run("not a soc and newer timestamp", func(t *testing.T) { From b9a21a313060fb5fc92c8961ac3fc0541403a181 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 19 Aug 2024 13:37:12 +0100 Subject: [PATCH 09/14] fix: log placement --- pkg/storer/internal/reserve/reserve.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index df9eaf70a4b..e3049471e28 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -145,13 +145,13 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } - r.logger.Warning( - "replacing chunk stamp index", - "old_chunk", oldItem.ChunkAddress, - "new_chunk", chunk.Address(), - "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), - ) if !chunk.Address().Equal(oldItem.ChunkAddress) { + r.logger.Warning( + "replacing chunk stamp index", + "old_chunk", oldItem.ChunkAddress, + "new_chunk", chunk.Address(), + "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), + ) err = r.removeChunk(ctx, s, oldItem.ChunkAddress, oldItem.BatchID, oldItem.StampHash) } } From 906178eed50a7d0903bc25cf79f1ef9a4f897a77 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 19 Aug 2024 13:39:31 +0100 Subject: [PATCH 10/14] fix: lint --- pkg/storer/internal/reserve/reserve.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index e3049471e28..b502d60f0cd 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -153,6 +153,9 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), ) err = r.removeChunk(ctx, s, oldItem.ChunkAddress, oldItem.BatchID, oldItem.StampHash) + if err != nil { + return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err) + } } } From 6e960211042d0e01b1aa3e502fb6fc2a8e1ec164 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 26 Aug 2024 10:43:51 +0100 Subject: [PATCH 11/14] fix: unecessary load --- pkg/storer/internal/reserve/reserve.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index b502d60f0cd..e9d4388eb07 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -159,18 +159,12 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } } - // load item to get the binID oldBatchRadiusItem := &BatchRadiusItem{ Bin: bin, Address: chunk.Address(), BatchID: sameAddressOldStampIndex.BatchID, StampHash: sameAddressOldStampIndex.StampHash, } - err = s.IndexStore().Get(oldBatchRadiusItem) - if err != nil { - return err - } - err = r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldChunkStamp) if err != nil { return err From 76813eb46d12c57cd1d2a28e2612f4837ddd665c Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 26 Aug 2024 13:23:42 +0100 Subject: [PATCH 12/14] fix: remove batchID from load --- pkg/storer/internal/chunkstamp/chunkstamp.go | 2 +- pkg/storer/internal/reserve/reserve.go | 2 +- pkg/storer/internal/reserve/reserve_test.go | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index fc7b0809533..1f84b8c7a3c 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -11,7 +11,7 @@ import ( "fmt" "github.com/ethersphere/bee/v2/pkg/postage" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/storageutil" "github.com/ethersphere/bee/v2/pkg/swarm" ) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index e9d4388eb07..7dfc9afab41 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -123,7 +123,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } - sameAddressOldChunkStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID()) + sameAddressOldChunkStamp, err := chunkstamp.Load(s.IndexStore(), reserveNamespace, chunk.Address()) if err != nil && !errors.Is(err, storage.ErrNotFound) { return err } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 4222b4bdaaf..c598c2eb80c 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -180,7 +180,7 @@ func TestSameChunkAddress(t *testing.T) { t.Run("different stamp index and older timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) - ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 2)) s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0)) err = r.Put(ctx, ch1) @@ -240,25 +240,25 @@ func TestSameChunkAddress(t *testing.T) { t.Run("same stamp index and newer timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) - ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 3)) s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) - ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 4)) replace(t, ch1, ch2, 4) }) t.Run("different stamp index and newer timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) - ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 5)) s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) - ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) + ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 6)) replace(t, ch1, ch2, 6) }) t.Run("not a soc and newer timestamp", func(t *testing.T) { batch := postagetesting.MustNewBatch() - ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) - ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1)) + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 7)) + ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 8)) err := r.Put(ctx, ch1) if err != nil { t.Fatal(err) From e1278d48e02c9cec7085b7ee189af9f604fd3572 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 26 Aug 2024 14:40:24 +0100 Subject: [PATCH 13/14] fix: load bin id --- pkg/storer/internal/reserve/reserve.go | 6 ++++++ pkg/storer/internal/reserve/reserve_test.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 7dfc9afab41..aa0aa1463be 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -165,6 +165,12 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { BatchID: sameAddressOldStampIndex.BatchID, StampHash: sameAddressOldStampIndex.StampHash, } + // load item to get the binID + err = s.IndexStore().Get(oldBatchRadiusItem) + if err != nil { + return err + } + err = r.deleteWithStamp(s, oldBatchRadiusItem, sameAddressOldChunkStamp) if err != nil { return err diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index c598c2eb80c..b5ff345aeaf 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -219,6 +219,9 @@ func TestSameChunkAddress(t *testing.T) { bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binID - 1, StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binID, StampHash: ch2StampHash}, false) + chunkBinItem := &reserve.ChunkBinItem{Bin: bin, BinID: binID} err = ts.IndexStore().Get(chunkBinItem) if err != nil { From d1e029b02caa5d92f6662e3e04ec9cd8295b7636 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 26 Aug 2024 14:51:29 +0100 Subject: [PATCH 14/14] fix: refactor --- pkg/storer/internal/reserve/reserve_test.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index b5ff345aeaf..e25f06d618a 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -193,7 +193,7 @@ func TestSameChunkAddress(t *testing.T) { } }) - replace := func(t *testing.T, ch1 swarm.Chunk, ch2 swarm.Chunk, binID uint64) { + replace := func(t *testing.T, ch1 swarm.Chunk, ch2 swarm.Chunk, ch1BinID, ch2BinID uint64) { t.Helper() err := r.Put(ctx, ch1) @@ -219,18 +219,8 @@ func TestSameChunkAddress(t *testing.T) { bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) - checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binID - 1, StampHash: ch1StampHash}, true) - checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binID, StampHash: ch2StampHash}, false) - - chunkBinItem := &reserve.ChunkBinItem{Bin: bin, BinID: binID} - err = ts.IndexStore().Get(chunkBinItem) - if err != nil { - t.Fatal(err) - } - if !bytes.Equal(chunkBinItem.StampHash, ch2StampHash) { - t.Fatalf("expected chunk bin item to be updated") - } - + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: ch1BinID, StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: ch2BinID, StampHash: ch2StampHash}, false) ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) if err != nil { t.Fatal(err) @@ -246,7 +236,7 @@ func TestSameChunkAddress(t *testing.T) { ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 3)) s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 4)) - replace(t, ch1, ch2, 4) + replace(t, ch1, ch2, 3, 4) }) t.Run("different stamp index and newer timestamp", func(t *testing.T) { @@ -255,7 +245,7 @@ func TestSameChunkAddress(t *testing.T) { ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 5)) s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 6)) - replace(t, ch1, ch2, 6) + replace(t, ch1, ch2, 5, 6) }) t.Run("not a soc and newer timestamp", func(t *testing.T) {