From bf3c78e5ccdd7dfb092f5a650908fa7d99937e77 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Mon, 22 Jan 2024 17:09:36 +0100 Subject: [PATCH 1/2] feat: stamper index assignment rule update --- pkg/postage/stamper.go | 24 +++++++++++++----------- pkg/postage/stamper_test.go | 30 +++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/pkg/postage/stamper.go b/pkg/postage/stamper.go index 5f32fa8ecd3..a13261fe2d8 100644 --- a/pkg/postage/stamper.go +++ b/pkg/postage/stamper.go @@ -7,6 +7,7 @@ package postage import ( "errors" "fmt" + "resenje.org/multex" "github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/storage" @@ -15,8 +16,7 @@ import ( var ( // ErrBucketFull is the error when a collision bucket is full. - ErrBucketFull = errors.New("bucket full") - ErrOverwriteImmutableIndex = errors.New("immutable batch old index overwrite due to previous faulty save") + ErrBucketFull = errors.New("bucket full") ) // Stamper can issue stamps from the given address of chunk. @@ -30,31 +30,33 @@ type stamper struct { store storage.Store issuer *StampIssuer signer crypto.Signer + mu *multex.Multex } // NewStamper constructs a Stamper. func NewStamper(store storage.Store, issuer *StampIssuer, signer crypto.Signer) Stamper { - return &stamper{store, issuer, signer} + return &stamper{store, issuer, signer, multex.New()} } -// Stamp takes chunk, see if the chunk can included in the batch and +// Stamp takes chunk, see if the chunk can be included in the batch and // signs it with the owner of the batch of this Stamp issuer. func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) { + st.mu.Lock(addr.ByteString()) + defer st.mu.Unlock(addr.ByteString()) + item := &StampItem{ BatchID: st.issuer.data.BatchID, chunkAddress: addr, } switch err := st.store.Get(item); { case err == nil: - // The index should be in the past. It could happen that we encountered - // some error after assigning this index and did not save the issuer data. In - // this case we should assign a new index and update it. - if st.issuer.assigned(item.BatchIndex) { + if st.issuer.assigned(item.BatchIndex) || st.issuer.ImmutableFlag() { break - } else if st.issuer.ImmutableFlag() { - return nil, ErrOverwriteImmutableIndex } - fallthrough + item.BatchTimestamp = unixTime() + if err = st.store.Put(item); err != nil { + return nil, err + } case errors.Is(err, storage.ErrNotFound): item.BatchIndex, item.BatchTimestamp, err = st.issuer.increment(addr) if err != nil { diff --git a/pkg/postage/stamper_test.go b/pkg/postage/stamper_test.go index 1fc3b3d5c34..7fbc845e4de 100644 --- a/pkg/postage/stamper_test.go +++ b/pkg/postage/stamper_test.go @@ -106,11 +106,11 @@ func TestStamperStamping(t *testing.T) { } }) - t.Run("incorrect old index", func(t *testing.T) { + t.Run("reuse index but get new timestamp for mutable batch", func(t *testing.T) { st := newTestStampIssuerMutability(t, 1000, false) chunkAddr := swarm.RandAddress(t) bIdx := postage.ToBucket(st.BucketDepth(), chunkAddr) - index := postage.IndexToBytes(bIdx, 100) + index := postage.IndexToBytes(bIdx, 4) testItem := postage.NewStampItem(). WithBatchID(st.ID()). WithChunkAddress(chunkAddr). @@ -124,25 +124,37 @@ func TestStamperStamping(t *testing.T) { if err := stamp.Valid(chunkAddr, owner, 12, 8, true); err != nil { t.Fatalf("expected no error, got %v", err) } - if bytes.Equal(stamp.Index(), testItem.BatchIndex) { - t.Fatalf("expected index to be different, got %x", stamp.Index()) + if bytes.Equal(stamp.Timestamp(), testItem.BatchTimestamp) { + t.Fatalf("expected timestamp to be different, got %x", stamp.Index()) + } + if !bytes.Equal(stamp.Index(), testItem.BatchIndex) { + t.Fatalf("expected index to be the same, got %x", stamp.Index()) } }) - t.Run("incorrect old index immutable", func(t *testing.T) { + t.Run("reuse same index and timestamp for immutable batch", func(t *testing.T) { st := newTestStampIssuerMutability(t, 1000, true) chunkAddr := swarm.RandAddress(t) bIdx := postage.ToBucket(st.BucketDepth(), chunkAddr) - index := postage.IndexToBytes(bIdx, 100) + index := postage.IndexToBytes(bIdx, 4) testItem := postage.NewStampItem(). WithBatchID(st.ID()). WithChunkAddress(chunkAddr). WithBatchIndex(index) testSt := &testStore{Store: inmemstore.New(), stampItem: testItem} stamper := postage.NewStamper(testSt, st, signer) - _, err := stamper.Stamp(chunkAddr) - if !errors.Is(err, postage.ErrOverwriteImmutableIndex) { - t.Fatalf("got err %v, wanted %v", err, postage.ErrOverwriteImmutableIndex) + stamp, err := stamper.Stamp(chunkAddr) + if err != nil { + t.Fatal(err) + } + if err := stamp.Valid(chunkAddr, owner, 12, 8, true); err != nil { + t.Fatalf("expected no error, got %v", err) + } + if !bytes.Equal(stamp.Timestamp(), testItem.BatchTimestamp) { + t.Fatalf("expected timestamp to be the same, got %x", stamp.Index()) + } + if !bytes.Equal(stamp.Index(), testItem.BatchIndex) { + t.Fatalf("expected index to be the same, got %x", stamp.Index()) } }) From 234705ddc5ecca397668ba140a17b10c23aa660e Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Tue, 23 Jan 2024 13:33:57 +0100 Subject: [PATCH 2/2] chore: refactor --- pkg/postage/stamper.go | 4 ++-- pkg/postage/stampissuer.go | 8 -------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/postage/stamper.go b/pkg/postage/stamper.go index a13261fe2d8..e7b8d51b4c5 100644 --- a/pkg/postage/stamper.go +++ b/pkg/postage/stamper.go @@ -7,11 +7,11 @@ package postage import ( "errors" "fmt" - "resenje.org/multex" "github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" + "resenje.org/multex" ) var ( @@ -50,7 +50,7 @@ func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) { } switch err := st.store.Get(item); { case err == nil: - if st.issuer.assigned(item.BatchIndex) || st.issuer.ImmutableFlag() { + if st.issuer.ImmutableFlag() { break } item.BatchTimestamp = unixTime() diff --git a/pkg/postage/stampissuer.go b/pkg/postage/stampissuer.go index 2606feb36eb..84b5292528a 100644 --- a/pkg/postage/stampissuer.go +++ b/pkg/postage/stampissuer.go @@ -201,14 +201,6 @@ func (si *StampIssuer) increment(addr swarm.Address) (batchIndex []byte, batchTi return indexToBytes(bIdx, bCnt), unixTime(), nil } -// check if this stamp index has already been assigned -func (si *StampIssuer) assigned(stampIdx []byte) bool { - si.bucketMtx.Lock() - defer si.bucketMtx.Unlock() - b, idx := BucketIndexFromBytes(stampIdx) - return idx < si.data.Buckets[b] -} - // Label returns the label of the issuer. func (si *StampIssuer) Label() string { return si.data.Label