From f2056a42d6a4dc362d4e53e7400fa3212ed6b27a Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 11 Nov 2024 21:41:00 +0300 Subject: [PATCH] fix: chunkstamp load with hash, replace to increase refCnt --- pkg/file/joiner/joiner_test.go | 2 +- pkg/storage/chunkstore.go | 2 +- .../inmemchunkstore/inmemchunkstore.go | 6 +- pkg/storer/internal/chunkstamp/chunkstamp.go | 38 ++++++++ .../internal/chunkstamp/chunkstamp_test.go | 16 ++- pkg/storer/internal/chunkstore/chunkstore.go | 5 +- pkg/storer/internal/reserve/reserve.go | 97 +++++++++---------- pkg/storer/internal/reserve/reserve_test.go | 78 +++++++++++++-- .../internal/transaction/transaction.go | 4 +- 9 files changed, 182 insertions(+), 66 deletions(-) diff --git a/pkg/file/joiner/joiner_test.go b/pkg/file/joiner/joiner_test.go index ce00ac8d782..15eddc23e3b 100644 --- a/pkg/file/joiner/joiner_test.go +++ b/pkg/file/joiner/joiner_test.go @@ -1405,7 +1405,7 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error { return nil } -func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error { +func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error { c.mu.Lock() defer c.mu.Unlock() c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp()) diff --git a/pkg/storage/chunkstore.go b/pkg/storage/chunkstore.go index 72f8b9ba784..68a9d10652a 100644 --- a/pkg/storage/chunkstore.go +++ b/pkg/storage/chunkstore.go @@ -42,7 +42,7 @@ type Hasser interface { // Replacer is the interface that wraps the basic Replace method. type Replacer interface { // Replace a chunk in the store. - Replace(context.Context, swarm.Chunk) error + Replace(context.Context, swarm.Chunk, bool) error } // PutterFunc type is an adapter to allow the use of diff --git a/pkg/storage/inmemchunkstore/inmemchunkstore.go b/pkg/storage/inmemchunkstore/inmemchunkstore.go index 7d49e63c279..3ec2b8e8a6d 100644 --- a/pkg/storage/inmemchunkstore/inmemchunkstore.go +++ b/pkg/storage/inmemchunkstore/inmemchunkstore.go @@ -77,13 +77,17 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error { return nil } -func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error { +func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk, emplace bool) error { c.mu.Lock() defer c.mu.Unlock() chunkCount := c.chunks[ch.Address().ByteString()] chunkCount.chunk = ch + if emplace { + chunkCount.count++ + } c.chunks[ch.Address().ByteString()] = chunkCount + return nil } diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index 2f3857d2ed3..5334983c13a 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -181,6 +181,44 @@ func LoadWithBatchID(s storage.Reader, scope string, addr swarm.Address, batchID return stamp, nil } +// LoadWithBatchID returns swarm.Stamp related to the given address and batchID. +func LoadWithStampHash(s storage.Reader, scope string, addr swarm.Address, hash []byte) (swarm.Stamp, error) { + var stamp swarm.Stamp + + found := false + err := s.Iterate( + storage.Query{ + Factory: func() storage.Item { + return &Item{ + scope: []byte(scope), + address: addr, + } + }, + }, + func(res storage.Result) (bool, error) { + item := res.Entry.(*Item) + h, err := item.stamp.Hash() + if err != nil { + return false, err + } + if bytes.Equal(hash, h) { + stamp = item.stamp + found = true + return true, nil + } + return false, nil + }, + ) + if err != nil { + return nil, err + } + if !found { + return nil, fmt.Errorf("stamp not found for hash %x: %w", hash, storage.ErrNotFound) + } + + return stamp, nil +} + // Store creates new or updated an existing stamp index // record related to the given scope and chunk. func Store(s storage.IndexStore, scope string, chunk swarm.Chunk) error { diff --git a/pkg/storer/internal/chunkstamp/chunkstamp_test.go b/pkg/storer/internal/chunkstamp/chunkstamp_test.go index 1167a56f10a..a000cfb8694 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp_test.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp_test.go @@ -185,7 +185,7 @@ func TestStoreLoadDelete(t *testing.T) { } }) - t.Run("load stored chunk stamp with batch id", func(t *testing.T) { + t.Run("load stored chunk stamp with batch id and hash", func(t *testing.T) { want := chunk.Stamp() have, err := chunkstamp.LoadWithBatchID(ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()) @@ -196,6 +196,20 @@ func TestStoreLoadDelete(t *testing.T) { if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" { t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff) } + + h, err := want.Hash() + if err != nil { + t.Fatal(err) + } + + have, err = chunkstamp.LoadWithStampHash(ts.IndexStore(), ns, chunk.Address(), h) + if err != nil { + t.Fatalf("LoadWithBatchID(...): unexpected error: %v", err) + } + + if diff := cmp.Diff(want, have, cmp.AllowUnexported(postage.Stamp{})); diff != "" { + t.Fatalf("LoadWithBatchID(...): mismatch (-want +have):\n%s", diff) + } }) t.Run("delete stored stamp", func(t *testing.T) { diff --git a/pkg/storer/internal/chunkstore/chunkstore.go b/pkg/storer/internal/chunkstore/chunkstore.go index 67fee1e6d77..cc2743b97c4 100644 --- a/pkg/storer/internal/chunkstore/chunkstore.go +++ b/pkg/storer/internal/chunkstore/chunkstore.go @@ -94,7 +94,7 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm. return s.Put(rIdx) } -func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error { +func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk, emplace bool) error { rIdx := &RetrievalIndexItem{Address: ch.Address()} err := s.Get(rIdx) if err != nil { @@ -112,6 +112,9 @@ func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch sw } rIdx.Location = loc rIdx.Timestamp = uint64(time.Now().Unix()) + if emplace { + rIdx.RefCnt++ + } return s.Put(rIdx) } diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index f45df03ea41..47f64dfffd5 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -5,7 +5,6 @@ package reserve import ( - "bytes" "context" "encoding/binary" "encoding/hex" @@ -130,37 +129,40 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { err = r.st.Run(ctx, func(s transaction.Store) error { - sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID()) - if err != nil && !errors.Is(err, storage.ErrNotFound) { - return err + oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) + if err != nil { + return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) } - var sameAddressSoc = false + // index collision + if loadedStampIndex { - // same chunk address, same batch - if sameAddressOldStamp != nil { - - if chunkType == swarm.ChunkTypeSingleOwner { - sameAddressSoc = true + prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) + curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) + if prev >= curr { + return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) } - // index collision - if bytes.Equal(chunk.Stamp().Index(), sameAddressOldStamp.Index()) { - sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) + r.logger.Debug( + "replacing chunk stamp index", + "old_chunk", oldStampIndex.ChunkAddress, + "new_chunk", chunk.Address(), + "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), + ) + + // same chunk address + if oldStampIndex.ChunkAddress.Equal(chunk.Address()) { + + oldStamp, err := chunkstamp.LoadWithStampHash(s.IndexStore(), reserveScope, oldStampIndex.ChunkAddress, oldStampIndex.StampHash) if err != nil { return err } - prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) - curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - if prev >= curr { - return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) - } oldBatchRadiusItem := &BatchRadiusItem{ Bin: bin, - Address: chunk.Address(), - BatchID: sameAddressOldStampIndex.BatchID, - StampHash: sameAddressOldStampIndex.StampHash, + Address: oldStampIndex.ChunkAddress, + BatchID: oldStampIndex.BatchID, + StampHash: oldStampIndex.StampHash, } // load item to get the binID err = s.IndexStore().Get(oldBatchRadiusItem) @@ -172,8 +174,8 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { err = errors.Join( s.IndexStore().Delete(oldBatchRadiusItem), s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}), - stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp), - chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp), + stampindex.Delete(s.IndexStore(), reserveScope, oldStamp), + chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, oldStamp), ) if err != nil { return err @@ -207,28 +209,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } - if chunkType != swarm.ChunkTypeSingleOwner { - return nil + if chunkType == swarm.ChunkTypeSingleOwner { + r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) + return s.ChunkStore().Replace(ctx, chunk, false) } - r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address()) - return s.ChunkStore().Replace(ctx, chunk) + return nil } - } - oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) - if err != nil { - return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) - } - - // different address, same batch, index collision - if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) { - prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) - curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) - if prev >= curr { - return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk) - } - // An older (same or different) chunk with the same batchID and stamp index has been previously + // An older and different chunk with the same batchID and stamp index has been previously // saved to the reserve. We must do the below before saving the new chunk: // 1. Delete the old chunk from the chunkstore. // 2. Delete the old chunk's stamp data. @@ -240,13 +229,6 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return fmt.Errorf("failed removing older chunk %s: %w", oldStampIndex.ChunkAddress, err) } - r.logger.Warning( - "replacing chunk stamp index", - "old_chunk", oldStampIndex.ChunkAddress, - "new_chunk", chunk.Address(), - "batch_id", hex.EncodeToString(chunk.Stamp().BatchID()), - ) - // replace old stamp index. err = stampindex.Store(s.IndexStore(), reserveScope, chunk) if err != nil { @@ -281,8 +263,17 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { return err } - if sameAddressSoc { - err = s.ChunkStore().Replace(ctx, chunk) + var has bool + if chunkType == swarm.ChunkTypeSingleOwner { + has, err = s.ChunkStore().Has(ctx, chunk.Address()) + if err != nil { + return err + } + if has { + err = s.ChunkStore().Replace(ctx, chunk, true) + } else { + err = s.ChunkStore().Put(ctx, chunk) + } } else { err = s.ChunkStore().Put(ctx, chunk) } @@ -321,7 +312,7 @@ func (r *Reserve) Get(ctx context.Context, addr swarm.Address, batchID []byte, s return nil, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, addr, item.BatchID) + stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, addr, stampHash) if err != nil { return nil, err } @@ -423,7 +414,7 @@ func RemoveChunkWithItem( ) error { var errs error - stamp, _ := chunkstamp.LoadWithBatchID(trx.IndexStore(), reserveScope, item.Address, item.BatchID) + stamp, _ := chunkstamp.LoadWithStampHash(trx.IndexStore(), reserveScope, item.Address, item.StampHash) if stamp != nil { errs = errors.Join( stampindex.Delete(trx.IndexStore(), reserveScope, stamp), @@ -473,7 +464,7 @@ func (r *Reserve) IterateChunks(startBin uint8, cb func(swarm.Chunk) (bool, erro return false, err } - stamp, err := chunkstamp.LoadWithBatchID(r.st.IndexStore(), reserveScope, item.Address, item.BatchID) + stamp, err := chunkstamp.LoadWithStampHash(r.st.IndexStore(), reserveScope, item.Address, item.StampHash) if err != nil { return false, err } diff --git a/pkg/storer/internal/reserve/reserve_test.go b/pkg/storer/internal/reserve/reserve_test.go index 15993fd7de8..b06c9658206 100644 --- a/pkg/storer/internal/reserve/reserve_test.go +++ b/pkg/storer/internal/reserve/reserve_test.go @@ -325,7 +325,7 @@ func TestSameChunkAddress(t *testing.T) { }) t.Run("chunk with different batchID remains untouched", func(t *testing.T) { - noReplace := func(ch1, ch2 swarm.Chunk) { + checkReplace := func(ch1, ch2 swarm.Chunk, replace bool) { t.Helper() err = r.Put(ctx, ch1) if err != nil { @@ -355,13 +355,12 @@ func TestSameChunkAddress(t *testing.T) { checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binBinIDs[bin] - 1}, false) checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binBinIDs[bin]}, false) - // expect new chunk to NOT replace old one ch, err := ts.ChunkStore().Get(ctx, ch2.Address()) if err != nil { t.Fatal(err) } - if !bytes.Equal(ch.Data(), ch1.Data()) { - t.Fatalf("expected chunk data to not be updated") + if replace && bytes.Equal(ch.Data(), ch1.Data()) { + t.Fatalf("expected chunk data to be updated") } } @@ -379,7 +378,7 @@ func TestSameChunkAddress(t *testing.T) { if !bytes.Equal(ch1.Address().Bytes(), ch2.Address().Bytes()) { t.Fatalf("expected chunk addresses to be the same") } - noReplace(ch1, ch2) + checkReplace(ch1, ch2, true) // cac batch = postagetesting.MustNewBatch() @@ -389,7 +388,7 @@ func TestSameChunkAddress(t *testing.T) { if !bytes.Equal(ch1.Address().Bytes(), ch2.Address().Bytes()) { t.Fatalf("expected chunk addresses to be the same") } - noReplace(ch1, ch2) + checkReplace(ch1, ch2, false) size2 := r.Size() if size2-size1 != 4 { t.Fatalf("expected reserve size to increase by 4, got %d", size2-size1) @@ -603,6 +602,73 @@ func TestEvict(t *testing.T) { } } +func TestEvictSOC(t *testing.T) { + t.Parallel() + + baseAddr := swarm.RandAddress(t) + ts := internal.NewInmemStorage() + + r, err := reserve.New( + baseAddr, + ts, + 0, kademlia.NewTopologyDriver(), + log.Noop, + ) + if err != nil { + t.Fatal(err) + } + + batch := postagetesting.MustNewBatch() + signer := getSigner(t) + + var chunks []swarm.Chunk + + for i := 0; i < 10; i++ { + ch := soctesting.GenerateMockSocWithSigner(t, []byte{byte(i)}, signer).Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, uint64(i), uint64(i))) + chunks = append(chunks, ch) + err := r.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + + bin := swarm.Proximity(baseAddr.Bytes(), chunks[0].Address().Bytes()) + + for i, ch := range chunks { + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, false) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, false) + checkChunk(t, ts, ch, false) + } + + _, err = r.EvictBatchBin(context.Background(), batch.ID, 1, swarm.MaxBins) + if err != nil { + t.Fatal(err) + } + checkChunk(t, ts, chunks[9], false) // chunk should still persist, eg refCnt > 0 + + evicted, err := r.EvictBatchBin(context.Background(), batch.ID, 10, swarm.MaxBins) + if err != nil { + t.Fatal(err) + } + if evicted != 9 { + t.Fatalf("wanted evicted count 10, got %d", evicted) + } + + for i, ch := range chunks { + stampHash, err := ch.Stamp().Hash() + if err != nil { + t.Fatal(err) + } + checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch.Stamp().BatchID(), Address: ch.Address(), StampHash: stampHash}, true) + checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: uint64(i + 1), StampHash: stampHash}, true) + checkChunk(t, ts, ch, true) + } +} + func TestEvictMaxCount(t *testing.T) { t.Parallel() diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index 2dc0c7c4bcd..7fb247da391 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -242,11 +242,11 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn) } -func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk) (err error) { +func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk, emplace bool) (err error) { defer handleMetric("chunkstore_replace", c.metrics)(&err) unlock := c.lock(ch.Address()) defer unlock() - return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch) + return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch, emplace) } func (c *chunkStoreTrx) lock(addr swarm.Address) func() {