Skip to content

Commit

Permalink
fix concurrent cache access in test with atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Oct 10, 2023
1 parent eed7180 commit 27024ee
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
6 changes: 3 additions & 3 deletions share/eds/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"

bstore "github.com/ipfs/boxo/blockstore"
dshelp "github.com/ipfs/boxo/datastore/dshelp"
"github.com/ipfs/boxo/datastore/dshelp"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -154,13 +154,13 @@ func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (*

// check if either cache contains an accessor
shardKey := keys[0]
accessor, err := bs.store.cache.Get(shardKey)
accessor, err := bs.store.cache.Load().Get(shardKey)
if err == nil {
return blockstoreCloser(accessor)
}

// load accessor to the blockstore cache and use it as blockstoreCloser
accessor, err = bs.store.cache.Second().GetOrLoad(ctx, shardKey, bs.store.getAccessor)
accessor, err = bs.store.cache.Load().Second().GetOrLoad(ctx, shardKey, bs.store.getAccessor)
if err != nil {
return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
}
Expand Down
2 changes: 1 addition & 1 deletion share/eds/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *Store) WithMetrics() error {
return err
}

if err = s.cache.EnableMetrics(); err != nil {
if err = s.cache.Load().EnableMetrics(); err != nil {
return err
}

Expand Down
12 changes: 6 additions & 6 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Store struct {
mounts *mount.Registry

bs *blockstore
cache *cache.DoubleCache
cache atomic.Pointer[cache.DoubleCache]

carIdx index.FullIndexRepo
invertedIdx *simpleInvertedIndex
Expand Down Expand Up @@ -129,9 +129,9 @@ func NewStore(params *Parameters, basePath string, ds datastore.Batching) (*Stor
gcInterval: params.GCInterval,
mounts: r,
shardFailures: failureChan,
cache: cache.NewDoubleCache(recentBlocksCache, blockstoreCache),
}
store.bs = newBlockstore(store, ds)
store.cache.Store(cache.NewDoubleCache(recentBlocksCache, blockstoreCache))
return store, nil
}

Expand Down Expand Up @@ -286,7 +286,7 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
ac, err := s.cache.First().GetOrLoad(ctx, result.Key, s.getAccessor)
ac, err := s.cache.Load().First().GetOrLoad(ctx, result.Key, s.getAccessor)
if err != nil {
log.Warnw("unable to put accessor to recent blocks accessors cache", "err", err)
return
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.ReadCloser,

func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.ReadCloser, error) {
key := shard.KeyFromString(root.String())
accessor, err := s.cache.Get(key)
accessor, err := s.cache.Load().Get(key)
if err == nil {
return newReadCloser(accessor), nil
}
Expand Down Expand Up @@ -391,7 +391,7 @@ func (s *Store) carBlockstore(
root share.DataHash,
) (*BlockstoreCloser, error) {
key := shard.KeyFromString(root.String())
accessor, err := s.cache.Get(key)
accessor, err := s.cache.Load().Get(key)
if err == nil {
return blockstoreCloser(accessor)
}
Expand Down Expand Up @@ -482,7 +482,7 @@ func (s *Store) Remove(ctx context.Context, root share.DataHash) error {
func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) {
key := shard.KeyFromString(root.String())
// remove open links to accessor from cache
if err := s.cache.Remove(key); err != nil {
if err := s.cache.Load().Remove(key); err != nil {
log.Warnw("remove accessor from cache", "err", err)
}
ch := make(chan dagstore.ShardResult, 1)
Expand Down
22 changes: 11 additions & 11 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestEDSStore(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// remove non-failed accessor from cache
err = edsStore.cache.Remove(shard.KeyFromString(dah.String()))
err = edsStore.cache.Load().Remove(shard.KeyFromString(dah.String()))
assert.NoError(t, err)

_, err = edsStore.GetCAR(ctx, dah.Hash())
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestEDSStore(t *testing.T) {

// check, that the key is in the cache after put
shardKey := shard.KeyFromString(dah.String())
_, err = edsStore.cache.Get(shardKey)
_, err = edsStore.cache.Load().Get(shardKey)
assert.NoError(t, err)
})

Expand Down Expand Up @@ -276,7 +276,7 @@ func TestEDSStore_GC(t *testing.T) {
// remove links to the shard from cache
time.Sleep(time.Millisecond * 100)
key := shard.KeyFromString(share.DataHash(dah.Hash()).String())
err = edsStore.cache.Remove(key)
err = edsStore.cache.Load().Remove(key)
require.NoError(t, err)

// doesn't exist yet
Expand Down Expand Up @@ -305,8 +305,8 @@ func Test_BlockstoreCache(t *testing.T) {
require.NoError(t, err)

// store eds to the store with noopCache to allow clean cache after put
swap := edsStore.cache
edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})
swap := edsStore.cache.Load()
edsStore.cache.Store(cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}))
eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
require.NoError(t, err)
Expand All @@ -327,19 +327,19 @@ func Test_BlockstoreCache(t *testing.T) {
}

// swap back original cache
edsStore.cache = swap
edsStore.cache.Store(swap)

// key shouldn't be in cache yet, check for returned errCacheMiss
shardKey := shard.KeyFromString(dah.String())
_, err = edsStore.cache.Get(shardKey)
_, err = edsStore.cache.Load().Get(shardKey)
require.Error(t, err)

// now get it from blockstore, to trigger storing to cache
_, err = edsStore.Blockstore().Get(ctx, key)
require.NoError(t, err)

// should be no errCacheMiss anymore
_, err = edsStore.cache.Get(shardKey)
_, err = edsStore.cache.Load().Get(shardKey)
require.NoError(t, err)
}

Expand All @@ -362,7 +362,7 @@ func Test_CachedAccessor(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// accessor should be in cache
_, err = edsStore.cache.Get(shard.KeyFromString(dah.String()))
_, err = edsStore.cache.Load().Get(shard.KeyFromString(dah.String()))
require.NoError(t, err)

// first read from cached accessor
Expand Down Expand Up @@ -393,7 +393,7 @@ func Test_NotCachedAccessor(t *testing.T) {
err = edsStore.Start(ctx)
require.NoError(t, err)
// replace cache with noopCache to
edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})
edsStore.cache.Store(cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{}))

eds, dah := randomEDS(t)
err = edsStore.Put(ctx, dah.Hash(), eds)
Expand All @@ -403,7 +403,7 @@ func Test_NotCachedAccessor(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// accessor should not be in cache
_, err = edsStore.cache.Get(shard.KeyFromString(dah.String()))
_, err = edsStore.cache.Load().Get(shard.KeyFromString(dah.String()))
require.Error(t, err)

// first read from direct accessor (not from cache)
Expand Down

0 comments on commit 27024ee

Please sign in to comment.