Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support new heads polling over http rpc client #14373

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bdb7eed
add functions for newHead polling
huangzhen1997 Sep 6, 2024
e40616f
add new flag, NewHeadsPollInterval
huangzhen1997 Sep 9, 2024
42c48ae
fix test
huangzhen1997 Sep 10, 2024
f6dba0b
Merge branch 'develop' into BCFR-706/rpc-client-polling-implementatio…
huangzhen1997 Sep 10, 2024
c51e82c
Merge branch 'develop' into BCFR-706/adding-a-new-flag-for-NewHeadsPo…
huangzhen1997 Sep 10, 2024
035b085
fix test
huangzhen1997 Sep 10, 2024
d58fe13
Merge pull request #14378 from smartcontractkit/BCFR-706/adding-a-new…
huangzhen1997 Sep 10, 2024
5090efc
step for adding polling new head impelmentation and sync+outOfSync lo…
huangzhen1997 Sep 11, 2024
1cd3f4c
fix lint
huangzhen1997 Sep 11, 2024
d595351
add unit test
huangzhen1997 Sep 11, 2024
7280eae
update changeset
huangzhen1997 Sep 11, 2024
309499f
update comments
huangzhen1997 Sep 11, 2024
70c5666
simplify step 1, need to fix test
huangzhen1997 Sep 13, 2024
b819f89
remove tests, no longer needed
huangzhen1997 Sep 13, 2024
4e8e35f
fix lint
huangzhen1997 Sep 13, 2024
22f90f0
fix mock
huangzhen1997 Sep 13, 2024
87f3148
fix simulated client
huangzhen1997 Sep 13, 2024
9f60dc8
update interface
huangzhen1997 Sep 13, 2024
aa4a030
rm more
huangzhen1997 Sep 13, 2024
5d22045
temp update for testing log level
huangzhen1997 Sep 16, 2024
c98f850
temp update
huangzhen1997 Sep 16, 2024
ec63fca
revert modification to polygon toml
huangzhen1997 Sep 17, 2024
f0d25ce
enable heap monitoring
huangzhen1997 Sep 18, 2024
612c560
enable for testing
huangzhen1997 Sep 18, 2024
6a45eee
revert
huangzhen1997 Sep 18, 2024
d8e5911
Dmytro's comments
huangzhen1997 Sep 19, 2024
8476135
make func private
huangzhen1997 Sep 20, 2024
a354ce5
Merge branch 'develop' into BCFR-706/rpc-client-polling-implementatio…
huangzhen1997 Sep 20, 2024
45f18e4
fix test
huangzhen1997 Sep 20, 2024
2752555
add polling support for SubscribeNewHeaddocke
huangzhen1997 Sep 20, 2024
4ccae14
update log level
huangzhen1997 Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/happy-feet-rhyme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"chainlink": minor
---

This PR introduce few changes:
- Add a new config option `EVM.NodePool.NewHeadsPollInterval` (0 by default indicate disabled), which is an interval for polling new block periodically using http client rather than subscribe to ws feed.
- Updated new head handler for polling new head over http, and register the subscription in node lifecycle logic.
- If the polling new heads is enabled, WS new heads subscription will be replaced with the new http based polling.

Note: There will be another PR for making WS URL optional with some extra condition.
#added
1 change: 1 addition & 0 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type NodeConfig interface {
FinalizedBlockPollInterval() time.Duration
EnforceRepeatableRead() bool
DeathDeclarationDelay() time.Duration
NewHeadsPollInterval() time.Duration
}

type ChainConfig interface {
Expand Down
5 changes: 5 additions & 0 deletions common/client/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type testNodeConfig struct {
enforceRepeatableRead bool
finalizedBlockPollInterval time.Duration
deathDeclarationDelay time.Duration
newHeadsPollInterval time.Duration
}

func (n testNodeConfig) NewHeadsPollInterval() time.Duration {
return n.newHeadsPollInterval
}

func (n testNodeConfig) PollFailureThreshold() uint32 {
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/client/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewClientConfigs(
deathDeclarationDelay time.Duration,
noNewFinalizedHeadsThreshold time.Duration,
finalizedBlockPollInterval time.Duration,

newHeadsPollInterval time.Duration,
) (commonclient.ChainConfig, evmconfig.NodePool, []*toml.Node, error) {
nodes, err := parseNodeConfigs(nodeCfgs)
if err != nil {
Expand All @@ -59,6 +59,7 @@ func NewClientConfigs(
EnforceRepeatableRead: enforceRepeatableRead,
DeathDeclarationDelay: commonconfig.MustNewDuration(deathDeclarationDelay),
FinalizedBlockPollInterval: commonconfig.MustNewDuration(finalizedBlockPollInterval),
NewHeadsPollInterval: commonconfig.MustNewDuration(newHeadsPollInterval),
}
nodePoolCfg := &evmconfig.NodePoolConfig{C: nodePool}
chainConfig := &evmconfig.EVMConfig{
Expand Down
5 changes: 4 additions & 1 deletion core/chains/evm/client/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func TestClientConfigBuilder(t *testing.T) {
finalityDepth := ptr(uint32(10))
finalityTagEnabled := ptr(true)
noNewHeadsThreshold := time.Second
newHeadsPollInterval := 0 * time.Second
chainCfg, nodePool, nodes, err := client.NewClientConfigs(selectionMode, leaseDuration, chainTypeStr, nodeConfigs,
pollFailureThreshold, pollInterval, syncThreshold, nodeIsSyncingEnabled, noNewHeadsThreshold, finalityDepth,
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold, pollInterval)
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold,
pollInterval, newHeadsPollInterval)
require.NoError(t, err)

// Validate node pool configs
Expand All @@ -52,6 +54,7 @@ func TestClientConfigBuilder(t *testing.T) {
require.Equal(t, *enforceRepeatableRead, nodePool.EnforceRepeatableRead())
require.Equal(t, deathDeclarationDelay, nodePool.DeathDeclarationDelay())
require.Equal(t, pollInterval, nodePool.FinalizedBlockPollInterval())
require.Equal(t, newHeadsPollInterval, nodePool.NewHeadsPollInterval())

// Validate node configs
require.Equal(t, *nodeConfigs[0].Name, *nodes[0].Name)
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, cli
for i, node := range nodes {
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i),
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
rpc, "EVM")
Expand Down
4 changes: 3 additions & 1 deletion core/chains/evm/client/evm_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNewEvmClient(t *testing.T) {
deathDeclarationDelay := time.Second * 3
noNewFinalizedBlocksThreshold := time.Second * 5
finalizedBlockPollInterval := time.Second * 4
newHeadsPollInterval := time.Second * 4
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo"),
Expand All @@ -40,7 +41,8 @@ func TestNewEvmClient(t *testing.T) {
finalityTagEnabled := ptr(true)
chainCfg, nodePool, nodes, err := client.NewClientConfigs(selectionMode, leaseDuration, chainTypeStr, nodeConfigs,
pollFailureThreshold, pollInterval, syncThreshold, nodeIsSyncingEnabled, noNewHeadsThreshold, finalityDepth,
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold, finalizedBlockPollInterval)
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold,
finalizedBlockPollInterval, newHeadsPollInterval)
require.NoError(t, err)

client := client.NewEvmClient(nodePool, chainCfg, nil, logger.Test(t), testutils.FixtureChainID, nodes, chaintype.ChainType(chainTypeStr))
Expand Down
9 changes: 7 additions & 2 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type TestNodePoolConfig struct {
NodeErrors config.ClientErrors
EnforceRepeatableReadVal bool
NodeDeathDeclarationDelay time.Duration
NodeNewHeadsPollInterval time.Duration
}

func (tc TestNodePoolConfig) PollFailureThreshold() uint32 { return tc.NodePollFailureThreshold }
Expand All @@ -110,6 +111,10 @@ func (tc TestNodePoolConfig) FinalizedBlockPollInterval() time.Duration {
return tc.NodeFinalizedBlockPollInterval
}

func (tc TestNodePoolConfig) NewHeadsPollInterval() time.Duration {
return tc.NodeNewHeadsPollInterval
}

func (tc TestNodePoolConfig) Errors() config.ClientErrors {
return tc.NodeErrors
}
Expand Down Expand Up @@ -143,7 +148,7 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
Expand All @@ -155,7 +160,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down
32 changes: 32 additions & 0 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type rpcClient struct {
largePayloadRpcTimeout time.Duration
rpcTimeout time.Duration
finalizedBlockPollInterval time.Duration
newHeadsPollInterval time.Duration
chainType chaintype.ChainType

ws rawclient
Expand Down Expand Up @@ -159,6 +160,7 @@ func NewRPCClient(
chainID *big.Int,
tier commonclient.NodeTier,
finalizedBlockPollInterval time.Duration,
newHeadsPollInterval time.Duration,
largePayloadRpcTimeout time.Duration,
rpcTimeout time.Duration,
chainType chaintype.ChainType,
Expand All @@ -174,6 +176,7 @@ func NewRPCClient(
r.tier = tier
r.ws.uri = wsuri
r.finalizedBlockPollInterval = finalizedBlockPollInterval
r.newHeadsPollInterval = newHeadsPollInterval
if httpuri != nil {
r.http = &rawclient{uri: *httpuri}
}
Expand Down Expand Up @@ -490,6 +493,18 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp
args := []interface{}{"newHeads"}
lggr := r.newRqLggr().With("args", args)

if r.newHeadsPollInterval > 0 {
interval := r.newHeadsPollInterval
timeout := interval
poller, _ := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, timeout, r.rpcLog)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why copy r.newHeadsPollInterval twice? Not a big deal either way I guess, just wondering why not just:

poller, _ := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, interval, r.rpcLog)

or even:

poller, _ := commonclient.NewPoller[*evmtypes.Head](r.newHeadsPollInterval, r.latestBlock, r.newHeadsPollInterval, r.rpcLog)

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes the line 499 more readable, so it's not looking like passing the same parameter twice. And it's more configurable if we decided to update the timeout value later. And it's consistent with the SubscribeToFinalizedHeads function

if err = poller.Start(ctx); err != nil {
return nil, err
}

lggr.Debugf("Polling new heads over http")
return &poller, nil
}

lggr.Debug("RPC call: evmclient.Client#EthSubscribe")
start := time.Now()
defer func() {
Expand Down Expand Up @@ -523,6 +538,19 @@ func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H
start := time.Now()
lggr := r.newRqLggr().With("args", args)

// if new head based on http polling is enabled, we will replace it for WS newHead subscription
if r.newHeadsPollInterval > 0 {
interval := r.newHeadsPollInterval
timeout := interval
poller, channel := commonclient.NewPoller[*evmtypes.Head](interval, r.latestBlock, timeout, r.rpcLog)
if err = poller.Start(ctx); err != nil {
return nil, nil, err
}

lggr.Debugf("Polling new heads over http")
return channel, &poller, nil
}

lggr.Debug("RPC call: evmclient.Client#EthSubscribe")
defer func() {
duration := time.Since(start)
Expand Down Expand Up @@ -695,6 +723,10 @@ func (r *rpcClient) LatestFinalizedBlock(ctx context.Context) (head *evmtypes.He
return
}

func (r *rpcClient) latestBlock(ctx context.Context) (head *evmtypes.Head, err error) {
return r.BlockByNumber(ctx, nil)
}

func (r *rpcClient) astarLatestFinalizedBlock(ctx context.Context, result interface{}) (err error) {
var hashResult string
err = r.CallContext(ctx, &hashResult, "chain_getFinalizedHead")
Expand Down
22 changes: 11 additions & 11 deletions core/chains/evm/client/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
// set to default values
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
ch := make(chan *evmtypes.Head)
Expand All @@ -136,7 +136,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()

rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
var wg sync.WaitGroup
Expand All @@ -160,7 +160,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
t.Run("Block's chain ID matched configured", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
ch := make(chan *evmtypes.Head)
Expand All @@ -177,7 +177,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
})
wsURL := server.WSURL()
observedLggr, observed := logger.TestObserved(t, zap.DebugLevel)
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.NoError(t, rpc.Dial(ctx))
server.Close()
_, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head))
Expand All @@ -187,7 +187,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) {
t.Run("Subscription error is properly wrapper", func(t *testing.T) {
server := testutils.NewWSServer(t, chainId, serverCallBack)
wsURL := server.WSURL()
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
sub, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head))
Expand Down Expand Up @@ -215,7 +215,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) {
})
wsURL := server.WSURL()
observedLggr, observed := logger.TestObserved(t, zap.DebugLevel)
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.NoError(t, rpc.Dial(ctx))
server.Close()
_, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log))
Expand All @@ -232,7 +232,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) {
return resp
})
wsURL := server.WSURL()
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
defer rpc.Close()
require.NoError(t, rpc.Dial(ctx))
sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log))
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) {
}

server := createRPCServer()
rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
require.NoError(t, rpc.Dial(ctx))
defer rpc.Close()
server.Head = &evmtypes.Head{Number: 128}
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestRpcClientLargePayloadTimeout(t *testing.T) {
// use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout
const rpcTimeout = time.Hour
const largePayloadRPCTimeout = tests.TestInterval
rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, largePayloadRPCTimeout, rpcTimeout, "")
rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, largePayloadRPCTimeout, rpcTimeout, "")
require.NoError(t, rpc.Dial(ctx))
defer rpc.Close()
err := testCase.Fn(ctx, rpc)
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestAstarCustomFinality(t *testing.T) {

const expectedFinalizedBlockNumber = int64(4)
const expectedFinalizedBlockHash = "0x7441e97acf83f555e0deefef86db636bc8a37eb84747603412884e4df4d22804"
rpcClient := client.NewRPCClient(logger.Test(t), *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar)
rpcClient := client.NewRPCClient(logger.Test(t), *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, chaintype.ChainAstar)
defer rpcClient.Close()
err := rpcClient.Dial(tests.Context(t))
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped_node_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func (n *NodePoolConfig) FinalizedBlockPollInterval() time.Duration {
return n.C.FinalizedBlockPollInterval.Duration()
}

func (n *NodePoolConfig) NewHeadsPollInterval() time.Duration {
return n.C.NewHeadsPollInterval.Duration()
}

func (n *NodePoolConfig) Errors() ClientErrors { return &clientErrorsConfig{c: n.C.Errors} }

func (n *NodePoolConfig) EnforceRepeatableRead() bool {
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type NodePool interface {
Errors() ClientErrors
EnforceRepeatableRead() bool
DeathDeclarationDelay() time.Duration
NewHeadsPollInterval() time.Duration
}

// TODO BCF-2509 does the chainscopedconfig really need the entire app config?
Expand Down
6 changes: 6 additions & 0 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ type NodePool struct {
Errors ClientErrors `toml:",omitempty"`
EnforceRepeatableRead *bool
DeathDeclarationDelay *commonconfig.Duration
NewHeadsPollInterval *commonconfig.Duration
}

func (p *NodePool) setFrom(f *NodePool) {
Expand Down Expand Up @@ -917,6 +918,11 @@ func (p *NodePool) setFrom(f *NodePool) {
if v := f.DeathDeclarationDelay; v != nil {
p.DeathDeclarationDelay = v
}

if v := f.NewHeadsPollInterval; v != nil {
p.NewHeadsPollInterval = v
}

p.Errors.setFrom(&f.Errors)
}

Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ NodeIsSyncingEnabled = false
FinalizedBlockPollInterval = '5s'
EnforceRepeatableRead = false
DeathDeclarationDelay = '10s'
NewHeadsPollInterval = '0s'

[OCR]
ContractConfirmations = 4
Expand Down
4 changes: 4 additions & 0 deletions core/config/docs/chains-evm.toml
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ EnforceRepeatableRead = false # Default
# trigger declaration of `FinalizedBlockOutOfSync` due to insignificant network delays in broadcasting of the finalized state among RPCs.
# RPC will not be picked to handle a request even if this option is set to a nonzero value.
DeathDeclarationDelay = '10s' # Default
# NewHeadsPollInterval define an interval for polling new block periodically using http client rather than subscribe to ws feed
#
# Set to 0 to disable.
NewHeadsPollInterval = '0s' # Default
# **ADVANCED**
# Errors enable the node to provide custom regex patterns to match against error messages from RPCs.
[EVM.NodePool.Errors]
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ func TestConfig_Marshal(t *testing.T) {
FinalizedBlockPollInterval: &second,
EnforceRepeatableRead: ptr(true),
DeathDeclarationDelay: &minute,
NewHeadsPollInterval: &zeroSeconds,
Errors: evmcfg.ClientErrors{
NonceTooLow: ptr[string]("(: |^)nonce too low"),
NonceTooHigh: ptr[string]("(: |^)nonce too high"),
Expand Down Expand Up @@ -1117,6 +1118,7 @@ NodeIsSyncingEnabled = true
FinalizedBlockPollInterval = '1s'
EnforceRepeatableRead = true
DeathDeclarationDelay = '1m0s'
NewHeadsPollInterval = '0s'

[EVM.NodePool.Errors]
NonceTooLow = '(: |^)nonce too low'
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ NodeIsSyncingEnabled = true
FinalizedBlockPollInterval = '1s'
EnforceRepeatableRead = true
DeathDeclarationDelay = '1m0s'
NewHeadsPollInterval = '0s'

[EVM.NodePool.Errors]
NonceTooLow = '(: |^)nonce too low'
Expand Down
Loading
Loading