Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new EVM Multinode #13184

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
86a5d58
Update node
DylanTinianov May 13, 2024
281f93f
Update Multinode
DylanTinianov May 14, 2024
6813259
Fix build + generate
DylanTinianov May 15, 2024
9e45475
Udate multinode
DylanTinianov May 16, 2024
a61a99e
update multinode
DylanTinianov May 16, 2024
3ff4cb9
fix tests
DylanTinianov May 16, 2024
c399391
Fix mocks
DylanTinianov May 21, 2024
b2b5926
Update node_lifecycle_test.go
DylanTinianov May 22, 2024
029c82b
Fix all client tests
DylanTinianov May 22, 2024
bd14d51
Fix tests
DylanTinianov May 24, 2024
db2c5f3
Update client_test.go
DylanTinianov May 24, 2024
28c917f
go mod tidy
DylanTinianov May 24, 2024
d873d25
fix tests
DylanTinianov May 29, 2024
119d947
Fix tests
DylanTinianov May 29, 2024
b3b60fc
Clean up
DylanTinianov May 29, 2024
9db0039
Fix features test mocking
DylanTinianov May 29, 2024
88bc047
Fix logging
DylanTinianov May 29, 2024
59e6752
Remove logging
DylanTinianov May 31, 2024
e940efa
Fix tests
DylanTinianov May 31, 2024
5dd3c5c
Merge branch 'BCI-3160-EVM-MultiNode-PoC' of https://github.com/smart…
DylanTinianov May 31, 2024
7b52a43
Fix context
DylanTinianov May 31, 2024
f6c83ac
lint
DylanTinianov Jun 4, 2024
8ccad6e
Update node_lifecycle_test.go
DylanTinianov Jun 4, 2024
3469af3
Remove unused generics
DylanTinianov Jun 4, 2024
3d0209c
Add state locking
DylanTinianov Jun 4, 2024
8755d87
Set block difficulty
DylanTinianov Jun 6, 2024
80e0030
Update node_lifecycle.go
DylanTinianov Jun 6, 2024
35d6ec6
Merge branch 'BCI-3160-EVM-MultiNode-PoC' of https://github.com/smart…
DylanTinianov Jun 11, 2024
9d8b107
Fix tests
DylanTinianov Jun 11, 2024
4079440
Make NodeStates public
DylanTinianov Jun 12, 2024
7498dde
Update multi_node_test.go
DylanTinianov Jun 12, 2024
4796377
Update Unsubscribe
DylanTinianov Jun 12, 2024
ce1214b
Remove HEAD generic from Node interface
DylanTinianov Jun 12, 2024
ae2afe0
Remove unneeded generics
DylanTinianov Jun 12, 2024
0454491
Remove unneeded generics from Multinode
DylanTinianov Jun 12, 2024
107a767
Remove Multinode as interface
DylanTinianov Jun 12, 2024
b291867
Add PoolChainInfoProvider
DylanTinianov Jun 12, 2024
f7425d9
Setup SendOnly nodes
DylanTinianov Jun 18, 2024
a78085b
Merge branch 'BCI-3160-EVM-MultiNode-PoC' of https://github.com/smart…
DylanTinianov Jun 18, 2024
cd3fdc9
Test empty context
DylanTinianov Jun 18, 2024
252c488
Add err to log
DylanTinianov Jun 18, 2024
e50e1f3
Add rpc newHeads method
DylanTinianov Jun 18, 2024
caa83e6
Fix context
DylanTinianov Jun 18, 2024
7cd64ef
Changeset
DylanTinianov Jun 18, 2024
511a7a2
Remove unused mocks
DylanTinianov Jun 18, 2024
8886d0c
Address comments
DylanTinianov Jun 27, 2024
a83d342
Remove ChainClientRPC interface
DylanTinianov Jun 28, 2024
71a2b7f
Remove unneeded test
DylanTinianov Jun 28, 2024
181a38b
Generate mocks
DylanTinianov Jun 28, 2024
e6e5419
Merge branch 'BCI-3160-EVM-MultiNode-PoC' of https://github.com/smart…
DylanTinianov Jul 4, 2024
70cea08
fix tests
DylanTinianov Jul 4, 2024
c9a6c12
Use UnsubscribeAllExcept
DylanTinianov Jul 4, 2024
d5a1a8c
Fix rpc client tests
DylanTinianov Jul 4, 2024
a50bb22
Address comments
DylanTinianov Jul 4, 2024
6a6c5d1
Merge branch 'BCI-3160-EVM-MultiNode-PoC' of https://github.com/smart…
DylanTinianov Jul 4, 2024
21c1c64
Remove unused code
DylanTinianov Jul 4, 2024
15e3fa7
Generate private mock
DylanTinianov Jul 4, 2024
f4ebec0
lint
DylanTinianov Jul 4, 2024
f3e0ec1
Fix locks and unsubscribing
DylanTinianov Jul 5, 2024
62e5f55
Update node.go
DylanTinianov Jul 5, 2024
8308ece
fixed flaky headtracker tests
dhaidashenko Jul 8, 2024
c4dce66
Merge branch 'BCI-3160-implement-node-interface' of github.com:smartc…
dhaidashenko Jul 8, 2024
80ddd26
Update node_lifecycle_test.go
DylanTinianov Jul 8, 2024
169e44b
Update node_lifecycle_test.go
DylanTinianov Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
686 changes: 101 additions & 585 deletions common/client/multi_node.go

Large diffs are not rendered by default.

34 changes: 12 additions & 22 deletions common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,34 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/common/config"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type multiNodeRPCClient RPC[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], any]
type multiNodeRPCClient RPCClient[types.ID, types.Head[Hashable]]

type testMultiNode struct {
*multiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient, any]
*multiNode[types.ID, Hashable, types.Head[Hashable], multiNodeRPCClient]
}

type multiNodeOpts struct {
logger logger.Logger
selectionMode string
leaseDuration time.Duration
noNewHeadsThreshold time.Duration
nodes []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]
sendonlys []SendOnlyNode[types.ID, multiNodeRPCClient]
chainID types.ID
chainType config.ChainType
chainFamily string
classifySendTxError func(tx any, err error) SendTxReturnCode
sendTxSoftTimeout time.Duration
logger logger.Logger
selectionMode string
leaseDuration time.Duration
nodes []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]
sendonlys []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]
chainID types.ID
chainFamily string
}

func newTestMultiNode(t *testing.T, opts multiNodeOpts) testMultiNode {
if opts.logger == nil {
opts.logger = logger.Test(t)
}

result := NewMultiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient, any](opts.logger,
opts.selectionMode, opts.leaseDuration, opts.noNewHeadsThreshold, opts.nodes, opts.sendonlys,
opts.chainID, opts.chainType, opts.chainFamily, opts.classifySendTxError, opts.sendTxSoftTimeout)
result := NewMultiNode[types.ID, Hashable, types.Head[Hashable], multiNodeRPCClient](
opts.logger, opts.selectionMode, opts.leaseDuration, opts.nodes, opts.sendonlys, opts.chainID, opts.chainFamily)
return testMultiNode{
result.(*multiNode[types.ID, *big.Int, Hashable, Hashable, any, Hashable, any, any,
types.Receipt[Hashable, Hashable], Hashable, types.Head[Hashable], multiNodeRPCClient, any]),
result.(*multiNode[types.ID, Hashable, types.Head[Hashable], multiNodeRPCClient]),
}
}

Expand Down
80 changes: 42 additions & 38 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,32 +57,47 @@ type ChainConfig interface {
ChainType() commonconfig.ChainType
}

// ChainInfo - represents RPC’s view of the chain
type ChainInfo struct {
// BlockNumber - block number of the most recent block observed by the Node
BlockNumber int64
// BlockDifficulty - difficulty of the most recent block observed by the Node
BlockDifficulty *big.Int
// LatestFinalizedBlock - block number of the most recently finalized block
LatestFinalizedBlock int64
}

//go:generate mockery --quiet --name Node --structname mockNode --filename "mock_node_test.go" --inpackage --case=underscore
type Node[
CHAIN_ID types.ID,
HEAD Head,
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
RPC NodeClient[CHAIN_ID, HEAD],
RPC_CLIENT RPCClient[CHAIN_ID, HEAD],
] interface {
// State returns nodeState
// State returns health state of the underlying RPC
State() nodeState
// StateAndLatest returns nodeState with the latest received block number & total difficulty.
StateAndLatest() (nodeState, int64, *big.Int)
// StateAndLatest returns health state with the latest received block number & total difficulty.
StateAndLatest() (nodeState, ChainInfo)
// Name is a unique identifier for this node.
Name() string
// String - returns string representation of the node, useful for debugging (name + URLS used to connect to the RPC)
String() string
RPC() RPC
SubscribersCount() int32
UnsubscribeAllExceptAliveLoop()
// RPC - returns the underlying RPC_CLIENT
RPC() RPC_CLIENT
// UnsubscribeAll - terminates all client subscriptions. Called by MultiNode to trigger clients to resubscribe to
// new best RPC
UnsubscribeAll()
ConfiguredChainID() CHAIN_ID
// Order - returns priority order configured for the RPC
Order() int32
// Start - starts health checks
Start(context.Context) error
Close() error
}

type node[
CHAIN_ID types.ID,
HEAD Head,
RPC NodeClient[CHAIN_ID, HEAD],
RPC_CLIENT RPCClient[CHAIN_ID, HEAD],
] struct {
services.StateMachine
lfcLog logger.Logger
Expand All @@ -97,7 +112,7 @@ type node[
ws url.URL
http *url.URL

rpc RPC
rpc RPC_CLIENT

stateMu sync.RWMutex // protects state* fields
state nodeState
Expand All @@ -123,7 +138,7 @@ type node[
func NewNode[
CHAIN_ID types.ID,
HEAD Head,
RPC NodeClient[CHAIN_ID, HEAD],
RPC_CLIENT RPCClient[CHAIN_ID, HEAD],
](
nodeCfg NodeConfig,
chainCfg ChainConfig,
Expand All @@ -134,10 +149,10 @@ func NewNode[
id int32,
chainID CHAIN_ID,
nodeOrder int32,
rpc RPC,
rpc RPC_CLIENT,
chainFamily string,
) Node[CHAIN_ID, HEAD, RPC] {
n := new(node[CHAIN_ID, HEAD, RPC])
) Node[CHAIN_ID, HEAD, RPC_CLIENT] {
n := new(node[CHAIN_ID, HEAD, RPC_CLIENT])
n.name = name
n.id = id
n.chainID = chainID
Expand All @@ -164,39 +179,35 @@ func NewNode[
return n
}

func (n *node[CHAIN_ID, HEAD, RPC]) String() string {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) String() string {
s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String())
if n.http != nil {
s = s + fmt.Sprintf(":%s", n.http.String())
}
return s
}

func (n *node[CHAIN_ID, HEAD, RPC]) ConfiguredChainID() (chainID CHAIN_ID) {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) ConfiguredChainID() (chainID CHAIN_ID) {
return n.chainID
}

func (n *node[CHAIN_ID, HEAD, RPC]) Name() string {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) Name() string {
return n.name
}

func (n *node[CHAIN_ID, HEAD, RPC]) RPC() RPC {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) RPC() RPC_CLIENT {
return n.rpc
}

func (n *node[CHAIN_ID, HEAD, RPC]) SubscribersCount() int32 {
return n.rpc.SubscribersCount()
}

func (n *node[CHAIN_ID, HEAD, RPC]) UnsubscribeAllExceptAliveLoop() {
n.rpc.UnsubscribeAllExceptAliveLoop()
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) UnsubscribeAll() {
n.rpc.UnsubscribeAllExcept()
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
}

func (n *node[CHAIN_ID, HEAD, RPC]) Close() error {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) Close() error {
return n.StopOnce(n.name, n.close)
}

func (n *node[CHAIN_ID, HEAD, RPC]) close() error {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) close() error {
defer func() {
n.wg.Wait()
n.rpc.Close()
Expand All @@ -214,7 +225,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) close() error {
// Should only be called once in a node's lifecycle
// Return value is necessary to conform to interface but this will never
// actually return an error.
func (n *node[CHAIN_ID, HEAD, RPC]) Start(startCtx context.Context) error {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) Start(startCtx context.Context) error {
return n.StartOnce(n.name, func() error {
n.start(startCtx)
return nil
Expand All @@ -226,7 +237,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) Start(startCtx context.Context) error {
// Not thread-safe.
// Node lifecycle is synchronous: only one goroutine should be running at a
// time.
func (n *node[CHAIN_ID, HEAD, RPC]) start(startCtx context.Context) {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) start(startCtx context.Context) {
if n.state != nodeStateUndialed {
panic(fmt.Sprintf("cannot dial node with state %v", n.state))
}
Expand All @@ -245,7 +256,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) start(startCtx context.Context) {
// verifyChainID checks that connection to the node matches the given chain ID
// Not thread-safe
// Pure verifyChainID: does not mutate node "state" field.
func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lggr logger.Logger) nodeState {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) verifyChainID(callerCtx context.Context, lggr logger.Logger) nodeState {
promPoolRPCNodeVerifies.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()
promFailed := func() {
promPoolRPCNodeVerifiesFailed.WithLabelValues(n.chainFamily, n.chainID.String(), n.name).Inc()
Expand Down Expand Up @@ -288,7 +299,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lgg

// createVerifiedConn - establishes new connection with the RPC and verifies that it's valid: chainID matches, and it's not syncing.
// Returns desired state if one of the verifications fails. Otherwise, returns nodeStateAlive.
func (n *node[CHAIN_ID, HEAD, RPC]) createVerifiedConn(ctx context.Context, lggr logger.Logger) nodeState {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) createVerifiedConn(ctx context.Context, lggr logger.Logger) nodeState {
if err := n.rpc.Dial(ctx); err != nil {
n.lfcLog.Errorw("Dial failed: Node is unreachable", "err", err, "nodeState", n.State())
return nodeStateUnreachable
Expand All @@ -299,7 +310,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) createVerifiedConn(ctx context.Context, lggr

// verifyConn - verifies that current connection is valid: chainID matches, and it's not syncing.
// Returns desired state if one of the verifications fails. Otherwise, returns nodeStateAlive.
func (n *node[CHAIN_ID, HEAD, RPC]) verifyConn(ctx context.Context, lggr logger.Logger) nodeState {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) verifyConn(ctx context.Context, lggr logger.Logger) nodeState {
state := n.verifyChainID(ctx, lggr)
if state != nodeStateAlive {
return state
Expand All @@ -321,13 +332,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyConn(ctx context.Context, lggr logger.
return nodeStateAlive
}

// disconnectAll disconnects all clients connected to the node
// WARNING: NOT THREAD-SAFE
// This must be called from within the n.stateMu lock
func (n *node[CHAIN_ID, HEAD, RPC]) disconnectAll() {
n.rpc.DisconnectAll()
}

func (n *node[CHAIN_ID, HEAD, RPC]) Order() int32 {
func (n *node[CHAIN_ID, HEAD, RPC_CLIENT]) Order() int32 {
return n.order
}
15 changes: 9 additions & 6 deletions common/client/node_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) State() nodeState {
return n.state
}

func (n *node[CHAIN_ID, HEAD, RPC]) StateAndLatest() (nodeState, int64, *big.Int) {
func (n *node[CHAIN_ID, HEAD, RPC]) StateAndLatest() (nodeState, ChainInfo) {
n.stateMu.RLock()
defer n.stateMu.RUnlock()
return n.state, n.stateLatestBlockNumber, n.stateLatestTotalDifficulty
return n.state, ChainInfo{
BlockNumber: n.stateLatestBlockNumber,
BlockDifficulty: n.stateLatestTotalDifficulty,
LatestFinalizedBlock: n.stateLatestFinalizedBlockNumber}
}

// setState is only used by internal state management methods.
Expand Down Expand Up @@ -209,7 +212,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToOutOfSync(fn func()) {
}
switch n.state {
case nodeStateAlive:
n.disconnectAll()
n.UnsubscribeAll()
DylanTinianov marked this conversation as resolved.
Show resolved Hide resolved
n.state = nodeStateOutOfSync
default:
panic(transitionFail(n.state, nodeStateOutOfSync))
Expand All @@ -234,7 +237,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) {
}
switch n.state {
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
n.disconnectAll()
n.UnsubscribeAll()
n.state = nodeStateUnreachable
default:
panic(transitionFail(n.state, nodeStateUnreachable))
Expand Down Expand Up @@ -277,7 +280,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) {
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing:
n.disconnectAll()
n.UnsubscribeAll()
n.state = nodeStateInvalidChainID
default:
panic(transitionFail(n.state, nodeStateInvalidChainID))
Expand All @@ -302,7 +305,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) {
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID:
n.disconnectAll()
n.UnsubscribeAll()
n.state = nodeStateSyncing
default:
panic(transitionFail(n.state, nodeStateSyncing))
Expand Down
Loading
Loading