From 875632b4f8b9d7549de31cbe813e3cfcd54364ba Mon Sep 17 00:00:00 2001 From: beer-1 <147697694+beer-1@users.noreply.github.com> Date: Thu, 24 Oct 2024 13:53:13 +0900 Subject: [PATCH] fix: adopt install uninstall chan to resolve deadlock on subscription (#81) * adopt install uninstall chan to resolve deadlock on subscription * fix comments * fix to use types function to convert denom<>erc20 * fix condition --- jsonrpc/backend/eth.go | 2 +- jsonrpc/namespaces/eth/filters/api.go | 33 +++++++-- .../namespaces/eth/filters/subscriptions.go | 70 ++++++++++++++----- x/evm/precompiles/cosmos/contract.go | 4 +- 4 files changed, 83 insertions(+), 26 deletions(-) diff --git a/jsonrpc/backend/eth.go b/jsonrpc/backend/eth.go index d702873..ac76657 100644 --- a/jsonrpc/backend/eth.go +++ b/jsonrpc/backend/eth.go @@ -47,7 +47,7 @@ func (b *JSONRPCBackend) Call(args rpctypes.TransactionArgs, blockNrOrHash *rpc. } // if blockNrOrHash is nil, use the latest block - if blockNrOrHash != nil { + if blockNrOrHash == nil { latest := rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber) blockNrOrHash = &latest } diff --git a/jsonrpc/namespaces/eth/filters/api.go b/jsonrpc/namespaces/eth/filters/api.go index c3de65e..3434e3f 100644 --- a/jsonrpc/namespaces/eth/filters/api.go +++ b/jsonrpc/namespaces/eth/filters/api.go @@ -52,6 +52,10 @@ type FilterAPI struct { filters map[rpc.ID]*filter subscriptions map[rpc.ID]*subscription + // Channels for subscription managements + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + // channels for block and log events blockChan chan *coretypes.Header logsChan chan []*coretypes.Log @@ -67,6 +71,9 @@ func NewFilterAPI(app *app.MinitiaApp, backend *backend.JSONRPCBackend, logger l logger: logger, + install: make(chan *subscription), + uninstall: make(chan *subscription), + filters: make(map[rpc.ID]*filter), subscriptions: make(map[rpc.ID]*subscription), } @@ -74,7 +81,7 @@ func NewFilterAPI(app *app.MinitiaApp, backend *backend.JSONRPCBackend, logger l go api.clearUnusedFilters() api.blockChan, api.logsChan, api.pendingChan = app.EVMIndexer().Subscribe() - go api.subscribeEvents() + go api.eventLoop() return api } @@ -95,7 +102,7 @@ func (api *FilterAPI) clearUnusedFilters() { } } -func (api *FilterAPI) subscribeEvents() { +func (api *FilterAPI) eventLoop() { for { select { case block := <-api.blockChan: @@ -105,12 +112,13 @@ func (api *FilterAPI) subscribeEvents() { f.hashes = append(f.hashes, block.Hash()) } } + api.filtersMut.Unlock() + for _, s := range api.subscriptions { if s.ty == ethfilters.BlocksSubscription { s.headerChan <- block } } - api.filtersMut.Unlock() case logs := <-api.logsChan: if len(logs) == 0 { continue @@ -125,6 +133,8 @@ func (api *FilterAPI) subscribeEvents() { } } } + api.filtersMut.Unlock() + for _, s := range api.subscriptions { if s.ty == ethfilters.LogsSubscription { logs := filterLogs(logs, s.crit.FromBlock, s.crit.ToBlock, s.crit.Addresses, s.crit.Topics) @@ -133,7 +143,6 @@ func (api *FilterAPI) subscribeEvents() { } } } - api.filtersMut.Unlock() case tx := <-api.pendingChan: api.filtersMut.Lock() for _, f := range api.filters { @@ -145,6 +154,8 @@ func (api *FilterAPI) subscribeEvents() { } } } + api.filtersMut.Unlock() + for _, s := range api.subscriptions { if s.ty == ethfilters.PendingTransactionsSubscription { if s.fullTx { @@ -154,7 +165,13 @@ func (api *FilterAPI) subscribeEvents() { } } } - api.filtersMut.Unlock() + // subscription managements + case s := <-api.install: + api.subscriptions[s.id] = s + close(s.installed) + case s := <-api.uninstall: + delete(api.subscriptions, s.id) + close(s.err) } } } @@ -285,7 +302,9 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri func (api *FilterAPI) UninstallFilter(id rpc.ID) bool { api.filtersMut.Lock() _, found := api.filters[id] - delete(api.filters, id) + if found { + delete(api.filters, id) + } api.filtersMut.Unlock() return found } @@ -295,7 +314,7 @@ func (api *FilterAPI) UninstallFilter(id rpc.ID) bool { func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretypes.Log, error) { api.filtersMut.Lock() f, found := api.filters[id] - api.filtersMut.Lock() + api.filtersMut.Unlock() if !found || f.ty != ethfilters.LogsSubscription { return nil, errFilterNotFound diff --git a/jsonrpc/namespaces/eth/filters/subscriptions.go b/jsonrpc/namespaces/eth/filters/subscriptions.go index 2364a89..cee7927 100644 --- a/jsonrpc/namespaces/eth/filters/subscriptions.go +++ b/jsonrpc/namespaces/eth/filters/subscriptions.go @@ -2,6 +2,7 @@ package filters import ( "context" + "sync" "github.com/ethereum/go-ethereum/common" coretypes "github.com/ethereum/go-ethereum/core/types" @@ -12,6 +13,7 @@ import ( ) type subscription struct { + id rpc.ID ty ethfilters.Type crit ethfilters.FilterCriteria fullTx bool @@ -20,6 +22,11 @@ type subscription struct { logsChan chan []*coretypes.Log txChan chan *rpctypes.RPCTransaction hashChan chan common.Hash + + // Channels to signal the subscription is installed or uninstalled + installed chan struct{} // closed when the subscription is installed + err chan error // closed when the subscription is uninstalled + unsubOnce sync.Once } // NewHeads send a notification each time a new (header) block is appended to the chain. @@ -35,15 +42,19 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { ) id := rpc.NewID() - api.filtersMut.Lock() - api.subscriptions[id] = &subscription{ + s := &subscription{ + id: id, ty: ethfilters.BlocksSubscription, headerChan: headerChan, + + installed: make(chan struct{}), + err: make(chan error), } - api.filtersMut.Unlock() + api.install <- s + <-s.installed go func() { - defer api.clearSubscription(id) + defer api.clearSubscription(s) for { select { @@ -93,17 +104,21 @@ func (api *FilterAPI) Logs(ctx context.Context, crit ethfilters.FilterCriteria) } id := rpc.NewID() - api.filtersMut.Lock() - api.subscriptions[id] = &subscription{ + s := &subscription{ + id: id, ty: ethfilters.LogsSubscription, crit: crit, logsChan: logsChan, + + installed: make(chan struct{}), + err: make(chan error), } - api.filtersMut.Unlock() + api.install <- s + <-s.installed go func() { - defer api.clearSubscription(id) + defer api.clearSubscription(s) for { select { case logs := <-logsChan: @@ -136,17 +151,21 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) ) id := rpc.NewID() - api.filtersMut.Lock() - api.subscriptions[id] = &subscription{ + s := &subscription{ + id: id, ty: ethfilters.PendingTransactionsSubscription, fullTx: fullTx != nil && *fullTx, txChan: txChan, hashChan: hashChan, + + installed: make(chan struct{}), + err: make(chan error), } - api.filtersMut.Unlock() + api.install <- s + <-s.installed go func() { - defer api.clearSubscription(id) + defer api.clearSubscription(s) for { select { @@ -163,8 +182,27 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) return rpcSub, nil } -func (api *FilterAPI) clearSubscription(id rpc.ID) { - api.filtersMut.Lock() - delete(api.subscriptions, id) - api.filtersMut.Unlock() +func (api *FilterAPI) clearSubscription(s *subscription) { + s.unsubOnce.Do(func() { + uninstallLoop: + for { + // write uninstall request and consume logs/hashes. This prevents + // the eventLoop broadcast method to deadlock when writing to the + // filter event channel while the subscription loop is waiting for + // this method to return (and thus not reading these events). + select { + case api.uninstall <- s: + break uninstallLoop + case <-s.logsChan: + case <-s.txChan: + case <-s.hashChan: + case <-s.headerChan: + } + } + + // wait for filter to be uninstalled in work loop before returning + // this ensures that the manager won't use the event channel which + // will probably be closed by the client asap after this method returns. + <-s.err + }) } diff --git a/x/evm/precompiles/cosmos/contract.go b/x/evm/precompiles/cosmos/contract.go index 0eae4bd..1e6a1d7 100644 --- a/x/evm/precompiles/cosmos/contract.go +++ b/x/evm/precompiles/cosmos/contract.go @@ -297,7 +297,7 @@ func (e CosmosPrecompile) ExtendedRun(caller vm.ContractRef, input []byte, suppl return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error()) } - denom, err := e.edk.GetDenomByContractAddr(ctx, toDenomArguments.ERC20Address) + denom, err := types.ContractAddrToDenom(ctx, e.edk, toDenomArguments.ERC20Address) if err != nil { return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error()) } @@ -315,7 +315,7 @@ func (e CosmosPrecompile) ExtendedRun(caller vm.ContractRef, input []byte, suppl return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error()) } - contractAddr, err := e.edk.GetContractAddrByDenom(ctx, toERC20Arguments.Denom) + contractAddr, err := types.DenomToContractAddr(ctx, e.edk, toERC20Arguments.Denom) if err != nil { return nil, ctx.GasMeter().GasConsumedToLimit(), types.ErrPrecompileFailed.Wrap(err.Error()) }