Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: async filter & subscription system #82

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package indexer
import (
"context"
"math/big"
"time"

abci "github.com/cometbft/cometbft/abci/types"
comettypes "github.com/cometbft/cometbft/types"
Expand Down Expand Up @@ -146,6 +147,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
}

blockHash := blockHeader.Hash()
blockLogs := make([][]*coretypes.Log, 0, len(ethTxs))
for txIndex, ethTx := range ethTxs {
txHash := ethTx.Hash()
receipt := receipts[txIndex]
Expand Down Expand Up @@ -177,18 +179,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
log.TxIndex = uint(txIndex)
}

// emit logs event
for _, logsChan := range e.logsChans {
logsChan <- receipt.Logs
}
}
}

// emit empty logs event to confirm all logs are emitted and consumed, so the logs are
// available for querying.
if len(e.logsChans) > 0 {
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
blockLogs = append(blockLogs, receipt.Logs)
}
}

Expand All @@ -202,12 +193,16 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
return err
}

// emit new block events
if len(e.blockChans) > 0 {
for _, blockChan := range e.blockChans {
blockChan <- &blockHeader
// emit block event in a goroutine
done := make(chan struct{})
go e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: blockLogs}, done)
go func() {
select {
case <-done:
case <-time.After(5 * time.Second):
e.logger.Error("block event emitter timed out")
}
}
}()

// TODO - currently state changes are not supported in abci listener, so we track cosmos block hash at x/evm preblocker.
// - https://github.com/cosmos/cosmos-sdk/issues/22246
Expand Down
26 changes: 26 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,29 @@ func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes.
e.pendingChans = append(e.pendingChans, pendingChan)
return blockChan, logsChan, pendingChan
}

// blockEvents is a struct to emit block events.
type blockEvents struct {
header *coretypes.Header
logs [][]*coretypes.Log
}

// blockEventsEmitter emits block events to subscribers.
func (e *EVMIndexerImpl) blockEventsEmitter(blockEvents *blockEvents, done chan struct{}) {
if blockEvents == nil {
return
}
for _, logs := range blockEvents.logs {
for _, logsChan := range e.logsChans {
logsChan <- logs
}
}
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
}
for _, blockChan := range e.blockChans {
blockChan <- blockEvents.header
}

close(done)
}
5 changes: 5 additions & 0 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"context"
"sync"
"time"

lrucache "github.com/hashicorp/golang-lru/v2"

Expand Down Expand Up @@ -103,3 +104,7 @@ func (b *JSONRPCBackend) releaseAccMut(senderHex string) {
b.mut.Unlock()
// critical section end
}

func (b *JSONRPCBackend) FilterTimeout() time.Duration {
return b.cfg.FilterTimeout
}
10 changes: 0 additions & 10 deletions jsonrpc/backend/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,3 @@ func (b *JSONRPCBackend) GetLogsByHeight(height uint64) ([]*coretypes.Log, error

return blockLogs, nil
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCFilterCap() int32 {
return b.cfg.FilterCap
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCBlockRangeCap() int32 {
return b.cfg.BlockRangeCap
}
11 changes: 10 additions & 1 deletion jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func (b *JSONRPCBackend) getQueryCtx() (context.Context, error) {
}

func (b *JSONRPCBackend) getQueryCtxWithHeight(height uint64) (context.Context, error) {
// check whether the given height is bigger than the latest block height
num, err := b.BlockNumber()
if err != nil {
return nil, err
}
if height >= uint64(num) {
height = 0
}

return b.app.CreateQueryContext(int64(height), false)
}

Expand Down Expand Up @@ -171,7 +180,7 @@ func (b *JSONRPCBackend) GetTransactionCount(address common.Address, blockNrOrHa
}

var err error
queryCtx, err = b.app.CreateQueryContext(blockNumber.Int64(), false)
queryCtx, err = b.getQueryCtxWithHeight(uint64(blockNumber.Int64()))
if err != nil {
return nil, err
}
Expand Down
31 changes: 12 additions & 19 deletions jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
DefaultFeeHistoryMaxHeaders = 1024
// DefaultFeeHistoryMaxBlocks is the default maximum number of blocks, which can be used to lookup the fee history.
DefaultFeeHistoryMaxBlocks = 1024
// DefaultFilterTimeout is the default filter timeout, how long filters stay active.
DefaultFilterTimeout = 5 * time.Minute
)

var (
Expand All @@ -59,8 +61,6 @@ const (
flagJSONRPCEnableUnsafeCORS = "json-rpc.enable-unsafe-cors"
flagJSONRPCAPIs = "json-rpc.apis"
flagJSONRPCLogsCap = "json-rpc.logs-cap"
flagJSONRPCFilterCap = "json-rpc.filter-cap"
flagJSONRPCBlockRangeCap = "json-rpc.block-range-cap"
flagJSONRPCHTTPTimeout = "json-rpc.http-timeout"
flagJSONRPCHTTPIdleTimeout = "json-rpc.http-idle-timeout"
flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections"
Expand All @@ -70,6 +70,7 @@ const (
flagJSONRPCQueuedTransactionTTL = "json-rpc.queued-transaction-ttl"
flagJSONRPCFeeHistoryMaxHeaders = "json-rpc.fee-history-max-headers"
flagJSONRPCFeeHistoryMaxBlocks = "json-rpc.fee-history-max-blocks"
flagJSONRPCFilterTimeout = "json-rpc.filter-timeout"
)

// JSONRPCConfig defines configuration for the EVM RPC server.
Expand All @@ -86,10 +87,6 @@ type JSONRPCConfig struct {
EnableUnsafeCORS bool `mapstructure:"enable-unsafe-cors"`
// API defines a list of JSON-RPC namespaces that should be enabled
APIs []string `mapstructure:"apis"`
// FilterCap is the global cap for total number of filters that can be created.
FilterCap int32 `mapstructure:"filter-cap"`
// BlockRangeCap defines the max block range allowed for `eth_getLogs` query.
BlockRangeCap int32 `mapstructure:"block-range-cap"`
// HTTPTimeout is the read/write timeout of http json-rpc server.
HTTPTimeout time.Duration `mapstructure:"http-timeout"`
// HTTPIdleTimeout is the idle timeout of http json-rpc server.
Expand All @@ -107,6 +104,8 @@ type JSONRPCConfig struct {
FeeHistoryMaxHeaders int `mapstructure:"fee-history-max-headers"`
// FeeHistoryMaxBlocks is the maximum number of blocks, which can be used to lookup the fee history.
FeeHistoryMaxBlocks int `mapstructure:"fee-history-max-blocks"`
// FilterTimeout is a duration how long filters stay active (default: 5min)
FilterTimeout time.Duration `mapstructure:"filter-timeout"`
}

// DefaultJSONRPCConfig returns a default configuration for the EVM RPC server.
Expand All @@ -120,9 +119,6 @@ func DefaultJSONRPCConfig() JSONRPCConfig {

APIs: DefaultAPIs,

FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,

HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,
Expand All @@ -134,6 +130,8 @@ func DefaultJSONRPCConfig() JSONRPCConfig {

FeeHistoryMaxHeaders: DefaultFeeHistoryMaxHeaders,
FeeHistoryMaxBlocks: DefaultFeeHistoryMaxBlocks,

FilterTimeout: DefaultFilterTimeout,
}
}

Expand All @@ -145,8 +143,6 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().String(flagJSONRPCAddressWS, DefaultAddressWS, "Address to listen on for the WebSocket server")
startCmd.Flags().Bool(flagJSONRPCEnableUnsafeCORS, DefaultEnableUnsafeCORS, "Enable unsafe CORS")
startCmd.Flags().StringSlice(flagJSONRPCAPIs, DefaultAPIs, "List of JSON-RPC namespaces that should be enabled")
startCmd.Flags().Int32(flagJSONRPCFilterCap, DefaultFilterCap, "Sets the global cap for total number of filters that can be created")
startCmd.Flags().Int32(flagJSONRPCBlockRangeCap, DefaultBlockRangeCap, "Max block range allowed for 'eth_getLogs' query")
startCmd.Flags().Duration(flagJSONRPCHTTPTimeout, DefaultHTTPTimeout, "Read/write timeout of http json-rpc server")
startCmd.Flags().Duration(flagJSONRPCHTTPIdleTimeout, DefaultHTTPIdleTimeout, "Idle timeout of http json-rpc server")
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
Expand All @@ -155,6 +151,7 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum number of queued transactions that can be in the transaction pool")
startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxHeaders, DefaultFeeHistoryMaxHeaders, "Maximum number of headers used to lookup the fee history")
startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxBlocks, DefaultFeeHistoryMaxBlocks, "Maximum number of blocks used to lookup the fee history")
startCmd.Flags().Duration(flagJSONRPCFilterTimeout, DefaultFilterTimeout, "Duration how long filters stay active")
}

// GetConfig load config values from the app options
Expand All @@ -166,8 +163,6 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
AddressWS: cast.ToString(appOpts.Get(flagJSONRPCAddressWS)),
EnableUnsafeCORS: cast.ToBool(appOpts.Get(flagJSONRPCEnableUnsafeCORS)),
APIs: strings.Split(cast.ToString(appOpts.Get(flagJSONRPCAPIs)), ","),
FilterCap: cast.ToInt32(appOpts.Get(flagJSONRPCFilterCap)),
BlockRangeCap: cast.ToInt32(appOpts.Get(flagJSONRPCBlockRangeCap)),
HTTPTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPTimeout)),
HTTPIdleTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPIdleTimeout)),
MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)),
Expand All @@ -176,6 +171,7 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
QueuedTransactionCap: cast.ToInt(appOpts.Get(flagJSONRPCQueuedTransactionCap)),
FeeHistoryMaxHeaders: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxHeaders)),
FeeHistoryMaxBlocks: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxBlocks)),
FilterTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCFilterTimeout)),
}
}

Expand Down Expand Up @@ -206,12 +202,6 @@ enable-unsafe-cors = {{ .JSONRPCConfig.EnableUnsafeCORS }}
# Example: "eth,txpool,personal,net,debug,web3"
apis = "{{range $index, $elmt := .JSONRPCConfig.APIs}}{{if $index}},{{$elmt}}{{else}}{{$elmt}}{{end}}{{end}}"

# FilterCap is the global cap for total number of filters that can be created.
filter-cap = {{ .JSONRPCConfig.FilterCap }}

# BlockRangeCap defines the max block range allowed for 'eth_getLogs' query.
block-range-cap = {{ .JSONRPCConfig.BlockRangeCap }}

# HTTPTimeout is the read/write timeout of http json-rpc server.
http-timeout = "{{ .JSONRPCConfig.HTTPTimeout }}"

Expand All @@ -237,4 +227,7 @@ fee-history-max-headers = {{ .JSONRPCConfig.FeeHistoryMaxHeaders }}

# FeeHistoryMaxBlocks is the maximum number of blocks, which can be used to lookup the fee history.
fee-history-max-blocks = {{ .JSONRPCConfig.FeeHistoryMaxBlocks }}

# FilterTimeout is a duration how long filters stay active (default: 5min)
filter-timeout = "{{ .JSONRPCConfig.FilterTimeout }}"
`
Loading
Loading