test splitbatch
krish-nr committed Nov 13, 2024
1 parent 1770748 commit 7439797
Showing 1 changed file with 101 additions and 5 deletions.
106 changes: 101 additions & 5 deletions triedb/pathdb/nodebufferlist.go
@@ -489,7 +489,7 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
 	nf.traverseReverse(commitFunc)
 
 	persistID := nf.persistID + nf.base.layers
-	err := nf.base.flush(nf.db, nf.clean, persistID)
+	err := nf.base.flush(nf.db, nf.clean, persistID, false)
 	if err != nil {
 		log.Crit("failed to flush base node buffer to disk", "error", err)
 	}
@@ -742,13 +742,13 @@ func (nf *nodebufferlist) diffToBase() {
 }
 
 // backgroundFlush flush base node buffer to disk.
-func (nf *nodebufferlist) backgroundFlush() {
+func (nf *nodebufferlist) backgroundFlush(splitBatches bool) {
 	nf.flushMux.Lock()
 	defer nf.flushMux.Unlock()
 	nf.baseMux.RLock()
 	persistID := nf.persistID + nf.base.layers
 	nf.baseMux.RUnlock()
-	err := nf.base.flush(nf.db, nf.clean, persistID)
+	err := nf.base.flush(nf.db, nf.clean, persistID, splitBatches)
 	if err != nil {
 		log.Error("failed to flush base node buffer to disk", "error", err)
 		return
@@ -817,7 +817,11 @@ func (nf *nodebufferlist) loop() {
 			}
 			nf.diffToBase()
 			if nf.base.size >= nf.base.limit {
-				nf.backgroundFlush()
+				if nf.base.size >= 4*1024*1024*1024 {
+					nf.backgroundFlush(true)
+				} else {
+					nf.backgroundFlush(false)
+				}
 			}
 			nf.isFlushing.Swap(false)
 		}
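
The hunk above hard-codes the 4 GiB cutoff inline. A minimal sketch of the same decision with the literal lifted into a named constant (the constant and helper names are illustrative, not part of the commit):

// splitFlushThreshold is the buffer size at or above which a flush is
// split into multiple batches: 4 GiB, matching the literal in the hunk above.
const splitFlushThreshold = 4 * 1024 * 1024 * 1024

// shouldSplitFlush reports whether a buffer of the given size should be
// flushed in multiple smaller batches rather than a single large one.
func shouldSplitFlush(size uint64) bool {
	return size >= splitFlushThreshold
}

With such a helper, the branch would reduce to nf.backgroundFlush(shouldSplitFlush(nf.base.size)).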
@@ -1084,7 +1088,7 @@ func (mf *multiDifflayer) empty() bool {
 
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
-func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, splitBatches bool) error {
 	// Ensure the target state id is aligned with the internal counter.
 	head := rawdb.ReadPersistentStateID(db)
 	if head+mf.layers != id {
@@ -1094,6 +1098,66 @@ func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
 		start = time.Now()
 		batch = db.NewBatchWithSize(int(float64(mf.size) * DefaultBatchRedundancyRate))
 	)

+	// Split-batch write path: persist the dirty nodes in several smaller
+	// batches instead of one large atomic batch. Note this relaxes the
+	// "written atomically" guarantee stated above.
+	if splitBatches {
+		totalKeys := len(mf.nodes)
+		if totalKeys == 0 {
+			return nil // nothing to persist
+		}
+		log.Info("Flushing pathdb nodes in split batches", "owners", totalKeys)
+
+		// Spread the owner keys across a fixed number of batches, rounding
+		// up so the final batch absorbs any remainder.
+		const numBatches = 3
+		batchKeyLimit := (totalKeys + numBatches - 1) / numBatches
+
+		totalOwners := 0
+		for len(mf.nodes) > 0 {
+			batch := db.NewBatch()
+
+			// Stage up to batchKeyLimit owner keys from the map into the batch.
+			writtenKeys, err := writeNodesInBatchByKeys(batch, mf.nodes, clean, batchKeyLimit)
+			if err != nil {
+				return err
+			}
+			if len(writtenKeys) == 0 {
+				break // no progress possible, avoid looping forever
+			}
+			totalOwners += len(writtenKeys)
+
+			// Persist the state id with every batch; the value is the same
+			// for all batches of a single flush, so the write is idempotent.
+			rawdb.WritePersistentStateID(batch, id)
+
+			// Commit the batch, then drop the written owners from the map.
+			if err := batch.Write(); err != nil {
+				return err
+			}
+			removeWrittenKeys(mf.nodes, writtenKeys)
+		}
+
+		commitTimeTimer.UpdateSince(start)
+		log.Info("Persisted pathdb nodes in split batches", "owners", totalOwners, "state_id", id, "elapsed", common.PrettyDuration(time.Since(start)))
+		return nil
+	}
+
 	nodes := writeNodes(batch, mf.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)
 
@@ -1160,3 +1224,35 @@ func (mf *multiDifflayer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]
 	mf.updateSize(delta)
 	return nil
 }

+// writeNodesInBatchByKeys stages the nodes of at most batchKeyLimit owner
+// keys from the map into the given batch and returns the keys it wrote.
+func writeNodesInBatchByKeys(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache, batchKeyLimit int) ([]common.Hash, error) {
+	writtenKeys := make([]common.Hash, 0, batchKeyLimit)
+
+	// Iterate the map, staging at most batchKeyLimit owners per call.
+	for hash, nodeMap := range nodes {
+		// Stage this owner's nodes into the batch.
+		written := writeNodes(batch, map[common.Hash]map[string]*trienode.Node{hash: nodeMap}, clean)
+		if written == 0 {
+			return nil, fmt.Errorf("no nodes written for owner: %v", hash)
+		}
+
+		// Record the owner so the caller can remove it after committing.
+		writtenKeys = append(writtenKeys, hash)
+
+		// Stop once the per-batch owner limit is reached.
+		if len(writtenKeys) >= batchKeyLimit {
+			break
+		}
+	}
+
+	return writtenKeys, nil
+}
+
+// removeWrittenKeys removes the already-persisted owner keys from the map.
+func removeWrittenKeys(nodes map[common.Hash]map[string]*trienode.Node, writtenKeys []common.Hash) {
+	for _, key := range writtenKeys {
+		delete(nodes, key)
+	}
+}
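
For reference, a self-contained sketch of the chunking pattern the two helpers implement, using plain types in place of pathdb's (hypothetical example, not part of the commit):

package main

import "fmt"

// takeKeys removes up to limit keys from m and returns them, mirroring
// writeNodesInBatchByKeys followed by removeWrittenKeys in one step.
func takeKeys(m map[string]int, limit int) []string {
	keys := make([]string, 0, limit)
	for k := range m {
		keys = append(keys, k)
		if len(keys) >= limit {
			break
		}
	}
	for _, k := range keys {
		delete(m, k)
	}
	return keys
}

func main() {
	m := map[string]int{"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
	const numBatches = 3
	limit := (len(m) + numBatches - 1) / numBatches // ceiling division, as in flush
	for len(m) > 0 {
		fmt.Println("batch:", takeKeys(m, limit)) // each chunk stands in for one db batch
	}
}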
