Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: async filter & subscription system #82

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
}

blockHash := blockHeader.Hash()
blockLogs := make([][]*coretypes.Log, 0, len(ethTxs))
for txIndex, ethTx := range ethTxs {
txHash := ethTx.Hash()
receipt := receipts[txIndex]
Expand Down Expand Up @@ -177,18 +178,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
log.TxIndex = uint(txIndex)
}

// emit logs event
for _, logsChan := range e.logsChans {
logsChan <- receipt.Logs
}
}
}

// emit empty logs event to confirm all logs are emitted and consumed, so the logs are
// available for querying.
if len(e.logsChans) > 0 {
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
blockLogs = append(blockLogs, receipt.Logs)
}
}

Expand All @@ -202,12 +192,8 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
return err
}

// emit new block events
if len(e.blockChans) > 0 {
for _, blockChan := range e.blockChans {
blockChan <- &blockHeader
}
}
// emit block event in a goroutine
go e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: blockLogs})

// TODO - currently state changes are not supported in abci listener, so we track cosmos block hash at x/evm preblocker.
// - https://github.com/cosmos/cosmos-sdk/issues/22246
Expand Down
24 changes: 24 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,27 @@ func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes.
e.pendingChans = append(e.pendingChans, pendingChan)
return blockChan, logsChan, pendingChan
}

// blockEvents is a struct to emit block events.
type blockEvents struct {
header *coretypes.Header
logs [][]*coretypes.Log
}

// blockEventsEmitter emits block events to subscribers.
func (e *EVMIndexerImpl) blockEventsEmitter(blockEvents *blockEvents) {
if blockEvents == nil {
return
}
for _, logs := range blockEvents.logs {
for _, logsChan := range e.logsChans {
logsChan <- logs
}
}
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
}
for _, blockChan := range e.blockChans {
blockChan <- blockEvents.header
}
}
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 0 additions & 10 deletions jsonrpc/backend/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,3 @@ func (b *JSONRPCBackend) GetLogsByHeight(height uint64) ([]*coretypes.Log, error

return blockLogs, nil
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCFilterCap() int32 {
return b.cfg.FilterCap
}

// RPCFilterCap is the limit for total number of filters that can be created
func (b *JSONRPCBackend) RPCBlockRangeCap() int32 {
return b.cfg.BlockRangeCap
}
11 changes: 10 additions & 1 deletion jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ func (b *JSONRPCBackend) getQueryCtx() (context.Context, error) {
}

func (b *JSONRPCBackend) getQueryCtxWithHeight(height uint64) (context.Context, error) {
// check whether the given height is bigger than the latest block height
num, err := b.BlockNumber()
if err != nil {
return nil, err
}
if height >= uint64(num) {
height = 0
}

return b.app.CreateQueryContext(int64(height), false)
}

Expand Down Expand Up @@ -171,7 +180,7 @@ func (b *JSONRPCBackend) GetTransactionCount(address common.Address, blockNrOrHa
}

var err error
queryCtx, err = b.app.CreateQueryContext(blockNumber.Int64(), false)
queryCtx, err = b.getQueryCtxWithHeight(uint64(blockNumber.Int64()))
if err != nil {
return nil, err
}
Expand Down
19 changes: 0 additions & 19 deletions jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ const (
flagJSONRPCEnableUnsafeCORS = "json-rpc.enable-unsafe-cors"
flagJSONRPCAPIs = "json-rpc.apis"
flagJSONRPCLogsCap = "json-rpc.logs-cap"
flagJSONRPCFilterCap = "json-rpc.filter-cap"
flagJSONRPCBlockRangeCap = "json-rpc.block-range-cap"
flagJSONRPCHTTPTimeout = "json-rpc.http-timeout"
flagJSONRPCHTTPIdleTimeout = "json-rpc.http-idle-timeout"
flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections"
Expand All @@ -86,10 +84,6 @@ type JSONRPCConfig struct {
EnableUnsafeCORS bool `mapstructure:"enable-unsafe-cors"`
// API defines a list of JSON-RPC namespaces that should be enabled
APIs []string `mapstructure:"apis"`
// FilterCap is the global cap for total number of filters that can be created.
FilterCap int32 `mapstructure:"filter-cap"`
// BlockRangeCap defines the max block range allowed for `eth_getLogs` query.
BlockRangeCap int32 `mapstructure:"block-range-cap"`
// HTTPTimeout is the read/write timeout of http json-rpc server.
HTTPTimeout time.Duration `mapstructure:"http-timeout"`
// HTTPIdleTimeout is the idle timeout of http json-rpc server.
Expand Down Expand Up @@ -120,9 +114,6 @@ func DefaultJSONRPCConfig() JSONRPCConfig {

APIs: DefaultAPIs,

FilterCap: DefaultFilterCap,
BlockRangeCap: DefaultBlockRangeCap,

HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,
Expand All @@ -145,8 +136,6 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().String(flagJSONRPCAddressWS, DefaultAddressWS, "Address to listen on for the WebSocket server")
startCmd.Flags().Bool(flagJSONRPCEnableUnsafeCORS, DefaultEnableUnsafeCORS, "Enable unsafe CORS")
startCmd.Flags().StringSlice(flagJSONRPCAPIs, DefaultAPIs, "List of JSON-RPC namespaces that should be enabled")
startCmd.Flags().Int32(flagJSONRPCFilterCap, DefaultFilterCap, "Sets the global cap for total number of filters that can be created")
startCmd.Flags().Int32(flagJSONRPCBlockRangeCap, DefaultBlockRangeCap, "Max block range allowed for 'eth_getLogs' query")
startCmd.Flags().Duration(flagJSONRPCHTTPTimeout, DefaultHTTPTimeout, "Read/write timeout of http json-rpc server")
startCmd.Flags().Duration(flagJSONRPCHTTPIdleTimeout, DefaultHTTPIdleTimeout, "Idle timeout of http json-rpc server")
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
Expand All @@ -166,8 +155,6 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
AddressWS: cast.ToString(appOpts.Get(flagJSONRPCAddressWS)),
EnableUnsafeCORS: cast.ToBool(appOpts.Get(flagJSONRPCEnableUnsafeCORS)),
APIs: strings.Split(cast.ToString(appOpts.Get(flagJSONRPCAPIs)), ","),
FilterCap: cast.ToInt32(appOpts.Get(flagJSONRPCFilterCap)),
BlockRangeCap: cast.ToInt32(appOpts.Get(flagJSONRPCBlockRangeCap)),
HTTPTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPTimeout)),
HTTPIdleTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPIdleTimeout)),
MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)),
Expand Down Expand Up @@ -206,12 +193,6 @@ enable-unsafe-cors = {{ .JSONRPCConfig.EnableUnsafeCORS }}
# Example: "eth,txpool,personal,net,debug,web3"
apis = "{{range $index, $elmt := .JSONRPCConfig.APIs}}{{if $index}},{{$elmt}}{{else}}{{$elmt}}{{end}}{{end}}"

# FilterCap is the global cap for total number of filters that can be created.
filter-cap = {{ .JSONRPCConfig.FilterCap }}

# BlockRangeCap defines the max block range allowed for 'eth_getLogs' query.
block-range-cap = {{ .JSONRPCConfig.BlockRangeCap }}

# HTTPTimeout is the read/write timeout of http json-rpc server.
http-timeout = "{{ .JSONRPCConfig.HTTPTimeout }}"

Expand Down
Loading
Loading