Skip to content

Commit

Permalink
Batch execution refactor (#2186)
Browse files Browse the repository at this point in the history
* preview of the batch execution refactor

* fixes

* fix

* fix

* fix merge

* fixes

* fix

* fix
  • Loading branch information
tudor-malene authored Dec 6, 2024
1 parent 2a4562e commit 682ee36
Show file tree
Hide file tree
Showing 19 changed files with 395 additions and 351 deletions.
8 changes: 4 additions & 4 deletions go/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ func (txs L2PricedTransactions) ToTransactions() types.Transactions {
}

const (
L2GenesisHeight = uint64(0)
L1GenesisHeight = uint64(0)
L2GenesisSeqNo = uint64(1)
// HeightCommittedBlocks is the number of blocks deep a transaction must be to be considered safe from reorganisations.
HeightCommittedBlocks = 15

L2GenesisHeight = uint64(0)
L2GenesisSeqNo = uint64(1)
L2SysContractGenesisSeqNo = uint64(2)
)

var GethGenesisParentHash = common.Hash{}
Expand Down
550 changes: 276 additions & 274 deletions go/enclave/components/batch_executor.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ type BatchExecutionContext struct {
ChainConfig *params.ChainConfig
SequencerNo *big.Int
BaseFee *big.Int

// these properties are calculated during execution
ctx context.Context
l1block *types.Header
parentL1Block *types.Header
parentBatch *common.BatchHeader

xChainMsgs common.CrossChainMessages
xChainValueMsgs common.ValueTransferEvents

currentBatch *core.Batch
stateDB *state.StateDB
beforeProcessingSnap int

genesisSysCtrResult core.TxExecResults

xChainResults core.TxExecResults
batchTxResults core.TxExecResults
callbackTxResults core.TxExecResults
blockEndResult core.TxExecResults
}

// ComputedBatch - a structure representing the result of a batch
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/components/rollup_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (rc *RollupCompression) executeAndSaveIncompleteBatches(ctx context.Context
if err != nil {
return err
}
err = rc.storage.StoreExecutedBatch(ctx, genBatch.Header, nil)
err = rc.storage.StoreExecutedBatch(ctx, genBatch, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -518,7 +518,7 @@ func (rc *RollupCompression) executeAndSaveIncompleteBatches(ctx context.Context
if err != nil {
return err
}
err = rc.storage.StoreExecutedBatch(ctx, computedBatch.Batch.Header, computedBatch.TxExecResults)
err = rc.storage.StoreExecutedBatch(ctx, computedBatch.Batch, computedBatch.TxExecResults)
if err != nil {
return err
}
Expand Down
20 changes: 19 additions & 1 deletion go/enclave/core/event_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (txResults *TxExecResults) MarkSynthetic(isSynthetic bool) {
}
}

func (txResults *TxExecResults) GetSynthetic() *TxExecResults {
func (txResults *TxExecResults) SyntheticTransactions() *TxExecResults {
syntheticTxs := make(TxExecResults, 0)
for _, txResult := range *txResults {
if txResult.TxWithSender.IsSynthetic {
Expand All @@ -159,6 +159,16 @@ func (txResults *TxExecResults) GetSynthetic() *TxExecResults {
return &syntheticTxs
}

func (txResults *TxExecResults) BatchTransactions() []*common.L2Tx {
txs := make([]*common.L2Tx, 0)
for _, txResult := range *txResults {
if !txResult.TxWithSender.IsSynthetic {
txs = append(txs, txResult.TxWithSender.Tx)
}
}
return txs
}

func (txResults *TxExecResults) GetReal() *TxExecResults {
realTxs := make(TxExecResults, 0)
for _, txResult := range *txResults {
Expand All @@ -176,3 +186,11 @@ func (txResults *TxExecResults) ToTransactionsWithSenders() TransactionsWithSend
}
return transactionsWithSenders
}

func (txResults *TxExecResults) Receipts() types.Receipts {
receipts := make(types.Receipts, len(*txResults))
for i, txResult := range *txResults {
receipts[i] = txResult.Receipt
}
return receipts
}
6 changes: 3 additions & 3 deletions go/enclave/crosschain/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type Manager interface {

ExtractOutboundTransfers(ctx context.Context, receipts common.L2Receipts) (common.ValueTransferEvents, error)

CreateSyntheticTransactions(ctx context.Context, messages common.CrossChainMessages, transfers common.ValueTransferEvents, rollupState *state.StateDB) common.L2Transactions
CreateSyntheticTransactions(ctx context.Context, messages common.CrossChainMessages, transfers common.ValueTransferEvents, stateDB *state.StateDB) common.L2Transactions

ExecuteValueTransfers(ctx context.Context, transfers common.ValueTransferEvents, rollupState *state.StateDB)
ExecuteValueTransfers(ctx context.Context, transfers common.ValueTransferEvents, stateDB *state.StateDB)

RetrieveInboundMessages(ctx context.Context, fromBlock *types.Header, toBlock *types.Header, rollupState *state.StateDB) (common.CrossChainMessages, common.ValueTransferEvents)
RetrieveInboundMessages(ctx context.Context, fromBlock *types.Header, toBlock *types.Header) (common.CrossChainMessages, common.ValueTransferEvents)

system.SystemContractsInitializable
}
2 changes: 1 addition & 1 deletion go/enclave/crosschain/message_bus_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (m *MessageBusManager) ExtractOutboundTransfers(_ context.Context, receipts
// todo (@stefan) - fix ordering of messages, currently it is irrelevant.
// todo (@stefan) - do not extract messages below their consistency level. Irrelevant security wise.
// todo (@stefan) - surface errors
func (m *MessageBusManager) RetrieveInboundMessages(ctx context.Context, fromBlock *types.Header, toBlock *types.Header, rollupState *state.StateDB) (common.CrossChainMessages, common.ValueTransferEvents) {
func (m *MessageBusManager) RetrieveInboundMessages(ctx context.Context, fromBlock *types.Header, toBlock *types.Header) (common.CrossChainMessages, common.ValueTransferEvents) {
messages := make(common.CrossChainMessages, 0)
transfers := make(common.ValueTransferEvents, 0)

Expand Down
4 changes: 2 additions & 2 deletions go/enclave/nodetype/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *sequencer) createGenesisBatch(ctx context.Context, block *types.Header)
// produce batch #2 which has the message bus and any other system contracts
_, err = s.produceBatch(
ctx,
big.NewInt(0).Add(batch.Header.SequencerOrderNo, big.NewInt(1)),
big.NewInt(0).SetUint64(common.L2SysContractGenesisSeqNo),
block.Hash(),
batch.Hash(),
common.L2Transactions{},
Expand Down Expand Up @@ -304,7 +304,7 @@ func (s *sequencer) StoreExecutedBatch(ctx context.Context, batch *core.Batch, t
return fmt.Errorf("failed to store batch. Cause: %w", err)
}

if err := s.storage.StoreExecutedBatch(ctx, batch.Header, txResults); err != nil {
if err := s.storage.StoreExecutedBatch(ctx, batch, txResults); err != nil {
return fmt.Errorf("failed to store batch. Cause: %w", err)
}

Expand Down
4 changes: 2 additions & 2 deletions go/enclave/nodetype/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (val *validator) ExecuteStoredBatches(ctx context.Context) error {
if err != nil {
return fmt.Errorf("could not execute batch %s. Cause: %w", batchHeader.Hash(), err)
}
err = val.storage.StoreExecutedBatch(ctx, batchHeader, txResults)
err = val.storage.StoreExecutedBatch(ctx, batch, txResults)
if err != nil {
return fmt.Errorf("could not store executed batch %s. Cause: %w", batchHeader.Hash(), err)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (val *validator) handleGenesis(ctx context.Context, batch *common.BatchHead
return fmt.Errorf("received invalid genesis batch")
}

err = val.storage.StoreExecutedBatch(ctx, genBatch.Header, nil)
err = val.storage.StoreExecutedBatch(ctx, genBatch, nil)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions go/enclave/storage/enclavedb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func ExistsBatchAtHeight(ctx context.Context, dbTx *sql.Tx, height *big.Int) (bo
}

// WriteTransactions - persists the batch and the transactions
func WriteTransactions(ctx context.Context, dbtx *sql.Tx, transactions []*core.TxWithSender, height uint64, isSynthetic bool, senders []uint64, toContracts []*uint64) error {
func WriteTransactions(ctx context.Context, dbtx *sql.Tx, transactions []*core.TxWithSender, height uint64, isSynthetic bool, senderIds []uint64, toContractIds []*uint64, fromIdx int) error {
// creates a batch insert statement for all entries
if len(transactions) > 0 {
insert := "insert into tx (hash, content, to_address, type, sender_address, idx, batch_height, is_synthetic) values " + repeat("(?,?,?,?,?,?,?,?)", ",", len(transactions))
Expand All @@ -79,10 +79,10 @@ func WriteTransactions(ctx context.Context, dbtx *sql.Tx, transactions []*core.T

args = append(args, transaction.Tx.Hash()) // tx_hash
args = append(args, txBytes) // content
args = append(args, toContracts[i]) // To
args = append(args, toContractIds[i]) // To
args = append(args, transaction.Tx.Type()) // Type
args = append(args, senders[i]) // sender_address
args = append(args, i) // idx
args = append(args, senderIds[i]) // sender_address
args = append(args, fromIdx+i) // idx
args = append(args, height) // the batch height which contained it
args = append(args, isSynthetic) // is_synthetic if the transaction is a synthetic (internally derived transaction)
}
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/storage/enclavedb/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func WriteEventType(ctx context.Context, dbTX *sql.Tx, et *EventType) (uint64, e
}

func ReadEventType(ctx context.Context, dbTX *sql.Tx, contract *Contract, eventSignature gethcommon.Hash) (*EventType, error) {
var et EventType = EventType{Contract: contract}
et := EventType{Contract: contract}
err := dbTX.QueryRowContext(ctx,
"select id, event_sig, auto_visibility, auto_public, config_public, topic1_can_view, topic2_can_view, topic3_can_view, sender_can_view from event_type where contract=? and event_sig=?",
contract.Id, eventSignature.Bytes(),
Expand Down
6 changes: 3 additions & 3 deletions go/enclave/storage/events_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func newEventsStorage(cachingService *CacheService, db enclavedb.EnclaveDB, logg

func (es *eventsStorage) storeReceiptAndEventLogs(ctx context.Context, dbTX *sql.Tx, batch *common.BatchHeader, txExecResult *core.TxExecResult) error {
txId, senderId, err := enclavedb.ReadTransactionIdAndSender(ctx, dbTX, txExecResult.Receipt.TxHash)
if err != nil && !errors.Is(err, errutil.ErrNotFound) {
if err != nil {
return fmt.Errorf("could not get transaction id. Cause: %w", err)
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func (es *eventsStorage) storeReceipt(ctx context.Context, dbTX *sql.Tx, batch *
return execTxId, nil
}

func (es *eventsStorage) storeEventLog(ctx context.Context, dbTX *sql.Tx, execTxId uint64, l *types.Log) error {
func (es *eventsStorage) storeEventLog(ctx context.Context, dbTX *sql.Tx, receiptId uint64, l *types.Log) error {
eventSig := l.Topics[0]

contract, err := es.readContract(ctx, dbTX, l.Address)
Expand Down Expand Up @@ -130,7 +130,7 @@ func (es *eventsStorage) storeEventLog(ctx context.Context, dbTX *sql.Tx, execTx
if len(data) == 0 {
data = nil
}
err = enclavedb.WriteEventLog(ctx, dbTX, eventType.Id, topicIds, data, l.Index, execTxId)
err = enclavedb.WriteEventLog(ctx, dbTX, eventType.Id, topicIds, data, l.Index, receiptId)
if err != nil {
return fmt.Errorf("could not write event log. Cause: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/storage/init/edgelessdb/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ create table if not exists tendb.receipt
cumulative_gas_used BIGINT not null,
effective_gas_price BIGINT,
created_contract_address binary(20),
tx int,
tx int NOT NULL,
batch int NOT NULL,
INDEX (batch),
INDEX (tx, batch),
Expand Down
3 changes: 1 addition & 2 deletions go/enclave/storage/init/sqlite/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ create table if not exists receipt
cumulative_gas_used int not null,
effective_gas_price int,
created_contract_address binary(20),
-- commenting out the fk until synthetic transactions are also stored
tx INTEGER,
tx INTEGER NOT NULL REFERENCES tx,
batch INTEGER NOT NULL REFERENCES batch
);
create index IDX_EX_TX_BATCH on receipt (batch);
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type BatchResolver interface {
// StoreBatch stores an un-executed batch.
StoreBatch(ctx context.Context, batch *core.Batch, convertedHash gethcommon.Hash) error
// StoreExecutedBatch - store the batch after it was executed
StoreExecutedBatch(ctx context.Context, batch *common.BatchHeader, results core.TxExecResults) error
StoreExecutedBatch(ctx context.Context, batch *core.Batch, results core.TxExecResults) error

// StoreRollup
StoreRollup(ctx context.Context, rollup *common.ExtRollup, header *common.CalldataRollupHeader) error
Expand Down
19 changes: 10 additions & 9 deletions go/enclave/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,12 +589,12 @@ func (s *storageImpl) StoreBatch(ctx context.Context, batch *core.Batch, convert
transactionsWithSenders[i] = &core.TxWithSender{Tx: tx, Sender: &sender}
}

senders, toContracts, err := s.handleTxSendersAndReceivers(ctx, transactionsWithSenders, dbTx)
senderIds, toContractIds, err := s.handleTxSendersAndReceivers(ctx, transactionsWithSenders, dbTx)
if err != nil {
return err
}

if err := enclavedb.WriteTransactions(ctx, dbTx, transactionsWithSenders, batch.Header.Number.Uint64(), false, senders, toContracts); err != nil {
if err := enclavedb.WriteTransactions(ctx, dbTx, transactionsWithSenders, batch.Header.Number.Uint64(), false, senderIds, toContractIds, 0); err != nil {
return fmt.Errorf("could not write transactions. Cause: %w", err)
}
}
Expand Down Expand Up @@ -633,7 +633,7 @@ func (s *storageImpl) handleTxSendersAndReceivers(ctx context.Context, transacti
return senders, toContracts, nil
}

func (s *storageImpl) StoreExecutedBatch(ctx context.Context, batch *common.BatchHeader, results core.TxExecResults) error {
func (s *storageImpl) StoreExecutedBatch(ctx context.Context, batch *core.Batch, results core.TxExecResults) error {
defer s.logDuration("StoreExecutedBatch", measure.NewStopwatch())
executed, err := enclavedb.BatchWasExecuted(ctx, s.db.GetSQLDB(), batch.Hash())
if err != nil {
Expand All @@ -644,31 +644,32 @@ func (s *storageImpl) StoreExecutedBatch(ctx context.Context, batch *common.Batc
return nil
}

s.logger.Trace("storing executed batch", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.SequencerOrderNo, "receipts", len(results))
s.logger.Trace("storing executed batch", log.BatchHashKey, batch.Hash(), log.BatchSeqNoKey, batch.Header.SequencerOrderNo, "receipts", len(results))

dbTx, err := s.db.NewDBTransaction(ctx)
if err != nil {
return fmt.Errorf("could not create DB transaction - %w", err)
}
defer dbTx.Rollback()

if err := enclavedb.MarkBatchExecuted(ctx, dbTx, batch.SequencerOrderNo); err != nil {
if err := enclavedb.MarkBatchExecuted(ctx, dbTx, batch.Header.SequencerOrderNo); err != nil {
return fmt.Errorf("could not set the executed flag. Cause: %w", err)
}

transactionsWithSenders := results.GetSynthetic().ToTransactionsWithSenders()
// store the synthetic transactions
transactionsWithSenders := results.SyntheticTransactions().ToTransactionsWithSenders()

senders, toContracts, err := s.handleTxSendersAndReceivers(ctx, transactionsWithSenders, dbTx)
if err != nil {
return fmt.Errorf("could not handle synthetic txs senders and receivers. Cause: %w", err)
}

if err := enclavedb.WriteTransactions(ctx, dbTx, transactionsWithSenders, batch.Number.Uint64(), true, senders, toContracts); err != nil {
if err := enclavedb.WriteTransactions(ctx, dbTx, transactionsWithSenders, batch.Header.Number.Uint64(), true, senders, toContracts, len(batch.Transactions)); err != nil {
return fmt.Errorf("could not write synthetic txs. Cause: %w", err)
}

for _, txExecResult := range results {
err = s.eventsStorage.storeReceiptAndEventLogs(ctx, dbTx, batch, txExecResult)
err = s.eventsStorage.storeReceiptAndEventLogs(ctx, dbTx, batch.Header, txExecResult)
if err != nil {
return fmt.Errorf("could not store receipt. Cause: %w", err)
}
Expand Down Expand Up @@ -802,7 +803,7 @@ func (s *storageImpl) FilterLogs(
return nil, err
}
// the database returns an unsorted list of event logs.
// we have to perform the sorting programatically
// we have to perform the sorting programmatically
sort.Slice(logs, func(i, j int) bool {
if logs[i].BlockNumber == logs[j].BlockNumber {
return logs[i].Index < logs[j].Index
Expand Down
Loading

0 comments on commit 682ee36

Please sign in to comment.