From 255cb4086b70a079685794cced86d2dccc97f350 Mon Sep 17 00:00:00 2001 From: smartcontracts Date: Thu, 11 Mar 2021 10:25:11 -0800 Subject: [PATCH] dev: Revert changes to txpool (#270) * dev: Revert changes to txpool * Unskip test * Expose ValidateTx again --- core/tx_pool.go | 77 +++++++++----------------------------------- core/tx_pool_test.go | 27 ++++++++-------- 2 files changed, 29 insertions(+), 75 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 5bf3c49a2..a5a581c83 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -17,7 +17,6 @@ package core import ( - "bytes" "errors" "fmt" "math" @@ -185,6 +184,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second) conf.Rejournal = time.Second } + if conf.PriceLimit < 1 { + log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit) + conf.PriceLimit = DefaultTxPoolConfig.PriceLimit + } if conf.PriceBump < 1 { log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump) conf.PriceBump = DefaultTxPoolConfig.PriceBump @@ -228,7 +231,6 @@ type TxPool struct { scope event.SubscriptionScope signer types.Signer mu sync.RWMutex - rmu sync.Mutex // Used for locking addRemotes for sequencer reorgs istanbul bool // Fork indicator whether we are in the istanbul stage. @@ -257,7 +259,6 @@ type TxPool struct { type txpoolResetRequest struct { oldHead, newHead *types.Header - tx *types.Transaction } // NewTxPool creates a new transaction pool to gather, sort and filter inbound @@ -271,7 +272,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block config: config, chainconfig: chainconfig, chain: chain, - signer: types.NewOVMSigner(chainconfig.ChainID), + signer: types.NewEIP155Signer(chainconfig.ChainID), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), @@ -284,7 +285,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block reorgShutdownCh: make(chan struct{}), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } - pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { log.Info("Setting new local account", "address", addr) @@ -341,12 +341,7 @@ func (pool *TxPool) loop() { // Handle ChainHeadEvent case ev := <-pool.chainHeadCh: if ev.Block != nil { - txs := ev.Block.Transactions() - var tx *types.Transaction - if len(txs) > 0 { - tx = txs[0] - } - pool.requestReset(head.Header(), ev.Block.Header(), tx) + pool.requestReset(head.Header(), ev.Block.Header()) head = ev.Block } @@ -412,16 +407,6 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } -// LockAddRemote -func (pool *TxPool) LockAddRemote() { - pool.rmu.Lock() -} - -// UnlockAddRemote -func (pool *TxPool) UnlockAddRemote() { - pool.rmu.Unlock() -} - // SubscribeNewTxsEvent registers a subscription of NewTxsEvent and // starts sending event to the given channel. func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription { @@ -779,8 +764,6 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error { // This method is used to add transactions from the p2p network and does not wait for pool // reorganization and internal event propagation. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { - pool.LockAddRemote() - defer pool.UnlockAddRemote() return pool.addTxs(txs, false, false) } @@ -943,9 +926,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // requestPromoteExecutables requests a pool reset to the new head block. // The returned channel is closed when the reset has occurred. -func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header, tx *types.Transaction) chan struct{} { +func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} { select { - case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead, tx}: + case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}: return <-pool.reorgDoneCh case <-pool.reorgShutdownCh: return pool.reorgShutdownCh @@ -1081,12 +1064,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // If a new block appeared, validate the pool of pending transactions. This will // remove any transaction that has been included in the block or was invalidated // because of another transaction (e.g. higher gas price). - var tx *types.Transaction if reset != nil { - tx = reset.tx - } - if reset != nil { - pool.demoteUnexecutables(tx) + pool.demoteUnexecutables() } // Ensure pool.queue and pool.pending sizes stay within the configured limits. pool.truncatePending() @@ -1187,14 +1166,10 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimit - // OVM Change. Do not reinject reorganized transactions - // into the mempool. - if vm.UsingOVM { - // Inject any transactions discarded due to reorgs - log.Debug("Reinjecting stale transactions", "count", len(reinject)) - senderCacher.recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) - } + // Inject any transactions discarded due to reorgs + log.Debug("Reinjecting stale transactions", "count", len(reinject)) + senderCacher.recover(pool.signer, reinject) + pool.addTxsLocked(reinject, false) // Update all fork indicator by next pending block number. next := new(big.Int).Add(newHead.Number, big.NewInt(1)) @@ -1231,22 +1206,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans queuedNofundsMeter.Mark(int64(len(drops))) // Gather all executable transactions and promote them - nonce := pool.pendingNonces.get(addr) - var readies types.Transactions - // QueueOriginL1ToL2 transactions do not increment the nonce - // and the sender is the zero address, always promote them. - if vm.UsingOVM { - if addr == (common.Address{}) { - readies = list.Flatten() - for _, tx := range readies { - list.Remove(tx) - } - } else { - readies = list.Ready(nonce) - } - } else { - readies = list.Ready(nonce) - } + readies := list.Ready(pool.pendingNonces.get(addr)) for _, tx := range readies { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { @@ -1416,16 +1376,11 @@ func (pool *TxPool) truncateQueue() { // demoteUnexecutables removes invalid and processed transactions from the pools // executable/pending queue and any subsequent transactions that become unexecutable // are moved back into the future queue. -func (pool *TxPool) demoteUnexecutables(txn *types.Transaction) { +func (pool *TxPool) demoteUnexecutables() { // Iterate over all accounts and demote any non-executable transactions for addr, list := range pool.pending { nonce := pool.currentState.GetNonce(addr) - if vm.UsingOVM { - from, _ := types.Sender(pool.signer, txn) - if txn != nil && bytes.Equal(from.Bytes(), addr.Bytes()) { - nonce = txn.Nonce() + 1 - } - } + // Drop all transactions that are deemed too old (low nonce) olds := list.Forward(nonce) for _, tx := range olds { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index a1e95c590..f9c1c37ef 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -217,7 +217,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { // trigger state change in the background trigger = true - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) _, err := pool.Pending() if err != nil { @@ -275,7 +275,7 @@ func TestTransactionQueue(t *testing.T) { tx := transaction(0, 100, key) from, _ := deriveSender(tx) pool.currentState.AddBalance(from, big.NewInt(1000)) - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) pool.enqueueTx(tx.Hash(), tx) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) @@ -349,7 +349,7 @@ func TestTransactionChainFork(t *testing.T) { statedb.AddBalance(addr, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) } resetState() @@ -378,7 +378,7 @@ func TestTransactionDoubleNonce(t *testing.T) { statedb.AddBalance(addr, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) } resetState() @@ -450,7 +450,7 @@ func TestTransactionNonceRecovery(t *testing.T) { addr := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.SetNonce(addr, n) pool.currentState.AddBalance(addr, big.NewInt(100000000000000)) - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) tx := transaction(n, 100000, key) if err := pool.AddRemote(tx); err != nil { @@ -458,7 +458,7 @@ func TestTransactionNonceRecovery(t *testing.T) { } // simulate some weird re-order of transactions and missing nonce(s) pool.currentState.SetNonce(addr, n-1) - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) if fn := pool.Nonce(addr); fn != n-1 { t.Errorf("expected nonce to be %d, got %d", n-1, fn) } @@ -502,7 +502,7 @@ func TestTransactionDropping(t *testing.T) { if pool.all.Count() != 6 { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) } - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) if pool.pending[account].Len() != 3 { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) } @@ -514,7 +514,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the balance of the account, and check that invalidated transactions are dropped pool.currentState.AddBalance(account, big.NewInt(-650)) - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -539,7 +539,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the block gas limit, check that invalidated transactions are dropped pool.chain.(*testBlockChain).gasLimit = 100 - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -610,7 +610,7 @@ func TestTransactionPostponing(t *testing.T) { if pool.all.Count() != len(txs) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) } - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs)) } @@ -624,7 +624,7 @@ func TestTransactionPostponing(t *testing.T) { for _, addr := range accs { pool.currentState.AddBalance(addr, big.NewInt(-1)) } - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) // The first account's first transaction remains valid, check that subsequent // ones are either filtered out, or queued up for later. @@ -1335,7 +1335,6 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) { // // Note, local transactions are never allowed to be dropped. func TestTransactionPoolUnderpricing(t *testing.T) { - t.Skip("OVM changes break this test") t.Parallel() // Create the pool to test the pricing enforcement with @@ -1736,7 +1735,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { } // Bump the nonce temporarily and ensure the newly invalidated transaction is removed statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) - <-pool.requestReset(nil, nil, nil) + <-pool.requestReset(nil, nil) time.Sleep(2 * config.Rejournal) pool.Stop() @@ -1858,7 +1857,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.demoteUnexecutables(nil) + pool.demoteUnexecutables() } }