Skip to content

Commit

Permalink
fix: adopt install uninstall chan to resolve deadlock on subscription (
Browse files Browse the repository at this point in the history
…#81)

* adopt install uninstall chan to resolve deadlock on subscription

* fix comments

* fix to use types function to convert denom<>erc20

* fix condition
  • Loading branch information
beer-1 authored Oct 24, 2024
1 parent c76e767 commit 875632b
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 26 deletions.
2 changes: 1 addition & 1 deletion jsonrpc/backend/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (b *JSONRPCBackend) Call(args rpctypes.TransactionArgs, blockNrOrHash *rpc.
}

// if blockNrOrHash is nil, use the latest block
if blockNrOrHash != nil {
if blockNrOrHash == nil {
latest := rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber)
blockNrOrHash = &latest
}
Expand Down
33 changes: 26 additions & 7 deletions jsonrpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type FilterAPI struct {
filters map[rpc.ID]*filter
subscriptions map[rpc.ID]*subscription

// Channels for subscription managements
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification

// channels for block and log events
blockChan chan *coretypes.Header
logsChan chan []*coretypes.Log
Expand All @@ -67,14 +71,17 @@ func NewFilterAPI(app *app.MinitiaApp, backend *backend.JSONRPCBackend, logger l

logger: logger,

install: make(chan *subscription),
uninstall: make(chan *subscription),

filters: make(map[rpc.ID]*filter),
subscriptions: make(map[rpc.ID]*subscription),
}

go api.clearUnusedFilters()

api.blockChan, api.logsChan, api.pendingChan = app.EVMIndexer().Subscribe()
go api.subscribeEvents()
go api.eventLoop()

return api
}
Expand All @@ -95,7 +102,7 @@ func (api *FilterAPI) clearUnusedFilters() {
}
}

func (api *FilterAPI) subscribeEvents() {
func (api *FilterAPI) eventLoop() {
for {
select {
case block := <-api.blockChan:
Expand All @@ -105,12 +112,13 @@ func (api *FilterAPI) subscribeEvents() {
f.hashes = append(f.hashes, block.Hash())
}
}
api.filtersMut.Unlock()

for _, s := range api.subscriptions {
if s.ty == ethfilters.BlocksSubscription {
s.headerChan <- block
}
}
api.filtersMut.Unlock()
case logs := <-api.logsChan:
if len(logs) == 0 {
continue
Expand All @@ -125,6 +133,8 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()

for _, s := range api.subscriptions {
if s.ty == ethfilters.LogsSubscription {
logs := filterLogs(logs, s.crit.FromBlock, s.crit.ToBlock, s.crit.Addresses, s.crit.Topics)
Expand All @@ -133,7 +143,6 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()
case tx := <-api.pendingChan:
api.filtersMut.Lock()
for _, f := range api.filters {
Expand All @@ -145,6 +154,8 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()

for _, s := range api.subscriptions {
if s.ty == ethfilters.PendingTransactionsSubscription {
if s.fullTx {
Expand All @@ -154,7 +165,13 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()
// subscription managements
case s := <-api.install:
api.subscriptions[s.id] = s
close(s.installed)
case s := <-api.uninstall:
delete(api.subscriptions, s.id)
close(s.err)
}
}
}
Expand Down Expand Up @@ -285,7 +302,9 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri
func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMut.Lock()
_, found := api.filters[id]
delete(api.filters, id)
if found {
delete(api.filters, id)
}
api.filtersMut.Unlock()
return found
}
Expand All @@ -295,7 +314,7 @@ func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretypes.Log, error) {
api.filtersMut.Lock()
f, found := api.filters[id]
api.filtersMut.Lock()
api.filtersMut.Unlock()

if !found || f.ty != ethfilters.LogsSubscription {
return nil, errFilterNotFound
Expand Down
70 changes: 54 additions & 16 deletions jsonrpc/namespaces/eth/filters/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filters

import (
"context"
"sync"

"github.com/ethereum/go-ethereum/common"
coretypes "github.com/ethereum/go-ethereum/core/types"
Expand All @@ -12,6 +13,7 @@ import (
)

type subscription struct {
id rpc.ID
ty ethfilters.Type
crit ethfilters.FilterCriteria
fullTx bool
Expand All @@ -20,6 +22,11 @@ type subscription struct {
logsChan chan []*coretypes.Log
txChan chan *rpctypes.RPCTransaction
hashChan chan common.Hash

// Channels to signal the subscription is installed or uninstalled
installed chan struct{} // closed when the subscription is installed
err chan error // closed when the subscription is uninstalled
unsubOnce sync.Once
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
Expand All @@ -35,15 +42,19 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
)

id := rpc.NewID()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
s := &subscription{
id: id,
ty: ethfilters.BlocksSubscription,
headerChan: headerChan,

installed: make(chan struct{}),
err: make(chan error),
}
api.filtersMut.Unlock()
api.install <- s
<-s.installed

go func() {
defer api.clearSubscription(id)
defer api.clearSubscription(s)

for {
select {
Expand Down Expand Up @@ -93,17 +104,21 @@ func (api *FilterAPI) Logs(ctx context.Context, crit ethfilters.FilterCriteria)
}

id := rpc.NewID()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
s := &subscription{
id: id,
ty: ethfilters.LogsSubscription,
crit: crit,

logsChan: logsChan,

installed: make(chan struct{}),
err: make(chan error),
}
api.filtersMut.Unlock()
api.install <- s
<-s.installed

go func() {
defer api.clearSubscription(id)
defer api.clearSubscription(s)
for {
select {
case logs := <-logsChan:
Expand Down Expand Up @@ -136,17 +151,21 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
)

id := rpc.NewID()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
s := &subscription{
id: id,
ty: ethfilters.PendingTransactionsSubscription,
fullTx: fullTx != nil && *fullTx,
txChan: txChan,
hashChan: hashChan,

installed: make(chan struct{}),
err: make(chan error),
}
api.filtersMut.Unlock()
api.install <- s
<-s.installed

go func() {
defer api.clearSubscription(id)
defer api.clearSubscription(s)

for {
select {
Expand All @@ -163,8 +182,27 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
return rpcSub, nil
}

func (api *FilterAPI) clearSubscription(id rpc.ID) {
api.filtersMut.Lock()
delete(api.subscriptions, id)
api.filtersMut.Unlock()
func (api *FilterAPI) clearSubscription(s *subscription) {
s.unsubOnce.Do(func() {
uninstallLoop:
for {
// write uninstall request and consume logs/hashes. This prevents
// the eventLoop broadcast method to deadlock when writing to the
// filter event channel while the subscription loop is waiting for
// this method to return (and thus not reading these events).
select {
case api.uninstall <- s:
break uninstallLoop
case <-s.logsChan:
case <-s.txChan:
case <-s.hashChan:
case <-s.headerChan:
}
}

// wait for filter to be uninstalled in work loop before returning
// this ensures that the manager won't use the event channel which
// will probably be closed by the client asap after this method returns.
<-s.err
})
}
4 changes: 2 additions & 2 deletions x/evm/precompiles/cosmos/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (e CosmosPrecompile) ExtendedRun(caller vm.ContractRef, input []byte, suppl
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}

denom, err := e.edk.GetDenomByContractAddr(ctx, toDenomArguments.ERC20Address)
denom, err := types.ContractAddrToDenom(ctx, e.edk, toDenomArguments.ERC20Address)
if err != nil {
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}
Expand All @@ -315,7 +315,7 @@ func (e CosmosPrecompile) ExtendedRun(caller vm.ContractRef, input []byte, suppl
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}

contractAddr, err := e.edk.GetContractAddrByDenom(ctx, toERC20Arguments.Denom)
contractAddr, err := types.DenomToContractAddr(ctx, e.edk, toERC20Arguments.Denom)
if err != nil {
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}
Expand Down

0 comments on commit 875632b

Please sign in to comment.