Skip to content

Commit

Permalink
fix: asd
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 11, 2024
1 parent 9e2d66f commit 17aa920
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 49 deletions.
73 changes: 30 additions & 43 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package reserve

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
Expand Down Expand Up @@ -130,37 +129,33 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {

err = r.st.Run(ctx, func(s transaction.Store) error {

sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

var sameAddressSoc = false

// same chunk address, same batch
if sameAddressOldStamp != nil {
// index collision
if loadedStampIndex {

if chunkType == swarm.ChunkTypeSingleOwner {
sameAddressSoc = true
prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

// index collision
if bytes.Equal(chunk.Stamp().Index(), sameAddressOldStamp.Index()) {
sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp)
if err != nil {
// same chunk address
if oldStampIndex.ChunkAddress.Equal(chunk.Address()) {

oldStamp, err := chunkstamp.LoadWithStampHash(s.IndexStore(), reserveScope, oldStampIndex.ChunkAddress, oldStampIndex.StampHash)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite same chunk. prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}

oldBatchRadiusItem := &BatchRadiusItem{
Bin: bin,
Address: chunk.Address(),
BatchID: sameAddressOldStampIndex.BatchID,
StampHash: sameAddressOldStampIndex.StampHash,
Address: oldStampIndex.ChunkAddress,
BatchID: oldStampIndex.BatchID,
StampHash: oldStampIndex.StampHash,
}
// load item to get the binID
err = s.IndexStore().Get(oldBatchRadiusItem)
Expand All @@ -172,8 +167,8 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
err = errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: oldBatchRadiusItem.Bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveScope, sameAddressOldStamp),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, sameAddressOldStamp),
stampindex.Delete(s.IndexStore(), reserveScope, oldStamp),
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveScope, oldBatchRadiusItem.Address, oldStamp),
)
if err != nil {
return err
Expand Down Expand Up @@ -207,28 +202,15 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return err
}

if chunkType != swarm.ChunkTypeSingleOwner {
return nil
if chunkType == swarm.ChunkTypeSingleOwner {
r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk, false)
}

r.logger.Debug("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk, false)
return nil
}
}

oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

// different address, same batch, index collision
if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) {
prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
// An older (same or different) chunk with the same batchID and stamp index has been previously
// An older and different chunk with the same batchID and stamp index has been previously
// saved to the reserve. We must do the below before saving the new chunk:
// 1. Delete the old chunk from the chunkstore.
// 2. Delete the old chunk's stamp data.
Expand Down Expand Up @@ -281,7 +263,12 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return err
}

if sameAddressSoc {
has, err := s.ChunkStore().Has(ctx, chunk.Address())
if err != nil {
return err
}

if has && chunkType == swarm.ChunkTypeSingleOwner {
err = s.ChunkStore().Replace(ctx, chunk, true)
} else {
err = s.ChunkStore().Put(ctx, chunk)
Expand Down
11 changes: 5 additions & 6 deletions pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestSameChunkAddress(t *testing.T) {
})

t.Run("chunk with different batchID remains untouched", func(t *testing.T) {
noReplace := func(ch1, ch2 swarm.Chunk) {
checkReplace := func(ch1, ch2 swarm.Chunk, replace bool) {
t.Helper()
err = r.Put(ctx, ch1)
if err != nil {
Expand Down Expand Up @@ -355,13 +355,12 @@ func TestSameChunkAddress(t *testing.T) {
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binBinIDs[bin] - 1}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: bin, BinID: binBinIDs[bin]}, false)

// expect new chunk to NOT replace old one
ch, err := ts.ChunkStore().Get(ctx, ch2.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(ch.Data(), ch1.Data()) {
t.Fatalf("expected chunk data to not be updated")
if replace && bytes.Equal(ch.Data(), ch1.Data()) {
t.Fatalf("expected chunk data to be updated")
}
}

Expand All @@ -379,7 +378,7 @@ func TestSameChunkAddress(t *testing.T) {
if !bytes.Equal(ch1.Address().Bytes(), ch2.Address().Bytes()) {
t.Fatalf("expected chunk addresses to be the same")
}
noReplace(ch1, ch2)
checkReplace(ch1, ch2, true)

// cac
batch = postagetesting.MustNewBatch()
Expand All @@ -389,7 +388,7 @@ func TestSameChunkAddress(t *testing.T) {
if !bytes.Equal(ch1.Address().Bytes(), ch2.Address().Bytes()) {
t.Fatalf("expected chunk addresses to be the same")
}
noReplace(ch1, ch2)
checkReplace(ch1, ch2, false)
size2 := r.Size()
if size2-size1 != 4 {
t.Fatalf("expected reserve size to increase by 4, got %d", size2-size1)
Expand Down

0 comments on commit 17aa920

Please sign in to comment.