Skip to content

Commit

Permalink
fix(stamper): global lock stamper across multiple upload sessions (#4578
Browse files Browse the repository at this point in the history
)
  • Loading branch information
istae authored Feb 13, 2024
1 parent d07d458 commit ad0b209
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 14 deletions.
4 changes: 2 additions & 2 deletions pkg/postage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func() error, e

// save persists the specified stamp issuer to the stamperstore.
func (ps *service) save(st *StampIssuer) error {
st.bucketMtx.Lock()
defer st.bucketMtx.Unlock()
st.mtx.Lock()
defer st.mtx.Unlock()

if err := ps.store.Put(&StampIssuerItem{
Issuer: st,
Expand Down
8 changes: 3 additions & 5 deletions pkg/postage/stamper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"resenje.org/multex"
)

var (
Expand All @@ -30,19 +29,18 @@ type stamper struct {
store storage.Store
issuer *StampIssuer
signer crypto.Signer
mu *multex.Multex
}

// NewStamper constructs a Stamper.
func NewStamper(store storage.Store, issuer *StampIssuer, signer crypto.Signer) Stamper {
return &stamper{store, issuer, signer, multex.New()}
return &stamper{store, issuer, signer}
}

// Stamp takes chunk, see if the chunk can be included in the batch and
// signs it with the owner of the batch of this Stamp issuer.
func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) {
st.mu.Lock(addr.ByteString())
defer st.mu.Unlock(addr.ByteString())
st.issuer.mtx.Lock()
defer st.issuer.mtx.Unlock()

item := &StampItem{
BatchID: st.issuer.data.BatchID,
Expand Down
12 changes: 5 additions & 7 deletions pkg/postage/stampissuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (s stampIssuerData) Clone() stampIssuerData {
// A StampIssuer instance extends a batch with bucket collision tracking
// embedded in multiple Stampers, can be used concurrently.
type StampIssuer struct {
data stampIssuerData
bucketMtx sync.Mutex
data stampIssuerData
mtx sync.Mutex
}

// NewStampIssuer constructs a StampIssuer as an extension of a batch for local
Expand All @@ -177,10 +177,8 @@ func NewStampIssuer(label, keyID string, batchID []byte, batchAmount *big.Int, b

// increment increments the count in the correct collision
// bucket for a newly stamped chunk with given addr address.
// Must be mutex locked before usage.
func (si *StampIssuer) increment(addr swarm.Address) (batchIndex []byte, batchTimestamp []byte, err error) {
si.bucketMtx.Lock()
defer si.bucketMtx.Unlock()

bIdx := toBucket(si.BucketDepth(), addr)
bCnt := si.data.Buckets[bIdx]

Expand Down Expand Up @@ -263,8 +261,8 @@ func (si *StampIssuer) ImmutableFlag() bool {
}

func (si *StampIssuer) Buckets() []uint32 {
si.bucketMtx.Lock()
defer si.bucketMtx.Unlock()
si.mtx.Lock()
defer si.mtx.Unlock()
b := make([]uint32, len(si.data.Buckets))
copy(b, si.data.Buckets)
return b
Expand Down

0 comments on commit ad0b209

Please sign in to comment.