From 4653d0961528f2cf34fe090a4b40e1aeda837df2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedemann=20F=C3=BCrst?= <59653747+friedemannf@users.noreply.github.com> Date: Mon, 18 Nov 2024 22:02:10 +0100 Subject: [PATCH 1/2] Enable AutoPurge feature on all affected chains (#1524) ## Motivation Same as https://github.com/smartcontractkit/chainlink/pull/15157 ## Solution --- .../config/toml/defaults/Linea_Mainnet.toml | 5 ++++ .../config/toml/defaults/Linea_Sepolia.toml | 7 ++++- .../toml/defaults/Polygon_Zkevm_Cardona.toml | 4 +++ .../toml/defaults/Polygon_Zkevm_Mainnet.toml | 4 +++ .../config/toml/defaults/Scroll_Mainnet.toml | 4 +++ .../config/toml/defaults/Scroll_Sepolia.toml | 4 +++ .../config/toml/defaults/XLayer_Mainnet.toml | 4 +++ .../config/toml/defaults/XLayer_Sepolia.toml | 4 +++ .../config/toml/defaults/Zircuit_Mainnet.toml | 2 +- .../config/toml/defaults/Zircuit_Sepolia.toml | 2 +- docs/CONFIG.md | 26 +++++++++++++------ 11 files changed, 55 insertions(+), 11 deletions(-) diff --git a/core/chains/evm/config/toml/defaults/Linea_Mainnet.toml b/core/chains/evm/config/toml/defaults/Linea_Mainnet.toml index 97570a0d21..54fedca215 100644 --- a/core/chains/evm/config/toml/defaults/Linea_Mainnet.toml +++ b/core/chains/evm/config/toml/defaults/Linea_Mainnet.toml @@ -15,3 +15,8 @@ ResendAfterThreshold = '3m' # set greater than finality depth [HeadTracker] HistoryDepth = 350 + +[Transactions.AutoPurge] +Enabled = true +Threshold = 50 # 50 blocks at 3s block time ~2.5 minutes +MinAttempts = 3 diff --git a/core/chains/evm/config/toml/defaults/Linea_Sepolia.toml b/core/chains/evm/config/toml/defaults/Linea_Sepolia.toml index af70f7ceba..4397b19c77 100644 --- a/core/chains/evm/config/toml/defaults/Linea_Sepolia.toml +++ b/core/chains/evm/config/toml/defaults/Linea_Sepolia.toml @@ -10,4 +10,9 @@ PriceMin = '1 wei' ResendAfterThreshold = '3m' [HeadTracker] -HistoryDepth = 1000 \ No newline at end of file +HistoryDepth = 1000 + +[Transactions.AutoPurge] +Enabled = true +Threshold = 50 # 50 blocks at 3s block time ~2.5 minutes +MinAttempts = 3 diff --git a/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Cardona.toml b/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Cardona.toml index 46ce80e29f..5e5bc57370 100644 --- a/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Cardona.toml +++ b/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Cardona.toml @@ -24,3 +24,7 @@ CacheTimeout = '4s' [HeadTracker] HistoryDepth = 2000 + +[Transactions.AutoPurge] +Enabled = true +MinAttempts = 3 diff --git a/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Mainnet.toml b/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Mainnet.toml index 7402d54835..d42ef9b057 100644 --- a/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Mainnet.toml +++ b/core/chains/evm/config/toml/defaults/Polygon_Zkevm_Mainnet.toml @@ -26,3 +26,7 @@ CacheTimeout = '4s' [HeadTracker] # Polygon suffers from a tremendous number of re-orgs, we need to set this to something very large to be conservative enough HistoryDepth = 2000 + +[Transactions.AutoPurge] +Enabled = true +MinAttempts = 3 diff --git a/core/chains/evm/config/toml/defaults/Scroll_Mainnet.toml b/core/chains/evm/config/toml/defaults/Scroll_Mainnet.toml index 4a887b504d..b8e7bd09e8 100644 --- a/core/chains/evm/config/toml/defaults/Scroll_Mainnet.toml +++ b/core/chains/evm/config/toml/defaults/Scroll_Mainnet.toml @@ -20,3 +20,7 @@ HistoryDepth = 50 [OCR] ContractConfirmations = 1 + +[Transactions.AutoPurge] +Enabled = true +DetectionApiUrl = 'https://venus.scroll.io' diff --git a/core/chains/evm/config/toml/defaults/Scroll_Sepolia.toml b/core/chains/evm/config/toml/defaults/Scroll_Sepolia.toml index b2e1cfbd73..baee2080d9 100644 --- a/core/chains/evm/config/toml/defaults/Scroll_Sepolia.toml +++ b/core/chains/evm/config/toml/defaults/Scroll_Sepolia.toml @@ -20,3 +20,7 @@ HistoryDepth = 50 [OCR] ContractConfirmations = 1 + +[Transactions.AutoPurge] +Enabled = true +DetectionApiUrl = 'https://sepolia-venus.scroll.io' diff --git a/core/chains/evm/config/toml/defaults/XLayer_Mainnet.toml b/core/chains/evm/config/toml/defaults/XLayer_Mainnet.toml index 426c0204e9..28f7819276 100644 --- a/core/chains/evm/config/toml/defaults/XLayer_Mainnet.toml +++ b/core/chains/evm/config/toml/defaults/XLayer_Mainnet.toml @@ -23,3 +23,7 @@ BlockHistorySize = 12 [HeadTracker] HistoryDepth = 2000 + +[Transactions.AutoPurge] +Enabled = true +MinAttempts = 3 diff --git a/core/chains/evm/config/toml/defaults/XLayer_Sepolia.toml b/core/chains/evm/config/toml/defaults/XLayer_Sepolia.toml index 62e2c1e8ad..2aa6e58469 100644 --- a/core/chains/evm/config/toml/defaults/XLayer_Sepolia.toml +++ b/core/chains/evm/config/toml/defaults/XLayer_Sepolia.toml @@ -23,3 +23,7 @@ BlockHistorySize = 12 [HeadTracker] HistoryDepth = 2000 + +[Transactions.AutoPurge] +Enabled = true +MinAttempts = 3 diff --git a/core/chains/evm/config/toml/defaults/Zircuit_Mainnet.toml b/core/chains/evm/config/toml/defaults/Zircuit_Mainnet.toml index ec336d3efe..9ec1dd0d6a 100644 --- a/core/chains/evm/config/toml/defaults/Zircuit_Mainnet.toml +++ b/core/chains/evm/config/toml/defaults/Zircuit_Mainnet.toml @@ -33,4 +33,4 @@ GasLimit = 6500000 [Transactions.AutoPurge] Enabled = true Threshold = 90 # 90 blocks at 2s block time ~3 minutes -MinAttempts = 3 \ No newline at end of file +MinAttempts = 3 diff --git a/core/chains/evm/config/toml/defaults/Zircuit_Sepolia.toml b/core/chains/evm/config/toml/defaults/Zircuit_Sepolia.toml index d6934d533d..d439d98162 100644 --- a/core/chains/evm/config/toml/defaults/Zircuit_Sepolia.toml +++ b/core/chains/evm/config/toml/defaults/Zircuit_Sepolia.toml @@ -33,4 +33,4 @@ GasLimit = 6500000 [Transactions.AutoPurge] Enabled = true Threshold = 90 # 90 blocks at 2s block time ~3 minutes -MinAttempts = 3 \ No newline at end of file +MinAttempts = 3 diff --git a/docs/CONFIG.md b/docs/CONFIG.md index e700d94a14..0dca7763d4 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -3640,7 +3640,8 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '3m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +MinAttempts = 3 [BalanceMonitor] Enabled = true @@ -3743,7 +3744,8 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '3m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +MinAttempts = 3 [BalanceMonitor] Enabled = true @@ -5186,7 +5188,8 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '3m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +MinAttempts = 3 [BalanceMonitor] Enabled = true @@ -6216,7 +6219,8 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '3m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +MinAttempts = 3 [BalanceMonitor] Enabled = true @@ -8078,7 +8082,9 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '3m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +Threshold = 50 +MinAttempts = 3 [BalanceMonitor] Enabled = true @@ -8180,7 +8186,9 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '3m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +Threshold = 50 +MinAttempts = 3 [BalanceMonitor] Enabled = true @@ -9522,7 +9530,8 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '1m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +DetectionApiUrl = 'https://sepolia-venus.scroll.io' [BalanceMonitor] Enabled = true @@ -9625,7 +9634,8 @@ ReaperThreshold = '168h0m0s' ResendAfterThreshold = '1m0s' [Transactions.AutoPurge] -Enabled = false +Enabled = true +DetectionApiUrl = 'https://venus.scroll.io' [BalanceMonitor] Enabled = true From 228298cf134c2c6330a8e2eefdf3535b36d63881 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Mon, 18 Nov 2024 23:50:33 +0100 Subject: [PATCH 2/2] Cherry-pick optional ws url (#1535) Optional WS URL for RPCs to unblock new chain integration. Charry-picks from: https://github.com/smartcontractkit/chainlink/pull/14534 https://github.com/smartcontractkit/chainlink/pull/14364 https://github.com/smartcontractkit/chainlink/pull/14929 --------- Co-authored-by: Joe Huang Co-authored-by: Adam Hamrick Co-authored-by: Chunkai Yang --- .changeset/four-kangaroos-appear.md | 5 + .changeset/moody-rules-agree.md | 8 + .changeset/silly-lies-boil.md | 8 + .../workflows/ccip-offchain-upgrade-tests.yml | 7 +- common/client/node.go | 17 +- common/client/node_test.go | 4 +- core/chains/evm/client/config_builder.go | 18 +- core/chains/evm/client/config_builder_test.go | 4 +- core/chains/evm/client/evm_client.go | 11 +- core/chains/evm/client/helpers_test.go | 10 +- core/chains/evm/client/rpc_client.go | 121 +++++++++---- core/chains/evm/client/rpc_client_test.go | 165 ++++++++++++++++-- core/chains/evm/config/toml/config.go | 47 +++-- core/config/docs/chains-evm.toml | 2 +- core/services/chainlink/config_test.go | 21 +-- .../chainlink/testdata/config-invalid.toml | 19 ++ docs/CONFIG.md | 2 +- 17 files changed, 371 insertions(+), 98 deletions(-) create mode 100644 .changeset/four-kangaroos-appear.md create mode 100644 .changeset/moody-rules-agree.md create mode 100644 .changeset/silly-lies-boil.md diff --git a/.changeset/four-kangaroos-appear.md b/.changeset/four-kangaroos-appear.md new file mode 100644 index 0000000000..b8ef32ff69 --- /dev/null +++ b/.changeset/four-kangaroos-appear.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add config validation so it requires ws url when http polling disabled #bugfix diff --git a/.changeset/moody-rules-agree.md b/.changeset/moody-rules-agree.md new file mode 100644 index 0000000000..ef1f3bcaf6 --- /dev/null +++ b/.changeset/moody-rules-agree.md @@ -0,0 +1,8 @@ +--- +"chainlink": patch +--- + +- register polling subscription to avoid subscription leaking when rpc client gets closed. +- add a temporary special treatment for SubscribeNewHead before we replace it with SubscribeToHeads. Add a goroutine that forwards new head from poller to caller channel. +- fix a deadlock in poller, by using a new lock for subs slice in rpc client. +#bugfix diff --git a/.changeset/silly-lies-boil.md b/.changeset/silly-lies-boil.md new file mode 100644 index 0000000000..b2a5084a36 --- /dev/null +++ b/.changeset/silly-lies-boil.md @@ -0,0 +1,8 @@ +--- +"chainlink": minor +--- + +Make websocket URL `WSURL` for `EVM.Nodes` optional, and apply logic so that: +* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error +* If WS URL was not provided LogBroadcaster should be disabled +#nops diff --git a/.github/workflows/ccip-offchain-upgrade-tests.yml b/.github/workflows/ccip-offchain-upgrade-tests.yml index 7536488eb9..78569fc370 100644 --- a/.github/workflows/ccip-offchain-upgrade-tests.yml +++ b/.github/workflows/ccip-offchain-upgrade-tests.yml @@ -113,6 +113,8 @@ jobs: if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch' needs: [ changes ] environment: integration + outputs: + tag: ${{ steps.build-test-image.outputs.test_image_tag }} permissions: id-token: write contents: read @@ -136,11 +138,14 @@ jobs: repository: smartcontractkit/ccip ref: ${{ github.event.pull_request.head.sha || github.sha }} - name: Build Test Image + id: build-test-image if: needs.changes.outputs.src == 'true' || github.event_name == 'workflow_dispatch' uses: smartcontractkit/.github/actions/ctf-build-test-image@a5e4f4c8fbb8e15ab2ad131552eca6ac83c4f4b3 # ctf-build-test-image@0.1.0 with: # we just want to build the load tests suites: ccip-tests/load ccip-tests/smoke + repository: chainlink-ccip-tests + tag: ${{ github.sha }} QA_AWS_ROLE_TO_ASSUME: ${{ secrets.QA_AWS_ROLE_TO_ASSUME }} QA_AWS_REGION: ${{ secrets.QA_AWS_REGION }} QA_AWS_ACCOUNT_NUMBER: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }} @@ -249,7 +254,7 @@ jobs: env: BASE64_CONFIG_OVERRIDE: ${{ steps.set_override_config.outputs.base_64_override }},${{ steps.setup_create_base64_config_ccip.outputs.base64_config }} TEST_BASE64_CONFIG_OVERRIDE: ${{ steps.set_override_config.outputs.base_64_override }},${{ steps.setup_create_base64_config_ccip.outputs.base64_config }} - ENV_JOB_IMAGE: ${{ env.ENV_JOB_IMAGE_BASE }}:${{ github.sha }} + ENV_JOB_IMAGE: ${{ env.ENV_JOB_IMAGE_BASE }}:${{ needs.build-test-image.outputs.tag }} TEST_SUITE: smoke TEST_ARGS: -test.timeout 30m TEST_LOG_LEVEL: info diff --git a/common/client/node.go b/common/client/node.go index 1f55e69cac..7885fe7676 100644 --- a/common/client/node.go +++ b/common/client/node.go @@ -91,14 +91,14 @@ type node[ services.StateMachine lfcLog logger.Logger name string - id int32 + id int chainID CHAIN_ID nodePoolCfg NodeConfig chainCfg ChainConfig order int32 chainFamily string - ws url.URL + ws *url.URL http *url.URL rpc RPC @@ -121,10 +121,10 @@ func NewNode[ nodeCfg NodeConfig, chainCfg ChainConfig, lggr logger.Logger, - wsuri url.URL, + wsuri *url.URL, httpuri *url.URL, name string, - id int32, + id int, chainID CHAIN_ID, nodeOrder int32, rpc RPC, @@ -136,8 +136,10 @@ func NewNode[ n.chainID = chainID n.nodePoolCfg = nodeCfg n.chainCfg = chainCfg - n.ws = wsuri n.order = nodeOrder + if wsuri != nil { + n.ws = wsuri + } if httpuri != nil { n.http = httpuri } @@ -157,7 +159,10 @@ func NewNode[ } func (n *node[CHAIN_ID, HEAD, RPC]) String() string { - s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String()) + s := fmt.Sprintf("(%s)%s", Primary.String(), n.name) + if n.ws != nil { + s = s + fmt.Sprintf(":%s", n.ws.String()) + } if n.http != nil { s = s + fmt.Sprintf(":%s", n.http.String()) } diff --git a/common/client/node_test.go b/common/client/node_test.go index 66bb50fc94..539964691c 100644 --- a/common/client/node_test.go +++ b/common/client/node_test.go @@ -67,10 +67,10 @@ type testNodeOpts struct { config testNodeConfig chainConfig clientMocks.ChainConfig lggr logger.Logger - wsuri url.URL + wsuri *url.URL httpuri *url.URL name string - id int32 + id int chainID types.ID nodeOrder int32 rpc *mockNodeClient[types.ID, Head] diff --git a/core/chains/evm/client/config_builder.go b/core/chains/evm/client/config_builder.go index 9a31f9e4b4..66bdfc2614 100644 --- a/core/chains/evm/client/config_builder.go +++ b/core/chains/evm/client/config_builder.go @@ -80,15 +80,21 @@ func NewClientConfigs( func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) { nodes := make([]*toml.Node, len(nodeCfgs)) for i, nodeCfg := range nodeCfgs { - if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil { - return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i) + var wsURL, httpURL *commonconfig.URL + // wsUrl requirement will be checked in EVMConfig validation + if nodeCfg.WSURL != nil { + wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL) } - wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL) - httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL) + + if nodeCfg.HTTPURL == nil { + return nil, fmt.Errorf("node config [%d]: missing HTTP URL", i) + } + + httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL) node := &toml.Node{ Name: nodeCfg.Name, - WSURL: wsUrl, - HTTPURL: httpUrl, + WSURL: wsURL, + HTTPURL: httpURL, SendOnly: nodeCfg.SendOnly, Order: nodeCfg.Order, } diff --git a/core/chains/evm/client/config_builder_test.go b/core/chains/evm/client/config_builder_test.go index 0339131171..ac12a63fbe 100644 --- a/core/chains/evm/client/config_builder_test.go +++ b/core/chains/evm/client/config_builder_test.go @@ -90,7 +90,7 @@ func TestNodeConfigs(t *testing.T) { require.Len(t, tomlNodes, len(nodeConfigs)) }) - t.Run("parsing missing ws url fails", func(t *testing.T) { + t.Run("ws can be optional", func(t *testing.T) { nodeConfigs := []client.NodeConfig{ { Name: ptr("foo1"), @@ -98,7 +98,7 @@ func TestNodeConfigs(t *testing.T) { }, } _, err := client.ParseTestNodeConfigs(nodeConfigs) - require.Error(t, err) + require.Nil(t, err) }) t.Run("parsing missing http url fails", func(t *testing.T) { diff --git a/core/chains/evm/client/evm_client.go b/core/chains/evm/client/evm_client.go index c26362d635..c596bbc3a9 100644 --- a/core/chains/evm/client/evm_client.go +++ b/core/chains/evm/client/evm_client.go @@ -15,22 +15,25 @@ import ( ) func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client { - var empty url.URL var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient] var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient] largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType) for i, node := range nodes { + var ws *url.URL + if node.WSURL != nil { + ws = (*url.URL)(node.WSURL) + } if node.SendOnly != nil && *node.SendOnly { - rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, + rpc := NewRPCClient(lggr, nil, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType) sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL), *node.Name, chainID, rpc) sendonlys = append(sendonlys, sendonly) } else { - rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), + rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType) primaryNode := commonclient.NewNode(cfg, chainCfg, - lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order, + lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, *node.Order, rpc, "EVM") primaries = append(primaries, primaryNode) } diff --git a/core/chains/evm/client/helpers_test.go b/core/chains/evm/client/helpers_test.go index 031f648157..1a6090e4a0 100644 --- a/core/chains/evm/client/helpers_test.go +++ b/core/chains/evm/client/helpers_test.go @@ -135,7 +135,7 @@ func NewChainClientWithTestNode( rpcUrl string, rpcHTTPURL *url.URL, sendonlyRPCURLs []url.URL, - id int32, + id int, chainID *big.Int, ) (Client, error) { parsed, err := url.ParseRequestURI(rpcUrl) @@ -148,10 +148,10 @@ func NewChainClientWithTestNode( } lggr := logger.Test(t) - rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient]( - nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM") + nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM") primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n} var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient] @@ -160,7 +160,7 @@ func NewChainClientWithTestNode( return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String()) } var empty url.URL - rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") s := commonclient.NewSendOnlyNode[*big.Int, RPCClient]( lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc) sendonlys = append(sendonlys, s) @@ -206,7 +206,7 @@ func NewChainClientWithMockedRpc( parsed, _ := url.ParseRequestURI("ws://test") n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient]( - cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM") + cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM") primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n} clientErrors := NewTestClientErrors() c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType, &clientErrors, 0) diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 763348173a..f55c35980d 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -117,7 +117,7 @@ type rawclient struct { type rpcClient struct { rpcLog logger.SugaredLogger name string - id int32 + id int chainID *big.Int tier commonclient.NodeTier largePayloadRpcTimeout time.Duration @@ -126,10 +126,11 @@ type rpcClient struct { newHeadsPollInterval time.Duration chainType chaintype.ChainType - ws rawclient + ws *rawclient http *rawclient - stateMu sync.RWMutex // protects state* fields + stateMu sync.RWMutex // protects state* fields + subsSliceMu sync.RWMutex // protects subscription slice // Need to track subscriptions because closing the RPC does not (always?) // close the underlying subscription @@ -153,10 +154,10 @@ type rpcClient struct { // NewRPCCLient returns a new *rpcClient as commonclient.RPC func NewRPCClient( lggr logger.Logger, - wsuri url.URL, + wsuri *url.URL, httpuri *url.URL, name string, - id int32, + id int, chainID *big.Int, tier commonclient.NodeTier, finalizedBlockPollInterval time.Duration, @@ -174,9 +175,11 @@ func NewRPCClient( r.id = id r.chainID = chainID r.tier = tier - r.ws.uri = wsuri r.finalizedBlockPollInterval = finalizedBlockPollInterval r.newHeadsPollInterval = newHeadsPollInterval + if wsuri != nil { + r.ws = &rawclient{uri: *wsuri} + } if httpuri != nil { r.http = &rawclient{uri: *httpuri} } @@ -198,30 +201,33 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout) defer cancel() - promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() - lggr := r.rpcLog.With("wsuri", r.ws.uri.Redacted()) - if r.http != nil { - lggr = lggr.With("httpuri", r.http.uri.Redacted()) + if r.ws == nil && r.http == nil { + return errors.New("cannot dial rpc client when both ws and http info are missing") } - lggr.Debugw("RPC dial: evmclient.Client#dial") - wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") - if err != nil { - promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() - return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) - } + promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() + lggr := r.rpcLog + if r.ws != nil { + lggr = lggr.With("wsuri", r.ws.uri.Redacted()) + wsrpc, err := rpc.DialWebsocket(ctx, r.ws.uri.String(), "") + if err != nil { + promEVMPoolRPCNodeDialsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() + return r.wrapRPCClientError(pkgerrors.Wrapf(err, "error while dialing websocket: %v", r.ws.uri.Redacted())) + } - r.ws.rpc = wsrpc - r.ws.geth = ethclient.NewClient(wsrpc) + r.ws.rpc = wsrpc + r.ws.geth = ethclient.NewClient(wsrpc) + } if r.http != nil { + lggr = lggr.With("httpuri", r.http.uri.Redacted()) if err := r.DialHTTP(); err != nil { return err } } + lggr.Debugw("RPC dial: evmclient.Client#dial") promEVMPoolRPCNodeDialsSuccess.WithLabelValues(r.chainID.String(), r.name).Inc() - return nil } @@ -230,7 +236,7 @@ func (r *rpcClient) Dial(callerCtx context.Context) error { // It can only return error if the URL is malformed. func (r *rpcClient) DialHTTP() error { promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() - lggr := r.rpcLog.With("httpuri", r.ws.uri.Redacted()) + lggr := r.rpcLog.With("httpuri", r.http.uri.Redacted()) lggr.Debugw("RPC dial: evmclient.Client#dial") var httprpc *rpc.Client @@ -250,7 +256,7 @@ func (r *rpcClient) DialHTTP() error { func (r *rpcClient) Close() { defer func() { - if r.ws.rpc != nil { + if r.ws != nil && r.ws.rpc != nil { r.ws.rpc.Close() } }() @@ -269,7 +275,10 @@ func (r *rpcClient) cancelInflightRequests() { } func (r *rpcClient) String() string { - s := fmt.Sprintf("(%s)%s:%s", r.tier.String(), r.name, r.ws.uri.Redacted()) + s := fmt.Sprintf("(%s)%s", r.tier.String(), r.name) + if r.ws != nil { + s = s + fmt.Sprintf(":%s", r.ws.uri.Redacted()) + } if r.http != nil { s = s + fmt.Sprintf(":%s", r.http.uri.Redacted()) } @@ -317,8 +326,8 @@ func (r *rpcClient) getRPCDomain() string { // registerSub adds the sub to the rpcClient list func (r *rpcClient) registerSub(sub ethereum.Subscription, stopInFLightCh chan struct{}) error { - r.stateMu.Lock() - defer r.stateMu.Unlock() + r.subsSliceMu.Lock() + defer r.subsSliceMu.Unlock() // ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to // previous `DisconnectAll` call. select { @@ -335,12 +344,16 @@ func (r *rpcClient) registerSub(sub ethereum.Subscription, stopInFLightCh chan s // DisconnectAll disconnects all clients connected to the rpcClient func (r *rpcClient) DisconnectAll() { r.stateMu.Lock() - defer r.stateMu.Unlock() - if r.ws.rpc != nil { + if r.ws != nil && r.ws.rpc != nil { r.ws.rpc.Close() } r.cancelInflightRequests() + r.stateMu.Unlock() + + r.subsSliceMu.Lock() r.unsubscribeAll() + r.subsSliceMu.Unlock() + r.chainInfoLock.Lock() r.latestChainInfo = commonclient.ChainInfo{} r.chainInfoLock.Unlock() @@ -492,19 +505,41 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp defer cancel() args := []interface{}{"newHeads"} lggr := r.newRqLggr().With("args", args) - if r.newHeadsPollInterval > 0 { interval := r.newHeadsPollInterval timeout := interval - poller, _ := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, timeout, r.rpcLog) + poller, pollerCh := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, timeout, r.rpcLog) if err = poller.Start(ctx); err != nil { return nil, err } + // NOTE this is a temporary special treatment for SubscribeNewHead before we refactor head tracker to use SubscribeToHeads + // as we need to forward new head from the poller channel to the channel passed from caller. + go func() { + for head := range pollerCh { + select { + case channel <- head: + // forwarding new head to the channel passed from caller + case <-poller.Err(): + // return as poller returns error + return + } + } + }() + + err = r.registerSub(&poller, chStopInFlight) + if err != nil { + return nil, err + } + lggr.Debugf("Polling new heads over http") return &poller, nil } + if ws == nil { + return nil, errors.New("SubscribeNewHead is not allowed without ws url") + } + lggr.Debug("RPC call: evmclient.Client#EthSubscribe") start := time.Now() defer func() { @@ -533,7 +568,6 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.Head, sub commontypes.Subscription, err error) { ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() - args := []interface{}{rpcSubscriptionMethodNewHeads} start := time.Now() lggr := r.newRqLggr().With("args", args) @@ -547,10 +581,19 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H return nil, nil, err } + err = r.registerSub(&poller, chStopInFlight) + if err != nil { + return nil, nil, err + } + lggr.Debugf("Polling new heads over http") return channel, &poller, nil } + if ws == nil { + return nil, nil, errors.New("SubscribeNewHead is not allowed without ws url") + } + lggr.Debug("RPC call: evmclient.Client#EthSubscribe") defer func() { duration := time.Since(start) @@ -579,6 +622,8 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H } func (r *rpcClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *evmtypes.Head, commontypes.Subscription, error) { + ctx, cancel, chStopInFlight, _, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) + defer cancel() interval := r.finalizedBlockPollInterval if interval == 0 { return nil, nil, errors.New("FinalizedBlockPollInterval is 0") @@ -588,6 +633,12 @@ func (r *rpcClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *evmt if err := poller.Start(ctx); err != nil { return nil, nil, err } + + err := r.registerSub(&poller, chStopInFlight) + if err != nil { + return nil, nil, err + } + return channel, &poller, nil } @@ -1249,6 +1300,9 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) { ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() + if ws == nil { + return nil, errors.New("SubscribeFilterLogs is not allowed without ws url") + } lggr := r.newRqLggr().With("q", q) lggr.Debug("RPC call: evmclient.Client#SubscribeFilterLogs") @@ -1353,18 +1407,21 @@ func (r *rpcClient) wrapHTTP(err error) error { } // makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx -func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) { +func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws *rawclient, http *rawclient) { ctx, cancel, _, ws, http = r.acquireQueryCtx(parentCtx, timeout) return } func (r *rpcClient) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, - chStopInFlight chan struct{}, ws rawclient, http *rawclient) { + chStopInFlight chan struct{}, ws *rawclient, http *rawclient) { // Need to wrap in mutex because state transition can cancel and replace the // context r.stateMu.RLock() chStopInFlight = r.chStopInFlight - ws = r.ws + if r.ws != nil { + cp := *r.ws + ws = &cp + } if r.http != nil { cp := *r.http http = &cp diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index b594a0ca16..662c757ffb 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -19,6 +19,8 @@ import ( "github.com/tidwall/gjson" "go.uber.org/zap" + commontypes "github.com/smartcontractkit/chainlink/v2/common/types" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -57,11 +59,37 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { } return } + + checkClosedRPCClientShouldRemoveExistingSub := func(t tests.TestingT, ctx context.Context, sub commontypes.Subscription, rpcClient client.RPCClient) { + errCh := sub.Err() + + // ensure sub exists + require.Equal(t, int32(1), rpcClient.SubscribersCount()) + rpcClient.DisconnectAll() + + // ensure sub is closed + select { + case <-errCh: // ok + default: + assert.Fail(t, "channel should be closed") + } + + require.NoError(t, rpcClient.Dial(ctx)) + require.Equal(t, int32(0), rpcClient.SubscribersCount()) + } + + t.Run("WS and HTTP URL cannot be both empty", func(t *testing.T) { + // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing + observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) + rpcClient := client.NewRPCClient(observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + require.Equal(t, errors.New("cannot dial rpc client when both ws and http info are missing"), rpcClient.Dial(ctx)) + }) + t.Run("Updates chain info on new blocks", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) // set to default values @@ -111,7 +139,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch := make(chan *evmtypes.Head) @@ -131,12 +159,56 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { assert.Equal(t, int64(0), highestUserObservations.FinalizedBlockNumber) assert.Equal(t, (*big.Int)(nil), highestUserObservations.TotalDifficulty) }) + t.Run("SubscribeToHeads with http polling enabled will update new heads", func(t *testing.T) { + type rpcServer struct { + Head *evmtypes.Head + URL *url.URL + } + createRPCServer := func() *rpcServer { + server := &rpcServer{} + server.Head = &evmtypes.Head{Number: 127} + server.URL = testutils.NewWSServer(t, chainId, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { + assert.Equal(t, "eth_getBlockByNumber", method) + if assert.True(t, params.IsArray()) && assert.Equal(t, "latest", params.Array()[0].String()) { + head := server.Head + jsonHead, err := json.Marshal(head) + if err != nil { + panic(fmt.Errorf("failed to marshal head: %w", err)) + } + resp.Result = string(jsonHead) + } + + return + }).WSURL() + return server + } + + server := createRPCServer() + rpc := client.NewRPCClient(lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, tests.TestInterval, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + latest, highestUserObservations := rpc.GetInterceptedChainInfo() + // latest chain info hasn't been initialized + assert.Equal(t, int64(0), latest.BlockNumber) + assert.Equal(t, int64(0), highestUserObservations.BlockNumber) + + headCh, sub, err := rpc.SubscribeToHeads(commonclient.CtxAddHealthCheckFlag(tests.Context(t))) + require.NoError(t, err) + defer sub.Unsubscribe() + + head := <-headCh + assert.Equal(t, server.Head.Number, head.BlockNumber()) + // the http polling subscription should update the head block + latest, highestUserObservations = rpc.GetInterceptedChainInfo() + assert.Equal(t, server.Head.Number, latest.BlockNumber) + assert.Equal(t, server.Head.Number, highestUserObservations.BlockNumber) + }) t.Run("Concurrent Unsubscribe and onNewHead calls do not lead to a deadlock", func(t *testing.T) { const numberOfAttempts = 1000 // need a large number to increase the odds of reproducing the issue server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) var wg sync.WaitGroup @@ -160,7 +232,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { t.Run("Block's chain ID matched configured", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch := make(chan *evmtypes.Head) @@ -177,17 +249,79 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) require.ErrorContains(t, err, "RPCClient returned error (rpc)") tests.AssertLogEventually(t, observed, "evmclient.Client#EthSubscribe RPC call failure") }) + t.Run("Closed rpc client should remove existing SubscribeNewHead subscription with WS", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + + ch := make(chan *evmtypes.Head) + sub, err := rpc.SubscribeNewHead(tests.Context(t), ch) + require.NoError(t, err) + checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) + }) + t.Run("Closed rpc client should remove existing SubscribeNewHead subscription with HTTP polling", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 1, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + + ch := make(chan *evmtypes.Head) + sub, err := rpc.SubscribeNewHead(tests.Context(t), ch) + require.NoError(t, err) + checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) + }) + t.Run("Closed rpc client should remove existing SubscribeToHeads subscription with WS", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + + _, sub, err := rpc.SubscribeToHeads(tests.Context(t)) + require.NoError(t, err) + checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) + }) + t.Run("Closed rpc client should remove existing SubscribeToHeads subscription with HTTP polling", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 1, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + + _, sub, err := rpc.SubscribeToHeads(tests.Context(t)) + require.NoError(t, err) + checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) + }) + t.Run("Closed rpc client should remove existing SubscribeToFinalizedHeads subscription", func(t *testing.T) { + server := testutils.NewWSServer(t, chainId, serverCallBack) + wsURL := server.WSURL() + + rpc := client.NewRPCClient(lggr, wsURL, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 1, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + defer rpc.Close() + require.NoError(t, rpc.Dial(ctx)) + + _, sub, err := rpc.SubscribeToFinalizedHeads(tests.Context(t)) + require.NoError(t, err) + checkClosedRPCClientShouldRemoveExistingSub(t, ctx, sub, rpc) + }) t.Run("Subscription error is properly wrapper", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) @@ -209,13 +343,22 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { lggr := logger.Test(t) ctx, cancel := context.WithTimeout(tests.Context(t), tests.WaitTimeout(t)) defer cancel() + t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) { + // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing + observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) + rpcClient := client.NewRPCClient(observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + require.Nil(t, rpcClient.Dial(ctx)) + + _, err := rpcClient.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) + require.Equal(t, errors.New("SubscribeFilterLogs is not allowed without ws url"), err) + }) t.Run("Failed SubscribeFilterLogs logs and returns proper error", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, func(reqMethod string, reqParams gjson.Result) (resp testutils.JSONRPCResponse) { return resp }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(observedLggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -232,7 +375,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { return resp }) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -281,7 +424,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) { } server := createRPCServer() - rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") + rpc := client.NewRPCClient(lggr, server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() server.Head = &evmtypes.Head{Number: 128} @@ -391,7 +534,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) { // use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout const rpcTimeout = time.Hour const largePayloadRPCTimeout = tests.TestInterval - rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, largePayloadRPCTimeout, rpcTimeout, "") + rpc := client.NewRPCClient(logger.Test(t), rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, largePayloadRPCTimeout, rpcTimeout, "") require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() err := testCase.Fn(ctx, rpc) @@ -431,7 +574,7 @@ func TestAstarCustomFinality(t *testing.T) { const expectedFinalizedBlockNumber = int64(4) const expectedFinalizedBlockHash = "0x7441e97acf83f555e0deefef86db636bc8a37eb84747603412884e4df4d22804" - rpcClient := client.NewRPCClient(logger.Test(t), *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar) + rpcClient := client.NewRPCClient(logger.Test(t), wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar) defer rpcClient.Close() err := rpcClient.Dial(tests.Context(t)) require.NoError(t, err) diff --git a/core/chains/evm/config/toml/config.go b/core/chains/evm/config/toml/config.go index fd4039f5ea..480d94d5d4 100644 --- a/core/chains/evm/config/toml/config.go +++ b/core/chains/evm/config/toml/config.go @@ -23,7 +23,9 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" ) -var ErrNotFound = errors.New("not found") +var ( + ErrNotFound = errors.New("not found") +) type HasEVMConfigs interface { EVMConfigs() EVMConfigs @@ -311,16 +313,38 @@ func (c *EVMConfig) ValidateConfig() (err error) { err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", Msg: "must have at least one node"}) } else { var hasPrimary bool - for _, n := range c.Nodes { + var logBroadcasterEnabled bool + var newHeadsPollingInterval commonconfig.Duration + if c.LogBroadcasterEnabled != nil { + logBroadcasterEnabled = *c.LogBroadcasterEnabled + } + + if c.NodePool.NewHeadsPollInterval != nil { + newHeadsPollingInterval = *c.NodePool.NewHeadsPollInterval + } + + for i, n := range c.Nodes { if n.SendOnly != nil && *n.SendOnly { continue } + hasPrimary = true - break + + // if the node is a primary node, then the WS URL is required when + // 1. LogBroadcaster is enabled + // 2. The http polling is disabled (newHeadsPollingInterval == 0) + if n.WSURL == nil || n.WSURL.IsZero() { + if logBroadcasterEnabled { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", Msg: fmt.Sprintf("%vth node (primary) must have a valid WSURL when LogBroadcaster is enabled", i)}) + } else if newHeadsPollingInterval.Duration() == 0 { + err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", Msg: fmt.Sprintf("%vth node (primary) must have a valid WSURL when http polling is disabled", i)}) + } + } } + if !hasPrimary { err = multierr.Append(err, commonconfig.ErrMissing{Name: "Nodes", - Msg: "must have at least one primary node with WSURL"}) + Msg: "must have at least one primary node"}) } } @@ -971,19 +995,8 @@ func (n *Node) ValidateConfig() (err error) { err = multierr.Append(err, commonconfig.ErrEmpty{Name: "Name", Msg: "required for all nodes"}) } - var sendOnly bool - if n.SendOnly != nil { - sendOnly = *n.SendOnly - } - if n.WSURL == nil { - if !sendOnly { - err = multierr.Append(err, commonconfig.ErrMissing{Name: "WSURL", Msg: "required for primary nodes"}) - } - } else if n.WSURL.IsZero() { - if !sendOnly { - err = multierr.Append(err, commonconfig.ErrEmpty{Name: "WSURL", Msg: "required for primary nodes"}) - } - } else { + // relax the check here as WSURL can potentially be empty if LogBroadcaster is disabled (checked in EVMConfig Validation) + if n.WSURL != nil && !n.WSURL.IsZero() { switch n.WSURL.Scheme { case "ws", "wss": default: diff --git a/core/config/docs/chains-evm.toml b/core/config/docs/chains-evm.toml index fc53b0cc70..fe83fe64d5 100644 --- a/core/config/docs/chains-evm.toml +++ b/core/config/docs/chains-evm.toml @@ -469,7 +469,7 @@ ObservationGracePeriod = '1s' # Default [[EVM.Nodes]] # Name is a unique (per-chain) identifier for this node. Name = 'foo' # Example -# WSURL is the WS(S) endpoint for this node. Required for primary nodes. +# WSURL is the WS(S) endpoint for this node. Required for primary nodes when `LogBroadcasterEnabled` is `true` WSURL = 'wss://web.socket/test' # Example # HTTPURL is the HTTP(S) endpoint for this node. Required for all nodes. HTTPURL = 'https://foo.web' # Example diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 97970df38f..9caf37ccf2 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -1337,11 +1337,12 @@ func TestConfig_Validate(t *testing.T) { - LDAP.RunUserGroupCN: invalid value (): LDAP ReadUserGroupCN can not be empty - LDAP.RunUserGroupCN: invalid value (): LDAP RunUserGroupCN can not be empty - LDAP.ReadUserGroupCN: invalid value (): LDAP ReadUserGroupCN can not be empty - - EVM: 9 errors: + - EVM: 10 errors: - 1.ChainID: invalid value (1): duplicate - must be unique - 0.Nodes.1.Name: invalid value (foo): duplicate - must be unique - 3.Nodes.4.WSURL: invalid value (ws://dupe.com): duplicate - must be unique - - 0: 3 errors: + - 0: 4 errors: + - Nodes: missing: 0th node (primary) must have a valid WSURL when LogBroadcaster is enabled - GasEstimator.BumpTxDepth: invalid value (11): must be less than or equal to Transactions.MaxInFlight - GasEstimator: 6 errors: - BumpPercent: invalid value (1): may not be less than Geth's default of 10 @@ -1351,9 +1352,7 @@ func TestConfig_Validate(t *testing.T) { - PriceMax: invalid value (10 gwei): must be greater than or equal to PriceDefault - BlockHistory.BlockHistorySize: invalid value (0): must be greater than or equal to 1 with BlockHistory Mode - Nodes: 2 errors: - - 0: 2 errors: - - WSURL: missing: required for primary nodes - - HTTPURL: missing: required for all nodes + - 0.HTTPURL: missing: required for all nodes - 1.HTTPURL: missing: required for all nodes - 1: 10 errors: - ChainType: invalid value (Foo): must not be set with this chain id @@ -1374,18 +1373,19 @@ func TestConfig_Validate(t *testing.T) { - ChainType: invalid value (Arbitrum): must be one of arbitrum, astar, celo, gnosis, hedera, kroma, mantle, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync or omitted - FinalityDepth: invalid value (0): must be greater than or equal to 1 - MinIncomingConfirmations: invalid value (0): must be greater than or equal to 1 - - 3.Nodes: 5 errors: - - 0: 3 errors: + - 3: 3 errors: + - Nodes: missing: 0th node (primary) must have a valid WSURL when LogBroadcaster is enabled + - Nodes: missing: 2th node (primary) must have a valid WSURL when LogBroadcaster is enabled + - Nodes: 5 errors: + - 0: 2 errors: - Name: missing: required for all nodes - - WSURL: missing: required for primary nodes - HTTPURL: empty: required for all nodes - 1: 3 errors: - Name: missing: required for all nodes - WSURL: invalid value (http): must be ws or wss - HTTPURL: missing: required for all nodes - - 2: 3 errors: + - 2: 2 errors: - Name: empty: required for all nodes - - WSURL: missing: required for primary nodes - HTTPURL: invalid value (ws): must be http or https - 3.HTTPURL: missing: required for all nodes - 4.HTTPURL: missing: required for all nodes @@ -1393,6 +1393,7 @@ func TestConfig_Validate(t *testing.T) { - ChainID: missing: required for all chains - Nodes: missing: must have at least one node - 5.Transactions.AutoPurge.DetectionApiUrl: invalid value (): must be set for scroll + - 6.Nodes: missing: 0th node (primary) must have a valid WSURL when http polling is disabled - Cosmos: 5 errors: - 1.ChainID: invalid value (Malaga-420): duplicate - must be unique - 0.Nodes.1.Name: invalid value (test): duplicate - must be unique diff --git a/core/services/chainlink/testdata/config-invalid.toml b/core/services/chainlink/testdata/config-invalid.toml index ca22e68c22..411741b1b5 100644 --- a/core/services/chainlink/testdata/config-invalid.toml +++ b/core/services/chainlink/testdata/config-invalid.toml @@ -117,6 +117,25 @@ Name = 'scroll node' WSURL = 'ws://foo.bar' HTTPURl = 'http://foo.bar' +[[EVM]] +ChainID = '100' +LogBroadcasterEnabled = false + +[[EVM.Nodes]] +Name = 'failing-fake' +HTTPURl = 'http://foo.bar1' + +[[EVM]] +ChainID = '101' +LogBroadcasterEnabled = false + +[EVM.NodePool] +NewHeadsPollInterval = '1s' + +[[EVM.Nodes]] +Name = 'passing-fake' +HTTPURl = 'http://foo.bar2' + [[Cosmos]] ChainID = 'Malaga-420' diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 0dca7763d4..98d2ec5d34 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -11348,7 +11348,7 @@ Name is a unique (per-chain) identifier for this node. ```toml WSURL = 'wss://web.socket/test' # Example ``` -WSURL is the WS(S) endpoint for this node. Required for primary nodes. +WSURL is the WS(S) endpoint for this node. Required for primary nodes when `LogBroadcasterEnabled` is `true` ### HTTPURL ```toml