From b291867eedecac65d2431308265c017f87846592 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 12 Jun 2024 13:26:57 -0400 Subject: [PATCH] Add PoolChainInfoProvider --- common/client/mock_node_test.go | 5 ++ .../mock_pool_chain_info_provider_test.go | 70 +++++++++++++++++++ common/client/multi_node.go | 44 +++++++++--- common/client/multi_node_test.go | 4 +- common/client/node.go | 8 +++ common/client/types.go | 12 ++++ 6 files changed, 132 insertions(+), 11 deletions(-) create mode 100644 common/client/mock_pool_chain_info_provider_test.go diff --git a/common/client/mock_node_test.go b/common/client/mock_node_test.go index 8e669391b30..4cf399ddffb 100644 --- a/common/client/mock_node_test.go +++ b/common/client/mock_node_test.go @@ -104,6 +104,11 @@ func (_m *mockNode[CHAIN_ID, RPC_CLIENT]) RPC() RPC_CLIENT { return r0 } +// SetPoolChainInfoProvider provides a mock function with given fields: _a0 +func (_m *mockNode[CHAIN_ID, RPC_CLIENT]) SetPoolChainInfoProvider(_a0 PoolChainInfoProvider) { + _m.Called(_a0) +} + // Start provides a mock function with given fields: _a0 func (_m *mockNode[CHAIN_ID, RPC_CLIENT]) Start(_a0 context.Context) error { ret := _m.Called(_a0) diff --git a/common/client/mock_pool_chain_info_provider_test.go b/common/client/mock_pool_chain_info_provider_test.go new file mode 100644 index 00000000000..563641f701d --- /dev/null +++ b/common/client/mock_pool_chain_info_provider_test.go @@ -0,0 +1,70 @@ +// Code generated by mockery v2.42.2. DO NOT EDIT. + +package client + +import mock "github.com/stretchr/testify/mock" + +// mockPoolChainInfoProvider is an autogenerated mock type for the PoolChainInfoProvider type +type mockPoolChainInfoProvider struct { + mock.Mock +} + +// HighestChainInfo provides a mock function with given fields: +func (_m *mockPoolChainInfoProvider) HighestChainInfo() ChainInfo { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for HighestChainInfo") + } + + var r0 ChainInfo + if rf, ok := ret.Get(0).(func() ChainInfo); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(ChainInfo) + } + + return r0 +} + +// LatestChainInfo provides a mock function with given fields: +func (_m *mockPoolChainInfoProvider) LatestChainInfo() (int, ChainInfo) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for LatestChainInfo") + } + + var r0 int + var r1 ChainInfo + if rf, ok := ret.Get(0).(func() (int, ChainInfo)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func() ChainInfo); ok { + r1 = rf() + } else { + r1 = ret.Get(1).(ChainInfo) + } + + return r0, r1 +} + +// newMockPoolChainInfoProvider creates a new instance of mockPoolChainInfoProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newMockPoolChainInfoProvider(t interface { + mock.TestingT + Cleanup(func()) +}) *mockPoolChainInfoProvider { + mock := &mockPoolChainInfoProvider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/common/client/multi_node.go b/common/client/multi_node.go index bce7c87d8f3..28b56910352 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -134,6 +134,39 @@ func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState { return states } +// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync. +// Return highest ChainInfo most recently received by the alive nodes. +// E.g. If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12. +func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) LatestChainInfo() (int, ChainInfo) { + var nLiveNodes int + ch := ChainInfo{ + BlockDifficulty: big.NewInt(0), + } + for _, n := range c.primaryNodes { + if s, nodeChainInfo := n.StateAndLatest(); s == NodeStateAlive { + nLiveNodes++ + ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber) + ch.LatestFinalizedBlock = max(ch.LatestFinalizedBlock, nodeChainInfo.LatestFinalizedBlock) + ch.BlockDifficulty = nodeChainInfo.BlockDifficulty + } + } + return nLiveNodes, ch +} + +// HighestChainInfo - returns highest ChainInfo ever observed by any node in the pool. +func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) HighestChainInfo() ChainInfo { + ch := ChainInfo{ + BlockDifficulty: big.NewInt(0), + } + for _, n := range c.primaryNodes { + _, nodeChainInfo := n.StateAndLatest() + ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber) + ch.LatestFinalizedBlock = max(ch.LatestFinalizedBlock, nodeChainInfo.LatestFinalizedBlock) + ch.BlockDifficulty = nodeChainInfo.BlockDifficulty + } + return ch +} + // Dial starts every node in the pool // // Nodes handle their own redialing and runloops, so this function does not @@ -148,16 +181,7 @@ func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error { if n.ConfiguredChainID().String() != c.chainID.String() { return ms.CloseBecause(fmt.Errorf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", n.String(), n.ConfiguredChainID().String(), c.chainID.String())) } - /* TODO: Dmytro's PR on local finality handles this better. - rawNode, ok := n.(*node[CHAIN_ID, *evmtypes.Head, RPC_CLIENT]) - if ok { - // This is a bit hacky but it allows the node to be aware of - // pool state and prevent certain state transitions that might - // otherwise leave no primaryNodes available. It is better to have one - // node in a degraded state than no primaryNodes at all. - rawNode.nLiveNodes = c.nLiveNodes - } - */ + n.SetPoolChainInfoProvider(c) // node will handle its own redialing and automatic recovery if err := ms.Start(ctx, n); err != nil { return err diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go index 2f8aa6ff008..3981e05a3cc 100644 --- a/common/client/multi_node_test.go +++ b/common/client/multi_node_test.go @@ -58,6 +58,7 @@ func newNodeWithState(t *testing.T, chainID types.ID, state NodeState) *mockNode node.On("Close").Return(nil).Once() node.On("State").Return(state).Maybe() node.On("String").Return(fmt.Sprintf("healthy_node_%d", rand.Int())).Maybe() + node.On("SetPoolChainInfoProvider", mock.Anything).Once() return node } func TestMultiNode_Dial(t *testing.T) { @@ -98,6 +99,7 @@ func TestMultiNode_Dial(t *testing.T) { node.On("ConfiguredChainID").Return(chainID).Once() expectedError := errors.New("failed to start node") node.On("Start", mock.Anything).Return(expectedError).Once() + node.On("SetPoolChainInfoProvider", mock.Anything).Once() mn := newTestMultiNode(t, multiNodeOpts{ selectionMode: NodeSelectionModeRoundRobin, chainID: chainID, @@ -115,6 +117,7 @@ func TestMultiNode_Dial(t *testing.T) { node2.On("ConfiguredChainID").Return(chainID).Once() expectedError := errors.New("failed to start node") node2.On("Start", mock.Anything).Return(expectedError).Once() + node2.On("SetPoolChainInfoProvider", mock.Anything).Once() mn := newTestMultiNode(t, multiNodeOpts{ selectionMode: NodeSelectionModeRoundRobin, @@ -270,7 +273,6 @@ func TestMultiNode_CheckLease(t *testing.T) { t.Parallel() chainID := types.RandomID() node := newHealthyNode(t, chainID) - //node.On("SubscribersCount").Return(int32(2)) node.On("UnsubscribeAll") bestNode := newHealthyNode(t, chainID) nodeSelector := newMockNodeSelector[types.ID, multiNodeRPCClient](t) diff --git a/common/client/node.go b/common/client/node.go index 593665bf970..edb05cd9a12 100644 --- a/common/client/node.go +++ b/common/client/node.go @@ -71,6 +71,7 @@ type Node[ State() NodeState // StateAndLatest returns health state with the latest received block number & total difficulty. StateAndLatest() (NodeState, ChainInfo) + SetPoolChainInfoProvider(PoolChainInfoProvider) // Name is a unique identifier for this node. Name() string // String - returns string representation of the node, useful for debugging (name + URLS used to connect to the RPC) @@ -110,6 +111,9 @@ type node[ stateMu sync.RWMutex // protects state* fields state NodeState + + poolInfoProvider PoolChainInfoProvider + // Each node is tracking the last received head number and total difficulty stateLatestBlockNumber int64 stateLatestTotalDifficulty *big.Int @@ -173,6 +177,10 @@ func NewNode[ return n } +func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) SetPoolChainInfoProvider(poolInfoProvider PoolChainInfoProvider) { + n.poolInfoProvider = poolInfoProvider +} + func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) String() string { s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String()) if n.http != nil { diff --git a/common/client/types.go b/common/client/types.go index 74b9408e475..2dbc1d568a5 100644 --- a/common/client/types.go +++ b/common/client/types.go @@ -10,6 +10,18 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/types" ) +// PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo +// +//go:generate mockery --quiet --name PoolChainInfoProvider --structname mockPoolChainInfoProvider --filename "mock_pool_chain_info_provider_test.go" --inpackage --case=underscore +type PoolChainInfoProvider interface { + // LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being. + // Return highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number + // observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12. + LatestChainInfo() (int, ChainInfo) + // HighestChainInfo - returns highest ChainInfo ever observed by any node in the pool. + HighestChainInfo() ChainInfo +} + // RPCClient includes all the necessary generalized RPC methods along with any additional chain-specific methods. // //go:generate mockery --quiet --name RPCClient --structname MockRPCClient --filename "mock_rpc_client_test.go" --inpackage --case=underscore