From 949a826ae4546d7f92d25076200f3581ec2e3206 Mon Sep 17 00:00:00 2001 From: Devon Bear Date: Wed, 1 Nov 2023 16:22:54 -0400 Subject: [PATCH] refactor(blockchain): Minor cleanup and add safety checks. (#1266) --- cosmos/miner/miner.go | 44 +++- cosmos/runtime/runtime.go | 11 +- cosmos/x/evm/keeper/abci.go | 8 +- cosmos/x/evm/keeper/genesis.go | 5 +- cosmos/x/evm/keeper/keeper.go | 7 +- cosmos/x/evm/keeper/processor.go | 2 +- cosmos/x/evm/module.go | 4 +- .../distribution/distribution_test.go | 4 +- e2e/testapp/app.go | 2 +- eth/core/chain.go | 21 +- eth/core/chain_reader.go | 1 - eth/core/chain_resources.go | 21 ++ eth/core/chain_writer.go | 220 +++++++++++------- eth/polar/api_backend.go | 1 - 14 files changed, 233 insertions(+), 118 deletions(-) diff --git a/cosmos/miner/miner.go b/cosmos/miner/miner.go index 39a6ab583..10bccbc69 100644 --- a/cosmos/miner/miner.go +++ b/cosmos/miner/miner.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/miner" + evmkeeper "pkg.berachain.dev/polaris/cosmos/x/evm/keeper" "pkg.berachain.dev/polaris/eth" "pkg.berachain.dev/polaris/eth/core/types" ) @@ -45,17 +46,33 @@ type EnvelopeSerializer interface { ToSdkTxBytes(*engine.ExecutionPayloadEnvelope, uint64) ([]byte, error) } +type App interface { + BeginBlocker(sdk.Context) (sdk.BeginBlock, error) + PreBlocker(sdk.Context, *abci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) +} + +// EVMKeeper is an interface that defines the methods needed for the EVM setup. +type EVMKeeper interface { + // Setup initializes the EVM keeper. + Setup(evmkeeper.Blockchain) error + SetLatestQueryContext(context.Context) error +} + // Miner implements the baseapp.TxSelector interface. type Miner struct { eth.Miner + app App + keeper EVMKeeper serializer EnvelopeSerializer currentPayload *miner.Payload } // New produces a cosmos miner from a geth miner. -func New(gm eth.Miner) *Miner { +func New(gm eth.Miner, app App, keeper EVMKeeper) *Miner { return &Miner{ - Miner: gm, + Miner: gm, + keeper: keeper, + app: app, } } @@ -68,11 +85,30 @@ func (m *Miner) Init(serializer EnvelopeSerializer) { func (m *Miner) PrepareProposal( ctx sdk.Context, _ *abci.RequestPrepareProposal, ) (*abci.ResponsePrepareProposal, error) { - var payloadEnvelopeBz []byte - var err error + var ( + payloadEnvelopeBz []byte + err error + ) + + // We have to prime the state plugin. + if err = m.keeper.SetLatestQueryContext(ctx); err != nil { + return nil, err + } + + // We have to run the PreBlocker && BeginBlocker to get the chain into the state + // it'll be in when the EVM transaction actually runs. + if _, err = m.app.PreBlocker(ctx, nil); err != nil { + return nil, err + } else if _, err = m.app.BeginBlocker(ctx); err != nil { + return nil, err + } + + // Trigger the geth miner to build a block. if payloadEnvelopeBz, err = m.buildBlock(ctx); err != nil { return nil, err } + + // Return the payload as a transaction in the proposal. return &abci.ResponsePrepareProposal{Txs: [][]byte{payloadEnvelopeBz}}, err } diff --git a/cosmos/runtime/runtime.go b/cosmos/runtime/runtime.go index 324386800..d37058df2 100644 --- a/cosmos/runtime/runtime.go +++ b/cosmos/runtime/runtime.go @@ -21,6 +21,7 @@ package runtime import ( + "context" "time" "cosmossdk.io/log" @@ -50,6 +51,7 @@ import ( type EVMKeeper interface { // Setup initializes the EVM keeper. Setup(evmkeeper.Blockchain) error + SetLatestQueryContext(context.Context) error } // CosmosApp is an interface that defines the methods needed for the Cosmos setup. @@ -57,6 +59,7 @@ type CosmosApp interface { SetPrepareProposal(sdk.PrepareProposalHandler) SetMempool(mempool.Mempool) SetAnteHandler(sdk.AnteHandler) + miner.App } // Polaris is a struct that wraps the Polaris struct from the polar package. @@ -92,10 +95,6 @@ func New( panic(err) } - // Wrap the geth miner and txpool with the cosmos miner and txpool. - p.WrappedTxPool = txpool.New(p.Blockchain(), p.TxPool()) - p.WrappedMiner = miner.New(p.Miner()) - return p } @@ -103,6 +102,10 @@ func New( // It takes a BaseApp and an EVMKeeper as arguments. // It returns an error if the setup fails. func (p *Polaris) Build(app CosmosApp, ek EVMKeeper) error { + // Wrap the geth miner and txpool with the cosmos miner and txpool. + p.WrappedTxPool = txpool.New(p.Blockchain(), p.TxPool()) + p.WrappedMiner = miner.New(p.Miner(), app, ek) + app.SetMempool(p.WrappedTxPool) app.SetPrepareProposal(p.WrappedMiner.PrepareProposal) diff --git a/cosmos/x/evm/keeper/abci.go b/cosmos/x/evm/keeper/abci.go index 811cc4ba5..c5fc838b3 100644 --- a/cosmos/x/evm/keeper/abci.go +++ b/cosmos/x/evm/keeper/abci.go @@ -35,7 +35,9 @@ func (k *Keeper) Precommit(ctx context.Context) error { block := k.chain.GetBlockByNumber(blockNum) if block == nil { panic( - fmt.Sprintf("EVM BLOCK FAILURE AT BLOCK %d", blockNum), + fmt.Sprintf( + "EVM BLOCK %d FAILED TO PROCESS", blockNum, + ), ) } else if block.NumberU64() != blockNum { panic( @@ -47,8 +49,8 @@ func (k *Keeper) Precommit(ctx context.Context) error { return nil } -// PrepareCheckState runs on the Cosmos-SDK lifecycle PrepareCheckState(). -func (k *Keeper) PrepareCheckState(ctx context.Context) error { +// SetLatestQueryContext runs on the Cosmos-SDK lifecycle SetLatestQueryContext(). +func (k *Keeper) SetLatestQueryContext(ctx context.Context) error { k.sp.Prepare(ctx) return nil } diff --git a/cosmos/x/evm/keeper/genesis.go b/cosmos/x/evm/keeper/genesis.go index 1928a5205..22816847e 100644 --- a/cosmos/x/evm/keeper/genesis.go +++ b/cosmos/x/evm/keeper/genesis.go @@ -44,9 +44,8 @@ func (k *Keeper) InitGenesis(ctx sdk.Context, genState *core.Genesis) error { } // Insert to chain. - k.chain. - PreparePlugins(ctx.WithEventManager(sdk.NewEventManager())) - return k.chain.InsertBlockWithoutSetHead(genState.ToBlock()) + k.chain.PreparePlugins(ctx.WithEventManager(sdk.NewEventManager())) + return k.chain.WriteGenesisBlock(genState.ToBlock()) } // ExportGenesis returns the exported genesis state. diff --git a/cosmos/x/evm/keeper/keeper.go b/cosmos/x/evm/keeper/keeper.go index ec6024841..f1bd832d3 100644 --- a/cosmos/x/evm/keeper/keeper.go +++ b/cosmos/x/evm/keeper/keeper.go @@ -31,16 +31,17 @@ import ( "pkg.berachain.dev/polaris/cosmos/config" "pkg.berachain.dev/polaris/cosmos/x/evm/plugins/state" "pkg.berachain.dev/polaris/cosmos/x/evm/types" - "pkg.berachain.dev/polaris/eth/core" ethprecompile "pkg.berachain.dev/polaris/eth/core/precompile" + coretypes "pkg.berachain.dev/polaris/eth/core/types" "pkg.berachain.dev/polaris/eth/params" ) type Blockchain interface { PreparePlugins(context.Context) Config() *params.ChainConfig - core.ChainWriter - core.ChainReader + WriteGenesisBlock(*coretypes.Block) error + InsertBlockAndSetHead(*coretypes.Block) error + GetBlockByNumber(uint64) *coretypes.Block } type Keeper struct { diff --git a/cosmos/x/evm/keeper/processor.go b/cosmos/x/evm/keeper/processor.go index da949bc79..38d3037ac 100644 --- a/cosmos/x/evm/keeper/processor.go +++ b/cosmos/x/evm/keeper/processor.go @@ -61,7 +61,7 @@ func (k *Keeper) ProcessPayloadEnvelope( // Prepare should be moved to the blockchain? THIS IS VERY HOOD YES NEEDS TO BE MOVED. k.chain.PreparePlugins(ctx) - if err = k.chain.InsertBlockWithoutSetHead(block); err != nil { + if err = k.chain.InsertBlockAndSetHead(block); err != nil { return nil, err } diff --git a/cosmos/x/evm/module.go b/cosmos/x/evm/module.go index 5969ad3dd..07d2d653a 100644 --- a/cosmos/x/evm/module.go +++ b/cosmos/x/evm/module.go @@ -126,9 +126,9 @@ func (am AppModule) RegisterServices(registrar grpc.ServiceRegistrar) error { // ConsensusVersion implements AppModule/ConsensusVersion. func (AppModule) ConsensusVersion() uint64 { return ConsensusVersion } -// PrepareCheckState prepares the application state for a check. +// SetLatestQueryContext prepares the application state for a check. func (am AppModule) PrepareCheckState(ctx context.Context) error { - return am.keeper.PrepareCheckState(ctx) + return am.keeper.SetLatestQueryContext(ctx) } // Precommit performs precommit operations. diff --git a/e2e/precompile/contracts/distribution/distribution_test.go b/e2e/precompile/contracts/distribution/distribution_test.go index a79517c97..68db70c7e 100644 --- a/e2e/precompile/contracts/distribution/distribution_test.go +++ b/e2e/precompile/contracts/distribution/distribution_test.go @@ -121,8 +121,8 @@ var _ = Describe("Distribution Precompile", func() { Expect(err).ToNot(HaveOccurred()) ExpectSuccessReceipt(tf.EthClient(), tx) - // Wait for 2 blocks to be produced, to make sure there are rewards. - for i := 0; i < 2; i++ { + // Wait for 5 blocks to be produced, to make sure there are rewards. + for i := 0; i < 5; i++ { Expect(tf.WaitForNextBlock()).To(Succeed()) } diff --git a/e2e/testapp/app.go b/e2e/testapp/app.go index 25231c7e2..fa5d851e9 100644 --- a/e2e/testapp/app.go +++ b/e2e/testapp/app.go @@ -191,7 +191,7 @@ func NewPolarisApp( ) // Setup Polaris Runtime. - if err := app.Polaris.Build(app.BaseApp, app.EVMKeeper); err != nil { + if err := app.Polaris.Build(app, app.EVMKeeper); err != nil { panic(err) } diff --git a/eth/core/chain.go b/eth/core/chain.go index f419e17ff..53fcf2816 100644 --- a/eth/core/chain.go +++ b/eth/core/chain.go @@ -66,6 +66,7 @@ type blockchain struct { engine consensus.Engine processor core.Processor + validator core.Validator // statedb is the state database that is used to mange state during transactions. statedb state.StateDB @@ -79,10 +80,6 @@ type blockchain struct { currentBlock atomic.Pointer[types.Block] // finalizedBlock is the finalized/latest block. finalizedBlock atomic.Pointer[types.Block] - // currentReceipts is the current/pending receipts. - currentReceipts atomic.Value - // currentLogs is the current/pending logs. - currentLogs atomic.Value // receiptsCache is a cache of the receipts for the last `defaultCacheSizeBytes` bytes of // blocks. blockHash -> receipts @@ -98,14 +95,13 @@ type blockchain struct { txLookupCache *lru.Cache[common.Hash, *types.TxLookupEntry] // subscription event feeds - scope event.SubscriptionScope - chainFeed event.Feed - chainHeadFeed event.Feed - logsFeed event.Feed - pendingLogsFeed event.Feed - rmLogsFeed event.Feed // currently never used - chainSideFeed event.Feed // currently never used - logger log.Logger + scope event.SubscriptionScope + chainFeed event.Feed + chainHeadFeed event.Feed + logsFeed event.Feed + rmLogsFeed event.Feed // currently never used + chainSideFeed event.Feed // currently never used + logger log.Logger } // ========================================================================= @@ -134,6 +130,7 @@ func NewChain( } bc.statedb = state.NewStateDB(bc.sp, bc.pp) bc.processor = core.NewStateProcessor(bc.config, bc, bc.engine) + bc.validator = core.NewBlockValidator(bc.config, bc, bc.engine) // TODO: hmm... bc.currentBlock.Store( types.NewBlock(&types.Header{Time: 0, Number: big.NewInt(0), diff --git a/eth/core/chain_reader.go b/eth/core/chain_reader.go index cccaba8d8..b284e0076 100644 --- a/eth/core/chain_reader.go +++ b/eth/core/chain_reader.go @@ -97,7 +97,6 @@ func (bc *blockchain) CurrentFinalBlock() *types.Header { // CurrentSafeBlock retrieves the current safe block of the canonical // chain. The block is retrieved from the blockchain's internal cache. func (bc *blockchain) CurrentSafeBlock() *types.Header { - // TODO: determine the difference between safe and final in polaris. return bc.CurrentFinalBlock() } diff --git a/eth/core/chain_resources.go b/eth/core/chain_resources.go index 95b67cb42..cd14c21ed 100644 --- a/eth/core/chain_resources.go +++ b/eth/core/chain_resources.go @@ -54,6 +54,27 @@ func (bc *blockchain) StateAtBlockNumber(number uint64) (state.StateDB, error) { return state.NewStateDB(sp, bc.pp), nil } +// HasBlockAndState checks if the blockchain has a block and its state at +// a given hash and number. +func (bc *blockchain) HasBlockAndState(hash common.Hash, number uint64) bool { + // Check for State. + if sdb, err := bc.StateAt(hash); sdb == nil || err == nil { + sdb, err = bc.StateAtBlockNumber(number) + if sdb == nil || err != nil { + return false + } + } + + // Check for Block. + if block := bc.GetBlockByNumber(number); block == nil { + block = bc.GetBlockByHash(hash) + if block == nil { + return false + } + } + return true +} + // GetVMConfig returns the vm.Config for the current chain. func (bc *blockchain) GetVMConfig() *vm.Config { return bc.vmConfig diff --git a/eth/core/chain_writer.go b/eth/core/chain_writer.go index e96ae7b97..fd2e196ef 100644 --- a/eth/core/chain_writer.go +++ b/eth/core/chain_writer.go @@ -22,83 +22,177 @@ package core import ( "context" - "time" + "errors" "github.com/ethereum/go-ethereum/core" "pkg.berachain.dev/polaris/eth/core/state" "pkg.berachain.dev/polaris/eth/core/types" + "pkg.berachain.dev/polaris/eth/log" ) // ChainWriter defines methods that are used to perform state and block transitions. type ChainWriter interface { LoadLastState(context.Context, uint64) error - InsertBlock(block *types.Block, receipts types.Receipts, logs []*types.Log) error - InsertBlockWithoutSetHead(block *types.Block) error + WriteGenesisBlock(block *types.Block) error + InsertBlockAndSetHead(block *types.Block) error WriteBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state state.StateDB, emitHeadEvent bool) (status core.WriteStatus, err error) } -// WriteBlockAndSetHead is a no-op in the current implementation. Potentially usable later. -func (*blockchain) WriteBlockAndSetHead( - _ *types.Block, _ []*types.Receipt, _ []*types.Log, _ state.StateDB, - _ bool) (core.WriteStatus, error) { - return core.NonStatTy, nil +// WriteGenesisBlock inserts the genesis block into the blockchain. +func (bc *blockchain) WriteGenesisBlock(block *types.Block) error { + // TODO: add more validation here. + if block.NumberU64() != 0 { + return errors.New("not the genesis block") + } + _, err := bc.WriteBlockAndSetHead(block, nil, nil, nil, true) + return err } -func (bc *blockchain) InsertBlockWithoutSetHead(block *types.Block) error { - // Retrieve the parent block and it's state to execute on top - // parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - // if parent == nil { - // return fmt.Errorf("parent block not found") - // } +// InsertBlockAndSetHead inserts a block into the blockchain and sets the head. +func (bc *blockchain) InsertBlockAndSetHead(block *types.Block) error { + // Validate that we are about to insert a valid block. + if block.NumberU64() > 1 { // TODO DIAGNOSE + if err := bc.validator.ValidateBody(block); err != nil { + log.Error("invalid block body", "err", err) + return err + } + } - // Process block using the parent state as reference point - pstart := time.Now() - receipts, logs, _, err := bc.processor.Process(block, bc.statedb, *bc.vmConfig) + // Process the incoming EVM block. + receipts, logs, usedGas, err := bc.processor.Process(block, bc.statedb, *bc.vmConfig) if err != nil { + log.Error("failed to process block", "num", block.NumberU64(), "err", err) return err } - ptime := time.Since(pstart) - bc.logger.Info("processed block in", "time", ptime) - return bc.InsertBlock(block, receipts, logs) -} -// InsertBlock inserts a block into the canonical chain and updates the state of the blockchain. -func (bc *blockchain) InsertBlock( - block *types.Block, - receipts types.Receipts, - logs []*types.Log, -) error { - var err error - if _, err = bc.statedb.Commit( - block.NumberU64(), - bc.config.IsEIP158(block.Header().Number), - ); err != nil { + // ValidateState validates the statedb post block processing. + if err = bc.validator.ValidateState(block, bc.statedb, receipts, usedGas); err != nil { + log.Error("invalid state after processing block", "num", block.NumberU64(), "err", err) return err } - // TODO: prepare historical plugin here? - // TBH still think we should deprecate it and run in another routine as indexer. + // We can just immediately finalize the block. It's okay in this context. + if _, err = bc.WriteBlockAndSetHead( + block, receipts, logs, nil, true); err != nil { + log.Error("failed to write block", "num", block.NumberU64(), "err", err) + return err + } - // ***************************************** // - // TODO: add safety check for canonicallness // - // ***************************************** // + return err +} - // *********************************************** // - // TODO: restructure this function / flow it sucks // - // *********************************************** // - blockHash, blockNum := block.Hash(), block.Number().Uint64() - bc.logger.Info( - "finalizing evm block", "hash", blockHash.Hex(), "num_txs", len(receipts)) +// WriteBlockAndSetHead sets the head of the blockchain to the given block and finalizes the block. +func (bc *blockchain) WriteBlockAndSetHead( + block *types.Block, receipts []*types.Receipt, logs []*types.Log, + _ state.StateDB, emitHeadEvent bool, +) (core.WriteStatus, error) { + // Write the block to the store. + if err := bc.writeBlockWithState(block, receipts); err != nil { + return core.NonStatTy, err + } + currentBlock := bc.currentBlock.Load() + + // We need to error if the parent is not the head block. + if block.NumberU64() > 0 && block.ParentHash() != currentBlock.Hash() { + log.Error("canonical chain broken", + "block-number", block.NumberU64(), "block-hash", block.ParentHash().Hex()) + return core.NonStatTy, errors.New("canonical chain broken") + } + + // Set the current block. + bc.currentBlock.Store(block) + + // TODO: this is fine to do here but not really semantically correct + // and is very confusing. + // For clarity reasons, we should make the cosmos chain make a separate call + // to finalize the block. + bc.finalizedBlock.Store(block) + + // Store txLookup entries for all transactions in the block. + blockNum := block.NumberU64() + blockHash := block.Hash() + bc.blockNumCache.Add(blockNum, block) + bc.blockHashCache.Add(blockHash, block) + for txIndex, tx := range block.Transactions() { + bc.txLookupCache.Add( + tx.Hash(), + &types.TxLookupEntry{ + Tx: tx, + TxIndex: uint64(txIndex), + BlockNum: blockNum, + BlockHash: blockHash, + }, + ) + } + + // Write the receipts cache. + // TODO deprecate this cache? + if receipts != nil { + bc.receiptsCache.Add(block.Hash(), receipts) + } + + // Fire off the feeds. + bc.chainFeed.Send(ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + if len(logs) > 0 { + bc.logsFeed.Send(logs) + } + + // In theory, we should fire a ChainHeadEvent when we inject + // a canonical block, but sometimes we can insert a batch of + // canonical blocks. Avoid firing too many ChainHeadEvents, + // we will fire an accumulated ChainHeadEvent and disable fire + // event here. + if emitHeadEvent { + bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) + } + + return core.CanonStatTy, nil +} - // store the block header on the host chain - err = bc.bp.StoreHeader(block.Header()) +// writeBlockWithState writes the block along with its state (receipts and logs) +// into the blockchain. +func (bc *blockchain) writeBlockWithState( + block *types.Block, receipts []*types.Receipt, +) error { + // In Polaris since we are using single block finality. + // Finalized == Current == Safe. All are the same. + // Store the header as well as update all the finalized stuff. + err := bc.bp.StoreHeader(block.Header()) if err != nil { bc.logger.Error("failed to store block header", "err", err) return err } + // Irrelevant of the canonical status, write the block itself to the database. + // TODO THIS NEEDS TO WRITE TO EXTERNAL DB. + if err = bc.writeHistoricalData(block, receipts); err != nil { + return err + } + + // Commit all cached state changes into underlying memory database. + // In Polaris this is a no-op. + _, err = bc.statedb.Commit(block.NumberU64(), bc.config.IsEIP158(block.Number())) + if err != nil { + return err + } + + bc.logger.Info( + "finalizing evm block", "hash", block.Hash().Hex(), "num_txs", len(receipts)) + + return nil +} + +// InsertBlock inserts a block into the canonical chain and updates the state of the blockchain. +// TODO: WRITE TO EXTERNAL STORE +func (bc *blockchain) writeHistoricalData( + block *types.Block, + receipts types.Receipts, +) error { + var err error + blockHash, blockNum := block.Hash(), block.Number().Uint64() + // store the block, receipts, and txs on the host chain if historical plugin is supported if bc.hp != nil { if err = bc.hp.StoreBlock(block); err != nil { @@ -115,41 +209,5 @@ func (bc *blockchain) InsertBlock( } } - // mark the current block, receipts, and logs - if block != nil { - bc.currentBlock.Store(block) - bc.finalizedBlock.Store(block) - - bc.blockNumCache.Add(blockNum, block) - bc.blockHashCache.Add(blockHash, block) - - for txIndex, tx := range block.Transactions() { - bc.txLookupCache.Add( - tx.Hash(), - &types.TxLookupEntry{ - Tx: tx, - TxIndex: uint64(txIndex), - BlockNum: blockNum, - BlockHash: blockHash, - }, - ) - } - } - if receipts != nil { - bc.currentReceipts.Store(receipts) - bc.receiptsCache.Add(blockHash, receipts) - } - if logs != nil { - bc.pendingLogsFeed.Send(logs) - bc.currentLogs.Store(logs) - if len(logs) > 0 { - bc.logsFeed.Send(logs) - } - } - - // Send chain events. - bc.chainFeed.Send(ChainEvent{Block: block, Hash: blockHash, Logs: logs}) - bc.chainHeadFeed.Send(ChainHeadEvent{Block: block}) - return nil } diff --git a/eth/polar/api_backend.go b/eth/polar/api_backend.go index 97ae4a33c..5d55dc253 100644 --- a/eth/polar/api_backend.go +++ b/eth/polar/api_backend.go @@ -556,7 +556,6 @@ func (b *backend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) eve } func (b *backend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { - b.logger.Debug("called eth.rpc.backend.SubscribeLogsEvent", "ch", ch) return b.polar.blockchain.SubscribeLogsEvent(ch) }