Skip to content

Commit

Permalink
fix: removed cache and fixed shallow copy
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Feb 5, 2024
1 parent abbe057 commit 536034b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 38 deletions.
2 changes: 2 additions & 0 deletions pkg/storer/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ func (c *Cache) ShallowCopy(

entries := make([]*cacheEntry, 0, len(addrs))

entriesToAdd := make([]*cacheEntry, 0, len(addrs))

defer func() {
if err != nil {
for _, entry := range entries {
Expand Down
10 changes: 0 additions & 10 deletions pkg/storer/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,7 @@ func TestCache(t *testing.T) {
t.Fatal(err)
}
}
// return error for state update, which occurs at the end of Get/Put operations
retErr := errors.New("dummy error")
// st.putFn = func(i storage.Item) error {
// if i.Namespace() == "cacheOrderIndex" {
// return retErr
// }

// return st.Run(context.Background(), func(s transaction.Store) error {
// return s.IndexStore().Put(i)
// })
// }

// on error the cache expects the overarching transactions to clean itself up
// and undo any store updates. So here we only want to ensure the state is
Expand Down
10 changes: 3 additions & 7 deletions pkg/storer/internal/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

m "github.com/ethersphere/bee/pkg/metrics"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/cache"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -83,7 +81,7 @@ type transaction struct {
func (s *store) NewTransaction(ctx context.Context) (Transaction, func()) {

b := s.bstore.Batch(ctx)
indexTrx := cache.MustWrap(&indexTrx{s.bstore, b}, math.MaxInt)
indexTrx := &indexTrx{s.bstore, b}
sharyTrx := &sharkyTrx{s.sharky, s.metrics, nil, nil}

t := &transaction{
Expand Down Expand Up @@ -185,15 +183,13 @@ func (t *transaction) Commit() (err error) {
}

// IndexStore gives acces to the index store of the transaction.
// Write operations are cached, so following Reads returns what was recently written to the transaction.
// Note that writes are not persisted to the disk until the commit is called.
// Note that no writes are persisted to the disk until the commit is called.
func (t *transaction) IndexStore() storage.IndexStore {
return t.indexstore
}

// ChunkStore gives acces to the chunkstore of the transaction.
// Write operations are cached, so following Reads returns what was recently written to the transaction.
// Note that writes are not persisted to the disk until the commit is called.
// Note that no writes are persisted to the disk until the commit is called.
func (t *transaction) ChunkStore() storage.ChunkStore {
return t.chunkStore
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/transaction/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func Test_TransactionStorage(t *testing.T) {

has, err := st.ReadOnly().ChunkStore().Has(context.Background(), ch1.Address())
assert.NoError(t, err)
if has {
if !has {
t.Fatal("should NOT have chunk")
}
})
Expand Down
44 changes: 24 additions & 20 deletions pkg/storer/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error {
unlock := db.Lock(uploadsLock)
defer unlock()
return db.storage.Run(ctx, func(s transaction.Store) error {
return errors.Join(
uploadPutter.Put(ctx, s, chunk),
func() error {
if pinningPutter != nil {
return errors.Join(
db.storage.Run(ctx, func(s transaction.Store) error {
return uploadPutter.Put(ctx, s, chunk)
}),
func() error {
if pinningPutter != nil {
return db.storage.Run(ctx, func(s transaction.Store) error {
return pinningPutter.Put(ctx, s, chunk)
}
return nil
}(),
)
})
})
}
return nil
}(),
)
}),
db.metrics,
"uploadstore",
Expand All @@ -92,17 +94,19 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession
defer db.events.Trigger(subscribePushEventKey)
unlock := db.Lock(uploadsLock)
defer unlock()
return db.storage.Run(ctx, func(s transaction.Store) error {
return errors.Join(
uploadPutter.Close(s.IndexStore(), address),
func() error {
if pinningPutter != nil {
return errors.Join(
db.storage.Run(ctx, func(s transaction.Store) error {
return uploadPutter.Close(s.IndexStore(), address)
}),
func() error {
if pinningPutter != nil {
return db.storage.Run(ctx, func(s transaction.Store) error {
return pinningPutter.Close(s.IndexStore(), address)
}
return nil
}(),
)
})
})
}
return nil
}(),
)
},
cleanup: func() error {
defer db.events.Trigger(subscribePushEventKey)
Expand Down

0 comments on commit 536034b

Please sign in to comment.