diff --git a/miner/miner.go b/miner/miner.go index 6c4dbf832..2bfb05cd3 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -72,6 +72,13 @@ var DefaultConfig = Config{ Recommit: 2 * time.Second, } +// prioritizedTransaction represents a single transaction that +// should be processed as the first transaction in the next block. +type prioritizedTransaction struct { + blockNumber uint64 + tx *types.Transaction +} + // Miner creates blocks and searches for proof-of-work values. type Miner struct { confMu sync.RWMutex // The lock used to protect the config fields: GasCeil, GasTip and Extradata @@ -93,6 +100,7 @@ type Miner struct { // Make sure the checker here is used by a single block one time, and must be reset for another block. circuitCapacityChecker *circuitcapacitychecker.CircuitCapacityChecker + prioritizedTx *prioritizedTransaction getWorkCh chan *getWorkReq exitCh chan struct{} diff --git a/miner/miner_test.go b/miner/miner_test.go index cd68cf07a..f70dad3ff 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -320,6 +320,24 @@ func TestBuildBlockTimeout(t *testing.T) { require.EqualValues(t, 0, len(newBlockResult.SkippedTxs)) } +func TestPriorizeTx(t *testing.T) { + miner := createMiner(t, nil, nil) + l2tx, _ := types.SignTx(types.NewTransaction(testNonce, destAddr, big.NewInt(1), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), types.LatestSigner(miner.chainConfig), testKey1) + l2tx1, _ := types.SignTx(types.NewTransaction(testNonce+1, destAddr, big.NewInt(1), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), types.LatestSigner(miner.chainConfig), testKey1) + l2tx2, _ := types.SignTx(types.NewTransaction(testNonce+2, destAddr, big.NewInt(1), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), types.LatestSigner(miner.chainConfig), testKey1) + require.NoError(t, miner.txpool.AddLocal(l2tx)) + require.NoError(t, miner.txpool.AddLocal(l2tx1)) + require.NoError(t, miner.txpool.AddLocal(l2tx2)) + miner.circuitCapacityChecker.Skip(l2tx1.Hash(), circuitcapacitychecker.ErrBlockRowConsumptionOverflow) + parentHeader := miner.chain.CurrentHeader() + timestamp := time.Now().Add(3 * time.Second) + newBlockResult, err := miner.BuildBlock(parentHeader.ParentHash, timestamp, types.Transactions{}) + require.NoError(t, err) + require.NotNil(t, miner.prioritizedTx) + require.EqualValues(t, l2tx1.Hash().Hex(), miner.prioritizedTx.tx.Hash().Hex()) + require.EqualValues(t, 1, newBlockResult.Block.Transactions().Len()) +} + func TestBuildBlockErrorOnApplyStage(t *testing.T) { t.Run("wrong l1 index", func(t *testing.T) { miner := createMiner(t, nil, nil) diff --git a/miner/pipeline.go b/miner/pipeline.go index eab57735e..767be487c 100644 --- a/miner/pipeline.go +++ b/miner/pipeline.go @@ -87,6 +87,7 @@ type Pipeline struct { receipts types.Receipts gasPool *core.GasPool skippedL1Txs []*types.SkippedTransaction + prioritizedTx *prioritizedTransaction // com channels txnQueue chan *types.Transaction @@ -512,6 +513,7 @@ func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, timeout time.Dura // if no error happens, and dealine has not been reached, then continue consuming txs continue case errors.Is(err, circuitcapacitychecker.ErrBlockRowConsumptionOverflow): + log.Info("Circuit capacity limit reached for a single tx", "tx", lastTxn.Hash().String(), "isL1Message", lastTxn.IsL1MessageTx(), "block", p.header.Number, "error", err) if candidate.Txs.Len() == 1 { // It is the first tx that causes row consumption overflow if lastTxn.IsL1MessageTx() { p.skippedL1Txs = append(p.skippedL1Txs, &types.SkippedTransaction{ @@ -525,6 +527,14 @@ func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, timeout time.Dura p.txpool.RemoveTx(lastTxn.Hash(), true) l2TxRowConsumptionOverflowCounter.Inc(1) } + } else if !lastTxn.IsL1MessageTx() { + // prioritize overflowing L2 message as the first txn next block + // no need to prioritize L1 messages, they are fetched in order + // and processed first in every block anyways + p.prioritizedTx = &prioritizedTransaction{ + blockNumber: p.header.Number.Uint64() + 1, + tx: lastTxn, + } } // the error here could be: diff --git a/miner/worker.go b/miner/worker.go index 1f3699098..3218a7460 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -159,6 +159,7 @@ func (miner *Miner) generateWork(genParams *generateParams) (*NewBlockResult, er if err != nil { return nil, err } + miner.prioritizedTx = pipeline.prioritizedTx return miner.handlePipelineResult(pipeline, result) @@ -176,12 +177,15 @@ func (miner *Miner) startPipeline( // Do not collect txns from txpool, if `simulate` is true if !genParams.simulate { - // Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees - pending = miner.txpool.PendingWithMax(tip, pipeline.header.BaseFee, miner.config.MaxAccountsNum) } + + if miner.prioritizedTx != nil && miner.txpool.Get(miner.prioritizedTx.tx.Hash()) == nil { // ignore the tx if it is removed from pool(maybe included by other miners) + miner.prioritizedTx = nil + } + // if no txs, return immediately without starting pipeline - if genParams.transactions.Len() == 0 && len(pending) == 0 { + if genParams.transactions.Len() == 0 && len(pending) == 0 && miner.prioritizedTx == nil { log.Info("no txs found, return immediately") return nil, nil } @@ -213,6 +217,17 @@ func (miner *Miner) startPipeline( } } + if !genParams.simulate && miner.prioritizedTx != nil && pipeline.header.Number.Uint64() >= miner.prioritizedTx.blockNumber { + tx := miner.prioritizedTx.tx + from, _ := types.Sender(signer, tx) // error already checked before + txList := map[common.Address]types.Transactions{from: []*types.Transaction{miner.prioritizedTx.tx}} + txs := types.NewTransactionsByPriceAndNonce(signer, txList, header.BaseFee) + miner.prioritizedTx = nil // clear prioritizedTx before commitTransactions + if result := pipeline.TryPushTxns(txs); result != nil { + return result, nil + } + } + // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending