From f1ed79ee745e971b5faa9a9c6bbebd97bbe0bd27 Mon Sep 17 00:00:00 2001 From: Acha Bill <57879913+acha-bill@users.noreply.github.com> Date: Mon, 2 Sep 2024 16:15:35 +0100 Subject: [PATCH] fix: reduce reserve cnt on same chunk and index collision (#4798) Co-authored-by: Ivan Vandot --- .github/workflows/beekeeper.yml | 1 + pkg/storer/internal/reserve/reserve.go | 17 +++- pkg/storer/internal/reserve/reserve_test.go | 97 +++++++++++++++++++++ 3 files changed, 113 insertions(+), 2 deletions(-) diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml index a10329748ce..d8c88c47bdf 100644 --- a/.github/workflows/beekeeper.yml +++ b/.github/workflows/beekeeper.yml @@ -82,6 +82,7 @@ jobs: - uses: actions/upload-artifact@v4 with: name: temp-artifacts + include-hidden-files: true path: | Dockerfile.goreleaser Makefile diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 9cc387a1906..7781498cd1f 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -124,7 +124,9 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { r.multx.Lock(strconv.Itoa(int(bin))) defer r.multx.Unlock(strconv.Itoa(int(bin))) - return r.st.Run(ctx, func(s transaction.Store) error { + var shouldIncReserveSize, shouldDecrReserveSize bool + + err = r.st.Run(ctx, func(s transaction.Store) error { oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk) if err != nil { @@ -165,6 +167,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { if err != nil { return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err) } + shouldDecrReserveSize = true } } @@ -279,11 +282,21 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } if !loadedStampIndex { - r.size.Add(1) + shouldIncReserveSize = true } return nil }) + if err != nil { + return err + } + if shouldIncReserveSize { + r.size.Add(1) + } + if shouldDecrReserveSize { + r.size.Add(-1) + } + return nil } func (r *Reserve) deleteWithStamp(s transaction.Store, oldBatchRadiusItem *BatchRadiusItem, sameAddressOldChunkStamp swarm.Stamp) error { diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 2495fa35f20..3f7807f257d 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -158,6 +158,7 @@ func TestSameChunkAddress(t *testing.T) { binBinIDs := make(map[uint8]uint64) t.Run("same stamp index and older timestamp", func(t *testing.T) { + size1 := r.Size() signer := getSigner(t) batch := postagetesting.MustNewBatch() s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) @@ -174,9 +175,14 @@ func TestSameChunkAddress(t *testing.T) { if !errors.Is(err, storage.ErrOverwriteNewerChunk) { t.Fatal("expected error") } + size2 := r.Size() + if size2-size1 != 1 { + t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + } }) t.Run("different stamp index and older timestamp", func(t *testing.T) { + size1 := r.Size() signer := getSigner(t) batch := postagetesting.MustNewBatch() s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) @@ -193,6 +199,10 @@ func TestSameChunkAddress(t *testing.T) { if !errors.Is(err, storage.ErrOverwriteNewerChunk) { t.Fatal("expected error") } + size2 := r.Size() + if size2-size1 != 1 { + t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + } }) replace := func(t *testing.T, ch1 swarm.Chunk, ch2 swarm.Chunk, ch1BinID, ch2BinID uint64) { @@ -233,6 +243,7 @@ func TestSameChunkAddress(t *testing.T) { } t.Run("same stamp index and newer timestamp", func(t *testing.T) { + size1 := r.Size() signer := getSigner(t) batch := postagetesting.MustNewBatch() s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) @@ -242,9 +253,14 @@ func TestSameChunkAddress(t *testing.T) { bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) binBinIDs[bin] += 2 replace(t, ch1, ch2, binBinIDs[bin]-1, binBinIDs[bin]) + size2 := r.Size() + if size2-size1 != 1 { + t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + } }) t.Run("different stamp index and newer timestamp", func(t *testing.T) { + size1 := r.Size() signer := getSigner(t) batch := postagetesting.MustNewBatch() s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) @@ -254,9 +270,14 @@ func TestSameChunkAddress(t *testing.T) { bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) binBinIDs[bin] += 2 replace(t, ch1, ch2, binBinIDs[bin]-1, binBinIDs[bin]) + size2 := r.Size() + if size2-size1 != 1 { + t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + } }) t.Run("not a soc and newer timestamp", func(t *testing.T) { + size1 := r.Size() batch := postagetesting.MustNewBatch() ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 7)) ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 8)) @@ -284,6 +305,11 @@ func TestSameChunkAddress(t *testing.T) { if !bytes.Equal(ch.Data(), ch1.Data()) { t.Fatalf("expected chunk data to not be updated") } + + size2 := r.Size() + if size2-size1 != 1 { + t.Fatalf("expected reserve size to increase by 2, got %d", size2-size1) + } }) t.Run("chunk with different batchID remains untouched", func(t *testing.T) { @@ -327,6 +353,8 @@ func TestSameChunkAddress(t *testing.T) { } } + size1 := r.Size() + // soc signer := getSigner(t) batch := postagetesting.MustNewBatch() @@ -350,7 +378,76 @@ func TestSameChunkAddress(t *testing.T) { t.Fatalf("expected chunk addresses to be the same") } noReplace(ch1, ch2) + size2 := r.Size() + if size2-size1 != 4 { + t.Fatalf("expected reserve size to increase by 4, got %d", size2-size1) + } }) + + t.Run("same address but index collision with different chunk", func(t *testing.T) { + size1 := r.Size() + batch := postagetesting.MustNewBatch() + ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0)) + err = r.Put(ctx, ch1) + if err != nil { + t.Fatal(err) + } + bin1 := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes()) + binBinIDs[bin1] += 1 + ch1BinID := binBinIDs[bin1] + ch1StampHash, err := ch1.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + signer := getSigner(t) + s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer) + ch2 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1)) + err = r.Put(ctx, ch2) + if err != nil { + t.Fatal(err) + } + bin2 := swarm.Proximity(baseAddr.Bytes(), ch2.Address().Bytes()) + binBinIDs[bin2] += 1 + ch2BinID := binBinIDs[bin2] + ch2StampHash, err := ch2.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin1, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: binBinIDs[bin1], StampHash: ch1StampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: binBinIDs[bin2], StampHash: ch2StampHash}, false) + + s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer) + ch3 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 2)) + err = r.Put(ctx, ch3) + if err != nil { + t.Fatal(err) + } + binBinIDs[bin2] += 1 + ch3StampHash, err := ch3.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + ch3BinID := binBinIDs[bin2] + + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin1, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin2, BatchID: ch3.Stamp().BatchID(), Address: ch3.Address(), StampHash: ch3StampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin1, BinID: ch1BinID, StampHash: ch1StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch2BinID, StampHash: ch2StampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin2, BinID: ch3BinID, StampHash: ch3StampHash}, false) + + size2 := r.Size() + + // (ch1 + ch2) == 2 and then ch3 reduces reserve size by 1 + if size2-size1 != 1 { + t.Fatalf("expected reserve size to increase by 1, got %d", size2-size1) + } + }) + } func TestReplaceOldIndex(t *testing.T) {