From 63688ba81912672530144b77334818659c75cfcd Mon Sep 17 00:00:00 2001 From: beer-1 <147697694+beer-1@users.noreply.github.com> Date: Thu, 24 Oct 2024 18:35:40 +0900 Subject: [PATCH] fix: async filter & subscription system (#82) * async filter system * add missing mutex * remove duplicated types * add filter timeout config * use default timeout when it is unset * use function --- indexer/abci.go | 29 ++- indexer/indexer.go | 26 +++ jsonrpc/backend/backend.go | 5 + jsonrpc/backend/filters.go | 10 - jsonrpc/backend/tx.go | 11 +- jsonrpc/config/config.go | 31 ++- jsonrpc/namespaces/eth/filters/api.go | 202 ++++++++++++------ jsonrpc/namespaces/eth/filters/filter.go | 5 +- .../namespaces/eth/filters/subscriptions.go | 47 ++-- 9 files changed, 239 insertions(+), 127 deletions(-) diff --git a/indexer/abci.go b/indexer/abci.go index 8d82735..c8b6ef0 100644 --- a/indexer/abci.go +++ b/indexer/abci.go @@ -3,6 +3,7 @@ package indexer import ( "context" "math/big" + "time" abci "github.com/cometbft/cometbft/abci/types" comettypes "github.com/cometbft/cometbft/types" @@ -146,6 +147,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque } blockHash := blockHeader.Hash() + blockLogs := make([][]*coretypes.Log, 0, len(ethTxs)) for txIndex, ethTx := range ethTxs { txHash := ethTx.Hash() receipt := receipts[txIndex] @@ -177,18 +179,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque log.TxIndex = uint(txIndex) } - // emit logs event - for _, logsChan := range e.logsChans { - logsChan <- receipt.Logs - } - } - } - - // emit empty logs event to confirm all logs are emitted and consumed, so the logs are - // available for querying. - if len(e.logsChans) > 0 { - for _, logsChan := range e.logsChans { - logsChan <- []*coretypes.Log{} + blockLogs = append(blockLogs, receipt.Logs) } } @@ -202,12 +193,16 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque return err } - // emit new block events - if len(e.blockChans) > 0 { - for _, blockChan := range e.blockChans { - blockChan <- &blockHeader + // emit block event in a goroutine + done := make(chan struct{}) + go e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: blockLogs}, done) + go func() { + select { + case <-done: + case <-time.After(5 * time.Second): + e.logger.Error("block event emitter timed out") } - } + }() // TODO - currently state changes are not supported in abci listener, so we track cosmos block hash at x/evm preblocker. // - https://github.com/cosmos/cosmos-sdk/issues/22246 diff --git a/indexer/indexer.go b/indexer/indexer.go index e11dee9..9813f0f 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -138,3 +138,29 @@ func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes. e.pendingChans = append(e.pendingChans, pendingChan) return blockChan, logsChan, pendingChan } + +// blockEvents is a struct to emit block events. +type blockEvents struct { + header *coretypes.Header + logs [][]*coretypes.Log +} + +// blockEventsEmitter emits block events to subscribers. +func (e *EVMIndexerImpl) blockEventsEmitter(blockEvents *blockEvents, done chan struct{}) { + if blockEvents == nil { + return + } + for _, logs := range blockEvents.logs { + for _, logsChan := range e.logsChans { + logsChan <- logs + } + } + for _, logsChan := range e.logsChans { + logsChan <- []*coretypes.Log{} + } + for _, blockChan := range e.blockChans { + blockChan <- blockEvents.header + } + + close(done) +} diff --git a/jsonrpc/backend/backend.go b/jsonrpc/backend/backend.go index 26572e6..c5933dd 100644 --- a/jsonrpc/backend/backend.go +++ b/jsonrpc/backend/backend.go @@ -3,6 +3,7 @@ package backend import ( "context" "sync" + "time" lrucache "github.com/hashicorp/golang-lru/v2" @@ -103,3 +104,7 @@ func (b *JSONRPCBackend) releaseAccMut(senderHex string) { b.mut.Unlock() // critical section end } + +func (b *JSONRPCBackend) FilterTimeout() time.Duration { + return b.cfg.FilterTimeout +} diff --git a/jsonrpc/backend/filters.go b/jsonrpc/backend/filters.go index 3fb38ac..98e2d53 100644 --- a/jsonrpc/backend/filters.go +++ b/jsonrpc/backend/filters.go @@ -47,13 +47,3 @@ func (b *JSONRPCBackend) GetLogsByHeight(height uint64) ([]*coretypes.Log, error return blockLogs, nil } - -// RPCFilterCap is the limit for total number of filters that can be created -func (b *JSONRPCBackend) RPCFilterCap() int32 { - return b.cfg.FilterCap -} - -// RPCFilterCap is the limit for total number of filters that can be created -func (b *JSONRPCBackend) RPCBlockRangeCap() int32 { - return b.cfg.BlockRangeCap -} diff --git a/jsonrpc/backend/tx.go b/jsonrpc/backend/tx.go index 890d71c..e7d4bcc 100644 --- a/jsonrpc/backend/tx.go +++ b/jsonrpc/backend/tx.go @@ -116,6 +116,15 @@ func (b *JSONRPCBackend) getQueryCtx() (context.Context, error) { } func (b *JSONRPCBackend) getQueryCtxWithHeight(height uint64) (context.Context, error) { + // check whether the given height is bigger than the latest block height + num, err := b.BlockNumber() + if err != nil { + return nil, err + } + if height >= uint64(num) { + height = 0 + } + return b.app.CreateQueryContext(int64(height), false) } @@ -171,7 +180,7 @@ func (b *JSONRPCBackend) GetTransactionCount(address common.Address, blockNrOrHa } var err error - queryCtx, err = b.app.CreateQueryContext(blockNumber.Int64(), false) + queryCtx, err = b.getQueryCtxWithHeight(uint64(blockNumber.Int64())) if err != nil { return nil, err } diff --git a/jsonrpc/config/config.go b/jsonrpc/config/config.go index 10354d2..d363439 100644 --- a/jsonrpc/config/config.go +++ b/jsonrpc/config/config.go @@ -44,6 +44,8 @@ const ( DefaultFeeHistoryMaxHeaders = 1024 // DefaultFeeHistoryMaxBlocks is the default maximum number of blocks, which can be used to lookup the fee history. DefaultFeeHistoryMaxBlocks = 1024 + // DefaultFilterTimeout is the default filter timeout, how long filters stay active. + DefaultFilterTimeout = 5 * time.Minute ) var ( @@ -59,8 +61,6 @@ const ( flagJSONRPCEnableUnsafeCORS = "json-rpc.enable-unsafe-cors" flagJSONRPCAPIs = "json-rpc.apis" flagJSONRPCLogsCap = "json-rpc.logs-cap" - flagJSONRPCFilterCap = "json-rpc.filter-cap" - flagJSONRPCBlockRangeCap = "json-rpc.block-range-cap" flagJSONRPCHTTPTimeout = "json-rpc.http-timeout" flagJSONRPCHTTPIdleTimeout = "json-rpc.http-idle-timeout" flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections" @@ -70,6 +70,7 @@ const ( flagJSONRPCQueuedTransactionTTL = "json-rpc.queued-transaction-ttl" flagJSONRPCFeeHistoryMaxHeaders = "json-rpc.fee-history-max-headers" flagJSONRPCFeeHistoryMaxBlocks = "json-rpc.fee-history-max-blocks" + flagJSONRPCFilterTimeout = "json-rpc.filter-timeout" ) // JSONRPCConfig defines configuration for the EVM RPC server. @@ -86,10 +87,6 @@ type JSONRPCConfig struct { EnableUnsafeCORS bool `mapstructure:"enable-unsafe-cors"` // API defines a list of JSON-RPC namespaces that should be enabled APIs []string `mapstructure:"apis"` - // FilterCap is the global cap for total number of filters that can be created. - FilterCap int32 `mapstructure:"filter-cap"` - // BlockRangeCap defines the max block range allowed for `eth_getLogs` query. - BlockRangeCap int32 `mapstructure:"block-range-cap"` // HTTPTimeout is the read/write timeout of http json-rpc server. HTTPTimeout time.Duration `mapstructure:"http-timeout"` // HTTPIdleTimeout is the idle timeout of http json-rpc server. @@ -107,6 +104,8 @@ type JSONRPCConfig struct { FeeHistoryMaxHeaders int `mapstructure:"fee-history-max-headers"` // FeeHistoryMaxBlocks is the maximum number of blocks, which can be used to lookup the fee history. FeeHistoryMaxBlocks int `mapstructure:"fee-history-max-blocks"` + // FilterTimeout is a duration how long filters stay active (default: 5min) + FilterTimeout time.Duration `mapstructure:"filter-timeout"` } // DefaultJSONRPCConfig returns a default configuration for the EVM RPC server. @@ -120,9 +119,6 @@ func DefaultJSONRPCConfig() JSONRPCConfig { APIs: DefaultAPIs, - FilterCap: DefaultFilterCap, - BlockRangeCap: DefaultBlockRangeCap, - HTTPTimeout: DefaultHTTPTimeout, HTTPIdleTimeout: DefaultHTTPIdleTimeout, MaxOpenConnections: DefaultMaxOpenConnections, @@ -134,6 +130,8 @@ func DefaultJSONRPCConfig() JSONRPCConfig { FeeHistoryMaxHeaders: DefaultFeeHistoryMaxHeaders, FeeHistoryMaxBlocks: DefaultFeeHistoryMaxBlocks, + + FilterTimeout: DefaultFilterTimeout, } } @@ -145,8 +143,6 @@ func AddConfigFlags(startCmd *cobra.Command) { startCmd.Flags().String(flagJSONRPCAddressWS, DefaultAddressWS, "Address to listen on for the WebSocket server") startCmd.Flags().Bool(flagJSONRPCEnableUnsafeCORS, DefaultEnableUnsafeCORS, "Enable unsafe CORS") startCmd.Flags().StringSlice(flagJSONRPCAPIs, DefaultAPIs, "List of JSON-RPC namespaces that should be enabled") - startCmd.Flags().Int32(flagJSONRPCFilterCap, DefaultFilterCap, "Sets the global cap for total number of filters that can be created") - startCmd.Flags().Int32(flagJSONRPCBlockRangeCap, DefaultBlockRangeCap, "Max block range allowed for 'eth_getLogs' query") startCmd.Flags().Duration(flagJSONRPCHTTPTimeout, DefaultHTTPTimeout, "Read/write timeout of http json-rpc server") startCmd.Flags().Duration(flagJSONRPCHTTPIdleTimeout, DefaultHTTPIdleTimeout, "Idle timeout of http json-rpc server") startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener") @@ -155,6 +151,7 @@ func AddConfigFlags(startCmd *cobra.Command) { startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum number of queued transactions that can be in the transaction pool") startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxHeaders, DefaultFeeHistoryMaxHeaders, "Maximum number of headers used to lookup the fee history") startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxBlocks, DefaultFeeHistoryMaxBlocks, "Maximum number of blocks used to lookup the fee history") + startCmd.Flags().Duration(flagJSONRPCFilterTimeout, DefaultFilterTimeout, "Duration how long filters stay active") } // GetConfig load config values from the app options @@ -166,8 +163,6 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig { AddressWS: cast.ToString(appOpts.Get(flagJSONRPCAddressWS)), EnableUnsafeCORS: cast.ToBool(appOpts.Get(flagJSONRPCEnableUnsafeCORS)), APIs: strings.Split(cast.ToString(appOpts.Get(flagJSONRPCAPIs)), ","), - FilterCap: cast.ToInt32(appOpts.Get(flagJSONRPCFilterCap)), - BlockRangeCap: cast.ToInt32(appOpts.Get(flagJSONRPCBlockRangeCap)), HTTPTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPTimeout)), HTTPIdleTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPIdleTimeout)), MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)), @@ -176,6 +171,7 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig { QueuedTransactionCap: cast.ToInt(appOpts.Get(flagJSONRPCQueuedTransactionCap)), FeeHistoryMaxHeaders: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxHeaders)), FeeHistoryMaxBlocks: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxBlocks)), + FilterTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCFilterTimeout)), } } @@ -206,12 +202,6 @@ enable-unsafe-cors = {{ .JSONRPCConfig.EnableUnsafeCORS }} # Example: "eth,txpool,personal,net,debug,web3" apis = "{{range $index, $elmt := .JSONRPCConfig.APIs}}{{if $index}},{{$elmt}}{{else}}{{$elmt}}{{end}}{{end}}" -# FilterCap is the global cap for total number of filters that can be created. -filter-cap = {{ .JSONRPCConfig.FilterCap }} - -# BlockRangeCap defines the max block range allowed for 'eth_getLogs' query. -block-range-cap = {{ .JSONRPCConfig.BlockRangeCap }} - # HTTPTimeout is the read/write timeout of http json-rpc server. http-timeout = "{{ .JSONRPCConfig.HTTPTimeout }}" @@ -237,4 +227,7 @@ fee-history-max-headers = {{ .JSONRPCConfig.FeeHistoryMaxHeaders }} # FeeHistoryMaxBlocks is the maximum number of blocks, which can be used to lookup the fee history. fee-history-max-blocks = {{ .JSONRPCConfig.FeeHistoryMaxBlocks }} + +# FilterTimeout is a duration how long filters stay active (default: 5min) +filter-timeout = "{{ .JSONRPCConfig.FilterTimeout }}" ` diff --git a/jsonrpc/namespaces/eth/filters/api.go b/jsonrpc/namespaces/eth/filters/api.go index 3434e3f..092c837 100644 --- a/jsonrpc/namespaces/eth/filters/api.go +++ b/jsonrpc/namespaces/eth/filters/api.go @@ -22,22 +22,22 @@ var ( errFilterNotFound = errors.New("filter not found") errInvalidBlockRange = errors.New("invalid block range params") errExceedMaxTopics = errors.New("exceed max topics") - errExceedFilterCap = errors.New("exceed filter cap") ) // The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0 const maxTopics = 4 type filter struct { - ty ethfilters.Type hashes []common.Hash - fullTx bool txs []*rpctypes.RPCTransaction crit ethfilters.FilterCriteria logs []*coretypes.Log // lastUsed is the time the filter was last used lastUsed time.Time + + // subscription lifecycle + s *subscription } // FilterAPI is the eth_ filter namespace API @@ -88,17 +88,33 @@ func NewFilterAPI(app *app.MinitiaApp, backend *backend.JSONRPCBackend, logger l // clearUnusedFilters removes filters that have not been used for 5 minutes func (api *FilterAPI) clearUnusedFilters() { - const timeout = 5 * time.Minute + timeout := api.backend.FilterTimeout() + if timeout == 0 { + timeout = 5 * time.Minute + } + ticker := time.NewTicker(timeout) + defer ticker.Stop() + + var toUninstall []*subscription for { - time.Sleep(timeout) + <-ticker.C api.filtersMut.Lock() for id, f := range api.filters { - if time.Since(f.lastUsed) > 5*time.Minute { + if time.Since(f.lastUsed) > timeout { + toUninstall = append(toUninstall, f.s) delete(api.filters, id) } } api.filtersMut.Unlock() + + // Unsubscribes are processed outside the lock to avoid the following scenario: + // event loop attempts broadcasting events to still active filters while + // Unsubscribe is waiting for it to process the uninstall request. + for _, s := range toUninstall { + api.uninstallSubscription(s) + } + toUninstall = nil } } @@ -106,14 +122,6 @@ func (api *FilterAPI) eventLoop() { for { select { case block := <-api.blockChan: - api.filtersMut.Lock() - for _, f := range api.filters { - if f.ty == ethfilters.BlocksSubscription { - f.hashes = append(f.hashes, block.Hash()) - } - } - api.filtersMut.Unlock() - for _, s := range api.subscriptions { if s.ty == ethfilters.BlocksSubscription { s.headerChan <- block @@ -124,38 +132,13 @@ func (api *FilterAPI) eventLoop() { continue } - api.filtersMut.Lock() - for _, f := range api.filters { - if f.ty == ethfilters.LogsSubscription { - logs := filterLogs(logs, f.crit.FromBlock, f.crit.ToBlock, f.crit.Addresses, f.crit.Topics) - if len(logs) > 0 { - f.logs = append(f.logs, logs...) - } - } - } - api.filtersMut.Unlock() - for _, s := range api.subscriptions { if s.ty == ethfilters.LogsSubscription { - logs := filterLogs(logs, s.crit.FromBlock, s.crit.ToBlock, s.crit.Addresses, s.crit.Topics) - if len(logs) > 0 { - s.logsChan <- logs - } + // logs will be filtered in the subscription in the goroutine + s.logsChan <- logs } } case tx := <-api.pendingChan: - api.filtersMut.Lock() - for _, f := range api.filters { - if f.ty == ethfilters.PendingTransactionsSubscription { - if f.fullTx { - f.txs = append(f.txs, tx) - } else { - f.hashes = append(f.hashes, tx.Hash) - } - } - } - api.filtersMut.Unlock() - for _, s := range api.subscriptions { if s.ty == ethfilters.PendingTransactionsSubscription { if s.fullTx { @@ -182,38 +165,102 @@ func (api *FilterAPI) eventLoop() { // It is part of the filter package because this filter can be used through the // `eth_getFilterChanges` polling method that is also used for log filters. func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error) { - if len(api.filters) >= int(api.backend.RPCFilterCap()) { - return "", errExceedFilterCap - } + var ( + txChan = make(chan *rpctypes.RPCTransaction) + hashChan = make(chan common.Hash) + ) id := rpc.NewID() - api.filtersMut.Lock() - api.filters[id] = &filter{ + s := &subscription{ + id: id, ty: ethfilters.PendingTransactionsSubscription, fullTx: fullTx != nil && *fullTx, + + // for listening + txChan: txChan, + hashChan: hashChan, + + // for lifecycle + installed: make(chan struct{}), + err: make(chan error), + } + api.installSubscription(s) + + api.filtersMut.Lock() + api.filters[id] = &filter{ txs: make([]*rpctypes.RPCTransaction, 0), hashes: make([]common.Hash, 0), + s: s, } api.filtersMut.Unlock() + go func() { + for { + select { + case rpcTx := <-txChan: + api.filtersMut.Lock() + if f, found := api.filters[id]; found { + f.txs = append(f.txs, rpcTx) + } + api.filtersMut.Unlock() + case hash := <-hashChan: + api.filtersMut.Lock() + if f, found := api.filters[id]; found { + f.hashes = append(f.hashes, hash) + } + api.filtersMut.Unlock() + case <-s.err: // subsciprtion is uninstalled + return + } + } + }() + return id, nil } // NewBlockFilter creates a filter that fetches blocks that are imported into the chain. // It is part of the filter package since polling goes with eth_getFilterChanges. func (api *FilterAPI) NewBlockFilter() (rpc.ID, error) { - if len(api.filters) >= int(api.backend.RPCFilterCap()) { - return "", errExceedFilterCap - } + var ( + headerChan = make(chan *coretypes.Header) + ) id := rpc.NewID() + s := &subscription{ + id: id, + ty: ethfilters.BlocksSubscription, + + // for listening + headerChan: headerChan, + + // for lifecycle + installed: make(chan struct{}), + err: make(chan error), + } + api.installSubscription(s) + api.filtersMut.Lock() api.filters[id] = &filter{ - ty: ethfilters.BlocksSubscription, hashes: make([]common.Hash, 0), + s: s, } api.filtersMut.Unlock() + go func() { + for { + select { + case header := <-headerChan: + api.filtersMut.Lock() + if f, found := api.filters[id]; found { + f.hashes = append(f.hashes, header.Hash()) + } + api.filtersMut.Unlock() + case <-s.err: // subsciprtion is uninstalled + return + } + } + }() + return id, nil } @@ -229,9 +276,6 @@ func (api *FilterAPI) NewFilter(crit ethfilters.FilterCriteria) (rpc.ID, error) if len(crit.Topics) > maxTopics { return "", errExceedMaxTopics } - if len(api.filters) >= int(api.backend.RPCFilterCap()) { - return "", errExceedFilterCap - } var from, to rpc.BlockNumber if crit.FromBlock == nil { @@ -252,15 +296,49 @@ func (api *FilterAPI) NewFilter(crit ethfilters.FilterCriteria) (rpc.ID, error) return "", errInvalidBlockRange } + var ( + logsChan = make(chan []*coretypes.Log) + ) + id := rpc.NewID() + s := &subscription{ + id: id, + ty: ethfilters.LogsSubscription, + crit: crit, + + // for listening + logsChan: logsChan, + + // for lifecycle + installed: make(chan struct{}), + err: make(chan error), + } + api.installSubscription(s) + api.filtersMut.Lock() api.filters[id] = &filter{ - ty: ethfilters.LogsSubscription, crit: crit, lastUsed: time.Now(), logs: make([]*coretypes.Log, 0), + s: s, } api.filtersMut.Unlock() + go func() { + for { + select { + case logs := <-logsChan: + logs = filterLogs(logs, s.crit.FromBlock, s.crit.ToBlock, s.crit.Addresses, s.crit.Topics) + api.filtersMut.Lock() + if f, found := api.filters[id]; found { + f.logs = append(f.logs, logs...) + } + api.filtersMut.Unlock() + case <-s.err: // subsciprtion is uninstalled + return + } + } + }() + return id, nil } @@ -291,7 +369,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri } // Run the filter and return all the logs - logs, err := filter.Logs(ctx, int64(api.backend.RPCBlockRangeCap())) + logs, err := filter.Logs(ctx) if err != nil { return nil, err } @@ -301,11 +379,15 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri // UninstallFilter removes the filter with the given filter id. func (api *FilterAPI) UninstallFilter(id rpc.ID) bool { api.filtersMut.Lock() - _, found := api.filters[id] + f, found := api.filters[id] if found { delete(api.filters, id) } api.filtersMut.Unlock() + if found { + api.uninstallSubscription(f.s) + } + return found } @@ -316,7 +398,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretype f, found := api.filters[id] api.filtersMut.Unlock() - if !found || f.ty != ethfilters.LogsSubscription { + if !found || f.s.ty != ethfilters.LogsSubscription { return nil, errFilterNotFound } @@ -339,7 +421,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretype } // Run the filter and return all the logs - logs, err := bloomFilter.Logs(ctx, int64(api.backend.RPCBlockRangeCap())) + logs, err := bloomFilter.Logs(ctx) if err != nil { return nil, err } @@ -363,7 +445,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.lastUsed = time.Now() - switch f.ty { + switch f.s.ty { case ethfilters.BlocksSubscription: hashes := f.hashes f.hashes = nil @@ -375,7 +457,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { return returnLogs(logs), nil case ethfilters.PendingTransactionsSubscription: - if f.fullTx { + if f.s.fullTx { txs := f.txs f.txs = nil diff --git a/jsonrpc/namespaces/eth/filters/filter.go b/jsonrpc/namespaces/eth/filters/filter.go index 9879ab3..0db7e6c 100644 --- a/jsonrpc/namespaces/eth/filters/filter.go +++ b/jsonrpc/namespaces/eth/filters/filter.go @@ -70,7 +70,7 @@ func newFilter( // Logs searches the blockchain for matching log entries, returning all from the // first block that contains matches, updating the start of the filter accordingly. -func (f *Filter) Logs(ctx context.Context, blockLimit int64) ([]*coretypes.Log, error) { +func (f *Filter) Logs(ctx context.Context) ([]*coretypes.Log, error) { var err error // If we're doing singleton block filtering, execute and return @@ -111,9 +111,6 @@ func (f *Filter) Logs(ctx context.Context, blockLimit int64) ([]*coretypes.Log, if f.end < f.begin { return nil, fmt.Errorf("invalid range [%d, %d]", f.begin, f.end) } - if f.end-f.begin > blockLimit { - return nil, fmt.Errorf("maximum [begin, end] blocks distance: %d", blockLimit) - } // check bounds if f.begin > head { diff --git a/jsonrpc/namespaces/eth/filters/subscriptions.go b/jsonrpc/namespaces/eth/filters/subscriptions.go index cee7927..39ff28c 100644 --- a/jsonrpc/namespaces/eth/filters/subscriptions.go +++ b/jsonrpc/namespaces/eth/filters/subscriptions.go @@ -18,12 +18,13 @@ type subscription struct { crit ethfilters.FilterCriteria fullTx bool + // for listening headerChan chan *coretypes.Header logsChan chan []*coretypes.Log txChan chan *rpctypes.RPCTransaction hashChan chan common.Hash - // Channels to signal the subscription is installed or uninstalled + // for lifecycle installed chan struct{} // closed when the subscription is installed err chan error // closed when the subscription is uninstalled unsubOnce sync.Once @@ -43,18 +44,21 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { id := rpc.NewID() s := &subscription{ - id: id, - ty: ethfilters.BlocksSubscription, + id: id, + ty: ethfilters.BlocksSubscription, + + // for listening headerChan: headerChan, + // for lifecycle installed: make(chan struct{}), err: make(chan error), } - api.install <- s - <-s.installed + + api.installSubscription(s) go func() { - defer api.clearSubscription(s) + defer api.uninstallSubscription(s) for { select { @@ -109,19 +113,22 @@ func (api *FilterAPI) Logs(ctx context.Context, crit ethfilters.FilterCriteria) ty: ethfilters.LogsSubscription, crit: crit, + // for listening logsChan: logsChan, + // for lifecycle installed: make(chan struct{}), err: make(chan error), } - api.install <- s - <-s.installed + + api.installSubscription(s) go func() { - defer api.clearSubscription(s) + defer api.uninstallSubscription(s) for { select { case logs := <-logsChan: + logs = filterLogs(logs, s.crit.FromBlock, s.crit.ToBlock, s.crit.Addresses, s.crit.Topics) for _, log := range logs { log := log _ = notifier.Notify(rpcSub.ID, &log) @@ -152,20 +159,23 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) id := rpc.NewID() s := &subscription{ - id: id, - ty: ethfilters.PendingTransactionsSubscription, - fullTx: fullTx != nil && *fullTx, + id: id, + ty: ethfilters.PendingTransactionsSubscription, + fullTx: fullTx != nil && *fullTx, + + // for listening txChan: txChan, hashChan: hashChan, + // for lifecycle installed: make(chan struct{}), err: make(chan error), } - api.install <- s - <-s.installed + + api.installSubscription(s) go func() { - defer api.clearSubscription(s) + defer api.uninstallSubscription(s) for { select { @@ -182,7 +192,12 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) return rpcSub, nil } -func (api *FilterAPI) clearSubscription(s *subscription) { +func (api *FilterAPI) installSubscription(s *subscription) { + api.install <- s + <-s.installed +} + +func (api *FilterAPI) uninstallSubscription(s *subscription) { s.unsubOnce.Do(func() { uninstallLoop: for {