diff --git a/indexer/abci.go b/indexer/abci.go index 1aa68eb..d24b9c0 100644 --- a/indexer/abci.go +++ b/indexer/abci.go @@ -210,5 +210,10 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque // return err // } + // execute pruning only if retain height is set + if e.retainHeight > 0 { + e.doPrune(ctx, uint64(blockHeight)) + } + return nil } diff --git a/indexer/indexer.go b/indexer/indexer.go index 0bc6d95..20d8d89 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "sync/atomic" "time" "github.com/jellydator/ttlcache/v3" @@ -14,6 +15,7 @@ import ( dbm "github.com/cosmos/cosmos-db" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" "github.com/ethereum/go-ethereum/common" @@ -58,7 +60,9 @@ type EVMIndexer interface { // EVMIndexerImpl implements EVMIndexer. type EVMIndexerImpl struct { - enabled bool + enabled bool + retainHeight uint64 + pruningRunning *atomic.Bool db dbm.DB logger log.Logger @@ -68,14 +72,18 @@ type EVMIndexerImpl struct { store *CacheStore evmKeeper *evmkeeper.Keeper - schema collections.Schema - TxMap collections.Map[[]byte, rpctypes.RPCTransaction] - TxReceiptMap collections.Map[[]byte, coretypes.Receipt] + schema collections.Schema + + // blocks BlockHeaderMap collections.Map[uint64, coretypes.Header] BlockAndIndexToTxHashMap collections.Map[collections.Pair[uint64, uint64], []byte] BlockHashToNumberMap collections.Map[[]byte, uint64] - TxHashToCosmosTxHash collections.Map[[]byte, []byte] - CosmosTxHashToTxHash collections.Map[[]byte, []byte] + + // txs + TxMap collections.Map[[]byte, rpctypes.RPCTransaction] + TxReceiptMap collections.Map[[]byte, coretypes.Receipt] + TxHashToCosmosTxHash collections.Map[[]byte, []byte] + CosmosTxHashToTxHash collections.Map[[]byte, []byte] blockChans []chan *coretypes.Header logsChans []chan []*coretypes.Log 
@@ -100,13 +108,20 @@ func NewEVMIndexer(
 	store := NewCacheStore(dbadapter.Store{DB: db}, cfg.IndexerCacheSize)
 	sb := collections.NewSchemaBuilderFromAccessor(
 		func(ctx context.Context) corestoretypes.KVStore {
+			// if there is prune store in context, use it
+			if pruneStore := sdk.UnwrapSDKContext(ctx).Value(pruneStoreKey); pruneStore != nil {
+				return pruneStore.(corestoretypes.KVStore)
+			}
+
 			return store
 		},
 	)
 
-	logger.Info("EVM Indexer", "enable", !cfg.DisableIndexer)
+	logger.Info("EVM Indexer", "enable", !cfg.IndexerDisable)
 	indexer := &EVMIndexerImpl{
-		enabled: !cfg.DisableIndexer,
+		enabled:        !cfg.IndexerDisable,
+		retainHeight:   cfg.IndexerRetainHeight,
+		pruningRunning: &atomic.Bool{},
 
 		db:     db,
 		store:  store,
diff --git a/indexer/prune.go b/indexer/prune.go
new file mode 100644
index 0000000..777cc17
--- /dev/null
+++ b/indexer/prune.go
@@ -0,0 +1,152 @@
+package indexer
+
+import (
+	"context"
+
+	"cosmossdk.io/collections"
+	storetypes "cosmossdk.io/store/types"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/ethereum/go-ethereum/common"
+	coretypes "github.com/ethereum/go-ethereum/core/types"
+)
+
+// pruneStoreKeyType is a private type for the context keys of this package.
+// A dedicated type keeps the key from colliding with context values that
+// other packages store under a plain integer key.
+type pruneStoreKeyType int
+
+const (
+	// pruneStoreKey is the context key the schema accessor looks up to
+	// find the store that pruning operations should go through.
+	pruneStoreKey pruneStoreKeyType = iota
+)
+
+// doPrune triggers pruning in a goroutine. If pruning is already running,
+// it does nothing.
+//
+// The pruningRunning flag is taken with an atomic swap before the goroutine
+// starts and released when it returns, so at most one prune runs at a time.
+func (e *EVMIndexerImpl) doPrune(ctx context.Context, height uint64) {
+	if running := e.pruningRunning.Swap(true); running {
+		return
+	}
+
+	go func(ctx context.Context, height uint64) {
+		defer e.pruningRunning.Store(false)
+		if err := e.prune(ctx, height); err != nil {
+			e.logger.Error("failed to prune", "err", err)
+			// do not report a failed run as finished
+			return
+		}
+
+		e.logger.Debug("prune finished", "height", height)
+	}(ctx, height)
+}
+
+// prune removes old blocks and transactions from the indexer.
+// Entries up to curHeight-retainHeight are deleted through a cache wrap of
+// the indexer store; the wrap is written back only when both the block and
+// the tx pruning succeed. It is a no-op when retainHeight is zero or when
+// curHeight does not exceed retainHeight.
+func (e *EVMIndexerImpl) prune(ctx context.Context, curHeight uint64) error {
+	// Check before subtracting: both values are unsigned, so the difference
+	// would wrap around instead of going negative.
+	if e.retainHeight == 0 || curHeight <= e.retainHeight {
+		return nil
+	}
+	minHeight := curHeight - e.retainHeight
+
+	// use branch context to perform batch operations
+	batchStore := e.store.store.CacheWrap()
+	ctx = sdk.UnwrapSDKContext(ctx).WithValue(pruneStoreKey, newCoreKVStore(interface{}(batchStore).(storetypes.KVStore)))
+
+	// prune blocks and txs concurrently; the first error cancels the group
+	g, ctx := errgroup.WithContext(ctx)
+	g.Go(func() error {
+		return e.pruneBlocks(ctx, minHeight)
+	})
+	g.Go(func() error {
+		return e.pruneTxs(ctx, minHeight)
+	})
+
+	if err := g.Wait(); err != nil {
+		// the batch is dropped without being written
+		return err
+	}
+
+	// write batch
+	batchStore.Write()
+
+	return nil
+}
+
+// pruneBlocks removes old block headers from the indexer.
+//
+// It deletes the headers of heights 1 through minHeight (inclusive) from
+// BlockHeaderMap and the matching entries from BlockHashToNumberMap.
+func (e *EVMIndexerImpl) pruneBlocks(ctx context.Context, minHeight uint64) error {
+	// record block hashes
+	var blockHashes []common.Hash
+	rn := new(collections.Range[uint64]).StartInclusive(1).EndInclusive(minHeight)
+	err := e.BlockHeaderMap.Walk(ctx, rn, func(key uint64, value coretypes.Header) (stop bool, err error) {
+		blockHashes = append(blockHashes, value.Hash())
+		return false, nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// clear block headers within range
+	err = e.BlockHeaderMap.Clear(ctx, rn)
+	if err != nil {
+		return err
+	}
+
+	// clear block hash to number map
+	for _, blockHash := range blockHashes {
+		if err := e.BlockHashToNumberMap.Remove(ctx, blockHash.Bytes()); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// pruneTxs removes old transactions from the indexer.
+//
+// It collects the tx hashes recorded in BlockAndIndexToTxHashMap for the
+// pruned heights, clears those entries, and then removes each tx from TxMap,
+// TxReceiptMap and the two cosmos tx hash mappings.
+func (e *EVMIndexerImpl) pruneTxs(ctx context.Context, minHeight uint64) error {
+	// record tx hashes
+	// NOTE(review): the range presumably includes minHeight itself;
+	// Test_PruneIndexer expects the tx at that height to be gone — confirm.
+	var txHashes []common.Hash
+	rnPair := collections.NewPrefixUntilPairRange[uint64, uint64](minHeight)
+	err := e.BlockAndIndexToTxHashMap.Walk(ctx, rnPair, func(key collections.Pair[uint64, uint64], txHashBz []byte) (bool, error) {
+		txHash := common.BytesToHash(txHashBz)
+		txHashes = append(txHashes, txHash)
+		return false, nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// clear block txs within range
+	err = e.BlockAndIndexToTxHashMap.Clear(ctx, rnPair)
+	if err != nil {
+		return err
+	}
+
+	// clear txs and receipts and cosmos tx hash mappings
+	for _, txHash := range txHashes {
+		if err := e.TxMap.Remove(ctx, txHash.Bytes()); err != nil {
+			return err
+		}
+		if err := e.TxReceiptMap.Remove(ctx, txHash.Bytes()); err != nil {
+			return err
+		}
+		// a tx without a cosmos tx hash mapping is skipped, not an error
+		if cosmosTxHash, err := e.TxHashToCosmosTxHash.Get(ctx, txHash.Bytes()); err == nil {
+			if err := e.TxHashToCosmosTxHash.Remove(ctx, txHash.Bytes()); err != nil {
+				return err
+			}
+			if err := e.CosmosTxHashToTxHash.Remove(ctx, cosmosTxHash); err != nil {
+				return err
+			}
+		} else if err != collections.ErrNotFound {
+			return err
+		}
+	}
+
+	return nil
+}
+
+//////////////////////// TESTING INTERFACE ////////////////////////
+
+// SetRetainHeight sets a custom retain height.
+// The field is written without synchronization.
+func (e *EVMIndexerImpl) SetRetainHeight(height uint64) {
+	e.retainHeight = height
+}
+
+// IsPruningRunning reports whether a pruning run is currently in progress.
+func (e *EVMIndexerImpl) IsPruningRunning() bool {
+	return e.pruningRunning.Load()
+}
+
+// ClearCache resets the indexer store cache. It is meant for testing.
+func (e *EVMIndexerImpl) ClearCache() error {
+	return e.store.cache.Reset()
+}
diff --git a/indexer/prune_test.go b/indexer/prune_test.go
new file mode 100644
index 0000000..cbee5da
--- /dev/null
+++ b/indexer/prune_test.go
@@ -0,0 +1,109 @@
+package indexer_test
+
+import (
+	"math/big"
+	"testing"
+	"time"
+
+	"cosmossdk.io/collections"
+	"github.com/stretchr/testify/require"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/hexutil"
+
+	evmindexer "github.com/initia-labs/minievm/indexer"
+	"github.com/initia-labs/minievm/tests"
+	evmtypes "github.com/initia-labs/minievm/x/evm/types"
+)
+
+// Test_PruneIndexer indexes two blocks with a retain height of 1 and checks
+// that only the data of the latest block survives pruning.
+func Test_PruneIndexer(t *testing.T) {
+	app, addrs, privKeys := tests.CreateApp(t)
+	indexer := app.EVMIndexer().(*evmindexer.EVMIndexerImpl)
+	defer app.Close()
+
+	// set retain height to 1, only last block is indexed
+	indexer.SetRetainHeight(1)
+
+	tx, evmTxHash := tests.GenerateCreateERC20Tx(t, app, privKeys[0])
+	_, finalizeRes := tests.ExecuteTxs(t, app, tx)
+	tests.CheckTxResult(t, finalizeRes.TxResults[0], true)
+
+	events := finalizeRes.TxResults[0].Events
+	createEvent := events[len(events)-3]
+	require.Equal(t, evmtypes.EventTypeContractCreated, createEvent.GetType())
+
+	contractAddr, err := hexutil.Decode(createEvent.Attributes[0].Value)
+	require.NoError(t, err)
+
+	// listen finalize block
+	ctx, err := app.CreateQueryContext(0, false)
+	require.NoError(t, err)
+
+	// check the tx is indexed
+	evmTx, err := indexer.TxByHash(ctx, evmTxHash)
+	require.NoError(t, err)
+	require.NotNil(t, evmTx)
+
+	// mint 1_000_000 tokens to the first address
+	tx, evmTxHash2 := tests.GenerateMintERC20Tx(t, app, privKeys[0], common.BytesToAddress(contractAddr), addrs[0], new(big.Int).SetUint64(1_000_000_000_000))
+	finalizeReq, finalizeRes := tests.ExecuteTxs(t, app, tx)
+	tests.CheckTxResult(t, finalizeRes.TxResults[0], true)
+
+	// wait for pruning, but fail instead of hanging if it never finishes
+	require.Eventually(t, func() bool {
+		return !indexer.IsPruningRunning()
+	}, 10*time.Second, 100*time.Millisecond, "pruning did not finish")
+
+	// clear cache
+	require.NoError(t, indexer.ClearCache())
+
+	// listen finalize block
+	ctx, err = app.CreateQueryContext(0, false)
+	require.NoError(t, err)
+
+	// check the block header is indexed
+	header, err := indexer.BlockHeaderByNumber(ctx, uint64(finalizeReq.Height))
+	require.NoError(t, err)
+	require.NotNil(t, header)
+	require.Equal(t, finalizeReq.Height, header.Number.Int64())
+
+	// previous block should be pruned
+	header, err = indexer.BlockHeaderByNumber(ctx, uint64(finalizeReq.Height-1))
+	require.ErrorIs(t, err, collections.ErrNotFound)
+	require.Nil(t, header)
+
+	// check the tx is indexed
+	evmTx, err = indexer.TxByHash(ctx, evmTxHash2)
+	require.NoError(t, err)
+	require.NotNil(t, evmTx)
+
+	// but the first tx should be pruned
+	evmTx, err = indexer.TxByHash(ctx, evmTxHash)
+	require.ErrorIs(t, err, collections.ErrNotFound)
+	require.Nil(t, evmTx)
+
+	// check the receipt is indexed
+	receipt, err := indexer.TxReceiptByHash(ctx, evmTxHash2)
+	require.NoError(t, err)
+	require.NotNil(t, receipt)
+
+	// check the receipt is pruned
+	_, err = indexer.TxReceiptByHash(ctx, evmTxHash)
+	require.ErrorIs(t, err, collections.ErrNotFound)
+
+	// check cosmos tx hash is indexed
+	cosmosTxHash, err := indexer.CosmosTxHashByTxHash(ctx, evmTxHash2)
+	require.NoError(t, err)
+	require.NotNil(t, cosmosTxHash)
+	evmTxHash3, err := indexer.TxHashByCosmosTxHash(ctx, cosmosTxHash)
+	require.NoError(t, err)
+	require.Equal(t, evmTxHash2, evmTxHash3)
+
+	// check cosmos tx hash is pruned
+	_, err = indexer.CosmosTxHashByTxHash(ctx, evmTxHash)
+	require.ErrorIs(t, err, collections.ErrNotFound)
+}
diff --git a/indexer/store.go b/indexer/store.go
index 0e952f4..e7f08a4 100644
--- a/indexer/store.go
+++ b/indexer/store.go
@@ -4,6 +4,7 @@ import (
 	"context"
 
 	corestoretypes "cosmossdk.io/core/store"
+	"cosmossdk.io/store"
 	cachekv "cosmossdk.io/store/cachekv"
 	storetypes "cosmossdk.io/store/types"
 
@@ -119,3 +120,56 @@ func (c CacheStore) ReverseIterator(start, end []byte) (storetypes.Iterator, err
 func (c CacheStore) Write() {
 	c.store.Write()
 }
+
+// coreKVStore is a wrapper of Core/Store kvstore interface
+// Remove after https://github.com/cosmos/cosmos-sdk/issues/14714 is closed
+type coreKVStore struct {
+	kvStore storetypes.KVStore
+}
+
+// newCoreKVStore returns a wrapper of Core/Store kvstore interface
+// Remove once store migrates to core/store kvstore interface
+func 
newCoreKVStore(store storetypes.KVStore) corestoretypes.KVStore {
+	return coreKVStore{kvStore: store}
+}
+
+// Get returns the value the wrapped store holds for key; nil iff key doesn't
+// exist. The returned error is always nil.
+// NOTE(review): a nil key is presumably rejected by the wrapped store itself
+// (by panicking) rather than reported through the error — confirm.
+func (store coreKVStore) Get(key []byte) ([]byte, error) {
+	return store.kvStore.Get(key), nil
+}
+
+// Has checks if a key exists in the wrapped store. The returned error is
+// always nil.
+func (store coreKVStore) Has(key []byte) (bool, error) {
+	return store.kvStore.Has(key), nil
+}
+
+// Set sets the key in the wrapped store. The returned error is always nil.
+// NOTE(review): a nil key or value is presumably rejected by the wrapped
+// store itself rather than reported through the error — confirm.
+func (store coreKVStore) Set(key, value []byte) error {
+	store.kvStore.Set(key, value)
+	return nil
+}
+
+// Delete deletes the key from the wrapped store. The returned error is
+// always nil.
+func (store coreKVStore) Delete(key []byte) error {
+	store.kvStore.Delete(key)
+	return nil
+}
+
+// Iterator iterates over a domain of keys in ascending order. End is exclusive.
+// Start must be less than end, or the Iterator is invalid.
+// Iterator must be closed by caller.
+// To iterate over entire domain, use store.Iterator(nil, nil)
+// CONTRACT: No writes may happen within a domain while an iterator exists over it.
+// Exceptionally allowed for cachekv.Store, safe to write in the modules.
+// The returned error is always nil. In the result type, store refers to the
+// imported store package; inside the body it is the receiver.
+func (store coreKVStore) Iterator(start, end []byte) (store.Iterator, error) {
+	return store.kvStore.Iterator(start, end), nil
+}
+
+// ReverseIterator iterates over a domain of keys in descending order. End is exclusive.
+// Start must be less than end, or the Iterator is invalid.
+// Iterator must be closed by caller.
+// CONTRACT: No writes may happen within a domain while an iterator exists over it.
+// Exceptionally allowed for cachekv.Store, safe to write in the modules.
+func (store coreKVStore) ReverseIterator(start, end []byte) (store.Iterator, error) { + return store.kvStore.ReverseIterator(start, end), nil +} diff --git a/x/evm/config/config.go b/x/evm/config/config.go index 1cde2a3..2ce6e8e 100644 --- a/x/evm/config/config.go +++ b/x/evm/config/config.go @@ -10,34 +10,41 @@ import ( const ( // DefaultContractSimulationGasLimit - default max simulation gas DefaultContractSimulationGasLimit = uint64(3_000_000) - // DefaultDisableIndexer is the default flag to disable indexer - DefaultDisableIndexer = false + // DefaultIndexerDisable is the default flag to disable indexer + DefaultIndexerDisable = false // DefaultIndexerCacheSize is the default maximum size (MiB) of the cache. DefaultIndexerCacheSize = 100 + // DefaultIndexerRetainHeight is the default height to retain indexer data. + DefaultIndexerRetainHeight = uint64(0) ) const ( flagContractSimulationGasLimit = "evm.contract-simulation-gas-limit" - flagDisableIndexer = "evm.disable-indexer" + flagIndexerDisable = "evm.indexer-disable" flagIndexerCacheSize = "evm.indexer-cache-size" + flagIndexerRetainHeight = "evm.indexer-retain-height" ) // EVMConfig is the extra config required for evm type EVMConfig struct { // ContractSimulationGasLimit is the maximum gas amount can be used in a tx simulation call. ContractSimulationGasLimit uint64 `mapstructure:"contract-simulation-gas-limit"` - // DisableIndexer is the flag to disable indexer - DisableIndexer bool `mapstructure:"disable-indexer"` + // IndexerDisable is the flag to disable indexer + IndexerDisable bool `mapstructure:"indexer-disable"` // IndexerCacheSize is the maximum size (MiB) of the cache. IndexerCacheSize int `mapstructure:"indexer-cache-size"` + // IndexerRetainHeight is the height to retain indexer data. + // If 0, it will retain all data. 
+ IndexerRetainHeight uint64 `mapstructure:"indexer-retain-height"` } // DefaultEVMConfig returns the default settings for EVMConfig func DefaultEVMConfig() EVMConfig { return EVMConfig{ ContractSimulationGasLimit: DefaultContractSimulationGasLimit, - DisableIndexer: DefaultDisableIndexer, + IndexerDisable: DefaultIndexerDisable, IndexerCacheSize: DefaultIndexerCacheSize, + IndexerRetainHeight: DefaultIndexerRetainHeight, } } @@ -45,16 +52,18 @@ func DefaultEVMConfig() EVMConfig { func GetConfig(appOpts servertypes.AppOptions) EVMConfig { return EVMConfig{ ContractSimulationGasLimit: cast.ToUint64(appOpts.Get(flagContractSimulationGasLimit)), - DisableIndexer: cast.ToBool(appOpts.Get(flagDisableIndexer)), + IndexerDisable: cast.ToBool(appOpts.Get(flagIndexerDisable)), IndexerCacheSize: cast.ToInt(appOpts.Get(flagIndexerCacheSize)), + IndexerRetainHeight: cast.ToUint64(appOpts.Get(flagIndexerRetainHeight)), } } // AddConfigFlags implements servertypes.EVMConfigFlags interface. func AddConfigFlags(startCmd *cobra.Command) { startCmd.Flags().Uint64(flagContractSimulationGasLimit, DefaultContractSimulationGasLimit, "Maximum simulation gas amount for evm contract execution") - startCmd.Flags().Bool(flagDisableIndexer, DefaultDisableIndexer, "Disable evm indexer") + startCmd.Flags().Bool(flagIndexerDisable, DefaultIndexerDisable, "Disable evm indexer") startCmd.Flags().Int(flagIndexerCacheSize, DefaultIndexerCacheSize, "Maximum size (MiB) of the indexer cache") + startCmd.Flags().Uint64(flagIndexerRetainHeight, DefaultIndexerRetainHeight, "Height to retain indexer data") } // DefaultConfigTemplate default config template for evm @@ -68,10 +77,14 @@ const DefaultConfigTemplate = ` # The maximum gas amount can be used in a tx simulation call. contract-simulation-gas-limit = "{{ .EVMConfig.ContractSimulationGasLimit }}" -# DisableIndexer is the flag to disable indexer. If true, evm jsonrpc queries will return +# IndexerDisable is the flag to disable indexer. 
If true, evm jsonrpc queries will return # empty results for block, tx, and receipt queries. -disable-indexer = {{ .EVMConfig.DisableIndexer }} +indexer-disable = {{ .EVMConfig.IndexerDisable }} # IndexerCacheSize is the maximum size (MiB) of the cache for evm indexer. indexer-cache-size = {{ .EVMConfig.IndexerCacheSize }} + +# IndexerRetainHeight is the height to retain indexer data. +# If 0, it will retain all data. +indexer-retain-height = {{ .EVMConfig.IndexerRetainHeight }} `