Skip to content

Commit

Permalink
feat: journal nodebufferlist snapshot data for journal file recovering
Browse files Browse the repository at this point in the history
  • Loading branch information
VM committed Nov 22, 2024
1 parent d74fd83 commit 759f430
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 32 deletions.
4 changes: 4 additions & 0 deletions triedb/pathdb/asyncnodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func (a *asyncnodebuffer) getLatestStatus() (common.Hash, uint64, error) {
return common.Hash{}, 0, errors.New("unsupported method for async node buffer")
}

// getMultiLayerNodes returns per-difflayer journal snapshots for the
// journal-file persistence path. Multi-layer journaling is only supported
// by the node buffer list, so the async node buffer returns nil and callers
// fall back to the flat journalNodes encoding.
func (a *asyncnodebuffer) getMultiLayerNodes() []nblJournalData {
	return nil
}

type nodecache struct {
layers uint64 // The number of diff layers aggregated inside
size uint64 // The size of aggregated writes
Expand Down
7 changes: 5 additions & 2 deletions triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type trienodebuffer interface {

// getLatestStatus returns latest status for disk layer
getLatestStatus() (common.Hash, uint64, error)

getMultiLayerNodes() []nblJournalData
}

type NodeBufferType int32
Expand Down Expand Up @@ -122,6 +124,7 @@ func NewTrieNodeBuffer(
trieNodeBufferType NodeBufferType,
limit int,
nodes map[common.Hash]map[string]*trienode.Node,
nodesArray []nblJournalData,
layers, proposeBlockInterval uint64,
keepFunc NotifyKeepFunc,
freezer *rawdb.ResettableFreezer,
Expand All @@ -130,13 +133,13 @@ func NewTrieNodeBuffer(
log.Info("init trie node buffer", "type", nodeBufferTypeToString[trieNodeBufferType])
switch trieNodeBufferType {
case NodeBufferList:
return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase)
return newNodeBufferList(db, uint64(limit), nodes, nodesArray, layers, proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase)
case AsyncNodeBuffer:
return newAsyncNodeBuffer(limit, nodes, layers)
case SyncNodeBuffer:
return newNodeBuffer(limit, nodes, layers)
default:
return newNodeBufferList(db, uint64(limit), nodes, layers, proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase)
return newNodeBufferList(db, uint64(limit), nodes, nodesArray, layers, proposeBlockInterval, keepFunc, freezer, fastRecovery, useBase)
}
}

Expand Down
84 changes: 56 additions & 28 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ type journalNodes struct {
Nodes []journalNode
}

// nblJournalData is the journal snapshot of a single multiDifflayer held by
// the node buffer list, persisted into the journal file so the buffer list
// can be rebuilt on restart.
//
// NOTE(review): all fields here are unexported, but values of this type are
// passed to rlp.Encode and rlp.Stream.Decode in journal.go; go-ethereum's
// rlp codec only serializes exported struct fields, so as written encoding
// will error out (and decoding will recover nothing) — confirm, and export
// the fields (Root, Layers, Size, Nodes) if RLP is the intended codec.
type nblJournalData struct {
	root   common.Hash                               // state root the difflayer commits to
	layers uint64                                    // number of diff layers aggregated inside
	size   uint64                                    // byte size of the aggregated writes
	nodes  map[common.Hash]map[string]*trienode.Node // dirty trie nodes, keyed by owner then path
}

// journalAccounts represents a list accounts belong to the layer.
type journalAccounts struct {
Addresses []common.Address
Expand Down Expand Up @@ -273,7 +281,7 @@ func (db *Database) loadLayers() layer {
start := time.Now()
log.Info("Recover node buffer list from ancient db")

nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0,
nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, nil, 0,
db.config.ProposeBlockInterval, db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase)
if err != nil {
log.Error("Failed to new trie node buffer for recovery", "error", err)
Expand All @@ -285,7 +293,7 @@ func (db *Database) loadLayers() layer {
}
if nb == nil || err != nil {
// Return single layer with persistent state.
nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0,
nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, nil, 0,
db.config.ProposeBlockInterval, db.config.NotifyKeep, nil, false, db.useBase)
if err != nil {
log.Crit("Failed to new trie node buffer", "error", err)
Expand Down Expand Up @@ -330,22 +338,32 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp
if stored > id {
return nil, fmt.Errorf("invalid state id: stored %d resolved %d", stored, id)
}
// Resolve nodes cached in node buffer
var encoded []journalNodes
if err := journalBuf.Decode(&encoded); err != nil {
return nil, fmt.Errorf("failed to load disk nodes: %v", err)
}
nodes := make(map[common.Hash]map[string]*trienode.Node)
for _, entry := range encoded {
subset := make(map[string]*trienode.Node)
for _, n := range entry.Nodes {
if len(n.Blob) > 0 {
subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob)
} else {
subset[string(n.Path)] = trienode.NewDeleted()

var nodes map[common.Hash]map[string]*trienode.Node
var nodesArray []nblJournalData

if db.config.TrieNodeBufferType == NodeBufferList && journalTypeForReader == JournalFileType {
if err := journalBuf.Decode(&nodesArray); err != nil {
return nil, fmt.Errorf("11 failed to load disk nodes: %v", err)
}
} else {
// Resolve nodes cached in node buffer
var encoded []journalNodes
if err := journalBuf.Decode(&encoded); err != nil {
return nil, fmt.Errorf("failed to load disk nodes: %v", err)
}
nodes = make(map[common.Hash]map[string]*trienode.Node)
for _, entry := range encoded {
subset := make(map[string]*trienode.Node)
for _, n := range entry.Nodes {
if len(n.Blob) > 0 {
subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob)
} else {
subset[string(n.Path)] = trienode.NewDeleted()
}
}
nodes[entry.Owner] = subset
}
nodes[entry.Owner] = subset
}

if journalTypeForReader == JournalFileType {
Expand All @@ -361,7 +379,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp
}

// Calculate the internal state transitions by id difference.
nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval,
nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, nodesArray, id-stored, db.config.ProposeBlockInterval,
db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase)
if err != nil {
log.Error("Failed to new trie node buffer", "error", err)
Expand Down Expand Up @@ -494,18 +512,28 @@ func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error {
if err := rlp.Encode(journalBuf, dl.id); err != nil {
return err
}

// Step three, write all unwritten nodes into the journal
bufferNodes := dl.buffer.getAllNodes()
nodes := make([]journalNodes, 0, len(bufferNodes))
for owner, subset := range bufferNodes {
entry := journalNodes{Owner: owner}
for path, node := range subset {
entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
if _, ok := dl.buffer.(*nodebufferlist); ok && journalType == JournalFileType {
nodes := dl.buffer.getMultiLayerNodes()
if err := rlp.Encode(journalBuf, nodes); err != nil {
return err
}
nodes = append(nodes, entry)
}
if err := rlp.Encode(journalBuf, nodes); err != nil {
return err
log.Info("Journal file and node buffer list", "multi layer nodes count", len(nodes))
} else {
bufferNodes := dl.buffer.getAllNodes()
nodes := make([]journalNodes, 0, len(bufferNodes))
for owner, subset := range bufferNodes {
entry := journalNodes{Owner: owner}
for path, node := range subset {
entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
}
nodes = append(nodes, entry)
}
if err := rlp.Encode(journalBuf, nodes); err != nil {
return err
}
log.Info("get all nodes", "nodes count", len(nodes))
}

// Store the journal buf into w and calculate checksum
Expand All @@ -523,7 +551,7 @@ func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error {
}
}

log.Info("Journaled pathdb disk layer", "root", dl.root, "nodes", len(bufferNodes))
log.Info("Journaled pathdb disk layer", "root", dl.root)
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,7 @@ func (b *nodebuffer) proposedBlockReader(blockRoot common.Hash) (layer, error) {
func (b *nodebuffer) getLatestStatus() (common.Hash, uint64, error) {
return common.Hash{}, 0, errors.New("unsupported method for node buffer")
}

// getMultiLayerNodes returns per-difflayer journal snapshots for the
// journal-file persistence path. Multi-layer journaling is only supported
// by the node buffer list, so the plain node buffer returns nil and callers
// fall back to the flat journalNodes encoding.
func (b *nodebuffer) getMultiLayerNodes() []nblJournalData {
	return nil
}
63 changes: 61 additions & 2 deletions triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func newNodeBufferList(
db ethdb.Database,
limit uint64,
nodes map[common.Hash]map[string]*trienode.Node,
nodesArray []nblJournalData,
layers uint64,
proposeBlockInterval uint64,
keepFunc NotifyKeepFunc,
Expand All @@ -108,13 +109,32 @@ func newNodeBufferList(
dlInMd = wpBlocks
}

if nodes == nil {
nodes = make(map[common.Hash]map[string]*trienode.Node)
}

var base *multiDifflayer
if nodesArray != nil {
base = newMultiDifflayer(limit, nodesArray[0].size, nodesArray[0].root, nodesArray[0].nodes, nodesArray[0].layers)
} else if nodes != nil {
var size uint64
for _, subset := range nodes {
for path, n := range subset {
size += uint64(len(n.Blob) + len(path))
}
}
base = newMultiDifflayer(limit, size, common.Hash{}, nodes, layers)
} else {
base = newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0)
}

nf := &nodebufferlist{
db: db,
wpBlocks: wpBlocks,
rsevMdNum: rsevMdNum,
dlInMd: dlInMd,
limit: limit,
base: newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0),
base: base,
persistID: rawdb.ReadPersistentStateID(db),
stopCh: make(chan struct{}),
waitStopCh: make(chan struct{}),
Expand All @@ -123,7 +143,9 @@ func newNodeBufferList(
keepFunc: keepFunc,
}

if !useBase && fastRecovery {
if nodesArray != nil {
nf.recoverJournalData(nodesArray)
} else if !useBase && fastRecovery {
if freezer == nil {
log.Crit("Use unopened freezer db to recover node buffer list")
}
Expand Down Expand Up @@ -529,6 +551,43 @@ func (nf *nodebufferlist) getAllNodes() map[common.Hash]map[string]*trienode.Nod
return nc.nodes
}

// recoverJournalData rebuilds the in-memory difflayer list from the journal
// snapshot produced by getMultiLayerNodes. nodesArray[0] is the base buffer
// (already installed by the caller), so every remaining entry is converted
// back into a multiDifflayer and pushed to the front of the list in order,
// restoring the original oldest-to-newest arrangement.
func (nf *nodebufferlist) recoverJournalData(nodesArray []nblJournalData) {
	// Skip index 0: it belongs to the base buffer. The loop must run through
	// the final element (i < len(nodesArray), not len(nodesArray)-1) —
	// stopping one short would silently drop the newest difflayer.
	for i := 1; i < len(nodesArray); i++ {
		mdl := newMultiDifflayer(nf.limit, nodesArray[i].size, nodesArray[i].root, nodesArray[i].nodes, nodesArray[i].layers)
		nf.pushFront(mdl)
	}
	log.Info("recover journal data", "nf count", nf.count, "node array", len(nodesArray))
}

// getMultiLayerNodes returns a journal snapshot of every buffer cached in
// the node buffer list: the base buffer first, then each difflayer in
// reverse traversal order. The returned entries alias the live node maps,
// so callers must serialize them before the buffers are mutated again.
func (nf *nodebufferlist) getMultiLayerNodes() []nblJournalData {
	nf.mux.Lock()
	nf.baseMux.Lock()
	defer nf.mux.Unlock()
	defer nf.baseMux.Unlock()

	// snapshot captures the journal-relevant state of one difflayer.
	snapshot := func(md *multiDifflayer) nblJournalData {
		return nblJournalData{
			root:   md.root,
			layers: md.layers,
			size:   md.size,
			nodes:  md.nodes,
		}
	}

	result := make([]nblJournalData, 0, nf.count+1)
	result = append(result, snapshot(nf.base))
	nf.traverseReverse(func(buffer *multiDifflayer) bool {
		result = append(result, snapshot(buffer))
		return true
	})
	return result
}

// getLayers return the size of cached difflayers.
func (nf *nodebufferlist) getLayers() uint64 {
for {
Expand Down

0 comments on commit 759f430

Please sign in to comment.