diff --git a/indexer/indexer.go b/indexer/indexer.go index bdcca38..b7a15b5 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -19,6 +19,7 @@ import ( opchildkeeper "github.com/initia-labs/OPinit/x/opchild/keeper" rpctypes "github.com/initia-labs/minievm/jsonrpc/types" + evmconfig "github.com/initia-labs/minievm/x/evm/config" evmkeeper "github.com/initia-labs/minievm/x/evm/keeper" ) @@ -81,8 +82,12 @@ func NewEVMIndexer( evmKeeper *evmkeeper.Keeper, opChildKeeper *opchildkeeper.Keeper, ) (EVMIndexer, error) { - // TODO make cache size configurable - store := NewCacheStore(dbadapter.Store{DB: db}, 100) + cfg := evmKeeper.Config() + if cfg.IndexerCacheSize == 0 { + cfg.IndexerCacheSize = evmconfig.DefaultIndexerCacheSize + } + + store := NewCacheStore(dbadapter.Store{DB: db}, cfg.IndexerCacheSize) sb := collections.NewSchemaBuilderFromAccessor( func(ctx context.Context) corestoretypes.KVStore { return store diff --git a/indexer/store.go b/indexer/store.go index 8b5747d..dee7636 100644 --- a/indexer/store.go +++ b/indexer/store.go @@ -7,6 +7,7 @@ import ( "cosmossdk.io/errors" cachekv "cosmossdk.io/store/cachekv" storetypes "cosmossdk.io/store/types" + bigcache "github.com/allegro/bigcache/v3" ) diff --git a/jsonrpc/backend/backend.go b/jsonrpc/backend/backend.go index 1c87b14..5bafcf8 100644 --- a/jsonrpc/backend/backend.go +++ b/jsonrpc/backend/backend.go @@ -2,6 +2,9 @@ package backend import ( "context" + "sync" + + bigcache "github.com/allegro/bigcache/v3" "cosmossdk.io/log" "github.com/cosmos/cosmos-sdk/client" @@ -15,11 +18,14 @@ type JSONRPCBackend struct { app *app.MinitiaApp logger log.Logger + queuedTxs *bigcache.BigCache + sendTxMut sync.Mutex + + ctx context.Context svrCtx *server.Context clientCtx client.Context - cfg config.JSONRPCConfig - ctx context.Context + cfg config.JSONRPCConfig } // NewJSONRPCBackend creates a new JSONRPCBackend instance @@ -29,9 +35,32 @@ func NewJSONRPCBackend( svrCtx *server.Context, clientCtx client.Context, cfg config.JSONRPCConfig, -) *JSONRPCBackend { +) (*JSONRPCBackend, error) { + + if cfg.QueuedTransactionCap == 0 { + cfg.QueuedTransactionCap = config.DefaultQueuedTransactionCap + } + if cfg.QueuedTransactionTTL == 0 { + cfg.QueuedTransactionTTL = config.DefaultQueuedTransactionTTL + } + + cacheConfig := bigcache.DefaultConfig(cfg.QueuedTransactionTTL) + cacheConfig.HardMaxCacheSize = cfg.QueuedTransactionCap + ctx := context.Background() - return &JSONRPCBackend{ - app, logger, svrCtx, clientCtx, cfg, ctx, + + queuedTxs, err := bigcache.New(ctx, cacheConfig) + if err != nil { + return nil, err } + + return &JSONRPCBackend{ + app: app, + logger: logger, + queuedTxs: queuedTxs, + ctx: ctx, + svrCtx: svrCtx, + clientCtx: clientCtx, + cfg: cfg, + }, nil } diff --git a/jsonrpc/backend/errors.go b/jsonrpc/backend/errors.go index 346f7cf..4295efc 100644 --- a/jsonrpc/backend/errors.go +++ b/jsonrpc/backend/errors.go @@ -20,3 +20,20 @@ func (e *TxIndexingError) ErrorCode() int { // ErrorData returns the hex encoded revert reason. func (e *TxIndexingError) ErrorData() interface{} { return "transaction indexing is in progress" } + +type InternalError struct { + msg string +} + +func NewInternalError(msg string) *InternalError { + return &InternalError{msg: msg} +} + +func (e *InternalError) Error() string { + return "internal jsonrpc error: " + e.msg +} + +func (e *InternalError) ErrorCode() int { + // Internal JSON-RPC error + return -32603 +} diff --git a/jsonrpc/backend/tx.go b/jsonrpc/backend/tx.go index b0516d8..531fa9f 100644 --- a/jsonrpc/backend/tx.go +++ b/jsonrpc/backend/tx.go @@ -3,6 +3,7 @@ package backend import ( "context" "errors" + "fmt" "cosmossdk.io/collections" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" @@ -35,6 +36,9 @@ func (b *JSONRPCBackend) SendRawTransaction(input hexutil.Bytes) (common.Hash, e } func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error { + b.sendTxMut.Lock() + defer b.sendTxMut.Unlock() + queryCtx, err := b.getQueryCtx() if err != nil { return err @@ -45,23 +49,61 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error { return err } - if authTx, ok := cosmosTx.(authsigning.Tx); ok { - if sigs, err := authTx.GetSignaturesV2(); err == nil && len(sigs) > 0 { - b.logger.Debug("eth_sendTx", "sequence", sigs[0].Sequence) - } - } - txBytes, err := b.app.TxEncode(cosmosTx) if err != nil { return err } - res, err := b.clientCtx.BroadcastTxSync(txBytes) - if err != nil { - return err - } - if res.Code != 0 { - return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog) + authTx, ok := cosmosTx.(authsigning.Tx) + if !ok { + return NewInternalError("failed to convert cosmosTx to authsigning.Tx") + } + + sigs, err := authTx.GetSignaturesV2() + if err != nil || len(sigs) != 1 { + b.logger.Error("failed to get signatures from authsigning.Tx", "err", err) + return NewInternalError("failed to get signatures from authsigning.Tx") + } + + sig := sigs[0] + txSeq := sig.Sequence + accSeq := uint64(0) + sender := sdk.AccAddress(sig.PubKey.Address().Bytes()) + + checkCtx := b.app.GetContextForCheckTx(nil) + if acc := b.app.AccountKeeper.GetAccount(checkCtx, sender); acc != nil { + accSeq = acc.GetSequence() + } + + b.logger.Debug("enqueue tx", "sender", sender, "txSeq", txSeq, "accSeq", accSeq) + cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), txSeq) + if err := b.queuedTxs.Set(cacheKey, txBytes); err != nil { + b.logger.Error("failed to enqueue tx", "key", cacheKey, "err", err) + return NewInternalError("failed to enqueue tx") + } + + // check if there are queued txs which can be sent + for { + cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), accSeq) + if txBytes, err := b.queuedTxs.Get(cacheKey); err == nil { + if err := b.queuedTxs.Delete(cacheKey); err != nil { + b.logger.Error("failed to delete queued tx", "key", cacheKey, "err", err) + return NewInternalError("failed to delete queued tx") + } + + b.logger.Debug("broadcast queued tx", "sender", sender, "txSeq", accSeq) + res, err := b.clientCtx.BroadcastTxSync(txBytes) + if err != nil { + return err + } + if res.Code != 0 { + return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog) + } + } else { + break + } + + accSeq++ } return nil diff --git a/jsonrpc/config/config.go b/jsonrpc/config/config.go index 6882e4b..bb382d6 100644 --- a/jsonrpc/config/config.go +++ b/jsonrpc/config/config.go @@ -38,6 +38,11 @@ const ( DefaultBatchRequestLimit = 1000 // DefaultBatchResponseMaxSize is the default maximum number of bytes returned from a batched call DefaultBatchResponseMaxSize = 25 * 1000 * 1000 + // DefaultQueuedTransactionCap is the default maximum size (MiB) of queued transactions that can be in the transaction pool. + DefaultQueuedTransactionCap = 100 + // DefaultQueuedTransactionTTL is the default maximum time a queued transaction can + // remain in the transaction pool before being evicted. + DefaultQueuedTransactionTTL = 1 * time.Minute ) var ( @@ -60,6 +65,8 @@ const ( flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections" flagJSONRPCBatchRequestLimit = "json-rpc.batch-request-limit" flagJSONRPCBatchResponseMaxSize = "json-rpc.batch-response-max-size" + flagJSONRPCQueuedTransactionCap = "json-rpc.queued-transaction-cap" + flagJSONRPCQueuedTransactionTTL = "json-rpc.queued-transaction-ttl" ) // JSONRPCConfig defines configuration for the EVM RPC server. @@ -91,24 +98,36 @@ type JSONRPCConfig struct { BatchRequestLimit int `mapstructure:"batch-request-limit"` // Maximum number of bytes returned from a batched call BatchResponseMaxSize int `mapstructure:"batch-response-max-size"` + // QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool. + QueuedTransactionCap int `mapstructure:"queued-transaction-cap"` + // QueuedTransactionTTL is the maximum time a queued transaction can + // remain in the transaction pool before being evicted. + QueuedTransactionTTL time.Duration `mapstructure:"queued-transaction-ttl"` } // DefaultJSONRPCConfig returns a default configuration for the EVM RPC server. func DefaultJSONRPCConfig() JSONRPCConfig { return JSONRPCConfig{ - Enable: DefaultEnable, - Address: DefaultAddress, - EnableWS: DefaultEnable, - AddressWS: DefaultAddressWS, - EnableUnsafeCORS: DefaultEnableUnsafeCORS, - APIs: DefaultAPIs, - FilterCap: DefaultFilterCap, - BlockRangeCap: DefaultBlockRangeCap, - HTTPTimeout: DefaultHTTPTimeout, - HTTPIdleTimeout: DefaultHTTPIdleTimeout, - MaxOpenConnections: DefaultMaxOpenConnections, + Enable: DefaultEnable, + Address: DefaultAddress, + EnableWS: DefaultEnable, + AddressWS: DefaultAddressWS, + EnableUnsafeCORS: DefaultEnableUnsafeCORS, + + APIs: DefaultAPIs, + + FilterCap: DefaultFilterCap, + BlockRangeCap: DefaultBlockRangeCap, + + HTTPTimeout: DefaultHTTPTimeout, + HTTPIdleTimeout: DefaultHTTPIdleTimeout, + MaxOpenConnections: DefaultMaxOpenConnections, + BatchRequestLimit: DefaultBatchRequestLimit, BatchResponseMaxSize: DefaultBatchResponseMaxSize, + + QueuedTransactionCap: DefaultQueuedTransactionCap, + QueuedTransactionTTL: DefaultQueuedTransactionTTL, } } @@ -127,6 +146,8 @@ func AddConfigFlags(startCmd *cobra.Command) { startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener") startCmd.Flags().Int(flagJSONRPCBatchRequestLimit, DefaultBatchRequestLimit, "Maximum number of requests in a batch") startCmd.Flags().Int(flagJSONRPCBatchResponseMaxSize, DefaultBatchResponseMaxSize, "Maximum number of bytes returned from a batched call") + startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum size (MiB) of queued transactions that can be in the transaction pool") + startCmd.Flags().Duration(flagJSONRPCQueuedTransactionTTL, DefaultQueuedTransactionTTL, "Maximum time a queued transaction can remain in the transaction pool before being evicted") } // GetConfig load config values from the app options @@ -145,6 +166,8 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig { MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)), BatchRequestLimit: cast.ToInt(appOpts.Get(flagJSONRPCBatchRequestLimit)), BatchResponseMaxSize: cast.ToInt(appOpts.Get(flagJSONRPCBatchResponseMaxSize)), + QueuedTransactionCap: cast.ToInt(appOpts.Get(flagJSONRPCQueuedTransactionCap)), + QueuedTransactionTTL: cast.ToDuration(appOpts.Get(flagJSONRPCQueuedTransactionTTL)), } } @@ -196,4 +219,11 @@ batch-request-limit = {{ .JSONRPCConfig.BatchRequestLimit }} # Maximum number of bytes returned from a batched call batch-response-max-size = {{ .JSONRPCConfig.BatchResponseMaxSize }} + +# QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool. +queued-transaction-cap = {{ .JSONRPCConfig.QueuedTransactionCap }} + +# QueuedTransactionTTL is the maximum time a queued transaction can +# remain in the transaction pool before being evicted. +queued-transaction-ttl = "{{ .JSONRPCConfig.QueuedTransactionTTL }}" ` diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index aa92854..88a7e9b 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -63,7 +63,12 @@ func StartJSONRPC( rpcServer := rpc.NewServer() rpcServer.SetBatchLimits(jsonRPCConfig.BatchRequestLimit, jsonRPCConfig.BatchResponseMaxSize) - bkd := backend.NewJSONRPCBackend(app, logger, svrCtx, clientCtx, jsonRPCConfig) + + bkd, err := backend.NewJSONRPCBackend(app, logger, svrCtx, clientCtx, jsonRPCConfig) + if err != nil { + return err + } + apis := []rpc.API{ { Namespace: EthNamespace, diff --git a/x/evm/config/config.go b/x/evm/config/config.go index ddf6edd..72d477d 100644 --- a/x/evm/config/config.go +++ b/x/evm/config/config.go @@ -7,22 +7,31 @@ import ( servertypes "github.com/cosmos/cosmos-sdk/server/types" ) -// DefaultContractSimulationGasLimit - default max simulation gas -const DefaultContractSimulationGasLimit = uint64(3_000_000) +const ( + // DefaultContractSimulationGasLimit - default max simulation gas + DefaultContractSimulationGasLimit = uint64(3_000_000) + // DefaultIndexerCacheSize is the default maximum size (MiB) of the cache. + DefaultIndexerCacheSize = 100 +) const ( flagContractSimulationGasLimit = "evm.contract-simulation-gas-limit" + flagIndexerCacheSize = "evm.indexer-cache-size" ) // EVMConfig is the extra config required for evm type EVMConfig struct { + // ContractSimulationGasLimit is the maximum gas amount can be used in a tx simulation call. ContractSimulationGasLimit uint64 `mapstructure:"contract-simulation-gas-limit"` + // IndexerCacheSize is the maximum size (MiB) of the cache. + IndexerCacheSize int `mapstructure:"indexer-cache-size"` } // DefaultEVMConfig returns the default settings for EVMConfig func DefaultEVMConfig() EVMConfig { return EVMConfig{ ContractSimulationGasLimit: DefaultContractSimulationGasLimit, + IndexerCacheSize: DefaultIndexerCacheSize, } } @@ -30,12 +39,14 @@ func DefaultEVMConfig() EVMConfig { func GetConfig(appOpts servertypes.AppOptions) EVMConfig { return EVMConfig{ ContractSimulationGasLimit: cast.ToUint64(appOpts.Get(flagContractSimulationGasLimit)), + IndexerCacheSize: cast.ToInt(appOpts.Get(flagIndexerCacheSize)), } } // AddConfigFlags implements servertypes.EVMConfigFlags interface. func AddConfigFlags(startCmd *cobra.Command) { - startCmd.Flags().Uint64(flagContractSimulationGasLimit, DefaultContractSimulationGasLimit, "Set the max simulation gas for evm contract execution") + startCmd.Flags().Uint64(flagContractSimulationGasLimit, DefaultContractSimulationGasLimit, "Maximum simulation gas amount for evm contract execution") + startCmd.Flags().Int(flagIndexerCacheSize, DefaultIndexerCacheSize, "Maximum size (MiB) of the indexer cache") } // DefaultConfigTemplate default config template for evm @@ -48,4 +59,7 @@ const DefaultConfigTemplate = ` # The maximum gas amount can be used in a tx simulation call. contract-simulation-gas-limit = "{{ .EVMConfig.ContractSimulationGasLimit }}" + +# IndexerCacheSize is the maximum size (MiB) of the cache for evm indexer. +indexer-cache-size = {{ .EVMConfig.IndexerCacheSize }} ` diff --git a/x/evm/keeper/keeper.go b/x/evm/keeper/keeper.go index 6d1117a..f5a85f9 100644 --- a/x/evm/keeper/keeper.go +++ b/x/evm/keeper/keeper.go @@ -148,6 +148,11 @@ func (k *Keeper) Logger(ctx context.Context) log.Logger { return sdkCtx.Logger().With("module", "x/"+types.ModuleName) } +// Config returns the x/evm configuration. +func (k Keeper) Config() evmconfig.EVMConfig { + return k.config +} + // ERC20Keeper returns the ERC20Keeper func (k Keeper) ERC20Keeper() types.IERC20Keeper { return k.erc20Keeper