Skip to content

Commit

Permalink
fix: bundlepool concurrent read and write and commit blob tx issue
Browse files Browse the repository at this point in the history
  • Loading branch information
pythonberg1997 committed Apr 23, 2024
1 parent 7679a9f commit 2d83277
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 36 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
mapset "github.com/deckarep/golang-set/v2"
exlru "github.com/hashicorp/golang-lru"
"golang.org/x/crypto/sha3"
"golang.org/x/exp/slices"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
Expand All @@ -56,7 +57,6 @@ import (
"github.com/ethereum/go-ethereum/triedb"
"github.com/ethereum/go-ethereum/triedb/hashdb"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"golang.org/x/exp/slices"
)

var (
Expand Down
5 changes: 3 additions & 2 deletions core/txpool/bundlepool/bundlepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,16 @@ func (p *BundlePool) AddBundle(bundle *types.Bundle) error {
}
bundle.Price = price

p.mu.Lock()
defer p.mu.Unlock()

hash := bundle.Hash()
if _, ok := p.bundles[hash]; ok {
return ErrBundleAlreadyExist
}
for p.slots+numSlots(bundle) > p.config.GlobalSlots {
p.drop()
}
p.mu.Lock()
defer p.mu.Unlock()
p.bundles[hash] = bundle
heap.Push(&p.bundleHeap, bundle)
p.slots += numSlots(bundle)
Expand Down
13 changes: 7 additions & 6 deletions miner/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"container/heap"
"math/big"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
)

// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
Expand Down Expand Up @@ -205,11 +206,11 @@ func (t *transactionsByPriceAndNonce) Forward(tx *types.Transaction) {
}
return
}
//check whether target tx exists in t.heads
// check whether target tx exists in t.heads
for _, head := range t.heads {
if head.tx != nil && head.tx.Resolve() != nil {
if tx == head.tx.Tx {
//shift t to the position one after tx
// shift t to the position one after tx
txTmp := t.PeekWithUnwrap()
for txTmp != tx {
t.Shift()
Expand All @@ -220,13 +221,13 @@ func (t *transactionsByPriceAndNonce) Forward(tx *types.Transaction) {
}
}
}
//get the sender address of tx
// get the sender address of tx
acc, _ := types.Sender(t.signer, tx)
//check whether target tx exists in t.txs
// check whether target tx exists in t.txs
if txs, ok := t.txs[acc]; ok {
for _, txLazyTmp := range txs {
if txLazyTmp != nil && txLazyTmp.Resolve() != nil {
//found the same pointer in t.txs as tx and then shift t to the position one after tx
// found the same pointer in t.txs as tx and then shift t to the position one after tx
if tx == txLazyTmp.Tx {
txTmp := t.PeekWithUnwrap()
for txTmp != tx {
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction,
return nil, err
}
sc.TxIndex = uint64(len(env.txs))
env.txs = append(env.txs, tx.WithoutBlobTxSidecar())
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
env.sidecars = append(env.sidecars, sc)
env.blobs += len(sc.Blobs)
Expand Down
56 changes: 30 additions & 26 deletions miner/worker_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,41 @@ var (
// fillTransactions retrieves the pending bundles and transactions from the txpool and fills them
// into the given sealing block. The selection and ordering strategy can be extended in the future.
func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environment, stopTimer *time.Timer) error {
env.state.StopPrefetcher() // no need to prefetch txs for a builder

var (
localPlainTxs map[common.Address][]*txpool.LazyTransaction
remotePlainTxs map[common.Address][]*txpool.LazyTransaction
localBlobTxs map[common.Address][]*txpool.LazyTransaction
remoteBlobTxs map[common.Address][]*txpool.LazyTransaction
bundles []*types.Bundle
)

// commit bundles
{
bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time)

// if no bundles, not necessary to fill transactions
if len(bundles) == 0 {
return errors.New("no bundles in bundle pool")
}

txs, bundle, err := w.generateOrderedBundles(env, bundles)
if err != nil {
log.Error("fail to generate ordered bundles", "err", err)
return err
}

if err = w.commitBundles(env, txs, interruptCh, stopTimer); err != nil {
log.Error("fail to commit bundles", "err", err)
return err
}

env.profit.Add(env.profit, bundle.EthSentToSystem)
log.Info("fill bundles", "bundles_count", len(bundles))
}

// commit normal transactions
{
w.mu.RLock()
tip := w.tip
Expand Down Expand Up @@ -71,34 +99,9 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ
localBlobTxs[account] = txs
}
}

bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time)

log.Info("fill bundles and transactions", "bundles_count", len(bundles), "tx_count", len(localPlainTxs)+len(remotePlainTxs))

// if no bundles, not necessary to fill transactions
if len(bundles) == 0 {
return errors.New("no bundles in bundle pool")
}
log.Info("fill transactions", "plain_txs_count", len(localPlainTxs)+len(remotePlainTxs), "blob_txs_count", len(localBlobTxs)+len(remoteBlobTxs))
}

{
txs, bundle, err := w.generateOrderedBundles(env, bundles)
if err != nil {
log.Error("fail to generate ordered bundles", "err", err)
return err
}

if err = w.commitBundles(env, txs, interruptCh, stopTimer); err != nil {
log.Error("fail to commit bundles", "err", err)
return err
}

env.profit.Add(env.profit, bundle.EthSentToSystem)
}

env.state.StopPrefetcher() // no need to prefetch txs for a builder

// Fill the block with all available pending transactions.
// we will abort when:
// 1.new block was imported
Expand All @@ -122,6 +125,7 @@ func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environ
return err
}
}
log.Info("fill bundles and transactions done", "total_txs_count", len(env.txs))
return nil
}

Expand Down

0 comments on commit 2d83277

Please sign in to comment.