fix: override same chunk #4749

Merged: 14 commits, Aug 26, 2024
10 changes: 9 additions & 1 deletion pkg/file/joiner/joiner_test.go
@@ -25,7 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/splitter"
filetest "github.com/ethersphere/bee/v2/pkg/file/testing"
"github.com/ethersphere/bee/v2/pkg/log"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
@@ -1411,6 +1411,14 @@ func (c *chunkStore) Put(_ context.Context, ch swarm.Chunk) error {
return nil
}

func (c *chunkStore) Replace(_ context.Context, ch swarm.Chunk) error {
c.mu.Lock()
defer c.mu.Unlock()
c.chunks[ch.Address().ByteString()] = swarm.NewChunk(ch.Address(), ch.Data()).WithStamp(ch.Stamp())
return nil
}

func (c *chunkStore) Has(_ context.Context, addr swarm.Address) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
7 changes: 7 additions & 0 deletions pkg/storage/chunkstore.go
@@ -39,6 +39,12 @@ type Hasser interface {
Has(context.Context, swarm.Address) (bool, error)
}

// Replacer is the interface that wraps the basic Replace method.
type Replacer interface {
// Replace a chunk in the store.
Replace(context.Context, swarm.Chunk) error
}

// PutterFunc type is an adapter to allow the use of
// ChunkStore as Putter interface. If f is a function
// with the appropriate signature, PutterFunc(f) is a
@@ -70,6 +76,7 @@ type ChunkStore interface {
Putter
Deleter
Hasser
Replacer

// Iterate over chunks in no particular order.
Iterate(context.Context, IterateChunkFn) error
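Not part of the diff: a minimal sketch of how a caller holding the extended storage.ChunkStore might choose between Put and the new Replace, mirroring the check the reserve gains later in this PR. The helper name putOrReplace is hypothetical, not something this change adds.

package example

import (
	"context"

	"github.com/ethersphere/bee/v2/pkg/storage"
	"github.com/ethersphere/bee/v2/pkg/swarm"
)

// putOrReplace stores ch, overwriting the payload in place when a chunk with
// the same address is already present in the store.
func putOrReplace(ctx context.Context, store storage.ChunkStore, ch swarm.Chunk) error {
	has, err := store.Has(ctx, ch.Address())
	if err != nil {
		return err
	}
	if has {
		return store.Replace(ctx, ch)
	}
	return store.Put(ctx, ch)
}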
6 changes: 5 additions & 1 deletion pkg/storage/inmemchunkstore/inmemchunkstore.go
@@ -8,7 +8,7 @@ import (
"context"
"sync"

storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

@@ -77,6 +77,10 @@ func (c *ChunkStore) Delete(_ context.Context, addr swarm.Address) error {
return nil
}

func (c *ChunkStore) Replace(_ context.Context, _ swarm.Chunk) error {
return nil
}

func (c *ChunkStore) Iterate(_ context.Context, fn storage.IterateChunkFn) error {
c.mu.Lock()
defer c.mu.Unlock()
21 changes: 21 additions & 0 deletions pkg/storer/internal/chunkstore/chunkstore.go
@@ -94,6 +94,27 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.
return s.Put(rIdx)
}

// Replace overwrites the stored payload of an existing chunk: it loads the
// chunk's retrieval index entry, releases the old sharky location, writes the
// new data, and saves the entry with the new location and a fresh timestamp.
func Replace(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error {
rIdx := &RetrievalIndexItem{Address: ch.Address()}
err := s.Get(rIdx)
if err != nil {
return fmt.Errorf("chunk store: failed to read retrievalIndex for address %s: %w", ch.Address(), err)
}

err = sh.Release(ctx, rIdx.Location)
if err != nil {
return fmt.Errorf("chunkstore: failed to release sharky location: %w", err)
}

loc, err := sh.Write(ctx, ch.Data())
if err != nil {
return fmt.Errorf("chunk store: write to sharky failed: %w", err)
}
rIdx.Location = loc
rIdx.Timestamp = uint64(time.Now().Unix())
return s.Put(rIdx)
}

func Delete(ctx context.Context, s storage.IndexStore, sh storage.Sharky, addr swarm.Address) error {
rIdx := &RetrievalIndexItem{Address: addr}
err := s.Get(rIdx)
35 changes: 33 additions & 2 deletions pkg/storer/internal/reserve/reserve.go
@@ -117,7 +117,7 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
defer r.multx.Unlock(strconv.Itoa(int(bin)))

return r.st.Run(ctx, func(s transaction.Store) error {

// index collision (same or different chunk)
oldItem, loadedStamp, err := stampindex.LoadOrStore(s.IndexStore(), reserveNamespace, chunk)
if err != nil {
return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err)
@@ -152,6 +152,28 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
if err != nil {
return fmt.Errorf("failed updating stamp index: %w", err)
}
} else {
// same chunk with different index
oldChunkStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveNamespace, chunk.Address(), chunk.Stamp().BatchID())
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return err
}
if oldChunkStamp != nil {
prev := binary.BigEndian.Uint64(oldChunkStamp.Timestamp())
curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp())
if prev >= curr {
return fmt.Errorf("overwrite prev %d cur %d batch %s: %w", prev, curr, hex.EncodeToString(chunk.Stamp().BatchID()), storage.ErrOverwriteNewerChunk)
}
oldChunkStampHash, err := oldChunkStamp.Hash()
if err != nil {
return err
}
err = r.removeChunk(ctx, s, chunk.Address(), oldChunkStamp.BatchID(), oldChunkStampHash)
if err != nil {
return fmt.Errorf("failed removing older chunk %s: %w", oldItem.ChunkAddress, err)
}
// stampindex is already saved in LoadOrStore
}
}

err = chunkstamp.Store(s.IndexStore(), reserveNamespace, chunk)
@@ -186,7 +208,16 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error {
return err
}

err = s.ChunkStore().Put(ctx, chunk)
has, err := s.ChunkStore().Has(ctx, chunk.Address())
if err != nil {
return err
}
if has {
r.logger.Warning("replacing chunk in chunkstore", "chunk", chunk.Address())
err = s.ChunkStore().Replace(ctx, chunk)
} else {
err = s.ChunkStore().Put(ctx, chunk)
}
if err != nil {
return err
}
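Not part of the diff: a condensed sketch of the overwrite rule reserve.Put now applies when an incoming chunk has the same address as a stored one. The names shouldReplace and errOverwriteNewer are illustrative; the actual code inlines this comparison and returns storage.ErrOverwriteNewerChunk.

package example

import (
	"encoding/binary"
	"errors"
)

var errOverwriteNewer = errors.New("overwrite of newer chunk")

// shouldReplace compares the big-endian stamp timestamps of the stored and the
// incoming chunk. The incoming chunk wins only when its stamp is strictly
// newer; otherwise the stored chunk is kept and an overwrite error is returned.
func shouldReplace(storedTS, incomingTS []byte) (bool, error) {
	prev := binary.BigEndian.Uint64(storedTS)
	curr := binary.BigEndian.Uint64(incomingTS)
	if prev >= curr {
		return false, errOverwriteNewer
	}
	return true, nil
}

When the incoming stamp is strictly newer, the reserve removes the old chunk's indexes and then either Puts the chunk or, if the address already exists in the chunkstore, Replaces its payload in place.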
98 changes: 98 additions & 0 deletions pkg/storer/internal/reserve/reserve_test.go
@@ -135,6 +135,104 @@ func TestReserveChunkType(t *testing.T) {
}
}

func TestSameChunkAddress(t *testing.T) {
t.Parallel()

ctx := context.Background()
baseAddr := swarm.RandAddress(t)

ts := internal.NewInmemStorage()

r, err := reserve.New(
baseAddr,
ts,
0, kademlia.NewTopologyDriver(),
log.Noop,
)
if err != nil {
t.Fatal(err)
}

t.Run("same stamp index and older timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
err = r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
err = r.Put(ctx, ch2)
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
})

t.Run("different stamp index and older timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 0))

err = r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}
err = r.Put(ctx, ch2)
if !errors.Is(err, storage.ErrOverwriteNewerChunk) {
t.Fatal("expected error")
}
})

replace := func(t *testing.T, ch1, ch2 swarm.Chunk) {
t.Helper()

err := r.Put(ctx, ch1)
if err != nil {
t.Fatal(err)
}

err = r.Put(ctx, ch2)
if err != nil {
t.Fatal(err)
}

ch1StampHash, err := ch1.Stamp().Hash()
if err != nil {
t.Fatal(err)
}

ch2StampHash, err := ch2.Stamp().Hash()
if err != nil {
t.Fatal(err)
}
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch1.Stamp().BatchID(), Address: ch1.Address(), StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.BatchRadiusItem{Bin: 0, BatchID: ch2.Stamp().BatchID(), Address: ch2.Address(), StampHash: ch2StampHash}, false)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 3, StampHash: ch1StampHash}, true)
checkStore(t, ts.IndexStore(), &reserve.ChunkBinItem{Bin: 0, BinID: 4, StampHash: ch2StampHash}, false)

ch, err := ts.ChunkStore().Get(ctx, ch2.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(ch.Data(), ch2.Data()) {
t.Fatalf("expected chunk data to be updated")
}
}

t.Run("same stamp index and newer timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 1))
replace(t, ch1, ch2)
})

t.Run("different stamp index and newer timestamp", func(t *testing.T) {
batch := postagetesting.MustNewBatch()
ch1 := chunk.GenerateTestRandomChunkAt(t, baseAddr, 0).WithStamp(postagetesting.MustNewFields(batch.ID, 0, 0))
ch2 := swarm.NewChunk(ch1.Address(), []byte("update")).WithStamp(postagetesting.MustNewFields(batch.ID, 1, 1))
replace(t, ch1, ch2)
})
}

func TestReplaceOldIndex(t *testing.T) {
t.Parallel()

7 changes: 7 additions & 0 deletions pkg/storer/internal/transaction/transaction.go
@@ -242,6 +242,13 @@ func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn)
return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn)
}

func (c *chunkStoreTrx) Replace(ctx context.Context, ch swarm.Chunk) (err error) {
defer handleMetric("chunkstore_replace", c.metrics)(&err)
Collaborator: See if I can remember to add this metric to one of my Grafana dashboards!
unlock := c.lock(ch.Address())
defer unlock()
return chunkstore.Replace(ctx, c.indexStore, c.sharkyTrx, ch)
}

func (c *chunkStoreTrx) lock(addr swarm.Address) func() {
// directly lock
if c.readOnly {