Commit a3cd9a0

add rollup overhead to tx size. Added compression to batch limiter. Add missing error check

tudor-malene committed Dec 19, 2024
1 parent 7f008fb commit a3cd9a0
Showing 5 changed files with 125 additions and 46 deletions.
2 changes: 1 addition & 1 deletion go/config/defaults/0-base-config.yaml
@@ -8,7 +8,7 @@ network:
batch:
interval: 1s
maxInterval: 1s # if this is greater than batch.interval then we make batches more slowly when there are no transactions
maxSize: 131072 # 128kb - the size of the rollup
maxSize: 125952 # (128-5)kb - the size of the rollup minus overhead
rollup:
interval: 5s
maxInterval: 10m # rollups will be produced after this time even if the data blob is not full
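
For reference, the new limit is the 128kb blob budget minus the 5kb of rollup overhead that this commit reserves in txpool.go. A minimal standalone sketch of the arithmetic (constant names mirror the ones introduced there):

```go
package main

import "fmt"

func main() {
	const (
		txSlotSize     = 32 * 1024                     // geth's per-slot size used for pool accounting
		rollupOverhead = 5 * 1024                      // reserved so a maximum-size tx still fits the 128kb blob
		txMaxSize      = 4*txSlotSize - rollupOverhead // 131072 - 5120
	)
	fmt.Println(txMaxSize) // 125952, i.e. the new batch.maxSize above
}
```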
67 changes: 37 additions & 30 deletions go/enclave/components/batch_executor.go
@@ -8,6 +8,8 @@ import (
"math/big"
"sync"

"github.com/ten-protocol/go-ten/go/common/compression"

gethcore "github.com/ethereum/go-ethereum/core"

"github.com/ethereum/go-ethereum/core/vm"
@@ -48,18 +50,19 @@ var ErrNoTransactionsToProcess = fmt.Errorf("no transactions to process")

// batchExecutor - the component responsible for executing batches
type batchExecutor struct {
storage storage.Storage
batchRegistry BatchRegistry
config enclaveconfig.EnclaveConfig
gethEncodingService gethencoding.EncodingService
crossChainProcessors *crosschain.Processors
genesis *genesis.Genesis
logger gethlog.Logger
gasOracle gas.Oracle
chainConfig *params.ChainConfig
systemContracts system.SystemContractCallbacks
entropyService *crypto.EvmEntropyService
mempool *TxPool
storage storage.Storage
batchRegistry BatchRegistry
config enclaveconfig.EnclaveConfig
gethEncodingService gethencoding.EncodingService
crossChainProcessors *crosschain.Processors
dataCompressionService compression.DataCompressionService
genesis *genesis.Genesis
logger gethlog.Logger
gasOracle gas.Oracle
chainConfig *params.ChainConfig
systemContracts system.SystemContractCallbacks
entropyService *crypto.EvmEntropyService
mempool *TxPool
// stateDBMutex - used to protect calls to stateDB.Commit as it is not safe for async access.
stateDBMutex sync.Mutex

@@ -79,24 +82,26 @@ func NewBatchExecutor(
systemContracts system.SystemContractCallbacks,
entropyService *crypto.EvmEntropyService,
mempool *TxPool,
dataCompressionService compression.DataCompressionService,
logger gethlog.Logger,
) BatchExecutor {
return &batchExecutor{
storage: storage,
batchRegistry: batchRegistry,
config: config,
gethEncodingService: gethEncodingService,
crossChainProcessors: cc,
genesis: genesis,
chainConfig: chainConfig,
logger: logger,
gasOracle: gasOracle,
stateDBMutex: sync.Mutex{},
batchGasLimit: config.GasBatchExecutionLimit,
systemContracts: systemContracts,
entropyService: entropyService,
mempool: mempool,
chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger),
storage: storage,
batchRegistry: batchRegistry,
config: config,
gethEncodingService: gethEncodingService,
crossChainProcessors: cc,
genesis: genesis,
chainConfig: chainConfig,
logger: logger,
gasOracle: gasOracle,
stateDBMutex: sync.Mutex{},
batchGasLimit: config.GasBatchExecutionLimit,
systemContracts: systemContracts,
entropyService: entropyService,
mempool: mempool,
dataCompressionService: dataCompressionService,
chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger),
}
}

@@ -286,7 +291,7 @@ func (executor *batchExecutor) execBatchTransactions(ec *BatchExecutionContext)
}

func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext) error {
sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize)
sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize, executor.dataCompressionService)
pendingTransactions := executor.mempool.PendingTransactions()

nrPending, nrQueued := executor.mempool.Stats()
@@ -319,10 +324,12 @@ func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext)
// check the size limiter
err := sizeLimiter.AcceptTransaction(tx)
if err != nil {
executor.logger.Info("Unable to accept transaction", log.TxKey, tx.Hash(), log.ErrKey, err)
if errors.Is(err, limiters.ErrInsufficientSpace) { // Batch ran out of space
break
executor.logger.Trace("Unable to accept transaction", log.TxKey, tx.Hash())
mempoolTxs.Pop()
continue
}
return fmt.Errorf("failed to apply the batch limiter. Cause: %w", err)
}

pTx, err := executor.toPricedTx(ec, tx)
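
Read together with the limiter change below, the loop no longer stops at the first transaction that does not fit: it logs at trace level, pops the transaction, and keeps trying the remaining candidates. A minimal sketch of the resulting accept path, using the names that appear in this hunk (not a verbatim copy of the committed function):

```go
// Inside execMempoolTransactions, per candidate transaction:
err := sizeLimiter.AcceptTransaction(tx)
if err != nil {
	if errors.Is(err, limiters.ErrInsufficientSpace) { // batch has no room for this tx
		executor.logger.Trace("Unable to accept transaction", log.TxKey, tx.Hash())
		mempoolTxs.Pop() // skip it and move on to the next candidate
		continue
	}
	return fmt.Errorf("failed to apply the batch limiter. Cause: %w", err)
}
```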
74 changes: 67 additions & 7 deletions go/enclave/components/txpool.go
@@ -1,17 +1,17 @@
package components

// unsafe package imported in order to link to a private function in go-ethereum.
// This allows us to validate transactions against the tx pool rules.
import (
"fmt"
"math/big"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
_ "unsafe"
"unsafe"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/params"
"github.com/ten-protocol/go-ten/go/common/log"

gethcommon "github.com/ethereum/go-ethereum/common"
@@ -24,12 +24,30 @@ import (
"github.com/ten-protocol/go-ten/go/common"
)

const (
// txSlotSize is used to calculate how many data slots a single transaction
// takes up based on its size. The slots are used as DoS protection, ensuring
// that validating a new transaction remains a constant operation (in reality
// O(maxslots), where max slots are 4 currently).
txSlotSize = 32 * 1024

// we assume that, at the limit, a single incompressible tx fills a batch that gets rolled up and must fit in a 128kb blob
rollupOverhead = 5 * 1024

// txMaxSize is the maximum size a single transaction can have. This field has
// non-trivial consequences: larger transactions are significantly harder and
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4*txSlotSize - rollupOverhead // 128KB - overhead
)

// this is how long the node waits to receive the second batch
var startMempoolTimeout = 90 * time.Second

// TxPool is an Obscuro wrapper around geth's transaction pool
type TxPool struct {
txPoolConfig legacypool.Config
chainconfig *params.ChainConfig
legacyPool *legacypool.LegacyPool
pool *gethtxpool.TxPool
Chain *EthChainAdapter
@@ -59,6 +77,7 @@ func NewTxPool(blockchain *EthChainAdapter, gasTip *big.Int, validateOnly bool,

txp := &TxPool{
Chain: blockchain,
chainconfig: blockchain.Config(),
txPoolConfig: txPoolConfig,
legacyPool: legacyPool,
gasTip: gasTip,
@@ -195,6 +214,12 @@ func (t *TxPool) Close() error {

// add adds a new transaction to the pool
func (t *TxPool) add(transaction *common.L2Tx) error {
// validate against the consensus rules
err := t.validateTxBasics(transaction, false)
if err != nil {
return err
}

var strErrors []string
for _, err := range t.pool.Add([]*types.Transaction{transaction}, false, false) {
if err != nil {
@@ -208,16 +233,13 @@
return nil
}

//go:linkname validateTxBasics github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTxBasics
func validateTxBasics(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error

//go:linkname validateTx github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTx
func validateTx(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error

// Validate - run the underlying tx pool validation logic
func (t *TxPool) validate(tx *common.L2Tx) error {
// validate against the consensus rules
err := validateTxBasics(t.legacyPool, tx, false)
err := t.validateTxBasics(tx, false)
if err != nil {
return err
}
@@ -231,3 +253,41 @@ func (t *TxPool) Stats() (int, int) {
func (t *TxPool) Stats() (int, int) {
return t.legacyPool.Stats()
}

// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (t *TxPool) validateTxBasics(tx *types.Transaction, local bool) error {
opts := &gethtxpool.ValidationOptions{
Config: t.chainconfig,
Accept: 0 |
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType,
MaxSize: txMaxSize,
MinTip: t.gasTip,
}

// we need to access some private variables from the legacy pool to run validation with our own consensus options
v := reflect.ValueOf(t.legacyPool).Elem()

chField := v.FieldByName("currentHead")
chFieldPtr := unsafe.Pointer(chField.UnsafeAddr())
ch, ok := reflect.NewAt(chField.Type(), chFieldPtr).Elem().Interface().(atomic.Pointer[types.Header])

[GitHub Actions / lint] Check failure on line 277 in go/enclave/components/txpool.go:
copylocks: assignment copies lock value to ch: (sync/atomic.Pointer[github.com/ethereum/go-ethereum/core/types.Header], bool) contains sync/atomic.Pointer[github.com/ethereum/go-ethereum/core/types.Header] contains sync/atomic.noCopy (govet)
if !ok {
t.logger.Crit("invalid mempool. should not happen")
}

sigField := v.FieldByName("signer")
sigFieldPtr := unsafe.Pointer(sigField.UnsafeAddr())
sig, ok1 := reflect.NewAt(sigField.Type(), sigFieldPtr).Elem().Interface().(types.Signer)
if !ok1 {
t.logger.Crit("invalid mempool. should not happen")
}

if err := gethtxpool.ValidateTransaction(tx, ch.Load(), sig, opts); err != nil {
return err
}
return nil
}
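
The lint failure above flags that the type assertion copies the atomic.Pointer (and its internal noCopy marker) out of the unexported field. A minimal sketch of one way to avoid the copy while keeping the same reflect/unsafe access pattern; the helper name is hypothetical and this is not the committed fix:

```go
// loadHeadNoCopy reads the legacy pool's unexported currentHead without
// copying the atomic.Pointer value, which keeps govet's copylocks check quiet.
func loadHeadNoCopy(pool *legacypool.LegacyPool) (*types.Header, bool) {
	v := reflect.ValueOf(pool).Elem()
	f := v.FieldByName("currentHead")
	// Take a typed pointer to the field instead of copying the value out.
	p, ok := reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Interface().(*atomic.Pointer[types.Header])
	if !ok {
		return nil, false
	}
	return p.Load(), true
}
```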
5 changes: 4 additions & 1 deletion go/enclave/enclave.go
@@ -7,6 +7,8 @@ import (
"fmt"
"math/big"

"github.com/ten-protocol/go-ten/go/common/compression"

"github.com/ten-protocol/go-ten/go/enclave/crypto"

enclaveconfig "github.com/ten-protocol/go-ten/go/enclave/config"
@@ -100,7 +102,8 @@ func NewEnclave(config *enclaveconfig.EnclaveConfig, genesis *genesis.Genesis, m

gasOracle := gas.NewGasOracle()
blockProcessor := components.NewBlockProcessor(storage, crossChainProcessors, gasOracle, logger)
batchExecutor := components.NewBatchExecutor(storage, batchRegistry, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, scb, evmEntropyService, mempool, logger)
dataCompressionService := compression.NewBrotliDataCompressionService()
batchExecutor := components.NewBatchExecutor(storage, batchRegistry, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, scb, evmEntropyService, mempool, dataCompressionService, logger)

// ensure cached chain state data is up-to-date using the persisted batch data
err = restoreStateDBCache(context.Background(), storage, batchRegistry, batchExecutor, genesis, logger)
23 changes: 16 additions & 7 deletions go/enclave/limiters/batchlimiter.go
@@ -3,27 +3,30 @@ package limiters
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ten-protocol/go-ten/go/common/compression"
)

// BatchSizeLimiter - Acts as a limiter for batches based on
// the data from the transactions that we have to publish to the L1.
// Acts as a calldata reservation system that accounts for both
// transactions and cross chain messages.
type batchSizeLimiter struct {
remainingSize uint64 // the available size in the limiter
compressionService compression.DataCompressionService
remainingSize uint64 // the available size in the limiter
}

// NewBatchSizeLimiter - Size is the total space available per batch for calldata in a rollup.
func NewBatchSizeLimiter(size uint64) BatchSizeLimiter {
func NewBatchSizeLimiter(size uint64, compressionService compression.DataCompressionService) BatchSizeLimiter {
return &batchSizeLimiter{
remainingSize: size,
compressionService: compressionService,
remainingSize: size,
}
}

// AcceptTransaction - transaction is rlp encoded as it normally would be when publishing a rollup and
// its size is deducted from the remaining limit.
func (l *batchSizeLimiter) AcceptTransaction(tx *types.Transaction) error {
rlpSize, err := getRlpSize(tx)
rlpSize, err := l.getCompressedSize(tx)
if err != nil {
return err
}
@@ -37,14 +40,20 @@ func (l *batchSizeLimiter) AcceptTransaction(tx *types.Transaction) error {
}

// todo (@stefan) figure out how to optimize the serialization out of the limiter
func getRlpSize(val interface{}) (int, error) {
// todo (@stefan) - this should have a coefficient for compression
func (l *batchSizeLimiter) getCompressedSize(val interface{}) (int, error) {
enc, err := rlp.EncodeToBytes(val)
if err != nil {
return 0, err
}

return len(enc), nil
// compress the transaction. This is useless for small transactions, but can help for larger ones such as contract deployments
// todo - keep a running compression of the current batch
compr, err := l.compressionService.CompressBatch(enc)
if err != nil {
return 0, err
}

return len(compr), nil
}

type unlimitedBatchSize struct{}
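
A note on the approach in getCompressedSize: each transaction is compressed in isolation, so Brotli cannot exploit redundancy across the transactions already in the batch, and the per-transaction sum is likely a conservative approximation of the final rollup size (and, as the comment says, buys little for small transactions). The todo about keeping a running compression of the current batch would tighten this estimate.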
