Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: replace chunk locker with mutext #4449

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions pkg/storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package storage
import (
"context"
"errors"
"sync"
"time"

m "github.com/ethersphere/bee/pkg/metrics"
Expand All @@ -29,7 +30,6 @@ type repository struct {

txIndexStore TxStore
txChunkStore TxChunkStore
locker ChunkLocker
}

// IndexStore returns Store.
Expand All @@ -50,7 +50,7 @@ func (r *repository) NewTx(ctx context.Context) (Repository, func() error, func(
txStart: time.Now(),
txIndexStore: txIndexStoreWithMetrics{r.txIndexStore.NewTx(NewTxState(ctx)), r.metrics},
txChunkStore: txChunkStoreWithMetrics{
wrapSync(r.txChunkStore.NewTx(NewTxState(ctx)), r.locker),
wrapSync(r.txChunkStore.NewTx(NewTxState(ctx))),
r.metrics,
},
}
Expand Down Expand Up @@ -92,40 +92,36 @@ func (r *repository) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(r.metrics)
}

type ChunkLocker func(chunk swarm.Address) func()

// NewRepository returns a new Repository instance.
func NewRepository(
txIndexStore TxStore,
txChunkStore TxChunkStore,
locker ChunkLocker,
) Repository {
metrics := newMetrics()
return &repository{
metrics: metrics,
txIndexStore: txIndexStoreWithMetrics{txIndexStore, metrics},
txChunkStore: txChunkStoreWithMetrics{wrapSync(txChunkStore, locker), metrics},
locker: locker,
txChunkStore: txChunkStoreWithMetrics{txChunkStore, metrics},
}
}

type syncChunkStore struct {
TxChunkStore
locker ChunkLocker
mu *sync.Mutex
}

func wrapSync(store TxChunkStore, locker ChunkLocker) TxChunkStore {
return &syncChunkStore{store, locker}
func wrapSync(store TxChunkStore) TxChunkStore {
return &syncChunkStore{store, new(sync.Mutex)}
}

func (s *syncChunkStore) Put(ctx context.Context, chunk swarm.Chunk) error {
unlock := s.locker(chunk.Address())
defer unlock()
s.mu.Lock()
defer s.mu.Unlock()
return s.TxChunkStore.Put(ctx, chunk)
}

func (s *syncChunkStore) Delete(ctx context.Context, addr swarm.Address) error {
unlock := s.locker(addr)
defer unlock()
s.mu.Lock()
defer s.mu.Unlock()
return s.TxChunkStore.Delete(ctx, addr)
}
18 changes: 5 additions & 13 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func closer(closers ...io.Closer) io.Closer {
})
}

func initInmemRepository(locker storage.ChunkLocker) (storage.Repository, io.Closer, error) {
func initInmemRepository() (storage.Repository, io.Closer, error) {
store, err := leveldbstore.New("", nil)
if err != nil {
return nil, nil, fmt.Errorf("failed creating inmem levelDB index store: %w", err)
Expand All @@ -229,7 +229,7 @@ func initInmemRepository(locker storage.ChunkLocker) (storage.Repository, io.Clo
txStore := leveldbstore.NewTxStore(store)
txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)

return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky), nil
return storage.NewRepository(txStore, txChunkStore), closer(store, sharky), nil
}

// loggerName is the tree path name of the logger for this package.
Expand Down Expand Up @@ -275,7 +275,6 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
func initDiskRepository(
ctx context.Context,
basePath string,
locker storage.ChunkLocker,
opts *Options,
) (storage.Repository, io.Closer, error) {
store, err := initStore(basePath, opts)
Expand Down Expand Up @@ -365,7 +364,7 @@ func initDiskRepository(
return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
}

return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky, recoveryCloser), nil
return storage.NewRepository(txStore, txChunkStore), closer(store, sharky, recoveryCloser), nil
}

func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) {
Expand Down Expand Up @@ -531,15 +530,8 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
metrics := newMetrics()
opts.LdbStats.CompareAndSwap(nil, &metrics.LevelDBStats)

locker := func(addr swarm.Address) func() {
lock.Lock(addr.ByteString())
return func() {
lock.Unlock(addr.ByteString())
}
}

if dirPath == "" {
repo, dbCloser, err = initInmemRepository(locker)
repo, dbCloser, err = initInmemRepository()
if err != nil {
return nil, err
}
Expand All @@ -552,7 +544,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
}
}

repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
repo, dbCloser, err = initDiskRepository(ctx, dirPath, opts)
if err != nil {
return nil, err
}
Expand Down
Loading