diff --git a/core/blockchain.go b/core/blockchain.go index 2f25bbc2d..2b9814ca2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -539,6 +539,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis log.Info("Parallel V2 enabled", "parallelNum", ParallelNum()) } else { bc.processor = NewStateProcessor(chainConfig, bc, engine) + bc.serialProcessor = bc.processor } // Start future block processor. bc.wg.Add(1) @@ -1832,6 +1833,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) defer func() { DebugInnerExecutionDuration = 0 }() + + if bc.serialProcessor == nil { + bc.serialProcessor = bc.processor + } + for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() { DebugInnerExecutionDuration = 0 // If the chain is terminating, stop processing blocks @@ -1897,6 +1903,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) err error ) + blockProcessedInParallel := false // skip block process if we already have the state, receipts and logs from mining work if !(receiptExist && logExist && stateExist) { // Retrieve the parent block and it's state to execute on top @@ -1940,8 +1947,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) pstart = time.Now() if bc.vmConfig.TxDAG == nil && bc.vmConfig.EnableParallelUnorderedMerge { receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig) + blockProcessedInParallel = false } else { receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) + blockProcessedInParallel = true + if err != nil { + // parallel processing fail , fallback to serial with new statDB. + log.Warn("ParallelEVM fallback!!!", "error", err.Error()) + statedb, err = bc.reGenerateStateForFallBack(parent.Root, block.Root(), statedb) + statedb.StartPrefetcher("chain") + activeState = statedb + receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig) + blockProcessedInParallel = false + } } if err != nil { bc.reportBlock(block, receipts, err) @@ -1953,9 +1971,30 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) vstart := time.Now() if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { - bc.reportBlock(block, receipts, err) - followupInterrupt.Store(true) - return it.index, err + if blockProcessedInParallel { + // invalid parallel execution, try serial + log.Warn("ParallelEVM fallback after ValidateState!!!", "error", err.Error()) + parent := it.previous() + if parent == nil { + parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) + } + + statedb, err = bc.reGenerateStateForFallBack(parent.Root, block.Root(), statedb) + statedb.StartPrefetcher("chain") + activeState = statedb + + receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig) + blockProcessedInParallel = false + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } + } else { + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } } vtime := time.Since(vstart) @@ -2784,6 +2823,17 @@ func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) { } +func (bc *BlockChain) reGenerateStateForFallBack(parentRoot common.Hash, blockRoot common.Hash, oldDB *state.StateDB) (*state.StateDB, error) { + oldDB.StopPrefetcher() + statedb, err := state.New(parentRoot, bc.stateCache, bc.snaps) + if err != nil { + return nil, err + } + + statedb.SetExpectedStateRoot(blockRoot) + return statedb, nil +} + type TxDAGOutputItem struct { blockNumber uint64 txDAG types.TxDAG