From 05b91476d15664774808455574933e8792cfd9c9 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Thu, 19 Dec 2024 10:14:23 +0000 Subject: [PATCH] optimize mutexes and increase batch size (#2215) * optimize mutexes and increase batch size * increase batch size * add rollup overhead to tx size. Added compression to batch limiter. Add missing error check * lint --- go/config/defaults/0-base-config.yaml | 4 +- go/enclave/components/batch_executor.go | 67 ++++++++++++---------- go/enclave/components/txpool.go | 74 ++++++++++++++++++++++--- go/enclave/enclave.go | 5 +- go/enclave/enclave_admin_service.go | 21 ++++--- go/enclave/limiters/batchlimiter.go | 23 +++++--- 6 files changed, 138 insertions(+), 56 deletions(-) diff --git a/go/config/defaults/0-base-config.yaml b/go/config/defaults/0-base-config.yaml index 5dad19fb8c..e8dd4cbb2f 100644 --- a/go/config/defaults/0-base-config.yaml +++ b/go/config/defaults/0-base-config.yaml @@ -8,11 +8,11 @@ network: batch: interval: 1s maxInterval: 1s # if this is greater than batch.interval then we make batches more slowly when there are no transactions - maxSize: 45000 # around 45kb - around 200 transactions / batch + maxSize: 125952 # (128-5)kb - the size of the rollup minus overhead rollup: interval: 5s maxInterval: 10m # rollups will be produced after this time even if the data blob is not full - maxSize: 131072 # 128kb + maxSize: 131072 # 128kb - the size of a blob gas: # todo: ask stefan about these fields baseFee: 1000000000 # using geth's initial base fee for EIP-1559 blocks. minGasPrice: 1000000000 # using geth's initial base fee for EIP-1559 blocks. diff --git a/go/enclave/components/batch_executor.go b/go/enclave/components/batch_executor.go index d21f30dcef..489dd5b82d 100644 --- a/go/enclave/components/batch_executor.go +++ b/go/enclave/components/batch_executor.go @@ -8,6 +8,8 @@ import ( "math/big" "sync" + "github.com/ten-protocol/go-ten/go/common/compression" + gethcore "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/vm" @@ -48,18 +50,19 @@ var ErrNoTransactionsToProcess = fmt.Errorf("no transactions to process") // batchExecutor - the component responsible for executing batches type batchExecutor struct { - storage storage.Storage - batchRegistry BatchRegistry - config enclaveconfig.EnclaveConfig - gethEncodingService gethencoding.EncodingService - crossChainProcessors *crosschain.Processors - genesis *genesis.Genesis - logger gethlog.Logger - gasOracle gas.Oracle - chainConfig *params.ChainConfig - systemContracts system.SystemContractCallbacks - entropyService *crypto.EvmEntropyService - mempool *TxPool + storage storage.Storage + batchRegistry BatchRegistry + config enclaveconfig.EnclaveConfig + gethEncodingService gethencoding.EncodingService + crossChainProcessors *crosschain.Processors + dataCompressionService compression.DataCompressionService + genesis *genesis.Genesis + logger gethlog.Logger + gasOracle gas.Oracle + chainConfig *params.ChainConfig + systemContracts system.SystemContractCallbacks + entropyService *crypto.EvmEntropyService + mempool *TxPool // stateDBMutex - used to protect calls to stateDB.Commit as it is not safe for async access. stateDBMutex sync.Mutex @@ -79,24 +82,26 @@ func NewBatchExecutor( systemContracts system.SystemContractCallbacks, entropyService *crypto.EvmEntropyService, mempool *TxPool, + dataCompressionService compression.DataCompressionService, logger gethlog.Logger, ) BatchExecutor { return &batchExecutor{ - storage: storage, - batchRegistry: batchRegistry, - config: config, - gethEncodingService: gethEncodingService, - crossChainProcessors: cc, - genesis: genesis, - chainConfig: chainConfig, - logger: logger, - gasOracle: gasOracle, - stateDBMutex: sync.Mutex{}, - batchGasLimit: config.GasBatchExecutionLimit, - systemContracts: systemContracts, - entropyService: entropyService, - mempool: mempool, - chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger), + storage: storage, + batchRegistry: batchRegistry, + config: config, + gethEncodingService: gethEncodingService, + crossChainProcessors: cc, + genesis: genesis, + chainConfig: chainConfig, + logger: logger, + gasOracle: gasOracle, + stateDBMutex: sync.Mutex{}, + batchGasLimit: config.GasBatchExecutionLimit, + systemContracts: systemContracts, + entropyService: entropyService, + mempool: mempool, + dataCompressionService: dataCompressionService, + chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger), } } @@ -286,7 +291,7 @@ func (executor *batchExecutor) execBatchTransactions(ec *BatchExecutionContext) } func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext) error { - sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize) + sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize, executor.dataCompressionService) pendingTransactions := executor.mempool.PendingTransactions() nrPending, nrQueued := executor.mempool.Stats() @@ -319,10 +324,12 @@ func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext // check the size limiter err := sizeLimiter.AcceptTransaction(tx) if err != nil { - executor.logger.Info("Unable to accept transaction", log.TxKey, tx.Hash(), log.ErrKey, err) if errors.Is(err, limiters.ErrInsufficientSpace) { // Batch ran out of space - break + executor.logger.Trace("Unable to accept transaction", log.TxKey, tx.Hash()) + mempoolTxs.Pop() + continue } + return fmt.Errorf("failed to apply the batch limiter. Cause: %w", err) } pTx, err := executor.toPricedTx(ec, tx) diff --git a/go/enclave/components/txpool.go b/go/enclave/components/txpool.go index 7aa81f6b68..214b5ff0aa 100644 --- a/go/enclave/components/txpool.go +++ b/go/enclave/components/txpool.go @@ -1,17 +1,17 @@ package components -// unsafe package imported in order to link to a private function in go-ethereum. -// This allows us to validate transactions against the tx pool rules. import ( "fmt" "math/big" + "reflect" "strings" "sync" "sync/atomic" "time" - _ "unsafe" + "unsafe" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/params" "github.com/ten-protocol/go-ten/go/common/log" gethcommon "github.com/ethereum/go-ethereum/common" @@ -24,12 +24,30 @@ import ( "github.com/ten-protocol/go-ten/go/common" ) +const ( + // txSlotSize is used to calculate how many data slots a single transaction + // takes up based on its size. The slots are used as DoS protection, ensuring + // that validating a new transaction remains a constant operation (in reality + // O(maxslots), where max slots are 4 currently). + txSlotSize = 32 * 1024 + + // we assume that at the limit, a single "uncompressable" tx is in a batch which gets rolled-up, and must fit in a 128kb blob + rollupOverhead = 5 * 1024 + + // txMaxSize is the maximum size a single transaction can have. This field has + // non-trivial consequences: larger transactions are significantly harder and + // more expensive to propagate; larger transactions also take more resources + // to validate whether they fit into the pool or not. + txMaxSize = 4*txSlotSize - rollupOverhead // 128KB - overhead +) + // this is how long the node waits to receive the second batch var startMempoolTimeout = 90 * time.Second // TxPool is an obscuro wrapper around geths transaction pool type TxPool struct { txPoolConfig legacypool.Config + chainconfig *params.ChainConfig legacyPool *legacypool.LegacyPool pool *gethtxpool.TxPool Chain *EthChainAdapter @@ -59,6 +77,7 @@ func NewTxPool(blockchain *EthChainAdapter, gasTip *big.Int, validateOnly bool, txp := &TxPool{ Chain: blockchain, + chainconfig: blockchain.Config(), txPoolConfig: txPoolConfig, legacyPool: legacyPool, gasTip: gasTip, @@ -195,6 +214,12 @@ func (t *TxPool) Close() error { // Add adds a new transactions to the pool func (t *TxPool) add(transaction *common.L2Tx) error { + // validate against the consensus rules + err := t.validateTxBasics(transaction, false) + if err != nil { + return err + } + var strErrors []string for _, err := range t.pool.Add([]*types.Transaction{transaction}, false, false) { if err != nil { @@ -208,16 +233,13 @@ func (t *TxPool) add(transaction *common.L2Tx) error { return nil } -//go:linkname validateTxBasics github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTxBasics -func validateTxBasics(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error - //go:linkname validateTx github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTx func validateTx(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error // Validate - run the underlying tx pool validation logic func (t *TxPool) validate(tx *common.L2Tx) error { // validate against the consensus rules - err := validateTxBasics(t.legacyPool, tx, false) + err := t.validateTxBasics(tx, false) if err != nil { return err } @@ -231,3 +253,41 @@ func (t *TxPool) validate(tx *common.L2Tx) error { func (t *TxPool) Stats() (int, int) { return t.legacyPool.Stats() } + +// validateTxBasics checks whether a transaction is valid according to the consensus +// rules, but does not check state-dependent validation such as sufficient balance. +// This check is meant as an early check which only needs to be performed once, +// and does not require the pool mutex to be held. +func (t *TxPool) validateTxBasics(tx *types.Transaction, local bool) error { + opts := &gethtxpool.ValidationOptions{ + Config: t.chainconfig, + Accept: 0 | + 1<