Skip to content

Commit

Permalink
Add PoolChainInfoProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Jun 12, 2024
1 parent 107a767 commit b291867
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 11 deletions.
5 changes: 5 additions & 0 deletions common/client/mock_node_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions common/client/mock_pool_chain_info_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 34 additions & 10 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,39 @@ func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState {
return states
}

// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync.
// Return highest ChainInfo most recently received by the alive nodes.
// E.g. If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12.
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) LatestChainInfo() (int, ChainInfo) {
var nLiveNodes int
ch := ChainInfo{
BlockDifficulty: big.NewInt(0),
}
for _, n := range c.primaryNodes {
if s, nodeChainInfo := n.StateAndLatest(); s == NodeStateAlive {
nLiveNodes++
ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber)
ch.LatestFinalizedBlock = max(ch.LatestFinalizedBlock, nodeChainInfo.LatestFinalizedBlock)
ch.BlockDifficulty = nodeChainInfo.BlockDifficulty
}
}
return nLiveNodes, ch
}

// HighestChainInfo - returns highest ChainInfo ever observed by any node in the pool.
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) HighestChainInfo() ChainInfo {
ch := ChainInfo{
BlockDifficulty: big.NewInt(0),
}
for _, n := range c.primaryNodes {
_, nodeChainInfo := n.StateAndLatest()
ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber)
ch.LatestFinalizedBlock = max(ch.LatestFinalizedBlock, nodeChainInfo.LatestFinalizedBlock)
ch.BlockDifficulty = nodeChainInfo.BlockDifficulty
}
return ch
}

// Dial starts every node in the pool
//
// Nodes handle their own redialing and runloops, so this function does not
Expand All @@ -148,16 +181,7 @@ func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error {
if n.ConfiguredChainID().String() != c.chainID.String() {
return ms.CloseBecause(fmt.Errorf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", n.String(), n.ConfiguredChainID().String(), c.chainID.String()))
}
/* TODO: Dmytro's PR on local finality handles this better.
rawNode, ok := n.(*node[CHAIN_ID, *evmtypes.Head, RPC_CLIENT])
if ok {
// This is a bit hacky but it allows the node to be aware of
// pool state and prevent certain state transitions that might
// otherwise leave no primaryNodes available. It is better to have one
// node in a degraded state than no primaryNodes at all.
rawNode.nLiveNodes = c.nLiveNodes
}
*/
n.SetPoolChainInfoProvider(c)
// node will handle its own redialing and automatic recovery
if err := ms.Start(ctx, n); err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func newNodeWithState(t *testing.T, chainID types.ID, state NodeState) *mockNode
node.On("Close").Return(nil).Once()
node.On("State").Return(state).Maybe()
node.On("String").Return(fmt.Sprintf("healthy_node_%d", rand.Int())).Maybe()
node.On("SetPoolChainInfoProvider", mock.Anything).Once()
return node
}
func TestMultiNode_Dial(t *testing.T) {
Expand Down Expand Up @@ -98,6 +99,7 @@ func TestMultiNode_Dial(t *testing.T) {
node.On("ConfiguredChainID").Return(chainID).Once()
expectedError := errors.New("failed to start node")
node.On("Start", mock.Anything).Return(expectedError).Once()
node.On("SetPoolChainInfoProvider", mock.Anything).Once()
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
Expand All @@ -115,6 +117,7 @@ func TestMultiNode_Dial(t *testing.T) {
node2.On("ConfiguredChainID").Return(chainID).Once()
expectedError := errors.New("failed to start node")
node2.On("Start", mock.Anything).Return(expectedError).Once()
node2.On("SetPoolChainInfoProvider", mock.Anything).Once()

mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
Expand Down Expand Up @@ -270,7 +273,6 @@ func TestMultiNode_CheckLease(t *testing.T) {
t.Parallel()
chainID := types.RandomID()
node := newHealthyNode(t, chainID)
//node.On("SubscribersCount").Return(int32(2))
node.On("UnsubscribeAll")
bestNode := newHealthyNode(t, chainID)
nodeSelector := newMockNodeSelector[types.ID, multiNodeRPCClient](t)
Expand Down
8 changes: 8 additions & 0 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Node[
State() NodeState
// StateAndLatest returns health state with the latest received block number & total difficulty.
StateAndLatest() (NodeState, ChainInfo)
SetPoolChainInfoProvider(PoolChainInfoProvider)
// Name is a unique identifier for this node.
Name() string
// String - returns string representation of the node, useful for debugging (name + URLS used to connect to the RPC)
Expand Down Expand Up @@ -110,6 +111,9 @@ type node[

stateMu sync.RWMutex // protects state* fields
state NodeState

poolInfoProvider PoolChainInfoProvider

// Each node is tracking the last received head number and total difficulty
stateLatestBlockNumber int64
stateLatestTotalDifficulty *big.Int
Expand Down Expand Up @@ -173,6 +177,10 @@ func NewNode[
return n
}

func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) SetPoolChainInfoProvider(poolInfoProvider PoolChainInfoProvider) {
n.poolInfoProvider = poolInfoProvider
}

func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) String() string {
s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String())
if n.http != nil {
Expand Down
12 changes: 12 additions & 0 deletions common/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/types"
)

// PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo
//
//go:generate mockery --quiet --name PoolChainInfoProvider --structname mockPoolChainInfoProvider --filename "mock_pool_chain_info_provider_test.go" --inpackage --case=underscore
type PoolChainInfoProvider interface {
// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being.
// Return highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number
// observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12.
LatestChainInfo() (int, ChainInfo)
// HighestChainInfo - returns highest ChainInfo ever observed by any node in the pool.
HighestChainInfo() ChainInfo
}

// RPCClient includes all the necessary generalized RPC methods along with any additional chain-specific methods.
//
//go:generate mockery --quiet --name RPCClient --structname MockRPCClient --filename "mock_rpc_client_test.go" --inpackage --case=underscore
Expand Down

0 comments on commit b291867

Please sign in to comment.