Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tx queue for unordered txs requests #48

Merged
merged 10 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
opchildkeeper "github.com/initia-labs/OPinit/x/opchild/keeper"

rpctypes "github.com/initia-labs/minievm/jsonrpc/types"
evmconfig "github.com/initia-labs/minievm/x/evm/config"
evmkeeper "github.com/initia-labs/minievm/x/evm/keeper"
)

Expand Down Expand Up @@ -81,8 +82,12 @@
evmKeeper *evmkeeper.Keeper,
opChildKeeper *opchildkeeper.Keeper,
) (EVMIndexer, error) {
// TODO make cache size configurable
store := NewCacheStore(dbadapter.Store{DB: db}, 100)
cfg := evmKeeper.Config()
if cfg.IndexerCacheSize == 0 {
cfg.IndexerCacheSize = evmconfig.DefaultIndexerCacheSize
}

Check warning on line 88 in indexer/indexer.go

View check run for this annotation

Codecov / codecov/patch

indexer/indexer.go#L85-L88

Added lines #L85 - L88 were not covered by tests

store := NewCacheStore(dbadapter.Store{DB: db}, cfg.IndexerCacheSize)

Check warning on line 90 in indexer/indexer.go

View check run for this annotation

Codecov / codecov/patch

indexer/indexer.go#L90

Added line #L90 was not covered by tests
sb := collections.NewSchemaBuilderFromAccessor(
func(ctx context.Context) corestoretypes.KVStore {
return store
Expand Down
1 change: 1 addition & 0 deletions indexer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/errors"
cachekv "cosmossdk.io/store/cachekv"
storetypes "cosmossdk.io/store/types"

bigcache "github.com/allegro/bigcache/v3"
)

Expand Down
39 changes: 34 additions & 5 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import (
"context"
"sync"

bigcache "github.com/allegro/bigcache/v3"

"cosmossdk.io/log"
"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -15,11 +18,14 @@
app *app.MinitiaApp
logger log.Logger

queuedTxs *bigcache.BigCache
sendTxMut sync.Mutex

ctx context.Context
svrCtx *server.Context
clientCtx client.Context
cfg config.JSONRPCConfig

ctx context.Context
cfg config.JSONRPCConfig
}

// NewJSONRPCBackend creates a new JSONRPCBackend instance
Expand All @@ -29,9 +35,32 @@
svrCtx *server.Context,
clientCtx client.Context,
cfg config.JSONRPCConfig,
) *JSONRPCBackend {
) (*JSONRPCBackend, error) {

if cfg.QueuedTransactionCap == 0 {
cfg.QueuedTransactionCap = config.DefaultQueuedTransactionCap
}
if cfg.QueuedTransactionTTL == 0 {
cfg.QueuedTransactionTTL = config.DefaultQueuedTransactionTTL
}

Check warning on line 45 in jsonrpc/backend/backend.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/backend.go#L38-L45

Added lines #L38 - L45 were not covered by tests

cacheConfig := bigcache.DefaultConfig(cfg.QueuedTransactionTTL)
cacheConfig.HardMaxCacheSize = cfg.QueuedTransactionCap

Check warning on line 49 in jsonrpc/backend/backend.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/backend.go#L47-L49

Added lines #L47 - L49 were not covered by tests
ctx := context.Background()
return &JSONRPCBackend{
app, logger, svrCtx, clientCtx, cfg, ctx,

queuedTxs, err := bigcache.New(ctx, cacheConfig)
if err != nil {
return nil, err

Check warning on line 54 in jsonrpc/backend/backend.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/backend.go#L51-L54

Added lines #L51 - L54 were not covered by tests
}

return &JSONRPCBackend{
app: app,
logger: logger,
queuedTxs: queuedTxs,
ctx: ctx,
svrCtx: svrCtx,
clientCtx: clientCtx,
cfg: cfg,
}, nil

Check warning on line 65 in jsonrpc/backend/backend.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/backend.go#L57-L65

Added lines #L57 - L65 were not covered by tests
}
17 changes: 17 additions & 0 deletions jsonrpc/backend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,20 @@

// ErrorData returns the hex encoded revert reason.
func (e *TxIndexingError) ErrorData() interface{} { return "transaction indexing is in progress" }

type InternalError struct {
msg string
}

func NewInternalError(msg string) *InternalError {
return &InternalError{msg: msg}

Check warning on line 29 in jsonrpc/backend/errors.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/errors.go#L28-L29

Added lines #L28 - L29 were not covered by tests
}

func (e *InternalError) Error() string {
return "internal jsonrpc error: " + e.msg

Check warning on line 33 in jsonrpc/backend/errors.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/errors.go#L32-L33

Added lines #L32 - L33 were not covered by tests
}

func (e *InternalError) ErrorCode() int {
// Internal JSON-RPC error
return -32603
}
66 changes: 54 additions & 12 deletions jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"errors"
"fmt"

"cosmossdk.io/collections"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -35,6 +36,9 @@
}

func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
b.sendTxMut.Lock()
defer b.sendTxMut.Unlock()

queryCtx, err := b.getQueryCtx()
if err != nil {
return err
Expand All @@ -45,23 +49,61 @@
return err
}

if authTx, ok := cosmosTx.(authsigning.Tx); ok {
if sigs, err := authTx.GetSignaturesV2(); err == nil && len(sigs) > 0 {
b.logger.Debug("eth_sendTx", "sequence", sigs[0].Sequence)
}
}

txBytes, err := b.app.TxEncode(cosmosTx)
if err != nil {
return err
}

res, err := b.clientCtx.BroadcastTxSync(txBytes)
if err != nil {
return err
}
if res.Code != 0 {
return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog)
authTx, ok := cosmosTx.(authsigning.Tx)
if !ok {
return NewInternalError("failed to convert cosmosTx to authsigning.Tx")
}

Check warning on line 60 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L57-L60

Added lines #L57 - L60 were not covered by tests

sigs, err := authTx.GetSignaturesV2()
if err != nil || len(sigs) != 1 {
b.logger.Error("failed to get signatures from authsigning.Tx", "err", err)
return NewInternalError("failed to get signatures from authsigning.Tx")
}

Check warning on line 66 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L62-L66

Added lines #L62 - L66 were not covered by tests

sig := sigs[0]
txSeq := sig.Sequence
accSeq := uint64(0)
sender := sdk.AccAddress(sig.PubKey.Address().Bytes())

checkCtx := b.app.GetContextForCheckTx(nil)
if acc := b.app.AccountKeeper.GetAccount(checkCtx, sender); acc != nil {
accSeq = acc.GetSequence()
}

Check warning on line 76 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L68-L76

Added lines #L68 - L76 were not covered by tests

b.logger.Debug("enqueue tx", "sender", sender, "txSeq", txSeq, "accSeq", accSeq)
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), txSeq)
if err := b.queuedTxs.Set(cacheKey, txBytes); err != nil {
b.logger.Error("failed to enqueue tx", "key", cacheKey, "err", err)
return NewInternalError("failed to enqueue tx")
}

Check warning on line 83 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L78-L83

Added lines #L78 - L83 were not covered by tests

// check if there are queued txs which can be sent
for {
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), accSeq)
if txBytes, err := b.queuedTxs.Get(cacheKey); err == nil {
if err := b.queuedTxs.Delete(cacheKey); err != nil {
b.logger.Error("failed to delete queued tx", "key", cacheKey, "err", err)
return NewInternalError("failed to delete queued tx")
}

Check warning on line 92 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L86-L92

Added lines #L86 - L92 were not covered by tests

b.logger.Debug("broadcast queued tx", "sender", sender, "txSeq", accSeq)
res, err := b.clientCtx.BroadcastTxSync(txBytes)
if err != nil {
return err
}
if res.Code != 0 {
return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog)
}
} else {
break

Check warning on line 103 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L94-L103

Added lines #L94 - L103 were not covered by tests
}

accSeq++

Check warning on line 106 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L106

Added line #L106 was not covered by tests
}

return nil
Expand Down
52 changes: 41 additions & 11 deletions jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
DefaultBatchRequestLimit = 1000
// DefaultBatchResponseMaxSize is the default maximum number of bytes returned from a batched call
DefaultBatchResponseMaxSize = 25 * 1000 * 1000
// DefaultQueuedTransactionCap is the default maximum size (MiB) of queued transactions that can be in the transaction pool.
DefaultQueuedTransactionCap = 100
// DefaultQueuedTransactionTTL is the default maximum time a queued transaction can
// remain in the transaction pool before being evicted.
DefaultQueuedTransactionTTL = 1 * time.Minute
)

var (
Expand All @@ -60,6 +65,8 @@
flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections"
flagJSONRPCBatchRequestLimit = "json-rpc.batch-request-limit"
flagJSONRPCBatchResponseMaxSize = "json-rpc.batch-response-max-size"
flagJSONRPCQueuedTransactionCap = "json-rpc.queued-transaction-cap"
flagJSONRPCQueuedTransactionTTL = "json-rpc.queued-transaction-ttl"
)

// JSONRPCConfig defines configuration for the EVM RPC server.
Expand Down Expand Up @@ -91,24 +98,36 @@
BatchRequestLimit int `mapstructure:"batch-request-limit"`
// Maximum number of bytes returned from a batched call
BatchResponseMaxSize int `mapstructure:"batch-response-max-size"`
// QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool.
QueuedTransactionCap int `mapstructure:"queued-transaction-cap"`
// QueuedTransactionTTL is the maximum time a queued transaction can
// remain in the transaction pool before being evicted.
QueuedTransactionTTL time.Duration `mapstructure:"queued-transaction-ttl"`
}

// DefaultJSONRPCConfig returns a default configuration for the EVM RPC server.
func DefaultJSONRPCConfig() JSONRPCConfig {
return JSONRPCConfig{
Enable: DefaultEnable,
Address: DefaultAddress,
EnableWS: DefaultEnable,
AddressWS: DefaultAddressWS,
EnableUnsafeCORS: DefaultEnableUnsafeCORS,
APIs: DefaultAPIs,
FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,
HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,
Enable: DefaultEnable,
Address: DefaultAddress,
EnableWS: DefaultEnable,
AddressWS: DefaultAddressWS,
EnableUnsafeCORS: DefaultEnableUnsafeCORS,

APIs: DefaultAPIs,

FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,

HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,

Check warning on line 125 in jsonrpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/config/config.go#L111-L125

Added lines #L111 - L125 were not covered by tests
BatchRequestLimit: DefaultBatchRequestLimit,
BatchResponseMaxSize: DefaultBatchResponseMaxSize,

QueuedTransactionCap: DefaultQueuedTransactionCap,
QueuedTransactionTTL: DefaultQueuedTransactionTTL,

Check warning on line 130 in jsonrpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/config/config.go#L128-L130

Added lines #L128 - L130 were not covered by tests
}
}

Expand All @@ -127,6 +146,8 @@
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
startCmd.Flags().Int(flagJSONRPCBatchRequestLimit, DefaultBatchRequestLimit, "Maximum number of requests in a batch")
startCmd.Flags().Int(flagJSONRPCBatchResponseMaxSize, DefaultBatchResponseMaxSize, "Maximum number of bytes returned from a batched call")
startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum size (MiB) of queued transactions that can be in the transaction pool")
startCmd.Flags().Duration(flagJSONRPCQueuedTransactionTTL, DefaultQueuedTransactionTTL, "Maximum time a queued transaction can remain in the transaction pool before being evicted")

Check warning on line 150 in jsonrpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/config/config.go#L149-L150

Added lines #L149 - L150 were not covered by tests
}

// GetConfig load config values from the app options
Expand All @@ -145,6 +166,8 @@
MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)),
BatchRequestLimit: cast.ToInt(appOpts.Get(flagJSONRPCBatchRequestLimit)),
BatchResponseMaxSize: cast.ToInt(appOpts.Get(flagJSONRPCBatchResponseMaxSize)),
QueuedTransactionCap: cast.ToInt(appOpts.Get(flagJSONRPCQueuedTransactionCap)),
QueuedTransactionTTL: cast.ToDuration(appOpts.Get(flagJSONRPCQueuedTransactionTTL)),

Check warning on line 170 in jsonrpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/config/config.go#L169-L170

Added lines #L169 - L170 were not covered by tests
}
}

Expand Down Expand Up @@ -196,4 +219,11 @@

# Maximum number of bytes returned from a batched call
batch-response-max-size = {{ .JSONRPCConfig.BatchResponseMaxSize }}

# QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool.
queued-transaction-cap = {{ .JSONRPCConfig.QueuedTransactionCap }}

# QueuedTransactionTTL is the maximum time a queued transaction can
# remain in the transaction pool before being evicted.
queued-transaction-ttl = "{{ .JSONRPCConfig.QueuedTransactionTTL }}"
`
7 changes: 6 additions & 1 deletion jsonrpc/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@

rpcServer := rpc.NewServer()
rpcServer.SetBatchLimits(jsonRPCConfig.BatchRequestLimit, jsonRPCConfig.BatchResponseMaxSize)
bkd := backend.NewJSONRPCBackend(app, logger, svrCtx, clientCtx, jsonRPCConfig)

bkd, err := backend.NewJSONRPCBackend(app, logger, svrCtx, clientCtx, jsonRPCConfig)
if err != nil {
return err
}

Check warning on line 70 in jsonrpc/jsonrpc.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/jsonrpc.go#L66-L70

Added lines #L66 - L70 were not covered by tests

apis := []rpc.API{
{
Namespace: EthNamespace,
Expand Down
20 changes: 17 additions & 3 deletions x/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,46 @@
servertypes "github.com/cosmos/cosmos-sdk/server/types"
)

// DefaultContractSimulationGasLimit - default max simulation gas
const DefaultContractSimulationGasLimit = uint64(3_000_000)
const (
// DefaultContractSimulationGasLimit - default max simulation gas
DefaultContractSimulationGasLimit = uint64(3_000_000)
// DefaultIndexerCacheSize is the default maximum size (MiB) of the cache.
DefaultIndexerCacheSize = 100
)

const (
flagContractSimulationGasLimit = "evm.contract-simulation-gas-limit"
flagIndexerCacheSize = "evm.indexer-cache-size"
)

// EVMConfig is the extra config required for evm
type EVMConfig struct {
// ContractSimulationGasLimit is the maximum gas amount can be used in a tx simulation call.
ContractSimulationGasLimit uint64 `mapstructure:"contract-simulation-gas-limit"`
// IndexerCacheSize is the maximum size (MiB) of the cache.
IndexerCacheSize int `mapstructure:"indexer-cache-size"`
}

// DefaultEVMConfig returns the default settings for EVMConfig
func DefaultEVMConfig() EVMConfig {
return EVMConfig{
ContractSimulationGasLimit: DefaultContractSimulationGasLimit,
IndexerCacheSize: DefaultIndexerCacheSize,
}
}

// GetConfig load config values from the app options
func GetConfig(appOpts servertypes.AppOptions) EVMConfig {
return EVMConfig{
ContractSimulationGasLimit: cast.ToUint64(appOpts.Get(flagContractSimulationGasLimit)),
IndexerCacheSize: cast.ToInt(appOpts.Get(flagIndexerCacheSize)),

Check warning on line 42 in x/evm/config/config.go

View check run for this annotation

Codecov / codecov/patch

x/evm/config/config.go#L42

Added line #L42 was not covered by tests
}
}

// AddConfigFlags implements servertypes.EVMConfigFlags interface.
func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Uint64(flagContractSimulationGasLimit, DefaultContractSimulationGasLimit, "Set the max simulation gas for evm contract execution")
startCmd.Flags().Uint64(flagContractSimulationGasLimit, DefaultContractSimulationGasLimit, "Maximum simulation gas amount for evm contract execution")
startCmd.Flags().Int(flagIndexerCacheSize, DefaultIndexerCacheSize, "Maximum size (MiB) of the indexer cache")

Check warning on line 49 in x/evm/config/config.go

View check run for this annotation

Codecov / codecov/patch

x/evm/config/config.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

// DefaultConfigTemplate default config template for evm
Expand All @@ -48,4 +59,7 @@

# The maximum gas amount can be used in a tx simulation call.
contract-simulation-gas-limit = "{{ .EVMConfig.ContractSimulationGasLimit }}"

# IndexerCacheSize is the maximum size (MiB) of the cache for evm indexer.
indexer-cache-size = {{ .EVMConfig.IndexerCacheSize }}
`
5 changes: 5 additions & 0 deletions x/evm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@
return sdkCtx.Logger().With("module", "x/"+types.ModuleName)
}

// Config returns the x/evm configuration.
func (k Keeper) Config() evmconfig.EVMConfig {
return k.config

Check warning on line 153 in x/evm/keeper/keeper.go

View check run for this annotation

Codecov / codecov/patch

x/evm/keeper/keeper.go#L152-L153

Added lines #L152 - L153 were not covered by tests
}

// ERC20Keeper returns the ERC20Keeper
func (k Keeper) ERC20Keeper() types.IERC20Keeper {
return k.erc20Keeper
Expand Down
Loading