Skip to content

Commit

Permalink
fix: replace chunk locker with mutext
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol committed Nov 8, 2023
1 parent 48a603c commit a54bdc0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 27 deletions.
24 changes: 10 additions & 14 deletions pkg/storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package storage
import (
"context"
"errors"
"sync"
"time"

m "github.com/ethersphere/bee/pkg/metrics"
Expand All @@ -29,7 +30,6 @@ type repository struct {

txIndexStore TxStore
txChunkStore TxChunkStore
locker ChunkLocker
}

// IndexStore returns Store.
Expand All @@ -50,7 +50,7 @@ func (r *repository) NewTx(ctx context.Context) (Repository, func() error, func(
txStart: time.Now(),
txIndexStore: txIndexStoreWithMetrics{r.txIndexStore.NewTx(NewTxState(ctx)), r.metrics},
txChunkStore: txChunkStoreWithMetrics{
wrapSync(r.txChunkStore.NewTx(NewTxState(ctx)), r.locker),
r.txChunkStore.NewTx(NewTxState(ctx)),
r.metrics,
},
}
Expand Down Expand Up @@ -92,40 +92,36 @@ func (r *repository) Metrics() []prometheus.Collector {
return m.PrometheusCollectorsFromFields(r.metrics)
}

type ChunkLocker func(chunk swarm.Address) func()

// NewRepository returns a new Repository instance.
func NewRepository(
txIndexStore TxStore,
txChunkStore TxChunkStore,
locker ChunkLocker,
) Repository {
metrics := newMetrics()
return &repository{
metrics: metrics,
txIndexStore: txIndexStoreWithMetrics{txIndexStore, metrics},
txChunkStore: txChunkStoreWithMetrics{wrapSync(txChunkStore, locker), metrics},
locker: locker,
txChunkStore: txChunkStoreWithMetrics{txChunkStore, metrics},
}
}

type syncChunkStore struct {

Check failure on line 108 in pkg/storage/repository.go

View workflow job for this annotation

GitHub Actions / Lint

type `syncChunkStore` is unused (unused)
TxChunkStore
locker ChunkLocker
mu *sync.Mutex
}

func wrapSync(store TxChunkStore, locker ChunkLocker) TxChunkStore {
return &syncChunkStore{store, locker}
func wrapSync(store TxChunkStore) TxChunkStore {

Check failure on line 113 in pkg/storage/repository.go

View workflow job for this annotation

GitHub Actions / Lint

func `wrapSync` is unused (unused)
return &syncChunkStore{store, new(sync.Mutex)}
}

func (s *syncChunkStore) Put(ctx context.Context, chunk swarm.Chunk) error {

Check failure on line 117 in pkg/storage/repository.go

View workflow job for this annotation

GitHub Actions / Lint

func `(*syncChunkStore).Put` is unused (unused)
unlock := s.locker(chunk.Address())
defer unlock()
s.mu.Lock()
defer s.mu.Unlock()
return s.TxChunkStore.Put(ctx, chunk)
}

func (s *syncChunkStore) Delete(ctx context.Context, addr swarm.Address) error {

Check failure on line 123 in pkg/storage/repository.go

View workflow job for this annotation

GitHub Actions / Lint

func `(*syncChunkStore).Delete` is unused (unused)
unlock := s.locker(addr)
defer unlock()
s.mu.Lock()
defer s.mu.Unlock()
return s.TxChunkStore.Delete(ctx, addr)
}
18 changes: 5 additions & 13 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func closer(closers ...io.Closer) io.Closer {
})
}

func initInmemRepository(locker storage.ChunkLocker) (storage.Repository, io.Closer, error) {
func initInmemRepository() (storage.Repository, io.Closer, error) {
store, err := leveldbstore.New("", nil)
if err != nil {
return nil, nil, fmt.Errorf("failed creating inmem levelDB index store: %w", err)
Expand All @@ -229,7 +229,7 @@ func initInmemRepository(locker storage.ChunkLocker) (storage.Repository, io.Clo
txStore := leveldbstore.NewTxStore(store)
txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)

return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky), nil
return storage.NewRepository(txStore, txChunkStore), closer(store, sharky), nil
}

// loggerName is the tree path name of the logger for this package.
Expand Down Expand Up @@ -275,7 +275,6 @@ func initStore(basePath string, opts *Options) (*leveldbstore.Store, error) {
func initDiskRepository(
ctx context.Context,
basePath string,
locker storage.ChunkLocker,
opts *Options,
) (storage.Repository, io.Closer, error) {
store, err := initStore(basePath, opts)
Expand Down Expand Up @@ -365,7 +364,7 @@ func initDiskRepository(
return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
}

return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky, recoveryCloser), nil
return storage.NewRepository(txStore, txChunkStore), closer(store, sharky, recoveryCloser), nil
}

func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) {
Expand Down Expand Up @@ -531,15 +530,8 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
metrics := newMetrics()
opts.LdbStats.CompareAndSwap(nil, &metrics.LevelDBStats)

locker := func(addr swarm.Address) func() {
lock.Lock(addr.ByteString())
return func() {
lock.Unlock(addr.ByteString())
}
}

if dirPath == "" {
repo, dbCloser, err = initInmemRepository(locker)
repo, dbCloser, err = initInmemRepository()
if err != nil {
return nil, err
}
Expand All @@ -552,7 +544,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
}
}

repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
repo, dbCloser, err = initDiskRepository(ctx, dirPath, opts)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a54bdc0

Please sign in to comment.