From 7b33b8645919f5979ea252f2d82641e7e95937e2 Mon Sep 17 00:00:00 2001 From: otherview Date: Fri, 10 Nov 2023 16:04:24 +0000 Subject: [PATCH] Adding geths mempool --- go/common/gethencoding/geth_encoding.go | 13 +- go/enclave/enclave.go | 54 +++---- go/enclave/ethblockchain/eth_blockchain.go | 153 ++++++++++++++++++ go/enclave/ethblockchain/eth_chainparams.go | 36 +++++ go/enclave/mempool/README.md | 2 - go/enclave/mempool/interface.go | 20 --- go/enclave/mempool/manager.go | 118 -------------- go/enclave/mempool/nonce_tracker.go | 40 ----- go/enclave/nodetype/sequencer.go | 75 ++++++--- go/enclave/txpool/txpool.go | 66 ++++++++ integration/common/utils.go | 2 +- integration/obscuroscan/obscuroscan_test.go | 7 +- .../simulation/network/network_utils.go | 2 +- integration/simulation/simulation.go | 10 +- 14 files changed, 348 insertions(+), 250 deletions(-) create mode 100644 go/enclave/ethblockchain/eth_blockchain.go create mode 100644 go/enclave/ethblockchain/eth_chainparams.go delete mode 100644 go/enclave/mempool/README.md delete mode 100644 go/enclave/mempool/interface.go delete mode 100644 go/enclave/mempool/manager.go delete mode 100644 go/enclave/mempool/nonce_tracker.go create mode 100644 go/enclave/txpool/txpool.go diff --git a/go/common/gethencoding/geth_encoding.go b/go/common/gethencoding/geth_encoding.go index fe0d04204c..33c11779b5 100644 --- a/go/common/gethencoding/geth_encoding.go +++ b/go/common/gethencoding/geth_encoding.go @@ -7,7 +7,9 @@ import ( "strings" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/trie" "github.com/obscuronet/go-obscuro/go/common" + "github.com/obscuronet/go-obscuro/go/enclave/core" "github.com/obscuronet/go-obscuro/go/enclave/crypto" "github.com/ethereum/go-ethereum/common/hexutil" @@ -243,7 +245,7 @@ func CreateEthHeaderForBatch(h *common.BatchHeader, secret []byte) (*types.Heade Difficulty: big.NewInt(0), Number: h.Number, GasLimit: h.GasLimit, - GasUsed: 0, + GasUsed: h.GasUsed, BaseFee: big.NewInt(0).SetUint64(baseFee), Coinbase: h.Coinbase, Time: h.Time, @@ -252,6 +254,15 @@ func CreateEthHeaderForBatch(h *common.BatchHeader, secret []byte) (*types.Heade }, nil } +func CreateEthBlockFromBatch(b *core.Batch) (*types.Block, error) { + blockHeader, err := CreateEthHeaderForBatch(b.Header, nil) + if err != nil { + return nil, fmt.Errorf("unable to create eth block from batch - %w", err) + } + + return types.NewBlock(blockHeader, b.Transactions, nil, nil, trie.NewStackTrie(nil)), nil +} + // DecodeParamBytes decodes the parameters byte array into a slice of interfaces // Helps each calling method to manage the positional data func DecodeParamBytes(paramBytes []byte) ([]interface{}, error) { diff --git a/go/enclave/enclave.go b/go/enclave/enclave.go index 4ad8bd0d76..65ddd98f7d 100644 --- a/go/enclave/enclave.go +++ b/go/enclave/enclave.go @@ -11,9 +11,10 @@ import ( "time" "github.com/obscuronet/go-obscuro/go/common/measure" - + "github.com/obscuronet/go-obscuro/go/enclave/ethblockchain" "github.com/obscuronet/go-obscuro/go/enclave/gas" "github.com/obscuronet/go-obscuro/go/enclave/storage" + "github.com/obscuronet/go-obscuro/go/enclave/txpool" "github.com/obscuronet/go-obscuro/go/enclave/vkhandler" @@ -52,7 +53,6 @@ import ( "github.com/obscuronet/go-obscuro/go/enclave/debugger" "github.com/obscuronet/go-obscuro/go/enclave/events" - "github.com/obscuronet/go-obscuro/go/enclave/mempool" "github.com/obscuronet/go-obscuro/go/enclave/rpc" "github.com/obscuronet/go-obscuro/go/ethadapter/mgmtcontractlib" @@ -127,29 +127,9 @@ func NewEnclave( } } - zeroTimestamp := uint64(0) // Initialise the database - chainConfig := params.ChainConfig{ - ChainID: big.NewInt(config.ObscuroChainID), - HomesteadBlock: gethcommon.Big0, - DAOForkBlock: gethcommon.Big0, - EIP150Block: gethcommon.Big0, - EIP155Block: gethcommon.Big0, - EIP158Block: gethcommon.Big0, - ByzantiumBlock: gethcommon.Big0, - ConstantinopleBlock: gethcommon.Big0, - PetersburgBlock: gethcommon.Big0, - IstanbulBlock: gethcommon.Big0, - MuirGlacierBlock: gethcommon.Big0, - BerlinBlock: gethcommon.Big0, - LondonBlock: gethcommon.Big0, - - CancunTime: &zeroTimestamp, - ShanghaiTime: &zeroTimestamp, - PragueTime: &zeroTimestamp, - VerkleTime: &zeroTimestamp, - } - storage := storage.NewStorageFromConfig(config, &chainConfig, logger) + chainConfig := ethblockchain.ChainParams(big.NewInt(config.ObscuroChainID)) + storage := storage.NewStorageFromConfig(config, chainConfig, logger) // Initialise the Ethereum "Blockchain" structure that will allow us to validate incoming blocks // todo (#1056) - valid block @@ -200,25 +180,32 @@ func NewEnclave( dataEncryptionService := crypto.NewDataEncryptionService(logger) dataCompressionService := compression.NewBrotliDataCompressionService() - memp := mempool.New(config.ObscuroChainID, logger) - crossChainProcessors := crosschain.New(&config.MessageBusAddress, storage, big.NewInt(config.ObscuroChainID), logger) subscriptionManager := events.NewSubscriptionManager(&rpcEncryptionManager, storage, logger) gasOracle := gas.NewGasOracle() blockProcessor := components.NewBlockProcessor(storage, crossChainProcessors, gasOracle, logger) - batchExecutor := components.NewBatchExecutor(storage, crossChainProcessors, genesis, gasOracle, &chainConfig, logger) + batchExecutor := components.NewBatchExecutor(storage, crossChainProcessors, genesis, gasOracle, chainConfig, logger) sigVerifier, err := components.NewSignatureValidator(config.SequencerID, storage) registry := components.NewBatchRegistry(storage, logger) rProducer := components.NewRollupProducer(config.SequencerID, storage, registry, logger) if err != nil { logger.Crit("Could not initialise the signature validator", log.ErrKey, err) } - rollupCompression := components.NewRollupCompression(registry, batchExecutor, dataEncryptionService, dataCompressionService, storage, &chainConfig, logger) + rollupCompression := components.NewRollupCompression(registry, batchExecutor, dataEncryptionService, dataCompressionService, storage, chainConfig, logger) rConsumer := components.NewRollupConsumer(mgmtContractLib, registry, rollupCompression, storage, logger, sigVerifier) sharedSecretProcessor := components.NewSharedSecretProcessor(mgmtContractLib, attestationProvider, storage, logger) + blockchain := ethblockchain.NewEthBlockchain(big.NewInt(config.ObscuroChainID), registry, storage, logger) + if err != nil { + logger.Crit("unable to init the eth blockchain construct", log.ErrKey, err) + } + mempool, err := txpool.NewTxPool(blockchain) + if err != nil { + logger.Crit("unable to init eth tx pool", log.ErrKey, err) + } + var service nodetype.NodeType if config.NodeType == common.Sequencer { service = nodetype.NewSequencer( @@ -230,9 +217,9 @@ func NewEnclave( rollupCompression, logger, config.HostID, - &chainConfig, + chainConfig, enclaveKey, - memp, + mempool, storage, dataEncryptionService, dataCompressionService, @@ -243,14 +230,15 @@ func NewEnclave( BatchGasLimit: config.GasLimit, BaseFee: config.BaseFee, }, + blockchain, ) } else { - service = nodetype.NewValidator(blockProcessor, batchExecutor, registry, rConsumer, &chainConfig, config.SequencerID, storage, sigVerifier, logger) + service = nodetype.NewValidator(blockProcessor, batchExecutor, registry, rConsumer, chainConfig, config.SequencerID, storage, sigVerifier, logger) } chain := l2chain.NewChain( storage, - &chainConfig, + chainConfig, genesis, logger, registry, @@ -263,7 +251,7 @@ func NewEnclave( } // TODO ensure debug is allowed/disallowed - debug := debugger.New(chain, storage, &chainConfig) + debug := debugger.New(chain, storage, chainConfig) logger.Info("Enclave service created with following config", log.CfgKey, config.HostID) return &enclaveImpl{ diff --git a/go/enclave/ethblockchain/eth_blockchain.go b/go/enclave/ethblockchain/eth_blockchain.go new file mode 100644 index 0000000000..5a664d77ae --- /dev/null +++ b/go/enclave/ethblockchain/eth_blockchain.go @@ -0,0 +1,153 @@ +package ethblockchain + +import ( + "fmt" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/obscuronet/go-obscuro/go/common/log" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" + "github.com/obscuronet/go-obscuro/go/common/gethencoding" + "github.com/obscuronet/go-obscuro/go/enclave/components" + "github.com/obscuronet/go-obscuro/go/enclave/core" + "github.com/obscuronet/go-obscuro/go/enclave/storage" + + gethcore "github.com/ethereum/go-ethereum/core" + gethtypes "github.com/ethereum/go-ethereum/core/types" +) + +// EthBlockchain is an obscuro wrapper around the ethereum core.Blockchain object +type EthBlockchain struct { + newHeadChan chan gethcore.ChainHeadEvent + batchRegistry components.BatchRegistry + storage storage.Storage + chainID *big.Int + logger gethlog.Logger +} + +// NewEthBlockchain returns a new instance +func NewEthBlockchain(chainID *big.Int, batchRegistry components.BatchRegistry, storage storage.Storage, logger gethlog.Logger) *EthBlockchain { + return &EthBlockchain{ + newHeadChan: make(chan gethcore.ChainHeadEvent), + batchRegistry: batchRegistry, + storage: storage, + chainID: chainID, + logger: logger, + } +} + +// Config retrieves the chain's fork configuration. +func (e *EthBlockchain) Config() *params.ChainConfig { + return ChainParams(e.chainID) +} + +// CurrentBlock returns the current head of the chain. +func (e *EthBlockchain) CurrentBlock() *gethtypes.Header { + currentBatchSeqNo := e.batchRegistry.HeadBatchSeq() + if currentBatchSeqNo == nil { + return nil + } + currentBatch, err := e.storage.FetchBatchBySeqNo(currentBatchSeqNo.Uint64()) + if err != nil { + e.logger.Warn("unable to retrieve batch seq no: %d", "currentBatchSeqNo", currentBatchSeqNo, log.ErrKey, err) + return nil + } + batch, err := gethencoding.CreateEthHeaderForBatch(currentBatch.Header, secret(e.storage)) + if err != nil { + e.logger.Warn("unable to convert batch to eth header ", "currentBatchSeqNo", currentBatchSeqNo, log.ErrKey, err) + return nil + } + return batch +} + +func (e *EthBlockchain) SubscribeChainHeadEvent(ch chan<- gethcore.ChainHeadEvent) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + for { + select { + case head := <-e.newHeadChan: + select { + case ch <- head: + case <-quit: + return nil + } + case <-quit: + return nil + } + } + }) +} + +// GetBlock retrieves a specific block, used during pool resets. +func (e *EthBlockchain) GetBlock(_ common.Hash, number uint64) *gethtypes.Block { + nbatch, err := e.storage.FetchBatchByHeight(number) + if err != nil { + e.logger.Warn("unable to get batch by height", "number", number, log.ErrKey, err) + return nil + } + + nfromBatch, err := gethencoding.CreateEthBlockFromBatch(nbatch) + if err != nil { + e.logger.Error("unable to convert batch to eth block", log.ErrKey, err) + return nil + } + + return nfromBatch +} + +// StateAt returns a state database for a given root hash (generally the head). +func (e *EthBlockchain) StateAt(root common.Hash) (*state.StateDB, error) { + if root.Hex() == gethtypes.EmptyCodeHash.Hex() { + return nil, nil + } + + currentBatchSeqNo := e.batchRegistry.HeadBatchSeq() + if currentBatchSeqNo == nil { + return nil, fmt.Errorf("not ready yet") + } + currentBatch, err := e.storage.FetchBatchBySeqNo(currentBatchSeqNo.Uint64()) + if err != nil { + e.logger.Warn("unable to get batch by height", "currentBatchSeqNo", currentBatchSeqNo, log.ErrKey, err) + return nil, nil + } + + return e.storage.CreateStateDB(currentBatch.Hash()) +} + +func (e *EthBlockchain) IngestNewBlock(batch *core.Batch) error { + convertedBlock, err := gethencoding.CreateEthBlockFromBatch(batch) + if err != nil { + return err + } + + go func() { + e.newHeadChan <- gethcore.ChainHeadEvent{Block: convertedBlock} + }() + + return nil +} + +func NewLegacyPoolConfig() legacypool.Config { + return legacypool.Config{ + Locals: nil, + NoLocals: false, + Journal: "", + Rejournal: 0, + PriceLimit: 0, + PriceBump: 0, + AccountSlots: 100, + GlobalSlots: 10000000, + AccountQueue: 100, + GlobalQueue: 10000000, + Lifetime: 0, + } +} + +func secret(storage storage.Storage) []byte { + // todo (#1053) - handle secret not being found. + secret, _ := storage.FetchSecret() + return secret[:] +} diff --git a/go/enclave/ethblockchain/eth_chainparams.go b/go/enclave/ethblockchain/eth_chainparams.go new file mode 100644 index 0000000000..20c453977a --- /dev/null +++ b/go/enclave/ethblockchain/eth_chainparams.go @@ -0,0 +1,36 @@ +package ethblockchain + +import ( + "math/big" + + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/params" +) + +// ChainParams defines the forks of the EVM machine +// obscuro should typically be on the last fork version +func ChainParams(obscuroChainID *big.Int) *params.ChainConfig { + zeroTimestamp := uint64(0) + + // Initialise the database + return ¶ms.ChainConfig{ + ChainID: obscuroChainID, + HomesteadBlock: gethcommon.Big0, + DAOForkBlock: gethcommon.Big0, + EIP150Block: gethcommon.Big0, + EIP155Block: gethcommon.Big0, + EIP158Block: gethcommon.Big0, + ByzantiumBlock: gethcommon.Big0, + ConstantinopleBlock: gethcommon.Big0, + PetersburgBlock: gethcommon.Big0, + IstanbulBlock: gethcommon.Big0, + MuirGlacierBlock: gethcommon.Big0, + BerlinBlock: gethcommon.Big0, + LondonBlock: gethcommon.Big0, + + CancunTime: &zeroTimestamp, + ShanghaiTime: &zeroTimestamp, + PragueTime: &zeroTimestamp, + VerkleTime: &zeroTimestamp, + } +} diff --git a/go/enclave/mempool/README.md b/go/enclave/mempool/README.md deleted file mode 100644 index d3d2bad7e9..0000000000 --- a/go/enclave/mempool/README.md +++ /dev/null @@ -1,2 +0,0 @@ -This package implements a very primitve, in-memory mempool. - diff --git a/go/enclave/mempool/interface.go b/go/enclave/mempool/interface.go deleted file mode 100644 index c6858321b7..0000000000 --- a/go/enclave/mempool/interface.go +++ /dev/null @@ -1,20 +0,0 @@ -package mempool - -import ( - "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/types" - "github.com/obscuronet/go-obscuro/go/common" - "github.com/obscuronet/go-obscuro/go/enclave/limiters" -) - -type Manager interface { - // FetchMempoolTxs returns all transactions in the mempool - FetchMempoolTxs() []*common.L2Tx - // AddMempoolTx adds a transaction to the mempool - AddMempoolTx(tx *common.L2Tx) error - // RemoveTxs removes transactions that are considered immune to re-orgs (i.e. over X batches deep). - RemoveTxs(transactions types.Transactions) error - - // CurrentTxs Returns the transactions that should be included in the current batch - CurrentTxs(stateDB *state.StateDB, limiter limiters.BatchSizeLimiter) ([]*common.L2Tx, error) -} diff --git a/go/enclave/mempool/manager.go b/go/enclave/mempool/manager.go deleted file mode 100644 index b70bffa32c..0000000000 --- a/go/enclave/mempool/manager.go +++ /dev/null @@ -1,118 +0,0 @@ -package mempool - -import ( - "errors" - "sort" - "sync" - - "github.com/obscuronet/go-obscuro/go/common/log" - - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/types" - - "github.com/obscuronet/go-obscuro/go/enclave/core" - "github.com/obscuronet/go-obscuro/go/enclave/limiters" - - gethlog "github.com/ethereum/go-ethereum/log" - "github.com/obscuronet/go-obscuro/go/common" -) - -// sortByNonce a very primitive way to implement mempool logic that -// adds transactions sorted by the nonce in the rollup -// which is what the EVM expects -type sortByNonce []*common.L2Tx - -func (c sortByNonce) Len() int { return len(c) } -func (c sortByNonce) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c sortByNonce) Less(i, j int) bool { return c[i].Nonce() < c[j].Nonce() } - -// todo - optimize this to use a different data structure that does not require a global lock. -type mempoolManager struct { - mpMutex sync.RWMutex // Controls access to `mempool` - obscuroChainID int64 - logger gethlog.Logger - mempool map[gethcommon.Hash]*common.L2Tx -} - -func New(chainID int64, logger gethlog.Logger) Manager { - return &mempoolManager{ - mempool: make(map[gethcommon.Hash]*common.L2Tx), - obscuroChainID: chainID, - mpMutex: sync.RWMutex{}, - logger: logger, - } -} - -func (db *mempoolManager) AddMempoolTx(tx *common.L2Tx) error { - // We do not care about the sender return value at this point, only that - // there is no error coming from validating the signature of said sender. - _, err := core.GetAuthenticatedSender(db.obscuroChainID, tx) - if err != nil { - return err - } - - db.mpMutex.Lock() - defer db.mpMutex.Unlock() - db.mempool[tx.Hash()] = tx - return nil -} - -func (db *mempoolManager) FetchMempoolTxs() []*common.L2Tx { - db.mpMutex.RLock() - defer db.mpMutex.RUnlock() - - mpCopy := make([]*common.L2Tx, len(db.mempool)) - i := 0 - for _, tx := range db.mempool { - mpCopy[i] = tx - i++ - } - return mpCopy -} - -func (db *mempoolManager) RemoveTxs(transactions types.Transactions) error { - db.mpMutex.Lock() - defer db.mpMutex.Unlock() - - for _, tx := range transactions { - delete(db.mempool, tx.Hash()) - } - - return nil -} - -// CurrentTxs - Calculate transactions to be included in the current batch -func (db *mempoolManager) CurrentTxs(stateDB *state.StateDB, limiter limiters.BatchSizeLimiter) ([]*common.L2Tx, error) { - txes := db.FetchMempoolTxs() - sort.Sort(sortByNonce(txes)) - - applicableTransactions := make(common.L2Transactions, 0) - nonceTracker := NewNonceTracker(stateDB) - - for _, tx := range txes { - sender, _ := core.GetAuthenticatedSender(db.obscuroChainID, tx) - if sender == nil { - continue - } - - if tx.Nonce() != nonceTracker.GetNonce(*sender) { - continue - } - - err := limiter.AcceptTransaction(tx) - if err != nil { - if errors.Is(err, limiters.ErrInsufficientSpace) { // Batch ran out of space - break - } - // Limiter encountered unexpected error - return nil, err - } - - applicableTransactions = append(applicableTransactions, tx) - nonceTracker.IncrementNonce(*sender) - db.logger.Debug("Including transaction in batch", log.TxKey, tx.Hash(), "nonce", tx.Nonce()) - } - - return applicableTransactions, nil -} diff --git a/go/enclave/mempool/nonce_tracker.go b/go/enclave/mempool/nonce_tracker.go deleted file mode 100644 index 999a43dd08..0000000000 --- a/go/enclave/mempool/nonce_tracker.go +++ /dev/null @@ -1,40 +0,0 @@ -package mempool - -import ( - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" -) - -// NonceTracker - a struct that helps us maintain the nonces for each account. -// If it gets asked for an account it does not know the nonce for, it will pull it -// from stateDB. Used when selecting transactions in order to ensure transactions get -// applied at correct nonces and correct order without any gaps. -type NonceTracker struct { - accountNonces map[gethcommon.Address]uint64 - stateDB *state.StateDB -} - -func NewNonceTracker(stateDB *state.StateDB) *NonceTracker { - return &NonceTracker{ - stateDB: stateDB, - accountNonces: make(map[gethcommon.Address]uint64), - } -} - -func (nt *NonceTracker) GetNonce(address gethcommon.Address) uint64 { - if nonce, ok := nt.accountNonces[address]; ok { - return nonce - } - - nonce := nt.nonceFromState(address) - nt.accountNonces[address] = nonce - return nonce -} - -func (nt *NonceTracker) nonceFromState(address gethcommon.Address) uint64 { - return nt.stateDB.GetNonce(address) -} - -func (nt *NonceTracker) IncrementNonce(address gethcommon.Address) { - nt.accountNonces[address]++ -} diff --git a/go/enclave/nodetype/sequencer.go b/go/enclave/nodetype/sequencer.go index 56f11c0963..10310bdb62 100644 --- a/go/enclave/nodetype/sequencer.go +++ b/go/enclave/nodetype/sequencer.go @@ -9,9 +9,10 @@ import ( "sort" "time" - "github.com/obscuronet/go-obscuro/go/common/measure" - "github.com/obscuronet/go-obscuro/go/common/errutil" + "github.com/obscuronet/go-obscuro/go/common/measure" + "github.com/obscuronet/go-obscuro/go/enclave/ethblockchain" + "github.com/obscuronet/go-obscuro/go/enclave/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/obscuronet/go-obscuro/go/enclave/storage" @@ -27,7 +28,6 @@ import ( "github.com/obscuronet/go-obscuro/go/enclave/core" "github.com/obscuronet/go-obscuro/go/enclave/crypto" "github.com/obscuronet/go-obscuro/go/enclave/limiters" - "github.com/obscuronet/go-obscuro/go/enclave/mempool" ) const RollupDelay = 2 // number of L1 blocks to exclude when creating a rollup. This will minimize compression reorg issues. @@ -53,11 +53,12 @@ type sequencer struct { hostID gethcommon.Address chainConfig *params.ChainConfig enclavePrivateKey *ecdsa.PrivateKey // this is a key known only to the current enclave, and the public key was shared with everyone during attestation - mempool mempool.Manager + mempool *txpool.TxPool storage storage.Storage dataEncryptionService crypto.DataEncryptionService dataCompressionService compression.DataCompressionService settings SequencerSettings + blockchain *ethblockchain.EthBlockchain } func NewSequencer( @@ -67,17 +68,16 @@ func NewSequencer( rollupProducer components.RollupProducer, rollupConsumer components.RollupConsumer, rollupCompression *components.RollupCompression, - logger gethlog.Logger, - hostID gethcommon.Address, chainConfig *params.ChainConfig, - enclavePrivateKey *ecdsa.PrivateKey, // this is a key known only to the current enclave, and the public key was shared with everyone during attestation - mempool mempool.Manager, + enclavePrivateKey *ecdsa.PrivateKey, + mempool *txpool.TxPool, storage storage.Storage, dataEncryptionService crypto.DataEncryptionService, dataCompressionService compression.DataCompressionService, settings SequencerSettings, + blockchain *ethblockchain.EthBlockchain, ) Sequencer { return &sequencer{ blockProcessor: blockProcessor, @@ -95,6 +95,7 @@ func NewSequencer( dataEncryptionService: dataEncryptionService, dataCompressionService: dataCompressionService, settings: settings, + blockchain: blockchain, } } @@ -134,10 +135,6 @@ func (s *sequencer) initGenesis(block *common.L1Block) error { return err } - if err = s.mempool.AddMempoolTx(msgBusTx); err != nil { - return fmt.Errorf("failed to queue message bus creation transaction to genesis. Cause: %w", err) - } - if err := s.signBatch(batch); err != nil { return fmt.Errorf("failed signing created batch. Cause: %w", err) } @@ -146,6 +143,24 @@ func (s *sequencer) initGenesis(block *common.L1Block) error { return fmt.Errorf("1. failed storing batch. Cause: %w", err) } + // this is the actual first block produced in chain + err = s.blockchain.IngestNewBlock(batch) + if err != nil { + return fmt.Errorf("unable to remove ingest new block into eth blockchain - %w", err) + } + + // the mempool can only be started after at least 1 block is in the blockchain object + err = s.mempool.Start() + if err != nil { + return err + } + + // make sure the mempool queuing system is initialized before adding the msg bus tx to it + time.Sleep(time.Second) + + if err = s.mempool.Add(msgBusTx); err != nil { + return fmt.Errorf("failed to queue message bus creation transaction to genesis - %s", err) + } return nil } @@ -168,16 +183,24 @@ func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block, skipBatchIfE return fmt.Errorf("attempted to create batch on top of batch=%s. With l1 head=%s", headBatch.Hash(), l1HeadBlock.Hash()) } - stateDB, err := s.storage.CreateStateDB(headBatch.Hash()) - if err != nil { - return fmt.Errorf("unable to create stateDB for selecting transactions. Batch: %s Cause: %w", headBatch.Hash(), err) - } - // todo (@stefan) - limit on receipts too limiter := limiters.NewBatchSizeLimiter(s.settings.MaxBatchSize) - transactions, err := s.mempool.CurrentTxs(stateDB, limiter) - if err != nil { - return err + pendingTransactions := s.mempool.PendingTransactions() // minor does not request tip enforcement + var transactions []*types.Transaction + for _, group := range pendingTransactions { + for _, lazyTx := range group { + if tx := lazyTx.Resolve(); tx != nil { + err = limiter.AcceptTransaction(tx.Tx) + if err != nil { + if errors.Is(err, limiters.ErrInsufficientSpace) { // Batch ran out of space + break + } + // Limiter encountered unexpected error + return fmt.Errorf("limiter encountered unexpected error - %w", err) + } + transactions = append(transactions, tx.Tx) + } + } } sequencerNo, err := s.storage.FetchCurrentSequencerNo() @@ -196,10 +219,6 @@ func (s *sequencer) createNewHeadBatch(l1HeadBlock *common.L1Block, skipBatchIfE return fmt.Errorf(" failed producing batch. Cause: %w", err) } - if err := s.mempool.RemoveTxs(transactions); err != nil { - return fmt.Errorf("could not remove transactions from mempool. Cause: %w", err) - } - return nil } @@ -233,6 +252,12 @@ func (s *sequencer) produceBatch(sequencerNo *big.Int, l1Hash common.L1BlockHash s.logger.Info("Produced new batch", log.BatchHashKey, cb.Batch.Hash(), "height", cb.Batch.Number(), "numTxs", len(cb.Batch.Transactions), log.BatchSeqNoKey, cb.Batch.SeqNo(), "parent", cb.Batch.Header.ParentHash) + // add the batch to the chain so it can remove pending transactions from the pool + err = s.blockchain.IngestNewBlock(cb.Batch) + if err != nil { + return nil, fmt.Errorf("unable to remove tx from mempool - %w", err) + } + return cb.Batch, nil } @@ -336,7 +361,7 @@ func (s *sequencer) duplicateBatches(l1Head *types.Block, nonCanonicalL1Path []c } func (s *sequencer) SubmitTransaction(transaction *common.L2Tx) error { - return s.mempool.AddMempoolTx(transaction) + return s.mempool.Add(transaction) } func (s *sequencer) OnL1Fork(fork *common.ChainFork) error { diff --git a/go/enclave/txpool/txpool.go b/go/enclave/txpool/txpool.go new file mode 100644 index 0000000000..b09f84f800 --- /dev/null +++ b/go/enclave/txpool/txpool.go @@ -0,0 +1,66 @@ +package txpool + +import ( + "fmt" + "math/big" + "strings" + + "github.com/ethereum/go-ethereum/core/txpool/legacypool" + "github.com/obscuronet/go-obscuro/go/common" + "github.com/obscuronet/go-obscuro/go/enclave/ethblockchain" + + gethcommon "github.com/ethereum/go-ethereum/common" + gethtxpool "github.com/ethereum/go-ethereum/core/txpool" +) + +// TxPool is an obscuro wrapper around geths transaction pool +type TxPool struct { + txPoolConfig legacypool.Config + legacyPool *legacypool.LegacyPool + pool *gethtxpool.TxPool + blockchain *ethblockchain.EthBlockchain +} + +// NewTxPool returns a new instance of the tx pool +func NewTxPool(blockchain *ethblockchain.EthBlockchain) (*TxPool, error) { + txPoolConfig := ethblockchain.NewLegacyPoolConfig() + legacyPool := legacypool.New(txPoolConfig, blockchain) + + return &TxPool{ + blockchain: blockchain, + txPoolConfig: txPoolConfig, + legacyPool: legacyPool, + }, nil +} + +// Start starts the pool +// can only be started after t.blockchain has at least one block inside +func (t *TxPool) Start() error { + memp, err := gethtxpool.New(new(big.Int).SetUint64(0), t.blockchain, []gethtxpool.SubPool{t.legacyPool}) + if err != nil { + return fmt.Errorf("unable to init geth tx pool - %w", err) + } + + t.pool = memp + return nil +} + +// PendingTransactions returns all pending transactions grouped per address and ordered per nonce +func (t *TxPool) PendingTransactions() map[gethcommon.Address][]*gethtxpool.LazyTransaction { + return t.pool.Pending(false) +} + +// Add adds a new transactions to the pool +func (t *TxPool) Add(transaction *common.L2Tx) error { + var strErrors []string + for _, err := range t.pool.Add([]*gethtxpool.Transaction{{Tx: transaction}}, false, false) { + if err != nil { + strErrors = append(strErrors, err.Error()) + } + } + + if len(strErrors) > 0 { + return fmt.Errorf(strings.Join(strErrors, "; ")) + } + return nil +} diff --git a/integration/common/utils.go b/integration/common/utils.go index 5622104559..5bd2cc1a20 100644 --- a/integration/common/utils.go +++ b/integration/common/utils.go @@ -23,7 +23,7 @@ import ( "github.com/obscuronet/go-obscuro/go/rpc" ) -var _awaitReceiptPollingInterval = 100 * time.Millisecond +var _awaitReceiptPollingInterval = 200 * time.Millisecond func RndBtw(min uint64, max uint64) uint64 { if min >= max { diff --git a/integration/obscuroscan/obscuroscan_test.go b/integration/obscuroscan/obscuroscan_test.go index e89acab30a..f0a98e4d7d 100644 --- a/integration/obscuroscan/obscuroscan_test.go +++ b/integration/obscuroscan/obscuroscan_test.go @@ -238,13 +238,14 @@ func issueTransactions(t *testing.T, hostWSAddr string, issuerWallet wallet.Wall t.Errorf("not enough balance: has %s has %s obx", issuerWallet.Address().Hex(), balance.String()) } + nonce, err := authClient.NonceAt(ctx, nil) + assert.Nil(t, err) + issuerWallet.SetNonce(nonce) + var receipts []gethcommon.Hash for i := 0; i < numbTxs; i++ { toAddr := datagenerator.RandomAddress() - nonce, err := authClient.NonceAt(ctx, nil) - assert.Nil(t, err) - issuerWallet.SetNonce(nonce) estimatedTx := authClient.EstimateGasAndGasPrice(&types.LegacyTx{ Nonce: issuerWallet.GetNonceAndIncrement(), To: &toAddr, diff --git a/integration/simulation/network/network_utils.go b/integration/simulation/network/network_utils.go index b755e26d8c..0f0eb884fb 100644 --- a/integration/simulation/network/network_utils.go +++ b/integration/simulation/network/network_utils.go @@ -94,7 +94,7 @@ func createInMemObscuroNode( MaxBatchSize: 1024 * 25, MaxRollupSize: 1024 * 64, BaseFee: big.NewInt(1), // todo @siliev:: fix test transaction builders so this can be different - GasLimit: big.NewInt(1_000_000_000), + GasLimit: big.NewInt(1_000_000_000_000_000_000), } enclaveLogger := testlog.Logger().New(log.NodeIDKey, id, log.CmpKey, log.EnclaveCmp) diff --git a/integration/simulation/simulation.go b/integration/simulation/simulation.go index f043655a57..1c0f43b6da 100644 --- a/integration/simulation/simulation.go +++ b/integration/simulation/simulation.go @@ -140,15 +140,13 @@ func (s *Simulation) bridgeFundingToObscuro() { gethcommon.HexToAddress("0xDEe530E22045939e6f6a0A593F829e35A140D3F1"), } - ethClient := s.RPCHandles.RndEthClient() - - busCtr, err := MessageBus.NewMessageBus(destAddr, ethClient.EthClient()) + busCtr, err := MessageBus.NewMessageBus(destAddr, s.RPCHandles.RndEthClient().EthClient()) if err != nil { panic(err) } - for idx, wallet := range wallets { - opts, err := bind.NewKeyedTransactorWithChainID(wallet.PrivateKey(), wallet.ChainID()) + for idx, w := range wallets { + opts, err := bind.NewKeyedTransactorWithChainID(w.PrivateKey(), w.ChainID()) if err != nil { panic(err) } @@ -160,7 +158,7 @@ func (s *Simulation) bridgeFundingToObscuro() { } } - time.Sleep(3 * time.Second) + time.Sleep(15 * time.Second) // todo - fix the wait group, for whatever reason it does not find a receipt... /*wg := sync.WaitGroup{} for _, tx := range transactions {