Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txnprovider: introduce txnprovider concept into block production #12831

Merged
merged 15 commits into from
Nov 22, 2024
Merged
4 changes: 2 additions & 2 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
miningSync := stagedsync.New(
cfg.Sync,
stagedsync.MiningStages(ctx,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, dirs.Tmp, blockReader),
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, dirs.Tmp, blockReader),
stagedsync.StageBorHeimdallCfg(db, snapDb, miner, *chainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, nil, nil, recents, signatures, false, unwindTypes),
stagedsync.StageExecuteBlocksCfg(
db,
Expand All @@ -1538,7 +1538,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig,
nil,
),
stagedsync.StageSendersCfg(db, sentryControlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, sentryControlServer.Hd),
stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, nil, blockReader),
stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, blockReader),
stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, miner, miningCancel, blockReader, builder.NewLatestBlockBuiltStore()),
),
stagedsync.MiningUnwindOrder,
Expand Down
4 changes: 2 additions & 2 deletions cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
miner.MiningConfig.ExtraData = nextBlock.Extra()
miningStages.MockExecFunc(stages.MiningCreateBlock, func(badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, txc wrap.TxContainer, logger log.Logger) error {
err = stagedsync.SpawnMiningCreateBlockStage(s, txc,
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, dirs.Tmp, br),
stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, dirs.Tmp, br),
quit, logger)
if err != nil {
return err
Expand All @@ -318,7 +318,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
miner.MiningBlock.Header.GasLimit = nextBlock.GasLimit()
miner.MiningBlock.Header.Difficulty = nextBlock.Difficulty()
miner.MiningBlock.Header.Nonce = nextBlock.Nonce()
miner.MiningBlock.PreparedTxs = types.NewTransactionsFixedOrder(nextBlock.Transactions())
miner.MiningBlock.PreparedTxns = nextBlock.Transactions()
//debugprint.Headers(miningWorld.Block.Header, nextBlock.Header())
return err
})
Expand Down
3 changes: 2 additions & 1 deletion cmd/txpool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
if err != nil {
return err
}
defer txPoolDB.Close()
fetch.ConnectCore()
fetch.ConnectSentries()

Expand All @@ -202,7 +203,7 @@ func doTxpool(ctx context.Context, logger log.Logger) error {
}

notifyMiner := func() {}
txpool.MainLoop(ctx, txPoolDB, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)
txpool.MainLoop(ctx, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner)

grpcServer.GracefulStop()
return nil
Expand Down
52 changes: 0 additions & 52 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,58 +355,6 @@ func (s TxByNonce) Len() int { return len(s) }
func (s TxByNonce) Less(i, j int) bool { return s[i].GetNonce() < s[j].GetNonce() }
func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

type TransactionsStream interface {
Empty() bool
Peek() Transaction
Shift()
Pop()
}

// TransactionsFixedOrder represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type TransactionsFixedOrder struct {
Transactions
}

// NewTransactionsFixedOrder creates a transaction set that can retrieve
// price sorted transactions in a nonce-honouring way.
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func NewTransactionsFixedOrder(txs Transactions) *TransactionsFixedOrder {
return &TransactionsFixedOrder{txs}
}

func (t *TransactionsFixedOrder) Empty() bool {
if t == nil {
return true
}
return len(t.Transactions) == 0
}

// Peek returns the next transaction by price.
func (t *TransactionsFixedOrder) Peek() Transaction {
if len(t.Transactions) == 0 {
return nil
}
return t.Transactions[0]
}

// Shift replaces the current best head with the next one from the same account.
func (t *TransactionsFixedOrder) Shift() {
t.Transactions[0] = nil // avoid memory leak
t.Transactions = t.Transactions[1:]
}

// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *TransactionsFixedOrder) Pop() {
t.Transactions[0] = nil // avoid memory leak
t.Transactions = t.Transactions[1:]
}

// Message is a fully derived transaction and implements core.Message
type Message struct {
to *libcommon.Address
Expand Down
15 changes: 9 additions & 6 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ import (
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
stages2 "github.com/erigontech/erigon/turbo/stages"
"github.com/erigontech/erigon/turbo/stages/headerdownload"
"github.com/erigontech/erigon/txnprovider"
"github.com/erigontech/erigon/txnprovider/txpool"
"github.com/erigontech/erigon/txnprovider/txpool/txpoolcfg"
"github.com/erigontech/erigon/txnprovider/txpool/txpoolutil"
Expand Down Expand Up @@ -191,6 +192,7 @@ type Ethereum struct {
waitForStageLoopStop chan struct{}
waitForMiningStop chan struct{}

txnProvider txnprovider.TxnProvider
txPoolDB kv.RwDB
txPool *txpool.TxPool
newTxs chan txpool.Announcements
Expand Down Expand Up @@ -660,6 +662,8 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
if err != nil {
return nil, err
}

backend.txnProvider = txnprovider.NewOrderedTxnPoolProvider(backend.txPool)
}

backend.notifyMiningAboutNewTxs = make(chan struct{}, 1)
Expand All @@ -684,7 +688,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
mining := stagedsync.New(
config.Sync,
stagedsync.MiningStages(backend.sentryCtx,
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPoolDB, nil, tmpdir, backend.blockReader),
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, nil, tmpdir, backend.blockReader),
stagedsync.StageBorHeimdallCfg(
backend.chainDB,
snapDb,
Expand Down Expand Up @@ -718,7 +722,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
stages2.SilkwormForExecutionStage(backend.silkworm, config),
),
stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txnProvider, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore),
), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder,
logger, stages.ModeBlockProduction)
Expand All @@ -735,7 +739,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
proposingSync := stagedsync.New(
config.Sync,
stagedsync.MiningStages(backend.sentryCtx,
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, backend.txPoolDB, param, tmpdir, backend.blockReader),
stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, param, tmpdir, backend.blockReader),
stagedsync.StageBorHeimdallCfg(
backend.chainDB,
snapDb,
Expand Down Expand Up @@ -769,7 +773,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
stages2.SilkwormForExecutionStage(backend.silkworm, config),
),
stagedsync.StageSendersCfg(backend.chainDB, chainConfig, config.Sync, false, dirs.Tmp, config.Prune, blockReader, backend.sentriesClient.Hd),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool, backend.txPoolDB, blockReader),
stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txnProvider, blockReader),
stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit, backend.blockReader, latestBlockBuiltStore)), stagedsync.MiningUnwindOrder, stagedsync.MiningPruneOrder, logger, stages.ModeBlockProduction)
// We start the mining step
if err := stages2.MiningStep(ctx, backend.chainDB, proposingSync, tmpdir, logger); err != nil {
Expand Down Expand Up @@ -830,8 +834,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
if casted, ok := backend.txPoolGrpcServer.(*txpool.GrpcServer); ok {
newTxsBroadcaster = casted.NewSlotsStreams
}
go txpool.MainLoop(backend.sentryCtx,
backend.txPoolDB, backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
go txpool.MainLoop(backend.sentryCtx, backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster,
func() {
select {
case backend.notifyMiningAboutNewTxs <- struct{}{}:
Expand Down
20 changes: 12 additions & 8 deletions eth/stagedsync/stage_mining_create_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ type MiningBlock struct {
ParentHeaderTime uint64
Header *types.Header
Uncles []*types.Header
Txs types.Transactions
Txns types.Transactions
Receipts types.Receipts
Withdrawals []*types.Withdrawal
PreparedTxs types.TransactionsStream
PreparedTxns types.Transactions
Requests types.FlatRequests
}

Expand All @@ -76,33 +76,37 @@ type MiningCreateBlockCfg struct {
miner MiningState
chainConfig chain.Config
engine consensus.Engine
txPoolDB kv.RoDB
tmpdir string
blockBuilderParameters *core.BlockBuilderParameters
blockReader services.FullBlockReader
}

func StageMiningCreateBlockCfg(db kv.RwDB, miner MiningState, chainConfig chain.Config, engine consensus.Engine, txPoolDB kv.RoDB, blockBuilderParameters *core.BlockBuilderParameters, tmpdir string, blockReader services.FullBlockReader) MiningCreateBlockCfg {
func StageMiningCreateBlockCfg(
db kv.RwDB,
miner MiningState,
chainConfig chain.Config,
engine consensus.Engine,
blockBuilderParameters *core.BlockBuilderParameters,
tmpdir string,
blockReader services.FullBlockReader,
) MiningCreateBlockCfg {
return MiningCreateBlockCfg{
db: db,
miner: miner,
chainConfig: chainConfig,
engine: engine,
txPoolDB: txPoolDB,
tmpdir: tmpdir,
blockBuilderParameters: blockBuilderParameters,
blockReader: blockReader,
}
}

var maxTransactions uint16 = 1000

// SpawnMiningCreateBlockStage
// TODO:
// - resubmitAdjustCh - variable is not implemented
func SpawnMiningCreateBlockStage(s *StageState, txc wrap.TxContainer, cfg MiningCreateBlockCfg, quit <-chan struct{}, logger log.Logger) (err error) {
current := cfg.miner.MiningBlock
txPoolLocals := []libcommon.Address{} //txPoolV2 has no concept of local addresses (yet?)
var txPoolLocals []libcommon.Address //txPoolV2 has no concept of local addresses (yet?)
coinbase := cfg.miner.MiningConfig.Etherbase

const (
Expand Down
Loading
Loading