From 2d83277fe6690e52eaaa9d9b99a21af1f1f7f8d7 Mon Sep 17 00:00:00 2001 From: Roshan Date: Tue, 23 Apr 2024 15:41:50 +0800 Subject: [PATCH] fix: bundlepool concurrent read and write and commit blob tx issue --- core/blockchain.go | 2 +- core/txpool/bundlepool/bundlepool.go | 5 ++- miner/ordering.go | 13 ++++--- miner/worker.go | 2 +- miner/worker_builder.go | 56 +++++++++++++++------------- 5 files changed, 42 insertions(+), 36 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 4053f05596..1e1fe99de3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -31,6 +31,7 @@ import ( mapset "github.com/deckarep/golang-set/v2" exlru "github.com/hashicorp/golang-lru" "golang.org/x/crypto/sha3" + "golang.org/x/exp/slices" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" @@ -56,7 +57,6 @@ import ( "github.com/ethereum/go-ethereum/triedb" "github.com/ethereum/go-ethereum/triedb/hashdb" "github.com/ethereum/go-ethereum/triedb/pathdb" - "golang.org/x/exp/slices" ) var ( diff --git a/core/txpool/bundlepool/bundlepool.go b/core/txpool/bundlepool/bundlepool.go index 3edd6ad1de..9012e9cf2d 100644 --- a/core/txpool/bundlepool/bundlepool.go +++ b/core/txpool/bundlepool/bundlepool.go @@ -130,6 +130,9 @@ func (p *BundlePool) AddBundle(bundle *types.Bundle) error { } bundle.Price = price + p.mu.Lock() + defer p.mu.Unlock() + hash := bundle.Hash() if _, ok := p.bundles[hash]; ok { return ErrBundleAlreadyExist @@ -137,8 +140,6 @@ func (p *BundlePool) AddBundle(bundle *types.Bundle) error { for p.slots+numSlots(bundle) > p.config.GlobalSlots { p.drop() } - p.mu.Lock() - defer p.mu.Unlock() p.bundles[hash] = bundle heap.Push(&p.bundleHeap, bundle) p.slots += numSlots(bundle) diff --git a/miner/ordering.go b/miner/ordering.go index 7cbe2d5630..5c432dc9a0 100644 --- a/miner/ordering.go +++ b/miner/ordering.go @@ -20,10 +20,11 @@ import ( "container/heap" "math/big" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" - "github.com/holiman/uint256" ) // txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap @@ -205,11 +206,11 @@ func (t *transactionsByPriceAndNonce) Forward(tx *types.Transaction) { } return } - //check whether target tx exists in t.heads + // check whether target tx exists in t.heads for _, head := range t.heads { if head.tx != nil && head.tx.Resolve() != nil { if tx == head.tx.Tx { - //shift t to the position one after tx + // shift t to the position one after tx txTmp := t.PeekWithUnwrap() for txTmp != tx { t.Shift() @@ -220,13 +221,13 @@ func (t *transactionsByPriceAndNonce) Forward(tx *types.Transaction) { } } } - //get the sender address of tx + // get the sender address of tx acc, _ := types.Sender(t.signer, tx) - //check whether target tx exists in t.txs + // check whether target tx exists in t.txs if txs, ok := t.txs[acc]; ok { for _, txLazyTmp := range txs { if txLazyTmp != nil && txLazyTmp.Resolve() != nil { - //found the same pointer in t.txs as tx and then shift t to the position one after tx + // found the same pointer in t.txs as tx and then shift t to the position one after tx if tx == txLazyTmp.Tx { txTmp := t.PeekWithUnwrap() for txTmp != tx { diff --git a/miner/worker.go b/miner/worker.go index 22d432b293..53d431bd76 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -755,7 +755,7 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction, return nil, err } sc.TxIndex = uint64(len(env.txs)) - env.txs = append(env.txs, tx.WithoutBlobTxSidecar()) + env.txs = append(env.txs, tx) env.receipts = append(env.receipts, receipt) env.sidecars = append(env.sidecars, sc) env.blobs += len(sc.Blobs) diff --git a/miner/worker_builder.go b/miner/worker_builder.go index 64143704c6..342014e25b 100644 --- a/miner/worker_builder.go +++ b/miner/worker_builder.go @@ -28,6 +28,8 @@ var ( // fillTransactions retrieves the pending bundles and transactions from the txpool and fills them // into the given sealing block. The selection and ordering strategy can be extended in the future. func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environment, stopTimer *time.Timer) error { + env.state.StopPrefetcher() // no need to prefetch txs for a builder + var ( localPlainTxs map[common.Address][]*txpool.LazyTransaction remotePlainTxs map[common.Address][]*txpool.LazyTransaction @@ -35,6 +37,32 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ remoteBlobTxs map[common.Address][]*txpool.LazyTransaction bundles []*types.Bundle ) + + // commit bundles + { + bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time) + + // if no bundles, not necessary to fill transactions + if len(bundles) == 0 { + return errors.New("no bundles in bundle pool") + } + + txs, bundle, err := w.generateOrderedBundles(env, bundles) + if err != nil { + log.Error("fail to generate ordered bundles", "err", err) + return err + } + + if err = w.commitBundles(env, txs, interruptCh, stopTimer); err != nil { + log.Error("fail to commit bundles", "err", err) + return err + } + + env.profit.Add(env.profit, bundle.EthSentToSystem) + log.Info("fill bundles", "bundles_count", len(bundles)) + } + + // commit normal transactions { w.mu.RLock() tip := w.tip @@ -71,34 +99,9 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ localBlobTxs[account] = txs } } - - bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time) - - log.Info("fill bundles and transactions", "bundles_count", len(bundles), "tx_count", len(localPlainTxs)+len(remotePlainTxs)) - - // if no bundles, not necessary to fill transactions - if len(bundles) == 0 { - return errors.New("no bundles in bundle pool") - } + log.Info("fill transactions", "plain_txs_count", len(localPlainTxs)+len(remotePlainTxs), "blob_txs_count", len(localBlobTxs)+len(remoteBlobTxs)) } - { - txs, bundle, err := w.generateOrderedBundles(env, bundles) - if err != nil { - log.Error("fail to generate ordered bundles", "err", err) - return err - } - - if err = w.commitBundles(env, txs, interruptCh, stopTimer); err != nil { - log.Error("fail to commit bundles", "err", err) - return err - } - - env.profit.Add(env.profit, bundle.EthSentToSystem) - } - - env.state.StopPrefetcher() // no need to prefetch txs for a builder - // Fill the block with all available pending transactions. // we will abort when: // 1.new block was imported @@ -122,6 +125,7 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ return err } } + log.Info("fill bundles and transactions done", "total_txs_count", len(env.txs)) return nil }