diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b3def4e61..de12296b03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ - [#1092](https://github.com/crypto-org-chain/cronos/pull/1092) memiavl disable sdk address cache if zero-copy enabled, and disable zero-copy by default. - [#1099](https://github.com/crypto-org-chain/cronos/pull/1099) clean up memiavl tmp directories left behind. - [#940](https://github.com/crypto-org-chain/cronos/pull/940) Update rocksdb dependency to 8.1.1. +- [#1149](https://github.com/crypto-org-chain/cronos/pull/1149) memiavl support `WorkingHash` api required by `FinalizeBlock`. *April 13, 2023* diff --git a/go.mod b/go.mod index 434b506462..c3675ec0c0 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/cosmos/cosmos-sdk v0.46.15-0.20230807104542-537257060180 github.com/cosmos/gogoproto v1.4.8 github.com/cosmos/ibc-go/v6 v6.2.0 - github.com/crypto-org-chain/cronos/store v0.0.2 + github.com/crypto-org-chain/cronos/store v0.0.4 github.com/crypto-org-chain/cronos/versiondb v0.0.0-00010101000000-000000000000 github.com/ethereum/go-ethereum v1.10.26 github.com/evmos/ethermint v0.0.0-00010101000000-000000000000 @@ -79,7 +79,7 @@ require ( github.com/cosmos/ibc-go/v5 v5.2.1 // indirect github.com/cosmos/ledger-cosmos-go v0.12.2 // indirect github.com/creachadair/taskgroup v0.3.2 // indirect - github.com/crypto-org-chain/cronos/memiavl v0.0.3 // indirect + github.com/crypto-org-chain/cronos/memiavl v0.0.4 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set v1.8.0 // indirect diff --git a/memiavl/db.go b/memiavl/db.go index c2669cafbe..38f3093a3b 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -5,12 +5,14 @@ import ( "fmt" "os" "path/filepath" + "sort" "strconv" "strings" "sync" "time" storetypes "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/iavl" "github.com/tendermint/tendermint/libs/log" "github.com/tidwall/wal" ) @@ -63,8 +65,8 @@ type DB struct { walChan chan *walEntry walQuit chan error - // pending store upgrades, will be written into WAL in next Commit call - pendingUpgrades []*TreeNameUpgrade + // pending changes, will be written into WAL in next Commit call + pendingLog WALEntry // The assumptions to concurrency: // - The methods on DB are protected by a mutex @@ -281,19 +283,23 @@ func (db *DB) SetInitialVersion(initialVersion int64) error { db.mtx.Lock() defer db.mtx.Unlock() - if err := db.MultiTree.SetInitialVersion(initialVersion); err != nil { - return err + if db.readOnly { + return errReadOnly } - if err := initEmptyDB(db.dir, db.initialVersion); err != nil { + if db.lastCommitInfo.Version > 0 { + return errors.New("initial version can only be set before any commit") + } + + if err := db.MultiTree.SetInitialVersion(initialVersion); err != nil { return err } - return db.reload() + return initEmptyDB(db.dir, db.initialVersion) } -// ApplyUpgrades wraps MultiTree.ApplyUpgrades, it also append the upgrades in a temporary field, -// and include in the WAL entry in next Commit call. +// ApplyUpgrades wraps MultiTree.ApplyUpgrades, it also append the upgrades in a pending log, +// which will be persisted to the WAL in next Commit call. func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error { db.mtx.Lock() defer db.mtx.Unlock() @@ -306,10 +312,63 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error { return err } - db.pendingUpgrades = append(db.pendingUpgrades, upgrades...) + db.pendingLog.Upgrades = append(db.pendingLog.Upgrades, upgrades...) return nil } +// ApplyChangeSets wraps MultiTree.ApplyChangeSets, it also append the changesets in the pending log, +// which will be persisted to the WAL in next Commit call. +func (db *DB) ApplyChangeSets(changeSets []*NamedChangeSet) error { + if len(changeSets) == 0 { + return nil + } + + db.mtx.Lock() + defer db.mtx.Unlock() + + if db.readOnly { + return errReadOnly + } + + if len(db.pendingLog.Changesets) > 0 { + return errors.New("don't support multiple ApplyChangeSets calls in the same version") + } + db.pendingLog.Changesets = changeSets + + return db.MultiTree.ApplyChangeSets(changeSets) +} + +// ApplyChangeSet wraps MultiTree.ApplyChangeSet, it also append the changesets in the pending log, +// which will be persisted to the WAL in next Commit call. +func (db *DB) ApplyChangeSet(name string, changeSet iavl.ChangeSet) error { + if len(changeSet.Pairs) == 0 { + return nil + } + + db.mtx.Lock() + defer db.mtx.Unlock() + + if db.readOnly { + return errReadOnly + } + + for _, cs := range db.pendingLog.Changesets { + if cs.Name == name { + return errors.New("don't support multiple ApplyChangeSet calls with the same name in the same version") + } + } + + db.pendingLog.Changesets = append(db.pendingLog.Changesets, &NamedChangeSet{ + Name: name, + Changeset: changeSet, + }) + sort.SliceStable(db.pendingLog.Changesets, func(i, j int) bool { + return db.pendingLog.Changesets[i].Name < db.pendingLog.Changesets[j].Name + }) + + return db.MultiTree.ApplyChangeSet(name, changeSet) +} + // checkAsyncTasks checks the status of background tasks non-blocking-ly and process the result func (db *DB) checkAsyncTasks() error { return errors.Join( @@ -441,10 +500,8 @@ func (db *DB) pruneSnapshots() { }() } -// Commit wraps `MultiTree.ApplyChangeSet` to add some db level operations: -// - manage background snapshot rewriting -// - write WAL -func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) { +// Commit wraps SaveVersion to bump the version and writes the pending changes into log files to persist on disk +func (db *DB) Commit() ([]byte, int64, error) { db.mtx.Lock() defer db.mtx.Unlock() @@ -452,21 +509,14 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) { return nil, 0, errReadOnly } - if err := db.checkAsyncTasks(); err != nil { - return nil, 0, err - } - - hash, v, err := db.MultiTree.ApplyChangeSet(changeSets, true) + hash, v, err := db.MultiTree.SaveVersion(true) if err != nil { return nil, 0, err } + // write logs if enabled if db.wal != nil { - // write write-ahead-log - entry := walEntry{index: walIndex(v, db.initialVersion), data: &WALEntry{ - Changesets: changeSets, - Upgrades: db.pendingUpgrades, - }} + entry := walEntry{index: walIndex(v, db.initialVersion), data: db.pendingLog} if db.walChanSize >= 0 { if db.walChan == nil { db.initAsyncCommit() @@ -485,8 +535,11 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) { } } - db.pendingUpgrades = db.pendingUpgrades[:0] + db.pendingLog = WALEntry{} + if err := db.checkAsyncTasks(); err != nil { + return nil, 0, err + } db.rewriteIfApplicable(v) return hash, v, nil @@ -607,14 +660,8 @@ func (db *DB) reloadMultiTree(mtree *MultiTree) error { } db.MultiTree = *mtree - - if len(db.pendingUpgrades) > 0 { - if err := db.MultiTree.ApplyUpgrades(db.pendingUpgrades); err != nil { - return err - } - } - - return nil + // catch-up the pending changes + return db.MultiTree.applyWALEntry(db.pendingLog) } // rewriteIfApplicable execute the snapshot rewrite strategy according to current height @@ -734,8 +781,7 @@ func (db *DB) LastCommitInfo() *storetypes.CommitInfo { return db.MultiTree.LastCommitInfo() } -// ApplyChangeSet wraps MultiTree.ApplyChangeSet to add a lock. -func (db *DB) ApplyChangeSet(changeSets []*NamedChangeSet, updateCommitInfo bool) ([]byte, int64, error) { +func (db *DB) SaveVersion(updateCommitInfo bool) ([]byte, int64, error) { db.mtx.Lock() defer db.mtx.Unlock() @@ -743,7 +789,21 @@ func (db *DB) ApplyChangeSet(changeSets []*NamedChangeSet, updateCommitInfo bool return nil, 0, errReadOnly } - return db.MultiTree.ApplyChangeSet(changeSets, updateCommitInfo) + return db.MultiTree.SaveVersion(updateCommitInfo) +} + +func (db *DB) WorkingCommitInfo() *storetypes.CommitInfo { + db.mtx.Lock() + defer db.mtx.Unlock() + + return db.MultiTree.WorkingCommitInfo() +} + +func (db *DB) WorkingHash() []byte { + db.mtx.Lock() + defer db.mtx.Unlock() + + return db.MultiTree.WorkingHash() } // UpdateCommitInfo wraps MultiTree.UpdateCommitInfo to add a lock. @@ -751,6 +811,10 @@ func (db *DB) UpdateCommitInfo() []byte { db.mtx.Lock() defer db.mtx.Unlock() + if db.readOnly { + panic("can't update commit info in read-only mode") + } + return db.MultiTree.UpdateCommitInfo() } @@ -936,7 +1000,7 @@ func createDBIfNotExist(dir string, initialVersion uint32) error { type walEntry struct { index uint64 - data *WALEntry + data WALEntry } func isSnapshotName(name string) bool { diff --git a/memiavl/db_test.go b/memiavl/db_test.go index 469b1a9bd8..363b81a5b8 100644 --- a/memiavl/db_test.go +++ b/memiavl/db_test.go @@ -29,7 +29,8 @@ func TestRewriteSnapshot(t *testing.T) { }, } t.Run(strconv.Itoa(i), func(t *testing.T) { - _, v, err := db.Commit(cs) + require.NoError(t, db.ApplyChangeSets(cs)) + _, v, err := db.Commit() require.NoError(t, err) require.Equal(t, i+1, int(v)) require.Equal(t, RefHashes[i], db.lastCommitInfo.StoreInfos[0].CommitId.Hash) @@ -94,7 +95,8 @@ func TestRewriteSnapshotBackground(t *testing.T) { Changeset: changes, }, } - _, v, err := db.Commit(cs) + require.NoError(t, db.ApplyChangeSets(cs)) + _, v, err := db.Commit() require.NoError(t, err) require.Equal(t, i+1, int(v)) require.Equal(t, RefHashes[i], db.lastCommitInfo.StoreInfos[0].CommitId.Hash) @@ -129,7 +131,8 @@ func TestWAL(t *testing.T) { Changeset: changes, }, } - _, _, err := db.Commit(cs) + require.NoError(t, db.ApplyChangeSets(cs)) + _, _, err := db.Commit() require.NoError(t, err) } @@ -145,7 +148,7 @@ func TestWAL(t *testing.T) { Delete: true, }, })) - _, _, err = db.Commit(nil) + _, _, err = db.Commit() require.NoError(t, err) require.NoError(t, db.Close()) @@ -183,7 +186,8 @@ func TestInitialVersion(t *testing.T) { db, err := Load(dir, Options{CreateIfMissing: true, InitialStores: []string{name}}) require.NoError(t, err) db.SetInitialVersion(initialVersion) - hash, v, err := db.Commit(mockNameChangeSet(name, key, value)) + require.NoError(t, db.ApplyChangeSets(mockNameChangeSet(name, key, value))) + hash, v, err := db.Commit() require.NoError(t, err) if initialVersion <= 1 { require.Equal(t, int64(1), v) @@ -191,7 +195,8 @@ func TestInitialVersion(t *testing.T) { require.Equal(t, initialVersion, v) } require.Equal(t, "2b650e7f3495c352dbf575759fee86850e4fc63291a5889847890ebf12e3f585", hex.EncodeToString(hash)) - hash, v, err = db.Commit(mockNameChangeSet(name, key, "world1")) + require.NoError(t, db.ApplyChangeSets(mockNameChangeSet(name, key, "world1"))) + hash, v, err = db.Commit() require.NoError(t, err) if initialVersion <= 1 { require.Equal(t, int64(2), v) @@ -209,7 +214,8 @@ func TestInitialVersion(t *testing.T) { require.Equal(t, hex.EncodeToString(hash), hex.EncodeToString(db.Hash())) db.ApplyUpgrades([]*TreeNameUpgrade{{Name: name1}}) - _, v, err = db.Commit((mockNameChangeSet(name1, key, value))) + require.NoError(t, db.ApplyChangeSets(mockNameChangeSet(name1, key, value))) + _, v, err = db.Commit() require.NoError(t, err) if initialVersion <= 1 { require.Equal(t, int64(3), v) @@ -228,7 +234,8 @@ func TestInitialVersion(t *testing.T) { require.NoError(t, db.Reload()) db.ApplyUpgrades([]*TreeNameUpgrade{{Name: name2}}) - _, v, err = db.Commit((mockNameChangeSet(name2, key, value))) + require.NoError(t, db.ApplyChangeSets(mockNameChangeSet(name2, key, value))) + _, v, err = db.Commit() require.NoError(t, err) if initialVersion <= 1 { require.Equal(t, int64(4), v) @@ -259,11 +266,18 @@ func TestLoadVersion(t *testing.T) { }, } t.Run(strconv.Itoa(i), func(t *testing.T) { - _, _, err := db.Commit(cs) + require.NoError(t, db.ApplyChangeSets(cs)) + + // check the working hash + workingHash := db.WorkingHash() + require.Equal(t, RefHashes[db.Version()], db.WorkingCommitInfo().StoreInfos[0].CommitId.Hash) + + h, _, err := db.Commit() require.NoError(t, err) + require.Equal(t, workingHash, h) }) } - require.NoError(t, db.WaitAsyncCommit()) + require.NoError(t, db.Close()) for v, expItems := range ExpectItems { if v == 0 { @@ -282,10 +296,10 @@ func TestLoadVersion(t *testing.T) { func TestZeroCopy(t *testing.T) { db, err := Load(t.TempDir(), Options{InitialStores: []string{"test", "test2"}, CreateIfMissing: true, ZeroCopy: true}) require.NoError(t, err) - _, _, err = db.Commit([]*NamedChangeSet{ + require.NoError(t, db.ApplyChangeSets([]*NamedChangeSet{ {Name: "test", Changeset: ChangeSets[0]}, - {Name: "test2"}, - }) + })) + _, _, err = db.Commit() require.NoError(t, err) require.NoError(t, errors.Join( db.RewriteSnapshot(), @@ -293,10 +307,10 @@ func TestZeroCopy(t *testing.T) { )) // the test tree's root hash will reference the zero-copy value - _, _, err = db.Commit([]*NamedChangeSet{ - {Name: "test"}, + require.NoError(t, db.ApplyChangeSets([]*NamedChangeSet{ {Name: "test2", Changeset: ChangeSets[0]}, - }) + })) + _, _, err = db.Commit() require.NoError(t, err) commitInfo := *db.LastCommitInfo() @@ -347,7 +361,7 @@ func TestEmptyValue(t *testing.T) { db, err := Load(dir, Options{InitialStores: []string{"test"}, CreateIfMissing: true, ZeroCopy: true}) require.NoError(t, err) - _, _, err = db.Commit([]*NamedChangeSet{ + require.NoError(t, db.ApplyChangeSets([]*NamedChangeSet{ {Name: "test", Changeset: iavl.ChangeSet{ Pairs: []*iavl.KVPair{ {Key: []byte("hello1"), Value: []byte("")}, @@ -355,13 +369,16 @@ func TestEmptyValue(t *testing.T) { {Key: []byte("hello3"), Value: []byte("")}, }, }}, - }) + })) + _, _, err = db.Commit() require.NoError(t, err) - hash, version, err := db.Commit([]*NamedChangeSet{ + + require.NoError(t, db.ApplyChangeSets([]*NamedChangeSet{ {Name: "test", Changeset: iavl.ChangeSet{ Pairs: []*iavl.KVPair{{Key: []byte("hello1"), Delete: true}}, }}, - }) + })) + hash, version, err := db.Commit() require.NoError(t, err) require.NoError(t, db.Close()) @@ -425,10 +442,68 @@ func TestFastCommit(t *testing.T) { // the bug reproduce when the wal writing is slower than commit, that happens when wal segment is full and create a new one, the wal writing will slow down a little bit, // segment size is 20m, each change set is 1m, so we need a bit more than 20 commits to reproduce. for i := 0; i < 30; i++ { - _, _, err := db.Commit([]*NamedChangeSet{{Name: "test", Changeset: cs}}) + require.NoError(t, db.ApplyChangeSets([]*NamedChangeSet{{Name: "test", Changeset: cs}})) + _, _, err := db.Commit() require.NoError(t, err) } <-db.snapshotRewriteChan require.NoError(t, db.Close()) } + +func TestRepeatedApplyChangeSet(t *testing.T) { + db, err := Load(t.TempDir(), Options{CreateIfMissing: true, InitialStores: []string{"test1", "test2"}, SnapshotInterval: 3, AsyncCommitBuffer: 10}) + require.NoError(t, err) + + err = db.ApplyChangeSets([]*NamedChangeSet{ + {Name: "test1", Changeset: iavl.ChangeSet{ + Pairs: []*iavl.KVPair{ + {Key: []byte("hello1"), Value: []byte("world1")}, + }, + }}, + {Name: "test2", Changeset: iavl.ChangeSet{ + Pairs: []*iavl.KVPair{ + {Key: []byte("hello2"), Value: []byte("world2")}, + }, + }}, + }) + require.NoError(t, err) + + err = db.ApplyChangeSets([]*NamedChangeSet{{Name: "test1"}}) + require.Error(t, err) + err = db.ApplyChangeSet("test1", iavl.ChangeSet{ + Pairs: []*iavl.KVPair{ + {Key: []byte("hello2"), Value: []byte("world2")}, + }, + }) + require.Error(t, err) + + _, _, err = db.Commit() + require.NoError(t, err) + + err = db.ApplyChangeSet("test1", iavl.ChangeSet{ + Pairs: []*iavl.KVPair{ + {Key: []byte("hello2"), Value: []byte("world2")}, + }, + }) + require.NoError(t, err) + err = db.ApplyChangeSet("test2", iavl.ChangeSet{ + Pairs: []*iavl.KVPair{ + {Key: []byte("hello2"), Value: []byte("world2")}, + }, + }) + require.NoError(t, err) + + err = db.ApplyChangeSet("test1", iavl.ChangeSet{ + Pairs: []*iavl.KVPair{ + {Key: []byte("hello2"), Value: []byte("world2")}, + }, + }) + require.Error(t, err) + err = db.ApplyChangeSet("test2", iavl.ChangeSet{ + Pairs: []*iavl.KVPair{ + {Key: []byte("hello2"), Value: []byte("world2")}, + }, + }) + require.Error(t, err) +} diff --git a/memiavl/iterator_test.go b/memiavl/iterator_test.go index 5a865c4787..4b20475b50 100644 --- a/memiavl/iterator_test.go +++ b/memiavl/iterator_test.go @@ -12,7 +12,8 @@ func TestIterator(t *testing.T) { require.Equal(t, ExpectItems[0], collectIter(tree.Iterator(nil, nil, true))) for _, changes := range ChangeSets { - _, v, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + _, v, err := tree.SaveVersion(true) require.NoError(t, err) require.Equal(t, ExpectItems[v], collectIter(tree.Iterator(nil, nil, true))) require.Equal(t, reverse(ExpectItems[v]), collectIter(tree.Iterator(nil, nil, false))) @@ -22,7 +23,8 @@ func TestIterator(t *testing.T) { func TestIteratorRange(t *testing.T) { tree := New(0) for _, changes := range ChangeSets[:6] { - _, _, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) } diff --git a/memiavl/multitree.go b/memiavl/multitree.go index e350709753..60ef55a853 100644 --- a/memiavl/multitree.go +++ b/memiavl/multitree.go @@ -12,6 +12,7 @@ import ( "cosmossdk.io/errors" storetypes "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/iavl" "github.com/tidwall/wal" "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" @@ -191,6 +192,13 @@ func (t *MultiTree) LastCommitInfo() *storetypes.CommitInfo { return &t.lastCommitInfo } +func (t *MultiTree) applyWALEntry(entry WALEntry) error { + if err := t.ApplyUpgrades(entry.Upgrades); err != nil { + return err + } + return t.ApplyChangeSets(entry.Changesets) +} + // ApplyUpgrades store name upgrades func (t *MultiTree) ApplyUpgrades(upgrades []*TreeNameUpgrade) error { if len(upgrades) == 0 { @@ -241,38 +249,57 @@ func (t *MultiTree) ApplyUpgrades(upgrades []*TreeNameUpgrade) error { return nil } -// ApplyChangeSet applies change sets for all trees. -// if `updateCommitInfo` is `false`, the `lastCommitInfo.StoreInfos` is dirty. -func (t *MultiTree) ApplyChangeSet(changeSets []*NamedChangeSet, updateCommitInfo bool) ([]byte, int64, error) { - version := nextVersion(t.lastCommitInfo.Version, t.initialVersion) +// ApplyChangeSet applies change set for a single tree. +func (t *MultiTree) ApplyChangeSet(name string, changeSet iavl.ChangeSet) error { + i, found := t.treesByName[name] + if !found { + return fmt.Errorf("unknown tree name %s", name) + } + t.trees[i].Tree.ApplyChangeSet(changeSet) + return nil +} +// ApplyChangeSets applies change sets for multiple trees. +func (t *MultiTree) ApplyChangeSets(changeSets []*NamedChangeSet) error { for _, cs := range changeSets { - tree := t.trees[t.treesByName[cs.Name]].Tree + if err := t.ApplyChangeSet(cs.Name, cs.Changeset); err != nil { + return err + } + } + return nil +} - _, v, err := tree.ApplyChangeSet(cs.Changeset, updateCommitInfo) - if err != nil { +// WorkingCommitInfo returns the commit info for the working tree +func (t *MultiTree) WorkingCommitInfo() *storetypes.CommitInfo { + version := nextVersion(t.lastCommitInfo.Version, t.initialVersion) + return t.buildCommitInfo(version) +} + +func (t *MultiTree) WorkingHash() []byte { + return t.WorkingCommitInfo().Hash() +} + +// SaveVersion bumps the versions of all the stores and optionally returns the new app hash +func (t *MultiTree) SaveVersion(updateCommitInfo bool) ([]byte, int64, error) { + t.lastCommitInfo.Version = nextVersion(t.lastCommitInfo.Version, t.initialVersion) + for _, entry := range t.trees { + if _, _, err := entry.Tree.SaveVersion(updateCommitInfo); err != nil { return nil, 0, err } - if v != version { - return nil, 0, fmt.Errorf("multi tree version don't match(%d != %d)", v, version) - } } - t.lastCommitInfo.Version = version - var hash []byte if updateCommitInfo { hash = t.UpdateCommitInfo() } else { + // clear the dirty informaton t.lastCommitInfo.StoreInfos = []storetypes.StoreInfo{} } - return hash, version, nil + return hash, t.lastCommitInfo.Version, nil } -// UpdateCommitInfo update lastCommitInfo based on current status of trees. -// it's needed if `updateCommitInfo` is set to `false` in `ApplyChangeSet`. -func (t *MultiTree) UpdateCommitInfo() []byte { +func (t *MultiTree) buildCommitInfo(version int64) *storetypes.CommitInfo { var infos []storetypes.StoreInfo for _, entry := range t.trees { infos = append(infos, storetypes.StoreInfo{ @@ -284,7 +311,16 @@ func (t *MultiTree) UpdateCommitInfo() []byte { }) } - t.lastCommitInfo.StoreInfos = infos + return &storetypes.CommitInfo{ + Version: version, + StoreInfos: infos, + } +} + +// UpdateCommitInfo update lastCommitInfo based on current status of trees. +// it's needed if `updateCommitInfo` is set to `false` in `ApplyChangeSet`. +func (t *MultiTree) UpdateCommitInfo() []byte { + t.lastCommitInfo = *t.buildCommitInfo(t.lastCommitInfo.Version) return t.lastCommitInfo.Hash() } @@ -323,10 +359,10 @@ func (t *MultiTree) CatchupWAL(wal *wal.Log, endVersion int64) error { if err := entry.Unmarshal(bz); err != nil { return errors.Wrap(err, "unmarshal wal log failed") } - if err := t.ApplyUpgrades(entry.Upgrades); err != nil { - return errors.Wrap(err, "replay store upgrades failed") + if err := t.applyWALEntry(entry); err != nil { + return errors.Wrap(err, "replay wal entry failed") } - if _, _, err := t.ApplyChangeSet(entry.Changesets, false); err != nil { + if _, _, err := t.SaveVersion(false); err != nil { return errors.Wrap(err, "replay change set failed") } } diff --git a/memiavl/proof_test.go b/memiavl/proof_test.go index 8d6005d1bf..18672c4eb8 100644 --- a/memiavl/proof_test.go +++ b/memiavl/proof_test.go @@ -28,7 +28,8 @@ func TestProofs(t *testing.T) { for i, tc := range testCases { t.Run(strconv.Itoa(i), func(t *testing.T) { changes := ChangeSets[i] - _, _, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) proof, err := tree.GetMembershipProof(tc.existKey) diff --git a/memiavl/snapshot_test.go b/memiavl/snapshot_test.go index 16c2089b1b..a1effa58c6 100644 --- a/memiavl/snapshot_test.go +++ b/memiavl/snapshot_test.go @@ -14,7 +14,8 @@ func TestSnapshotEncodingRoundTrip(t *testing.T) { // setup test tree tree := New(0) for _, changes := range ChangeSets[:len(ChangeSets)-1] { - _, _, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) } @@ -41,7 +42,8 @@ func TestSnapshotEncodingRoundTrip(t *testing.T) { snapshot, err = OpenSnapshot(snapshotDir) require.NoError(t, err) tree3 := NewFromSnapshot(snapshot, true, 0) - hash, v, err := tree3.ApplyChangeSet(ChangeSets[len(ChangeSets)-1], true) + tree3.ApplyChangeSet(ChangeSets[len(ChangeSets)-1]) + hash, v, err := tree3.SaveVersion(true) require.NoError(t, err) require.Equal(t, RefHashes[len(ChangeSets)-1], hash) require.Equal(t, len(ChangeSets), int(v)) @@ -62,7 +64,8 @@ func TestSnapshotExport(t *testing.T) { // setup test tree tree := New(0) for _, changes := range ChangeSets[:3] { - _, _, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) } @@ -90,7 +93,8 @@ func TestSnapshotImportExport(t *testing.T) { // setup test tree tree := New(0) for _, changes := range ChangeSets { - _, _, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) } @@ -146,7 +150,8 @@ func TestDBSnapshotRestore(t *testing.T) { Changeset: changes, }, } - _, _, err := db.Commit(cs) + require.NoError(t, db.ApplyChangeSets(cs)) + _, _, err := db.Commit() require.NoError(t, err) testSnapshotRoundTrip(t, db) @@ -175,7 +180,7 @@ func testSnapshotRoundTrip(t *testing.T, db *DB) { require.Equal(t, db.Hash(), db2.Hash()) // the imported db function normally - _, _, err = db2.Commit(nil) + _, _, err = db2.Commit() require.NoError(t, err) } diff --git a/memiavl/tree.go b/memiavl/tree.go index 79067f27ed..4e4f419eea 100644 --- a/memiavl/tree.go +++ b/memiavl/tree.go @@ -110,7 +110,7 @@ func (t *Tree) Copy(cacheSize int) *Tree { } // ApplyChangeSet apply the change set of a whole version, and update hashes. -func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet, updateHash bool) ([]byte, int64, error) { +func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet) { for _, pair := range changeSet.Pairs { if pair.Delete { t.remove(pair.Key) @@ -118,8 +118,6 @@ func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet, updateHash bool) ([]byte t.set(pair.Key, pair.Value) } } - - return t.saveVersion(updateHash) } func (t *Tree) set(key, value []byte) { @@ -136,24 +134,18 @@ func (t *Tree) remove(key []byte) { t.cache.Remove(key) } -// saveVersion increases the version number and optionally updates the hashes -func (t *Tree) saveVersion(updateHash bool) ([]byte, int64, error) { - var hash []byte - if updateHash { - hash = t.RootHash() - } - +// SaveVersion increases the version number and optionally updates the hashes +func (t *Tree) SaveVersion(updateHash bool) ([]byte, int64, error) { if t.version >= uint32(math.MaxUint32) { return nil, 0, errors.New("version overflows uint32") } - t.version++ - // to be compatible with existing golang iavl implementation. - // see: https://github.com/cosmos/iavl/pull/660 - if t.version == 1 && t.initialVersion > 0 { - t.version = t.initialVersion + var hash []byte + if updateHash { + hash = t.RootHash() } + t.version = nextVersionU32(t.version, t.initialVersion) return hash, int64(t.version), nil } @@ -279,3 +271,12 @@ func (t *Tree) Close() error { t.root = nil return err } + +// nextVersionU32 is compatible with existing golang iavl implementation. +// see: https://github.com/cosmos/iavl/pull/660 +func nextVersionU32(v uint32, initialVersion uint32) uint32 { + if v == 0 && initialVersion > 1 { + return initialVersion + } + return v + 1 +} diff --git a/memiavl/tree_test.go b/memiavl/tree_test.go index fdff822903..79005076b6 100644 --- a/memiavl/tree_test.go +++ b/memiavl/tree_test.go @@ -154,7 +154,8 @@ func TestRootHashes(t *testing.T) { tree := New(0) for i, changes := range ChangeSets { - hash, v, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + hash, v, err := tree.SaveVersion(true) require.NoError(t, err) require.Equal(t, i+1, int(v)) require.Equal(t, RefHashes[i], hash) @@ -167,7 +168,7 @@ func TestNewKey(t *testing.T) { for i := 0; i < 4; i++ { tree.set([]byte(fmt.Sprintf("key-%d", i)), []byte{1}) } - _, _, err := tree.saveVersion(true) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) // the smallest key in the right half of the tree @@ -188,16 +189,18 @@ func TestEmptyTree(t *testing.T) { func TestTreeCopy(t *testing.T) { tree := New(0) - _, _, err := tree.ApplyChangeSet(iavl.ChangeSet{Pairs: []*iavl.KVPair{ + tree.ApplyChangeSet(iavl.ChangeSet{Pairs: []*iavl.KVPair{ {Key: []byte("hello"), Value: []byte("world")}, - }}, true) + }}) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) snapshot := tree.Copy(0) - _, _, err = tree.ApplyChangeSet(iavl.ChangeSet{Pairs: []*iavl.KVPair{ + tree.ApplyChangeSet(iavl.ChangeSet{Pairs: []*iavl.KVPair{ {Key: []byte("hello"), Value: []byte("world1")}, - }}, true) + }}) + _, _, err = tree.SaveVersion(true) require.NoError(t, err) require.Equal(t, []byte("world1"), tree.Get([]byte("hello"))) @@ -206,9 +209,10 @@ func TestTreeCopy(t *testing.T) { // check that normal copy don't work fakeSnapshot := *tree - _, _, err = tree.ApplyChangeSet(iavl.ChangeSet{Pairs: []*iavl.KVPair{ + tree.ApplyChangeSet(iavl.ChangeSet{Pairs: []*iavl.KVPair{ {Key: []byte("hello"), Value: []byte("world2")}, - }}, true) + }}) + _, _, err = tree.SaveVersion(true) require.NoError(t, err) // get modified in-place @@ -234,7 +238,8 @@ func TestGetByIndex(t *testing.T) { } tree := New(0) - _, _, err := tree.ApplyChangeSet(changes, true) + tree.ApplyChangeSet(changes) + _, _, err := tree.SaveVersion(true) require.NoError(t, err) for i, pair := range changes.Pairs { diff --git a/store/go.mod b/store/go.mod index 42c065b41e..53c4660de9 100644 --- a/store/go.mod +++ b/store/go.mod @@ -7,7 +7,7 @@ require ( github.com/confio/ics23/go v0.9.0 github.com/cosmos/cosmos-sdk v0.46.14 github.com/cosmos/iavl v0.19.6 - github.com/crypto-org-chain/cronos/memiavl v0.0.3 + github.com/crypto-org-chain/cronos/memiavl v0.0.4 github.com/gogo/protobuf v1.3.2 github.com/spf13/cast v1.5.0 github.com/stretchr/testify v1.8.3 diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index e3b2cc60d1..9f8c69cff1 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -66,25 +66,56 @@ func NewStore(dir string, logger log.Logger, sdk46Compact bool) *Store { } } -// Implements interface Committer -func (rs *Store) Commit() types.CommitID { +// flush writes all the pending change sets to memiavl tree. +func (rs *Store) flush() error { var changeSets []*memiavl.NamedChangeSet for key := range rs.stores { // it'll unwrap the inter-block cache store := rs.GetCommitKVStore(key) if memiavlStore, ok := store.(*memiavlstore.Store); ok { - changeSets = append(changeSets, &memiavl.NamedChangeSet{ - Name: key.Name(), - Changeset: memiavlStore.PopChangeSet(), - }) - } else { - _ = store.Commit() + cs := memiavlStore.PopChangeSet() + if len(cs.Pairs) > 0 { + changeSets = append(changeSets, &memiavl.NamedChangeSet{ + Name: key.Name(), + Changeset: cs, + }) + } } } sort.SliceStable(changeSets, func(i, j int) bool { return changeSets[i].Name < changeSets[j].Name }) - _, _, err := rs.db.Commit(changeSets) + + return rs.db.ApplyChangeSets(changeSets) +} + +// WorkingHash returns the app hash of the working tree, +// +// Implements interface Committer. +func (rs *Store) WorkingHash() []byte { + if err := rs.flush(); err != nil { + panic(err) + } + commitInfo := rs.db.WorkingCommitInfo() + if rs.sdk46Compact { + commitInfo = amendCommitInfo(commitInfo, rs.storesParams) + } + return commitInfo.Hash() +} + +// Implements interface Committer +func (rs *Store) Commit() types.CommitID { + if err := rs.flush(); err != nil { + panic(err) + } + + for _, store := range rs.stores { + if store.GetStoreType() != types.StoreTypeIAVL { + _ = store.Commit() + } + } + + _, _, err := rs.db.Commit() if err != nil { panic(err) } diff --git a/versiondb/client/verify.go b/versiondb/client/verify.go index 5a4233fd73..51764e10b8 100644 --- a/versiondb/client/verify.go +++ b/versiondb/client/verify.go @@ -218,7 +218,8 @@ func verifyOneStore(tree *memiavl.Tree, store, changeSetDir, saveSnapshot string } // no need to update hashes for intermediate versions. - _, v, err := tree.ApplyChangeSet(*changeSet, false) + tree.ApplyChangeSet(*changeSet) + _, v, err := tree.SaveVersion(false) if err != nil { return false, err }