diff --git a/common/client/multi_node.go b/common/client/multi_node.go index 83ee1f6a895..343666a88ce 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -563,42 +563,44 @@ type sendTxResult struct { - // broadcastTxAsync - creates a goroutine that sends transaction to the node. Returns false, if MultiNode is Stopped + // broadcastTxAsync - sends transaction to the node, logs the outcome and publishes the classified result to txResults without blocking (the result is dropped if txResults cannot accept it). Calls wg.Done() on return. func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context, - n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult) bool { + n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult, wg *sync.WaitGroup) { + defer wg.Done() - ok := c.IfNotStopped(func() { - // Must wrap inside IfNotStopped to avoid waitgroup racing with Close - c.wg.Add(1) - go func() { - defer c.wg.Done() - - txErr := n.RPC().SendTransaction(ctx, tx) - c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr) - resultCode := c.classifySendTxError(tx, txErr) - if resultCode != Successful && resultCode != TransactionAlreadyKnown { - c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr) - } - - // we expected txResults to have sufficient buffer, otherwise we are not interested in the response - // and can drop it - select { - case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}: - default: - } + txErr := n.RPC().SendTransaction(ctx, tx) + c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr) + resultCode := c.classifySendTxError(tx, txErr) + if resultCode != Successful && resultCode != TransactionAlreadyKnown { + c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr) + } - }() - }) - if !ok { - c.lggr.Debugw("Cannot broadcast transaction to node; MultiNode is stopped", "node", n.String()) + // we expected txResults to have sufficient buffer, otherwise we are not interested in the response + // and can drop it + select {
+ case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}: + default: } +} + +func cloneChannel[T any](source <-chan T) chan T { + result := make(chan T, cap(source)) // mirror source's buffer capacity; len(source) only counts items queued right now + go func() { + for t := range source { + result <- t + } + close(result) + }() - return ok + return result } -// collectTxResults - reads send transaction results from the provided channel and groups them by `SendTxReturnCode.` +// waitTxResults - waits for sufficient number of sendTxResult to determine result of transaction submission. // We balance the waiting time and the number of collected results. Our target is replies from 70% of nodes, // but we won't wait longer than sendTxSoftTimeout since the first reply to avoid waiting // for a timeout from slow/unhealthy nodes. -func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) (map[SendTxReturnCode][]error, error) { +func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) waitTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) (map[SendTxReturnCode][]error, error) { + // combine context and stop channel to ensure we stop, when signal received + ctx, cancel := c.chStop.Ctx(ctx) + defer cancel() const quorum = 0.7 requiredResults := int(math.Ceil(float64(len(c.nodes)) * quorum)) errorsByCode := map[SendTxReturnCode][]error{} @@ -611,6 +613,9 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP return nil, ctx.Err() case result := <-txResults: errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err) + if result.ResultCode == Successful { + return errorsByCode, nil + } resultsCount++ if resultsCount >= requiredResults { return errorsByCode, nil @@ -630,41 +635,55 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP } } -func (c
*multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) aggregateTxResults(tx TX, resultsByCode map[SendTxReturnCode][]error) error { +func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { + ok := c.IfNotStopped(func() { + c.wg.Add(1) + go func() { + defer c.wg.Done() + resultsByCode := map[SendTxReturnCode][]error{} + for txResult := range txResults { + resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) + } + _, criticalErr := aggregateTxResults(resultsByCode) + if criticalErr != nil { + c.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) + c.SvcErrBuffer.Append(criticalErr) + PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), criticalErr.Error()).Inc() + } + }() + }) + if !ok { + c.lggr.Debugw("Cannot report SendTransaction anomalies; MultiNode is stopped") + } +} + +func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) { severeErrors, hasSevereErrors := findFirstIn(resultsByCode, []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown}) successResults, hasSuccess := findFirstIn(resultsByCode, []SendTxReturnCode{Successful, TransactionAlreadyKnown}) if hasSuccess { // We assume that primary node would never report false positive result for a transaction. // Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention.
if hasSevereErrors { const errMsg = "found contradictions in nodes replies on SendTransaction: got Successful and severe error" - c.lggr.Criticalw(errMsg, "tx", tx, "resultsByCode", resultsByCode) - err := fmt.Errorf(errMsg) - c.SvcErrBuffer.Append(err) - PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), errMsg).Inc() // return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain - return successResults[0] + return successResults[0], fmt.Errorf(errMsg) } // other errors are temporary - we are safe to return success - return successResults[0] + return successResults[0], nil } if hasSevereErrors { - return severeErrors[0] + return severeErrors[0], nil } // return temporary error for _, result := range resultsByCode { - return result[0] + return result[0], nil } - const errMsg = "invariant violation: expected at least one response on SendTransaction" - c.lggr.Criticalw(errMsg, "tx", tx) - err := fmt.Errorf(errMsg) - c.SvcErrBuffer.Append(err) - PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), errMsg).Inc() - return err + err = fmt.Errorf("invariant violation: expected at least one response on SendTransaction") + return err, err } func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error { @@ -672,29 +691,56 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP return ErroringNodeError } - for _, n := range c.sendonlys { - // fire-n-forget, as sendOnlyNodes can not be trusted with result reporting - if !c.broadcastTxAsync(ctx, n, tx, nil) { - return fmt.Errorf("aborted while broadcasting to sendonlys - multinode is stopped: %w", context.Canceled) + // Every primary result is copied into both channels: txResults feeds waitTxResults, txResultsToReport feeds reportSendTxAnomalies. + // A single shared channel would let the two readers steal results from each other. + txResults := make(chan sendTxResult, len(c.nodes)) + txResultsToReport := make(chan sendTxResult, len(c.nodes)) + ok := c.IfNotStopped(func() { + // Must wrap inside IfNotStopped to avoid waitgroup racing with Close + var wg
sync.WaitGroup + wg.Add(len(c.sendonlys) + len(c.nodes)) + for _, n := range c.sendonlys { + // fire-n-forget, as sendOnlyNodes can not be trusted with result reporting + go c.broadcastTxAsync(ctx, n, tx, nil, &wg) } - } - txResults := make(chan sendTxResult, len(c.nodes)) - for _, n := range c.nodes { - if !c.broadcastTxAsync(ctx, n, tx, txResults) { - return fmt.Errorf("aborted while broadcasting to primary - multinode is stopped: %w", context.Canceled) + rawResults := make(chan sendTxResult, len(c.nodes)) + for _, n := range c.nodes { + go c.broadcastTxAsync(ctx, n, tx, rawResults, &wg) } + + c.wg.Add(2) + go func() { + defer c.wg.Done() + // close only after every broadcast has returned, so no send can hit a closed channel + wg.Wait() + close(rawResults) + }() + go func() { + defer c.wg.Done() + // both outputs are buffered for len(c.nodes) results, so forwarding never blocks + for txResult := range rawResults { + txResults <- txResult + txResultsToReport <- txResult + } + close(txResults) + close(txResultsToReport) + }() + }) + if !ok { + return fmt.Errorf("aborted while broadcasting tx - multinode is stopped: %w", context.Canceled) } - // combine context and stop channel to ensure we are not waiting for responses that won't arrive because MultiNode was stopped - ctx, cancel := c.chStop.Ctx(ctx) - defer cancel() - resultsByCode, err := c.collectTxResults(ctx, tx, txResults) + c.reportSendTxAnomalies(tx, txResultsToReport) + + resultsByCode, err := c.waitTxResults(ctx, tx, txResults) if err != nil { return fmt.Errorf("failed to collect tx results: %w", err) } - return c.aggregateTxResults(tx, resultsByCode) + // ignore critical error as it's reported in reportSendTxAnomalies + result, _ := aggregateTxResults(resultsByCode) + return result } // findFirstIn - returns first existing value for the slice of keys diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go index 5f7bc2bf911..a2e54cd79ac 100644 --- a/common/client/multi_node_test.go +++ b/common/client/multi_node_test.go @@ -668,7 +668,7 @@ func TestMultiNode_SendTransaction(t *testing.T) { err := mn.SendTransaction(tests.Context(t), nil) require.NoError(t, err) }) - t.Run("Fails when closed on sendonly broadcast", func(t *testing.T) { + t.Run("Fails when closed", func(t *testing.T) { mn := newTestMultiNode(t, multiNodeOpts{ selectionMode: NodeSelectionModeRoundRobin, chainID:
types.RandomID(), @@ -680,35 +680,12 @@ func TestMultiNode_SendTransaction(t *testing.T) { require.NoError(t, err) require.NoError(t, mn.Close()) err = mn.SendTransaction(tests.Context(t), nil) - require.EqualError(t, err, "aborted while broadcasting to sendonlys - multinode is stopped: context canceled") - }) - t.Run("Fails when closed on nodes broadcast", func(t *testing.T) { - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRoundRobin, - chainID: types.RandomID(), - nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{newNode(t, nil, nil)}, - sendonlys: nil, - classifySendTxError: classifySendTxError, - }) - err := mn.StartOnce("startedTestMultiNode", func() error { return nil }) - require.NoError(t, err) - require.NoError(t, mn.Close()) - err = mn.SendTransaction(tests.Context(t), nil) - require.EqualError(t, err, "aborted while broadcasting to primary - multinode is stopped: context canceled") + require.EqualError(t, err, "aborted while broadcasting tx - multinode is stopped: context canceled") }) } func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { t.Parallel() - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRoundRobin, - chainID: types.RandomID(), - }) - err := mn.StartOnce("startedTestMultiNode", func() error { return nil }) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, mn.Close()) - }) // ensure failure on new SendTxReturnCode codesToCover := map[SendTxReturnCode]struct{}{} for code := Successful; code < sendTxReturnCodeLen; code++ { @@ -717,13 +694,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { testCases := []struct { Name string - ExpectedErr string + ExpectedTxResult string ExpectedCriticalErr string ResultsByCode map[SendTxReturnCode][]error }{ { Name: "Returns success and logs critical error on Success and Fatal", - ExpectedErr: "success", + ExpectedTxResult: "success", ExpectedCriticalErr: "found contradictions in 
nodes replies on SendTransaction: got Successful and severe error", ResultsByCode: map[SendTxReturnCode][]error{ Successful: {errors.New("success")}, @@ -732,7 +709,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { }, { Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal", - ExpectedErr: "tx_already_known", + ExpectedTxResult: "tx_already_known", ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got Successful and severe error", ResultsByCode: map[SendTxReturnCode][]error{ TransactionAlreadyKnown: {errors.New("tx_already_known")}, @@ -741,7 +718,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { }, { Name: "Prefers sever error to temporary", - ExpectedErr: "underpriced", + ExpectedTxResult: "underpriced", ExpectedCriticalErr: "", ResultsByCode: map[SendTxReturnCode][]error{ Retryable: {errors.New("retryable")}, @@ -750,7 +727,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { }, { Name: "Returns temporary error", - ExpectedErr: "retryable", + ExpectedTxResult: "retryable", ExpectedCriticalErr: "", ResultsByCode: map[SendTxReturnCode][]error{ Retryable: {errors.New("retryable")}, @@ -758,7 +735,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { }, { Name: "Insufficient funds is treated as error", - ExpectedErr: "", + ExpectedTxResult: "", ExpectedCriticalErr: "", ResultsByCode: map[SendTxReturnCode][]error{ Successful: {nil}, @@ -767,7 +744,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { }, { Name: "Logs critical error on empty ResultsByCode", - ExpectedErr: "invariant violation: expected at least one response on SendTransaction", + ExpectedTxResult: "invariant violation: expected at least one response on SendTransaction", ExpectedCriticalErr: "invariant violation: expected at least one response on SendTransaction", ResultsByCode: map[SendTxReturnCode][]error{}, }, @@ 
-778,17 +755,17 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) { delete(codesToCover, code) } t.Run(testCase.Name, func(t *testing.T) { - err := mn.aggregateTxResults(nil, testCase.ResultsByCode) - if testCase.ExpectedErr == "" { - assert.NoError(t, err) + txResult, err := aggregateTxResults(testCase.ResultsByCode) + if testCase.ExpectedTxResult == "" { + assert.NoError(t, txResult) // the tx-level result, not the critical error, is what this branch verifies } else { - assert.EqualError(t, err, testCase.ExpectedErr) + assert.EqualError(t, txResult, testCase.ExpectedTxResult) } if testCase.ExpectedCriticalErr == "" { - assert.NoError(t, mn.Healthy()) + assert.NoError(t, err) } else { - assert.EqualError(t, mn.Healthy(), testCase.ExpectedCriticalErr) + assert.EqualError(t, err, testCase.ExpectedCriticalErr) } }) }