use mutex and pass logs in bulk
beer-1 committed Jul 12, 2024
1 parent 64d8e45 commit 91a7c4d
Showing 5 changed files with 79 additions and 236 deletions.
8 changes: 4 additions & 4 deletions indexer/abci.go
@@ -121,18 +121,18 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
}

// emit log events
if e.logChan != nil {
if e.logsChan != nil {
for idx, log := range receipt.Logs {
// fill in missing fields before emitting
log.Index = uint(idx)
log.BlockHash = blockHash
log.BlockNumber = uint64(blockHeight)
log.TxHash = txHash
log.TxIndex = uint(txIndex)

// emit log event
e.logChan <- log
}

// emit logs event
e.logsChan <- receipt.Logs
}
}
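
The change above stops sending receipt logs one at a time and instead emits the whole receipt.Logs slice with a single channel send once the per-log fields are filled in. A minimal sketch of that pattern, not part of the diff, with simplified stand-in types instead of the actual indexer structs:

package main

import "fmt"

// Log is a stand-in for coretypes.Log; only the fields touched above are shown.
type Log struct {
	Index       uint
	BlockNumber uint64
	TxIndex     uint
	Data        string
}

type indexer struct {
	logsChan chan []*Log // bulk channel: one send per receipt, not one per log
}

func (e *indexer) emit(blockHeight uint64, txIndex uint, logs []*Log) {
	if e.logsChan == nil {
		return // no subscriber yet
	}
	for idx, log := range logs {
		// fill in missing fields before emitting, as ListenFinalizeBlock does
		log.Index = uint(idx)
		log.BlockNumber = blockHeight
		log.TxIndex = txIndex
	}
	// emit all logs for this receipt in one send
	e.logsChan <- logs
}

func main() {
	e := &indexer{logsChan: make(chan []*Log, 1)}
	e.emit(100, 0, []*Log{{Data: "a"}, {Data: "b"}})
	fmt.Println(len(<-e.logsChan), "logs received in one batch")
}
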

12 changes: 6 additions & 6 deletions indexer/indexer.go
@@ -39,7 +39,7 @@ type EVMIndexer interface {
BlockHeaderByNumber(ctx context.Context, number uint64) (*coretypes.Header, error)

// event subscription
Subscribe() (chan *coretypes.Header, chan *coretypes.Log, chan *rpctypes.RPCTransaction)
Subscribe() (chan *coretypes.Header, chan []*coretypes.Log, chan *rpctypes.RPCTransaction)
MempoolWrapper(mempool mempool.Mempool) mempool.Mempool
}

@@ -61,7 +61,7 @@ type EVMIndexerImpl struct {
BlockHashToNumberMap collections.Map[[]byte, uint64]

blockChan chan *coretypes.Header
logChan chan *coretypes.Log
logsChan chan []*coretypes.Log
pendingChan chan *rpctypes.RPCTransaction
}

@@ -95,7 +95,7 @@ func NewEVMIndexer(
BlockHashToNumberMap: collections.NewMap(sb, prefixBlockHashToNumber, "block_hash_to_number", collections.BytesKey, collections.Uint64Value),

blockChan: nil,
logChan: nil,
logsChan: nil,
pendingChan: nil,
}

@@ -109,9 +109,9 @@ }
}

// Subscribe returns channels to receive blocks and logs.
func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan *coretypes.Log, chan *rpctypes.RPCTransaction) {
func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes.Log, chan *rpctypes.RPCTransaction) {
e.blockChan = make(chan *coretypes.Header)
e.logChan = make(chan *coretypes.Log)
e.logsChan = make(chan []*coretypes.Log)
e.pendingChan = make(chan *rpctypes.RPCTransaction)
return e.blockChan, e.logChan, e.pendingChan
return e.blockChan, e.logsChan, e.pendingChan
}
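
With the new Subscribe signature, log events are delivered as one []*coretypes.Log slice per block rather than a stream of single logs. A rough sketch of how a consumer might drain the three channels in one goroutine (the concrete consumer in this commit is FilterAPI.subscribeEvents below); the types here are placeholders for coretypes.Header, coretypes.Log, and rpctypes.RPCTransaction:

package main

import (
	"fmt"
	"time"
)

// Placeholder event types standing in for coretypes.Header,
// coretypes.Log, and rpctypes.RPCTransaction.
type Header struct{ Number uint64 }
type Log struct{ Index uint }
type RPCTransaction struct{ Hash string }

// consume drains all three subscription channels in one goroutine.
func consume(blockCh chan *Header, logsCh chan []*Log, pendingCh chan *RPCTransaction) {
	for {
		select {
		case h := <-blockCh:
			fmt.Println("new block", h.Number)
		case logs := <-logsCh:
			// logs now arrive as one slice per block, not one log per send
			fmt.Println("received", len(logs), "logs in one batch")
		case tx := <-pendingCh:
			fmt.Println("pending tx", tx.Hash)
		}
	}
}

func main() {
	blockCh := make(chan *Header)
	logsCh := make(chan []*Log)
	pendingCh := make(chan *RPCTransaction)
	go consume(blockCh, logsCh, pendingCh)

	blockCh <- &Header{Number: 1}
	logsCh <- []*Log{{Index: 0}, {Index: 1}}
	pendingCh <- &RPCTransaction{Hash: "0xabc"}
	time.Sleep(10 * time.Millisecond) // let the consumer print before exiting
}
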
12 changes: 0 additions & 12 deletions jsonrpc/config/config.go
@@ -25,8 +25,6 @@ const (
DefaultLogsCap = 100
// DefaultBlockRangeCap is the default max block range allowed for `eth_getLogs` query.
DefaultBlockRangeCap = 100
// DefaultMetricsAddress defines the default EVM Metrics server address to bind to.
DefaultMetricsAddress = "127.0.0.1:6065"
// DefaultAddress defines the default HTTP server to listen on.
DefaultAddress = "127.0.0.1:8545"
// DefaultFilterCap
@@ -49,7 +47,6 @@ const (
flagJSONRPCHTTPTimeout = "json-rpc.http-timeout"
flagJSONRPCHTTPIdleTimeout = "json-rpc.http-idle-timeout"
flagJSONRPCMaxOpenConnections = "json-rpc.max-open-connections"
flagJSONRPCMetricsAddress = "json-rpc.metrics-address"
)

// JSONRPCConfig defines configuration for the EVM RPC server.
@@ -73,8 +70,6 @@ type JSONRPCConfig struct {
// MaxOpenConnections sets the maximum number of simultaneous connections
// for the server listener.
MaxOpenConnections int `mapstructure:"max-open-connections"`
// MetricsAddress defines the metrics server to listen on
MetricsAddress string `mapstructure:"metrics-address"`
}

// DefaultJSONRPCConfig returns a default configuration for the EVM RPC server.
@@ -89,7 +84,6 @@ func DefaultJSONRPCConfig() JSONRPCConfig {
HTTPTimeout: DefaultHTTPTimeout,
HTTPIdleTimeout: DefaultHTTPIdleTimeout,
MaxOpenConnections: DefaultMaxOpenConnections,
MetricsAddress: DefaultMetricsAddress,
}
}

@@ -104,7 +98,6 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Duration(flagJSONRPCHTTPTimeout, DefaultHTTPTimeout, "Read/write timeout of http json-rpc server")
startCmd.Flags().Duration(flagJSONRPCHTTPIdleTimeout, DefaultHTTPIdleTimeout, "Idle timeout of http json-rpc server")
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
startCmd.Flags().String(flagJSONRPCMetricsAddress, DefaultMetricsAddress, "Address to listen on for the EVM Metrics server")
}

// GetConfig load config values from the app options
@@ -119,7 +112,6 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
HTTPTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPTimeout)),
HTTPIdleTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCHTTPIdleTimeout)),
MaxOpenConnections: cast.ToInt(appOpts.Get(flagJSONRPCMaxOpenConnections)),
MetricsAddress: cast.ToString(appOpts.Get(flagJSONRPCMetricsAddress)),
}
}

@@ -159,8 +151,4 @@ http-idle-timeout = "{{ .JSONRPCConfig.HTTPIdleTimeout }}"
# MaxOpenConnections sets the maximum number of simultaneous connections
# for the server listener.
max-open-connections = {{ .JSONRPCConfig.MaxOpenConnections }}
# MetricsAddress defines the EVM Metrics server address to bind to. Pass --metrics in CLI to enable
# Prometheus metrics path: /debug/metrics/prometheus
metrics-address = "{{ .JSONRPCConfig.MetricsAddress }}"
`
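
For context, every option that remains in this file follows the same plumbing that the removed MetricsAddress used: a default constant, a flag name, a mapstructure-tagged field, a cobra flag, and a cast in GetConfig. A condensed sketch of that wiring for a single hypothetical option (the names below are illustrative only, not taken from the repo):

package main

import (
	"fmt"

	"github.com/spf13/cast"
)

// Hypothetical option, mirroring the pattern in config.go.
const (
	DefaultExampleTimeout = "10s"
	flagExampleTimeout    = "json-rpc.example-timeout"
)

type ExampleConfig struct {
	ExampleTimeout string `mapstructure:"example-timeout"`
}

// appOptions mimics servertypes.AppOptions.Get for this sketch.
type appOptions map[string]interface{}

func (o appOptions) Get(key string) interface{} { return o[key] }

// GetExampleConfig mirrors GetConfig: read the option and cast it.
func GetExampleConfig(opts appOptions) ExampleConfig {
	return ExampleConfig{ExampleTimeout: cast.ToString(opts.Get(flagExampleTimeout))}
}

func main() {
	opts := appOptions{flagExampleTimeout: DefaultExampleTimeout}
	fmt.Println(GetExampleConfig(opts).ExampleTimeout)
}
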
125 changes: 69 additions & 56 deletions jsonrpc/namespaces/eth/filters/api.go
@@ -46,11 +46,12 @@ type FilterAPI struct {

logger log.Logger

filters sync.Map
filtersMu sync.Mutex
filters map[rpc.ID]*filter

// channels for block and log events
blockChan chan *coretypes.Header
logChan chan *coretypes.Log
logsChan chan []*coretypes.Log
pendingChan chan *rpctypes.RPCTransaction
}

@@ -63,12 +64,12 @@ func NewFilterAPI(app *app.MinitiaApp, backend *backend.JSONRPCBackend, logger l

logger: logger,

filters: sync.Map{},
filters: make(map[rpc.ID]*filter),
}

go api.clearUnusedFilters()

api.blockChan, api.logChan, api.pendingChan = app.EVMIndexer().Subscribe()
api.blockChan, api.logsChan, api.pendingChan = app.EVMIndexer().Subscribe()
go api.subscribeEvents()

return api
@@ -80,47 +81,50 @@ func (api *FilterAPI) clearUnusedFilters() {

for {
time.Sleep(timeout)
api.filters.Range(func(key, value interface{}) bool {
f := value.(*filter)
api.filtersMu.Lock()
for id, f := range api.filters {
if time.Since(f.lastUsed) > 5*time.Minute {
api.filters.Delete(key)
delete(api.filters, id)
}
return true
})
}
api.filtersMu.Unlock()
}
}
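
The rewritten cleanup loop swaps sync.Map.Range for a plain map guarded by a mutex, so stale entries can be deleted directly while iterating. A self-contained sketch of that pattern, with simplified types and the same 5-minute idle threshold as the code above:

package main

import (
	"fmt"
	"sync"
	"time"
)

type filter struct{ lastUsed time.Time }

type FilterAPI struct {
	filtersMu sync.Mutex
	filters   map[string]*filter
}

// clearUnused deletes filters that have been idle longer than maxIdle.
// Deleting from a map while ranging over it is allowed in Go, and the
// mutex keeps concurrent readers and writers out during the sweep.
func (api *FilterAPI) clearUnused(maxIdle time.Duration) {
	api.filtersMu.Lock()
	defer api.filtersMu.Unlock()
	for id, f := range api.filters {
		if time.Since(f.lastUsed) > maxIdle {
			delete(api.filters, id)
		}
	}
}

func main() {
	api := &FilterAPI{filters: map[string]*filter{
		"fresh": {lastUsed: time.Now()},
		"stale": {lastUsed: time.Now().Add(-10 * time.Minute)},
	}}
	api.clearUnused(5 * time.Minute)
	fmt.Println(len(api.filters), "filter(s) remain") // 1
}
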

func (api *FilterAPI) subscribeEvents() {
for {
select {
case block := <-api.blockChan:
api.filters.Range(func(key, value interface{}) bool {
f := value.(*filter)
api.filtersMu.Lock()
for _, f := range api.filters {
if f.ty == ethfilters.BlocksSubscription {
f.hashes = append(f.hashes, block.Hash())
}
return true
})
case log := <-api.logChan:
api.filters.Range(func(key, value interface{}) bool {
f := value.(*filter)
}
api.filtersMu.Unlock()
case logs := <-api.logsChan:
api.filtersMu.Lock()
for _, f := range api.filters {
if f.ty == ethfilters.LogsSubscription {
f.logs = append(f.logs, log)
logs = filterLogs(logs, f.crit.FromBlock, f.crit.ToBlock, f.crit.Addresses, f.crit.Topics)
if len(logs) > 0 {
f.logs = append(f.logs, logs...)
}
}
return true
})
}
api.filtersMu.Unlock()
case tx := <-api.pendingChan:
api.filters.Range(func(key, value interface{}) bool {
f := value.(*filter)
api.filtersMu.Lock()
for _, f := range api.filters {
if f.ty == ethfilters.PendingTransactionsSubscription {
if f.fullTx {
f.txs = append(f.txs, tx)
} else {
f.hashes = append(f.hashes, tx.Hash)
}
}
return true
})
}
api.filtersMu.Unlock()
}
}
}
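
When a batch of logs arrives on logsChan, the loop above narrows it per filter with filterLogs before appending, so each filter stores only the logs matching its criteria. A rough sketch of that fan-out step, not part of the diff, using a simplified stand-in for filterLogs that matches on address only (the real helper also checks block range and topics):

package main

import (
	"fmt"
	"sync"
)

type Log struct{ Address string }

type filter struct {
	address string // simplified criteria; the real filter uses ethfilters.FilterCriteria
	logs    []*Log
}

type FilterAPI struct {
	filtersMu sync.Mutex
	filters   map[string]*filter
}

// filterLogs is a stand-in for the real helper: keep only matching logs.
func filterLogs(logs []*Log, address string) []*Log {
	var out []*Log
	for _, l := range logs {
		if l.Address == address {
			out = append(out, l)
		}
	}
	return out
}

// dispatch appends the matching subset of a log batch to every filter.
func (api *FilterAPI) dispatch(batch []*Log) {
	api.filtersMu.Lock()
	defer api.filtersMu.Unlock()
	for _, f := range api.filters {
		if matched := filterLogs(batch, f.address); len(matched) > 0 {
			f.logs = append(f.logs, matched...)
		}
	}
}

func main() {
	api := &FilterAPI{filters: map[string]*filter{
		"a": {address: "0xaaa"},
		"b": {address: "0xbbb"},
	}}
	api.dispatch([]*Log{{Address: "0xaaa"}, {Address: "0xaaa"}, {Address: "0xccc"}})
	fmt.Println(len(api.filters["a"].logs), len(api.filters["b"].logs)) // 2 0
}
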
@@ -132,12 +136,14 @@ func (api *FilterAPI) subscribeEvents() {
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
id := rpc.NewID()
api.filters.Store(id, &filter{
api.filtersMu.Lock()
api.filters[id] = &filter{
ty: ethfilters.PendingTransactionsSubscription,
fullTx: fullTx != nil && *fullTx,
txs: make([]*rpctypes.RPCTransaction, 0),
hashes: make([]common.Hash, 0),
})
}
api.filtersMu.Unlock()

return id
}
@@ -146,10 +152,12 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
// It is part of the filter package since polling goes with eth_getFilterChanges.
func (api *FilterAPI) NewBlockFilter() rpc.ID {
id := rpc.NewID()
api.filters.Store(id, &filter{
api.filtersMu.Lock()
api.filters[id] = &filter{
ty: ethfilters.BlocksSubscription,
hashes: make([]common.Hash, 0),
})
}
api.filtersMu.Unlock()

return id
}
@@ -187,11 +195,13 @@ func (api *FilterAPI) NewFilter(crit ethfilters.FilterCriteria) (rpc.ID, error)
}

id := rpc.NewID()
api.filters.Store(id, &filter{
api.filtersMu.Lock()
api.filters[id] = &filter{
ty: ethfilters.LogsSubscription,
crit: crit, lastUsed: time.Now(),
logs: make([]*coretypes.Log, 0),
})
}
api.filtersMu.Unlock()

return id, nil
}
@@ -233,39 +243,40 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri

// UninstallFilter removes the filter with the given filter id.
func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
_, found := api.filters.LoadAndDelete(id)
api.filtersMu.Lock()
_, found := api.filters[id]
delete(api.filters, id)
api.filtersMu.Unlock()
return found
}

// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretypes.Log, error) {
f, ok := api.filters.Load(id)
if !ok {
return nil, errFilterNotFound
}
api.filtersMu.Lock()
f, found := api.filters[id]
api.filtersMu.Unlock()

filter := f.(*filter)
if filter.ty != ethfilters.LogsSubscription {
if !found || f.ty != ethfilters.LogsSubscription {
return nil, errFilterNotFound
}

var bloomFilter *Filter
if filter.crit.BlockHash != nil {
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
bloomFilter = newBlockFilter(api.logger, api.backend, *filter.crit.BlockHash, filter.crit.Addresses, filter.crit.Topics)
bloomFilter = newBlockFilter(api.logger, api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
if filter.crit.FromBlock != nil {
begin = filter.crit.FromBlock.Int64()
if f.crit.FromBlock != nil {
begin = f.crit.FromBlock.Int64()
}
end := rpc.LatestBlockNumber.Int64()
if filter.crit.ToBlock != nil {
end = filter.crit.ToBlock.Int64()
if f.crit.ToBlock != nil {
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
bloomFilter = newRangeFilter(api.logger, api.backend, begin, end, filter.crit.Addresses, filter.crit.Topics)
bloomFilter = newRangeFilter(api.logger, api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
}

// Run the filter and return all the logs
@@ -283,35 +294,37 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretype
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
f, ok := api.filters.Load(id)
api.filtersMu.Lock()
defer api.filtersMu.Unlock()

f, ok := api.filters[id]
if !ok {
return []interface{}{}, errFilterNotFound
}

filter := f.(*filter)
filter.lastUsed = time.Now()
f.lastUsed = time.Now()

switch filter.ty {
switch f.ty {
case ethfilters.BlocksSubscription:
hashes := filter.hashes
filter.hashes = nil
hashes := f.hashes
f.hashes = nil

return returnHashes(hashes), nil
case ethfilters.LogsSubscription:
logs := filter.logs
filter.logs = nil
logs := f.logs
f.logs = nil

return returnLogs(logs), nil
case ethfilters.PendingTransactionsSubscription:
if filter.fullTx {
txs := filter.txs
filter.txs = nil
if f.fullTx {
txs := f.txs
f.txs = nil

return txs, nil
}

hashes := filter.hashes
filter.hashes = nil
hashes := f.hashes
f.hashes = nil

return returnHashes(hashes), nil
}
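
GetFilterChanges now holds the mutex for the whole call, so reading the accumulated items and resetting the slice happen atomically with respect to the event loop that appends to them. A minimal sketch of that drain-and-reset step under lock, with simplified types:

package main

import (
	"fmt"
	"sync"
)

type filter struct{ hashes []string }

type FilterAPI struct {
	filtersMu sync.Mutex
	filters   map[string]*filter
}

// changes returns everything accumulated since the last poll and clears it,
// under the same lock the event loop takes when appending new items.
func (api *FilterAPI) changes(id string) ([]string, bool) {
	api.filtersMu.Lock()
	defer api.filtersMu.Unlock()

	f, ok := api.filters[id]
	if !ok {
		return nil, false
	}
	hashes := f.hashes
	f.hashes = nil
	return hashes, true
}

func main() {
	api := &FilterAPI{filters: map[string]*filter{"blk": {hashes: []string{"0x1", "0x2"}}}}
	first, _ := api.changes("blk")
	second, _ := api.changes("blk")
	fmt.Println(len(first), len(second)) // 2 0
}
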
