diff --git a/pkg/postage/service.go b/pkg/postage/service.go index 7621e00d281..803daa3a66b 100644 --- a/pkg/postage/service.go +++ b/pkg/postage/service.go @@ -174,8 +174,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func() error, e // save persists the specified stamp issuer to the stamperstore. func (ps *service) save(st *StampIssuer) error { - st.bucketMtx.Lock() - defer st.bucketMtx.Unlock() + st.mtx.Lock() + defer st.mtx.Unlock() if err := ps.store.Put(&StampIssuerItem{ Issuer: st, diff --git a/pkg/postage/stamper.go b/pkg/postage/stamper.go index 1ee16a8a133..a5f4107e375 100644 --- a/pkg/postage/stamper.go +++ b/pkg/postage/stamper.go @@ -11,7 +11,6 @@ import ( "github.com/ethersphere/bee/pkg/crypto" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" - "resenje.org/multex" ) var ( @@ -30,19 +29,18 @@ type stamper struct { store storage.Store issuer *StampIssuer signer crypto.Signer - mu *multex.Multex } // NewStamper constructs a Stamper. func NewStamper(store storage.Store, issuer *StampIssuer, signer crypto.Signer) Stamper { - return &stamper{store, issuer, signer, multex.New()} + return &stamper{store, issuer, signer} } // Stamp takes chunk, see if the chunk can be included in the batch and // signs it with the owner of the batch of this Stamp issuer. func (st *stamper) Stamp(addr swarm.Address) (*Stamp, error) { - st.mu.Lock(addr.ByteString()) - defer st.mu.Unlock(addr.ByteString()) + st.issuer.mtx.Lock() + defer st.issuer.mtx.Unlock() item := &StampItem{ BatchID: st.issuer.data.BatchID, diff --git a/pkg/postage/stampissuer.go b/pkg/postage/stampissuer.go index 84b5292528a..bc3064580c0 100644 --- a/pkg/postage/stampissuer.go +++ b/pkg/postage/stampissuer.go @@ -151,8 +151,8 @@ func (s stampIssuerData) Clone() stampIssuerData { // A StampIssuer instance extends a batch with bucket collision tracking // embedded in multiple Stampers, can be used concurrently. type StampIssuer struct { - data stampIssuerData - bucketMtx sync.Mutex + data stampIssuerData + mtx sync.Mutex } // NewStampIssuer constructs a StampIssuer as an extension of a batch for local @@ -177,10 +177,8 @@ func NewStampIssuer(label, keyID string, batchID []byte, batchAmount *big.Int, b // increment increments the count in the correct collision // bucket for a newly stamped chunk with given addr address. +// Must be mutex locked before usage. func (si *StampIssuer) increment(addr swarm.Address) (batchIndex []byte, batchTimestamp []byte, err error) { - si.bucketMtx.Lock() - defer si.bucketMtx.Unlock() - bIdx := toBucket(si.BucketDepth(), addr) bCnt := si.data.Buckets[bIdx] @@ -263,8 +261,8 @@ func (si *StampIssuer) ImmutableFlag() bool { } func (si *StampIssuer) Buckets() []uint32 { - si.bucketMtx.Lock() - defer si.bucketMtx.Unlock() + si.mtx.Lock() + defer si.mtx.Unlock() b := make([]uint32, len(si.data.Buckets)) copy(b, si.data.Buckets) return b