Skip to content

Commit

Permalink
do not call an RPC if it's not Alive (#11999)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko authored Feb 13, 2024
1 parent ce84ce5 commit e78d3b8
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 6 deletions.
26 changes: 21 additions & 5 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
// main node is used at the end for the return value
continue
}

if n.State() != nodeStateAlive {
continue
}
// Parallel call made to all other nodes with ignored return value
wg.Add(1)
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
Expand Down Expand Up @@ -575,11 +579,14 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}

// collectTxResults - refer to SendTransaction comment for implementation details,
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) error {
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan sendTxResult) error {
if healthyNodesNum == 0 {
return ErroringNodeError
}
// combine context and stop channel to ensure we stop, when signal received
ctx, cancel := c.chStop.Ctx(ctx)
defer cancel()
requiredResults := int(math.Ceil(float64(len(c.nodes)) * sendTxQuorum))
requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum))
errorsByCode := map[SendTxReturnCode][]error{}
var softTimeoutChan <-chan time.Time
var resultsCount int
Expand Down Expand Up @@ -685,22 +692,31 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
return ErroringNodeError
}

healthyNodesNum := 0
txResults := make(chan sendTxResult, len(c.nodes))
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
ok := c.IfNotStopped(func() {
c.wg.Add(len(c.sendonlys))
// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
for _, n := range c.sendonlys {
if n.State() != nodeStateAlive {
continue
}
c.wg.Add(1)
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
defer c.wg.Done()
c.broadcastTxAsync(ctx, n, tx)
}(n)
}

var primaryBroadcastWg sync.WaitGroup
primaryBroadcastWg.Add(len(c.nodes))
txResultsToReport := make(chan sendTxResult, len(c.nodes))
for _, n := range c.nodes {
if n.State() != nodeStateAlive {
continue
}

healthyNodesNum++
primaryBroadcastWg.Add(1)
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
defer primaryBroadcastWg.Done()
result := c.broadcastTxAsync(ctx, n, tx)
Expand All @@ -727,7 +743,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
return fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)
}

return c.collectTxResults(ctx, tx, txResults)
return c.collectTxResults(ctx, tx, healthyNodesNum, txResults)
}

// findFirstIn - returns first existing value for the slice of keys
Expand Down
70 changes: 69 additions & 1 deletion common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,10 @@ func TestMultiNode_BatchCallContextAll(t *testing.T) {
// setup ok and failed auxiliary nodes
okNode := newMockSendOnlyNode[types.ID, multiNodeRPCClient](t)
okNode.On("RPC").Return(okRPC).Once()
okNode.On("State").Return(nodeStateAlive)
failedNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
failedNode.On("RPC").Return(failedRPC).Once()
failedNode.On("State").Return(nodeStateAlive)

// setup main node
mainNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
Expand All @@ -557,6 +559,34 @@ func TestMultiNode_BatchCallContextAll(t *testing.T) {
require.NoError(t, err)
tests.RequireLogMessage(t, observedLogs, "Secondary node BatchCallContext failed")
})
t.Run("Does not call BatchCallContext for unhealthy nodes", func(t *testing.T) {
// setup RPCs
okRPC := newMultiNodeRPCClient(t)
okRPC.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Twice()

// setup ok and failed auxiliary nodes
healthyNode := newMockSendOnlyNode[types.ID, multiNodeRPCClient](t)
healthyNode.On("RPC").Return(okRPC).Once()
healthyNode.On("State").Return(nodeStateAlive)
deadNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
deadNode.On("State").Return(nodeStateUnreachable)

// setup main node
mainNode := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
mainNode.On("RPC").Return(okRPC)
nodeSelector := newMockNodeSelector[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
nodeSelector.On("Select").Return(mainNode).Once()
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{deadNode, mainNode},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{healthyNode, deadNode},
})
mn.nodeSelector = nodeSelector

err := mn.BatchCallContextAll(tests.Context(t), nil)
require.NoError(t, err)
})
}

func TestMultiNode_SendTransaction(t *testing.T) {
Expand All @@ -568,15 +598,20 @@ func TestMultiNode_SendTransaction(t *testing.T) {

return Successful
}
newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
newNodeWithState := func(t *testing.T, state nodeState, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
rpc := newMultiNodeRPCClient(t)
rpc.On("SendTransaction", mock.Anything, mock.Anything).Return(txErr).Run(sendTxRun).Maybe()
node := newMockNode[types.ID, types.Head[Hashable], multiNodeRPCClient](t)
node.On("String").Return("node name").Maybe()
node.On("RPC").Return(rpc).Maybe()
node.On("State").Return(state).Maybe()
node.On("Close").Return(nil).Once()
return node
}

newNode := func(t *testing.T, txErr error, sendTxRun func(args mock.Arguments)) *mockNode[types.ID, types.Head[Hashable], multiNodeRPCClient] {
return newNodeWithState(t, nodeStateAlive, txErr, sendTxRun)
}
newStartedMultiNode := func(t *testing.T, opts multiNodeOpts) testMultiNode {
mn := newTestMultiNode(t, opts)
err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
Expand Down Expand Up @@ -714,6 +749,39 @@ func TestMultiNode_SendTransaction(t *testing.T) {
err = mn.SendTransaction(tests.Context(t), nil)
require.EqualError(t, err, "aborted while broadcasting tx - multiNode is stopped: context canceled")
})
t.Run("Returns error if there is no healthy primary nodes", func(t *testing.T) {
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{newNodeWithState(t, nodeStateUnreachable, nil, nil)},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{newNodeWithState(t, nodeStateUnreachable, nil, nil)},
classifySendTxError: classifySendTxError,
})
err := mn.SendTransaction(tests.Context(t), nil)
assert.EqualError(t, err, ErroringNodeError.Error())
})
t.Run("Transaction success even if one of the nodes is unhealthy", func(t *testing.T) {
chainID := types.RandomID()
mainNode := newNode(t, nil, nil)
unexpectedCall := func(args mock.Arguments) {
panic("SendTx must not be called for unhealthy node")
}
unhealthyNode := newNodeWithState(t, nodeStateUnreachable, nil, unexpectedCall)
unhealthySendOnlyNode := newNodeWithState(t, nodeStateUnreachable, nil, unexpectedCall)
lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel)
mn := newStartedMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: chainID,
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{mainNode, unhealthyNode},
sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{unhealthySendOnlyNode, newNode(t, errors.New("unexpected error"), nil)},
classifySendTxError: classifySendTxError,
logger: lggr,
})
err := mn.SendTransaction(tests.Context(t), nil)
require.NoError(t, err)
tests.AssertLogCountEventually(t, observedLogs, "Node sent transaction", 2)
tests.AssertLogCountEventually(t, observedLogs, "RPC returned error", 1)
})
}

func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
Expand Down

0 comments on commit e78d3b8

Please sign in to comment.