Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: adopt install uninstall chan to resolve deadlock on subscription #81

Merged
merged 4 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jsonrpc/backend/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (b *JSONRPCBackend) Call(args rpctypes.TransactionArgs, blockNrOrHash *rpc.
}

// if blockNrOrHash is nil, use the latest block
if blockNrOrHash != nil {
if blockNrOrHash == nil {
latest := rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber)
blockNrOrHash = &latest
}
Expand Down
33 changes: 26 additions & 7 deletions jsonrpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type FilterAPI struct {
filters map[rpc.ID]*filter
subscriptions map[rpc.ID]*subscription

// Channels for subscription managements
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification

// channels for block and log events
blockChan chan *coretypes.Header
logsChan chan []*coretypes.Log
Expand All @@ -67,14 +71,17 @@ func NewFilterAPI(app *app.MinitiaApp, backend *backend.JSONRPCBackend, logger l

logger: logger,

install: make(chan *subscription),
uninstall: make(chan *subscription),

filters: make(map[rpc.ID]*filter),
subscriptions: make(map[rpc.ID]*subscription),
}

go api.clearUnusedFilters()

api.blockChan, api.logsChan, api.pendingChan = app.EVMIndexer().Subscribe()
go api.subscribeEvents()
go api.eventLoop()

return api
}
Expand All @@ -95,7 +102,7 @@ func (api *FilterAPI) clearUnusedFilters() {
}
}

func (api *FilterAPI) subscribeEvents() {
func (api *FilterAPI) eventLoop() {
for {
select {
case block := <-api.blockChan:
Expand All @@ -105,12 +112,13 @@ func (api *FilterAPI) subscribeEvents() {
f.hashes = append(f.hashes, block.Hash())
}
}
api.filtersMut.Unlock()

for _, s := range api.subscriptions {
if s.ty == ethfilters.BlocksSubscription {
s.headerChan <- block
}
}
api.filtersMut.Unlock()
case logs := <-api.logsChan:
if len(logs) == 0 {
continue
Expand All @@ -125,6 +133,8 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()

for _, s := range api.subscriptions {
if s.ty == ethfilters.LogsSubscription {
logs := filterLogs(logs, s.crit.FromBlock, s.crit.ToBlock, s.crit.Addresses, s.crit.Topics)
Expand All @@ -133,7 +143,6 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()
case tx := <-api.pendingChan:
api.filtersMut.Lock()
for _, f := range api.filters {
Expand All @@ -145,6 +154,8 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()

for _, s := range api.subscriptions {
if s.ty == ethfilters.PendingTransactionsSubscription {
if s.fullTx {
Expand All @@ -154,7 +165,13 @@ func (api *FilterAPI) subscribeEvents() {
}
}
}
api.filtersMut.Unlock()
// subscription managements
case s := <-api.install:
api.subscriptions[s.id] = s
close(s.installed)
case s := <-api.uninstall:
delete(api.subscriptions, s.id)
close(s.err)
}
}
}
Expand Down Expand Up @@ -285,7 +302,9 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri
func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMut.Lock()
_, found := api.filters[id]
delete(api.filters, id)
if found {
delete(api.filters, id)
}
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
api.filtersMut.Unlock()
return found
}
Expand All @@ -295,7 +314,7 @@ func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretypes.Log, error) {
api.filtersMut.Lock()
f, found := api.filters[id]
api.filtersMut.Lock()
api.filtersMut.Unlock()

if !found || f.ty != ethfilters.LogsSubscription {
return nil, errFilterNotFound
Expand Down
70 changes: 54 additions & 16 deletions jsonrpc/namespaces/eth/filters/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filters

import (
"context"
"sync"

"github.com/ethereum/go-ethereum/common"
coretypes "github.com/ethereum/go-ethereum/core/types"
Expand All @@ -12,6 +13,7 @@ import (
)

type subscription struct {
id rpc.ID
ty ethfilters.Type
crit ethfilters.FilterCriteria
fullTx bool
Expand All @@ -20,6 +22,11 @@ type subscription struct {
logsChan chan []*coretypes.Log
txChan chan *rpctypes.RPCTransaction
hashChan chan common.Hash

// Channels to signal the subscription is installed or uninstalled
installed chan struct{} // closed when the subscription is installed
err chan error // closed when the subscription is uninstalled
unsubOnce sync.Once
}

// NewHeads send a notification each time a new (header) block is appended to the chain.
Expand All @@ -35,15 +42,19 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
)

id := rpc.NewID()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
s := &subscription{
id: id,
ty: ethfilters.BlocksSubscription,
headerChan: headerChan,

installed: make(chan struct{}),
err: make(chan error),
}
api.filtersMut.Unlock()
api.install <- s
<-s.installed

go func() {
defer api.clearSubscription(id)
defer api.clearSubscription(s)

for {
select {
Expand Down Expand Up @@ -93,17 +104,21 @@ func (api *FilterAPI) Logs(ctx context.Context, crit ethfilters.FilterCriteria)
}

id := rpc.NewID()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
s := &subscription{
id: id,
ty: ethfilters.LogsSubscription,
crit: crit,

logsChan: logsChan,

installed: make(chan struct{}),
err: make(chan error),
}
api.filtersMut.Unlock()
api.install <- s
<-s.installed

go func() {
defer api.clearSubscription(id)
defer api.clearSubscription(s)
for {
select {
case logs := <-logsChan:
Expand Down Expand Up @@ -136,17 +151,21 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
)

id := rpc.NewID()
api.filtersMut.Lock()
api.subscriptions[id] = &subscription{
s := &subscription{
id: id,
ty: ethfilters.PendingTransactionsSubscription,
fullTx: fullTx != nil && *fullTx,
txChan: txChan,
hashChan: hashChan,

installed: make(chan struct{}),
err: make(chan error),
}
api.filtersMut.Unlock()
api.install <- s
<-s.installed

go func() {
defer api.clearSubscription(id)
defer api.clearSubscription(s)

for {
select {
Expand All @@ -163,8 +182,27 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
return rpcSub, nil
}

func (api *FilterAPI) clearSubscription(id rpc.ID) {
api.filtersMut.Lock()
delete(api.subscriptions, id)
api.filtersMut.Unlock()
func (api *FilterAPI) clearSubscription(s *subscription) {
s.unsubOnce.Do(func() {
uninstallLoop:
for {
// write uninstall request and consume logs/hashes. This prevents
// the eventLoop broadcast method to deadlock when writing to the
// filter event channel while the subscription loop is waiting for
// this method to return (and thus not reading these events).
select {
case api.uninstall <- s:
break uninstallLoop
case <-s.logsChan:
case <-s.txChan:
case <-s.hashChan:
case <-s.headerChan:
}
}

// wait for filter to be uninstalled in work loop before returning
// this ensures that the manager won't use the event channel which
// will probably be closed by the client asap after this method returns.
<-s.err
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
})
}
4 changes: 2 additions & 2 deletions x/evm/precompiles/cosmos/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (e CosmosPrecompile) ExtendedRun(caller vm.ContractRef, input []byte, suppl
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}

denom, err := e.edk.GetDenomByContractAddr(ctx, toDenomArguments.ERC20Address)
denom, err := types.ContractAddrToDenom(ctx, e.edk, toDenomArguments.ERC20Address)
if err != nil {
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}
Expand All @@ -315,7 +315,7 @@ func (e CosmosPrecompile) ExtendedRun(caller vm.ContractRef, input []byte, suppl
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}

contractAddr, err := e.edk.GetContractAddrByDenom(ctx, toERC20Arguments.Denom)
contractAddr, err := types.DenomToContractAddr(ctx, e.edk, toERC20Arguments.Denom)
if err != nil {
return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error())
}
Expand Down
Loading