Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize mutexes and increase batch size #2215

Merged
merged 5 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/config/defaults/0-base-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ network:
batch:
interval: 1s
maxInterval: 1s # if this is greater than batch.interval then we make batches more slowly when there are no transactions
maxSize: 45000 # around 45kb - around 200 transactions / batch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a reason we picked batch size to be lower but i cant recall. The only thing i recall as a hard limit was 64kb for accepting eth transactions in the geth code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually rollups had more metadata than batches so there was an overhead that needed to be free, if rollups match batch size we might have issues

maxSize: 125952 # (128-5)kb - the size of the rollup minus overhead
rollup:
interval: 5s
maxInterval: 10m # rollups will be produced after this time even if the data blob is not full
maxSize: 131072 # 128kb
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do blobs have any overhead on top of rollup metadata? @badgersrus

maxSize: 131072 # 128kb - the size of a blob
gas: # todo: ask stefan about these fields
baseFee: 1000000000 # using geth's initial base fee for EIP-1559 blocks.
minGasPrice: 1000000000 # using geth's initial base fee for EIP-1559 blocks.
Expand Down
67 changes: 37 additions & 30 deletions go/enclave/components/batch_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"math/big"
"sync"

"github.com/ten-protocol/go-ten/go/common/compression"

gethcore "github.com/ethereum/go-ethereum/core"

"github.com/ethereum/go-ethereum/core/vm"
Expand Down Expand Up @@ -48,18 +50,19 @@ var ErrNoTransactionsToProcess = fmt.Errorf("no transactions to process")

// batchExecutor - the component responsible for executing batches
type batchExecutor struct {
storage storage.Storage
batchRegistry BatchRegistry
config enclaveconfig.EnclaveConfig
gethEncodingService gethencoding.EncodingService
crossChainProcessors *crosschain.Processors
genesis *genesis.Genesis
logger gethlog.Logger
gasOracle gas.Oracle
chainConfig *params.ChainConfig
systemContracts system.SystemContractCallbacks
entropyService *crypto.EvmEntropyService
mempool *TxPool
storage storage.Storage
batchRegistry BatchRegistry
config enclaveconfig.EnclaveConfig
gethEncodingService gethencoding.EncodingService
crossChainProcessors *crosschain.Processors
dataCompressionService compression.DataCompressionService
genesis *genesis.Genesis
logger gethlog.Logger
gasOracle gas.Oracle
chainConfig *params.ChainConfig
systemContracts system.SystemContractCallbacks
entropyService *crypto.EvmEntropyService
mempool *TxPool
// stateDBMutex - used to protect calls to stateDB.Commit as it is not safe for async access.
stateDBMutex sync.Mutex

Expand All @@ -79,24 +82,26 @@ func NewBatchExecutor(
systemContracts system.SystemContractCallbacks,
entropyService *crypto.EvmEntropyService,
mempool *TxPool,
dataCompressionService compression.DataCompressionService,
logger gethlog.Logger,
) BatchExecutor {
return &batchExecutor{
storage: storage,
batchRegistry: batchRegistry,
config: config,
gethEncodingService: gethEncodingService,
crossChainProcessors: cc,
genesis: genesis,
chainConfig: chainConfig,
logger: logger,
gasOracle: gasOracle,
stateDBMutex: sync.Mutex{},
batchGasLimit: config.GasBatchExecutionLimit,
systemContracts: systemContracts,
entropyService: entropyService,
mempool: mempool,
chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger),
storage: storage,
batchRegistry: batchRegistry,
config: config,
gethEncodingService: gethEncodingService,
crossChainProcessors: cc,
genesis: genesis,
chainConfig: chainConfig,
logger: logger,
gasOracle: gasOracle,
stateDBMutex: sync.Mutex{},
batchGasLimit: config.GasBatchExecutionLimit,
systemContracts: systemContracts,
entropyService: entropyService,
mempool: mempool,
dataCompressionService: dataCompressionService,
chainContext: evm.NewTenChainContext(storage, gethEncodingService, config, logger),
}
}

Expand Down Expand Up @@ -286,7 +291,7 @@ func (executor *batchExecutor) execBatchTransactions(ec *BatchExecutionContext)
}

func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext) error {
sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize)
sizeLimiter := limiters.NewBatchSizeLimiter(executor.config.MaxBatchSize, executor.dataCompressionService)
pendingTransactions := executor.mempool.PendingTransactions()

nrPending, nrQueued := executor.mempool.Stats()
Expand Down Expand Up @@ -319,10 +324,12 @@ func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext
// check the size limiter
err := sizeLimiter.AcceptTransaction(tx)
if err != nil {
executor.logger.Info("Unable to accept transaction", log.TxKey, tx.Hash(), log.ErrKey, err)
if errors.Is(err, limiters.ErrInsufficientSpace) { // Batch ran out of space
break
executor.logger.Trace("Unable to accept transaction", log.TxKey, tx.Hash())
mempoolTxs.Pop()
continue
}
return fmt.Errorf("failed to apply the batch limiter. Cause: %w", err)
}

pTx, err := executor.toPricedTx(ec, tx)
Expand Down
74 changes: 67 additions & 7 deletions go/enclave/components/txpool.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package components

// unsafe package imported in order to link to a private function in go-ethereum.
// This allows us to validate transactions against the tx pool rules.
import (
"fmt"
"math/big"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
_ "unsafe"
"unsafe"

"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/params"
"github.com/ten-protocol/go-ten/go/common/log"

gethcommon "github.com/ethereum/go-ethereum/common"
Expand All @@ -24,12 +24,30 @@
"github.com/ten-protocol/go-ten/go/common"
)

const (
// txSlotSize is used to calculate how many data slots a single transaction
// takes up based on its size. The slots are used as DoS protection, ensuring
// that validating a new transaction remains a constant operation (in reality
// O(maxslots), where max slots are 4 currently).
txSlotSize = 32 * 1024

// we assume that at the limit, a single "uncompressable" tx is in a batch which gets rolled-up, and must fit in a 128kb blob
rollupOverhead = 5 * 1024

// txMaxSize is the maximum size a single transaction can have. This field has
// non-trivial consequences: larger transactions are significantly harder and
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4*txSlotSize - rollupOverhead // 128KB - overhead
)

// this is how long the node waits to receive the second batch
var startMempoolTimeout = 90 * time.Second

// TxPool is an obscuro wrapper around geths transaction pool
type TxPool struct {
txPoolConfig legacypool.Config
chainconfig *params.ChainConfig
legacyPool *legacypool.LegacyPool
pool *gethtxpool.TxPool
Chain *EthChainAdapter
Expand Down Expand Up @@ -59,6 +77,7 @@

txp := &TxPool{
Chain: blockchain,
chainconfig: blockchain.Config(),
txPoolConfig: txPoolConfig,
legacyPool: legacyPool,
gasTip: gasTip,
Expand Down Expand Up @@ -195,6 +214,12 @@

// Add adds a new transactions to the pool
func (t *TxPool) add(transaction *common.L2Tx) error {
// validate against the consensus rules
err := t.validateTxBasics(transaction, false)
if err != nil {
return err
}

var strErrors []string
for _, err := range t.pool.Add([]*types.Transaction{transaction}, false, false) {
if err != nil {
Expand All @@ -208,16 +233,13 @@
return nil
}

//go:linkname validateTxBasics github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTxBasics
func validateTxBasics(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error

//go:linkname validateTx github.com/ethereum/go-ethereum/core/txpool/legacypool.(*LegacyPool).validateTx
func validateTx(_ *legacypool.LegacyPool, _ *types.Transaction, _ bool) error

// Validate - run the underlying tx pool validation logic
func (t *TxPool) validate(tx *common.L2Tx) error {
// validate against the consensus rules
err := validateTxBasics(t.legacyPool, tx, false)
err := t.validateTxBasics(tx, false)
if err != nil {
return err
}
Expand All @@ -231,3 +253,41 @@
func (t *TxPool) Stats() (int, int) {
return t.legacyPool.Stats()
}

// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (t *TxPool) validateTxBasics(tx *types.Transaction, local bool) error {
opts := &gethtxpool.ValidationOptions{
Config: t.chainconfig,
Accept: 0 |
1<<types.LegacyTxType |
1<<types.AccessListTxType |
1<<types.DynamicFeeTxType,
MaxSize: txMaxSize,
MinTip: t.gasTip,
}

// we need to access some private variables from the legacy pool to run validation with our own consensus options
v := reflect.ValueOf(t.legacyPool).Elem()

chField := v.FieldByName("currentHead")
chFieldPtr := unsafe.Pointer(chField.UnsafeAddr())
ch, ok := reflect.NewAt(chField.Type(), chFieldPtr).Elem().Interface().(atomic.Pointer[types.Header])

Check failure on line 277 in go/enclave/components/txpool.go

View workflow job for this annotation

GitHub Actions / lint

copylocks: assignment copies lock value to ch: (sync/atomic.Pointer[github.com/ethereum/go-ethereum/core/types.Header], bool) contains sync/atomic.Pointer[github.com/ethereum/go-ethereum/core/types.Header] contains sync/atomic.noCopy (govet)
if !ok {
t.logger.Crit("invalid mempool. should not happen")
}

sigField := v.FieldByName("signer")
sigFieldPtr := unsafe.Pointer(sigField.UnsafeAddr())
sig, ok1 := reflect.NewAt(sigField.Type(), sigFieldPtr).Elem().Interface().(types.Signer)
if !ok1 {
t.logger.Crit("invalid mempool. should not happen")
}

if err := gethtxpool.ValidateTransaction(tx, ch.Load(), sig, opts); err != nil {
return err
}
return nil
}
5 changes: 4 additions & 1 deletion go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"math/big"

"github.com/ten-protocol/go-ten/go/common/compression"

"github.com/ten-protocol/go-ten/go/enclave/crypto"

enclaveconfig "github.com/ten-protocol/go-ten/go/enclave/config"
Expand Down Expand Up @@ -100,7 +102,8 @@ func NewEnclave(config *enclaveconfig.EnclaveConfig, genesis *genesis.Genesis, m

gasOracle := gas.NewGasOracle()
blockProcessor := components.NewBlockProcessor(storage, crossChainProcessors, gasOracle, logger)
batchExecutor := components.NewBatchExecutor(storage, batchRegistry, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, scb, evmEntropyService, mempool, logger)
dataCompressionService := compression.NewBrotliDataCompressionService()
batchExecutor := components.NewBatchExecutor(storage, batchRegistry, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, scb, evmEntropyService, mempool, dataCompressionService, logger)

// ensure cached chain state data is up-to-date using the persisted batch data
err = restoreStateDBCache(context.Background(), storage, batchRegistry, batchExecutor, genesis, logger)
Expand Down
21 changes: 12 additions & 9 deletions go/enclave/enclave_admin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (

type enclaveAdminService struct {
config *enclaveconfig.EnclaveConfig
mainMutex sync.Mutex // serialises all data ingestion or creation to avoid weird races
mainMutex sync.Mutex // locks the admin operations
dataInMutex sync.RWMutex // controls access to data ingestion
logger gethlog.Logger
l1BlockProcessor components.L1BlockProcessor
validatorService nodetype.Validator
Expand Down Expand Up @@ -92,6 +93,7 @@ func NewEnclaveAdminAPI(config *enclaveconfig.EnclaveConfig, storage storage.Sto
eas := &enclaveAdminService{
config: config,
mainMutex: sync.Mutex{},
dataInMutex: sync.RWMutex{},
logger: logger,
l1BlockProcessor: blockProcessor,
service: validatorService,
Expand Down Expand Up @@ -176,8 +178,8 @@ func (e *enclaveAdminService) MakeActive() common.SystemError {

// SubmitL1Block is used to update the enclave with an additional L1 block.
func (e *enclaveAdminService) SubmitL1Block(ctx context.Context, blockHeader *types.Header, receipts []*common.TxAndReceiptAndBlobs) (*common.BlockSubmissionResponse, common.SystemError) {
e.mainMutex.Lock()
defer e.mainMutex.Unlock()
e.dataInMutex.Lock()
defer e.dataInMutex.Unlock()

e.logger.Info("SubmitL1Block", log.BlockHeightKey, blockHeader.Number, log.BlockHashKey, blockHeader.Hash())

Expand Down Expand Up @@ -237,8 +239,8 @@ func (e *enclaveAdminService) SubmitBatch(ctx context.Context, extBatch *common.
return err
}

e.mainMutex.Lock()
defer e.mainMutex.Unlock()
e.dataInMutex.Lock()
defer e.dataInMutex.Unlock()

// if the signature is valid, then store the batch together with the converted hash
err = e.storage.StoreBatch(ctx, batch, convertedHeader.Hash())
Expand All @@ -261,8 +263,8 @@ func (e *enclaveAdminService) CreateBatch(ctx context.Context, skipBatchIfEmpty

defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateBatch call ended")

e.mainMutex.Lock()
defer e.mainMutex.Unlock()
e.dataInMutex.RLock()
defer e.dataInMutex.RUnlock()

err := e.sequencer().CreateBatch(ctx, skipBatchIfEmpty)
if err != nil {
Expand All @@ -278,8 +280,9 @@ func (e *enclaveAdminService) CreateRollup(ctx context.Context, fromSeqNo uint64
}
defer core.LogMethodDuration(e.logger, measure.NewStopwatch(), "CreateRollup call ended")

e.mainMutex.Lock()
defer e.mainMutex.Unlock()
// allow the simultaneous production of rollups and batches
e.dataInMutex.RLock()
defer e.dataInMutex.RUnlock()

if e.registry.HeadBatchSeq() == nil {
return nil, responses.ToInternalError(fmt.Errorf("not initialised yet"))
Expand Down
23 changes: 16 additions & 7 deletions go/enclave/limiters/batchlimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,30 @@ package limiters
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ten-protocol/go-ten/go/common/compression"
)

// BatchSizeLimiter - Acts as a limiter for batches based
// the data from the transaction that we have to publish to the l1.
// Acts as a calldata reservation system that accounts for both
// transactions and cross chain messages.
type batchSizeLimiter struct {
remainingSize uint64 // the available size in the limiter
compressionService compression.DataCompressionService
remainingSize uint64 // the available size in the limiter
}

// NewBatchSizeLimiter - Size is the total space available per batch for calldata in a rollup.
func NewBatchSizeLimiter(size uint64) BatchSizeLimiter {
func NewBatchSizeLimiter(size uint64, compressionService compression.DataCompressionService) BatchSizeLimiter {
return &batchSizeLimiter{
remainingSize: size,
compressionService: compressionService,
remainingSize: size,
}
}

// AcceptTransaction - transaction is rlp encoded as it normally would be when publishing a rollup and
// its size is deducted from the remaining limit.
func (l *batchSizeLimiter) AcceptTransaction(tx *types.Transaction) error {
rlpSize, err := getRlpSize(tx)
rlpSize, err := l.getCompressedSize(tx)
if err != nil {
return err
}
Expand All @@ -37,14 +40,20 @@ func (l *batchSizeLimiter) AcceptTransaction(tx *types.Transaction) error {
}

// todo (@stefan) figure out how to optimize the serialization out of the limiter
func getRlpSize(val interface{}) (int, error) {
// todo (@stefan) - this should have a coefficient for compression
func (l *batchSizeLimiter) getCompressedSize(val interface{}) (int, error) {
enc, err := rlp.EncodeToBytes(val)
if err != nil {
return 0, err
}

return len(enc), nil
// compress the transaction. This is useless for small transactions, but might be useful for larger transactions such as deploying contracts
// todo - keep a running compression of the current batch
compr, err := l.compressionService.CompressBatch(enc)
if err != nil {
return 0, err
}

return len(compr), nil
}

type unlimitedBatchSize struct{}
Expand Down
Loading