optimize mutexes and increase batch size (#2215)
* optimize mutexes and increase batch size

* increase batch size

* add rollup overhead to tx size. Add compression to the batch limiter. Add a missing error check

* lint
tudor-malene authored Dec 19, 2024
1 parent 0867530 commit 05b9147
Showing 6 changed files with 138 additions and 56 deletions.
4 changes: 2 additions & 2 deletions go/config/defaults/0-base-config.yaml
@@ -8,11 +8,11 @@ network:
batch:
interval: 1s
maxInterval: 1s # if this is greater than batch.interval then we make batches more slowly when there are no transactions
- maxSize: 45000 # around 45kb - around 200 transactions / batch
+ maxSize: 125952 # (128-5)kb - the size of the rollup minus overhead
rollup:
interval: 5s
maxInterval: 10m # rollups will be produced after this time even if the data blob is not full
- maxSize: 131072 # 128kb
+ maxSize: 131072 # 128kb - the size of a blob
gas: # todo: ask stefan about these fields
baseFee: 1000000000 # using geth's initial base fee for EIP-1559 blocks.
minGasPrice: 1000000000 # using geth's initial base fee for EIP-1559 blocks.
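The new batch.maxSize is derived from the rollup size: a 128kb blob minus roughly 5kb of rollup overhead leaves 125952 bytes of batch payload, and the same figure reappears below as txMaxSize (4*txSlotSize - rollupOverhead) in txpool.go. A minimal sketch of that arithmetic; blobSize is an illustrative name, while txSlotSize and rollupOverhead mirror the constants added in txpool.go:

package main

import "fmt"

func main() {
	const (
		blobSize       = 128 * 1024 // rollup.maxSize: 131072 bytes, the size of a blob
		rollupOverhead = 5 * 1024   // headroom reserved for the rollup itself
		txSlotSize     = 32 * 1024  // geth's tx pool slot size
	)
	fmt.Println(blobSize - rollupOverhead)     // 125952 - the new batch.maxSize
	fmt.Println(4*txSlotSize - rollupOverhead) // 125952 - txMaxSize in txpool.go
}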
67 changes: 37 additions & 30 deletions go/enclave/components/batch_executor.go
@@ -8,6 +8,8 @@ import (
"math/big"
"sync"

"github.com/ten-protocol/go-ten/go/common/compression"

gethcore "github.com/ethereum/go-ethereum/core"

"github.com/ethereum/go-ethereum/core/vm"
@@ -48,18 +50,19 @@ var ErrNoTransactionsToProcess = fmt.Errorf("no transactions to process")

// batchExecutor - the component responsible for executing batches
type batchExecutor struct {
- storage storage.Storage
- batchRegistry BatchRegistry
- config enclaveconfig.EnclaveConfig
- gethEncodingService gethencoding.EncodingService
- crossChainProcessors *crosschain.Processors
- genesis *genesis.Genesis
- logger gethlog.Logger
- gasOracle gas.Oracle
- chainConfig *params.ChainConfig
- systemContracts system.SystemContractCallbacks
- entropyService *crypto.EvmEntropyService
- mempool *TxPool
+ storage storage.Storage
+ batchRegistry BatchRegistry
+ config enclaveconfig.EnclaveConfig
+ gethEncodingService gethencoding.EncodingService
+ crossChainProcessors *crosschain.Processors
+ dataCompressionService compression.DataCompressionService
+ genesis *genesis.Genesis
+ logger gethlog.Logger
+ gasOracle gas.Oracle
+ chainConfig *params.ChainConfig
+ systemContracts system.SystemContractCallbacks
+ entropyService *crypto.EvmEntropyService
+ mempool *TxPool
// stateDBMutex - used to protect calls to stateDB.Commit as it is not safe for async access.
stateDBMutex sync.Mutex

@@ -79,24 +82,26 @@ func NewBatchExecutor(
systemContracts system.SystemContractCallbacks,
entropyService *crypto.EvmEntropyService,
mempool *TxPool,
+ dataCompressionService compression.DataCompressionService,
logger gethlog.Logger,
) BatchExecutor {
return &batchExecutor{
- storage: storage,
- batchRegistry: batchRegistry,
- config: config,
- gethEncodingService: gethEncodingService,
- crossChainProcessors: cc,
- genesis: genesis,
- chainConfig: chainConfig,
- logger: logger,
- gasOracle: gasOracle,
- stateDBMutex: sync.Mutex{},
- batchGasLimit: config.GasBatchExecutionLimit,
- systemContracts: systemContracts,
- entropyService: entropyService,
- mempool: mempool,
- chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger),
+ storage: storage,
+ batchRegistry: batchRegistry,
+ config: config,
+ gethEncodingService: gethEncodingService,
+ crossChainProcessors: cc,
+ genesis: genesis,
+ chainConfig: chainConfig,
+ logger: logger,
+ gasOracle: gasOracle,
+ stateDBMutex: sync.Mutex{},
+ batchGasLimit: config.GasBatchExecutionLimit,
+ systemContracts: systemContracts,
+ entropyService: entropyService,
+ mempool: mempool,
+ dataCompressionService: dataCompressionService,
+ chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger),
}
}

@@ -286,7 +291,7 @@ func (executor *batchExecutor) execBatchTransactions(ec *BatchExecutionContext)
}

func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext) error {
- sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize)
+ sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize, executor.dataCompressionService)
pendingTransactions := executor.mempool.PendingTransactions()

nrPending, nrQueued := executor.mempool.Stats()
@@ -319,10 +324,12 @@ func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext
// check the size limiter
err := sizeLimiter.AcceptTransaction(tx)
if err != nil {
- executor.logger.Info("Unable to accept transaction", log.TxKey, tx.Hash(), log.ErrKey, err)
if errors.Is(err, limiters.ErrInsufficientSpace) { // Batch ran out of space
- break
+ executor.logger.Trace("Unable to accept transaction", log.TxKey, tx.Hash())
+ mempoolTxs.Pop()
+ continue
}
+ return fmt.Errorf("failed to apply the batch limiter. Cause: %w", err)
}

pTx, err := executor.toPricedTx(ec, tx)
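The change above also alters what happens when a transaction does not fit: previously the loop stopped filling the batch on ErrInsufficientSpace, whereas now the oversized transaction is simply popped and the loop keeps looking for smaller ones, while any other limiter error aborts the batch. A stripped-down sketch of that control flow, using stand-in types rather than the real go-ten interfaces:

package main

import (
	"errors"
	"fmt"
)

var ErrInsufficientSpace = errors.New("insufficient space in batch")

// sizeLimiter is a stand-in for the batch size limiter: it only tracks remaining bytes.
type sizeLimiter struct{ remaining int }

func (l *sizeLimiter) Accept(size int) error {
	if size > l.remaining {
		return ErrInsufficientSpace
	}
	l.remaining -= size
	return nil
}

func main() {
	limiter := &sizeLimiter{remaining: 100}
	accepted := 0
	for _, txSize := range []int{40, 80, 30, 25} { // the 80-byte tx does not fit and is skipped
		if err := limiter.Accept(txSize); err != nil {
			if errors.Is(err, ErrInsufficientSpace) {
				continue // skip the oversized transaction, keep filling with smaller ones
			}
			fmt.Println("unexpected limiter error:", err)
			return
		}
		accepted++
	}
	fmt.Println("accepted", accepted, "of 4 transactions") // accepted 3 of 4 transactions
}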
74 changes: 67 additions & 7 deletions go/enclave/components/txpool.go
@@ -1,17 +1,17 @@
package components

// unsafe package imported in order to link to a private function in go-ethereum.
// This allows us to validate transactions against the tx pool rules.
import (
"fmt"
"math/big"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
_ "unsafe"
"unsafe"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/params"
"github.com/ten-protocol/go-ten/go/common/log"

gethcommon "github.com/ethereum/go-ethereum/common"
@@ -24,12 +24,30 @@ import (
"github.com/ten-protocol/go-ten/go/common"
)

const (
// txSlotSize is used to calculate how many data slots a single transaction
// takes up based on its size. The slots are used as DoS protection, ensuring
// that validating a new transaction remains a constant operation (in reality
// O(maxslots), where max slots are 4 currently).
txSlotSize = 32 * 1024

// we assume that at the limit, a single "uncompressible" tx is in a batch which gets rolled up, and must fit in a 128kb blob
rollupOverhead = 5 * 1024

// txMaxSize is the maximum size a single transaction can have. This field has
// non-trivial consequences: larger transactions are significantly harder and
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4*txSlotSize - rollupOverhead // 128KB - overhead
)

// this is how long the node waits to receive the second batch
var startMempoolTimeout = 90 * time.Second

// TxPool is an Obscuro wrapper around geth's transaction pool
type TxPool struct {
txPoolConfig legacypool.Config
chainconfig *params.ChainConfig
legacyPool *legacypool.LegacyPool
pool *gethtxpool.TxPool
Chain *EthChainAdapter
@@ -59,6 +77,7 @@ func NewTxPool(blockchain *EthChainAdapter, gasTip *big.Int, validateOnly bool,

txp := &TxPool{
Chain: blockchain,
chainconfig: blockchain.Config(),
txPoolConfig: txPoolConfig,
legacyPool: legacyPool,
gasTip: gasTip,
@@ -195,6 +214,12 @@ func (t *TxPool) Close() error {

// add adds a new transaction to the pool
func (t *TxPool) add(transaction *common.L2Tx) error {
// validate against the consensus rules
err := t.validateTxBasics(transaction, false)
if err != nil {
return err
}

var strErrors []string
for _, err := range t.pool.Add([]*types.Transaction{transaction}, false, false) {
if err != nil {
@@ -208,16 +233,13 @@ func (t *TxPool) add(transaction *common.L2Tx) error {
return nil
}

- //go:linkname validateTxBasics github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTxBasics
- func validateTxBasics(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error

//go:linkname validateTx github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTx
func validateTx(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error

// Validate - run the underlying tx pool validation logic
func (t *TxPool) validate(tx *common.L2Tx) error {
// validate against the consensus rules
- err := validateTxBasics(t.legacyPool, tx, false)
+ err := t.validateTxBasics(tx, false)
if err != nil {
return err
}
@@ -231,3 +253,41 @@ func (t *TxPool) validate(tx *common.L2Tx) error {
func (t *TxPool) Stats() (int, int) {
return t.legacyPool.Stats()
}

// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (t *TxPool) validateTxBasics(tx *types.Transaction, local bool) error {
opts := &gethtxpool.ValidationOptions{
Config: t.chainconfig,
Accept: 0 |
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType,
MaxSize: txMaxSize,
MinTip: t.gasTip,
}

// we need to access some private variables from the legacy pool to run validation with our own consensus options
v := reflect.ValueOf(t.legacyPool).Elem()

chField := v.FieldByName("currentHead")
chFieldPtr := unsafe.Pointer(chField.UnsafeAddr())
ch, ok := reflect.NewAt(chField.Type(), chFieldPtr).Elem().Interface().(atomic.Pointer[types.Header]) //nolint:govet
if !ok {
t.logger.Crit("invalid mempool. should not happen")
}

sigField := v.FieldByName("signer")
sigFieldPtr := unsafe.Pointer(sigField.UnsafeAddr())
sig, ok1 := reflect.NewAt(sigField.Type(), sigFieldPtr).Elem().Interface().(types.Signer)
if !ok1 {
t.logger.Crit("invalid mempool. should not happen")
}

if err := gethtxpool.ValidateTransaction(tx, ch.Load(), sig, opts); err != nil {
return err
}
return nil
}
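The validateTxBasics method above needs the pool's currentHead and signer, which legacypool does not expose, so it reads the unexported fields through reflect and unsafe. A self-contained sketch of that technique on a toy struct, to make the mechanics easier to follow (the pool type and field below are made up for illustration):

package main

import (
	"fmt"
	"reflect"
	"unsafe"
)

type pool struct {
	head string // reflect refuses to Interface() unexported fields directly
}

func main() {
	p := &pool{head: "0xabc"}

	// Locate the unexported field on an addressable value, then re-create a
	// readable reflect.Value at the same address with reflect.NewAt.
	v := reflect.ValueOf(p).Elem()
	field := v.FieldByName("head")
	readable := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem()

	head, ok := readable.Interface().(string)
	if !ok {
		panic("unexpected field type")
	}
	fmt.Println(head) // prints 0xabc
}

This keeps working only while the field names and types inside legacypool stay the same, which is why the real code checks the type assertions and crashes loudly via logger.Crit if geth ever changes them.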
5 changes: 4 additions & 1 deletion go/enclave/enclave.go
@@ -7,6 +7,8 @@ import (
"fmt"
"math/big"

"github.com/ten-protocol/go-ten/go/common/compression"

"github.com/ten-protocol/go-ten/go/enclave/crypto"

enclaveconfig "github.com/ten-protocol/go-ten/go/enclave/config"
@@ -100,7 +102,8 @@ func NewEnclave(config *enclaveconfig.EnclaveConfig, genesis *genesis.Genesis, m

gasOracle := gas.NewGasOracle()
blockProcessor := components.NewBlockProcessor(storage, crossChainProcessors, gasOracle, logger)
- batchExecutor := components.NewBatchExecutor(storage, batchRegistry, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, scb, evmEntropyService, mempool, logger)
+ dataCompressionService := compression.NewBrotliDataCompressionService()
+ batchExecutor := components.NewBatchExecutor(storage, batchRegistry, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, scb, evmEntropyService, mempool, dataCompressionService, logger)

// ensure cached chain state data is up-to-date using the persisted batch data
err = restoreStateDBCache(context.Background(), storage, batchRegistry, batchExecutor, genesis, logger)
21 changes: 12 additions & 9 deletions go/enclave/enclave_admin_service.go
@@ -35,7 +35,8 @@ import (

type enclaveAdminService struct {
config *enclaveconfig.EnclaveConfig
- mainMutex sync.Mutex // serialises all data ingestion or creation to avoid weird races
+ mainMutex sync.Mutex // locks the admin operations
+ dataInMutex sync.RWMutex // controls access to data ingestion
logger gethlog.Logger
l1BlockProcessor components.L1BlockProcessor
validatorService nodetype.Validator
@@ -92,6 +93,7 @@ func NewEnclaveAdminAPI(config *enclaveconfig.EnclaveConfig, storage storage.Sto
eas := &enclaveAdminService{
config: config,
mainMutex: sync.Mutex{},
dataInMutex: sync.RWMutex{},
logger: logger,
l1BlockProcessor: blockProcessor,
service: validatorService,
@@ -176,8 +178,8 @@ func (e *enclaveAdminService) MakeActive() common.SystemError {

// SubmitL1Block is used to update the enclave with an additional L1 block.
func (e *enclaveAdminService) SubmitL1Block(ctx context.Context, blockHeader *types.Header, receipts []*common.TxAndReceiptAndBlobs) (*common.BlockSubmissionResponse, common.SystemError) {
- e.mainMutex.Lock()
- defer e.mainMutex.Unlock()
+ e.dataInMutex.Lock()
+ defer e.dataInMutex.Unlock()

e.logger.Info("SubmitL1Block", log.BlockHeightKey, blockHeader.Number, log.BlockHashKey, blockHeader.Hash())

@@ -237,8 +239,8 @@ func (e *enclaveAdminService) SubmitBatch(ctx context.Context, extBatch *common.
return err
}

- e.mainMutex.Lock()
- defer e.mainMutex.Unlock()
+ e.dataInMutex.Lock()
+ defer e.dataInMutex.Unlock()

// if the signature is valid, then store the batch together with the converted hash
err = e.storage.StoreBatch(ctx, batch, convertedHeader.Hash())
Expand All @@ -261,8 +263,8 @@ func (e *enclaveAdminService) CreateBatch(ctx context.Context, skipBatchIfEmpty

defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateBatch call ended")

- e.mainMutex.Lock()
- defer e.mainMutex.Unlock()
+ e.dataInMutex.RLock()
+ defer e.dataInMutex.RUnlock()

err := e.sequencer().CreateBatch(ctx, skipBatchIfEmpty)
if err != nil {
Expand All @@ -278,8 +280,9 @@ func (e *enclaveAdminService) CreateRollup(ctx context.Context, fromSeqNo uint64
}
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateRollup call ended")

- e.mainMutex.Lock()
- defer e.mainMutex.Unlock()
+ // allow the simultaneous production of rollups and batches
+ e.dataInMutex.RLock()
+ defer e.dataInMutex.RUnlock()

if e.registry.HeadBatchSeq() == nil {
return nil, responses.ToInternalError(fmt.Errorf("not initialised yet"))
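The mutex changes above split the single lock in two: mainMutex keeps serialising the admin operations, while the new dataInMutex is an RWMutex that ingestion (SubmitL1Block, SubmitBatch) takes for writing and production (CreateBatch, CreateRollup) takes for reading, so batches and rollups can be produced at the same time but never while data is being ingested. A stripped-down sketch of the pattern; the method bodies are placeholders:

package main

import (
	"fmt"
	"sync"
	"time"
)

type adminService struct {
	dataInMutex sync.RWMutex // controls access to data ingestion
}

// submitBatch ingests data, so it takes the write lock and excludes everything else.
func (s *adminService) submitBatch(id int) {
	s.dataInMutex.Lock()
	defer s.dataInMutex.Unlock()
	fmt.Println("ingesting batch", id)
	time.Sleep(10 * time.Millisecond)
}

// createRollup takes the read lock: it can overlap with other producers,
// but not with an ingestion that holds the write lock.
func (s *adminService) createRollup() {
	s.dataInMutex.RLock()
	defer s.dataInMutex.RUnlock()
	fmt.Println("producing rollup")
	time.Sleep(10 * time.Millisecond)
}

func main() {
	s := &adminService{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(2)
		go func(id int) { defer wg.Done(); s.submitBatch(id) }(i)
		go func() { defer wg.Done(); s.createRollup() }()
	}
	wg.Wait()
}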
23 changes: 16 additions & 7 deletions go/enclave/limiters/batchlimiter.go
@@ -3,27 +3,30 @@ package limiters
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ten-protocol/go-ten/go/common/compression"
)

// BatchSizeLimiter - Acts as a limiter for batches based on
// the data from the transactions that we have to publish to the l1.
// Acts as a calldata reservation system that accounts for both
// transactions and cross chain messages.
type batchSizeLimiter struct {
- remainingSize uint64 // the available size in the limiter
+ compressionService compression.DataCompressionService
+ remainingSize uint64 // the available size in the limiter
}

// NewBatchSizeLimiter - Size is the total space available per batch for calldata in a rollup.
- func NewBatchSizeLimiter(size uint64) BatchSizeLimiter {
+ func NewBatchSizeLimiter(size uint64, compressionService compression.DataCompressionService) BatchSizeLimiter {
return &batchSizeLimiter{
- remainingSize: size,
+ compressionService: compressionService,
+ remainingSize: size,
}
}

// AcceptTransaction - transaction is rlp encoded as it normally would be when publishing a rollup and
// its size is deducted from the remaining limit.
func (l *batchSizeLimiter) AcceptTransaction(tx *types.Transaction) error {
- rlpSize, err := getRlpSize(tx)
+ rlpSize, err := l.getCompressedSize(tx)
if err != nil {
return err
}
@@ -37,14 +40,20 @@ func (l *batchSizeLimiter) AcceptTransaction(tx *types.Transaction) error {
}

// todo (@stefan) figure out how to optimize the serialization out of the limiter
- func getRlpSize(val interface{}) (int, error) {
- // todo (@stefan) - this should have a coefficient for compression
+ func (l *batchSizeLimiter) getCompressedSize(val interface{}) (int, error) {
enc, err := rlp.EncodeToBytes(val)
if err != nil {
return 0, err
}

- return len(enc), nil
+ // compress the transaction. This is useless for small transactions, but might be useful for larger transactions such as deploying contracts
+ // todo - keep a running compression of the current batch
+ compr, err := l.compressionService.CompressBatch(enc)
+ if err != nil {
+ return 0, err
+ }
+
+ return len(compr), nil
}

type unlimitedBatchSize struct{}
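The limiter now charges each transaction its individually compressed size. That is a conservative estimate: compressing transactions one by one gives up cross-transaction redundancy, so the sum of per-transaction compressed sizes usually exceeds the compressed size of the whole batch, which is what the running-compression todo above is about. A small sketch of the effect using the standard library's flate as a stand-in for the Brotli service:

package main

import (
	"bytes"
	"compress/flate"
	"fmt"
)

// compressedLen returns the deflate-compressed size of data.
func compressedLen(data []byte) int {
	var buf bytes.Buffer
	w, _ := flate.NewWriter(&buf, flate.BestCompression) // BestCompression is a valid level, so the error can be ignored
	w.Write(data)
	w.Close()
	return buf.Len()
}

func main() {
	// two "transactions" that share a lot of structure
	tx1 := bytes.Repeat([]byte("transfer(to=0xaaaa,amount=100);"), 20)
	tx2 := bytes.Repeat([]byte("transfer(to=0xbbbb,amount=200);"), 20)

	perTx := compressedLen(tx1) + compressedLen(tx2)
	whole := compressedLen(append(append([]byte{}, tx1...), tx2...))

	// charging each tx separately reserves more space than the batch will actually need
	fmt.Println("sum of per-tx compressed sizes:", perTx)
	fmt.Println("compressed size of the whole batch:", whole)
}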
