Skip to content

Commit

Permalink
fix: async filter & subscription system (#82)
Browse files Browse the repository at this point in the history
* async filter system

* add missing mutex

* remove duplicated types

* add filter timeout config

* use default timeout when it is unset

* use function
  • Loading branch information
beer-1 authored Oct 24, 2024
1 parent 875632b commit 63688ba
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 127 deletions.
29 changes: 12 additions & 17 deletions indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package indexer
import (
"context"
"math/big"
"time"

abci "github.com/cometbft/cometbft/abci/types"
comettypes "github.com/cometbft/cometbft/types"
Expand Down Expand Up @@ -146,6 +147,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
}

blockHash := blockHeader.Hash()
blockLogs := make([][]*coretypes.Log, 0, len(ethTxs))
for txIndex, ethTx := range ethTxs {
txHash := ethTx.Hash()
receipt := receipts[txIndex]
Expand Down Expand Up @@ -177,18 +179,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
log.TxIndex = uint(txIndex)
}

// emit logs event
for _, logsChan := range e.logsChans {
logsChan <- receipt.Logs
}
}
}

// emit empty logs event to confirm all logs are emitted and consumed, so the logs are
// available for querying.
if len(e.logsChans) > 0 {
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
blockLogs = append(blockLogs, receipt.Logs)
}
}

Expand All @@ -202,12 +193,16 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
return err
}

// emit new block events
if len(e.blockChans) > 0 {
for _, blockChan := range e.blockChans {
blockChan <- &blockHeader
// emit block event in a goroutine
done := make(chan struct{})
go e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: blockLogs}, done)
go func() {
select {
case <-done:
case <-time.After(5 * time.Second):
e.logger.Error("block event emitter timed out")
}
}
}()

// TODO - currently state changes are not supported in abci listener, so we track cosmos block hash at x/evm preblocker.
// - https://github.com/cosmos/cosmos-sdk/issues/22246
Expand Down
26 changes: 26 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,29 @@ func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes.
e.pendingChans = append(e.pendingChans, pendingChan)
return blockChan, logsChan, pendingChan
}

// blockEvents is a struct to emit block events.
type blockEvents struct {
header *coretypes.Header
logs [][]*coretypes.Log
}

// blockEventsEmitter emits block events to subscribers.
func (e *EVMIndexerImpl) blockEventsEmitter(blockEvents *blockEvents, done chan struct{}) {
if blockEvents == nil {
return
}
for _, logs := range blockEvents.logs {
for _, logsChan := range e.logsChans {
logsChan <- logs
}
}
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
}
for _, blockChan := range e.blockChans {
blockChan <- blockEvents.header
}

close(done)
}
5 changes: 5 additions & 0 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backend
import (
"context"
"sync"
"time"

lrucache "github.com/hashicorp/golang-lru/v2"

Expand Down Expand Up @@ -103,3 +104,7 @@ func (b *JSONRPCBackend) releaseAccMut(senderHex string) {
b.mut.Unlock()
// critical section end
}

func (b *JSONRPCBackend) FilterTimeout() time.Duration {
return b.cfg.FilterTimeout
}
10 changes: 0 additions & 10 deletions jsonrpc/backend/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,3 @@ func (b *JSONRPCBackend) GetLogsByHeight(height uint64) ([]*coretypes.Log, error

return blockLogs, nil
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCFilterCap() int32 {
return b.cfg.FilterCap
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCBlockRangeCap() int32 {
return b.cfg.BlockRangeCap
}
11 changes: 10 additions & 1 deletion jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func (b *JSONRPCBackend) getQueryCtx() (context.Context, error) {
}

func (b *JSONRPCBackend) getQueryCtxWithHeight(height uint64) (context.Context, error) {
// check whether the given height is bigger than the latest block height
num, err := b.BlockNumber()
if err != nil {
return nil, err
}
if height >= uint64(num) {
height = 0
}

return b.app.CreateQueryContext(int64(height), false)
}

Expand Down Expand Up @@ -171,7 +180,7 @@ func (b *JSONRPCBackend) GetTransactionCount(address common.Address, blockNrOrHa
}

var err error
queryCtx, err = b.app.CreateQueryContext(blockNumber.Int64(), false)
queryCtx, err = b.getQueryCtxWithHeight(uint64(blockNumber.Int64()))
if err != nil {
return nil, err
}
Expand Down
31 changes: 12 additions & 19 deletions jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
DefaultFeeHistoryMaxHeaders = 1024
// DefaultFeeHistoryMaxBlocks is the default maximum number of blocks, which can be used to lookup the fee history.
DefaultFeeHistoryMaxBlocks = 1024
// DefaultFilterTimeout is the default filter timeout, how long filters stay active.
DefaultFilterTimeout = 5 * time.Minute
)

var (
Expand All @@ -59,8 +61,6 @@ const (
flagJSONRPCEnableUnsafeCORS = "json-rpc.enable-unsafe-cors"
flagJSONRPCAPIs = "json-rpc.apis"
flagJSONRPCLogsCap = "json-rpc.logs-cap"
flagJSONRPCFilterCap = "json-rpc.filter-cap"
flagJSONRPCBlockRangeCap = "json-rpc.block-range-cap"
flagJSONRPCHTTPTimeout = "json-rpc.http-timeout"
flagJSONRPCHTTPIdleTimeout = "json-rpc.http-idle-timeout"
flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections"
Expand All @@ -70,6 +70,7 @@ const (
flagJSONRPCQueuedTransactionTTL = "json-rpc.queued-transaction-ttl"
flagJSONRPCFeeHistoryMaxHeaders = "json-rpc.fee-history-max-headers"
flagJSONRPCFeeHistoryMaxBlocks = "json-rpc.fee-history-max-blocks"
flagJSONRPCFilterTimeout = "json-rpc.filter-timeout"
)

// JSONRPCConfig defines configuration for the EVM RPC server.
Expand All @@ -86,10 +87,6 @@ type JSONRPCConfig struct {
EnableUnsafeCORS bool `mapstructure:"enable-unsafe-cors"`
// API defines a list of JSON-RPC namespaces that should be enabled
APIs []string `mapstructure:"apis"`
// FilterCap is the global cap for total number of filters that can be created.
FilterCap int32 `mapstructure:"filter-cap"`
// BlockRangeCap defines the max block range allowed for `eth_getLogs` query.
BlockRangeCap int32 `mapstructure:"block-range-cap"`
// HTTPTimeout is the read/write timeout of http json-rpc server.
HTTPTimeout time.Duration `mapstructure:"http-timeout"`
// HTTPIdleTimeout is the idle timeout of http json-rpc server.
Expand All @@ -107,6 +104,8 @@ type JSONRPCConfig struct {
FeeHistoryMaxHeaders int `mapstructure:"fee-history-max-headers"`
// FeeHistoryMaxBlocks is the maximum number of blocks, which can be used to lookup the fee history.
FeeHistoryMaxBlocks int `mapstructure:"fee-history-max-blocks"`
// FilterTimeout is a duration how long filters stay active (default: 5min)
FilterTimeout time.Duration `mapstructure:"filter-timeout"`
}

// DefaultJSONRPCConfig returns a default configuration for the EVM RPC server.
Expand All @@ -120,9 +119,6 @@ func DefaultJSONRPCConfig() JSONRPCConfig {

APIs: DefaultAPIs,

FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,

HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,
Expand All @@ -134,6 +130,8 @@ func DefaultJSONRPCConfig() JSONRPCConfig {

FeeHistoryMaxHeaders: DefaultFeeHistoryMaxHeaders,
FeeHistoryMaxBlocks: DefaultFeeHistoryMaxBlocks,

FilterTimeout: DefaultFilterTimeout,
}
}

Expand All @@ -145,8 +143,6 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().String(flagJSONRPCAddressWS, DefaultAddressWS, "Address to listen on for the WebSocket server")
startCmd.Flags().Bool(flagJSONRPCEnableUnsafeCORS, DefaultEnableUnsafeCORS, "Enable unsafe CORS")
startCmd.Flags().StringSlice(flagJSONRPCAPIs, DefaultAPIs, "List of JSON-RPC namespaces that should be enabled")
startCmd.Flags().Int32(flagJSONRPCFilterCap, DefaultFilterCap, "Sets the global cap for total number of filters that can be created")
startCmd.Flags().Int32(flagJSONRPCBlockRangeCap, DefaultBlockRangeCap, "Max block range allowed for 'eth_getLogs' query")
startCmd.Flags().Duration(flagJSONRPCHTTPTimeout, DefaultHTTPTimeout, "Read/write timeout of http json-rpc server")
startCmd.Flags().Duration(flagJSONRPCHTTPIdleTimeout, DefaultHTTPIdleTimeout, "Idle timeout of http json-rpc server")
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
Expand All @@ -155,6 +151,7 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum number of queued transactions that can be in the transaction pool")
startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxHeaders, DefaultFeeHistoryMaxHeaders, "Maximum number of headers used to lookup the fee history")
startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxBlocks, DefaultFeeHistoryMaxBlocks, "Maximum number of blocks used to lookup the fee history")
startCmd.Flags().Duration(flagJSONRPCFilterTimeout, DefaultFilterTimeout, "Duration how long filters stay active")
}

// GetConfig load config values from the app options
Expand All @@ -166,8 +163,6 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
AddressWS: cast.ToString(appOpts.Get(flagJSONRPCAddressWS)),
EnableUnsafeCORS: cast.ToBool(appOpts.Get(flagJSONRPCEnableUnsafeCORS)),
APIs: strings.Split(cast.ToString(appOpts.Get(flagJSONRPCAPIs)), ","),
FilterCap: cast.ToInt32(appOpts.Get(flagJSONRPCFilterCap)),
BlockRangeCap: cast.ToInt32(appOpts.Get(flagJSONRPCBlockRangeCap)),
HTTPTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPTimeout)),
HTTPIdleTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPIdleTimeout)),
MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)),
Expand All @@ -176,6 +171,7 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
QueuedTransactionCap: cast.ToInt(appOpts.Get(flagJSONRPCQueuedTransactionCap)),
FeeHistoryMaxHeaders: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxHeaders)),
FeeHistoryMaxBlocks: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxBlocks)),
FilterTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCFilterTimeout)),
}
}

Expand Down Expand Up @@ -206,12 +202,6 @@ enable-unsafe-cors = {{ .JSONRPCConfig.EnableUnsafeCORS }}
# Example: "eth,txpool,personal,net,debug,web3"
apis = "{{range $index, $elmt := .JSONRPCConfig.APIs}}{{if $index}},{{$elmt}}{{else}}{{$elmt}}{{end}}{{end}}"
# FilterCap is the global cap for total number of filters that can be created.
filter-cap = {{ .JSONRPCConfig.FilterCap }}
# BlockRangeCap defines the max block range allowed for 'eth_getLogs' query.
block-range-cap = {{ .JSONRPCConfig.BlockRangeCap }}
# HTTPTimeout is the read/write timeout of http json-rpc server.
http-timeout = "{{ .JSONRPCConfig.HTTPTimeout }}"
Expand All @@ -237,4 +227,7 @@ fee-history-max-headers = {{ .JSONRPCConfig.FeeHistoryMaxHeaders }}
# FeeHistoryMaxBlocks is the maximum number of blocks, which can be used to lookup the fee history.
fee-history-max-blocks = {{ .JSONRPCConfig.FeeHistoryMaxBlocks }}
# FilterTimeout is a duration how long filters stay active (default: 5min)
filter-timeout = "{{ .JSONRPCConfig.FilterTimeout }}"
`
Loading

0 comments on commit 63688ba

Please sign in to comment.