From 759f4304dbfaf1755853ae53b32f967ff869992c Mon Sep 17 00:00:00 2001 From: VM Date: Fri, 22 Nov 2024 16:16:01 +0800 Subject: [PATCH] feat: journal nodebufferlist snapshot data for journal file recovering --- triedb/pathdb/asyncnodebuffer.go | 4 ++ triedb/pathdb/disklayer.go | 7 ++- triedb/pathdb/journal.go | 84 +++++++++++++++++++++----------- triedb/pathdb/nodebuffer.go | 4 ++ triedb/pathdb/nodebufferlist.go | 63 +++++++++++++++++++++++- 5 files changed, 130 insertions(+), 32 deletions(-) diff --git a/triedb/pathdb/asyncnodebuffer.go b/triedb/pathdb/asyncnodebuffer.go index c9363348c0..b2e164877d 100644 --- a/triedb/pathdb/asyncnodebuffer.go +++ b/triedb/pathdb/asyncnodebuffer.go @@ -217,6 +217,10 @@ func (a *asyncnodebuffer) getLatestStatus() (common.Hash, uint64, error) { return common.Hash{}, 0, errors.New("unsupported method for async node buffer") } +func (a *asyncnodebuffer) getMultiLayerNodes() []nblJournalData { + return nil +} + type nodecache struct { layers uint64 // The number of diff layers aggregated inside size uint64 // The size of aggregated writes diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index ec6cfd15d2..6a685cd2b5 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -85,6 +85,8 @@ type trienodebuffer interface { // getLatestStatus returns latest status for disk layer getLatestStatus() (common.Hash, uint64, error) + + getMultiLayerNodes() []nblJournalData } type NodeBufferType int32 @@ -122,6 +124,7 @@ func NewTrieNodeBuffer( trieNodeBufferType NodeBufferType, limit int, nodes map[common.Hash]map[string]*trienode.Node, + nodesArray []nblJournalData, layers, proposeBlockInterval uint64, keepFunc NotifyKeepFunc, freezer *rawdb.ResettableFreezer, @@ -130,13 +133,13 @@ func NewTrieNodeBuffer( log.Info("init trie node buffer", "type", nodeBufferTypeToString[trieNodeBufferType]) switch trieNodeBufferType { case NodeBufferList: - return newNodeBufferList(db, uint64(limit), nodes, layers, 
proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase) + return newNodeBufferList(db, uint64(limit), nodes, nodesArray, layers, proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase) case AsyncNodeBuffer: return newAsyncNodeBuffer(limit, nodes, layers) case SyncNodeBuffer: return newNodeBuffer(limit, nodes, layers) default: - return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase) + return newNodeBufferList(db, uint64(limit), nodes, nodesArray, layers, proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase) } } diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index eb7df892c8..b38cf9368a 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -61,6 +61,14 @@ type journalNodes struct { Nodes []journalNode } +// nblJournalData is used to journal nodebufferlist data in the journal file. +type nblJournalData struct { + root common.Hash + layers uint64 + size uint64 + nodes map[common.Hash]map[string]*trienode.Node +} + // journalAccounts represents a list accounts belong to the layer. type journalAccounts struct { Addresses []common.Address @@ -273,7 +281,7 @@ func (db *Database) loadLayers() layer { start := time.Now() log.Info("Recover node buffer list from ancient db") - nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, + nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase) if err != nil { log.Error("Failed to new trie node buffer for recovery", "error", err) @@ -285,7 +293,7 @@ func (db *Database) loadLayers() layer { } if nb == nil || err != nil { // Return single layer with persistent state. 
- nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, + nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, nil, 0, db.config.ProposeBlockInterval, db.config.NotifyKeep, nil, false, db.useBase) if err != nil { log.Crit("Failed to new trie node buffer", "error", err) @@ -330,22 +338,32 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp if stored > id { return nil, fmt.Errorf("invalid state id: stored %d resolved %d", stored, id) } - // Resolve nodes cached in node buffer - var encoded []journalNodes - if err := journalBuf.Decode(&encoded); err != nil { - return nil, fmt.Errorf("failed to load disk nodes: %v", err) - } - nodes := make(map[common.Hash]map[string]*trienode.Node) - for _, entry := range encoded { - subset := make(map[string]*trienode.Node) - for _, n := range entry.Nodes { - if len(n.Blob) > 0 { - subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob) - } else { - subset[string(n.Path)] = trienode.NewDeleted() + + var nodes map[common.Hash]map[string]*trienode.Node + var nodesArray []nblJournalData + + if db.config.TrieNodeBufferType == NodeBufferList && journalTypeForReader == JournalFileType { + if err := journalBuf.Decode(&nodesArray); err != nil { + return nil, fmt.Errorf("failed to load nodebufferlist disk nodes: %v", err) + } + } else { + // Resolve nodes cached in node buffer + var encoded []journalNodes + if err := journalBuf.Decode(&encoded); err != nil { + return nil, fmt.Errorf("failed to load disk nodes: %v", err) + } + nodes = make(map[common.Hash]map[string]*trienode.Node) + for _, entry := range encoded { + subset := make(map[string]*trienode.Node) + for _, n := range entry.Nodes { + if len(n.Blob) > 0 { + subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob) + } else { + subset[string(n.Path)] = trienode.NewDeleted() + } } + nodes[entry.Owner] = subset } - nodes[entry.Owner] = subset } if 
journalTypeForReader == JournalFileType { @@ -361,7 +379,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp } // Calculate the internal state transitions by id difference. - nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval, + nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, nodesArray, id-stored, db.config.ProposeBlockInterval, db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase) if err != nil { log.Error("Failed to new trie node buffer", "error", err) @@ -494,18 +512,28 @@ func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error { if err := rlp.Encode(journalBuf, dl.id); err != nil { return err } + // Step three, write all unwritten nodes into the journal - bufferNodes := dl.buffer.getAllNodes() - nodes := make([]journalNodes, 0, len(bufferNodes)) - for owner, subset := range bufferNodes { - entry := journalNodes{Owner: owner} - for path, node := range subset { - entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob}) + if _, ok := dl.buffer.(*nodebufferlist); ok && journalType == JournalFileType { + nodes := dl.buffer.getMultiLayerNodes() + if err := rlp.Encode(journalBuf, nodes); err != nil { + return err } - nodes = append(nodes, entry) - } - if err := rlp.Encode(journalBuf, nodes); err != nil { - return err + log.Info("Journal file and node buffer list", "multi layer nodes count", len(nodes)) + } else { + bufferNodes := dl.buffer.getAllNodes() + nodes := make([]journalNodes, 0, len(bufferNodes)) + for owner, subset := range bufferNodes { + entry := journalNodes{Owner: owner} + for path, node := range subset { + entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob}) + } + nodes = append(nodes, entry) + } + if err := rlp.Encode(journalBuf, nodes); err != nil { + return err + } + log.Info("get all nodes", "nodes count", 
len(nodes)) } // Store the journal buf into w and calculate checksum @@ -523,7 +551,7 @@ func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error { } } - log.Info("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes)) + log.Info("Journaled pathdb disk layer", "root", dl.root) return nil } diff --git a/triedb/pathdb/nodebuffer.go b/triedb/pathdb/nodebuffer.go index 6e72eb36ad..16e5e0c784 100644 --- a/triedb/pathdb/nodebuffer.go +++ b/triedb/pathdb/nodebuffer.go @@ -311,3 +311,7 @@ func (b *nodebuffer) proposedBlockReader(blockRoot common.Hash) (layer, error) { func (b *nodebuffer) getLatestStatus() (common.Hash, uint64, error) { return common.Hash{}, 0, errors.New("unsupported method for node buffer") } + +func (b *nodebuffer) getMultiLayerNodes() []nblJournalData { + return nil +} diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go index ecd2359b04..140654375c 100644 --- a/triedb/pathdb/nodebufferlist.go +++ b/triedb/pathdb/nodebufferlist.go @@ -84,6 +84,7 @@ func newNodeBufferList( db ethdb.Database, limit uint64, nodes map[common.Hash]map[string]*trienode.Node, + nodesArray []nblJournalData, layers uint64, proposeBlockInterval uint64, keepFunc NotifyKeepFunc, @@ -108,13 +109,32 @@ func newNodeBufferList( dlInMd = wpBlocks } + if nodes == nil { + nodes = make(map[common.Hash]map[string]*trienode.Node) + } + + var base *multiDifflayer + if nodesArray != nil { + base = newMultiDifflayer(limit, nodesArray[0].size, nodesArray[0].root, nodesArray[0].nodes, nodesArray[0].layers) + } else if nodes != nil { + var size uint64 + for _, subset := range nodes { + for path, n := range subset { + size += uint64(len(n.Blob) + len(path)) + } + } + base = newMultiDifflayer(limit, size, common.Hash{}, nodes, layers) + } else { + base = newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0) + } + nf := &nodebufferlist{ db: db, wpBlocks: wpBlocks, rsevMdNum: rsevMdNum, dlInMd: 
dlInMd, limit: limit, - base: newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0), + base: base, persistID: rawdb.ReadPersistentStateID(db), stopCh: make(chan struct{}), waitStopCh: make(chan struct{}), @@ -123,7 +143,9 @@ func newNodeBufferList( keepFunc: keepFunc, } - if !useBase && fastRecovery { + if nodesArray != nil { + nf.recoverJournalData(nodesArray) + } else if !useBase && fastRecovery { if freezer == nil { log.Crit("Use unopened freezer db to recover node buffer list") } @@ -529,6 +551,43 @@ func (nf *nodebufferlist) getAllNodes() map[common.Hash]map[string]*trienode.Nod return nc.nodes } +func (nf *nodebufferlist) recoverJournalData(nodesArray []nblJournalData) { + // skip index 0, it belongs to base buffer + for i := 1; i < len(nodesArray); i++ { + mdl := newMultiDifflayer(nf.limit, nodesArray[i].size, nodesArray[i].root, nodesArray[i].nodes, nodesArray[i].layers) + nf.pushFront(mdl) + } + log.Info("recover journal data", "nf count", nf.count, "node array", len(nodesArray)) +} + +// getMultiLayerNodes returns all the trie nodes cached in the trienodebuffer. +func (nf *nodebufferlist) getMultiLayerNodes() []nblJournalData { + nf.mux.Lock() + nf.baseMux.Lock() + defer nf.mux.Unlock() + defer nf.baseMux.Unlock() + + nodesArray := make([]nblJournalData, 0, nf.count+1) + nodesArray = append(nodesArray, nblJournalData{ + root: nf.base.root, + layers: nf.base.layers, + size: nf.base.size, + nodes: nf.base.nodes, + }) + + merge := func(buffer *multiDifflayer) bool { + nodesArray = append(nodesArray, nblJournalData{ + root: buffer.root, + layers: buffer.layers, + size: buffer.size, + nodes: buffer.nodes, + }) + return true + } + nf.traverseReverse(merge) + return nodesArray +} + // getLayers return the size of cached difflayers. func (nf *nodebufferlist) getLayers() uint64 { for {