From 4cf5ab6f83a54aeff1f6b923dce1e0dcb1458f0e Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 11 Nov 2024 18:24:15 +0300 Subject: [PATCH] fix(gsoc): improvements (#4899) --- pkg/api/api.go | 3 +- pkg/api/gsoc.go | 7 ++- pkg/gsoc/gsoc.go | 34 +++++------ pkg/gsoc/gsoc_test.go | 8 +-- pkg/postage/stamp_test.go | 4 +- pkg/pusher/pusher.go | 23 +++---- pkg/pusher/pusher_test.go | 2 +- pkg/pushsync/pushsync.go | 2 +- pkg/soc/utils.go | 34 ----------- pkg/storage/storage.go | 33 ++++++++++ .../utils_test.go => storage/storage_test.go} | 8 +-- pkg/storage/testing/chunk.go | 5 +- pkg/storer/internal/chunkstore/chunkstore.go | 27 +------- .../internal/chunkstore/chunkstore_test.go | 61 ------------------- pkg/storer/internal/reserve/reserve.go | 36 ++++++++--- pkg/storer/reserve_test.go | 2 +- pkg/storer/sample_test.go | 4 +- 17 files changed, 112 insertions(+), 181 deletions(-) delete mode 100644 pkg/soc/utils.go rename pkg/{soc/utils_test.go => storage/storage_test.go} (92%) diff --git a/pkg/api/api.go b/pkg/api/api.go index b67c916e388..b226a5cff24 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -48,7 +48,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/settlement/swap" "github.com/ethersphere/bee/v2/pkg/settlement/swap/chequebook" "github.com/ethersphere/bee/v2/pkg/settlement/swap/erc20" - "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/status" "github.com/ethersphere/bee/v2/pkg/steward" storage "github.com/ethersphere/bee/v2/pkg/storage" @@ -687,7 +686,7 @@ type putterSessionWrapper struct { } func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error { - idAddress, err := soc.IdentityAddress(chunk) + idAddress, err := storage.IdentityAddress(chunk) if err != nil { return err } diff --git a/pkg/api/gsoc.go b/pkg/api/gsoc.go index ea9aad5271e..60d048ffdc0 100644 --- a/pkg/api/gsoc.go +++ b/pkg/api/gsoc.go @@ -18,8 +18,9 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("gsoc_subscribe").Build() paths := struct { - Address []byte `map:"address" validate:"required"` + Address swarm.Address `map:"address,resolve" validate:"required"` }{} + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { response("invalid path params", logger, w) return @@ -43,7 +44,7 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { go s.gsocListeningWs(conn, paths.Address) } -func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) { +func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) { defer s.wsWg.Done() var ( @@ -56,7 +57,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) { ticker.Stop() _ = conn.Close() }() - cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) { + cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) { select { case dataC <- m: case <-gone: diff --git a/pkg/gsoc/gsoc.go b/pkg/gsoc/gsoc.go index 91d5dcf925d..6497c9b0c63 100644 --- a/pkg/gsoc/gsoc.go +++ b/pkg/gsoc/gsoc.go @@ -12,14 +12,18 @@ import ( "github.com/ethersphere/bee/v2/pkg/swarm" ) +// Handler defines code to be executed upon reception of a GSOC sub message. +// it is used as a parameter definition. +type Handler func([]byte) + type Listener interface { - Subscribe(address [32]byte, handler Handler) (cleanup func()) + Subscribe(address swarm.Address, handler Handler) (cleanup func()) Handle(c *soc.SOC) Close() error } type listener struct { - handlers map[[32]byte][]*Handler + handlers map[string][]*Handler handlersMu sync.Mutex quit chan struct{} logger log.Logger @@ -29,26 +33,26 @@ type listener struct { func New(logger log.Logger) Listener { return &listener{ logger: logger, - handlers: make(map[[32]byte][]*Handler), + handlers: make(map[string][]*Handler), quit: make(chan struct{}), } } // Subscribe allows the definition of a Handler func on a specific GSOC address. -func (l *listener) Subscribe(address [32]byte, handler Handler) (cleanup func()) { +func (l *listener) Subscribe(address swarm.Address, handler Handler) (cleanup func()) { l.handlersMu.Lock() defer l.handlersMu.Unlock() - l.handlers[address] = append(l.handlers[address], &handler) + l.handlers[address.ByteString()] = append(l.handlers[address.ByteString()], &handler) return func() { l.handlersMu.Lock() defer l.handlersMu.Unlock() - h := l.handlers[address] + h := l.handlers[address.ByteString()] for i := 0; i < len(h); i++ { if h[i] == &handler { - l.handlers[address] = append(h[:i], h[i+1:]...) + l.handlers[address.ByteString()] = append(h[:i], h[i+1:]...) return } } @@ -61,13 +65,11 @@ func (l *listener) Handle(c *soc.SOC) { if err != nil { return // no handler } - h := l.getHandlers([32]byte(addr.Bytes())) + h := l.getHandlers(addr) if h == nil { return // no handler } - l.logger.Info("new incoming GSOC message", - "GSOC Address", addr, - "wrapped chunk address", c.WrappedChunk().Address()) + l.logger.Debug("new incoming GSOC message", "GSOC Address", addr, "wrapped chunk address", c.WrappedChunk().Address()) for _, hh := range h { go func(hh Handler) { @@ -76,11 +78,11 @@ func (l *listener) Handle(c *soc.SOC) { } } -func (p *listener) getHandlers(address [32]byte) []*Handler { +func (p *listener) getHandlers(address swarm.Address) []*Handler { p.handlersMu.Lock() defer p.handlersMu.Unlock() - return p.handlers[address] + return p.handlers[address.ByteString()] } func (l *listener) Close() error { @@ -88,11 +90,7 @@ func (l *listener) Close() error { l.handlersMu.Lock() defer l.handlersMu.Unlock() - l.handlers = make(map[[32]byte][]*Handler) //unset handlers on shutdown + l.handlers = make(map[string][]*Handler) //unset handlers on shutdown return nil } - -// Handler defines code to be executed upon reception of a GSOC sub message. -// it is used as a parameter definition. -type Handler func([]byte) diff --git a/pkg/gsoc/gsoc_test.go b/pkg/gsoc/gsoc_test.go index 989225df068..dfbe8e03a5c 100644 --- a/pkg/gsoc/gsoc_test.go +++ b/pkg/gsoc/gsoc_test.go @@ -21,7 +21,7 @@ func TestRegister(t *testing.T) { t.Parallel() var ( - g = gsoc.New(log.NewLogger("test")) + g = gsoc.New(log.Noop) h1Calls = 0 h2Calls = 0 h3Calls = 0 @@ -52,8 +52,8 @@ func TestRegister(t *testing.T) { msgChan <- struct{}{} } ) - _ = g.Subscribe([32]byte(address1.Bytes()), h1) - _ = g.Subscribe([32]byte(address2.Bytes()), h2) + _ = g.Subscribe(address1, h1) + _ = g.Subscribe(address2, h2) ch1, _ := cac.New(payload1) socCh1 := soc.New(socId1, ch1) @@ -74,7 +74,7 @@ func TestRegister(t *testing.T) { ensureCalls(t, &h2Calls, 0) // register another handler on the first address - cleanup := g.Subscribe([32]byte(address1.Bytes()), h3) + cleanup := g.Subscribe(address1, h3) g.Handle(socCh1) diff --git a/pkg/postage/stamp_test.go b/pkg/postage/stamp_test.go index b1547b6581c..948e5311686 100644 --- a/pkg/postage/stamp_test.go +++ b/pkg/postage/stamp_test.go @@ -14,7 +14,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing" - "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemstore" chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing" ) @@ -104,7 +104,7 @@ func TestValidStamp(t *testing.T) { // stamp on execution ch := chunktesting.GenerateTestRandomChunk() - idAddress, err := soc.IdentityAddress(ch) + idAddress, err := storage.IdentityAddress(ch) if err != nil { t.Fatal(err) } diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 0945bdee05e..71ffed77ac3 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -18,7 +18,6 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/postage" "github.com/ethersphere/bee/v2/pkg/pushsync" - "github.com/ethersphere/bee/v2/pkg/soc" storage "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/topology" @@ -36,6 +35,8 @@ type Op struct { Err chan error Direct bool Span opentracing.Span + + identityAddress swarm.Address } type OpChan <-chan *Op @@ -215,10 +216,12 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { for { select { case op := <-cc: - idAddress, err := soc.IdentityAddress(op.Chunk) + idAddress, err := storage.IdentityAddress(op.Chunk) if err != nil { op.Err <- err + continue } + op.identityAddress = idAddress if s.inflight.set(idAddress, op.Chunk.Stamp().BatchID()) { if op.Direct { select { @@ -245,12 +248,8 @@ func (s *Service) chunksWorker(warmupTime time.Duration) { func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) { loggerV1 := logger.V(1).Build() - idAddress, err := soc.IdentityAddress(op.Chunk) - if err != nil { - return true, err - } - defer s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID()) + defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) if _, err := s.validStamp(op.Chunk); err != nil { loggerV1.Warning( @@ -278,7 +277,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( return true, err } case errors.Is(err, pushsync.ErrShallowReceipt): - if retry := s.shallowReceipt(idAddress); retry { + if retry := s.shallowReceipt(op.identityAddress); retry { return true, err } if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil { @@ -300,13 +299,11 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error { loggerV1 := logger.V(1).Build() - idAddress, err := soc.IdentityAddress(op.Chunk) - if err != nil { - return err - } + + var err error defer func() { - s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID()) + s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID()) select { case op.Err <- err: default: diff --git a/pkg/pusher/pusher_test.go b/pkg/pusher/pusher_test.go index 3925e80ec31..fbdfb3a9acd 100644 --- a/pkg/pusher/pusher_test.go +++ b/pkg/pusher/pusher_test.go @@ -333,7 +333,7 @@ func TestPusherRetryShallow(t *testing.T) { // generate a chunk at PO 1 with closestPeer, meaning that we get a // receipt which is shallower than the pivot peer's depth, resulting // in retries - chunk := testingc.GenerateTestRandomChunkAt(t, closestPeer, 1) + chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, 1) storer.chunks <- chunk diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 4c7983660cf..94f3c8868af 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -361,7 +361,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo sentErrorsLeft = maxPushErrors } - idAddress, err := soc.IdentityAddress(ch) + idAddress, err := storage.IdentityAddress(ch) if err != nil { return nil, err } diff --git a/pkg/soc/utils.go b/pkg/soc/utils.go deleted file mode 100644 index 6ca3364c967..00000000000 --- a/pkg/soc/utils.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2024 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package soc - -import "github.com/ethersphere/bee/v2/pkg/swarm" - -// IdentityAddress returns the internally used address for the chunk -// since the single owner chunk address is not a unique identifier for the chunk, -// but hashing the soc address and the wrapped chunk address is. -// it is used in the reserve sampling and other places where a key is needed to represent a chunk. -func IdentityAddress(chunk swarm.Chunk) (swarm.Address, error) { - // check the chunk is single owner chunk or cac - if sch, err := FromChunk(chunk); err == nil { - socAddress, err := sch.Address() - if err != nil { - return swarm.ZeroAddress, err - } - h := swarm.NewHasher() - _, err = h.Write(socAddress.Bytes()) - if err != nil { - return swarm.ZeroAddress, err - } - _, err = h.Write(sch.WrappedChunk().Address().Bytes()) - if err != nil { - return swarm.ZeroAddress, err - } - - return swarm.NewAddress(h.Sum(nil)), nil - } - - return chunk.Address(), nil -} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 1373e23e4e9..312364e2676 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -18,6 +18,7 @@ import ( var ( ErrOverwriteNewerChunk = errors.New("overwriting chunk with newer timestamp") + ErrUnknownChunkType = errors.New("unknown chunk type") ) // Result represents the item returned by the read operation, which returns @@ -293,3 +294,35 @@ func ChunkType(ch swarm.Chunk) swarm.ChunkType { } return swarm.ChunkTypeUnspecified } + +// IdentityAddress returns the internally used address for the chunk +// since the single owner chunk address is not a unique identifier for the chunk, +// but hashing the soc address and the wrapped chunk address is. +// it is used in the reserve sampling and other places where a key is needed to represent a chunk. +func IdentityAddress(chunk swarm.Chunk) (swarm.Address, error) { + + if cac.Valid(chunk) { + return chunk.Address(), nil + } + + // check the chunk is single owner chunk or cac + if sch, err := soc.FromChunk(chunk); err == nil { + socAddress, err := sch.Address() + if err != nil { + return swarm.ZeroAddress, err + } + h := swarm.NewHasher() + _, err = h.Write(socAddress.Bytes()) + if err != nil { + return swarm.ZeroAddress, err + } + _, err = h.Write(sch.WrappedChunk().Address().Bytes()) + if err != nil { + return swarm.ZeroAddress, err + } + + return swarm.NewAddress(h.Sum(nil)), nil + } + + return swarm.ZeroAddress, fmt.Errorf("identity address failed on chunk %s: %w", chunk, ErrUnknownChunkType) +} diff --git a/pkg/soc/utils_test.go b/pkg/storage/storage_test.go similarity index 92% rename from pkg/soc/utils_test.go rename to pkg/storage/storage_test.go index a86e79083cb..6ca04a8342f 100644 --- a/pkg/soc/utils_test.go +++ b/pkg/storage/storage_test.go @@ -1,7 +1,7 @@ // Copyright 2024 The Swarm Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package soc_test +package storage_test import ( "encoding/hex" @@ -10,10 +10,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/swarm" ) -// TestIdentityAddress tests the IdentityAddress function. func TestIdentityAddress(t *testing.T) { t.Run("single owner chunk", func(t *testing.T) { // Create a single owner chunk (SOC) @@ -43,7 +43,7 @@ func TestIdentityAddress(t *testing.T) { t.Fatal(err) } - idAddr, err := soc.IdentityAddress(schChunk) + idAddr, err := storage.IdentityAddress(schChunk) if err != nil { t.Fatalf("IdentityAddress returned error: %v", err) } @@ -66,7 +66,7 @@ func TestIdentityAddress(t *testing.T) { } // Call IdentityAddress with the CAC - addr, err := soc.IdentityAddress(cacChunk) + addr, err := storage.IdentityAddress(cacChunk) if err != nil { t.Fatalf("IdentityAddress returned error: %v", err) } diff --git a/pkg/storage/testing/chunk.go b/pkg/storage/testing/chunk.go index f51c1079974..c5e83f845aa 100644 --- a/pkg/storage/testing/chunk.go +++ b/pkg/storage/testing/chunk.go @@ -91,10 +91,13 @@ func GenerateTestRandomChunkAt(tb testing.TB, target swarm.Address, po int) swar addr := swarm.RandAddressAt(tb, target, po) stamp := postagetesting.MustNewStamp() return swarm.NewChunk(addr, data).WithStamp(stamp) + } // GenerateTestRandomChunkAt generates an invalid (!) chunk with address of proximity order po wrt target. -func GenerateValidRandomChunkAt(target swarm.Address, po int) swarm.Chunk { +func GenerateValidRandomChunkAt(tb testing.TB, target swarm.Address, po int) swarm.Chunk { + tb.Helper() + data := make([]byte, swarm.ChunkSize) var ch swarm.Chunk diff --git a/pkg/storer/internal/chunkstore/chunkstore.go b/pkg/storer/internal/chunkstore/chunkstore.go index f17eed79af3..67fee1e6d77 100644 --- a/pkg/storer/internal/chunkstore/chunkstore.go +++ b/pkg/storer/internal/chunkstore/chunkstore.go @@ -71,9 +71,8 @@ func Has(_ context.Context, r storage.Reader, addr swarm.Address) (bool, error) func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm.Chunk) error { var ( - rIdx = &RetrievalIndexItem{Address: ch.Address()} - loc sharky.Location - inserted bool + rIdx = &RetrievalIndexItem{Address: ch.Address()} + loc sharky.Location ) err := s.Get(rIdx) switch { @@ -86,31 +85,11 @@ func Put(ctx context.Context, s storage.IndexStore, sh storage.Sharky, ch swarm. } rIdx.Location = loc rIdx.Timestamp = uint64(time.Now().Unix()) - inserted = true case err != nil: return fmt.Errorf("chunk store: failed to read: %w", err) } - // SOC will be replaced in the chunk store if it is already stored with the newer payload. - // Pull sync should sync the new SOC payload with the new stamp. - // TODO: remove this condition when postage stamping is refactored for GSOC. - chunkType := storage.ChunkType(ch) - if !inserted && chunkType == swarm.ChunkTypeSingleOwner { - // replace old payload - err = sh.Release(ctx, rIdx.Location) - if err != nil { - return fmt.Errorf("chunkstore: failed to release sharky location: %w", err) - } - - loc, err := sh.Write(ctx, ch.Data()) - if err != nil { - return fmt.Errorf("chunk store: write to sharky failed: %w", err) - } - rIdx.Location = loc - rIdx.Timestamp = uint64(time.Now().Unix()) - } else { - rIdx.RefCnt++ - } + rIdx.RefCnt++ return s.Put(rIdx) } diff --git a/pkg/storer/internal/chunkstore/chunkstore_test.go b/pkg/storer/internal/chunkstore/chunkstore_test.go index c86fbbb6b91..9e30c1af876 100644 --- a/pkg/storer/internal/chunkstore/chunkstore_test.go +++ b/pkg/storer/internal/chunkstore/chunkstore_test.go @@ -5,7 +5,6 @@ package chunkstore_test import ( - "bytes" "context" "errors" "fmt" @@ -14,9 +13,7 @@ import ( "os" "testing" - "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/sharky" - "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/storage" @@ -339,64 +336,6 @@ func TestChunkStore(t *testing.T) { } }) - // TODO: remove this when postage stamping is refactored for GSOC. - t.Run("put two SOCs with different payloads", func(t *testing.T) { - key, _ := crypto.GenerateSecp256k1Key() - signer := crypto.NewDefaultSigner(key) - - // chunk data to upload - chunk1 := chunktest.FixtureChunk("7000") - chunk2 := chunktest.FixtureChunk("0033") - id := make([]byte, swarm.HashSize) - s1 := soc.New(id, chunk1) - s2 := soc.New(id, chunk2) - sch1, err := s1.Sign(signer) - if err != nil { - t.Fatal(err) - } - sch1 = sch1.WithStamp(chunk1.Stamp()) - sch2, err := s2.Sign(signer) - if err != nil { - t.Fatal(err) - } - sch2 = sch2.WithStamp(chunk2.Stamp()) - - // Put the first SOC into the chunk store - err = st.Run(context.Background(), func(s transaction.Store) error { - return s.ChunkStore().Put(context.TODO(), sch1) - }) - if err != nil { - t.Fatalf("failed putting first single owner chunk: %v", err) - } - - // Put the second SOC into the chunk store - err = st.Run(context.Background(), func(s transaction.Store) error { - return s.ChunkStore().Put(context.TODO(), sch2) - }) - if err != nil { - t.Fatalf("failed putting second single owner chunk: %v", err) - } - - // Retrieve the chunk from the chunk store - var retrievedChunk swarm.Chunk - err = st.Run(context.Background(), func(s transaction.Store) error { - retrievedChunk, err = s.ChunkStore().Get(context.TODO(), sch1.Address()) - return err - }) - if err != nil { - t.Fatalf("failed retrieving chunk: %v", err) - } - schRetrieved, err := soc.FromChunk(retrievedChunk) - if err != nil { - t.Fatalf("failed converting chunk to SOC: %v", err) - } - - // Verify that the retrieved chunk contains the latest payload - if !bytes.Equal(chunk2.Data(), schRetrieved.WrappedChunk().Data()) { - t.Fatalf("expected payload %s, got %s", chunk2.Data(), schRetrieved.WrappedChunk().Data()) - } - }) - t.Run("close store", func(t *testing.T) { err := st.Close() if err != nil { diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index 853454034d8..f45df03ea41 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -129,25 +129,27 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { var shouldIncReserveSize bool err = r.st.Run(ctx, func(s transaction.Store) error { - oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) - if err != nil { - return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) - } sameAddressOldStamp, err := chunkstamp.LoadWithBatchID(s.IndexStore(), reserveScope, chunk.Address(), chunk.Stamp().BatchID()) if err != nil && !errors.Is(err, storage.ErrNotFound) { return err } + var sameAddressSoc = false + // same chunk address, same batch if sameAddressOldStamp != nil { - sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) - if err != nil { - return err + + if chunkType == swarm.ChunkTypeSingleOwner { + sameAddressSoc = true } - // same index - if bytes.Equal(sameAddressOldStamp.Index(), chunk.Stamp().Index()) { + // index collision + if bytes.Equal(chunk.Stamp().Index(), sameAddressOldStamp.Index()) { + sameAddressOldStampIndex, err := stampindex.Load(s.IndexStore(), reserveScope, sameAddressOldStamp) + if err != nil { + return err + } prev := binary.BigEndian.Uint64(sameAddressOldStampIndex.StampTimestamp) curr := binary.BigEndian.Uint64(chunk.Stamp().Timestamp()) if prev >= curr { @@ -214,6 +216,11 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { } } + oldStampIndex, loadedStampIndex, err := stampindex.LoadOrStore(s.IndexStore(), reserveScope, chunk) + if err != nil { + return fmt.Errorf("load or store stamp index for chunk %v has fail: %w", chunk, err) + } + // different address, same batch, index collision if loadedStampIndex && !chunk.Address().Equal(oldStampIndex.ChunkAddress) { prev := binary.BigEndian.Uint64(oldStampIndex.StampTimestamp) @@ -269,12 +276,21 @@ func (r *Reserve) Put(ctx context.Context, chunk swarm.Chunk) error { ChunkType: chunkType, StampHash: stampHash, }), - s.ChunkStore().Put(ctx, chunk), ) if err != nil { return err } + if sameAddressSoc { + err = s.ChunkStore().Replace(ctx, chunk) + } else { + err = s.ChunkStore().Put(ctx, chunk) + } + + if err != nil { + return err + } + if !loadedStampIndex { shouldIncReserveSize = true } diff --git a/pkg/storer/reserve_test.go b/pkg/storer/reserve_test.go index 79b38ac2ecd..c354f92cdbe 100644 --- a/pkg/storer/reserve_test.go +++ b/pkg/storer/reserve_test.go @@ -695,7 +695,7 @@ func TestNeighborhoodStats(t *testing.T) { putChunks := func(addr swarm.Address, startingRadius int, st *storer.DB) { putter := st.ReservePutter() for i := 0; i < chunkCountPerPO; i++ { - ch := chunk.GenerateValidRandomChunkAt(addr, startingRadius) + ch := chunk.GenerateValidRandomChunkAt(t, addr, startingRadius) err := putter.Put(context.Background(), ch) if err != nil { t.Fatal(err) diff --git a/pkg/storer/sample_test.go b/pkg/storer/sample_test.go index 2f97aaab13f..07f4e36cd6d 100644 --- a/pkg/storer/sample_test.go +++ b/pkg/storer/sample_test.go @@ -27,7 +27,7 @@ func TestReserveSampler(t *testing.T) { var chs []swarm.Chunk for po := 0; po < maxPO; po++ { for i := 0; i < chunkCountPerPO; i++ { - ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(3, 2, false) + ch := chunk.GenerateValidRandomChunkAt(t, baseAddr, po).WithBatch(3, 2, false) if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC ch = chunk.GenerateTestRandomSoChunk(t, ch) } @@ -156,7 +156,7 @@ func TestReserveSamplerSisterNeighborhood(t *testing.T) { var chs []swarm.Chunk for po := startingRadius; po < maxPO; po++ { for i := 0; i < chunkCountPerPO; i++ { - ch := chunk.GenerateValidRandomChunkAt(baseAddr, po).WithBatch(3, 2, false) + ch := chunk.GenerateValidRandomChunkAt(t, baseAddr, po).WithBatch(3, 2, false) if rand.Intn(2) == 0 { // 50% chance to wrap CAC into SOC ch = chunk.GenerateTestRandomSoChunk(t, ch) }