Skip to content

Commit

Permalink
fix: cleanup chunk locking
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Jan 30, 2024
1 parent 53db61f commit ebaf717
Showing 1 changed file with 43 additions and 82 deletions.
125 changes: 43 additions & 82 deletions pkg/storer/internal/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ import (
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/prometheus/client_golang/prometheus"
"github.com/segmentio/ksuid"
"resenje.org/multex"
)

// TODO(esad): remove contexts from sharky and any other storage call
// TODO(esad): continue metrics

/*
The rules of the transction is as follows:
Expand All @@ -33,8 +31,6 @@ The rules of the transction is as follows:
-> if batch_commit fails or is not called, release all sharky_write location from the disk, do nothing for sharky_release
*/

const globalLockerKey = "global"

type Transaction interface {
Store
Commit() error
Expand All @@ -58,14 +54,14 @@ type Storage interface {
}

type store struct {
sharky *sharky.Store
bstore storage.BatchedStore
metrics metrics
chunkStoreLocker *chunkStoreLocker
sharky *sharky.Store
bstore storage.BatchedStore
metrics metrics
chunkLocker *multex.Multex
}

func NewStorage(sharky *sharky.Store, bstore storage.BatchedStore) Storage {
return &store{sharky, bstore, newMetrics(), &chunkStoreLocker{multex.New(), make(map[string]map[string]struct{})}}
return &store{sharky, bstore, newMetrics(), multex.New()}
}

type transaction struct {
Expand All @@ -91,18 +87,23 @@ func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) {
t := &transaction{
batch: b,
indexstore: indexTrx,
chunkStore: &chunkStoreTrx{indexTrx, sharyTrx, s.chunkStoreLocker, s.chunkStoreLocker.newID(), s.metrics, false},
chunkStore: &chunkStoreTrx{indexTrx, sharyTrx, s.chunkLocker, make(map[string]struct{}), s.metrics, false},
sharkyTrx: sharyTrx,
metrics: s.metrics,
}

return t, func() {
// for whatever reason, the commit call was not made
// release uncommitted written sharky locations
// for whatever reason, commit was not called
// release uncommitted but written sharky locations
// unlock the locked addresses
for _, l := range t.sharkyTrx.writtenLocs {
_ = t.sharkyTrx.sharky.Release(context.TODO(), l)
}
t.chunkStore.chunkStoreLocker.unlockTrx(t.chunkStore.id)
for addr := range t.chunkStore.lockedAddrs {
s.chunkLocker.Unlock(addr)
}
t.sharkyTrx.writtenLocs = nil
t.chunkStore.lockedAddrs = nil
}
}

Expand All @@ -115,7 +116,7 @@ func (s *store) ReadOnly() ReadOnlyStore {
indexStore := &indexTrx{s.bstore, nil}
sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil}

return &readOnly{indexStore, &chunkStoreTrx{indexStore, sharyTrx, s.chunkStoreLocker, "", s.metrics, true}}
return &readOnly{indexStore, &chunkStoreTrx{indexStore, sharyTrx, s.chunkLocker, nil, s.metrics, true}}
}

func (s *store) Run(ctx context.Context, f func(Store) error) error {
Expand Down Expand Up @@ -179,7 +180,10 @@ func (t *transaction) Commit() (err error) {

defer handleMetric("commit", t.metrics)(err)
defer func() {
t.chunkStore.chunkStoreLocker.unlockTrx(t.chunkStore.id)
for addr := range t.chunkStore.lockedAddrs {
t.chunkStore.globalLocker.Unlock(addr)
}
t.chunkStore.lockedAddrs = nil
t.sharkyTrx.writtenLocs = nil
}()

Expand Down Expand Up @@ -207,46 +211,55 @@ func (t *transaction) Commit() (err error) {
}

type chunkStoreTrx struct {
indexStore *indexTrx
sharkyTrx *sharkyTrx
chunkStoreLocker *chunkStoreLocker
id string
metrics metrics
readOnly bool
indexStore *indexTrx
sharkyTrx *sharkyTrx
globalLocker *multex.Multex
lockedAddrs map[string]struct{}
metrics metrics
readOnly bool
}

func (c *chunkStoreTrx) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) {
defer handleMetric("chunkstore_get", c.metrics)(err)
defer c.lock(addr)()
unlock := c.lock(addr)
defer unlock()
ch, err = chunkstore.Get(ctx, c.indexStore, c.sharkyTrx, addr)
return ch, err
}
func (c *chunkStoreTrx) Has(ctx context.Context, addr swarm.Address) (_ bool, err error) {
defer handleMetric("chunkstore_has", c.metrics)(err)
defer c.lock(addr)()
unlock := c.lock(addr)
defer unlock()
return chunkstore.Has(ctx, c.indexStore, addr)
}
func (c *chunkStoreTrx) Put(ctx context.Context, ch swarm.Chunk) (err error) {
defer handleMetric("chunkstore_put", c.metrics)(err)
defer c.lock(ch.Address())()
unlock := c.lock(ch.Address())
defer unlock()
return chunkstore.Put(ctx, c.indexStore, c.sharkyTrx, ch)
}
func (c *chunkStoreTrx) Delete(ctx context.Context, addr swarm.Address) (err error) {
defer handleMetric("chunkstore_delete", c.metrics)(err)
defer c.lock(addr)()
unlock := c.lock(addr)
defer unlock()
return chunkstore.Delete(ctx, c.indexStore, c.sharkyTrx, addr)
}
func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) error {
func (c *chunkStoreTrx) Iterate(ctx context.Context, fn storage.IterateChunkFn) (err error) {
defer handleMetric("chunkstore_iterate", c.metrics)(err)
return chunkstore.Iterate(ctx, c.indexStore, c.sharkyTrx, fn)
}

func (c *chunkStoreTrx) lock(addr swarm.Address) func() {
if c.readOnly { // directly lock
c.globalLocker.Lock(addr.ByteString())
} else if _, ok := c.lockedAddrs[addr.ByteString()]; !ok { // lock chunk only once in the same transaction
c.globalLocker.Lock(addr.ByteString())
c.lockedAddrs[addr.ByteString()] = struct{}{}
}

return func() {
if c.readOnly {
c.chunkStoreLocker.lock(addr)
defer c.chunkStoreLocker.unlock(addr)
} else {
c.chunkStoreLocker.lockTrx(addr, c.id)
c.globalLocker.Unlock(addr.ByteString())
}
}
}
Expand Down Expand Up @@ -279,58 +292,6 @@ func (s *sharkyTrx) Release(ctx context.Context, loc sharky.Location) error {
return nil
}

type chunkStoreLocker struct {
mtx *multex.Multex
chunkLocked map[string]map[string]struct{} // transactionID -> chunkAddress, one to many mapping
}

// lock globally locks the chunk address.
func (c *chunkStoreLocker) lock(addr swarm.Address) {
c.mtx.Lock(addr.ByteString())
}

// lock globally unlocks the chunk address.
func (c *chunkStoreLocker) unlock(addr swarm.Address) {
c.mtx.Unlock(addr.ByteString())
}

// lockTrx globally locks the chunk address, also making sure that the same address is locked only once by the same transaction.
func (c *chunkStoreLocker) lockTrx(addr swarm.Address, id string) {
c.mtx.Lock(globalLockerKey)
defer c.mtx.Unlock(globalLockerKey)

if c.chunkLocked[id] == nil {
c.chunkLocked[id] = make(map[string]struct{})
}

// chunk already globally locked as part of this transaction
if _, ok := c.chunkLocked[id][addr.ByteString()]; ok {
return
}

c.chunkLocked[id][addr.ByteString()] = struct{}{}

// acquire global chunk lock
c.lock(addr)
}

// unlockTrx globally unlocks any chunk addresses that was part of the transaction.
func (c *chunkStoreLocker) unlockTrx(id string) {

c.mtx.Lock(globalLockerKey)
defer c.mtx.Unlock(globalLockerKey)

for addr := range c.chunkLocked[id] {
c.mtx.Unlock(addr)
}

delete(c.chunkLocked, id)
}

func (c *chunkStoreLocker) newID() string {
return ksuid.New().String()
}

func handleMetric(key string, m metrics) func(err error) {
t := time.Now()
return func(err error) {
Expand Down

0 comments on commit ebaf717

Please sign in to comment.