Skip to content

Commit

Permalink
Problem: memiavl api don't work with FinalizeBlock (#1149)
Browse files Browse the repository at this point in the history
* Problem: memiavl api don't work with FinalizeBlock

FinalizeBlock in abci 2.0 need the WorkingHash API.

Solution:
- redesign the internals APIs to support WorkingHash.
  instead of exposing a single `Commit` API, now we exposes separate APIs for `ApplyChangeSets` and `Commit`.

* changelog

* tidy

* cleanup

* sdk46 compact WorkingHash

* unit tests
  • Loading branch information
yihuang authored Sep 1, 2023
1 parent 265459d commit d2a0003
Show file tree
Hide file tree
Showing 13 changed files with 343 additions and 119 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
- [#1092](https://github.com/crypto-org-chain/cronos/pull/1092) memiavl disable sdk address cache if zero-copy enabled, and disable zero-copy by default.
- [#1099](https://github.com/crypto-org-chain/cronos/pull/1099) clean up memiavl tmp directories left behind.
- [#940](https://github.com/crypto-org-chain/cronos/pull/940) Update rocksdb dependency to 8.1.1.
- [#1149](https://github.com/crypto-org-chain/cronos/pull/1149) memiavl support `WorkingHash` api required by `FinalizeBlock`.

*April 13, 2023*

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/cosmos/cosmos-sdk v0.46.15-0.20230807104542-537257060180
github.com/cosmos/gogoproto v1.4.8
github.com/cosmos/ibc-go/v6 v6.2.0
github.com/crypto-org-chain/cronos/store v0.0.2
github.com/crypto-org-chain/cronos/store v0.0.4
github.com/crypto-org-chain/cronos/versiondb v0.0.0-00010101000000-000000000000
github.com/ethereum/go-ethereum v1.10.26
github.com/evmos/ethermint v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -79,7 +79,7 @@ require (
github.com/cosmos/ibc-go/v5 v5.2.1 // indirect
github.com/cosmos/ledger-cosmos-go v0.12.2 // indirect
github.com/creachadair/taskgroup v0.3.2 // indirect
github.com/crypto-org-chain/cronos/memiavl v0.0.3 // indirect
github.com/crypto-org-chain/cronos/memiavl v0.0.4 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
Expand Down
136 changes: 100 additions & 36 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"

storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/iavl"
"github.com/tendermint/tendermint/libs/log"
"github.com/tidwall/wal"
)
Expand Down Expand Up @@ -63,8 +65,8 @@ type DB struct {
walChan chan *walEntry
walQuit chan error

// pending store upgrades, will be written into WAL in next Commit call
pendingUpgrades []*TreeNameUpgrade
// pending changes, will be written into WAL in next Commit call
pendingLog WALEntry

// The assumptions to concurrency:
// - The methods on DB are protected by a mutex
Expand Down Expand Up @@ -281,19 +283,23 @@ func (db *DB) SetInitialVersion(initialVersion int64) error {
db.mtx.Lock()
defer db.mtx.Unlock()

if err := db.MultiTree.SetInitialVersion(initialVersion); err != nil {
return err
if db.readOnly {
return errReadOnly
}

if err := initEmptyDB(db.dir, db.initialVersion); err != nil {
if db.lastCommitInfo.Version > 0 {
return errors.New("initial version can only be set before any commit")
}

if err := db.MultiTree.SetInitialVersion(initialVersion); err != nil {
return err
}

return db.reload()
return initEmptyDB(db.dir, db.initialVersion)
}

// ApplyUpgrades wraps MultiTree.ApplyUpgrades, it also append the upgrades in a temporary field,
// and include in the WAL entry in next Commit call.
// ApplyUpgrades wraps MultiTree.ApplyUpgrades, it also append the upgrades in a pending log,
// which will be persisted to the WAL in next Commit call.
func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
db.mtx.Lock()
defer db.mtx.Unlock()
Expand All @@ -306,10 +312,63 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
return err
}

db.pendingUpgrades = append(db.pendingUpgrades, upgrades...)
db.pendingLog.Upgrades = append(db.pendingLog.Upgrades, upgrades...)
return nil
}

// ApplyChangeSets wraps MultiTree.ApplyChangeSets, it also append the changesets in the pending log,
// which will be persisted to the WAL in next Commit call.
func (db *DB) ApplyChangeSets(changeSets []*NamedChangeSet) error {
if len(changeSets) == 0 {
return nil
}

db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return errReadOnly
}

if len(db.pendingLog.Changesets) > 0 {
return errors.New("don't support multiple ApplyChangeSets calls in the same version")
}
db.pendingLog.Changesets = changeSets

return db.MultiTree.ApplyChangeSets(changeSets)
}

// ApplyChangeSet wraps MultiTree.ApplyChangeSet, it also append the changesets in the pending log,
// which will be persisted to the WAL in next Commit call.
func (db *DB) ApplyChangeSet(name string, changeSet iavl.ChangeSet) error {
if len(changeSet.Pairs) == 0 {
return nil
}

db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return errReadOnly
}

for _, cs := range db.pendingLog.Changesets {
if cs.Name == name {
return errors.New("don't support multiple ApplyChangeSet calls with the same name in the same version")
}
}

db.pendingLog.Changesets = append(db.pendingLog.Changesets, &NamedChangeSet{
Name: name,
Changeset: changeSet,
})
sort.SliceStable(db.pendingLog.Changesets, func(i, j int) bool {
return db.pendingLog.Changesets[i].Name < db.pendingLog.Changesets[j].Name
})

return db.MultiTree.ApplyChangeSet(name, changeSet)
}

// checkAsyncTasks checks the status of background tasks non-blocking-ly and process the result
func (db *DB) checkAsyncTasks() error {
return errors.Join(
Expand Down Expand Up @@ -441,32 +500,23 @@ func (db *DB) pruneSnapshots() {
}()
}

// Commit wraps `MultiTree.ApplyChangeSet` to add some db level operations:
// - manage background snapshot rewriting
// - write WAL
func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
// Commit wraps SaveVersion to bump the version and writes the pending changes into log files to persist on disk
func (db *DB) Commit() ([]byte, int64, error) {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return nil, 0, errReadOnly
}

if err := db.checkAsyncTasks(); err != nil {
return nil, 0, err
}

hash, v, err := db.MultiTree.ApplyChangeSet(changeSets, true)
hash, v, err := db.MultiTree.SaveVersion(true)
if err != nil {
return nil, 0, err
}

// write logs if enabled
if db.wal != nil {
// write write-ahead-log
entry := walEntry{index: walIndex(v, db.initialVersion), data: &WALEntry{
Changesets: changeSets,
Upgrades: db.pendingUpgrades,
}}
entry := walEntry{index: walIndex(v, db.initialVersion), data: db.pendingLog}
if db.walChanSize >= 0 {
if db.walChan == nil {
db.initAsyncCommit()
Expand All @@ -485,8 +535,11 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
}
}

db.pendingUpgrades = db.pendingUpgrades[:0]
db.pendingLog = WALEntry{}

if err := db.checkAsyncTasks(); err != nil {
return nil, 0, err
}
db.rewriteIfApplicable(v)

return hash, v, nil
Expand Down Expand Up @@ -607,14 +660,8 @@ func (db *DB) reloadMultiTree(mtree *MultiTree) error {
}

db.MultiTree = *mtree

if len(db.pendingUpgrades) > 0 {
if err := db.MultiTree.ApplyUpgrades(db.pendingUpgrades); err != nil {
return err
}
}

return nil
// catch-up the pending changes
return db.MultiTree.applyWALEntry(db.pendingLog)
}

// rewriteIfApplicable execute the snapshot rewrite strategy according to current height
Expand Down Expand Up @@ -734,23 +781,40 @@ func (db *DB) LastCommitInfo() *storetypes.CommitInfo {
return db.MultiTree.LastCommitInfo()
}

// ApplyChangeSet wraps MultiTree.ApplyChangeSet to add a lock.
func (db *DB) ApplyChangeSet(changeSets []*NamedChangeSet, updateCommitInfo bool) ([]byte, int64, error) {
func (db *DB) SaveVersion(updateCommitInfo bool) ([]byte, int64, error) {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
return nil, 0, errReadOnly
}

return db.MultiTree.ApplyChangeSet(changeSets, updateCommitInfo)
return db.MultiTree.SaveVersion(updateCommitInfo)
}

func (db *DB) WorkingCommitInfo() *storetypes.CommitInfo {
db.mtx.Lock()
defer db.mtx.Unlock()

return db.MultiTree.WorkingCommitInfo()
}

func (db *DB) WorkingHash() []byte {
db.mtx.Lock()
defer db.mtx.Unlock()

return db.MultiTree.WorkingHash()
}

// UpdateCommitInfo wraps MultiTree.UpdateCommitInfo to add a lock.
func (db *DB) UpdateCommitInfo() []byte {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.readOnly {
panic("can't update commit info in read-only mode")
}

return db.MultiTree.UpdateCommitInfo()
}

Expand Down Expand Up @@ -936,7 +1000,7 @@ func createDBIfNotExist(dir string, initialVersion uint32) error {

type walEntry struct {
index uint64
data *WALEntry
data WALEntry
}

func isSnapshotName(name string) bool {
Expand Down
Loading

0 comments on commit d2a0003

Please sign in to comment.