Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: stamper fixes #4556

Merged
merged 2 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions pkg/postage/stamper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"resenje.org/multex"
)

var (
// ErrBucketFull is the error when a collision bucket is full.
ErrBucketFull = errors.New("bucket full")
ErrOverwriteImmutableIndex = errors.New("immutable batch old index overwrite due to previous faulty save")
ErrBucketFull = errors.New("bucket full")
)

// Stamper can issue stamps from the given address of chunk.
Expand All @@ -30,31 +30,30 @@ type stamper struct {
store storage.Store
issuer *StampIssuer
signer crypto.Signer
mu *multex.Multex
}

// NewStamper constructs a Stamper.
func NewStamper(store storage.Store, issuer *StampIssuer, signer crypto.Signer) Stamper {
return &stamper{store, issuer, signer}
return &stamper{store, issuer, signer, multex.New()}
}

// Stamp takes chunk, see if the chunk can included in the batch and
// Stamp takes chunk, see if the chunk can be included in the batch and
// signs it with the owner of the batch of this Stamp issuer.
func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) {
st.mu.Lock(addr.ByteString())
defer st.mu.Unlock(addr.ByteString())

item := &StampItem{
BatchID: st.issuer.data.BatchID,
chunkAddress: addr,
}
switch err := st.store.Get(item); {
case err == nil:
// The index should be in the past. It could happen that we encountered
// some error after assigning this index and did not save the issuer data. In
// this case we should assign a new index and update it.
if st.issuer.assigned(item.BatchIndex) {
break
} else if st.issuer.ImmutableFlag() {
return nil, ErrOverwriteImmutableIndex
item.BatchTimestamp = unixTime()
if err = st.store.Put(item); err != nil {
return nil, err
}
fallthrough
case errors.Is(err, storage.ErrNotFound):
item.BatchIndex, item.BatchTimestamp, err = st.issuer.increment(addr)
if err != nil {
Expand Down
36 changes: 12 additions & 24 deletions pkg/postage/stamper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ func TestStamperStamping(t *testing.T) {
}
})

t.Run("incorrect old index", func(t *testing.T) {
t.Run("reuse index but get new timestamp for mutable or immutable batch", func(t *testing.T) {
st := newTestStampIssuerMutability(t, 1000, false)
chunkAddr := swarm.RandAddress(t)
bIdx := postage.ToBucket(st.BucketDepth(), chunkAddr)
index := postage.IndexToBytes(bIdx, 100)
index := postage.IndexToBytes(bIdx, 4)
testItem := postage.NewStampItem().
WithBatchID(st.ID()).
WithChunkAddress(chunkAddr).
Expand All @@ -121,28 +121,16 @@ func TestStamperStamping(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := stamp.Valid(chunkAddr, owner, 12, 8, true); err != nil {
t.Fatalf("expected no error, got %v", err)
}
if bytes.Equal(stamp.Index(), testItem.BatchIndex) {
t.Fatalf("expected index to be different, got %x", stamp.Index())
}
})

t.Run("incorrect old index immutable", func(t *testing.T) {
st := newTestStampIssuerMutability(t, 1000, true)
chunkAddr := swarm.RandAddress(t)
bIdx := postage.ToBucket(st.BucketDepth(), chunkAddr)
index := postage.IndexToBytes(bIdx, 100)
testItem := postage.NewStampItem().
WithBatchID(st.ID()).
WithChunkAddress(chunkAddr).
WithBatchIndex(index)
testSt := &testStore{Store: inmemstore.New(), stampItem: testItem}
stamper := postage.NewStamper(testSt, st, signer)
_, err := stamper.Stamp(chunkAddr)
if !errors.Is(err, postage.ErrOverwriteImmutableIndex) {
t.Fatalf("got err %v, wanted %v", err, postage.ErrOverwriteImmutableIndex)
for _, mutability := range []bool{true, false} {
if err := stamp.Valid(chunkAddr, owner, 12, 8, mutability); err != nil {
t.Fatalf("expected no error, got %v", err)
}
if bytes.Equal(stamp.Timestamp(), testItem.BatchTimestamp) {
t.Fatalf("expected timestamp to be different, got %x", stamp.Index())
}
if !bytes.Equal(stamp.Index(), testItem.BatchIndex) {
t.Fatalf("expected index to be the same, got %x", stamp.Index())
}
}
})

Expand Down
8 changes: 0 additions & 8 deletions pkg/postage/stampissuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,6 @@ func (si *StampIssuer) increment(addr swarm.Address) (batchIndex []byte, batchTi
return indexToBytes(bIdx, bCnt), unixTime(), nil
}

// check if this stamp index has already been assigned
func (si *StampIssuer) assigned(stampIdx []byte) bool {
si.bucketMtx.Lock()
defer si.bucketMtx.Unlock()
b, idx := BucketIndexFromBytes(stampIdx)
return idx < si.data.Buckets[b]
}

// Label returns the label of the issuer.
func (si *StampIssuer) Label() string {
return si.data.Label
Expand Down
Loading