From ebaf7170fc3321741bf00ec06d5554d82e6d2e33 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 30 Jan 2024 21:46:07 +0300 Subject: [PATCH] fix: cleanup chunk locking --- .../internal/transaction/transaction.go | 125 ++++++------------ 1 file changed, 43 insertions(+), 82 deletions(-) diff --git a/pkg/storer/internal/transaction/transaction.go b/pkg/storer/internal/transaction/transaction.go index ab8f2b73411..073487ed258 100644 --- a/pkg/storer/internal/transaction/transaction.go +++ b/pkg/storer/internal/transaction/transaction.go @@ -16,12 +16,10 @@ import ( "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" "github.com/ethersphere/bee/pkg/swarm" "github.com/prometheus/client_golang/prometheus" - "github.com/segmentio/ksuid" "resenje.org/multex" ) // TODO(esad): remove contexts from sharky and any other storage call -// TODO(esad): continue metrics /* The rules of the transction is as follows: @@ -33,8 +31,6 @@ The rules of the transction is as follows: -> if batch_commit fails or is not called, release all sharky_write location from the disk, do nothing for sharky_release */ -const globalLockerKey = "global" - type Transaction interface { Store Commit() error @@ -58,14 +54,14 @@ type Storage interface { } type store struct { - sharky *sharky.Store - bstore storage.BatchedStore - metrics metrics - chunkStoreLocker *chunkStoreLocker + sharky *sharky.Store + bstore storage.BatchedStore + metrics metrics + chunkLocker *multex.Multex } func NewStorage(sharky *sharky.Store, bstore storage.BatchedStore) Storage { - return &store{sharky, bstore, newMetrics(), &chunkStoreLocker{multex.New(), make(map[string]map[string]struct{})}} + return &store{sharky, bstore, newMetrics(), multex.New()} } type transaction struct { @@ -91,18 +87,23 @@ func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) { t := &transaction{ batch: b, indexstore: indexTrx, - chunkStore: &chunkStoreTrx{indexTrx, sharyTrx, s.chunkStoreLocker, s.chunkStoreLocker.newID(), s.metrics, false}, + chunkStore: &chunkStoreTrx{indexTrx, sharyTrx, s.chunkLocker, make(map[string]struct{}), s.metrics, false}, sharkyTrx: sharyTrx, metrics: s.metrics, } return t, func() { - // for whatever reason, the commit call was not made - // release uncommitted written sharky locations + // for whatever reason, commit was not called + // release uncommitted but written sharky locations + // unlock the locked addresses for _, l := range t.sharkyTrx.writtenLocs { _ = t.sharkyTrx.sharky.Release(context.TODO(), l) } - t.chunkStore.chunkStoreLocker.unlockTrx(t.chunkStore.id) + for addr := range t.chunkStore.lockedAddrs { + s.chunkLocker.Unlock(addr) + } + t.sharkyTrx.writtenLocs = nil + t.chunkStore.lockedAddrs = nil } } @@ -115,7 +116,7 @@ func (s *store) ReadOnly() ReadOnlyStore { indexStore := &indexTrx{s.bstore, nil} sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil} - return &readOnly{indexStore, &chunkStoreTrx{indexStore, sharyTrx, s.chunkStoreLocker, "", s.metrics, true}} + return &readOnly{indexStore, &chunkStoreTrx{indexStore, sharyTrx, s.chunkLocker, nil, s.metrics, true}} } func (s *store) Run(ctx context.Context, f func(Store) error) error { @@ -179,7 +180,10 @@ func (t *transaction) Commit() (err error) { defer handleMetric("commit", t.metrics)(err) defer func() { - t.chunkStore.chunkStoreLocker.unlockTrx(t.chunkStore.id) + for addr := range t.chunkStore.lockedAddrs { + t.chunkStore.globalLocker.Unlock(addr) + } + t.chunkStore.lockedAddrs = nil t.sharkyTrx.writtenLocs = nil }() @@ -207,46 +211,55 @@ func (t *transaction) Commit() (err error) { } type chunkStoreTrx struct { - indexStore *indexTrx - sharkyTrx *sharkyTrx - chunkStoreLocker *chunkStoreLocker - id string - metrics metrics - readOnly bool + indexStore *indexTrx + sharkyTrx *sharkyTrx + globalLocker *multex.Multex + lockedAddrs map[string]struct{} + metrics metrics + readOnly bool } func (c *chunkStoreTrx) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) { defer handleMetric("chunkstore_get", c.metrics)(err) - defer c.lock(addr)() + unlock := c.lock(addr) + defer unlock() ch, err = chunkstore.Get(ctx, c.indexStore, c.sharkyTrx, addr) return ch, err } func (c *chunkStoreTrx) Has(ctx context.Context, addr swarm.Address) (_ bool, err error) { defer handleMetric("chunkstore_has", c.metrics)(err) - defer c.lock(addr)() + unlock := c.lock(addr) + defer unlock() return chunkstore.Has(ctx, c.indexStore, addr) } func (c *chunkStoreTrx) Put(ctx context.Context, ch swarm.Chunk) (err error) { defer handleMetric("chunkstore_put", c.metrics)(err) - defer c.lock(ch.Address())() + unlock := c.lock(ch.Address()) + defer unlock() return chunkstore.Put(ctx, c.indexStore, c.sharkyTrx, ch) } func (c *chunkStoreTrx) Delete(ctx context.Context, addr swarm.Address) (err error) { defer handleMetric("chunkstore_delete", c.metrics)(err) - defer c.lock(addr)() + unlock := c.lock(addr) + defer unlock() return chunkstore.Delete(ctx, c.indexStore, c.sharkyTrx, addr) } -func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) error { +func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) (err error) { + defer handleMetric("chunkstore_iterate", c.metrics)(err) return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn) } func (c *chunkStoreTrx) lock(addr swarm.Address) func() { + if c.readOnly { // directly lock + c.globalLocker.Lock(addr.ByteString()) + } else if _, ok := c.lockedAddrs[addr.ByteString()]; !ok { // lock chunk only once in the same transaction + c.globalLocker.Lock(addr.ByteString()) + c.lockedAddrs[addr.ByteString()] = struct{}{} + } + return func() { if c.readOnly { - c.chunkStoreLocker.lock(addr) - defer c.chunkStoreLocker.unlock(addr) - } else { - c.chunkStoreLocker.lockTrx(addr, c.id) + c.globalLocker.Unlock(addr.ByteString()) } } } @@ -279,58 +292,6 @@ func (s *sharkyTrx) Release(ctx context.Context, loc sharky.Location) error { return nil } -type chunkStoreLocker struct { - mtx *multex.Multex - chunkLocked map[string]map[string]struct{} // transactionID -> chunkAddress, one to many mapping -} - -// lock globally locks the chunk address. -func (c *chunkStoreLocker) lock(addr swarm.Address) { - c.mtx.Lock(addr.ByteString()) -} - -// lock globally unlocks the chunk address. -func (c *chunkStoreLocker) unlock(addr swarm.Address) { - c.mtx.Unlock(addr.ByteString()) -} - -// lockTrx globally locks the chunk address, also making sure that the same address is locked only once by the same transaction. -func (c *chunkStoreLocker) lockTrx(addr swarm.Address, id string) { - c.mtx.Lock(globalLockerKey) - defer c.mtx.Unlock(globalLockerKey) - - if c.chunkLocked[id] == nil { - c.chunkLocked[id] = make(map[string]struct{}) - } - - // chunk already globally locked as part of this transaction - if _, ok := c.chunkLocked[id][addr.ByteString()]; ok { - return - } - - c.chunkLocked[id][addr.ByteString()] = struct{}{} - - // acquire global chunk lock - c.lock(addr) -} - -// unlockTrx globally unlocks any chunk addresses that was part of the transaction. -func (c *chunkStoreLocker) unlockTrx(id string) { - - c.mtx.Lock(globalLockerKey) - defer c.mtx.Unlock(globalLockerKey) - - for addr := range c.chunkLocked[id] { - c.mtx.Unlock(addr) - } - - delete(c.chunkLocked, id) -} - -func (c *chunkStoreLocker) newID() string { - return ksuid.New().String() -} - func handleMetric(key string, m metrics) func(err error) { t := time.Now() return func(err error) {