diff --git a/core/blockchain.go b/core/blockchain.go index 818b72e953..6c849f8e82 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -39,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/core/blockstm" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state/snapshot" @@ -86,12 +85,15 @@ var ( blockImportTimer = metrics.NewRegisteredMeter("chain/imports", nil) triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil) - blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil) - blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil) - blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) - blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) - blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil) - blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil) + blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil) + blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil) + blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) + blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) + blockExecutionParallelCounter = metrics.NewRegisteredCounter("chain/execution/parallel", nil) + blockExecutionSerialCounter = metrics.NewRegisteredCounter("chain/execution/serial", nil) + blockExecutionParallelErrorCounter = metrics.NewRegisteredCounter("chain/execution/parallel/error", nil) + blockExecutionParallelTimer = metrics.NewRegisteredTimer("chain/execution/parallel/timer", nil) + blockExecutionSerialTimer = metrics.NewRegisteredTimer("chain/execution/serial/timer", nil) blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil) blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) @@ -569,7 +571,7 @@ func NewParallelBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis return bc, nil } -func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, blockEndErr error) { +func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) { // Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -597,6 +599,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ err error statedb *state.StateDB counter metrics.Counter + parallel bool } resultChan := make(chan Result, 2) @@ -606,7 +609,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ if bc.parallelProcessor != nil { parallelStatedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) if err != nil { - return nil, nil, 0, nil, err + return nil, nil, 0, nil, 0, err } parallelStatedb.SetLogger(bc.logger) @@ -614,15 +617,22 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ go func() { parallelStatedb.StartPrefetcher("chain", nil) + pstart := time.Now() receipts, logs, usedGas, err := bc.parallelProcessor.Process(block, parallelStatedb, bc.vmConfig, ctx) - resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter} + blockExecutionParallelTimer.UpdateSince(pstart) + if err == nil { + vstart := time.Now() + err = bc.validator.ValidateState(block, parallelStatedb, receipts, usedGas, false) + vtime = time.Since(vstart) + } + resultChan <- Result{receipts, logs, usedGas, err, parallelStatedb, blockExecutionParallelCounter, true} }() } if bc.processor != nil { statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps) if err != nil { - return nil, nil, 0, nil, err + return nil, nil, 0, nil, 0, err } statedb.SetLogger(bc.logger) @@ -630,20 +640,27 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ go func() { statedb.StartPrefetcher("chain", nil) + pstart := time.Now() receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, ctx) - resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter} + blockExecutionSerialTimer.UpdateSince(pstart) + if err == nil { + vstart := time.Now() + err = bc.validator.ValidateState(block, statedb, receipts, usedGas, false) + vtime = time.Since(vstart) + } + resultChan <- Result{receipts, logs, usedGas, err, statedb, blockExecutionSerialCounter, false} }() } result := <-resultChan - if _, ok := result.err.(blockstm.ParallelExecFailedError); ok { + if result.parallel && result.err != nil { log.Warn("Parallel state processor failed", "err", result.err) - + blockExecutionParallelErrorCounter.Inc(1) // If the parallel processor failed, we will fallback to the serial processor if enabled if processorCount == 2 { - result.statedb.StopPrefetcher() result = <-resultChan + result.statedb.StopPrefetcher() processorCount-- } } @@ -658,7 +675,7 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header) (_ }() } - return result.receipts, result.logs, result.usedGas, result.statedb, result.err + return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err } // empty returns an indicator whether the blockchain is empty. @@ -2323,7 +2340,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) // Process block using the parent state as reference point pstart := time.Now() - receipts, logs, usedGas, statedb, err := bc.ProcessBlock(block, parent) + receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent) activeState = statedb if err != nil { @@ -2338,18 +2355,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) bc.stateSyncFeed.Send(StateSyncEvent{Data: data}) } // BOR - ptime := time.Since(pstart) - - vstart := time.Now() - - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, false); err != nil { - bc.reportBlock(block, receipts, err) - followupInterrupt.Store(true) - - return it.index, err - } + ptime := time.Since(pstart) - vtime - vtime := time.Since(vstart) proctime := time.Since(start) // processing + validation // Update the metrics touched during block processing and validation diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 43c7f2c4b0..47738f2bc6 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -17,6 +17,7 @@ package core import ( + "context" "errors" "fmt" "math/big" @@ -170,7 +171,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { return err } - receipts, _, usedGas, statedb, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header()) + receipts, _, usedGas, statedb, _, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header()) if err != nil { blockchain.reportBlock(block, receipts, err) @@ -216,6 +217,75 @@ func testParallelBlockChainImport(t *testing.T, scheme string) { } } +type AlwaysFailParallelStateProcessor struct { +} + +func (p *AlwaysFailParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) { + return nil, nil, 0, errors.New("always fail") +} + +type SlowSerialStateProcessor struct { + s Processor +} + +func NewSlowSerialStateProcessor(s Processor) *SlowSerialStateProcessor { + return &SlowSerialStateProcessor{s: s} +} + +func (p *SlowSerialStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) { + time.Sleep(100 * time.Millisecond) + return p.s.Process(block, statedb, cfg, interruptCtx) +} + +func TestSuccessfulBlockImportParallelFailed(t *testing.T) { + t.Parallel() + + testSuccessfulBlockImportParallelFailed(t, rawdb.HashScheme) + testSuccessfulBlockImportParallelFailed(t, rawdb.PathScheme) +} + +func testSuccessfulBlockImportParallelFailed(t *testing.T, scheme string) { + // Create a new blockchain with 10 initial blocks + db, _, blockchain, err := newCanonical(ethash.NewFaker(), 10, true, scheme) + blockchain.parallelProcessor = &AlwaysFailParallelStateProcessor{} + blockchain.processor = NewSlowSerialStateProcessor(blockchain.processor) + if err != nil { + t.Fatalf("failed to create canonical chain: %v", err) + } + defer blockchain.Stop() + + // Create valid blocks to import + block := blockchain.GetBlockByHash(blockchain.CurrentBlock().Hash()) + blocks := makeBlockChain(blockchain.chainConfig, block, 5, ethash.NewFaker(), db, canonicalSeed) + + // Import the blocks + n, err := blockchain.InsertChain(blocks) + if err != nil { + t.Fatalf("failed to import valid blocks: %v", err) + } + + // Verify all blocks were imported + if n != len(blocks) { + t.Errorf("imported %d blocks, wanted %d", n, len(blocks)) + } + + // Verify the last block is properly linked + if blockchain.CurrentBlock().Hash() != blocks[len(blocks)-1].Hash() { + t.Errorf("current block hash mismatch: got %x, want %x", + blockchain.CurrentBlock().Hash(), + blocks[len(blocks)-1].Hash()) + } + + // Verify block numbers are sequential + for i, block := range blocks { + expectedNumber := uint64(11 + i) // 10 initial blocks + new blocks + if block.NumberU64() != expectedNumber { + t.Errorf("block %d has wrong number: got %d, want %d", + i, block.NumberU64(), expectedNumber) + } + } +} + // testHeaderChainImport tries to process a chain of header, writing them into // the database if successful. func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error {