Skip to content

Commit

Permalink
Remove Multinode as interface
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Jun 12, 2024
1 parent 0454491 commit 107a767
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 35 deletions.
45 changes: 14 additions & 31 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,6 @@ var (
type MultiNode[
CHAIN_ID types.ID,
RPC_CLIENT any,
] interface {
Dial(ctx context.Context) error
ChainID() CHAIN_ID
// SelectRPC - returns the best healthy RPCClient
SelectRPC() (RPC_CLIENT, error)
// DoAll - calls `do` sequentially on all healthy RPCClients.
// `do` can abort subsequent calls by returning `false`.
// Returns error if `do` was not called or context returns an error.
DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool) bool) error
// NodeStates - returns RPCs' states
NodeStates() map[string]NodeState
Close() error
}

type multiNode[
CHAIN_ID types.ID,
RPC_CLIENT any,
] struct {
services.StateMachine
primaryNodes []Node[CHAIN_ID, RPC_CLIENT]
Expand Down Expand Up @@ -84,12 +67,12 @@ func NewMultiNode[
sendOnlyNodes []SendOnlyNode[CHAIN_ID, RPC_CLIENT],
chainID CHAIN_ID, // configured chain ID (used to verify that passed primaryNodes belong to the same chain)
chainFamily string, // name of the chain family - used in the metrics
) MultiNode[CHAIN_ID, RPC_CLIENT] {
) *MultiNode[CHAIN_ID, RPC_CLIENT] {
nodeSelector := newNodeSelector(selectionMode, primaryNodes)
// Prometheus' default interval is 15s, set this to under 7.5s to avoid
// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
const reportInterval = 6500 * time.Millisecond
c := &multiNode[CHAIN_ID, RPC_CLIENT]{
c := &MultiNode[CHAIN_ID, RPC_CLIENT]{
primaryNodes: primaryNodes,
sendOnlyNodes: sendOnlyNodes,
chainID: chainID,
Expand All @@ -107,11 +90,11 @@ func NewMultiNode[
return c
}

func (c *multiNode[CHAIN_ID, RPC_CLIENT]) ChainID() CHAIN_ID {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) ChainID() CHAIN_ID {
return c.chainID
}

func (c *multiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool) bool) error {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC_CLIENT, isSendOnly bool) bool) error {
callsCompleted := 0
for _, n := range c.primaryNodes {
if ctx.Err() != nil {
Expand Down Expand Up @@ -140,7 +123,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) DoAll(ctx context.Context, do func(ctx
return nil
}

func (c *multiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState {
states := map[string]NodeState{}
for _, n := range c.primaryNodes {
states[n.String()] = n.State()
Expand All @@ -155,7 +138,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) NodeStates() map[string]NodeState {
//
// Nodes handle their own redialing and runloops, so this function does not
// return any error if the nodes aren't available
func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error {
return c.StartOnce("MultiNode", func() (merr error) {
if len(c.primaryNodes) == 0 {
return fmt.Errorf("no available nodes for chain %s", c.chainID.String())
Expand Down Expand Up @@ -204,7 +187,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Dial(ctx context.Context) error {
}

// Close tears down the MultiNode and closes all nodes
func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Close() error {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) Close() error {
return c.StopOnce("MultiNode", func() error {
close(c.chStop)
c.wg.Wait()
Expand All @@ -215,7 +198,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) Close() error {

// SelectRPC returns an RPC of an active node. If there are no active nodes it returns an error.
// Call this method from your chain-specific client implementation to access any chain-specific rpc calls.
func (c *multiNode[CHAIN_ID, RPC_CLIENT]) SelectRPC() (rpc RPC_CLIENT, err error) {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) SelectRPC() (rpc RPC_CLIENT, err error) {
n, err := c.selectNode()
if err != nil {
return rpc, err
Expand All @@ -224,7 +207,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) SelectRPC() (rpc RPC_CLIENT, err error
}

// selectNode returns the active Node, if it is still NodeStateAlive, otherwise it selects a new one from the NodeSelector.
func (c *multiNode[CHAIN_ID, RPC_CLIENT]) selectNode() (node Node[CHAIN_ID, RPC_CLIENT], err error) {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) selectNode() (node Node[CHAIN_ID, RPC_CLIENT], err error) {
c.activeMu.RLock()
node = c.activeNode
c.activeMu.RUnlock()
Expand Down Expand Up @@ -254,7 +237,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) selectNode() (node Node[CHAIN_ID, RPC_

// nLiveNodes returns the number of currently alive nodes, as well as the highest block number and greatest total difficulty.
// totalDifficulty will be 0 if all nodes return nil.
func (c *multiNode[CHAIN_ID, RPC_CLIENT]) nLiveNodes() (nLiveNodes int, blockNumber int64, totalDifficulty *big.Int) {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) nLiveNodes() (nLiveNodes int, blockNumber int64, totalDifficulty *big.Int) {
totalDifficulty = big.NewInt(0)
for _, n := range c.primaryNodes {
if s, chainInfo := n.StateAndLatest(); s == NodeStateAlive {
Expand All @@ -270,7 +253,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) nLiveNodes() (nLiveNodes int, blockNum
return
}

func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLease() {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) checkLease() {
bestNode := c.nodeSelector.Select()
for _, n := range c.primaryNodes {
// Terminate client subscriptions. Services are responsible for reconnecting, which will be routed to the new
Expand All @@ -288,7 +271,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLease() {
c.activeMu.Unlock()
}

func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLeaseLoop() {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) checkLeaseLoop() {
defer c.wg.Done()
c.leaseTicker = time.NewTicker(c.leaseDuration)
defer c.leaseTicker.Stop()
Expand All @@ -303,7 +286,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) checkLeaseLoop() {
}
}

func (c *multiNode[CHAIN_ID, RPC_CLIENT]) runLoop() {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) runLoop() {
defer c.wg.Done()

c.report()
Expand All @@ -321,7 +304,7 @@ func (c *multiNode[CHAIN_ID, RPC_CLIENT]) runLoop() {
}
}

func (c *multiNode[CHAIN_ID, RPC_CLIENT]) report() {
func (c *MultiNode[CHAIN_ID, RPC_CLIENT]) report() {
type nodeWithState struct {
Node string
State string
Expand Down
6 changes: 3 additions & 3 deletions common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
type multiNodeRPCClient RPCClient[types.ID, types.Head[Hashable]]

type testMultiNode struct {
*multiNode[types.ID, multiNodeRPCClient]
*MultiNode[types.ID, multiNodeRPCClient]
}

type multiNodeOpts struct {
Expand All @@ -43,7 +43,7 @@ func newTestMultiNode(t *testing.T, opts multiNodeOpts) testMultiNode {
result := NewMultiNode[types.ID, multiNodeRPCClient](
opts.logger, opts.selectionMode, opts.leaseDuration, opts.nodes, opts.sendonlys, opts.chainID, opts.chainFamily)
return testMultiNode{
result.(*multiNode[types.ID, multiNodeRPCClient]),
result,
}
}

Expand Down Expand Up @@ -640,7 +640,7 @@ func TestMultiNode_SendTransaction(t *testing.T) {
require.NoError(t, err)
require.NoError(t, mn.Close())
err = mn.SendTransaction(tests.Context(t), nil)
require.EqualError(t, err, "aborted while broadcasting tx - multiNode is stopped: context canceled")
require.EqualError(t, err, "aborted while broadcasting tx - MultiNode is stopped: context canceled")
})
t.Run("Returns error if there is no healthy primary nodes", func(t *testing.T) {
mn := newStartedMultiNode(t, multiNodeOpts{
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func ContextWithDefaultTimeout() (ctx context.Context, cancel context.CancelFunc
}

type chainClient struct {
multiNode commonclient.MultiNode[
multiNode *commonclient.MultiNode[
*big.Int,
ChainClientRPC,
]
Expand Down

0 comments on commit 107a767

Please sign in to comment.