Skip to content

Commit

Permalink
Merge pull request #677 from onflow/janez/seperate-pebble-db-creation…
Browse files Browse the repository at this point in the history
…-closure

Seperate creating and closing pebbleDB from storage
  • Loading branch information
janezpodhostnik authored Nov 26, 2024
2 parents bca29d5 + 79fd1ed commit 258a037
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 80 deletions.
31 changes: 17 additions & 14 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Bootstrap struct {
metrics *metrics2.Server
events *ingestion.Engine
profiler *api.ProfileServer
db *pebbleDB.DB
}

func New(config *config.Config) (*Bootstrap, error) {
Expand All @@ -72,7 +73,7 @@ func New(config *config.Config) (*Bootstrap, error) {
return nil, err
}

storages, err := setupStorage(config, client, logger)
db, storages, err := setupStorage(config, client, logger)
if err != nil {
return nil, err
}
Expand All @@ -83,6 +84,7 @@ func New(config *config.Config) (*Bootstrap, error) {
Transaction: models.NewPublisher[*gethTypes.Transaction](),
Logs: models.NewPublisher[[]*gethTypes.Log](),
},
db: db,
storages: storages,
logger: logger,
config: config,
Expand Down Expand Up @@ -387,10 +389,10 @@ func (b *Bootstrap) StopProfilerServer() {
}

func (b *Bootstrap) StopDB() {
if b.storages == nil || b.storages.Storage == nil {
if b.db == nil {
return
}
err := b.storages.Storage.Close()
err := b.db.Close()
if err != nil {
b.logger.Err(err).Msg("PebbleDB graceful shutdown failed")
}
Expand Down Expand Up @@ -465,12 +467,13 @@ func setupStorage(
config *config.Config,
client *requester.CrossSporkClient,
logger zerolog.Logger,
) (*Storages, error) {
) (*pebbleDB.DB, *Storages, error) {
// create pebble storage from the provided database root directory
store, err := pebble.New(config.DatabaseDir, logger)
db, err := pebble.OpenDB(config.DatabaseDir)
if err != nil {
return nil, err
return nil, nil, err
}
store := pebble.New(db, logger)

blocks := pebble.NewBlocks(store, config.FlowNetworkID)
storageAddress := evm.StorageAccountAddress(config.FlowNetworkID)
Expand All @@ -480,7 +483,7 @@ func setupStorage(
if config.ForceStartCadenceHeight != 0 {
logger.Warn().Uint64("height", config.ForceStartCadenceHeight).Msg("force setting starting Cadence height!!!")
if err := blocks.SetLatestCadenceHeight(config.ForceStartCadenceHeight, nil); err != nil {
return nil, err
return nil, nil, err
}
}

Expand All @@ -499,12 +502,12 @@ func setupStorage(
evmBlokcHeight := uint64(0)
cadenceBlock, err := client.GetBlockHeaderByHeight(context.Background(), cadenceHeight)
if err != nil {
return nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
return nil, nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
}

snapshot, err := registerStore.GetSnapshotAt(evmBlokcHeight)
if err != nil {
return nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
return nil, nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
}

delta := storage.NewRegisterDelta(snapshot)
Expand All @@ -515,16 +518,16 @@ func setupStorage(
accountStatus.ToBytes(),
)
if err != nil {
return nil, fmt.Errorf("could not set account status: %w", err)
return nil, nil, fmt.Errorf("could not set account status: %w", err)
}

err = registerStore.Store(delta.GetUpdates(), evmBlokcHeight, batch)
if err != nil {
return nil, fmt.Errorf("could not store register updates: %w", err)
return nil, nil, fmt.Errorf("could not store register updates: %w", err)
}

if err := blocks.InitHeights(cadenceHeight, cadenceBlock.ID, batch); err != nil {
return nil, fmt.Errorf(
return nil, nil, fmt.Errorf(
"failed to init the database for block height: %d and ID: %s, with : %w",
cadenceHeight,
cadenceBlock.ID,
Expand All @@ -534,7 +537,7 @@ func setupStorage(

err = batch.Commit(pebbleDB.Sync)
if err != nil {
return nil, fmt.Errorf("could not commit register updates: %w", err)
return nil, nil, fmt.Errorf("could not commit register updates: %w", err)
}

logger.Info().
Expand All @@ -545,7 +548,7 @@ func setupStorage(
// // TODO(JanezP): verify storage account owner is correct
//}

return &Storages{
return db, &Storages{
Storage: store,
Blocks: blocks,
Registers: registerStore,
Expand Down
3 changes: 2 additions & 1 deletion services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,9 @@ func defaultReplayerConfig() replayer.Config {
}

func setupStore(t *testing.T) (*pebble.Storage, *pebble.RegisterStorage) {
store, err := pebble.New(t.TempDir(), zerolog.Nop())
db, err := pebble.OpenDB(t.TempDir())
require.NoError(t, err)
store := pebble.New(db, zerolog.Nop())

storageAddress := evm.StorageAccountAddress(flowGo.Emulator)
registerStore := pebble.NewRegisterStorage(store, storageAddress)
Expand Down
3 changes: 2 additions & 1 deletion services/replayer/blocks_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,9 @@ func TestGetSnapshotAt(t *testing.T) {

func setupBlocksDB(t *testing.T) (*pebble.Storage, storage.BlockIndexer) {
dir := t.TempDir()
db, err := pebble.New(dir, zerolog.Nop())
pebbleDB, err := pebble.OpenDB(dir)
require.NoError(t, err)
db := pebble.New(pebbleDB, zerolog.Nop())
batch := db.NewBatch()

chainID := flowGo.Emulator
Expand Down
62 changes: 62 additions & 0 deletions storage/pebble/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package pebble

import (
"fmt"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
)

// OpenDB opens a new pebble database at the provided directory.
func OpenDB(dir string) (*pebble.DB, error) {
cache := pebble.NewCache(1 << 20)
defer cache.Unref()

// currently pebble is only used for registers
opts := &pebble.Options{
Cache: cache,
Comparer: NewMVCCComparer(),
FormatMajorVersion: pebble.FormatNewest,
L0CompactionThreshold: 2,
L0StopWritesThreshold: 1000,
// When the maximum number of bytes for a level is exceeded, compaction is requested.
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: make([]pebble.LevelOptions, 7),
MaxOpenFiles: 16384,
// Writes are stopped when the sum of the queued memtable sizes exceeds MemTableStopWritesThreshold*MemTableSize.
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
// The default is 1.
MaxConcurrentCompactions: func() int { return 4 },
}

for i := 0; i < len(opts.Levels); i++ {
l := &opts.Levels[i]
// The default is 4KiB (uncompressed), which is too small
// for good performance (esp. on stripped storage).
l.BlockSize = 32 << 10 // 32 KB
l.IndexBlockSize = 256 << 10 // 256 KB

// The bloom filter speedsup our SeekPrefixGE by skipping
// sstables that do not contain the prefix
l.FilterPolicy = bloom.FilterPolicy(MinLookupKeyLen)
l.FilterType = pebble.TableFilter

if i > 0 {
// L0 starts at 2MiB, each level is 2x the previous.
l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2
}
l.EnsureDefaults()
}

// Splitting sstables during flush allows increased compaction flexibility and concurrency when those
// tables are compacted to lower levels.
opts.FlushSplitBytes = opts.Levels[0].TargetFileSize
opts.EnsureDefaults()

db, err := pebble.Open(dir, opts)
if err != nil {
return nil, fmt.Errorf("failed to open db for dir: %s, with: %w", dir, err)
}
return db, nil
}
65 changes: 3 additions & 62 deletions storage/pebble/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package pebble

import (
"errors"
"fmt"
"io"

"github.com/cockroachdb/pebble/bloom"

"github.com/cockroachdb/pebble"
"github.com/rs/zerolog"

Expand All @@ -18,64 +15,12 @@ type Storage struct {
log zerolog.Logger
}

// New creates a new storage instance using the provided dir location as the storage directory.
func New(dir string, log zerolog.Logger) (*Storage, error) {
cache := pebble.NewCache(1 << 20)
defer cache.Unref()

log = log.With().Str("component", "storage").Logger()

// currently pebble is only used for registers
opts := &pebble.Options{
Cache: cache,
Comparer: NewMVCCComparer(),
FormatMajorVersion: pebble.FormatNewest,
L0CompactionThreshold: 2,
L0StopWritesThreshold: 1000,
// When the maximum number of bytes for a level is exceeded, compaction is requested.
LBaseMaxBytes: 64 << 20, // 64 MB
Levels: make([]pebble.LevelOptions, 7),
MaxOpenFiles: 16384,
// Writes are stopped when the sum of the queued memtable sizes exceeds MemTableStopWritesThreshold*MemTableSize.
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
// The default is 1.
MaxConcurrentCompactions: func() int { return 4 },
}

for i := 0; i < len(opts.Levels); i++ {
l := &opts.Levels[i]
// The default is 4KiB (uncompressed), which is too small
// for good performance (esp. on stripped storage).
l.BlockSize = 32 << 10 // 32 KB
l.IndexBlockSize = 256 << 10 // 256 KB

// The bloom filter speedsup our SeekPrefixGE by skipping
// sstables that do not contain the prefix
l.FilterPolicy = bloom.FilterPolicy(MinLookupKeyLen)
l.FilterType = pebble.TableFilter

if i > 0 {
// L0 starts at 2MiB, each level is 2x the previous.
l.TargetFileSize = opts.Levels[i-1].TargetFileSize * 2
}
l.EnsureDefaults()
}

// Splitting sstables during flush allows increased compaction flexibility and concurrency when those
// tables are compacted to lower levels.
opts.FlushSplitBytes = opts.Levels[0].TargetFileSize
opts.EnsureDefaults()

db, err := pebble.Open(dir, opts)
if err != nil {
return nil, fmt.Errorf("failed to open db for dir: %s, with: %w", dir, err)
}

// New creates a new storage instance using the provided db.
func New(db *pebble.DB, log zerolog.Logger) *Storage {
return &Storage{
db: db,
log: log,
}, nil
}
}

// set key-value pair identified by key code (which act as an entity identifier).
Expand Down Expand Up @@ -116,7 +61,3 @@ func (s *Storage) get(keyCode byte, key ...[]byte) ([]byte, error) {
func (s *Storage) NewBatch() *pebble.Batch {
return s.db.NewBatch()
}

func (s *Storage) Close() error {
return s.db.Close()
}
3 changes: 2 additions & 1 deletion storage/pebble/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ func TestBatch(t *testing.T) {
func runDB(name string, t *testing.T, f func(t *testing.T, db *Storage)) {
dir := t.TempDir()

db, err := New(dir, zerolog.New(zerolog.NewTestWriter(t)))
pebbleDB, err := OpenDB(dir)
require.NoError(t, err)
db := New(pebbleDB, zerolog.New(zerolog.NewTestWriter(t)))

t.Run(name, func(t *testing.T) {
f(t, db)
Expand Down
3 changes: 2 additions & 1 deletion storage/register_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,9 @@ func Test_RegisterDeltaWithStorage(t *testing.T) {
func runDB(name string, t *testing.T, f func(t *testing.T, db *pebbleStorage.Storage)) {
dir := t.TempDir()

db, err := pebbleStorage.New(dir, zerolog.New(zerolog.NewTestWriter(t)))
pebbleDB, err := pebbleStorage.OpenDB(dir)
require.NoError(t, err)
db := pebbleStorage.New(pebbleDB, zerolog.New(zerolog.NewTestWriter(t)))

t.Run(name, func(t *testing.T) {
f(t, db)
Expand Down

0 comments on commit 258a037

Please sign in to comment.