Skip to content

Commit

Permalink
StateDB cleanup (#1890)
Browse files Browse the repository at this point in the history
* better cache for the statedb

* fix merge

* fix

* remove cache

* address pr comment
  • Loading branch information
tudor-malene authored Apr 30, 2024
1 parent 40ae16f commit ce8d274
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 47 deletions.
7 changes: 5 additions & 2 deletions go/enclave/components/batch_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var ErrNoTransactionsToProcess = fmt.Errorf("no transactions to process")
// batchExecutor - the component responsible for executing batches
type batchExecutor struct {
storage storage.Storage
batchRegistry BatchRegistry
config config.EnclaveConfig
gethEncodingService gethencoding.EncodingService
crossChainProcessors *crosschain.Processors
Expand All @@ -57,6 +58,7 @@ type batchExecutor struct {

func NewBatchExecutor(
storage storage.Storage,
batchRegistry BatchRegistry,
config config.EnclaveConfig,
gethEncodingService gethencoding.EncodingService,
cc *crosschain.Processors,
Expand All @@ -68,6 +70,7 @@ func NewBatchExecutor(
) BatchExecutor {
return &batchExecutor{
storage: storage,
batchRegistry: batchRegistry,
config: config,
gethEncodingService: gethEncodingService,
crossChainProcessors: cc,
Expand Down Expand Up @@ -163,7 +166,7 @@ func (executor *batchExecutor) ComputeBatch(ctx context.Context, context *BatchE
// Create a new batch based on the fromBlock of inclusion of the previous, including all new transactions
batch := core.DeterministicEmptyBatch(parent.Header, block, context.AtTime, context.SequencerNo, context.BaseFee, context.Creator)

stateDB, err := executor.storage.CreateStateDB(ctx, batch.Header.ParentHash)
stateDB, err := executor.batchRegistry.GetBatchState(ctx, &batch.Header.ParentHash)
if err != nil {
return nil, fmt.Errorf("could not create stateDB. Cause: %w", err)
}
Expand Down Expand Up @@ -444,7 +447,7 @@ func (executor *batchExecutor) processTransactions(
} else {
// Exclude all errors
excludedTransactions = append(excludedTransactions, tx.Tx)
executor.logger.Info("Excluding transaction from batch", log.TxKey, tx.Tx.Hash(), log.BatchHashKey, batch.Hash(), "cause", result)
executor.logger.Debug("Excluding transaction from batch", log.TxKey, tx.Tx.Hash(), log.BatchHashKey, batch.Hash(), "cause", result)
}
}
sort.Sort(sortByTxIndex(txReceipts))
Expand Down
18 changes: 16 additions & 2 deletions go/enclave/components/batch_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/ten-protocol/go-ten/go/common"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ten-protocol/go-ten/go/enclave/storage"

Expand Down Expand Up @@ -46,6 +48,7 @@ func NewBatchRegistry(storage storage.Storage, logger gethlog.Logger) BatchRegis
} else {
headBatchSeq = headBatch.SeqNo()
}

return &batchRegistry{
storage: storage,
headBatchSeq: headBatchSeq,
Expand Down Expand Up @@ -154,15 +157,26 @@ func (br *batchRegistry) BatchesAfter(ctx context.Context, batchSeqNo uint64, up
return resultBatches, resultBlocks, nil
}

// GetBatchState returns a stateDB for the batch identified by hash.
// The batch is fetched from storage and then handed to getBatchState, which
// builds the stateDB. A nil hash is reported as an error instead of causing a
// nil-pointer panic; an error from fetching the batch is returned unchanged.
func (br *batchRegistry) GetBatchState(ctx context.Context, hash *common.L2BatchHash) (*state.StateDB, error) {
	// hash is a pointer, so guard the dereference below.
	if hash == nil {
		return nil, fmt.Errorf("could not create stateDB. Cause: batch hash is nil")
	}
	batch, err := br.storage.FetchBatch(ctx, *hash)
	if err != nil {
		return nil, err
	}
	return getBatchState(ctx, br.storage, batch)
}

func (br *batchRegistry) GetBatchStateAtHeight(ctx context.Context, blockNumber *gethrpc.BlockNumber) (*state.StateDB, error) {
// We retrieve the batch of interest.
batch, err := br.GetBatchAtHeight(ctx, *blockNumber)
if err != nil {
return nil, err
}

// We get that of the chain at that height
blockchainState, err := br.storage.CreateStateDB(ctx, batch.Hash())
return getBatchState(ctx, br.storage, batch)
}

func getBatchState(ctx context.Context, storage storage.Storage, batch *core.Batch) (*state.StateDB, error) {
blockchainState, err := storage.CreateStateDB(ctx, batch.Hash())
if err != nil {
return nil, fmt.Errorf("could not create stateDB. Cause: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions go/enclave/components/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ type BatchRegistry interface {
// BatchesAfter - Given a hash, will return batches following it until the head batch and the l1 blocks referenced by those batches
BatchesAfter(ctx context.Context, batchSeqNo uint64, upToL1Height uint64, rollupLimiter limiters.RollupLimiter) ([]*core.Batch, []*types.Block, error)

// GetBatchStateAtHeight - creates a stateDB that represents the state committed when
// the batch with height matching the blockNumber was created and stored.
// GetBatchStateAtHeight - creates a stateDB for the batch at the given block number (height)
GetBatchStateAtHeight(ctx context.Context, blockNumber *gethrpc.BlockNumber) (*state.StateDB, error)

// GetBatchState - creates a stateDB for the batch with the given hash
GetBatchState(ctx context.Context, hash *common.L2BatchHash) (*state.StateDB, error)

// GetBatchAtHeight - same as `GetBatchStateAtHeight`, but instead returns the full batch
// rather than its stateDB only.
GetBatchAtHeight(ctx context.Context, height gethrpc.BlockNumber) (*core.Batch, error)
Expand Down
16 changes: 8 additions & 8 deletions go/enclave/enclave.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ func NewEnclave(

gasOracle := gas.NewGasOracle()
blockProcessor := components.NewBlockProcessor(storage, crossChainProcessors, gasOracle, logger)
batchExecutor := components.NewBatchExecutor(storage, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, config.GasBatchExecutionLimit, logger)
sigVerifier, err := components.NewSignatureValidator(storage)
registry := components.NewBatchRegistry(storage, logger)
batchExecutor := components.NewBatchExecutor(storage, registry, *config, gethEncodingService, crossChainProcessors, genesis, gasOracle, chainConfig, config.GasBatchExecutionLimit, logger)
sigVerifier, err := components.NewSignatureValidator(storage)
rProducer := components.NewRollupProducer(enclaveKey.EnclaveID(), storage, registry, logger)
if err != nil {
logger.Crit("Could not initialise the signature validator", log.ErrKey, err)
Expand Down Expand Up @@ -226,7 +226,7 @@ func NewEnclave(
config.GasLocalExecutionCapFlag,
)
rpcEncryptionManager := rpc.NewEncryptionManager(ecies.ImportECDSA(obscuroKey), storage, registry, crossChainProcessors, service, config, gasOracle, storage, blockProcessor, chain, logger)
subscriptionManager := events.NewSubscriptionManager(storage, config.ObscuroChainID, logger)
subscriptionManager := events.NewSubscriptionManager(storage, registry, config.ObscuroChainID, logger)

// ensure cached chain state data is up-to-date using the persisted batch data
err = restoreStateDBCache(context.Background(), storage, registry, batchExecutor, genesis, logger)
Expand Down Expand Up @@ -671,7 +671,7 @@ func (e *enclaveImpl) GetCode(ctx context.Context, address gethcommon.Address, b
return nil, responses.ToInternalError(fmt.Errorf("requested GetCode with the enclave stopping"))
}

stateDB, err := e.storage.CreateStateDB(ctx, *batchHash)
stateDB, err := e.registry.GetBatchState(ctx, batchHash)
if err != nil {
return nil, responses.ToInternalError(fmt.Errorf("could not create stateDB. Cause: %w", err))
}
Expand Down Expand Up @@ -904,7 +904,7 @@ func restoreStateDBCache(ctx context.Context, storage storage.Storage, registry
}
return fmt.Errorf("unexpected error fetching head batch to resync- %w", err)
}
if !stateDBAvailableForBatch(ctx, storage, batch.Hash()) {
if !stateDBAvailableForBatch(ctx, registry, batch.Hash()) {
logger.Info("state not available for latest batch after restart - rebuilding stateDB cache from batches")
err = replayBatchesToValidState(ctx, storage, registry, producer, gen, logger)
if err != nil {
Expand All @@ -918,8 +918,8 @@ func restoreStateDBCache(ctx context.Context, storage storage.Storage, registry
// batch in the chain and is used to query state at a certain height.
//
// This method checks if the stateDB data is available for a given batch hash (so it can be restored if not)
func stateDBAvailableForBatch(ctx context.Context, storage storage.Storage, hash common.L2BatchHash) bool {
_, err := storage.CreateStateDB(ctx, hash)
func stateDBAvailableForBatch(ctx context.Context, registry components.BatchRegistry, hash common.L2BatchHash) bool {
_, err := registry.GetBatchState(ctx, &hash)
return err == nil
}

Expand All @@ -938,7 +938,7 @@ func replayBatchesToValidState(ctx context.Context, storage storage.Storage, reg
return fmt.Errorf("no head batch found in DB but expected to replay batches - %w", err)
}
// loop backwards building a slice of all batches that don't have cached stateDB data available
for !stateDBAvailableForBatch(ctx, storage, batchToReplayFrom.Hash()) {
for !stateDBAvailableForBatch(ctx, registry, batchToReplayFrom.Hash()) {
batchesToReplay = append(batchesToReplay, batchToReplayFrom)
if batchToReplayFrom.NumberU64() == 0 {
// no more parents to check, replaying from genesis
Expand Down
19 changes: 12 additions & 7 deletions go/enclave/events/subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"sync"

"github.com/ten-protocol/go-ten/go/enclave/components"

"github.com/ten-protocol/go-ten/go/enclave/vkhandler"
gethrpc "github.com/ten-protocol/go-ten/lib/gethfork/rpc"

Expand Down Expand Up @@ -36,7 +38,8 @@ type logSubscription struct {
// SubscriptionManager manages the creation/deletion of subscriptions, and the filtering and encryption of logs for
// active subscriptions.
type SubscriptionManager struct {
storage storage.Storage
storage storage.Storage
registry components.BatchRegistry

subscriptions map[gethrpc.ID]*logSubscription
chainID int64
Expand All @@ -45,9 +48,10 @@ type SubscriptionManager struct {
logger gethlog.Logger
}

func NewSubscriptionManager(storage storage.Storage, chainID int64, logger gethlog.Logger) *SubscriptionManager {
func NewSubscriptionManager(storage storage.Storage, registry components.BatchRegistry, chainID int64, logger gethlog.Logger) *SubscriptionManager {
return &SubscriptionManager{
storage: storage,
storage: storage,
registry: registry,

subscriptions: map[gethrpc.ID]*logSubscription{},
chainID: chainID,
Expand Down Expand Up @@ -89,9 +93,9 @@ func (s *SubscriptionManager) RemoveSubscription(id gethrpc.ID) {
}

// FilterLogsForReceipt removes the logs that the sender of a transaction is not allowed to view
func FilterLogsForReceipt(ctx context.Context, receipt *types.Receipt, account *gethcommon.Address, storage storage.Storage) ([]*types.Log, error) {
filteredLogs := []*types.Log{}
stateDB, err := storage.CreateStateDB(ctx, receipt.BlockHash)
func FilterLogsForReceipt(ctx context.Context, receipt *types.Receipt, account *gethcommon.Address, registry components.BatchRegistry) ([]*types.Log, error) {
var filteredLogs []*types.Log
stateDB, err := registry.GetBatchState(ctx, &receipt.BlockHash)
if err != nil {
return nil, fmt.Errorf("could not create state DB to filter logs. Cause: %w", err)
}
Expand Down Expand Up @@ -130,7 +134,8 @@ func (s *SubscriptionManager) GetSubscribedLogsForBatch(ctx context.Context, bat
}

// the stateDb is needed to extract the user addresses from the topics
stateDB, err := s.storage.CreateStateDB(ctx, batch.Hash())
h := batch.Hash()
stateDB, err := s.registry.GetBatchState(ctx, &h)
if err != nil {
return nil, fmt.Errorf("could not create state DB to filter logs. Cause: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion go/enclave/rpc/GetTransactionCount.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func GetTransactionCountExecute(builder *CallBuilder[uint64, string], rpc *Encry
if err == nil {
// todo - we should return an error when head state is not available, but for current test situations with race
// conditions we allow it to return zero while head state is uninitialized
s, err := rpc.storage.CreateStateDB(builder.ctx, l2Head.Hash())
h := l2Head.Hash()
s, err := rpc.registry.GetBatchState(builder.ctx, &h)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/enclave/rpc/GetTransactionReceipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func GetTransactionReceiptExecute(builder *CallBuilder[gethcommon.Hash, map[stri
}

// We filter out irrelevant logs.
txReceipt.Logs, err = events.FilterLogsForReceipt(builder.ctx, txReceipt, &txSigner, rpc.storage)
txReceipt.Logs, err = events.FilterLogsForReceipt(builder.ctx, txReceipt, &txSigner, rpc.registry)
if err != nil {
rpc.logger.Error("error filter logs ", log.TxKey, txHash, log.ErrKey, err)
// this is a system error
Expand Down
56 changes: 32 additions & 24 deletions go/enclave/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/triedb/hashdb"

"github.com/ethereum/go-ethereum/triedb"

"github.com/dgraph-io/ristretto"
Expand All @@ -24,11 +27,9 @@ import (
"github.com/ethereum/go-ethereum/rlp"

gethcore "github.com/ethereum/go-ethereum/core"
gethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ten-protocol/go-ten/go/common/syserr"

"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
gethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ten-protocol/go-ten/go/common"
"github.com/ten-protocol/go-ten/go/common/log"
Expand Down Expand Up @@ -65,7 +66,7 @@ type storageImpl struct {

cachedSharedSecret *crypto.SharedEnclaveSecret

stateDB state.Database
stateCache state.Database
chainConfig *params.ChainConfig
logger gethlog.Logger
}
Expand All @@ -78,21 +79,28 @@ func NewStorageFromConfig(config *config.EnclaveConfig, chainConfig *params.Chai
return NewStorage(backingDB, chainConfig, logger)
}

// defaultCacheConfig holds the cache settings used when building the trie database.
// Its Preimages and TrieCleanLimit fields are read by trieDBConfig; Preimages is not
// set here, so it keeps its zero value.
// NOTE(review): the numeric limits are presumably expressed in MB, as in geth's
// CacheConfig — confirm against the gethcore documentation.
var defaultCacheConfig = &gethcore.CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256,
SnapshotWait: true,
StateScheme: rawdb.HashScheme,
}

// trieDBConfig is the trie database configuration passed to triedb.NewDatabase in NewStorage.
// It takes its Preimages flag and clean-cache size from defaultCacheConfig and uses the
// hash-based backend (HashDB) with Verkle disabled.
var trieDBConfig = &triedb.Config{
Preimages: defaultCacheConfig.Preimages,
IsVerkle: false,
HashDB: &hashdb.Config{
// presumably converts a limit given in MB into bytes — TODO confirm units
CleanCacheSize: defaultCacheConfig.TrieCleanLimit * 1024 * 1024,
},
}

func NewStorage(backingDB enclavedb.EnclaveDB, chainConfig *params.ChainConfig, logger gethlog.Logger) Storage {
// these are twice the default configs from geth
// todo - consider tweaking these independently on the validator and on the sequencer
// the validator probably needs higher values on this cache?
cacheConfig := &gethcore.CacheConfig{
TrieCleanLimit: 256 * 2,
TrieDirtyLimit: 256 * 2,
TrieTimeLimit: 5 * time.Minute,
SnapshotLimit: 256 * 2,
SnapshotWait: true,
}

stateDB := state.NewDatabaseWithConfig(backingDB, &triedb.Config{
Preimages: cacheConfig.Preimages,
})
// Open trie database with provided config
triedb := triedb.NewDatabase(backingDB, trieDBConfig)

stateDB := state.NewDatabaseWithNodeDB(backingDB, triedb)

// todo (tudor) figure out the config
ristrettoCache, err := ristretto.NewCache(&ristretto.Config{
Expand All @@ -106,7 +114,7 @@ func NewStorage(backingDB enclavedb.EnclaveDB, chainConfig *params.ChainConfig,
ristrettoStore := ristretto_store.NewRistretto(ristrettoCache)
return &storageImpl{
db: backingDB,
stateDB: stateDB,
stateCache: stateDB,
chainConfig: chainConfig,
blockCache: cache.New[*types.Block](ristrettoStore),
batchCacheBySeqNo: cache.New[*core.Batch](ristrettoStore),
Expand All @@ -117,11 +125,11 @@ func NewStorage(backingDB enclavedb.EnclaveDB, chainConfig *params.ChainConfig,
}

func (s *storageImpl) TrieDB() *triedb.Database {
return s.stateDB.TrieDB()
return s.stateCache.TrieDB()
}

func (s *storageImpl) StateDB() state.Database {
return s.stateDB
return s.stateCache
}

func (s *storageImpl) Close() error {
Expand Down Expand Up @@ -325,16 +333,16 @@ func (s *storageImpl) CreateStateDB(ctx context.Context, batchHash common.L2Batc
return nil, err
}

statedb, err := state.New(batch.Header.Root, s.stateDB, nil)
statedb, err := state.New(batch.Header.Root, s.stateCache, nil)
if err != nil {
return nil, syserr.NewInternalError(fmt.Errorf("could not create state DB for %s. Cause: %w", batch.Header.Root, err))
return nil, fmt.Errorf("could not create state DB for %s. Cause: %w", batch.Header.Root, err)
}
return statedb, nil
}

func (s *storageImpl) EmptyStateDB() (*state.StateDB, error) {
defer s.logDuration("EmptyStateDB", measure.NewStopwatch())
statedb, err := state.New(types.EmptyRootHash, s.stateDB, nil)
statedb, err := state.New(types.EmptyRootHash, s.stateCache, nil)
if err != nil {
return nil, fmt.Errorf("could not create state DB. Cause: %w", err)
}
Expand Down

0 comments on commit ce8d274

Please sign in to comment.