Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback to serial execution if parallel execution fails #1390

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 35 additions & 28 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/blockstm"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/snapshot"
Expand Down Expand Up @@ -86,12 +85,15 @@ var (
blockImportTimer = metrics.NewRegisteredMeter("chain/imports", nil)
triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil)

blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil)
blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil)
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil)
blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil)
blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil)
blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil)
blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil)

blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
Expand Down Expand Up @@ -569,7 +571,7 @@ func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis
return bc, nil
}

func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, blockEndErr error) {
func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) {
// Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -597,6 +599,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
err error
statedb *state.StateDB
counter metrics.Counter
parallel bool
}

resultChan := make(chan Result, 2)
Expand All @@ -606,44 +609,58 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
if bc.parallelProcessor != nil {
parallelStatedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return nil, nil, 0, nil, err
return nil, nil, 0, nil, 0, err
}
parallelStatedb.SetLogger(bc.logger)

processorCount++

go func() {
parallelStatedb.StartPrefetcher("chain", nil)
pstart := time.Now()
receipts, logs, usedGas, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.vmConfig, ctx)
resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter}
blockExecutionParallelTimer.UpdateSince(pstart)
if err == nil {
vstart := time.Now()
err = bc.validator.ValidateState(block, parallelStatedb, receipts, usedGas, false)
vtime = time.Since(vstart)
}
resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter, true}
}()
}

if bc.processor != nil {
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return nil, nil, 0, nil, err
return nil, nil, 0, nil, 0, err
}
statedb.SetLogger(bc.logger)

processorCount++

go func() {
statedb.StartPrefetcher("chain", nil)
pstart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, ctx)
resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter}
blockExecutionSerialTimer.UpdateSince(pstart)
if err == nil {
vstart := time.Now()
err = bc.validator.ValidateState(block, statedb, receipts, usedGas, false)
vtime = time.Since(vstart)
}
resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter, false}
}()
}

result := <-resultChan

if _, ok := result.err.(blockstm.ParallelExecFailedError); ok {
if result.parallel && result.err != nil {
log.Warn("Parallel state processor failed", "err", result.err)

blockExecutionParallelErrorCounter.Inc(1)
// If the parallel processor failed, we will fallback to the serial processor if enabled
if processorCount == 2 {
result.statedb.StopPrefetcher()
result = <-resultChan
result.statedb.StopPrefetcher()
processorCount--
}
}
Expand All @@ -658,7 +675,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_
}()
}

return result.receipts, result.logs, result.usedGas, result.statedb, result.err
return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err
}

// empty returns an indicator whether the blockchain is empty.
Expand Down Expand Up @@ -2323,7 +2340,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// Process block using the parent state as reference point
pstart := time.Now()
receipts, logs, usedGas, statedb, err := bc.ProcessBlock(block, parent)
receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent)
activeState = statedb

if err != nil {
Expand All @@ -2338,18 +2355,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
}
// BOR
ptime := time.Since(pstart)

vstart := time.Now()

if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)

return it.index, err
}
ptime := time.Since(pstart) - vtime

vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

// Update the metrics touched during block processing and validation
Expand Down
72 changes: 71 additions & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -170,7 +171,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
return err
}

receipts, _, usedGas, statedb, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header())
receipts, _, usedGas, statedb, _, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header())

if err != nil {
blockchain.reportBlock(block, receipts, err)
Expand Down Expand Up @@ -216,6 +217,75 @@ func testParallelBlockChainImport(t *testing.T, scheme string) {
}
}

type AlwaysFailParallelStateProcessor struct {
}

func (p *AlwaysFailParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
return nil, nil, 0, errors.New("always fail")
}

type SlowSerialStateProcessor struct {
s Processor
}

func NewSlowSerialStateProcessor(s Processor) *SlowSerialStateProcessor {
return &SlowSerialStateProcessor{s: s}
}

func (p *SlowSerialStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
time.Sleep(100 * time.Millisecond)
return p.s.Process(block, statedb, cfg, interruptCtx)
}

func TestSuccessfulBlockImportParallelFailed(t *testing.T) {
t.Parallel()

testSuccessfulBlockImportParallelFailed(t, rawdb.HashScheme)
testSuccessfulBlockImportParallelFailed(t, rawdb.PathScheme)
}

func testSuccessfulBlockImportParallelFailed(t *testing.T, scheme string) {
// Create a new blockchain with 10 initial blocks
db, _, blockchain, err := newCanonical(ethash.NewFaker(), 10, true, scheme)
blockchain.parallelProcessor = &AlwaysFailParallelStateProcessor{}
blockchain.processor = NewSlowSerialStateProcessor(blockchain.processor)
if err != nil {
t.Fatalf("failed to create canonical chain: %v", err)
}
defer blockchain.Stop()

// Create valid blocks to import
block := blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash())
blocks := makeBlockChain(blockchain.chainConfig, block, 5, ethash.NewFaker(), db, canonicalSeed)

// Import the blocks
n, err := blockchain.InsertChain(blocks)
if err != nil {
t.Fatalf("failed to import valid blocks: %v", err)
}

// Verify all blocks were imported
if n != len(blocks) {
t.Errorf("imported %d blocks, wanted %d", n, len(blocks))
}

// Verify the last block is properly linked
if blockchain.CurrentBlock().Hash() != blocks[len(blocks)-1].Hash() {
t.Errorf("current block hash mismatch: got %x, want %x",
blockchain.CurrentBlock().Hash(),
blocks[len(blocks)-1].Hash())
}

// Verify block numbers are sequential
for i, block := range blocks {
expectedNumber := uint64(11 + i) // 10 initial blocks + new blocks
if block.NumberU64() != expectedNumber {
t.Errorf("block %d has wrong number: got %d, want %d",
i, block.NumberU64(), expectedNumber)
}
}
}

// testHeaderChainImport tries to process a chain of header, writing them into
// the database if successful.
func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error {
Expand Down
Loading