diff --git a/pkg/storage/repository.go b/pkg/storage/repository.go index f681ee76d88..330f388b8fe 100644 --- a/pkg/storage/repository.go +++ b/pkg/storage/repository.go @@ -7,6 +7,7 @@ package storage import ( "context" "errors" + "sync" "time" m "github.com/ethersphere/bee/pkg/metrics" @@ -29,7 +30,6 @@ type repository struct { txIndexStore TxStore txChunkStore TxChunkStore - locker ChunkLocker } // IndexStore returns Store. @@ -50,7 +50,7 @@ func (r *repository) NewTx(ctx context.Context) (Repository, func() error, func( txStart: time.Now(), txIndexStore: txIndexStoreWithMetrics{r.txIndexStore.NewTx(NewTxState(ctx)), r.metrics}, txChunkStore: txChunkStoreWithMetrics{ - wrapSync(r.txChunkStore.NewTx(NewTxState(ctx)), r.locker), + wrapSync(r.txChunkStore.NewTx(NewTxState(ctx))), r.metrics, }, } @@ -92,40 +92,36 @@ func (r *repository) Metrics() []prometheus.Collector { return m.PrometheusCollectorsFromFields(r.metrics) } -type ChunkLocker func(chunk swarm.Address) func() - // NewRepository returns a new Repository instance. func NewRepository( txIndexStore TxStore, txChunkStore TxChunkStore, - locker ChunkLocker, ) Repository { metrics := newMetrics() return &repository{ metrics: metrics, txIndexStore: txIndexStoreWithMetrics{txIndexStore, metrics}, - txChunkStore: txChunkStoreWithMetrics{wrapSync(txChunkStore, locker), metrics}, - locker: locker, + txChunkStore: txChunkStoreWithMetrics{txChunkStore, metrics}, } } type syncChunkStore struct { TxChunkStore - locker ChunkLocker + mu *sync.Mutex } -func wrapSync(store TxChunkStore, locker ChunkLocker) TxChunkStore { - return &syncChunkStore{store, locker} +func wrapSync(store TxChunkStore) TxChunkStore { + return &syncChunkStore{store, new(sync.Mutex)} } func (s *syncChunkStore) Put(ctx context.Context, chunk swarm.Chunk) error { - unlock := s.locker(chunk.Address()) - defer unlock() + s.mu.Lock() + defer s.mu.Unlock() return s.TxChunkStore.Put(ctx, chunk) } func (s *syncChunkStore) Delete(ctx context.Context, addr swarm.Address) error { - unlock := s.locker(addr) - defer unlock() + s.mu.Lock() + defer s.mu.Unlock() return s.TxChunkStore.Delete(ctx, addr) } diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index b5d588162c7..817fa429893 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -211,7 +211,7 @@ func closer(closers ...io.Closer) io.Closer { }) } -func initInmemRepository(locker storage.ChunkLocker) (storage.Repository, io.Closer, error) { +func initInmemRepository() (storage.Repository, io.Closer, error) { store, err := leveldbstore.New("", nil) if err != nil { return nil, nil, fmt.Errorf("failed creating inmem levelDB index store: %w", err) @@ -229,7 +229,7 @@ func initInmemRepository(locker storage.ChunkLocker) (storage.Repository, io.Clo txStore := leveldbstore.NewTxStore(store) txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky) - return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky), nil + return storage.NewRepository(txStore, txChunkStore), closer(store, sharky), nil } // loggerName is the tree path name of the logger for this package. @@ -275,7 +275,6 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) { func initDiskRepository( ctx context.Context, basePath string, - locker storage.ChunkLocker, opts *Options, ) (storage.Repository, io.Closer, error) { store, err := initStore(basePath, opts) @@ -365,7 +364,7 @@ func initDiskRepository( return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err) } - return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky, recoveryCloser), nil + return storage.NewRepository(txStore, txChunkStore), closer(store, sharky, recoveryCloser), nil } func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) { @@ -531,15 +530,8 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { metrics := newMetrics() opts.LdbStats.CompareAndSwap(nil, &metrics.LevelDBStats) - locker := func(addr swarm.Address) func() { - lock.Lock(addr.ByteString()) - return func() { - lock.Unlock(addr.ByteString()) - } - } - if dirPath == "" { - repo, dbCloser, err = initInmemRepository(locker) + repo, dbCloser, err = initInmemRepository() if err != nil { return nil, err } @@ -552,7 +544,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) { } } - repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts) + repo, dbCloser, err = initDiskRepository(ctx, dirPath, opts) if err != nil { return nil, err }