Skip to content

Commit

Permalink
implement evm indexer pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
beer-1 committed Dec 4, 2024
1 parent eb18a0c commit 365c859
Show file tree
Hide file tree
Showing 6 changed files with 366 additions and 18 deletions.
5 changes: 5 additions & 0 deletions indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,5 +210,10 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
// return err
// }

// execute pruning only if retain height is set
if e.retainHeight > 0 {
e.doPrune(ctx, uint64(blockHeight))
}

return nil
}
31 changes: 23 additions & 8 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
"sync/atomic"
"time"

"github.com/jellydator/ttlcache/v3"
Expand All @@ -14,6 +15,7 @@ import (
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -58,7 +60,9 @@ type EVMIndexer interface {

// EVMIndexerImpl implements EVMIndexer.
type EVMIndexerImpl struct {
enabled bool
enabled bool
retainHeight uint64
pruningRunning *atomic.Bool

db dbm.DB
logger log.Logger
Expand All @@ -68,14 +72,18 @@ type EVMIndexerImpl struct {
store *CacheStore
evmKeeper *evmkeeper.Keeper

schema collections.Schema
TxMap collections.Map[[]byte, rpctypes.RPCTransaction]
TxReceiptMap collections.Map[[]byte, coretypes.Receipt]
schema collections.Schema

// blocks
BlockHeaderMap collections.Map[uint64, coretypes.Header]
BlockAndIndexToTxHashMap collections.Map[collections.Pair[uint64, uint64], []byte]
BlockHashToNumberMap collections.Map[[]byte, uint64]
TxHashToCosmosTxHash collections.Map[[]byte, []byte]
CosmosTxHashToTxHash collections.Map[[]byte, []byte]

// txs
TxMap collections.Map[[]byte, rpctypes.RPCTransaction]
TxReceiptMap collections.Map[[]byte, coretypes.Receipt]
TxHashToCosmosTxHash collections.Map[[]byte, []byte]
CosmosTxHashToTxHash collections.Map[[]byte, []byte]

blockChans []chan *coretypes.Header
logsChans []chan []*coretypes.Log
Expand All @@ -100,13 +108,20 @@ func NewEVMIndexer(
store := NewCacheStore(dbadapter.Store{DB: db}, cfg.IndexerCacheSize)
sb := collections.NewSchemaBuilderFromAccessor(
func(ctx context.Context) corestoretypes.KVStore {
// if there is prune store in context, use it
if pruneStore := sdk.UnwrapSDKContext(ctx).Value(pruneStoreKey); pruneStore != nil {
return pruneStore.(corestoretypes.KVStore)
}

return store
},
)

logger.Info("EVM Indexer", "enable", !cfg.DisableIndexer)
logger.Info("EVM Indexer", "enable", !cfg.IndexerDisable)
indexer := &EVMIndexerImpl{
enabled: !cfg.DisableIndexer,
enabled: !cfg.IndexerDisable,
retainHeight: cfg.IndexerRetainHeight,
pruningRunning: &atomic.Bool{},

db: db,
store: store,
Expand Down
152 changes: 152 additions & 0 deletions indexer/prune.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package indexer

import (
"context"

"cosmossdk.io/collections"
storetypes "cosmossdk.io/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/common"
coretypes "github.com/ethereum/go-ethereum/core/types"
)

const (
pruneStoreKey = iota
)

// doPrune triggers pruning in a goroutine. If pruning is already running,
// it does nothing.
func (e *EVMIndexerImpl) doPrune(ctx context.Context, height uint64) {
if running := e.pruningRunning.Swap(true); running {
return
}

go func(ctx context.Context, height uint64) {
defer e.pruningRunning.Store(false)
if err := e.prune(ctx, height); err != nil {
e.logger.Error("failed to prune", "err", err)
}

e.logger.Debug("prune finished", "height", height)
}(ctx, height)
}

// prune removes old blocks and transactions from the indexer.
func (e *EVMIndexerImpl) prune(ctx context.Context, curHeight uint64) error {
// use branch context to perform batch operations
batchStore := e.store.store.CacheWrap()
ctx = sdk.UnwrapSDKContext(ctx).WithValue(pruneStoreKey, newCoreKVStore(interface{}(batchStore).(storetypes.KVStore)))

minHeight := curHeight - e.retainHeight
if minHeight <= 0 || minHeight >= curHeight {
return nil
}

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return e.pruneBlocks(ctx, minHeight)
})
g.Go(func() error {
return e.pruneTxs(ctx, minHeight)
})

if err := g.Wait(); err != nil {
return err
}

// write batch
batchStore.Write()

return nil
}

// pruneBlocks removes old block headers from the indexer.
func (e *EVMIndexerImpl) pruneBlocks(ctx context.Context, minHeight uint64) error {
// record block hashes
var blockHashes []common.Hash
rn := new(collections.Range[uint64]).StartInclusive(1).EndInclusive(minHeight)
err := e.BlockHeaderMap.Walk(ctx, rn, func(key uint64, value coretypes.Header) (stop bool, err error) {
blockHashes = append(blockHashes, value.Hash())
return false, nil
})
if err != nil {
return err
}

// clear block headers within range
err = e.BlockHeaderMap.Clear(ctx, rn)
if err != nil {
return err
}

// clear block hash to number map
for _, blockHash := range blockHashes {
if err := e.BlockHashToNumberMap.Remove(ctx, blockHash.Bytes()); err != nil {
return err
}
}

return nil
}

// pruneTxs removes old transactions from the indexer.
func (e *EVMIndexerImpl) pruneTxs(ctx context.Context, minHeight uint64) error {
// record tx hashes
var txHashes []common.Hash
rnPair := collections.NewPrefixUntilPairRange[uint64, uint64](minHeight)
err := e.BlockAndIndexToTxHashMap.Walk(ctx, rnPair, func(key collections.Pair[uint64, uint64], txHashBz []byte) (bool, error) {
txHash := common.BytesToHash(txHashBz)
txHashes = append(txHashes, txHash)
return false, nil
})
if err != nil {
return err
}

// clear block txs within range
err = e.BlockAndIndexToTxHashMap.Clear(ctx, rnPair)
if err != nil {
return err
}

// clear txs and receipts and cosmos tx hash mappings
for _, txHash := range txHashes {
if err := e.TxMap.Remove(ctx, txHash.Bytes()); err != nil {
return err
}
if err := e.TxReceiptMap.Remove(ctx, txHash.Bytes()); err != nil {
return err
}
if cosmosTxHash, err := e.TxHashToCosmosTxHash.Get(ctx, txHash.Bytes()); err == nil {
if err := e.TxHashToCosmosTxHash.Remove(ctx, txHash.Bytes()); err != nil {
return err
}
if err := e.CosmosTxHashToTxHash.Remove(ctx, cosmosTxHash); err != nil {
return err
}
} else if err != collections.ErrNotFound {
return err
}
}

return nil
}

//////////////////////// TESTING INTERFACE ////////////////////////

// Set custom retain height
func (e *EVMIndexerImpl) SetRetainHeight(height uint64) {
e.retainHeight = height
}

// Check if pruning is running
func (e *EVMIndexerImpl) IsPruningRunning() bool {
return e.pruningRunning.Load()
}

// Clear cache for testing
func (e *EVMIndexerImpl) ClearCache() error {
return e.store.cache.Reset()
}
109 changes: 109 additions & 0 deletions indexer/prune_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package indexer_test

import (
"math/big"
"testing"
"time"

"cosmossdk.io/collections"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"

evmindexer "github.com/initia-labs/minievm/indexer"
"github.com/initia-labs/minievm/tests"
evmtypes "github.com/initia-labs/minievm/x/evm/types"
)

func Test_PruneIndexer(t *testing.T) {
app, addrs, privKeys := tests.CreateApp(t)
indexer := app.EVMIndexer().(*evmindexer.EVMIndexerImpl)
defer app.Close()

// set retain height to 1, only last block is indexed
indexer.SetRetainHeight(1)

tx, evmTxHash := tests.GenerateCreateERC20Tx(t, app, privKeys[0])
_, finalizeRes := tests.ExecuteTxs(t, app, tx)
tests.CheckTxResult(t, finalizeRes.TxResults[0], true)

events := finalizeRes.TxResults[0].Events
createEvent := events[len(events)-3]
require.Equal(t, evmtypes.EventTypeContractCreated, createEvent.GetType())

contractAddr, err := hexutil.Decode(createEvent.Attributes[0].Value)
require.NoError(t, err)

// listen finalize block
ctx, err := app.CreateQueryContext(0, false)
require.NoError(t, err)

// check the tx is indexed
evmTx, err := indexer.TxByHash(ctx, evmTxHash)
require.NoError(t, err)
require.NotNil(t, evmTx)

// mint 1_000_000 tokens to the first address
tx, evmTxHash2 := tests.GenerateMintERC20Tx(t, app, privKeys[0], common.BytesToAddress(contractAddr), addrs[0], new(big.Int).SetUint64(1_000_000_000_000))
finalizeReq, finalizeRes := tests.ExecuteTxs(t, app, tx)
tests.CheckTxResult(t, finalizeRes.TxResults[0], true)

// wait for pruning
for {
if indexer.IsPruningRunning() {
time.Sleep(100 * time.Millisecond)
} else {
break
}
}

// clear cache
indexer.ClearCache()

// listen finalize block
ctx, err = app.CreateQueryContext(0, false)
require.NoError(t, err)

// check the block header is indexed
header, err := indexer.BlockHeaderByNumber(ctx, uint64(finalizeReq.Height))
require.NoError(t, err)
require.NotNil(t, header)
require.Equal(t, finalizeReq.Height, header.Number.Int64())

// previous block should be pruned
header, err = indexer.BlockHeaderByNumber(ctx, uint64(finalizeReq.Height-1))
require.ErrorIs(t, err, collections.ErrNotFound)
require.Nil(t, header)

// check the tx is indexed
evmTx, err = indexer.TxByHash(ctx, evmTxHash2)
require.NoError(t, err)
require.NotNil(t, evmTx)

// but the first tx should be pruned
evmTx, err = indexer.TxByHash(ctx, evmTxHash)
require.ErrorIs(t, err, collections.ErrNotFound)
require.Nil(t, evmTx)

// check the receipt is indexed
receipt, err := indexer.TxReceiptByHash(ctx, evmTxHash2)
require.NoError(t, err)
require.NotNil(t, receipt)

// check the receipt is pruned
_, err = indexer.TxReceiptByHash(ctx, evmTxHash)
require.ErrorIs(t, err, collections.ErrNotFound)

// check cosmos tx hash is indexed
cosmosTxHash, err := indexer.CosmosTxHashByTxHash(ctx, evmTxHash2)
require.NoError(t, err)
require.NotNil(t, cosmosTxHash)
evmTxHash3, err := indexer.TxHashByCosmosTxHash(ctx, cosmosTxHash)
require.NoError(t, err)
require.Equal(t, evmTxHash2, evmTxHash3)

// check cosmos tx hash is pruned
_, err = indexer.CosmosTxHashByTxHash(ctx, evmTxHash)
require.ErrorIs(t, err, collections.ErrNotFound)
}
Loading

0 comments on commit 365c859

Please sign in to comment.