From 743979716de981abc32e682c38e1cc20e3ed887b Mon Sep 17 00:00:00 2001
From: Krish
Date: Wed, 13 Nov 2024 17:21:45 +0800
Subject: [PATCH] triedb/pathdb: test splitting large base buffer flushes into
 batches

---
 triedb/pathdb/nodebufferlist.go | 106 ++++++++++++++++++++++++++++++--
 1 file changed, 101 insertions(+), 5 deletions(-)

diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go
index 2128892221..8f2c8c2b85 100644
--- a/triedb/pathdb/nodebufferlist.go
+++ b/triedb/pathdb/nodebufferlist.go
@@ -489,7 +489,7 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
 	nf.traverseReverse(commitFunc)
 
 	persistID := nf.persistID + nf.base.layers
-	err := nf.base.flush(nf.db, nf.clean, persistID)
+	err := nf.base.flush(nf.db, nf.clean, persistID, false)
 	if err != nil {
 		log.Crit("failed to flush base node buffer to disk", "error", err)
 	}
@@ -742,13 +742,13 @@ func (nf *nodebufferlist) diffToBase() {
 }
 
 // backgroundFlush flush base node buffer to disk.
-func (nf *nodebufferlist) backgroundFlush() {
+func (nf *nodebufferlist) backgroundFlush(splitBatches bool) {
 	nf.flushMux.Lock()
 	defer nf.flushMux.Unlock()
 	nf.baseMux.RLock()
 	persistID := nf.persistID + nf.base.layers
 	nf.baseMux.RUnlock()
-	err := nf.base.flush(nf.db, nf.clean, persistID)
+	err := nf.base.flush(nf.db, nf.clean, persistID, splitBatches)
 	if err != nil {
 		log.Error("failed to flush base node buffer to disk", "error", err)
 		return
@@ -817,7 +817,11 @@ func (nf *nodebufferlist) loop() {
 			}
 			nf.diffToBase()
 			if nf.base.size >= nf.base.limit {
-				nf.backgroundFlush()
+				if nf.base.size >= 4*1024*1024*1024 { // split flushes above 4GiB
+					nf.backgroundFlush(true)
+				} else {
+					nf.backgroundFlush(false)
+				}
 			}
 			nf.isFlushing.Swap(false)
 		}
@@ -1084,7 +1088,7 @@ func (mf *multiDifflayer) empty() bool {
 
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
-func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, splitBatches bool) error {
 	// Ensure the target state id is aligned with the internal counter.
 	head := rawdb.ReadPersistentStateID(db)
 	if head+mf.layers != id {
@@ -1094,6 +1098,66 @@ func (mf *multiDifflayer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache,
 		start = time.Now()
 		batch = db.NewBatchWithSize(int(float64(mf.size) * DefaultBatchRedundancyRate))
 	)
+
+	// Split-batch path: write the dirty nodes in several smaller database
+	// batches instead of one oversized atomic batch.
+	if splitBatches {
+		// Count the owner keys held in the node map.
+		totalKeys := len(mf.nodes)
+		if totalKeys == 0 {
+			return nil // nothing to persist
+		}
+
+		log.Info("Flushing pathdb nodes in batches", "totalKeys", totalKeys)
+		// Fixed number of batches to split the write into.
+		const numBatches = 3
+
+		// Compute how many keys each batch should cover.
+		batchKeyLimit := totalKeys / numBatches
+		if totalKeys%numBatches != 0 {
+			batchKeyLimit++ // round up so the final batch covers the remainder
+		}
+
+		totalNodes := 0
+
+		for len(mf.nodes) > 0 {
+			// Create a fresh batch for this chunk of keys.
+			batch := db.NewBatch()
+
+			// Stage the nodes of up to batchKeyLimit keys into the batch.
+			writtenKeys, err := writeNodesInBatchByKeys(batch, mf.nodes, clean, batchKeyLimit)
+			if err != nil {
+				return err
+			}
+
+			totalNodes += len(writtenKeys)
+
+			// Record the target persistent state id with every batch.
+			rawdb.WritePersistentStateID(batch, id)
+
+			// Commit this chunk to disk.
+			if err := batch.Write(); err != nil {
+				return err
+			}
+
+			// Drop the persisted keys from the in-memory map.
+			removeWrittenKeys(mf.nodes, writtenKeys)
+
+			// Safety valve: stop if the pass made no progress.
+			if len(writtenKeys) == 0 {
+				log.Info("split-batch flush made no progress, stopping")
+				break
+			}
+		}
+
+		// Update the flush metrics.
+		commitTimeTimer.UpdateSince(start)
+		log.Info("Persisted pathdb nodes in batches", "nodes", totalNodes, "state_id", id, "elapsed", common.PrettyDuration(time.Since(start)))
+		return nil
+	}
+
+	// Single-batch path: all data is written atomically below.
+
 	nodes := writeNodes(batch, mf.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)
 
@@ -1160,3 +1224,35 @@ func (mf *multiDifflayer) revert(db ethdb.KeyValueReader, nodes map[common.Hash]
 	mf.updateSize(delta)
 	return nil
 }
+
+// writeNodesInBatchByKeys stages the nodes of at most batchKeyLimit owner keys from the map into the given batch.
+func writeNodesInBatchByKeys(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache, batchKeyLimit int) ([]common.Hash, error) {
+	writtenKeys := make([]common.Hash, 0, batchKeyLimit)
+
+	// Iterate over the map, handling at most batchKeyLimit keys.
+	for hash, nodeMap := range nodes {
+		// Stage this owner's nodes into the batch.
+		written := writeNodes(batch, map[common.Hash]map[string]*trienode.Node{hash: nodeMap}, clean)
+		if written == 0 {
+			return nil, fmt.Errorf("failed to write node for key: %v", hash)
+		}
+
+		// Track the key as staged.
+		writtenKeys = append(writtenKeys, hash)
+
+		// Stop once batchKeyLimit keys have been handled.
+		if len(writtenKeys) >= batchKeyLimit {
+			log.Info("reached per-batch key limit", "keys", len(writtenKeys))
+			break
+		}
+	}
+
+	return writtenKeys, nil
+}
+
+// removeWrittenKeys deletes the already persisted keys from the node map.
+func removeWrittenKeys(nodes map[common.Hash]map[string]*trienode.Node, writtenKeys []common.Hash) {
+	for _, key := range writtenKeys {
+		delete(nodes, key)
+	}
+}
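
Not part of the diff: the batchKeyLimit computation above is round-up division
over the owner-key count. Below is a minimal, self-contained sketch of that
chunking math; batchKeyChunks is a hypothetical helper, and plain string keys
stand in for the common.Hash owners used by the patch.

	package main

	import "fmt"

	// batchKeyChunks splits keys into numBatches chunks, mirroring how flush
	// derives batchKeyLimit from len(mf.nodes) with round-up division.
	func batchKeyChunks(keys []string, numBatches int) [][]string {
		if len(keys) == 0 || numBatches <= 0 {
			return nil
		}
		limit := len(keys) / numBatches
		if len(keys)%numBatches != 0 {
			limit++ // round up so the final chunk absorbs the remainder
		}
		var chunks [][]string
		for start := 0; start < len(keys); start += limit {
			end := start + limit
			if end > len(keys) {
				end = len(keys)
			}
			chunks = append(chunks, keys[start:end])
		}
		return chunks
	}

	func main() {
		// Seven keys over numBatches = 3 gives limit = 3: chunks of 3, 3 and 1.
		for i, chunk := range batchKeyChunks([]string{"a", "b", "c", "d", "e", "f", "g"}, 3) {
			fmt.Printf("batch %d: %v\n", i, chunk)
		}
	}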
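The flush loop itself is a drain pattern: stage up to batchKeyLimit keys into a
fresh batch, write the batch, remove the persisted keys from the map, and repeat
until the map is empty. A sketch of that control flow under stated assumptions:
fakeBatch is a stand-in type, not ethdb.Batch, and only the accumulate-then-commit
behavior the loop relies on is modeled.

	package main

	import "fmt"

	// fakeBatch stands in for ethdb.Batch; it only accumulates writes and
	// reports how many it committed.
	type fakeBatch struct{ writes int }

	func (b *fakeBatch) put() { b.writes++ }

	func (b *fakeBatch) write() error {
		fmt.Printf("committed batch with %d writes\n", b.writes)
		return nil
	}

	// drain mirrors the patch's loop: stage up to limit keys per fresh batch,
	// commit, then delete the persisted keys (the removeWrittenKeys step).
	func drain(nodes map[string]int, limit int) error {
		for len(nodes) > 0 {
			batch := &fakeBatch{} // fresh batch per chunk, as with db.NewBatch()
			var written []string
			for key := range nodes {
				batch.put()
				written = append(written, key)
				if len(written) >= limit {
					break
				}
			}
			if err := batch.write(); err != nil {
				return err
			}
			for _, key := range written {
				delete(nodes, key)
			}
		}
		return nil
	}

	func main() {
		nodes := map[string]int{"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
		_ = drain(nodes, 2) // five keys, limit two: batches of 2, 2 and 1
	}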