Skip to content

Commit

Permalink
refactor and fix statedb (#2357)
Browse files Browse the repository at this point in the history
* refactor and fix statedb

* fix receipt batch hash
  • Loading branch information
tudor-malene authored Mar 3, 2025
1 parent 028ebcc commit 670ab22
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 97 deletions.
74 changes: 36 additions & 38 deletions go/enclave/components/batch_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"math/big"
"sync"

"github.com/ten-protocol/go-ten/go/common/gethutil"

"github.com/ten-protocol/go-ten/go/common/compression"

gethcore "github.com/ethereum/go-ethereum/core"
Expand Down Expand Up @@ -374,8 +372,6 @@ func (executor *batchExecutor) execMempoolTransactions(ec *BatchExecutionContext
}
}

ec.stateDB.Finalise(true)

ec.Transactions = results.BatchTransactions()
ec.batchTxResults = results

Expand All @@ -396,7 +392,6 @@ func (executor *batchExecutor) executeExistingBatch(ec *BatchExecutionContext) e
return fmt.Errorf("could not process transactions. Cause: %w", err)
}
ec.batchTxResults = txResults
ec.stateDB.Finalise(true)
return nil
}

Expand Down Expand Up @@ -521,47 +516,61 @@ func (executor *batchExecutor) execOnBlockEndTx(ec *BatchExecutionContext) error
}

func (executor *batchExecutor) execResult(ec *BatchExecutionContext) (*ComputedBatch, error) {
executor.stateDBMutex.Lock()
defer executor.stateDBMutex.Unlock()

batch, allResults, err := executor.createBatch(ec)
if err != nil {
return nil, fmt.Errorf("failed creating batch. Cause: %w", err)
}

commitFunc := func(deleteEmptyObjects bool) (gethcommon.Hash, error) {
executor.stateDBMutex.Lock()
defer executor.stateDBMutex.Unlock()
h, err := ec.stateDB.Commit(batch.Number().Uint64(), deleteEmptyObjects)
rootHash, err := ec.stateDB.Commit(batch.Number().Uint64(), true)
if err != nil {
return nil, fmt.Errorf("commit failure for batch %d. Cause: %w", ec.currentBatch.SeqNo(), err)
}

trieDB := executor.storage.TrieDB()
err = trieDB.Commit(rootHash, false)
if err != nil {
executor.logger.Error("Failed to commit trieDB", "error", err)
return nil, fmt.Errorf("failed to commit trieDB. Cause: %w", err)
}
batch.Header.Root = rootHash

// When system contract deployment genesis batch is committed, initialize executor's addresses for the hooks.
// Further restarts will call into Load() which will take the receipts for batch number 2 (which should never be deleted)
// and reinitialize them.
if ec.currentBatch.Header.SequencerOrderNo.Uint64() == common.L2SysContractGenesisSeqNo {
if len(ec.genesisSysCtrResult) == 0 {
return nil, fmt.Errorf("failed to instantiate system contracts: expected receipt for system deployer transaction, but no receipts found in batch")
}

err := executor.systemContracts.Initialize(batch, *ec.genesisSysCtrResult.Receipts()[0], executor.crossChainProcessors.Local)
if err != nil {
return gethutil.EmptyHash, fmt.Errorf("commit failure for batch %d. Cause: %w", ec.currentBatch.SeqNo(), err)
return nil, fmt.Errorf("failed to initialize system contracts: %w", err)
}
trieDB := executor.storage.TrieDB()
err = trieDB.Commit(h, false)

// When system contract deployment genesis batch is committed, initialize executor's addresses for the hooks.
// Further restarts will call into Load() which will take the receipts for batch number 2 (which should never be deleted)
// and reinitialize them.
if err == nil && ec.currentBatch.Header.SequencerOrderNo.Uint64() == common.L2SysContractGenesisSeqNo {
if len(ec.genesisSysCtrResult) == 0 {
return h, fmt.Errorf("failed to instantiate system contracts: expected receipt for system deployer transaction, but no receipts found in batch")
}
}

batch.ResetHash()

return h, executor.systemContracts.Initialize(batch, *ec.genesisSysCtrResult.Receipts()[0], executor.crossChainProcessors.Local)
// the logs and receipts produced by the EVM have the wrong hash which must be adjusted
for _, receipt := range allResults.Receipts() {
receipt.BlockHash = batch.Hash()
for _, l := range receipt.Logs {
l.BlockHash = batch.Hash()
}
return h, err
}

return &ComputedBatch{
Batch: batch,
TxExecResults: allResults,
Commit: commitFunc,
}, nil
}

func (executor *batchExecutor) createBatch(ec *BatchExecutionContext) (*core.Batch, core.TxExecResults, error) {
// we need to copy the batch to reset the internal hash cache
batch := *ec.currentBatch
batch.Header.Root = ec.stateDB.IntermediateRoot(false)
batch.Transactions = ec.batchTxResults.BatchTransactions()
batch.ResetHash()

txReceipts := ec.batchTxResults.Receipts()
if err := executor.populateOutboundCrossChainData(ec.ctx, &batch, ec.l1block, txReceipts); err != nil {
Expand All @@ -571,24 +580,17 @@ func (executor *batchExecutor) createBatch(ec *BatchExecutionContext) (*core.Bat
allResults := append(append(append(append(ec.batchTxResults, ec.xChainResults...), ec.callbackTxResults...), ec.blockEndResult...), ec.genesisSysCtrResult...)
receipts := allResults.Receipts()
if len(receipts) == 0 {
batch.Header.ReceiptHash = types.EmptyRootHash
batch.Header.ReceiptHash = types.EmptyReceiptsHash
} else {
batch.Header.ReceiptHash = types.DeriveSha(receipts, trie.NewStackTrie(nil))
}

if len(batch.Transactions) == 0 {
batch.Header.TxHash = types.EmptyRootHash
batch.Header.TxHash = types.EmptyTxsHash
} else {
batch.Header.TxHash = types.DeriveSha(types.Transactions(batch.Transactions), trie.NewStackTrie(nil))
}

// the logs and receipts produced by the EVM have the wrong hash which must be adjusted
for _, receipt := range receipts {
receipt.BlockHash = batch.Hash()
for _, l := range receipt.Logs {
l.BlockHash = batch.Hash()
}
}
return &batch, allResults, nil
}

Expand Down Expand Up @@ -621,10 +623,6 @@ func (executor *batchExecutor) ExecuteBatch(ctx context.Context, batch *core.Bat
return nil, fmt.Errorf("batch is in invalid state. Incoming hash: %s Computed hash: %s", batch.Hash(), cb.Batch.Hash())
}

if _, err := cb.Commit(true); err != nil {
return nil, fmt.Errorf("cannot commit stateDB for incoming valid batch %s. Cause: %w", batch.Hash(), err)
}

return cb.TxExecResults, nil
}

Expand Down
1 change: 0 additions & 1 deletion go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ type BatchExecutionContext struct {
type ComputedBatch struct {
Batch *core.Batch
TxExecResults []*core.TxExecResult
Commit func(bool) (gethcommon.Hash, error)
}

type BatchExecutor interface {
Expand Down
4 changes: 0 additions & 4 deletions go/enclave/components/rollup_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,6 @@ func (rc *RollupCompression) executeAndSaveIncompleteBatches(ctx context.Context
rc.logger.Crit("Rollup decompression failure. The check hashes don't match")
}*/

if _, err := computedBatch.Commit(true); err != nil {
return fmt.Errorf("cannot commit stateDB for incoming valid batch seq=%d. Cause: %w", incompleteBatch.seqNo, err)
}

convertedHeader, err := rc.gethEncodingService.CreateEthHeaderForBatch(ctx, computedBatch.Batch.Header)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func NewEnclave(config *enclaveconfig.EnclaveConfig, genesis *genesis.Genesis, m
dataCompressionService := compression.NewBrotliDataCompressionService(int64(config.DecompressionLimit))
batchExecutor := components.NewBatchExecutor(storage, batchRegistry, evmFacade, config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, scb, evmEntropyService, mempool, dataCompressionService, logger)

// ensure cached chain state data is up-to-date using the persisted batch data
err = restoreStateDBCache(context.Background(), storage, batchRegistry, batchExecutor, genesis, logger)
// ensure EVM state data is up-to-date using the persisted batch data
err = syncExecutedBatchesWithEVMStateDB(context.Background(), storage, batchRegistry, logger)
if err != nil {
logger.Crit("failed to resync L2 chain state DB after restart", log.ErrKey, err)
}
Expand Down
5 changes: 2 additions & 3 deletions go/enclave/evm/evm_facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ func (exec *evmExecutor) execute(tx *common.L2PricedTransaction, from gethcommon
func (exec *evmExecutor) ExecuteCall(ctx context.Context, msg *gethcore.Message, s *state.StateDB, header *common.BatchHeader) (*gethcore.ExecutionResult, error, common.SystemError) {
defer core.LogMethodDuration(exec.logger, measure.NewStopwatch(), "evm_facade.go:Call()")

snapshot := s.Snapshot()
defer s.RevertToSnapshot(snapshot) // Always revert after simulation

vmCfg := vm.Config{
NoBaseFee: true,
}
Expand All @@ -149,6 +146,8 @@ func (exec *evmExecutor) ExecuteCall(ctx context.Context, msg *gethcore.Message,
gp.SetGas(exec.gasEstimationCap)

cleanState := createCleanState(s, msg, ethHeader, exec.cc)
snapshot := cleanState.Snapshot()
defer cleanState.RevertToSnapshot(snapshot) // Always revert after simulation

blockContext := gethcore.NewEVMBlockContext(ethHeader, exec.chain, nil)
// sets TxKey.origin
Expand Down
4 changes: 0 additions & 4 deletions go/enclave/nodetype/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,6 @@ func (s *sequencer) produceBatch(
return nil, fmt.Errorf("failed computing batch. Cause: %w", err)
}

if _, err := cb.Commit(true); err != nil {
return nil, fmt.Errorf("failed committing batch state. Cause: %w", err)
}

if err := s.signBatch(cb.Batch); err != nil {
return nil, fmt.Errorf("failed signing created batch. Cause: %w", err)
}
Expand Down
53 changes: 13 additions & 40 deletions go/enclave/restore_statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import (
"github.com/ten-protocol/go-ten/go/common/errutil"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/enclave/components"
"github.com/ten-protocol/go-ten/go/enclave/core"
"github.com/ten-protocol/go-ten/go/enclave/genesis"
"github.com/ten-protocol/go-ten/go/enclave/storage"
gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc"
)

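// this function looks at the batch chain and checks that the stateDB is available for the head batch; if it is not,
// the batches whose stateDB is missing are marked as un-executed (presumably so that they get executed again - confirm against the caller)
// (if there had been a clean shutdown and all stateDB data was persisted this should do nothing)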
func restoreStateDBCache(ctx context.Context, storage storage.Storage, registry components.BatchRegistry, producer components.BatchExecutor, gen *genesis.Genesis, logger gethlog.Logger) error {
func syncExecutedBatchesWithEVMStateDB(ctx context.Context, storage storage.Storage, registry components.BatchRegistry, logger gethlog.Logger) error {
if registry.HeadBatchSeq() == nil {
// not initialised yet
return nil
Expand All @@ -34,7 +32,7 @@ func restoreStateDBCache(ctx context.Context, storage storage.Storage, registry
}
if !stateDBAvailableForBatch(ctx, registry, batch.Hash()) {
logger.Info("state not available for latest batch after restart - rebuilding stateDB cache from batches")
err = replayBatchesToValidState(ctx, storage, registry, producer, gen, logger)
err = markUnexecutedBatches(ctx, storage, registry, logger)
if err != nil {
return fmt.Errorf("unable to replay batches to restore valid state - %w", err)
}
Expand All @@ -51,53 +49,28 @@ func stateDBAvailableForBatch(ctx context.Context, registry components.BatchRegi
return err == nil
}

// replayBatchesToValidState is used to repopulate the stateDB cache with data from persisted batches. Two step process:
// 1. step backwards from head batch until we find a batch that is already in stateDB cache, builds list of batches to replay
// 2. iterate that list of batches from the earliest, process the transactions to calculate and cache the stateDB
// todo (#1416) - get unit test coverage around this (and L2 Chain code more widely, see ticket #1416 )
func replayBatchesToValidState(ctx context.Context, storage storage.Storage, registry components.BatchRegistry, batchExecutor components.BatchExecutor, gen *genesis.Genesis, logger gethlog.Logger) error {
// this slice will be a stack of batches to replay as we walk backwards in search of latest valid state
// todo - consider capping the size of this batch list using FIFO to avoid memory issues, and then repeating as necessary
var batchesToReplay []*core.Batch
// `batchToReplayFrom` variable will eventually be the latest batch for which we are able to produce a StateDB
// markUnexecutedBatches marks the batches for which the statedb is missing as un-executed
func markUnexecutedBatches(ctx context.Context, storage storage.Storage, registry components.BatchRegistry, logger gethlog.Logger) error {
// `currentBatch` variable will eventually be the latest batch for which we are able to produce a StateDB
// - we will then set that as the head of the L2 so that this node can rebuild its missing state
batchToReplayFrom, err := storage.FetchBatchBySeqNo(ctx, registry.HeadBatchSeq().Uint64())
currentBatch, err := storage.FetchBatchBySeqNo(ctx, registry.HeadBatchSeq().Uint64())
if err != nil {
return fmt.Errorf("no head batch found in DB but expected to replay batches - %w", err)
}
// loop backwards building a slice of all batches that don't have cached stateDB data available
for !stateDBAvailableForBatch(ctx, registry, batchToReplayFrom.Hash()) {
batchesToReplay = append(batchesToReplay, batchToReplayFrom)
if batchToReplayFrom.NumberU64() == 0 {
for !stateDBAvailableForBatch(ctx, registry, currentBatch.Hash()) {
err = storage.MarkBatchAsUnexecuted(ctx, currentBatch.SeqNo())
if err != nil {
return fmt.Errorf("unable to mark batch as unexecuted - %w", err)
}
if currentBatch.NumberU64() == common.L2GenesisHeight {
// no more parents to check, replaying from genesis
break
}
batchToReplayFrom, err = storage.FetchBatch(ctx, batchToReplayFrom.Header.ParentHash)
currentBatch, err = storage.FetchBatch(ctx, currentBatch.Header.ParentHash)
if err != nil {
return fmt.Errorf("unable to fetch previous batch while rolling back to stable state - %w", err)
}
}
logger.Info("replaying batch data into stateDB cache", "fromBatch", batchesToReplay[len(batchesToReplay)-1].NumberU64(),
"toBatch", batchesToReplay[0].NumberU64())
// loop through the slice of batches without stateDB data to cache the state (loop in reverse because slice is newest to oldest)
for i := len(batchesToReplay) - 1; i >= 0; i-- {
batch := batchesToReplay[i]

// if genesis batch then create the genesis state before continuing on with remaining batches
if batch.NumberU64() == 0 {
err := gen.CommitGenesisState(storage)
if err != nil {
return err
}
continue
}

// calculate the stateDB after this batch and store it in the cache
_, err := batchExecutor.ExecuteBatch(ctx, batch)
if err != nil {
return err
}
}

return nil
}
5 changes: 1 addition & 4 deletions go/enclave/rpc/TenStorageRead.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ten-protocol/go-ten/go/common/gethencoding"
"github.com/ten-protocol/go-ten/go/common/log"
"github.com/ten-protocol/go-ten/go/common/syserr"
Expand Down Expand Up @@ -62,9 +61,7 @@ func TenStorageReadValidate(reqParams []any, builder *CallBuilder[storageReadWit
}

func TenStorageReadExecute(builder *CallBuilder[storageReadWithBlock, string], rpc *EncryptionManager) error {
var err error
var stateDb *state.StateDB
stateDb, err = rpc.registry.GetBatchState(builder.ctx, *builder.Param.block)
stateDb, err := rpc.registry.GetBatchState(builder.ctx, *builder.Param.block)
if err != nil {
builder.Err = fmt.Errorf("unable to read block number - %w", err)
return nil
Expand Down
5 changes: 5 additions & 0 deletions go/enclave/storage/enclavedb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ func BatchWasExecuted(ctx context.Context, db *sql.DB, hash common.L2BatchHash)
return result, nil
}

// MarkBatchAsUnexecuted sets is_executed to false on the batch row whose
// sequence column equals seqNo. The update runs inside dbTx; committing or
// rolling back that transaction is left to the caller. Any error reported by
// the database is returned unchanged.
func MarkBatchAsUnexecuted(ctx context.Context, dbTx *sql.Tx, seqNo *big.Int) error {
	const stmt = "update batch set is_executed=false where sequence=?"

	if _, execErr := dbTx.ExecContext(ctx, stmt, seqNo.Uint64()); execErr != nil {
		return execErr
	}
	return nil
}

func GetTransactionsPerAddress(ctx context.Context, db *sql.DB, address *gethcommon.Address, pagination *common.QueryPagination) ([]*core.InternalReceipt, error) {
return loadReceiptList(ctx, db, address, " ", []any{}, " ORDER BY b.sequence DESC LIMIT ? OFFSET ?", []any{pagination.Size, pagination.Offset})
}
Expand Down
1 change: 1 addition & 0 deletions go/enclave/storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type BatchResolver interface {

// BatchWasExecuted - return true if the batch was executed
BatchWasExecuted(ctx context.Context, hash common.L2BatchHash) (bool, error)
MarkBatchAsUnexecuted(ctx context.Context, seqNo *big.Int) error

// StoreBatch stores an un-executed batch.
StoreBatch(ctx context.Context, batch *core.Batch, convertedHash gethcommon.Hash) error
Expand Down
17 changes: 16 additions & 1 deletion go/enclave/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var defaultCacheConfig = &gethcore.CacheConfig{
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
StateScheme: rawdb.HashScheme,
StateScheme: rawdb.HashScheme, // todo - change to rawdb.PathScheme for next release
}

var trieDBConfig = &triedb.Config{
Expand Down Expand Up @@ -902,6 +902,21 @@ func (s *storageImpl) BatchWasExecuted(ctx context.Context, hash common.L2BatchH
return enclavedb.BatchWasExecuted(ctx, s.db.GetSQLDB(), hash)
}

// MarkBatchAsUnexecuted flags the batch with the given sequence number as
// un-executed. It opens a DB transaction, delegates the update to
// enclavedb.MarkBatchAsUnexecuted and commits. An error is returned if the
// transaction cannot be created, the update fails, or the commit fails.
func (s *storageImpl) MarkBatchAsUnexecuted(ctx context.Context, seqNo *big.Int) error {
	defer s.logDuration("MarkBatchAsUnexecuted", measure.NewStopwatch())

	tx, txErr := s.db.NewDBTransaction(ctx)
	if txErr != nil {
		return fmt.Errorf("could not create DB transaction - %w", txErr)
	}
	// Deferred so the transaction is rolled back on the error path below.
	defer tx.Rollback()

	if markErr := enclavedb.MarkBatchAsUnexecuted(ctx, tx, seqNo); markErr != nil {
		return fmt.Errorf("could not mark batch as unexecuted. Cause: %w", markErr)
	}

	return tx.Commit()
}

func (s *storageImpl) GetTransactionsPerAddress(ctx context.Context, requester *gethcommon.Address, pagination *common.QueryPagination) ([]*core.InternalReceipt, error) {
defer s.logDuration("GetTransactionsPerAddress", measure.NewStopwatch())
return enclavedb.GetTransactionsPerAddress(ctx, s.db.GetSQLDB(), requester, pagination)
Expand Down

0 comments on commit 670ab22

Please sign in to comment.