Skip to content
This repository has been archived by the owner on Apr 11, 2021. It is now read-only.

Commit

Permalink
dev: Revert changes to txpool (#270)
Browse files Browse the repository at this point in the history
* dev: Revert changes to txpool

* Unskip test

* Expose ValidateTx again
  • Loading branch information
smartcontracts authored Mar 11, 2021
1 parent 2dd129f commit 255cb40
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 75 deletions.
77 changes: 16 additions & 61 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package core

import (
"bytes"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -185,6 +184,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
conf.Rejournal = time.Second
}
if conf.PriceLimit < 1 {
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
}
if conf.PriceBump < 1 {
log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
conf.PriceBump = DefaultTxPoolConfig.PriceBump
Expand Down Expand Up @@ -228,7 +231,6 @@ type TxPool struct {
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
rmu sync.Mutex // Used for locking addRemotes for sequencer reorgs

istanbul bool // Fork indicator whether we are in the istanbul stage.

Expand Down Expand Up @@ -257,7 +259,6 @@ type TxPool struct {

type txpoolResetRequest struct {
oldHead, newHead *types.Header
tx *types.Transaction
}

// NewTxPool creates a new transaction pool to gather, sort and filter inbound
Expand All @@ -271,7 +272,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewOVMSigner(chainconfig.ChainID),
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
Expand All @@ -284,7 +285,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
reorgShutdownCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}

pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
Expand Down Expand Up @@ -341,12 +341,7 @@ func (pool *TxPool) loop() {
// Handle ChainHeadEvent
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
txs := ev.Block.Transactions()
var tx *types.Transaction
if len(txs) > 0 {
tx = txs[0]
}
pool.requestReset(head.Header(), ev.Block.Header(), tx)
pool.requestReset(head.Header(), ev.Block.Header())
head = ev.Block
}

Expand Down Expand Up @@ -412,16 +407,6 @@ func (pool *TxPool) Stop() {
log.Info("Transaction pool stopped")
}

// LockAddRemote
func (pool *TxPool) LockAddRemote() {
pool.rmu.Lock()
}

// UnlockAddRemote
func (pool *TxPool) UnlockAddRemote() {
pool.rmu.Unlock()
}

// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
Expand Down Expand Up @@ -779,8 +764,6 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error {
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
pool.LockAddRemote()
defer pool.UnlockAddRemote()
return pool.addTxs(txs, false, false)
}

Expand Down Expand Up @@ -943,9 +926,9 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {

// requestPromoteExecutables requests a pool reset to the new head block.
// The returned channel is closed when the reset has occurred.
func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header, tx *types.Transaction) chan struct{} {
func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
select {
case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead, tx}:
case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}:
return <-pool.reorgDoneCh
case <-pool.reorgShutdownCh:
return pool.reorgShutdownCh
Expand Down Expand Up @@ -1081,12 +1064,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
// because of another transaction (e.g. higher gas price).
var tx *types.Transaction
if reset != nil {
tx = reset.tx
}
if reset != nil {
pool.demoteUnexecutables(tx)
pool.demoteUnexecutables()
}
// Ensure pool.queue and pool.pending sizes stay within the configured limits.
pool.truncatePending()
Expand Down Expand Up @@ -1187,14 +1166,10 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit

// OVM Change. Do not reinject reorganized transactions
// into the mempool.
if vm.UsingOVM {
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
}
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)

// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
Expand Down Expand Up @@ -1231,22 +1206,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
queuedNofundsMeter.Mark(int64(len(drops)))

// Gather all executable transactions and promote them
nonce := pool.pendingNonces.get(addr)
var readies types.Transactions
// QueueOriginL1ToL2 transactions do not increment the nonce
// and the sender is the zero address, always promote them.
if vm.UsingOVM {
if addr == (common.Address{}) {
readies = list.Flatten()
for _, tx := range readies {
list.Remove(tx)
}
} else {
readies = list.Ready(nonce)
}
} else {
readies = list.Ready(nonce)
}
readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
Expand Down Expand Up @@ -1416,16 +1376,11 @@ func (pool *TxPool) truncateQueue() {
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
func (pool *TxPool) demoteUnexecutables(txn *types.Transaction) {
func (pool *TxPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)
if vm.UsingOVM {
from, _ := types.Sender(pool.signer, txn)
if txn != nil && bytes.Equal(from.Bytes(), addr.Bytes()) {
nonce = txn.Nonce() + 1
}
}

// Drop all transactions that are deemed too old (low nonce)
olds := list.Forward(nonce)
for _, tx := range olds {
Expand Down
27 changes: 13 additions & 14 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {

// trigger state change in the background
trigger = true
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)

_, err := pool.Pending()
if err != nil {
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction(0, 100, key)
from, _ := deriveSender(tx)
pool.currentState.AddBalance(from, big.NewInt(1000))
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)

pool.enqueueTx(tx.Hash(), tx)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestTransactionChainFork(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))

pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)
}
resetState()

Expand Down Expand Up @@ -378,7 +378,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))

pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)
}
resetState()

Expand Down Expand Up @@ -450,15 +450,15 @@ func TestTransactionNonceRecovery(t *testing.T) {
addr := crypto.PubkeyToAddress(key.PublicKey)
pool.currentState.SetNonce(addr, n)
pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)

tx := transaction(n, 100000, key)
if err := pool.AddRemote(tx); err != nil {
t.Error(err)
}
// simulate some weird re-order of transactions and missing nonce(s)
pool.currentState.SetNonce(addr, n-1)
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)
if fn := pool.Nonce(addr); fn != n-1 {
t.Errorf("expected nonce to be %d, got %d", n-1, fn)
}
Expand Down Expand Up @@ -502,7 +502,7 @@ func TestTransactionDropping(t *testing.T) {
if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
}
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)
if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
}
Expand All @@ -514,7 +514,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the balance of the account, and check that invalidated transactions are dropped
pool.currentState.AddBalance(account, big.NewInt(-650))
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)

if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
Expand All @@ -539,7 +539,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)

if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
Expand Down Expand Up @@ -610,7 +610,7 @@ func TestTransactionPostponing(t *testing.T) {
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
}
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs))
}
Expand All @@ -624,7 +624,7 @@ func TestTransactionPostponing(t *testing.T) {
for _, addr := range accs {
pool.currentState.AddBalance(addr, big.NewInt(-1))
}
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)

// The first account's first transaction remains valid, check that subsequent
// ones are either filtered out, or queued up for later.
Expand Down Expand Up @@ -1335,7 +1335,6 @@ func TestTransactionPoolRepricingKeepsLocals(t *testing.T) {
//
// Note, local transactions are never allowed to be dropped.
func TestTransactionPoolUnderpricing(t *testing.T) {
t.Skip("OVM changes break this test")
t.Parallel()

// Create the pool to test the pricing enforcement with
Expand Down Expand Up @@ -1736,7 +1735,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
}
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
<-pool.requestReset(nil, nil, nil)
<-pool.requestReset(nil, nil)
time.Sleep(2 * config.Rejournal)
pool.Stop()

Expand Down Expand Up @@ -1858,7 +1857,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) {
// Benchmark the speed of pool validation
b.ResetTimer()
for i := 0; i < b.N; i++ {
pool.demoteUnexecutables(nil)
pool.demoteUnexecutables()
}
}

Expand Down

0 comments on commit 255cb40

Please sign in to comment.