diff --git a/.changeset/quick-fishes-heal.md b/.changeset/quick-fishes-heal.md new file mode 100644 index 00000000000..966e74c843a --- /dev/null +++ b/.changeset/quick-fishes-heal.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- +#changed +Added prefix `RPCClient returned error ({RPC_NAME})` to RPC errors to simplify filtering of RPC related issues. diff --git a/common/client/models.go b/common/client/models.go index 66f1e9cf88b..fd0c3915940 100644 --- a/common/client/models.go +++ b/common/client/models.go @@ -28,6 +28,35 @@ var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, Exc // sendTxSuccessfulCodes - error codes which signal that transaction was accepted by the node var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown} +func (c SendTxReturnCode) String() string { + switch c { + case Successful: + return "Successful" + case Fatal: + return "Fatal" + case Retryable: + return "Retryable" + case Underpriced: + return "Underpriced" + case Unknown: + return "Unknown" + case Unsupported: + return "Unsupported" + case TransactionAlreadyKnown: + return "TransactionAlreadyKnown" + case InsufficientFunds: + return "InsufficientFunds" + case ExceedsMaxFee: + return "ExceedsMaxFee" + case FeeOutOfValidRange: + return "FeeOutOfValidRange" + case OutOfCounters: + return "OutOfCounters" + default: + return fmt.Sprintf("SendTxReturnCode(%d)", c) + } +} + type NodeTier int const ( diff --git a/common/client/models_test.go b/common/client/models_test.go new file mode 100644 index 00000000000..2d5dc31b373 --- /dev/null +++ b/common/client/models_test.go @@ -0,0 +1,16 @@ +package client + +import ( + "strings" + "testing" +) + +func TestSendTxReturnCode_String(t *testing.T) { + // ensure all the SendTxReturnCodes have proper name + for c := 1; c < int(sendTxReturnCodeLen); c++ { + strC := SendTxReturnCode(c).String() + if strings.Contains(strC, "SendTxReturnCode(") { + t.Errorf("Expected %s to have a proper string representation", strC) + } + } +} diff --git a/common/client/multi_node.go b/common/client/multi_node.go index cc8daed599c..fa413df91aa 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -561,6 +561,13 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP return n.RPC().PendingSequenceAt(ctx, addr) } +type sendTxErrors map[SendTxReturnCode][]error + +// String - returns string representation of the errors map. Required by logger to properly represent the value +func (errs sendTxErrors) String() string { + return fmt.Sprint(map[SendTxReturnCode][]error(errs)) +} + func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) SendEmptyTransaction( ctx context.Context, newTxAttempt func(seq SEQ, feeLimit uint32, fee FEE, fromAddress ADDR) (attempt any, err error), @@ -602,7 +609,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP ctx, cancel := c.chStop.Ctx(ctx) defer cancel() requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum)) - errorsByCode := map[SendTxReturnCode][]error{} + errorsByCode := sendTxErrors{} var softTimeoutChan <-chan time.Time var resultsCount int loop: @@ -639,7 +646,7 @@ loop: func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { defer c.wg.Done() - resultsByCode := map[SendTxReturnCode][]error{} + resultsByCode := sendTxErrors{} // txResults eventually will be closed for txResult := range txResults { resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) @@ -653,7 +660,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP } } -func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) { +func aggregateTxResults(resultsByCode sendTxErrors) (txResult error, err error) { severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) if hasSuccess { diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go index 9c09bd57d70..9f6904fcaf2 100644 --- a/common/client/multi_node_test.go +++ b/common/client/multi_node_test.go @@ -796,13 +796,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name string ExpectedTxResult string ExpectedCriticalErr string - ResultsByCode map[SendTxReturnCode][]error + ResultsByCode sendTxErrors }{ { Name: "Returns success and logs critical error on success and Fatal", ExpectedTxResult: "success", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: map[SendTxReturnCode][]error{ + ResultsByCode: sendTxErrors{ Successful: {errors.New("success")}, Fatal: {errors.New("fatal")}, }, @@ -811,7 +811,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal", ExpectedTxResult: "tx_already_known", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got success and severe error", - ResultsByCode: map[SendTxReturnCode][]error{ + ResultsByCode: sendTxErrors{ TransactionAlreadyKnown: {errors.New("tx_already_known")}, Unsupported: {errors.New("unsupported")}, }, @@ -820,7 +820,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Prefers sever error to temporary", ExpectedTxResult: "underpriced", ExpectedCriticalErr: "", - ResultsByCode: map[SendTxReturnCode][]error{ + ResultsByCode: sendTxErrors{ Retryable: {errors.New("retryable")}, Underpriced: {errors.New("underpriced")}, }, @@ -829,7 +829,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Returns temporary error", ExpectedTxResult: "retryable", ExpectedCriticalErr: "", - ResultsByCode: map[SendTxReturnCode][]error{ + ResultsByCode: sendTxErrors{ Retryable: {errors.New("retryable")}, }, }, @@ -837,7 +837,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Insufficient funds is treated as error", ExpectedTxResult: "", ExpectedCriticalErr: "", - ResultsByCode: map[SendTxReturnCode][]error{ + ResultsByCode: sendTxErrors{ Successful: {nil}, InsufficientFunds: {errors.New("insufficientFunds")}, }, @@ -846,13 +846,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { Name: "Logs critical error on empty ResultsByCode", ExpectedTxResult: "expected at least one response on SendTransaction", ExpectedCriticalErr: "expected at least one response on SendTransaction", - ResultsByCode: map[SendTxReturnCode][]error{}, + ResultsByCode: sendTxErrors{}, }, { Name: "Zk out of counter error", ExpectedTxResult: "not enough keccak counters to continue the execution", ExpectedCriticalErr: "", - ResultsByCode: map[SendTxReturnCode][]error{ + ResultsByCode: sendTxErrors{ OutOfCounters: {errors.New("not enough keccak counters to continue the execution")}, }, }, @@ -870,6 +870,9 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { assert.EqualError(t, txResult, testCase.ExpectedTxResult) } + logger.Sugared(logger.Test(t)).Info("Map: " + fmt.Sprint(testCase.ResultsByCode)) + logger.Sugared(logger.Test(t)).Criticalw("observed invariant violation on SendTransaction", "resultsByCode", testCase.ResultsByCode, "err", err) + if testCase.ExpectedCriticalErr == "" { assert.NoError(t, err) } else { @@ -884,5 +887,4 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { delete(codesToCover, codeToIgnore) } assert.Empty(t, codesToCover, "all of the SendTxReturnCode must be covered by this test") - } diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 255b038037a..548acf3206c 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -130,7 +130,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") if err != nil { promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted()) + return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) } r.ws.rpc = wsrpc @@ -159,7 +159,7 @@ func (r *rpcClient) DialHTTP() error { httprpc, err := rpc.DialHTTP(r.http.uri.String()) if err != nil { promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return pkgerrors.Wrapf(err, "error while dialing HTTP: %v", r.http.uri.Redacted()) + return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing HTTP: %v", r.http.uri.Redacted())) } r.http.rpc = httprpc @@ -295,10 +295,7 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() { // CallContext implementation func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With( "method", method, @@ -307,6 +304,7 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method lggr.Debug("RPC call: evmclient.Client#CallContext") start := time.Now() + var err error if http != nil { err = r.wrapHTTP(http.rpc.CallContext(ctx, result, method, args...)) } else { @@ -320,15 +318,13 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method } func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("nBatchElems", len(b), "batchElems", b) lggr.Trace("RPC call: evmclient.Client#BatchCallContext") start := time.Now() + var err error if http != nil { err = r.wrapHTTP(http.rpc.BatchCallContext(ctx, b)) } else { @@ -342,24 +338,23 @@ func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) err } func (r *rpcClient) Subscribe(ctx context.Context, channel chan<- *evmtypes.Head, args ...interface{}) (commontypes.Subscription, error) { - ctx, cancel, ws, _, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, _ := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("args", args) lggr.Debug("RPC call: evmclient.Client#EthSubscribe") start := time.Now() + var sub commontypes.Subscription sub, err := ws.rpc.EthSubscribe(ctx, channel, args...) if err == nil { + sub = newSubscriptionErrorWrapper(sub, r.rpcClientErrorPrefix()) r.registerSub(sub) } duration := time.Since(start) r.logResult(lggr, err, duration, r.getRPCDomain(), "EthSubscribe") - return sub, err + return sub, r.wrapWS(err) } // GethClient wrappers @@ -370,17 +365,14 @@ func (r *rpcClient) TransactionReceipt(ctx context.Context, txHash common.Hash) return nil, err } if receipt == nil { - err = ethereum.NotFound + err = r.wrapRPCClientError(ethereum.NotFound) return } return } func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -403,10 +395,7 @@ func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Ha return } func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -430,10 +419,7 @@ func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) ( } func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header *types.Header, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("number", number) @@ -454,10 +440,7 @@ func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header } func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header *types.Header, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -494,7 +477,7 @@ func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evm return nil, err } if head == nil { - err = ethereum.NotFound + err = r.wrapRPCClientError(ethereum.NotFound) return } head.EVMChainID = ubig.New(r.chainID) @@ -507,7 +490,7 @@ func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *ev return nil, err } if head == nil { - err = ethereum.NotFound + err = r.wrapRPCClientError(ethereum.NotFound) return } head.EVMChainID = ubig.New(r.chainID) @@ -515,10 +498,7 @@ func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *ev } func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (block *types.Block, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -541,10 +521,7 @@ func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (bloc } func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (block *types.Block, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("number", number) @@ -567,15 +544,13 @@ func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo } func (r *rpcClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("tx", tx) lggr.Debug("RPC call: evmclient.Client#SendTransaction") start := time.Now() + var err error if http != nil { err = r.wrapHTTP(http.geth.SendTransaction(ctx, tx)) } else { @@ -607,10 +582,7 @@ func (r *rpcClient) SendEmptyTransaction( // PendingSequenceAt returns one higher than the highest nonce from both mempool and mined transactions func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Address) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return 0, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("account", account) @@ -639,10 +611,7 @@ func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Addres // mined nonce at the given block number, but it actually returns the total // transaction count which is the highest mined nonce + 1 func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return 0, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -668,10 +637,7 @@ func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, bloc } func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) (code []byte, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("account", account) @@ -694,10 +660,7 @@ func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) ( } func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) (code []byte, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -720,10 +683,7 @@ func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNum } func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return 0, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() call := c.(ethereum.CallMsg) lggr := r.newRqLggr().With("call", call) @@ -747,10 +707,7 @@ func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, } func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr() @@ -773,10 +730,7 @@ func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err er } func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumber *big.Int) (val []byte, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("callMsg", msg, "blockNumber", blockNumber) message := msg.(ethereum.CallMsg) @@ -804,10 +758,7 @@ func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumb } func (r *rpcClient) PendingCallContract(ctx context.Context, msg interface{}) (val []byte, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("callMsg", msg) message := msg.(ethereum.CallMsg) @@ -841,10 +792,7 @@ func (r *rpcClient) LatestBlockHeight(ctx context.Context) (*big.Int, error) { } func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return 0, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr() @@ -867,10 +815,7 @@ func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) } func (r *rpcClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (balance *big.Int, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("account", account.Hex(), "blockNumber", blockNumber) @@ -907,7 +852,7 @@ func (r *rpcClient) TokenBalance(ctx context.Context, address common.Address, co return numLinkBigInt, err } if _, ok := numLinkBigInt.SetString(result, 0); !ok { - return nil, fmt.Errorf("failed to parse int: %s", result) + return nil, r.wrapRPCClientError(fmt.Errorf("failed to parse int: %s", result)) } return numLinkBigInt, nil } @@ -926,10 +871,7 @@ func (r *rpcClient) FilterEvents(ctx context.Context, q ethereum.FilterQuery) ([ } func (r *rpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []types.Log, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("q", q) @@ -957,10 +899,7 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro } func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (sub ethereum.Subscription, err error) { - ctx, cancel, ws, _, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, _ := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr().With("q", q) @@ -968,6 +907,7 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu start := time.Now() sub, err = ws.geth.SubscribeFilterLogs(ctx, q, ch) if err == nil { + sub = newSubscriptionErrorWrapper(sub, r.rpcClientErrorPrefix()) r.registerSub(sub) } err = r.wrapWS(err) @@ -979,10 +919,7 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu } func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return nil, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr() @@ -1007,7 +944,7 @@ func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err // Returns the ChainID according to the geth client. This is useful for functions like verify() // the common node. func (r *rpcClient) ChainID(ctx context.Context) (chainID *big.Int, err error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() @@ -1026,6 +963,15 @@ func (r *rpcClient) newRqLggr() logger.SugaredLogger { return r.rpcLog.With("requestID", uuid.New()) } +func (r *rpcClient) wrapRPCClientError(err error) error { + // simple add msg to the error without adding new stack trace + return pkgerrors.WithMessage(err, r.rpcClientErrorPrefix()) +} + +func (r *rpcClient) rpcClientErrorPrefix() string { + return fmt.Sprintf("RPCClient returned error (%s)", r.name) +} + func wrapCallError(err error, tp string) error { if err == nil { return nil @@ -1038,11 +984,12 @@ func wrapCallError(err error, tp string) error { func (r *rpcClient) wrapWS(err error) error { err = wrapCallError(err, fmt.Sprintf("%s websocket (%s)", r.tier.String(), r.ws.uri.Redacted())) - return err + return r.wrapRPCClientError(err) } func (r *rpcClient) wrapHTTP(err error) error { err = wrapCallError(err, fmt.Sprintf("%s http (%s)", r.tier.String(), r.http.uri.Redacted())) + err = r.wrapRPCClientError(err) if err != nil { r.rpcLog.Debugw("Call failed", "err", err) } else { @@ -1052,7 +999,7 @@ func (r *rpcClient) wrapHTTP(err error) error { } // makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx -func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient, err error) { +func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) { // Need to wrap in mutex because state transition can cancel and replace the // context r.stateMu.RLock() @@ -1072,16 +1019,14 @@ func (r *rpcClient) makeQueryCtx(ctx context.Context) (context.Context, context. } func (r *rpcClient) IsSyncing(ctx context.Context) (bool, error) { - ctx, cancel, ws, http, err := r.makeLiveQueryCtxAndSafeGetClients(ctx) - if err != nil { - return false, err - } + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) defer cancel() lggr := r.newRqLggr() lggr.Debug("RPC call: evmclient.Client#SyncProgress") var syncProgress *ethereum.SyncProgress start := time.Now() + var err error if http != nil { syncProgress, err = http.geth.SyncProgress(ctx) err = r.wrapHTTP(err) diff --git a/core/chains/evm/client/sub_error_wrapper.go b/core/chains/evm/client/sub_error_wrapper.go new file mode 100644 index 00000000000..689991ce70f --- /dev/null +++ b/core/chains/evm/client/sub_error_wrapper.go @@ -0,0 +1,77 @@ +package client + +import ( + "fmt" + + commontypes "github.com/smartcontractkit/chainlink/v2/common/types" +) + +// subErrorWrapper - adds specified prefix to a subscription error +type subErrorWrapper struct { + sub commontypes.Subscription + errorPrefix string + + done chan struct{} + unSub chan struct{} + errorCh chan error +} + +func newSubscriptionErrorWrapper(sub commontypes.Subscription, errorPrefix string) *subErrorWrapper { + s := &subErrorWrapper{ + sub: sub, + errorPrefix: errorPrefix, + done: make(chan struct{}), + unSub: make(chan struct{}), + errorCh: make(chan error), + } + + go func() { + for { + select { + // sub.Err channel is closed by sub.Unsubscribe + case err, ok := <-sub.Err(): + if !ok { + // might only happen if someone terminated wrapped subscription + // in any case - do our best to release resources + // we can't call Unsubscribe on root sub as this might cause panic + close(s.errorCh) + close(s.done) + return + } + + select { + case s.errorCh <- fmt.Errorf("%s: %w", s.errorPrefix, err): + case <-s.unSub: + s.close() + return + } + case <-s.unSub: + s.close() + return + } + } + }() + + return s +} + +func (s *subErrorWrapper) close() { + s.sub.Unsubscribe() + close(s.errorCh) + close(s.done) +} + +func (s *subErrorWrapper) Unsubscribe() { + select { + // already unsubscribed + case <-s.done: + // signal unsubscribe + case s.unSub <- struct{}{}: + // wait for unsubscribe to complete + <-s.done + } +} + +func (s *subErrorWrapper) Err() <-chan error { + return s.errorCh +} diff --git a/core/chains/evm/client/sub_error_wrapper_test.go b/core/chains/evm/client/sub_error_wrapper_test.go new file mode 100644 index 00000000000..457d392a50e --- /dev/null +++ b/core/chains/evm/client/sub_error_wrapper_test.go @@ -0,0 +1,75 @@ +package client + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" +) + +func TestSubscriptionErrorWrapper(t *testing.T) { + t.Parallel() + t.Run("Unsubscribe wrapper releases resources", func(t *testing.T) { + t.Parallel() + + mockedSub := NewMockSubscription() + const prefix = "RPC returned error" + wrapper := newSubscriptionErrorWrapper(mockedSub, prefix) + wrapper.Unsubscribe() + + // mock's resources were relased + assert.True(t, mockedSub.unsubscribed) + _, ok := <-mockedSub.Err() + assert.False(t, ok) + // wrapper's channels are closed + _, ok = <-wrapper.Err() + assert.False(t, ok) + // subsequence unsubscribe does not causes panic + wrapper.Unsubscribe() + }) + t.Run("Unsubscribe interrupts error delivery", func(t *testing.T) { + t.Parallel() + sub := NewMockSubscription() + const prefix = "RPC returned error" + wrapper := newSubscriptionErrorWrapper(sub, prefix) + sub.Errors <- fmt.Errorf("error") + + wrapper.Unsubscribe() + _, ok := <-wrapper.Err() + assert.False(t, ok) + }) + t.Run("Successfully wraps error", func(t *testing.T) { + t.Parallel() + sub := NewMockSubscription() + const prefix = "RPC returned error" + wrapper := newSubscriptionErrorWrapper(sub, prefix) + sub.Errors <- fmt.Errorf("root error") + + err, ok := <-wrapper.Err() + assert.True(t, ok) + assert.Equal(t, "RPC returned error: root error", err.Error()) + + wrapper.Unsubscribe() + _, ok = <-wrapper.Err() + assert.False(t, ok) + }) + t.Run("Unsubscribe on root does not cause panic", func(t *testing.T) { + t.Parallel() + mockedSub := NewMockSubscription() + wrapper := newSubscriptionErrorWrapper(mockedSub, "") + + mockedSub.Unsubscribe() + // mock's resources were released + assert.True(t, mockedSub.unsubscribed) + _, ok := <-mockedSub.Err() + assert.False(t, ok) + // wrapper's channels are eventually closed + tests.AssertEventually(t, func() bool { + _, ok = <-wrapper.Err() + return !ok + }) + + }) +} diff --git a/core/chains/evm/gas/block_history_estimator.go b/core/chains/evm/gas/block_history_estimator.go index 8b8c626f725..0ae067e45bf 100644 --- a/core/chains/evm/gas/block_history_estimator.go +++ b/core/chains/evm/gas/block_history_estimator.go @@ -721,7 +721,7 @@ func (b *BlockHistoryEstimator) batchFetch(ctx context.Context, reqs []rpc.Batch err := b.ethClient.BatchCallContext(ctx, reqs[i:j]) if pkgerrors.Is(err, context.DeadlineExceeded) { // We ran out of time, return what we have - b.logger.Warnf("Batch fetching timed out; loaded %d/%d results", i, len(reqs)) + b.logger.Warnf("Batch fetching timed out; loaded %d/%d results: %v", i, len(reqs), err) for k := i; k < len(reqs); k++ { if k < j { reqs[k].Error = pkgerrors.Wrap(err, "request failed")