Skip to content

Commit

Permalink
async filter system
Browse files Browse the repository at this point in the history
  • Loading branch information
beer-1 committed Oct 24, 2024
1 parent 875632b commit 45f8079
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 116 deletions.
22 changes: 4 additions & 18 deletions indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
}

blockHash := blockHeader.Hash()
blockLogs := make([][]*coretypes.Log, 0, len(ethTxs))
for txIndex, ethTx := range ethTxs {
txHash := ethTx.Hash()
receipt := receipts[txIndex]
Expand Down Expand Up @@ -177,18 +178,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
log.TxIndex = uint(txIndex)
}

// emit logs event
for _, logsChan := range e.logsChans {
logsChan <- receipt.Logs
}
}
}

// emit empty logs event to confirm all logs are emitted and consumed, so the logs are
// available for querying.
if len(e.logsChans) > 0 {
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
blockLogs = append(blockLogs, receipt.Logs)
}
}

Expand All @@ -202,12 +192,8 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
return err
}

// emit new block events
if len(e.blockChans) > 0 {
for _, blockChan := range e.blockChans {
blockChan <- &blockHeader
}
}
// emit block event in a goroutine
go e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: blockLogs})

// TODO - currently state changes are not supported in abci listener, so we track cosmos block hash at x/evm preblocker.
// - https://github.com/cosmos/cosmos-sdk/issues/22246
Expand Down
24 changes: 24 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,27 @@ func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes.
e.pendingChans = append(e.pendingChans, pendingChan)
return blockChan, logsChan, pendingChan
}

// blockEvents is a struct to emit block events.
type blockEvents struct {
header *coretypes.Header
logs [][]*coretypes.Log
}

// blockEventsEmitter emits block events to subscribers.
func (e *EVMIndexerImpl) blockEventsEmitter(blockEvents *blockEvents) {
if blockEvents == nil {
return
}
for _, logs := range blockEvents.logs {
for _, logsChan := range e.logsChans {
logsChan <- logs
}
}
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
}
for _, blockChan := range e.blockChans {
blockChan <- blockEvents.header
}
}
10 changes: 0 additions & 10 deletions jsonrpc/backend/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,3 @@ func (b *JSONRPCBackend) GetLogsByHeight(height uint64) ([]*coretypes.Log, error

return blockLogs, nil
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCFilterCap() int32 {
return b.cfg.FilterCap
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCBlockRangeCap() int32 {
return b.cfg.BlockRangeCap
}
11 changes: 10 additions & 1 deletion jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func (b *JSONRPCBackend) getQueryCtx() (context.Context, error) {
}

func (b *JSONRPCBackend) getQueryCtxWithHeight(height uint64) (context.Context, error) {
// check whether the given height is bigger than the latest block height
num, err := b.BlockNumber()
if err != nil {
return nil, err
}
if height >= uint64(num) {
height = 0
}

return b.app.CreateQueryContext(int64(height), false)
}

Expand Down Expand Up @@ -171,7 +180,7 @@ func (b *JSONRPCBackend) GetTransactionCount(address common.Address, blockNrOrHa
}

var err error
queryCtx, err = b.app.CreateQueryContext(blockNumber.Int64(), false)
queryCtx, err = b.getQueryCtxWithHeight(uint64(blockNumber.Int64()))
if err != nil {
return nil, err
}
Expand Down
19 changes: 0 additions & 19 deletions jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ const (
flagJSONRPCEnableUnsafeCORS = "json-rpc.enable-unsafe-cors"
flagJSONRPCAPIs = "json-rpc.apis"
flagJSONRPCLogsCap = "json-rpc.logs-cap"
flagJSONRPCFilterCap = "json-rpc.filter-cap"
flagJSONRPCBlockRangeCap = "json-rpc.block-range-cap"
flagJSONRPCHTTPTimeout = "json-rpc.http-timeout"
flagJSONRPCHTTPIdleTimeout = "json-rpc.http-idle-timeout"
flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections"
Expand All @@ -86,10 +84,6 @@ type JSONRPCConfig struct {
EnableUnsafeCORS bool `mapstructure:"enable-unsafe-cors"`
// API defines a list of JSON-RPC namespaces that should be enabled
APIs []string `mapstructure:"apis"`
// FilterCap is the global cap for total number of filters that can be created.
FilterCap int32 `mapstructure:"filter-cap"`
// BlockRangeCap defines the max block range allowed for `eth_getLogs` query.
BlockRangeCap int32 `mapstructure:"block-range-cap"`
// HTTPTimeout is the read/write timeout of http json-rpc server.
HTTPTimeout time.Duration `mapstructure:"http-timeout"`
// HTTPIdleTimeout is the idle timeout of http json-rpc server.
Expand Down Expand Up @@ -120,9 +114,6 @@ func DefaultJSONRPCConfig() JSONRPCConfig {

APIs: DefaultAPIs,

FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,

HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,
Expand All @@ -145,8 +136,6 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().String(flagJSONRPCAddressWS, DefaultAddressWS, "Address to listen on for the WebSocket server")
startCmd.Flags().Bool(flagJSONRPCEnableUnsafeCORS, DefaultEnableUnsafeCORS, "Enable unsafe CORS")
startCmd.Flags().StringSlice(flagJSONRPCAPIs, DefaultAPIs, "List of JSON-RPC namespaces that should be enabled")
startCmd.Flags().Int32(flagJSONRPCFilterCap, DefaultFilterCap, "Sets the global cap for total number of filters that can be created")
startCmd.Flags().Int32(flagJSONRPCBlockRangeCap, DefaultBlockRangeCap, "Max block range allowed for 'eth_getLogs' query")
startCmd.Flags().Duration(flagJSONRPCHTTPTimeout, DefaultHTTPTimeout, "Read/write timeout of http json-rpc server")
startCmd.Flags().Duration(flagJSONRPCHTTPIdleTimeout, DefaultHTTPIdleTimeout, "Idle timeout of http json-rpc server")
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
Expand All @@ -166,8 +155,6 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
AddressWS: cast.ToString(appOpts.Get(flagJSONRPCAddressWS)),
EnableUnsafeCORS: cast.ToBool(appOpts.Get(flagJSONRPCEnableUnsafeCORS)),
APIs: strings.Split(cast.ToString(appOpts.Get(flagJSONRPCAPIs)), ","),
FilterCap: cast.ToInt32(appOpts.Get(flagJSONRPCFilterCap)),
BlockRangeCap: cast.ToInt32(appOpts.Get(flagJSONRPCBlockRangeCap)),
HTTPTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPTimeout)),
HTTPIdleTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPIdleTimeout)),
MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)),
Expand Down Expand Up @@ -206,12 +193,6 @@ enable-unsafe-cors = {{ .JSONRPCConfig.EnableUnsafeCORS }}
# Example: "eth,txpool,personal,net,debug,web3"
apis = "{{range $index, $elmt := .JSONRPCConfig.APIs}}{{if $index}},{{$elmt}}{{else}}{{$elmt}}{{end}}{{end}}"
# FilterCap is the global cap for total number of filters that can be created.
filter-cap = {{ .JSONRPCConfig.FilterCap }}
# BlockRangeCap defines the max block range allowed for 'eth_getLogs' query.
block-range-cap = {{ .JSONRPCConfig.BlockRangeCap }}
# HTTPTimeout is the read/write timeout of http json-rpc server.
http-timeout = "{{ .JSONRPCConfig.HTTPTimeout }}"
Expand Down
Loading

0 comments on commit 45f8079

Please sign in to comment.