diff --git a/common/client/multi_node.go b/common/client/multi_node.go
index a7f1cba0393..bce7c87d8f3 100644
--- a/common/client/multi_node.go
+++ b/common/client/multi_node.go
@@ -36,23 +36,6 @@ var (
 type MultiNode[
 	CHAIN_ID types.ID,
 	RPC_CLIENT any,
-] interface {
-	Dial(ctx context.Context) error
-	ChainID() CHAIN_ID
-	// SelectRPC - returns the best healthy RPCClient
-	SelectRPC() (RPC_CLIENT, error)
-	// DoAll - calls `do` sequentially on all healthy RPCClients.
-	// `do` can abort subsequent calls by returning `false`.
-	// Returns error if `do` was not called or context returns an error.
-	DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool) bool) error
-	// NodeStates - returns RPCs' states
-	NodeStates() map[string]NodeState
-	Close() error
-}
-
-type multiNode[
-	CHAIN_ID types.ID,
-	RPC_CLIENT any,
 ] struct {
 	services.StateMachine
 	primaryNodes []Node[CHAIN_ID, RPC_CLIENT]
@@ -84,12 +67,12 @@ func NewMultiNode[
 	sendOnlyNodes []SendOnlyNode[CHAIN_ID, RPC_CLIENT],
 	chainID CHAIN_ID, // configured chain ID (used to verify that passed primaryNodes belong to the same chain)
 	chainFamily string, // name of the chain family - used in the metrics
-) MultiNode[CHAIN_ID, RPC_CLIENT] {
+) *MultiNode[CHAIN_ID, RPC_CLIENT] {
 	nodeSelector := newNodeSelector(selectionMode, primaryNodes)
 	// Prometheus' default interval is 15s, set this to under 7.5s to avoid
 	// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
 	const reportInterval = 6500 * time.Millisecond
-	c := &multiNode[CHAIN_ID, RPC_CLIENT]{
+	c := &MultiNode[CHAIN_ID, RPC_CLIENT]{
 		primaryNodes:  primaryNodes,
 		sendOnlyNodes: sendOnlyNodes,
 		chainID:       chainID,
@@ -107,11 +90,11 @@ func NewMultiNode[
 	return c
 }
 
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) ChainID() CHAIN_ID {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) ChainID() CHAIN_ID {
 	return c.chainID
 }
 
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool) bool) error {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool) bool) error {
 	callsCompleted := 0
 	for _, n := range c.primaryNodes {
 		if ctx.Err() != nil {
@@ -140,7 +123,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx
 	return nil
 }
 
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState {
 	states := map[string]NodeState{}
 	for _, n := range c.primaryNodes {
 		states[n.String()] = n.State()
@@ -155,7 +138,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState {
 //
 // Nodes handle their own redialing and runloops, so this function does not
 // return any error if the nodes aren't available
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error {
 	return c.StartOnce("MultiNode", func() (merr error) {
 		if len(c.primaryNodes) == 0 {
 			return fmt.Errorf("no available nodes for chain %s", c.chainID.String())
@@ -204,7 +187,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error {
 }
 
 // Close tears down the MultiNode and closes all nodes
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Close() error {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) Close() error {
 	return c.StopOnce("MultiNode", func() error {
 		close(c.chStop)
 		c.wg.Wait()
@@ -215,7 +198,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Close() error {
 
 // SelectRPC returns an RPC of an active node. If there are no active nodes it returns an error.
 // Call this method from your chain-specific client implementation to access any chain-specific rpc calls.
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) SelectRPC() (rpc RPC_CLIENT, err error) {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) SelectRPC() (rpc RPC_CLIENT, err error) {
 	n, err := c.selectNode()
 	if err != nil {
 		return rpc, err
@@ -224,7 +207,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) SelectRPC() (rpc RPC_CLIENT, err error
 }
 
 // selectNode returns the active Node, if it is still NodeStateAlive, otherwise it selects a new one from the NodeSelector.
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) selectNode() (node Node[CHAIN_ID, RPC_CLIENT], err error) {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) selectNode() (node Node[CHAIN_ID, RPC_CLIENT], err error) {
 	c.activeMu.RLock()
 	node = c.activeNode
 	c.activeMu.RUnlock()
@@ -254,7 +237,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) selectNode() (node Node[CHAIN_ID, RPC_
 
 // nLiveNodes returns the number of currently alive nodes, as well as the highest block number and greatest total difficulty.
 // totalDifficulty will be 0 if all nodes return nil.
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) nLiveNodes() (nLiveNodes int, blockNumber int64, totalDifficulty *big.Int) {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) nLiveNodes() (nLiveNodes int, blockNumber int64, totalDifficulty *big.Int) {
 	totalDifficulty = big.NewInt(0)
 	for _, n := range c.primaryNodes {
 		if s, chainInfo := n.StateAndLatest(); s == NodeStateAlive {
@@ -270,7 +253,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) nLiveNodes() (nLiveNodes int, blockNum
 	return
 }
 
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLease() {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) checkLease() {
 	bestNode := c.nodeSelector.Select()
 	for _, n := range c.primaryNodes {
 		// Terminate client subscriptions. Services are responsible for reconnecting, which will be routed to the new
@@ -288,7 +271,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLease() {
 	c.activeMu.Unlock()
 }
 
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLeaseLoop() {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) checkLeaseLoop() {
 	defer c.wg.Done()
 	c.leaseTicker = time.NewTicker(c.leaseDuration)
 	defer c.leaseTicker.Stop()
@@ -303,7 +286,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLeaseLoop() {
 	}
 }
 
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) runLoop() {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) runLoop() {
 	defer c.wg.Done()
 
 	c.report()
@@ -321,7 +304,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) runLoop() {
 	}
 }
 
-func (c *multiNode[CHAIN_ID, RPC_CLIENT]) report() {
+func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) report() {
 	type nodeWithState struct {
 		Node  string
 		State string
diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go
index 7218db1a06e..2f8aa6ff008 100644
--- a/common/client/multi_node_test.go
+++ b/common/client/multi_node_test.go
@@ -22,7 +22,7 @@ import (
 type multiNodeRPCClient RPCClient[types.ID, types.Head[Hashable]]
 
 type testMultiNode struct {
-	*multiNode[types.ID, multiNodeRPCClient]
+	*MultiNode[types.ID, multiNodeRPCClient]
 }
 
 type multiNodeOpts struct {
@@ -43,7 +43,7 @@ func newTestMultiNode(t *testing.T, opts multiNodeOpts) testMultiNode {
 	result := NewMultiNode[types.ID, multiNodeRPCClient](
 		opts.logger, opts.selectionMode, opts.leaseDuration, opts.nodes, opts.sendonlys, opts.chainID, opts.chainFamily)
 	return testMultiNode{
-		result.(*multiNode[types.ID, multiNodeRPCClient]),
+		result,
 	}
 }
 
@@ -640,7 +640,7 @@ func TestMultiNode_SendTransaction(t *testing.T) {
 		require.NoError(t, err)
 		require.NoError(t, mn.Close())
 		err = mn.SendTransaction(tests.Context(t), nil)
-		require.EqualError(t, err, "aborted while broadcasting tx - multiNode is stopped: context canceled")
+		require.EqualError(t, err, "aborted while broadcasting tx - MultiNode is stopped: context canceled")
 	})
 	t.Run("Returns error if there is no healthy primary nodes", func(t *testing.T) {
 		mn := newStartedMultiNode(t, multiNodeOpts{
diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go
index fe2bd36a4b1..e3f7a5559b0 100644
--- a/core/chains/evm/client/chain_client.go
+++ b/core/chains/evm/client/chain_client.go
@@ -101,7 +101,7 @@ func ContextWithDefaultTimeout() (ctx context.Context, cancel context.CancelFunc
 }
 
 type chainClient struct {
-	multiNode *commonclient.MultiNode[
+	multiNode *commonclient.MultiNode[
 		*big.Int,
 		ChainClientRPC,
 	]
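
For illustration, a minimal sketch of a call site after this refactor. It assumes hypothetical placeholder variables (lggr, selectionMode, leaseDuration, nodes, sendOnlys, chainID, chainFamily) with the types used by newTestMultiNode above; only the MultiNode API itself comes from this diff. Because NewMultiNode now returns the concrete *MultiNode instead of the removed interface, the result.(*multiNode[...]) type assertion seen in the old test code is no longer needed:

// Sketch only: dialExample and its placeholder variables are hypothetical,
// not part of this diff.
func dialExample(ctx context.Context) error {
	// NewMultiNode now returns *MultiNode directly; no assertion required.
	mn := NewMultiNode[types.ID, multiNodeRPCClient](
		lggr, selectionMode, leaseDuration, nodes, sendOnlys, chainID, chainFamily)

	// Dial starts the nodes; per its doc comment, nodes handle their own
	// redialing, so this does not fail merely because an RPC is unreachable.
	if err := mn.Dial(ctx); err != nil {
		return err
	}
	defer mn.Close()

	// SelectRPC returns the best healthy RPC client for chain-specific calls.
	rpc, err := mn.SelectRPC()
	if err != nil {
		return err
	}
	_ = rpc // make chain-specific RPC calls here
	return nil
}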