Skip to content

Commit

Permalink
Fallback to serial execution if parallel execution fails
Browse files Browse the repository at this point in the history
  • Loading branch information
cffls committed Dec 21, 2024
1 parent e1f6d25 commit 7f7bb48
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 29 deletions.
63 changes: 35 additions & 28 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/blockstm"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
Expand Down Expand Up @@ -86,12 +85,15 @@ var (
blockImportTimer = metrics.NewRegisteredMeter("chain/imports", nil)
triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil)

blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil)
blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil)
blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil)
blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil)
blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil)
blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil)

blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
Expand Down Expand Up @@ -569,7 +571,7 @@ func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis
return bc, nil
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, blockEndErr error) {
func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -597,6 +599,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
err error
statedb *state.StateDB
counter metrics.Counter
parallel bool
}

resultChan := make(chan Result, 2)
Expand All @@ -606,44 +609,58 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
if bc.parallelProcessor != nil {
parallelStatedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return nil, nil, 0, nil, err
return nil, nil, 0, nil, 0, err
}
parallelStatedb.SetLogger(bc.logger)

processorCount++

go func() {
parallelStatedb.StartPrefetcher("chain", nil)
pstart := time.Now()
receipts, logs, usedGas, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.vmConfig, ctx)
resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter}
blockExecutionParallelTimer.UpdateSince(pstart)
if err == nil {
vstart := time.Now()
err = bc.validator.ValidateState(block, parallelStatedb, receipts, usedGas, false)
vtime = time.Since(vstart)
}
resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter, true}
}()
}

if bc.processor != nil {
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return nil, nil, 0, nil, err
return nil, nil, 0, nil, 0, err
}
statedb.SetLogger(bc.logger)

processorCount++

go func() {
statedb.StartPrefetcher("chain", nil)
pstart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, ctx)
resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter}
blockExecutionSerialTimer.UpdateSince(pstart)
if err == nil {
vstart := time.Now()
err = bc.validator.ValidateState(block, statedb, receipts, usedGas, false)
vtime = time.Since(vstart)
}
resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter, false}
}()
}

result := <-resultChan

if _, ok := result.err.(blockstm.ParallelExecFailedError); ok {
if result.parallel && result.err != nil {
log.Warn("Parallel state processor failed", "err", result.err)

blockExecutionParallelErrorCounter.Inc(1)
// If the parallel processor failed, we will fallback to the serial processor if enabled
if processorCount == 2 {
result.statedb.StopPrefetcher()
result = <-resultChan
result.statedb.StopPrefetcher()
processorCount--
}
}
Expand All @@ -658,7 +675,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
}()
}

return result.receipts, result.logs, result.usedGas, result.statedb, result.err
return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err
}

// empty returns an indicator whether the blockchain is empty.
Expand Down Expand Up @@ -2323,7 +2340,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// Process block using the parent state as reference point
pstart := time.Now()
receipts, logs, usedGas, statedb, err := bc.ProcessBlock(block, parent)
receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent)
activeState = statedb

if err != nil {
Expand All @@ -2338,18 +2355,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
}
// BOR
ptime := time.Since(pstart)

vstart := time.Now()

if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)

return it.index, err
}
ptime := time.Since(pstart) - vtime

vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

// Update the metrics touched during block processing and validation
Expand Down
72 changes: 71 additions & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -170,7 +171,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
return err
}

receipts, _, usedGas, statedb, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header())
receipts, _, usedGas, statedb, _, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header())

if err != nil {
blockchain.reportBlock(block, receipts, err)
Expand Down Expand Up @@ -216,6 +217,75 @@ func testParallelBlockChainImport(t *testing.T, scheme string) {
}
}

type AlwaysFailParallelStateProcessor struct {
}

func (p *AlwaysFailParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
return nil, nil, 0, errors.New("always fail")
}

type SlowSerialStateProcessor struct {
s Processor
}

func NewSlowSerialStateProcessor(s Processor) *SlowSerialStateProcessor {
return &SlowSerialStateProcessor{s: s}
}

func (p *SlowSerialStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
time.Sleep(100 * time.Millisecond)
return p.s.Process(block, statedb, cfg, interruptCtx)
}

func TestSuccessfulBlockImportParallelFailed(t *testing.T) {
t.Parallel()

testSuccessfulBlockImportParallelFailed(t, rawdb.HashScheme)
testSuccessfulBlockImportParallelFailed(t, rawdb.PathScheme)
}

func testSuccessfulBlockImportParallelFailed(t *testing.T, scheme string) {
// Create a new blockchain with 10 initial blocks
db, _, blockchain, err := newCanonical(ethash.NewFaker(), 10, true, scheme)
blockchain.parallelProcessor = &AlwaysFailParallelStateProcessor{}
blockchain.processor = NewSlowSerialStateProcessor(blockchain.processor)
if err != nil {
t.Fatalf("failed to create canonical chain: %v", err)
}
defer blockchain.Stop()

// Create valid blocks to import
block := blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash())
blocks := makeBlockChain(blockchain.chainConfig, block, 5, ethash.NewFaker(), db, canonicalSeed)

// Import the blocks
n, err := blockchain.InsertChain(blocks)
if err != nil {
t.Fatalf("failed to import valid blocks: %v", err)
}

// Verify all blocks were imported
if n != len(blocks) {
t.Errorf("imported %d blocks, wanted %d", n, len(blocks))
}

// Verify the last block is properly linked
if blockchain.CurrentBlock().Hash() != blocks[len(blocks)-1].Hash() {
t.Errorf("current block hash mismatch: got %x, want %x",
blockchain.CurrentBlock().Hash(),
blocks[len(blocks)-1].Hash())
}

// Verify block numbers are sequential
for i, block := range blocks {
expectedNumber := uint64(11 + i) // 10 initial blocks + new blocks
if block.NumberU64() != expectedNumber {
t.Errorf("block %d has wrong number: got %d, want %d",
i, block.NumberU64(), expectedNumber)
}
}
}

// testHeaderChainImport tries to process a chain of header, writing them into
// the database if successful.
func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error {
Expand Down

0 comments on commit 7f7bb48

Please sign in to comment.