Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: override same chunk #4749

Merged
merged 14 commits into from
Aug 26, 2024
10 changes: 9 additions & 1 deletion pkg/file/joiner/joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/splitter"
filetest "github.com/ethersphere/bee/v2/pkg/file/testing"
"github.com/ethersphere/bee/v2/pkg/log"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
Expand Down Expand Up @@ -1411,6 +1411,14 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
return nil
}

// Replace overwrites the entry stored under ch's address with a chunk rebuilt
// from ch's address and data that carries ch's stamp. It always returns nil.
func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
	addr := ch.Address()

	c.mu.Lock()
	defer c.mu.Unlock()

	replacement := swarm.NewChunk(addr, ch.Data()).WithStamp(ch.Stamp())
	c.chunks[addr.ByteString()] = replacement
	return nil
}

func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
33 changes: 33 additions & 0 deletions pkg/soc/testing/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ func (ms MockSOC) Chunk() swarm.Chunk {
return swarm.NewChunk(ms.Address(), append(ms.ID, append(ms.Signature, ms.WrappedChunk.Data()...)...))
}

// GenerateMockSocWithSigner generates a valid mocked SOC from given data and signer.
//
// The SOC wraps a content-addressed chunk built from data, uses a zero-filled
// identifier and is owned by the signer's Ethereum address. Any failure along
// the way aborts the test through t.Fatal.
func GenerateMockSocWithSigner(t *testing.T, data []byte, signer crypto.Signer) *MockSOC {
	t.Helper()

	// must aborts the test on the first non-nil error.
	must := func(err error) {
		t.Helper()
		if err != nil {
			t.Fatal(err)
		}
	}

	ownerAddr, err := signer.EthereumAddress()
	must(err)

	wrapped, err := cac.New(data)
	must(err)

	// The identifier is never written to, so it stays all zeroes.
	id := make([]byte, swarm.HashSize)

	// The signed digest is the hash of the identifier followed by the
	// wrapped chunk's address.
	wrappedAddr := wrapped.Address().Bytes()
	payload := make([]byte, 0, len(id)+len(wrappedAddr))
	payload = append(payload, id...)
	payload = append(payload, wrappedAddr...)

	hasher := swarm.NewHasher()
	_, err = hasher.Write(payload)
	must(err)

	sig, err := signer.Sign(hasher.Sum(nil))
	must(err)

	return &MockSOC{
		ID:           id,
		Owner:        ownerAddr.Bytes(),
		Signature:    sig,
		WrappedChunk: wrapped,
	}
}

// GenerateMockSOC generates a valid mocked SOC from given data.
func GenerateMockSOC(t *testing.T, data []byte) *MockSOC {
t.Helper()
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type Hasser interface {
Has(context.Context, swarm.Address) (bool, error)
}

// Replacer is the interface that wraps the basic Replace method.
type Replacer interface {
// Replace a chunk in the store. The chunk passed in supplies both the
// address to replace and the new content.
// NOTE(review): whether replacing a chunk that is not yet stored is an
// error is left to the implementation — confirm per store.
Replace(context.Context, swarm.Chunk) error
}

// PutterFunc type is an adapter to allow the use of
// ChunkStore as Putter interface. If f is a function
// with the appropriate signature, PutterFunc(f) is a
Expand Down Expand Up @@ -70,6 +76,7 @@ type ChunkStore interface {
Putter
Deleter
Hasser
Replacer

// Iterate over chunks in no particular order.
Iterate(context.Context, IterateChunkFn) error
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/inmemchunkstore/inmemchunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"context"
"sync"

storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

Expand Down Expand Up @@ -77,6 +77,16 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error {
return nil
}

// Replace swaps the chunk held in the entry for ch's address with ch, leaving
// the entry's other fields as they were. It always returns nil.
//
// NOTE(review): when no entry exists for the address, the lookup yields a
// zero-value entry which is then stored with ch — confirm this is intended
// rather than reporting a not-found error.
func (c *ChunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
	key := ch.Address().ByteString()

	c.mu.Lock()
	defer c.mu.Unlock()

	entry := c.chunks[key]
	entry.chunk = ch
	c.chunks[key] = entry

	return nil
}

func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
21 changes: 21 additions & 0 deletions pkg/storer/internal/chunkstore/chunkstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
return s.Put(rIdx)
}

// Replace rewrites the data of an already indexed chunk.
//
// It loads the retrieval index item for ch's address, releases the sharky
// location recorded there, writes ch's data to sharky and stores the item
// again with the new location and the current Unix time as its timestamp.
// An error is returned when the index item cannot be read (including when it
// does not exist), or when the release, the write or the final index put fails.
//
// NOTE(review): the old location is released before the new data is written;
// if the write fails, the index item still refers to the released location
// unless the caller rolls the operation back — confirm callers run this inside
// a transaction.
func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
	item := &RetrievalIndexItem{Address: ch.Address()}
	if err := s.Get(item); err != nil {
		return fmt.Errorf("chunk store: failed to read retrievalIndex for address %s: %w", ch.Address(), err)
	}

	if err := sh.Release(ctx, item.Location); err != nil {
		return fmt.Errorf("chunkstore: failed to release sharky location: %w", err)
	}

	newLoc, err := sh.Write(ctx, ch.Data())
	if err != nil {
		return fmt.Errorf("chunk store: write to sharky failed: %w", err)
	}

	item.Location = newLoc
	item.Timestamp = uint64(time.Now().Unix())

	return s.Put(item)
}

func Delete(ctx context.Context, s storage.IndexStore, sh storage.Sharky, addr swarm.Address) error {
rIdx := &RetrievalIndexItem{Address: addr}
err := s.Get(rIdx)
Expand Down
87 changes: 87 additions & 0 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,93 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
}

sameAddressOldChunkStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}

// same address
if sameAddressOldChunkStamp != nil {
sameAddressOldStampIndex, err := stampindex.LoadWithStamp(s.IndexStore(), reserveNamespace, sameAddressOldChunkStamp)
nugaon marked this conversation as resolved.
Show resolved Hide resolved
if err != nil && !errors.Is(err, storage.ErrNotFound) {
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
return err
}
prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
}
if loadedStamp {
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
prev := binary.BigEndian.Uint64(oldItem.StampTimestamp)
if prev >= curr {
return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
}

// load item to get the binID
oldBatchRadiusItem := &BatchRadiusItem{
Bin: bin,
Address: chunk.Address(),
BatchID: sameAddressOldStampIndex.BatchID,
StampHash: sameAddressOldStampIndex.StampHash,
}
err = s.IndexStore().Get(oldBatchRadiusItem)
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

err = errors.Join(
s.IndexStore().Delete(oldBatchRadiusItem),
s.IndexStore().Delete(&ChunkBinItem{Bin: bin, BinID: oldBatchRadiusItem.BinID}),
stampindex.Delete(s.IndexStore(), reserveNamespace, swarm.NewChunk(chunk.Address(), nil).WithStamp(sameAddressOldChunkStamp)),
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
chunkstamp.DeleteWithStamp(s.IndexStore(), reserveNamespace, chunk.Address(), sameAddressOldChunkStamp),
)
if err != nil {
return err
}

err = stampindex.Store(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return err
}

err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return err
}

err = s.IndexStore().Put(&BatchRadiusItem{
Bin: bin,
BinID: oldBatchRadiusItem.BinID,
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
StampHash: stampHash,
})
if err != nil {
return err
}

err = s.IndexStore().Put(&ChunkBinItem{
Bin: bin,
BinID: oldBatchRadiusItem.BinID,
acha-bill marked this conversation as resolved.
Show resolved Hide resolved
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
ChunkType: storage.ChunkType(chunk),
StampHash: stampHash,
})
if err != nil {
return err
}

if storage.ChunkType(chunk) != swarm.ChunkTypeSingleOwner {
return nil
}
r.logger.Warning("replacing soc in chunkstore", "address", chunk.Address())
return s.ChunkStore().Replace(ctx, chunk)
}

// different address
if loadedStamp {
prev := binary.BigEndian.Uint64(oldItem.StampTimestamp)
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
Expand Down
145 changes: 145 additions & 0 deletions pkg/storer/internal/reserve/reserve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing"
soctesting "github.com/ethersphere/bee/v2/pkg/soc/testing"
"github.com/ethersphere/bee/v2/pkg/storage"
chunk "github.com/ethersphere/bee/v2/pkg/storage/testing"
"github.com/ethersphere/bee/v2/pkg/storer/internal"
Expand Down Expand Up @@ -135,6 +137,149 @@ func TestReserveChunkType(t *testing.T) {
}
}

// TestSameChunkAddress puts pairs of chunks stamped with the same batch into
// one shared reserve and checks how the second Put is handled: it must fail
// with storage.ErrOverwriteNewerChunk when its stamp timestamp is older, and
// succeed when it is newer.
//
// All subtests share the reserve r and the storage ts and run sequentially,
// so their outcome depends on the order in which they are declared.
func TestSameChunkAddress(t *testing.T) {
t.Parallel()

ctx := context.Background()
baseAddr := swarm.RandAddress(t)

ts := internal.NewInmemStorage()

r, err := reserve.New(
baseAddr,
ts,
0, kademlia.NewTopologyDriver(),
log.Noop,
)
if err != nil {
t.Fatal(err)
}

// A single signer is used for every SOC in this test.
// NOTE(review): presumably this makes all generated SOCs share one
// address — confirm against MockSOC.Address.
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
signer := crypto.NewDefaultSigner(privKey)

// The second chunk reuses the stamp index of the first but carries a
// smaller timestamp, so the Put is expected to be rejected.
t.Run("same stamp index and older timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1))
s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer)
ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
err = r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
err = r.Put(ctx, ch2)
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
})

// Same as above, but the second stamp uses a different index; the older
// timestamp must still be rejected.
t.Run("different stamp index and older timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1))
s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer)
ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0))
err = r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
err = r.Put(ctx, ch2)
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
})

// replace puts ch1 and then ch2, requires both Puts to succeed, and then
// verifies that the ChunkBinItem at (bin, binID) carries ch2's stamp hash
// and that the chunk store returns ch2's data for the address.
replace := func(t *testing.T, ch1 swarm.Chunk, ch2 swarm.Chunk, binID uint64) {
t.Helper()

err := r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}

err = r.Put(ctx, ch2)
if err != nil {
t.Fatal(err)
}

ch1StampHash, err := ch1.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

ch2StampHash, err := ch2.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

bin := swarm.Proximity(baseAddr.Bytes(), ch1.Address().Bytes())
// NOTE(review): the final boolean of checkStore presumably means
// "expected to be gone" (true for ch1's item, false for ch2's) —
// confirm against the helper's definition.
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: bin, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false)
chunkBinItem := &reserve.ChunkBinItem{Bin: bin, BinID: binID}
err = ts.IndexStore().Get(chunkBinItem)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(chunkBinItem.StampHash, ch2StampHash) {
t.Fatalf("expected chunk bin item to be updated")
}

ch, err := ts.ChunkStore().Get(ctx, ch2.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(ch.Data(), ch2.Data()) {
t.Fatalf("expected chunk data to be updated")
}
}

// NOTE(review): the hard-coded bin IDs 3 and 4 below presumably follow from
// the Puts performed by the earlier subtests — confirm before reordering
// or adding subtests.
t.Run("same stamp index and newer timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer)
ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1))
replace(t, ch1, ch2, 3)
})

t.Run("different stamp index and newer timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
s1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer)
ch1 := s1.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
s2 := soctesting.GenerateMockSocWithSigner(t, []byte("update"), signer)
ch2 := s2.Chunk().WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1))
replace(t, ch1, ch2, 4)
})

// For a chunk that is not a SOC, the second Put with a newer timestamp must
// succeed, yet the data held by the chunk store must remain ch1's.
t.Run("not a soc and newer timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1))
err := r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}

err = r.Put(ctx, ch2)
if err != nil {
t.Fatal(err)
}

ch, err := ts.ChunkStore().Get(ctx, ch2.Address())
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(ch.Data(), ch1.Data()) {
t.Fatalf("expected chunk data to not be updated")
}
})
}

func TestReplaceOldIndex(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading