Skip to content

Commit

Permalink
mdbx added into storage V2
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverbundalo committed Feb 22, 2024
1 parent 0530769 commit 7ba3467
Show file tree
Hide file tree
Showing 11 changed files with 736 additions and 18 deletions.
20 changes: 10 additions & 10 deletions blockchain/storagev2/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ type levelDB struct {

// DB key = k + mapper
var tableMapper = map[uint8][]byte{
storagev2.BODY: []byte("b"), // DB key = block number + mapper
storagev2.CANONICAL: []byte("c"), // DB key = block number + mapper
storagev2.DIFFICULTY: []byte("d"), // DB key = block number + mapper
storagev2.HEADER: []byte("h"), // DB key = block number + mapper
storagev2.RECEIPTS: []byte("r"), // DB key = block number + mapper
storagev2.FORK: []byte("0000000f"), // DB key = empty + mapper
storagev2.HEAD_HASH: []byte("0000000h"), // DB key = empty + mapper
storagev2.HEAD_NUMBER: []byte("0000000n"), // DB key = empty + mapper
storagev2.BLOCK_LOOKUP: {}, // DB key = block hash + mapper, value = block number
storagev2.TX_LOOKUP: {}, // DB key = tx hash + mapper, value = block number
storagev2.BODY: []byte("b"), // DB key = block number + mapper
storagev2.CANONICAL: []byte("c"), // DB key = block number + mapper
storagev2.DIFFICULTY: []byte("d"), // DB key = block number + mapper
storagev2.HEADER: []byte("h"), // DB key = block number + mapper
storagev2.RECEIPTS: []byte("r"), // DB key = block number + mapper
storagev2.FORK: {}, // DB key = FORK_KEY + mapper
storagev2.HEAD_HASH: {}, // DB key = HEAD_HASH_KEY + mapper
storagev2.HEAD_NUMBER: {}, // DB key = HEAD_NUMBER_KEY + mapper
storagev2.BLOCK_LOOKUP: {}, // DB key = block hash + mapper, value = block number
storagev2.TX_LOOKUP: {}, // DB key = tx hash + mapper, value = block number
}

// NewLevelDBStorage creates the new storage reference with leveldb default options
Expand Down
2 changes: 1 addition & 1 deletion blockchain/storagev2/leveldb/leveldb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func countLdbFilesInPath(path string) int {
func generateBlocks(t *testing.T, count int, ch chan *types.FullBlock, ctx context.Context) {
t.Helper()

ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(100 * time.Millisecond)

for i := 1; i <= count; i++ {
b := generateBlock(t, uint64(i))
Expand Down
46 changes: 46 additions & 0 deletions blockchain/storagev2/mdbx/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//nolint:errcheck
package mdbx

import (
"runtime"

"github.com/0xPolygon/polygon-edge/blockchain/storagev2"
"github.com/erigontech/mdbx-go/mdbx"
)

type batchMdbx struct {
tx *mdbx.Txn
dbi [storagev2.MAX_TABLES]mdbx.DBI
}

func newBatchMdbx(db *MdbxDB) *batchMdbx {
runtime.LockOSThread()

tx, err := db.env.BeginTxn(nil, 0)
if err != nil {
return nil
}

return &batchMdbx{
tx: tx,
dbi: db.dbi,
}
}

func (b *batchMdbx) Put(t uint8, k []byte, v []byte) {
if t&storagev2.GIDLID_INDEX != 0 {
// Random write
b.tx.Put(b.dbi[t], k, v, mdbx.NoDupData)
} else {
// Sequential write
b.tx.Put(b.dbi[t], k, v, mdbx.Append) // Append at the end
}
}

func (b *batchMdbx) Write() error {
defer runtime.UnlockOSThread()

_, err := b.tx.Commit()

return err
}
190 changes: 190 additions & 0 deletions blockchain/storagev2/mdbx/mdbx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package mdbx

import (
"os"

"github.com/0xPolygon/polygon-edge/blockchain/storagev2"
"github.com/erigontech/mdbx-go/mdbx"
"github.com/hashicorp/go-hclog"
)

type MdbxOpts struct {
// must be in the range from 12.5% (almost empty) to 50% (half empty)
// which corresponds to the range from 8192 and to 32768 in units respectively

// log log.Logger
// roTxsLimiter *semaphore.Weighted
// bucketsCfg TableCfgFunc
path string
// syncPeriod time.Duration
// mapSize datasize.ByteSize
// growthStep datasize.ByteSize
// shrinkThreshold int
flags uint
// pageSize uint64
// dirtySpace uint64 // if exeed this space, modified pages will `spill` to disk
// mergeThreshold uint64
// verbosity kv.DBVerbosityLvl
}

// MdbxDB is the mdbx implementation of the kv storage
type MdbxDB struct {
env *mdbx.Env
dbi [storagev2.MAX_TABLES]mdbx.DBI
}

var tableMapper = map[uint8]string{
storagev2.BODY: "Body",
storagev2.CANONICAL: "Canonical",
storagev2.DIFFICULTY: "Difficulty",
storagev2.HEADER: "Header",
storagev2.RECEIPTS: "Receipts",
storagev2.FORK: "Fork",
storagev2.HEAD_HASH: "HeadHash",
storagev2.HEAD_NUMBER: "HeadNumber",
storagev2.BLOCK_LOOKUP: "BlockLookup",
storagev2.TX_LOOKUP: "TxLookup",
}

// NewMdbxStorage creates the new storage reference for mdbx database
func NewMdbxStorage(path string, logger hclog.Logger) (*storagev2.Storage, error) {
var dbs [2]storagev2.Database

// Set default options
opts := &MdbxOpts{
path: path,
}

env, err := mdbx.NewEnv()
if err != nil {
return nil, err
}

if err = env.SetOption(mdbx.OptMaxDB, uint64(storagev2.MAX_TABLES)); err != nil {
return nil, err
}

if err = env.SetGeometry(-1, -1, 2*1024*1024*1024*1024, 2*1024*1024*1024, -1, int(defaultPageSize())); err != nil {
return nil, err
}

err = env.Open(opts.path, opts.flags, 0664)
if err != nil {
return nil, err
}

db := &MdbxDB{
env: env,
}

if err := db.openDBI(0); err != nil {
return nil, err
}

dbs[0] = db
dbs[1] = nil

return storagev2.Open(logger.Named("mdbx"), dbs)
}

func defaultPageSize() uint64 {
osPageSize := os.Getpagesize()
if osPageSize < 4096 { // reduce further may lead to errors (because some data is just big)
osPageSize = 4096
} else if osPageSize > mdbx.MaxPageSize {
osPageSize = mdbx.MaxPageSize
}

osPageSize = osPageSize / 4096 * 4096 // ensure it's rounded

return uint64(osPageSize)
}

func (db *MdbxDB) view(f func(tx *mdbx.Txn) error) (err error) {
// can't use db.env.View method - because it calls commit for read transactions - it conflicts with write transactions.
tx, err := db.env.BeginTxn(nil, mdbx.Readonly)
if err != nil {
return err
}

return f(tx)
}

func (db *MdbxDB) update(f func(tx *mdbx.Txn) error) (err error) {
tx, err := db.env.BeginTxn(nil, 0)
if err != nil {
return err
}

err = f(tx)
if err != nil {
return err
}

_, err = tx.Commit()
if err != nil {
return err
}

return nil
}

func (db *MdbxDB) openDBI(flags uint) error {
if flags&mdbx.Accede != 0 {
return db.view(func(tx *mdbx.Txn) error {
for i, name := range tableMapper {
dbi, err := tx.OpenDBISimple(name, mdbx.Accede)
if err == nil {
db.dbi[i] = dbi
} else {
return err
}
}

return nil
})
}

err := db.update(func(tx *mdbx.Txn) error {
for i, name := range tableMapper {
dbi, err := tx.OpenDBISimple(name, mdbx.Create)
if err == nil {
db.dbi[i] = dbi
} else {
return err
}
}

return nil
})

return err
}

// Get retrieves the key-value pair in mdbx storage
func (db *MdbxDB) Get(t uint8, k []byte) ([]byte, error) {
tx, err := db.env.BeginTxn(nil, mdbx.Readonly)
defer tx.Abort()

if err != nil {
return nil, err
}

v, err := tx.Get(db.dbi[t], k)
if err != nil {
return nil, err
}

return v, nil
}

// Close closes the mdbx storage instance
func (db *MdbxDB) Close() error {
db.env.Close()

return nil
}

func (db *MdbxDB) NewBatch() storagev2.Batch {
return newBatchMdbx(db)
}
Loading

0 comments on commit 7ba3467

Please sign in to comment.