
Commit 9424557

fix(localstore): leveldb-batch based transactions for both index and chunkstore for every localstore op (#4555)

istae authored Jan 30, 2024
1 parent ff3eb1d commit 9424557

Showing 93 changed files with 2,930 additions and 6,263 deletions.
13 changes: 2 additions & 11 deletions cmd/bee/cmd/db_test.go
@@ -46,7 +46,6 @@ func TestDBExportImport(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		fmt.Println("put chunk: ", ch.Address().String())
 		chunks[ch.Address().String()] = 0
 	}
 	db1.Close()
@@ -115,7 +114,6 @@ func TestDBExportImportPinning(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		fmt.Println("collection ", rootAddr.String(), " put chunk: ", ch.Address().String())
 		chunks[ch.Address().String()] = 0
 	}
 	err = collection.Done(rootAddr)
@@ -125,16 +123,9 @@ func TestDBExportImportPinning(t *testing.T) {
 		pins[rootAddr.String()] = nil
 	}

-	addresses, err := db1.Pins()
-	if err != nil {
-		t.Fatal(err)
-	}
-	for _, addr := range addresses {
-		fmt.Println("pin: ", addr.String())
-	}
 	db1.Close()

-	err = newCommand(t, cmd.WithArgs("db", "export", "pinning", export, "--data-dir", dir1)).Execute()
+	err := newCommand(t, cmd.WithArgs("db", "export", "pinning", export, "--data-dir", dir1)).Execute()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -150,7 +141,7 @@ func TestDBExportImportPinning(t *testing.T) {
 		Logger:          testutil.NewLogger(t),
 		ReserveCapacity: node.ReserveCapacity,
 	}, dir2)
-	addresses, err = db2.Pins()
+	addresses, err := db2.Pins()
 	if err != nil {
 		t.Fatal(err)
 	}
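The `err =` → `err :=` flips in this file follow mechanically from the deleted blocks: once the earlier `addresses, err := db1.Pins()` declaration is gone, the next use of the variables must itself be a short variable declaration or the file stops compiling. A minimal standalone illustration (pins is a hypothetical stand-in, not the bee API):

package main

import "fmt"

// pins is a hypothetical stand-in for db.Pins().
func pins() ([]string, error) {
	return []string{"addr-1"}, nil
}

func main() {
	// With no prior declaration in scope, `:=` both declares and assigns;
	// a plain `=` here would be a compile error ("undefined: addresses").
	addresses, err := pins()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("pins:", addresses)
}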
4 changes: 2 additions & 2 deletions pkg/node/node.go
@@ -184,8 +184,8 @@ const (
 	minPaymentThreshold   = 2 * refreshRate  // minimal accepted payment threshold of full nodes
 	maxPaymentThreshold   = 24 * refreshRate // maximal accepted payment threshold of full nodes
 	mainnetNetworkID      = uint64(1)        //
-	ReserveCapacity       = 4_194_304        // 2^22 chunks
-	reserveWakeUpDuration = 30 * time.Minute // time to wait before waking up reserveWorker
+	ReserveCapacity       = 16384            // 2^14 chunks
+	reserveWakeUpDuration = 15 * time.Minute // time to wait before waking up reserveWorker
 	reserveTreshold       = ReserveCapacity * 5 / 10
 	reserveMinimumRadius  = 0
 )
5 changes: 1 addition & 4 deletions pkg/node/statestore.go
@@ -33,10 +33,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
 		return nil, nil, err
 	}

-	caching, err := cache.Wrap(ldb, int(cacheCapacity))
-	if err != nil {
-		return nil, nil, err
-	}
+	caching := cache.MustWrap(ldb, int(cacheCapacity))
 	stateStore, err := storeadapter.NewStateStorerAdapter(caching)

 	return stateStore, caching, err
11 changes: 0 additions & 11 deletions pkg/pullsync/pullsync.go
@@ -12,7 +12,6 @@ import (
 	"fmt"
 	"io"
 	"math"
-	"strconv"
 	"sync/atomic"
 	"time"

@@ -27,7 +26,6 @@ import (
 	"github.com/ethersphere/bee/pkg/storage"
 	storer "github.com/ethersphere/bee/pkg/storer"
 	"github.com/ethersphere/bee/pkg/swarm"
-	"resenje.org/multex"
 	"resenje.org/singleflight"
 )

@@ -72,7 +70,6 @@ type Syncer struct {
 	validStamp     postage.ValidStampFn
 	intervalsSF    singleflight.Group[string, *collectAddrsResult]
 	syncInProgress atomic.Int32
-	binLock        *multex.Multex

 	maxPage uint64

@@ -98,7 +95,6 @@ func New(
 		logger:  logger.WithName(loggerName).Register(),
 		quit:    make(chan struct{}),
 		maxPage: maxPage,
-		binLock: multex.New(),
 	}
 }

@@ -261,13 +257,6 @@ func (s *Syncer) Sync(ctx context.Context, peer swarm.Address, bin uint8, start
 	s.metrics.Delivered.Add(float64(len(chunksToPut)))
 	s.metrics.LastReceived.WithLabelValues(fmt.Sprintf("%d", bin)).Add(float64(len(chunksToPut)))

-	// if we have parallel sync workers for the same bin, we need to rate limit them
-	// in order to not overload the storage with unnecessary requests as there is
-	// a chance that the same chunk is being synced by multiple workers.
-	key := strconv.Itoa(int(bin))
-	s.binLock.Lock(key)
-	defer s.binLock.Unlock(key)
-
 	for _, c := range chunksToPut {
 		if err := s.store.ReservePutter().Put(ctx, c); err != nil {
 			// in case of these errors, no new items are added to the storage, so it
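The deleted block serialized parallel sync workers per bin with a keyed mutex so duplicate chunks would not hammer the store; with every localstore op now wrapped in a leveldb batch (the point of this commit), that throttle is dropped. For reference, a minimal keyed-mutex sketch in plain Go — a simplified stand-in for the pattern, not resenje.org/multex's actual implementation:

package keyedlock

import "sync"

// KeyedLock serializes goroutines that share the same key while letting
// goroutines with different keys proceed concurrently — the role the removed
// per-bin binLock played in the Syncer.
type KeyedLock struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func New() *KeyedLock {
	return &KeyedLock{locks: make(map[string]*sync.Mutex)}
}

// Lock blocks until the mutex for key is held. Entries are never freed in
// this sketch; a production version would reference-count them.
func (k *KeyedLock) Lock(key string) {
	k.mu.Lock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.Mutex{}
		k.locks[key] = m
	}
	k.mu.Unlock()
	m.Lock()
}

// Unlock releases the mutex for key; it assumes a matching Lock call.
func (k *KeyedLock) Unlock(key string) {
	k.mu.Lock()
	m := k.locks[key]
	k.mu.Unlock()
	m.Unlock()
}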
77 changes: 40 additions & 37 deletions pkg/statestore/storeadapter/migration.go
@@ -11,58 +11,61 @@ import (
 	"github.com/ethersphere/bee/pkg/storage/migration"
 )

-func allSteps() migration.Steps {
+func allSteps(st storage.BatchStore) migration.Steps {
 	return map[uint64]migration.StepFn{
-		1: epochMigration,
-		2: deletePrefix("sync_interval"),
-		3: deletePrefix("sync_interval"),
-		4: deletePrefix("blocklist"),
-		5: deletePrefix("batchstore"),
-		6: deletePrefix("sync_interval"),
-		7: deletePrefix("sync_interval"),
+		1: epochMigration(st),
+		2: deletePrefix(st, "sync_interval"),
+		3: deletePrefix(st, "sync_interval"),
+		4: deletePrefix(st, "blocklist"),
+		5: deletePrefix(st, "batchstore"),
+		6: deletePrefix(st, "sync_interval"),
+		7: deletePrefix(st, "sync_interval"),
 	}
 }

-func deletePrefix(prefix string) migration.StepFn {
-	return func(s storage.BatchedStore) error {
+func deletePrefix(s storage.BatchStore, prefix string) migration.StepFn {
+	return func() error {
 		store := &StateStorerAdapter{s}
 		return store.Iterate(prefix, func(key, val []byte) (stop bool, err error) {
 			return false, store.Delete(string(key))
 		})
 	}
 }

-func epochMigration(s storage.BatchedStore) error {
-
-	var deleteEntries = []string{
-		"statestore_schema",
-		"tags",
-		"sync_interval",
-		"kademlia-counters",
-		"addressbook",
-		"batch",
-	}
-
-	return s.Iterate(storage.Query{
-		Factory: func() storage.Item { return &rawItem{&proxyItem{obj: []byte(nil)}} },
-	}, func(res storage.Result) (stop bool, err error) {
-		if strings.HasPrefix(res.ID, stateStoreNamespace) {
-			return false, nil
-		}
-		for _, e := range deleteEntries {
-			if strings.HasPrefix(res.ID, e) {
-				_ = s.Delete(&rawItem{&proxyItem{key: res.ID}})
-				return false, nil
-			}
-		}
-
-		item := res.Entry.(*rawItem)
-		item.key = res.ID
-		item.ns = stateStoreNamespace
-		if err := s.Put(item); err != nil {
-			return true, err
-		}
-		_ = s.Delete(&rawItem{&proxyItem{key: res.ID}})
-		return false, nil
-	})
-}
+func epochMigration(s storage.BatchStore) migration.StepFn {
+
+	return func() error {
+
+		var deleteEntries = []string{
+			"statestore_schema",
+			"tags",
+			"sync_interval",
+			"kademlia-counters",
+			"addressbook",
+			"batch",
+		}
+
+		return s.Iterate(storage.Query{
+			Factory: func() storage.Item { return &rawItem{&proxyItem{obj: []byte(nil)}} },
+		}, func(res storage.Result) (stop bool, err error) {
+			if strings.HasPrefix(res.ID, stateStoreNamespace) {
+				return false, nil
+			}
+			for _, e := range deleteEntries {
+				if strings.HasPrefix(res.ID, e) {
+					_ = s.Delete(&rawItem{&proxyItem{key: res.ID}})
+					return false, nil
+				}
+			}
+
+			item := res.Entry.(*rawItem)
+			item.key = res.ID
+			item.ns = stateStoreNamespace
+			if err := s.Put(item); err != nil {
+				return true, err
+			}
+			_ = s.Delete(&rawItem{&proxyItem{key: res.ID}})
+			return false, nil
+		})
+	}
+}
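The migration API change visible above: a step used to receive the store as an argument (func(storage.BatchedStore) error) and now takes no arguments (func() error), so each step closes over the store it needs at construction time. A minimal sketch of the closure pattern with simplified types (not the bee API):

package main

import "fmt"

// StepFn mirrors the new zero-argument migration step shape.
type StepFn func() error

// Store is a hypothetical stand-in for storage.BatchStore.
type Store interface {
	Delete(key string) error
}

// deletePrefix binds the store and prefix into the closure when the step map
// is built, so the migration runner no longer has to thread the store through
// every step invocation.
func deletePrefix(s Store, prefix string) StepFn {
	return func() error {
		fmt.Println("deleting keys with prefix:", prefix)
		return s.Delete(prefix) // illustrative; the real step iterates and deletes matches
	}
}

type memStore struct{}

func (memStore) Delete(key string) error { return nil }

func main() {
	steps := map[uint64]StepFn{
		1: deletePrefix(memStore{}, "sync_interval"),
	}
	for v, step := range steps {
		if err := step(); err != nil {
			fmt.Println("migration step", v, "failed:", err)
		}
	}
}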
4 changes: 2 additions & 2 deletions pkg/statestore/storeadapter/storeadapter.go
@@ -229,8 +229,8 @@ func (s *StateStorerAdapter) deleteKeys(keys []string) error {
 }

 // NewStateStorerAdapter creates a new StateStorerAdapter.
-func NewStateStorerAdapter(storage storage.BatchedStore) (*StateStorerAdapter, error) {
-	err := migration.Migrate(storage, "migration", allSteps())
+func NewStateStorerAdapter(storage storage.BatchStore) (*StateStorerAdapter, error) {
+	err := migration.Migrate(storage, "migration", allSteps(storage))
 	if err != nil {
 		return nil, err
 	}
2 changes: 1 addition & 1 deletion pkg/storage/batch.go
@@ -28,5 +28,5 @@ type Batch interface {
 // Batcher specifies a constructor for creating new batches.
 type Batcher interface {
 	// Batch returns a new Batch.
-	Batch(context.Context) (Batch, error)
+	Batch(context.Context) Batch
 }
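With Batch no longer returning an error, call sites collapse from a two-step check into a straight chain. A hedged sketch of how a caller might use the new interface — the Put and Commit method names on storage.Batch are assumed from context, since this hunk does not show them:

package example

import (
	"context"

	"github.com/ethersphere/bee/pkg/storage"
)

// writeBoth stages two items in one batch and commits them together.
// Sketch under the assumption that storage.Batch exposes Put(Item) error
// and Commit() error.
func writeBoth(ctx context.Context, b storage.Batcher, a, c storage.Item) error {
	batch := b.Batch(ctx) // the constructor no longer returns an error
	if err := batch.Put(a); err != nil {
		return err
	}
	if err := batch.Put(c); err != nil {
		return err
	}
	return batch.Commit()
}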
22 changes: 8 additions & 14 deletions pkg/storage/cache/cache.go
@@ -5,8 +5,6 @@
 package cache

 import (
-	"errors"
-
 	"github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/storage/storageutil"
 	lru "github.com/hashicorp/golang-lru/v2"

@@ -17,12 +15,12 @@ func key(key storage.Key) string {
 	return storageutil.JoinFields(key.Namespace(), key.ID())
 }

-var _ storage.BatchedStore = (*Cache)(nil)
+var _ storage.BatchStore = (*Cache)(nil)

 // Cache is a wrapper around a storage.Store that adds a layer
 // of in-memory caching for the Get and Has operations.
 type Cache struct {
-	storage.BatchedStore
+	storage.BatchStore

 	lru     *lru.Cache[string, []byte]
 	metrics metrics

@@ -31,11 +29,7 @@ type Cache struct {
 // Wrap adds a layer of in-memory caching to storage.Reader Get and Has operations.
 // It returns an error if the capacity is less than or equal to zero or if the
 // given store implements storage.Tx
-func Wrap(store storage.BatchedStore, capacity int) (*Cache, error) {
-	if _, ok := store.(storage.Tx); ok {
-		return nil, errors.New("cache should not be used with transactions")
-	}
-
+func Wrap(store storage.BatchStore, capacity int) (*Cache, error) {
 	lru, err := lru.New[string, []byte](capacity)
 	if err != nil {
 		return nil, err

@@ -45,7 +39,7 @@ func Wrap(store storage.BatchedStore, capacity int) (*Cache, error) {
 }

 // MustWrap is like Wrap but panics on error.
-func MustWrap(store storage.BatchedStore, capacity int) *Cache {
+func MustWrap(store storage.BatchStore, capacity int) *Cache {
 	c, err := Wrap(store, capacity)
 	if err != nil {
 		panic(err)

@@ -72,7 +66,7 @@ func (c *Cache) Get(i storage.Item) error {
 		return i.Unmarshal(val)
 	}

-	if err := c.BatchedStore.Get(i); err != nil {
+	if err := c.BatchStore.Get(i); err != nil {
 		return err
 	}

@@ -93,20 +87,20 @@ func (c *Cache) Has(k storage.Key) (bool, error) {
 	}

 	c.metrics.CacheMiss.Inc()
-	return c.BatchedStore.Has(k)
+	return c.BatchStore.Has(k)
 }

 // Put implements storage.Store interface.
 // On a call it also inserts the item into the cache so that the next
 // call to Put and Has will be able to retrieve the item from cache.
 func (c *Cache) Put(i storage.Item) error {
 	c.add(i)
-	return c.BatchedStore.Put(i)
+	return c.BatchStore.Put(i)
 }

 // Delete implements storage.Store interface.
 // On a call it also removes the item from the cache.
 func (c *Cache) Delete(i storage.Item) error {
 	_ = c.lru.Remove(key(i))
-	return c.BatchedStore.Delete(i)
+	return c.BatchStore.Delete(i)
 }
53 changes: 0 additions & 53 deletions pkg/storage/chunkstore.go
@@ -6,8 +6,6 @@ package storage

 import (
 	"context"
-	"fmt"
-	"io"

 	"github.com/ethersphere/bee/pkg/swarm"
 )

@@ -68,7 +66,6 @@ type ChunkGetterDeleter interface {
 }

 type ChunkStore interface {
-	io.Closer
 	Getter
 	Putter
 	Deleter

@@ -82,53 +79,3 @@ type ReadOnlyChunkStore interface {
 	Getter
 	Hasser
 }
-
-type SizeReporter interface {
-	Size() (uint64, error)
-	Capacity() uint64
-}
-
-// Descriptor holds information required for Pull syncing. This struct
-// is provided by subscribing to pull index.
-type Descriptor struct {
-	Address swarm.Address
-	BinID   uint64
-}
-
-func (d *Descriptor) String() string {
-	if d == nil {
-		return ""
-	}
-	return fmt.Sprintf("%s bin id %v", d.Address, d.BinID)
-}
-
-type PullSubscriber interface {
-	SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, closed <-chan struct{}, stop func())
-}
-
-type PushSubscriber interface {
-	SubscribePush(ctx context.Context) (c <-chan swarm.Chunk, stop func())
-}
-
-type ChunkState = int
-
-const (
-	// ChunkSent is used by the pusher component to notify about successful push of chunk from
-	// the node. A chunk could be retried on failure so, this sent count is maintained to
-	// understand how many attempts were made by the node while pushing. The attempts are
-	// registered only when an actual request was sent from this node.
-	ChunkSent ChunkState = iota
-	// ChunkStored is used by the pusher component to notify that the uploader node is
-	// the closest node and has stored the chunk.
-	ChunkStored
-	// ChunkSynced is used by the pusher component to notify that the chunk is synced to the
-	// network. This is reported when a valid receipt was received after the chunk was
-	// pushed.
-	ChunkSynced
-	ChunkCouldNotSync
-)
-
-// PushReporter is used to report chunk state.
-type PushReporter interface {
-	Report(context.Context, swarm.Chunk, ChunkState) error
-}
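With io.Closer dropped from ChunkStore, an implementation no longer needs a Close method; lifecycle management moves to whoever owns the underlying leveldb handle. A minimal in-memory sketch under simplified assumptions — the Get/Put/Delete shapes keyed by swarm.Address follow the Getter/Putter/Deleter names, which this diff does not spell out, and the real interface embeds further pieces hidden between the hunks:

package example

import (
	"context"
	"errors"
	"sync"

	"github.com/ethersphere/bee/pkg/swarm"
)

// memChunkStore is an illustrative in-memory chunk store.
type memChunkStore struct {
	mu     sync.RWMutex
	chunks map[string]swarm.Chunk
}

func newMemChunkStore() *memChunkStore {
	return &memChunkStore{chunks: make(map[string]swarm.Chunk)}
}

// Get returns the chunk stored under addr, or an error if absent.
func (m *memChunkStore) Get(_ context.Context, addr swarm.Address) (swarm.Chunk, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	ch, ok := m.chunks[addr.ByteString()]
	if !ok {
		return nil, errors.New("chunk not found")
	}
	return ch, nil
}

// Put stores the chunk under its own address.
func (m *memChunkStore) Put(_ context.Context, ch swarm.Chunk) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.chunks[ch.Address().ByteString()] = ch
	return nil
}

// Delete removes the chunk stored under addr; deleting a missing chunk is a no-op.
func (m *memChunkStore) Delete(_ context.Context, addr swarm.Address) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.chunks, addr.ByteString())
	return nil
}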