Skip to content

Commit

Permalink
feat: tx queue for unordered txs requests (#48)
Browse files Browse the repository at this point in the history
* use local logger

* update bin files

* impl tx queue in jsonrpc backend

* hold lock for SendTx

* fix to broadcast queued txs properly

* print logs

* remove comment

* use colon
  • Loading branch information
beer-1 authored Aug 20, 2024
1 parent a4d23d2 commit efcca2e
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 34 deletions.
9 changes: 7 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
opchildkeeper "github.com/initia-labs/OPinit/x/opchild/keeper"

rpctypes "github.com/initia-labs/minievm/jsonrpc/types"
evmconfig "github.com/initia-labs/minievm/x/evm/config"
evmkeeper "github.com/initia-labs/minievm/x/evm/keeper"
)

Expand Down Expand Up @@ -81,8 +82,12 @@ func NewEVMIndexer(
evmKeeper *evmkeeper.Keeper,
opChildKeeper *opchildkeeper.Keeper,
) (EVMIndexer, error) {
// TODO make cache size configurable
store := NewCacheStore(dbadapter.Store{DB: db}, 100)
cfg := evmKeeper.Config()
if cfg.IndexerCacheSize == 0 {
cfg.IndexerCacheSize = evmconfig.DefaultIndexerCacheSize
}

store := NewCacheStore(dbadapter.Store{DB: db}, cfg.IndexerCacheSize)
sb := collections.NewSchemaBuilderFromAccessor(
func(ctx context.Context) corestoretypes.KVStore {
return store
Expand Down
1 change: 1 addition & 0 deletions indexer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/errors"
cachekv "cosmossdk.io/store/cachekv"
storetypes "cosmossdk.io/store/types"

bigcache "github.com/allegro/bigcache/v3"
)

Expand Down
39 changes: 34 additions & 5 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package backend

import (
"context"
"sync"

bigcache "github.com/allegro/bigcache/v3"

"cosmossdk.io/log"
"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -15,11 +18,14 @@ type JSONRPCBackend struct {
app *app.MinitiaApp
logger log.Logger

queuedTxs *bigcache.BigCache
sendTxMut sync.Mutex

ctx context.Context
svrCtx *server.Context
clientCtx client.Context
cfg config.JSONRPCConfig

ctx context.Context
cfg config.JSONRPCConfig
}

// NewJSONRPCBackend creates a new JSONRPCBackend instance
Expand All @@ -29,9 +35,32 @@ func NewJSONRPCBackend(
svrCtx *server.Context,
clientCtx client.Context,
cfg config.JSONRPCConfig,
) *JSONRPCBackend {
) (*JSONRPCBackend, error) {

if cfg.QueuedTransactionCap == 0 {
cfg.QueuedTransactionCap = config.DefaultQueuedTransactionCap
}
if cfg.QueuedTransactionTTL == 0 {
cfg.QueuedTransactionTTL = config.DefaultQueuedTransactionTTL
}

cacheConfig := bigcache.DefaultConfig(cfg.QueuedTransactionTTL)
cacheConfig.HardMaxCacheSize = cfg.QueuedTransactionCap

ctx := context.Background()
return &JSONRPCBackend{
app, logger, svrCtx, clientCtx, cfg, ctx,

queuedTxs, err := bigcache.New(ctx, cacheConfig)
if err != nil {
return nil, err
}

return &JSONRPCBackend{
app: app,
logger: logger,
queuedTxs: queuedTxs,
ctx: ctx,
svrCtx: svrCtx,
clientCtx: clientCtx,
cfg: cfg,
}, nil
}
17 changes: 17 additions & 0 deletions jsonrpc/backend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,20 @@ func (e *TxIndexingError) ErrorCode() int {

// ErrorData returns the hex encoded revert reason.
func (e *TxIndexingError) ErrorData() interface{} { return "transaction indexing is in progress" }

type InternalError struct {
msg string
}

func NewInternalError(msg string) *InternalError {
return &InternalError{msg: msg}
}

func (e *InternalError) Error() string {
return "internal jsonrpc error: " + e.msg
}

func (e *InternalError) ErrorCode() int {
// Internal JSON-RPC error
return -32603
}
66 changes: 54 additions & 12 deletions jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"context"
"errors"
"fmt"

"cosmossdk.io/collections"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -35,6 +36,9 @@ func (b *JSONRPCBackend) SendRawTransaction(input hexutil.Bytes) (common.Hash, e
}

func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
b.sendTxMut.Lock()
defer b.sendTxMut.Unlock()

queryCtx, err := b.getQueryCtx()
if err != nil {
return err
Expand All @@ -45,23 +49,61 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
return err
}

if authTx, ok := cosmosTx.(authsigning.Tx); ok {
if sigs, err := authTx.GetSignaturesV2(); err == nil && len(sigs) > 0 {
b.logger.Debug("eth_sendTx", "sequence", sigs[0].Sequence)
}
}

txBytes, err := b.app.TxEncode(cosmosTx)
if err != nil {
return err
}

res, err := b.clientCtx.BroadcastTxSync(txBytes)
if err != nil {
return err
}
if res.Code != 0 {
return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog)
authTx, ok := cosmosTx.(authsigning.Tx)
if !ok {
return NewInternalError("failed to convert cosmosTx to authsigning.Tx")
}

sigs, err := authTx.GetSignaturesV2()
if err != nil || len(sigs) != 1 {
b.logger.Error("failed to get signatures from authsigning.Tx", "err", err)
return NewInternalError("failed to get signatures from authsigning.Tx")
}

sig := sigs[0]
txSeq := sig.Sequence
accSeq := uint64(0)
sender := sdk.AccAddress(sig.PubKey.Address().Bytes())

checkCtx := b.app.GetContextForCheckTx(nil)
if acc := b.app.AccountKeeper.GetAccount(checkCtx, sender); acc != nil {
accSeq = acc.GetSequence()
}

b.logger.Debug("enqueue tx", "sender", sender, "txSeq", txSeq, "accSeq", accSeq)
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), txSeq)
if err := b.queuedTxs.Set(cacheKey, txBytes); err != nil {
b.logger.Error("failed to enqueue tx", "key", cacheKey, "err", err)
return NewInternalError("failed to enqueue tx")
}

// check if there are queued txs which can be sent
for {
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), accSeq)
if txBytes, err := b.queuedTxs.Get(cacheKey); err == nil {
if err := b.queuedTxs.Delete(cacheKey); err != nil {
b.logger.Error("failed to delete queued tx", "key", cacheKey, "err", err)
return NewInternalError("failed to delete queued tx")
}

b.logger.Debug("broadcast queued tx", "sender", sender, "txSeq", accSeq)
res, err := b.clientCtx.BroadcastTxSync(txBytes)
if err != nil {
return err
}
if res.Code != 0 {
return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog)
}
} else {
break
}

accSeq++
}

return nil
Expand Down
52 changes: 41 additions & 11 deletions jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ const (
DefaultBatchRequestLimit = 1000
// DefaultBatchResponseMaxSize is the default maximum number of bytes returned from a batched call
DefaultBatchResponseMaxSize = 25 * 1000 * 1000
// DefaultQueuedTransactionCap is the default maximum size (MiB) of queued transactions that can be in the transaction pool.
DefaultQueuedTransactionCap = 100
// DefaultQueuedTransactionTTL is the default maximum time a queued transaction can
// remain in the transaction pool before being evicted.
DefaultQueuedTransactionTTL = 1 * time.Minute
)

var (
Expand All @@ -60,6 +65,8 @@ const (
flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections"
flagJSONRPCBatchRequestLimit = "json-rpc.batch-request-limit"
flagJSONRPCBatchResponseMaxSize = "json-rpc.batch-response-max-size"
flagJSONRPCQueuedTransactionCap = "json-rpc.queued-transaction-cap"
flagJSONRPCQueuedTransactionTTL = "json-rpc.queued-transaction-ttl"
)

// JSONRPCConfig defines configuration for the EVM RPC server.
Expand Down Expand Up @@ -91,24 +98,36 @@ type JSONRPCConfig struct {
BatchRequestLimit int `mapstructure:"batch-request-limit"`
// Maximum number of bytes returned from a batched call
BatchResponseMaxSize int `mapstructure:"batch-response-max-size"`
// QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool.
QueuedTransactionCap int `mapstructure:"queued-transaction-cap"`
// QueuedTransactionTTL is the maximum time a queued transaction can
// remain in the transaction pool before being evicted.
QueuedTransactionTTL time.Duration `mapstructure:"queued-transaction-ttl"`
}

// DefaultJSONRPCConfig returns a default configuration for the EVM RPC server.
func DefaultJSONRPCConfig() JSONRPCConfig {
return JSONRPCConfig{
Enable: DefaultEnable,
Address: DefaultAddress,
EnableWS: DefaultEnable,
AddressWS: DefaultAddressWS,
EnableUnsafeCORS: DefaultEnableUnsafeCORS,
APIs: DefaultAPIs,
FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,
HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,
Enable: DefaultEnable,
Address: DefaultAddress,
EnableWS: DefaultEnable,
AddressWS: DefaultAddressWS,
EnableUnsafeCORS: DefaultEnableUnsafeCORS,

APIs: DefaultAPIs,

FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,

HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,

BatchRequestLimit: DefaultBatchRequestLimit,
BatchResponseMaxSize: DefaultBatchResponseMaxSize,

QueuedTransactionCap: DefaultQueuedTransactionCap,
QueuedTransactionTTL: DefaultQueuedTransactionTTL,
}
}

Expand All @@ -127,6 +146,8 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
startCmd.Flags().Int(flagJSONRPCBatchRequestLimit, DefaultBatchRequestLimit, "Maximum number of requests in a batch")
startCmd.Flags().Int(flagJSONRPCBatchResponseMaxSize, DefaultBatchResponseMaxSize, "Maximum number of bytes returned from a batched call")
startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum size (MiB) of queued transactions that can be in the transaction pool")
startCmd.Flags().Duration(flagJSONRPCQueuedTransactionTTL, DefaultQueuedTransactionTTL, "Maximum time a queued transaction can remain in the transaction pool before being evicted")
}

// GetConfig load config values from the app options
Expand All @@ -145,6 +166,8 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)),
BatchRequestLimit: cast.ToInt(appOpts.Get(flagJSONRPCBatchRequestLimit)),
BatchResponseMaxSize: cast.ToInt(appOpts.Get(flagJSONRPCBatchResponseMaxSize)),
QueuedTransactionCap: cast.ToInt(appOpts.Get(flagJSONRPCQueuedTransactionCap)),
QueuedTransactionTTL: cast.ToDuration(appOpts.Get(flagJSONRPCQueuedTransactionTTL)),
}
}

Expand Down Expand Up @@ -196,4 +219,11 @@ batch-request-limit = {{ .JSONRPCConfig.BatchRequestLimit }}
# Maximum number of bytes returned from a batched call
batch-response-max-size = {{ .JSONRPCConfig.BatchResponseMaxSize }}
# QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool.
queued-transaction-cap = {{ .JSONRPCConfig.QueuedTransactionCap }}
# QueuedTransactionTTL is the maximum time a queued transaction can
# remain in the transaction pool before being evicted.
queued-transaction-ttl = "{{ .JSONRPCConfig.QueuedTransactionTTL }}"
`
7 changes: 6 additions & 1 deletion jsonrpc/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ func StartJSONRPC(

rpcServer := rpc.NewServer()
rpcServer.SetBatchLimits(jsonRPCConfig.BatchRequestLimit, jsonRPCConfig.BatchResponseMaxSize)
bkd := backend.NewJSONRPCBackend(app, logger, svrCtx, clientCtx, jsonRPCConfig)

bkd, err := backend.NewJSONRPCBackend(app, logger, svrCtx, clientCtx, jsonRPCConfig)
if err != nil {
return err
}

apis := []rpc.API{
{
Namespace: EthNamespace,
Expand Down
20 changes: 17 additions & 3 deletions x/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,46 @@ import (
servertypes "github.com/cosmos/cosmos-sdk/server/types"
)

// DefaultContractSimulationGasLimit - default max simulation gas
const DefaultContractSimulationGasLimit = uint64(3_000_000)
const (
// DefaultContractSimulationGasLimit - default max simulation gas
DefaultContractSimulationGasLimit = uint64(3_000_000)
// DefaultIndexerCacheSize is the default maximum size (MiB) of the cache.
DefaultIndexerCacheSize = 100
)

const (
flagContractSimulationGasLimit = "evm.contract-simulation-gas-limit"
flagIndexerCacheSize = "evm.indexer-cache-size"
)

// EVMConfig is the extra config required for evm
type EVMConfig struct {
// ContractSimulationGasLimit is the maximum gas amount can be used in a tx simulation call.
ContractSimulationGasLimit uint64 `mapstructure:"contract-simulation-gas-limit"`
// IndexerCacheSize is the maximum size (MiB) of the cache.
IndexerCacheSize int `mapstructure:"indexer-cache-size"`
}

// DefaultEVMConfig returns the default settings for EVMConfig
func DefaultEVMConfig() EVMConfig {
return EVMConfig{
ContractSimulationGasLimit: DefaultContractSimulationGasLimit,
IndexerCacheSize: DefaultIndexerCacheSize,
}
}

// GetConfig load config values from the app options
func GetConfig(appOpts servertypes.AppOptions) EVMConfig {
return EVMConfig{
ContractSimulationGasLimit: cast.ToUint64(appOpts.Get(flagContractSimulationGasLimit)),
IndexerCacheSize: cast.ToInt(appOpts.Get(flagIndexerCacheSize)),
}
}

// AddConfigFlags implements servertypes.EVMConfigFlags interface.
func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Uint64(flagContractSimulationGasLimit, DefaultContractSimulationGasLimit, "Set the max simulation gas for evm contract execution")
startCmd.Flags().Uint64(flagContractSimulationGasLimit, DefaultContractSimulationGasLimit, "Maximum simulation gas amount for evm contract execution")
startCmd.Flags().Int(flagIndexerCacheSize, DefaultIndexerCacheSize, "Maximum size (MiB) of the indexer cache")
}

// DefaultConfigTemplate default config template for evm
Expand All @@ -48,4 +59,7 @@ const DefaultConfigTemplate = `
# The maximum gas amount can be used in a tx simulation call.
contract-simulation-gas-limit = "{{ .EVMConfig.ContractSimulationGasLimit }}"
# IndexerCacheSize is the maximum size (MiB) of the cache for evm indexer.
indexer-cache-size = {{ .EVMConfig.IndexerCacheSize }}
`
5 changes: 5 additions & 0 deletions x/evm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ func (k *Keeper) Logger(ctx context.Context) log.Logger {
return sdkCtx.Logger().With("module", "x/"+types.ModuleName)
}

// Config returns the x/evm configuration.
func (k Keeper) Config() evmconfig.EVMConfig {
return k.config
}

// ERC20Keeper returns the ERC20Keeper
func (k Keeper) ERC20Keeper() types.IERC20Keeper {
return k.erc20Keeper
Expand Down

0 comments on commit efcca2e

Please sign in to comment.