Skip to content

Commit

Permalink
feat: split batch for base writing
Browse files Browse the repository at this point in the history
  • Loading branch information
krish-nr committed Nov 14, 2024
1 parent 7439797 commit 15259a0
Showing 1 changed file with 2 additions and 19 deletions.
21 changes: 2 additions & 19 deletions triedb/pathdb/nodebufferlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,57 +1100,46 @@ func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
)

//start
// Check whether the write needs to be split into batches.
if splitBatches {
// Get the total number of keys in the map.
totalKeys := len(mf.nodes)
if totalKeys == 0 {
return nil // 如果没有节点,直接返回
return nil
}

log.Info("total keys", "total num", totalKeys)
// Use a fixed number of batches.
const numBatches = 3

// Compute the number of keys handled per batch.
batchKeyLimit := totalKeys / numBatches
if totalKeys%numBatches != 0 {
batchKeyLimit++ // 如果不能整除,最后一批会处理剩余的 key
batchKeyLimit++
}

totalNodes := 0

for len(mf.nodes) > 0 {
// Create a new batch.
batch := db.NewBatch()

// Take up to batchKeyLimit keys from the map and write them.
writtenKeys, err := writeNodesInBatchByKeys(batch, mf.nodes, clean, batchKeyLimit)
if err != nil {
return err
}

totalNodes += len(writtenKeys)

// Write the persistent state ID.
rawdb.WritePersistentStateID(batch, id)

// Commit the batch.
if err := batch.Write(); err != nil {
return err
}

// Remove the nodes that have already been written from mf.nodes.
removeWrittenKeys(mf.nodes, writtenKeys)

// Break out of the loop once all nodes have been processed.
if len(writtenKeys) == 0 {
log.Info("success")
break
}
}

// Update the metrics.
commitTimeTimer.UpdateSince(start)
log.Info("Persisted pathdb nodes in batches", "nodes", totalNodes, "state_id", id, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
Expand Down Expand Up @@ -1225,22 +1214,17 @@ func (mf *multiDifflayer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]
return nil
}

// writeNodesInBatchByKeys writes the nodes of at most batchKeyLimit keys from the map into the batch.
func writeNodesInBatchByKeys(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache, batchKeyLimit int) ([]common.Hash, error) {
writtenKeys := make([]common.Hash, 0, batchKeyLimit)

// Iterate over the map and process at most batchKeyLimit keys.
for hash, nodeMap := range nodes {
// Write the nodes into the batch.
written := writeNodes(batch, map[common.Hash]map[string]*trienode.Node{hash: nodeMap}, clean)
if written == 0 {
return nil, fmt.Errorf("failed to write node for key: %v", hash)
}

// Record the key that has been written.
writtenKeys = append(writtenKeys, hash)

// Stop writing once batchKeyLimit keys have been processed.
if len(writtenKeys) >= batchKeyLimit {
log.Info("writtenKeys access limit")
break
Expand All @@ -1250,7 +1234,6 @@ func writeNodesInBatchByKeys(batch ethdb.Batch, nodes map[common.Hash]map[string
return writtenKeys, nil
}

// removeWrittenKeys removes the already-written keys from the map.
func removeWrittenKeys(nodes map[common.Hash]map[string]*trienode.Node, writtenKeys []common.Hash) {
for _, key := range writtenKeys {
delete(nodes, key)
Expand Down

0 comments on commit 15259a0

Please sign in to comment.