diff --git a/go.mod b/go.mod index 6aff32e3..4c42139b 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( cosmossdk.io/x/feegrant v0.1.0 cosmossdk.io/x/tx v0.13.3 cosmossdk.io/x/upgrade v0.1.3 + github.com/allegro/bigcache/v3 v3.1.0 github.com/cometbft/cometbft v0.38.10 github.com/cosmos/cosmos-db v1.0.2 github.com/cosmos/cosmos-proto v1.0.0-beta.5 @@ -72,7 +73,6 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect - github.com/allegro/bigcache/v3 v3.1.0 // indirect github.com/avast/retry-go/v4 v4.5.1 // indirect github.com/aws/aws-sdk-go v1.44.312 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/indexer/abci.go b/indexer/abci.go index 07948502..36ed4a6c 100644 --- a/indexer/abci.go +++ b/indexer/abci.go @@ -29,11 +29,8 @@ func (e *EVMIndexerImpl) ListenCommit(ctx context.Context, res abci.ResponseComm func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error { sdkCtx := sdk.UnwrapSDKContext(ctx) - batch := e.db.NewBatch() - defer batch.Close() - // compute base fee from the opChild gas prices - baseFee, err := e.baseFee(ctx) + baseFee, err := e.baseFee(sdkCtx) if err != nil { e.logger.Error("failed to get base fee", "err", err) return err @@ -67,11 +64,11 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque // index tx hash cosmosTxHash := comettypes.Tx(txBytes).Hash() - if err := e.TxHashToCosmosTxHash.Set(ctx, ethTx.Hash().Bytes(), cosmosTxHash); err != nil { + if err := e.TxHashToCosmosTxHash.Set(sdkCtx, ethTx.Hash().Bytes(), cosmosTxHash); err != nil { e.logger.Error("failed to store tx hash to cosmos tx hash", "err", err) return err } - if err := e.CosmosTxHashToTxHash.Set(ctx, cosmosTxHash, ethTx.Hash().Bytes()); err != nil { + if err := e.CosmosTxHashToTxHash.Set(sdkCtx, cosmosTxHash, ethTx.Hash().Bytes()); err != nil { e.logger.Error("failed to store cosmos tx hash to tx hash", "err", err) return err } @@ -147,17 +144,17 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque // store tx rpcTx := rpctypes.NewRPCTransaction(ethTx, blockHash, uint64(blockHeight), uint64(receipt.TransactionIndex), chainId) - if err := e.TxMap.Set(ctx, txHash.Bytes(), *rpcTx); err != nil { + if err := e.TxMap.Set(sdkCtx, txHash.Bytes(), *rpcTx); err != nil { e.logger.Error("failed to store rpcTx", "err", err) return err } - if err := e.TxReceiptMap.Set(ctx, txHash.Bytes(), *receipt); err != nil { + if err := e.TxReceiptMap.Set(sdkCtx, txHash.Bytes(), *receipt); err != nil { e.logger.Error("failed to store tx receipt", "err", err) return err } // store index - if err := e.BlockAndIndexToTxHashMap.Set(ctx, collections.Join(uint64(blockHeight), uint64(receipt.TransactionIndex)), txHash.Bytes()); err != nil { + if err := e.BlockAndIndexToTxHashMap.Set(sdkCtx, collections.Join(uint64(blockHeight), uint64(receipt.TransactionIndex)), txHash.Bytes()); err != nil { e.logger.Error("failed to store blockAndIndexToTxHash", "err", err) return err } @@ -181,11 +178,11 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque } // index block header - if err := e.BlockHeaderMap.Set(ctx, uint64(blockHeight), blockHeader); err != nil { + if err := e.BlockHeaderMap.Set(sdkCtx, uint64(blockHeight), blockHeader); err != nil { e.logger.Error("failed to marshal blockHeader", "err", err) return err } - if err := e.BlockHashToNumberMap.Set(ctx, blockHash.Bytes(), uint64(blockHeight)); err != nil { + if err := e.BlockHashToNumberMap.Set(sdkCtx, blockHash.Bytes(), uint64(blockHeight)); err != nil { e.logger.Error("failed to store blockHashToNumber", "err", err) return err } diff --git a/indexer/indexer.go b/indexer/indexer.go index 7761bea1..bdcca382 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -17,7 +17,7 @@ import ( coretypes "github.com/ethereum/go-ethereum/core/types" opchildkeeper "github.com/initia-labs/OPinit/x/opchild/keeper" - "github.com/initia-labs/kvindexer/store" + rpctypes "github.com/initia-labs/minievm/jsonrpc/types" evmkeeper "github.com/initia-labs/minievm/x/evm/keeper" ) @@ -55,7 +55,7 @@ type EVMIndexerImpl struct { txConfig client.TxConfig appCodec codec.Codec - store *store.CacheStore + store *CacheStore evmKeeper *evmkeeper.Keeper opChildKeeper *opchildkeeper.Keeper @@ -82,7 +82,7 @@ func NewEVMIndexer( opChildKeeper *opchildkeeper.Keeper, ) (EVMIndexer, error) { // TODO make cache size configurable - store := store.NewCacheStore(dbadapter.Store{DB: db}, 100) + store := NewCacheStore(dbadapter.Store{DB: db}, 100) sb := collections.NewSchemaBuilderFromAccessor( func(ctx context.Context) corestoretypes.KVStore { return store diff --git a/indexer/store.go b/indexer/store.go new file mode 100644 index 00000000..8b5747d3 --- /dev/null +++ b/indexer/store.go @@ -0,0 +1,123 @@ +package indexer + +import ( + "context" + + corestoretypes "cosmossdk.io/core/store" + "cosmossdk.io/errors" + cachekv "cosmossdk.io/store/cachekv" + storetypes "cosmossdk.io/store/types" + bigcache "github.com/allegro/bigcache/v3" +) + +var _ corestoretypes.KVStore = (*CacheStore)(nil) + +type CacheStore struct { + store storetypes.CacheKVStore + cache *bigcache.BigCache +} + +func NewCacheStore(store storetypes.KVStore, capacity int) *CacheStore { + // default with no eviction and custom hard max cache capacity + cacheCfg := bigcache.DefaultConfig(0) + cacheCfg.Verbose = false + cacheCfg.HardMaxCacheSize = capacity + + cache, err := bigcache.New(context.Background(), cacheCfg) + if err != nil { + panic(err) + } + + return &CacheStore{ + store: cachekv.NewStore(store), + cache: cache, + } +} + +// Get returns nil iff key doesn't exist. Errors on nil key. +func (c CacheStore) Get(key []byte) ([]byte, error) { + storetypes.AssertValidKey(key) + + if value, err := c.cache.Get(string(key)); err == nil { + return value, nil + } + + // get from store and write to cache + value := c.store.Get(key) + if value == nil { + return nil, nil + } + + // ignore cache error + _ = c.cache.Set(string(key), value) + + return value, nil +} + +// Has checks if a key exists. Errors on nil key. +func (c CacheStore) Has(key []byte) (bool, error) { + _, err := c.cache.Get(string(key)) + if err == nil { + return true, nil + } + + value := c.store.Get(key) + if value == nil { + return false, nil + } + + // ignore cache error + _ = c.cache.Set(string(key), value) + + return true, nil +} + +// Set sets the key. Errors on nil key or value. +func (c CacheStore) Set(key, value []byte) error { + storetypes.AssertValidKey(key) + storetypes.AssertValidValue(value) + + err := c.cache.Set(string(key), value) + if err != nil { + return errors.Wrap(err, "failed to set cache") + } + c.store.Set(key, value) + + return nil +} + +// Delete deletes the key. Errors on nil key. +func (c CacheStore) Delete(key []byte) error { + storetypes.AssertValidKey(key) + + err := c.cache.Delete(string(key)) + if err != nil && errors.IsOf(err, bigcache.ErrEntryNotFound) { + return errors.Wrap(err, "failed to delete cache") + } + c.store.Delete(key) + + return nil +} + +// Iterator iterates over a domain of keys in ascending order. End is exclusive. +// Start must be less than end, or the Iterator is invalid. +// Iterator must be closed by caller. +// To iterate over entire domain, use store.Iterator(nil, nil) +// CONTRACT: No writes may happen within a domain while an iterator exists over it. +// Exceptionally allowed for cachekv.Store, safe to write in the modules. +func (c CacheStore) Iterator(start, end []byte) (storetypes.Iterator, error) { + return c.store.Iterator(start, end), nil +} + +// ReverseIterator iterates over a domain of keys in descending order. End is exclusive. +// Start must be less than end, or the Iterator is invalid. +// Iterator must be closed by caller. +// CONTRACT: No writes may happen within a domain while an iterator exists over it. +// Exceptionally allowed for cachekv.Store, safe to write in the modules. +func (c CacheStore) ReverseIterator(start, end []byte) (storetypes.Iterator, error) { + return c.store.ReverseIterator(start, end), nil +} + +func (c CacheStore) Write() { + c.store.Write() +} diff --git a/indexer/store_test.go b/indexer/store_test.go new file mode 100644 index 00000000..8b823c4d --- /dev/null +++ b/indexer/store_test.go @@ -0,0 +1,50 @@ +package indexer + +import ( + "testing" + + "cosmossdk.io/store/dbadapter" + dbm "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/require" +) + +func Test_StoreIO(t *testing.T) { + db := dbm.NewMemDB() + store := NewCacheStore(dbadapter.Store{DB: db}, 100) + + key := []byte("key") + value := []byte("value") + + // case 1. key not exists + ok, err := store.Has(key) + require.NoError(t, err) + require.False(t, ok) + + bz, err := store.Get(key) + require.NoError(t, err) + require.Nil(t, bz) + + // case 2. set key + err = store.Set(key, value) + require.NoError(t, err) + + ok, err = store.Has(key) + require.NoError(t, err) + require.True(t, ok) + + bz, err = store.Get(key) + require.NoError(t, err) + require.Equal(t, value, bz) + + // case 3. delete key + err = store.Delete(key) + require.NoError(t, err) + + ok, err = store.Has(key) + require.NoError(t, err) + require.False(t, ok) + + bz, err = store.Get(key) + require.NoError(t, err) + require.Nil(t, bz) +} diff --git a/indexer/utils.go b/indexer/utils.go index 33d642f0..be7c3715 100644 --- a/indexer/utils.go +++ b/indexer/utils.go @@ -80,14 +80,11 @@ func CollJsonVal[T any]() collcodec.ValueCodec[T] { type collJsonVal[T any] struct{} func (c collJsonVal[T]) Encode(value T) ([]byte, error) { - return json.Marshal(value) + return c.EncodeJSON(value) } func (c collJsonVal[T]) Decode(b []byte) (T, error) { - var value T - - err := json.Unmarshal(b, &value) - return value, err + return c.DecodeJSON(b) } func (c collJsonVal[T]) EncodeJSON(value T) ([]byte, error) { diff --git a/indexer/utils_test.go b/indexer/utils_test.go index b96e9a82..4fd80c94 100644 --- a/indexer/utils_test.go +++ b/indexer/utils_test.go @@ -35,3 +35,25 @@ func Test_UnpackData(t *testing.T) { require.NoError(t, err) require.Equal(t, resp, respOut) } + +func Test_collJsonVal(t *testing.T) { + type s1 struct { + A string `json:"a"` + B uint64 `json:"b"` + } + + codec := collJsonVal[s1]{} + bz, err := codec.Encode(s1{ + A: "a", + B: 1, + }) + require.NoError(t, err) + require.Equal(t, `{"a":"a","b":1}`, string(bz)) + + out, err := codec.Decode(bz) + require.NoError(t, err) + require.Equal(t, s1{ + A: "a", + B: 1, + }, out) +}