Skip to content

Commit

Permalink
ensure all logs events are emitted before emitting block events (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
beer-1 authored Oct 22, 2024
1 parent f5f60c3 commit 1839cea
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 30 deletions.
10 changes: 9 additions & 1 deletion indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
return err
}

if len(e.logsChans) > 0 {
if len(e.logsChans) > 0 && len(receipt.Logs) > 0 {
for idx, log := range receipt.Logs {
// fill in missing fields before emitting
log.Index = uint(idx)
Expand All @@ -184,6 +184,14 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
}
}

// emit empty logs event to confirm all logs are emitted and consumed, so the logs are
// available for querying.
if len(e.logsChans) > 0 {
for _, logsChan := range e.logsChans {
logsChan <- []*coretypes.Log{}
}
}

// index block header
if err := e.BlockHeaderMap.Set(sdkCtx, uint64(blockHeight), blockHeader); err != nil {
e.logger.Error("failed to marshal blockHeader", "err", err)
Expand Down
47 changes: 26 additions & 21 deletions jsonrpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type FilterAPI struct {

logger log.Logger

filtersMu sync.Mutex
filtersMut sync.Mutex

filters map[rpc.ID]*filter
subscriptions map[rpc.ID]*subscription

Expand Down Expand Up @@ -84,21 +85,21 @@ func (api *FilterAPI) clearUnusedFilters() {

for {
time.Sleep(timeout)
api.filtersMu.Lock()
api.filtersMut.Lock()
for id, f := range api.filters {
if time.Since(f.lastUsed) > 5*time.Minute {
delete(api.filters, id)
}
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()
}
}

func (api *FilterAPI) subscribeEvents() {
for {
select {
case block := <-api.blockChan:
api.filtersMu.Lock()
api.filtersMut.Lock()
for _, f := range api.filters {
if f.ty == ethfilters.BlocksSubscription {
f.hashes = append(f.hashes, block.Hash())
Expand All @@ -109,9 +110,13 @@ func (api *FilterAPI) subscribeEvents() {
s.headerChan <- block
}
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()
case logs := <-api.logsChan:
api.filtersMu.Lock()
if len(logs) == 0 {
continue
}

api.filtersMut.Lock()
for _, f := range api.filters {
if f.ty == ethfilters.LogsSubscription {
logs = filterLogs(logs, f.crit.FromBlock, f.crit.ToBlock, f.crit.Addresses, f.crit.Topics)
Expand All @@ -125,9 +130,9 @@ func (api *FilterAPI) subscribeEvents() {
s.logsChan <- logs
}
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()
case tx := <-api.pendingChan:
api.filtersMu.Lock()
api.filtersMut.Lock()
for _, f := range api.filters {
if f.ty == ethfilters.PendingTransactionsSubscription {
if f.fullTx {
Expand All @@ -146,7 +151,7 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()
}
}
}
Expand All @@ -162,14 +167,14 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) (rpc.ID, error)
}

id := rpc.NewID()
api.filtersMu.Lock()
api.filtersMut.Lock()
api.filters[id] = &filter{
ty: ethfilters.PendingTransactionsSubscription,
fullTx: fullTx != nil && *fullTx,
txs: make([]*rpctypes.RPCTransaction, 0),
hashes: make([]common.Hash, 0),
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()

return id, nil
}
Expand All @@ -182,12 +187,12 @@ func (api *FilterAPI) NewBlockFilter() (rpc.ID, error) {
}

id := rpc.NewID()
api.filtersMu.Lock()
api.filtersMut.Lock()
api.filters[id] = &filter{
ty: ethfilters.BlocksSubscription,
hashes: make([]common.Hash, 0),
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()

return id, nil
}
Expand Down Expand Up @@ -228,13 +233,13 @@ func (api *FilterAPI) NewFilter(crit ethfilters.FilterCriteria) (rpc.ID, error)
}

id := rpc.NewID()
api.filtersMu.Lock()
api.filtersMut.Lock()
api.filters[id] = &filter{
ty: ethfilters.LogsSubscription,
crit: crit, lastUsed: time.Now(),
logs: make([]*coretypes.Log, 0),
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()

return id, nil
}
Expand Down Expand Up @@ -275,19 +280,19 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri

// UninstallFilter removes the filter with the given filter id.
func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMu.Lock()
api.filtersMut.Lock()
_, found := api.filters[id]
delete(api.filters, id)
api.filtersMu.Unlock()
api.filtersMut.Unlock()
return found
}

// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretypes.Log, error) {
api.filtersMu.Lock()
api.filtersMut.Lock()
f, found := api.filters[id]
api.filtersMu.Lock()
api.filtersMut.Lock()

if !found || f.ty != ethfilters.LogsSubscription {
return nil, errFilterNotFound
Expand Down Expand Up @@ -326,8 +331,8 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretype
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
api.filtersMut.Lock()
defer api.filtersMut.Unlock()

f, ok := api.filters[id]
if !ok {
Expand Down
16 changes: 8 additions & 8 deletions jsonrpc/namespaces/eth/filters/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
)

id := rpc.NewID()
api.filtersMu.Lock()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
ty: ethfilters.BlocksSubscription,
headerChan: headerChan,
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()

go func() {
defer api.clearSubscription(id)
Expand Down Expand Up @@ -93,14 +93,14 @@ func (api *FilterAPI) Logs(ctx context.Context, crit ethfilters.FilterCriteria)
}

id := rpc.NewID()
api.filtersMu.Lock()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
ty: ethfilters.LogsSubscription,
crit: crit,

logsChan: logsChan,
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()

go func() {
defer api.clearSubscription(id)
Expand Down Expand Up @@ -136,14 +136,14 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
)

id := rpc.NewID()
api.filtersMu.Lock()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
ty: ethfilters.PendingTransactionsSubscription,
fullTx: fullTx != nil && *fullTx,
txChan: txChan,
hashChan: hashChan,
}
api.filtersMu.Unlock()
api.filtersMut.Unlock()

go func() {
defer api.clearSubscription(id)
Expand All @@ -164,7 +164,7 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
}

func (api *FilterAPI) clearSubscription(id rpc.ID) {
api.filtersMu.Lock()
api.filtersMut.Lock()
delete(api.subscriptions, id)
api.filtersMu.Unlock()
api.filtersMut.Unlock()
}

0 comments on commit 1839cea

Please sign in to comment.