BCI-2525: check all responses on transaction submission #11599
Changes from 13 commits
@@ -27,6 +27,11 @@ var (
Name: "multi_node_states",
Help: "The number of RPC nodes currently in the given state for the given chain",
}, []string{"network", "chainId", "state"})
// PromMultiNodeInvariantViolations reports violation of our assumptions
PromMultiNodeInvariantViolations = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "multi_node_invariant_violations",
Help: "The number of invariant violations",
}, []string{"network", "chainId", "invariant"})
ErroringNodeError = fmt.Errorf("no live nodes available")
)

@@ -556,63 +561,42 @@ type sendTxResult struct {
ResultCode SendTxReturnCode
}

// broadcastTxAsync - creates a goroutine that sends transaction to the node. Returns false, if MultiNode is Stopped
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context,
n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult) bool {

ok := c.IfNotStopped(func() {
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
c.wg.Add(1)
go func() {
defer c.wg.Done()

txErr := n.RPC().SendTransaction(ctx, tx)
c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
resultCode := c.classifySendTxError(tx, txErr)
if resultCode != Successful && resultCode != TransactionAlreadyKnown {
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
}

// we expected txResults to have sufficient buffer, otherwise we are not interested in the response
// and can drop it
select {
case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}:
default:
}

}()
})
if !ok {
c.lggr.Debugw("Cannot broadcast transaction to node; MultiNode is stopped", "node", n.String())
n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX) sendTxResult {
txErr := n.RPC().SendTransaction(ctx, tx)
c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
resultCode := c.classifySendTxError(tx, txErr)
if resultCode != Successful && resultCode != TransactionAlreadyKnown {
Review comment: Since you just added a new list "sendTxSuccessfulCodes", this check should just change to checking if resultCode is in that list; see the sketch below.
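A minimal, self-contained sketch of the suggested membership check; the enum values and the contents of sendTxSuccessfulCodes here are illustrative assumptions, and slices.Contains requires Go 1.21+:

```go
package main

import (
	"fmt"
	"slices"
)

// SendTxReturnCode stands in for the enum used in the diff; values are illustrative.
type SendTxReturnCode int

const (
	Successful SendTxReturnCode = iota
	TransactionAlreadyKnown
	Fatal
)

// sendTxSuccessfulCodes is the list the reviewer refers to (assumed contents).
var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown}

func main() {
	resultCode := Fatal
	// The suggested check: warn only when the code is not in the success list.
	if !slices.Contains(sendTxSuccessfulCodes, resultCode) {
		fmt.Println("RPC returned error")
	}
}
```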
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr) | ||
} | ||
|
||
return ok | ||
return sendTxResult{Err: txErr, ResultCode: resultCode} | ||
} | ||
|
||
// collectTxResults - reads send transaction results from the provided channel and groups them by `SendTxReturnCode.` | ||
// We balance the waiting time and the number of collected results. Our target is replies from 70% of nodes, | ||
// but we won't wait longer than sendTxSoftTimeout since the first reply to avoid waiting | ||
// for a timeout from slow/unhealthy nodes. | ||
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) (map[SendTxReturnCode][]error, error) { | ||
const quorum = 0.7 | ||
requiredResults := int(math.Ceil(float64(len(c.nodes)) * quorum)) | ||
// collectTxResults - refer to SendTransaction comment for implementation details, | ||
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) error { | ||
// combine context and stop channel to ensure we stop, when signal received | ||
ctx, cancel := c.chStop.Ctx(ctx) | ||
defer cancel() | ||
requiredResults := int(math.Ceil(float64(len(c.nodes)) * sendTxQuorum)) | ||
errorsByCode := map[SendTxReturnCode][]error{} | ||
var softTimeoutChan <-chan time.Time | ||
var resultsCount int | ||
loop: | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
c.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode) | ||
return nil, ctx.Err() | ||
return ctx.Err() | ||
case result := <-txResults: | ||
errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err) | ||
resultsCount++ | ||
if resultsCount >= requiredResults { | ||
return errorsByCode, nil | ||
if result.ResultCode == Successful || resultsCount >= requiredResults { | ||
Review comment: Does this mean that if 70% of the nodes don't succeed, but another node succeeds at a later time, we won't be able to pick that up?
Reply: Yes, if the first 70% of nodes to reply marked the transaction as failed, we need to retry even if another node succeeds after that. A worked example of the quorum arithmetic follows.
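For illustration only (the node count is hypothetical), this is how the quorum threshold used by collectTxResults works out:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const sendTxQuorum = 0.7 // as in the diff
	nodes := 10 // hypothetical number of primary nodes
	requiredResults := int(math.Ceil(float64(nodes) * sendTxQuorum))
	// Prints 7: collection stops after 7 replies, or earlier on the first success or the soft timeout.
	fmt.Println(requiredResults)
}
```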
break loop
}
case <-softTimeoutChan:
Review comment: What if we have 0 responses so far? This seems to be half of the value for a standard
Reply: If there are 0 responses, softTimeoutChan is nil and won't fire. We initialize it when we receive the first reply. The nil-channel behaviour this relies on is sketched below.
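A minimal, runnable sketch of the Go behaviour the reply relies on: receiving from a nil channel blocks forever, so the nil soft-timeout channel can never be selected until it is assigned after the first result. The channel types and timings here are illustrative, not the actual sendTxSoftTimeout:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	results := make(chan int, 1)
	go func() {
		time.Sleep(50 * time.Millisecond)
		results <- 1
	}()

	var softTimeoutChan <-chan time.Time // nil: this case can never fire
	for {
		select {
		case r := <-results:
			fmt.Println("got result", r)
			if softTimeoutChan == nil {
				// Start the soft timeout only once the first reply has arrived.
				tm := time.NewTimer(100 * time.Millisecond)
				defer tm.Stop()
				softTimeoutChan = tm.C
			}
		case <-softTimeoutChan:
			fmt.Println("soft timeout expired - stop waiting for slow nodes")
			return
		}
	}
}
```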
c.lggr.Debugw("Send Tx soft timeout expired - returning responses we've collected so far", "tx", tx, "resultsCount", resultsCount, "requiredResults", requiredResults) | ||
return errorsByCode, nil | ||
break loop | ||
} | ||
|
||
if softTimeoutChan == nil { | ||
|
@@ -623,71 +607,126 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP | |
defer tm.Stop() | ||
} | ||
} | ||
|
||
// ignore critical error as it's reported in reportSendTxAnomalies | ||
result, _ := aggregateTxResults(errorsByCode) | ||
return result | ||
|
||
} | ||
|
||
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) { | ||
defer c.wg.Done() | ||
resultsByCode := map[SendTxReturnCode][]error{} | ||
// txResults eventually will be closed | ||
for txResult := range txResults { | ||
resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err) | ||
} | ||
|
||
_, criticalErr := aggregateTxResults(resultsByCode) | ||
if criticalErr != nil { | ||
c.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) | ||
c.SvcErrBuffer.Append(criticalErr) | ||
PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), criticalErr.Error()).Inc() | ||
} | ||
} | ||
|
||
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) aggregateTxResults(tx TX, resultsByCode map[SendTxReturnCode][]error) error { | ||
severeErrors, hasSevereErrors := findFirstIn(resultsByCode, []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown}) | ||
successResults, hasSuccess := findFirstIn(resultsByCode, []SendTxReturnCode{Successful, TransactionAlreadyKnown}) | ||
func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) { | ||
severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors) | ||
successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes) | ||
if hasSuccess { | ||
// We assume that primary node would never report false positive result for a transaction. | ||
// We assume that primary node would never report false positive txResult for a transaction. | ||
// Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention. | ||
// Our best option in such situation is to return the error. | ||
if hasSevereErrors { | ||
const errMsg = "found contradictions in nodes replies on SendTransaction: got Successful and severe error" | ||
c.lggr.Criticalw(errMsg, "tx", tx, "resultsByCode", resultsByCode) | ||
err := fmt.Errorf(errMsg) | ||
c.SvcErrBuffer.Append(err) | ||
return severeErrors[0] | ||
const errMsg = "found contradictions in nodes replies on SendTransaction: got success and severe error" | ||
// return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain | ||
return successResults[0], fmt.Errorf(errMsg) | ||
} | ||
|
||
// other errors are temporary - we are safe to return success | ||
return successResults[0] | ||
return successResults[0], nil | ||
} | ||
|
||
if hasSevereErrors { | ||
return severeErrors[0] | ||
return severeErrors[0], nil | ||
} | ||
|
||
// return temporary error | ||
for _, result := range resultsByCode { | ||
return result[0] | ||
return result[0], nil | ||
} | ||
|
||
const errMsg = "invariant violation: expected at least one response on SendTransaction" | ||
c.lggr.Criticalw(errMsg, "tx", tx) | ||
err := fmt.Errorf(errMsg) | ||
c.SvcErrBuffer.Append(err) | ||
return err | ||
err = fmt.Errorf("expected at least one response on SendTransaction") | ||
return err, err | ||
} | ||
|
||
const sendTxQuorum = 0.7 | ||
Review comment: comment on this const? (One possible wording is sketched below.)
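A possible doc comment, inferred from how the constant is used in collectTxResults and described in the SendTransaction comment below; this wording is a suggestion, not text from the PR:

```go
// sendTxQuorum - fraction of primary nodes from which we try to collect SendTransaction
// results before aggregating them; collection stops earlier on the first success or once
// the soft timeout expires (see collectTxResults and SendTransaction).
const sendTxQuorum = 0.7
```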

// SendTransaction - broadcasts transaction to all the send-only and primary nodes regardless of their health.
// A returned nil or error does not guarantee that the transaction will or won't be included. Additional checks must be
// performed to determine the final state.
//
// Send-only nodes' results are ignored as they tend to return false-positive responses. Broadcast to them is necessary
// to speed up the propagation of TX in the network.
//
// Handling of primary nodes' results consists of collection and aggregation.
// In the collection step, we gather as many results as possible while minimizing waiting time. This operation succeeds
// on one of the following conditions:
// * Received at least one success
// * Received at least one result and `sendTxSoftTimeout` expired
// * Received results from the sufficient number of nodes defined by sendTxQuorum.
// The aggregation is based on the following conditions:
// * If there is at least one success - returns success
// * If there is at least one terminal error - returns terminal error
// * If there is both success and terminal error - returns success and reports invariant violation
// * Otherwise, returns any (effectively random) of the errors.
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
if len(c.nodes) == 0 {
return ErroringNodeError
}

for _, n := range c.sendonlys {
txResults := make(chan sendTxResult, len(c.nodes))
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
ok := c.IfNotStopped(func() {
c.wg.Add(len(c.sendonlys))
// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
if !c.broadcastTxAsync(ctx, n, tx, nil) {
return fmt.Errorf("aborted while broadcasting to sendonlys - multinode is stopped: %w", context.Canceled)
for _, n := range c.sendonlys {
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
defer c.wg.Done()
c.broadcastTxAsync(ctx, n, tx)
}(n)
}
}

txResults := make(chan sendTxResult, len(c.nodes))
for _, n := range c.nodes {
if !c.broadcastTxAsync(ctx, n, tx, txResults) {
return fmt.Errorf("aborted while broadcasting to primary - multinode is stopped: %w", context.Canceled)
var primaryBroadcastWg sync.WaitGroup
primaryBroadcastWg.Add(len(c.nodes))
txResultsToReport := make(chan sendTxResult, len(c.nodes))
for _, n := range c.nodes {
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
defer primaryBroadcastWg.Done()
result := c.broadcastTxAsync(ctx, n, tx)
// both channels are sufficiently buffered, so we won't be locked
txResultsToReport <- result
txResults <- result
}(n)
}
}

// combine context and stop channel to ensure we are not waiting for responses that won't arrive because MultiNode was stopped
ctx, cancel := c.chStop.Ctx(ctx)
defer cancel()
resultsByCode, err := c.collectTxResults(ctx, tx, txResults)
if err != nil {
return fmt.Errorf("failed to collect tx results: %w", err)
c.wg.Add(1)
go func() {
// wait for primary nodes to finish the broadcast before closing the channel
primaryBroadcastWg.Wait()
close(txResultsToReport)
close(txResults)
c.wg.Done()
}()

c.wg.Add(1)
go c.reportSendTxAnomalies(tx, txResultsToReport)

})
if !ok {
return fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)
}

return c.aggregateTxResults(tx, resultsByCode)
return c.collectTxResults(ctx, tx, txResults)
}

// findFirstIn - returns first existing value for the slice of keys
Review comment: Is this supposed to be private?
Reply: Yes, we only need it to ensure that local unit tests cover all values.
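The excerpt ends at the findFirstIn comment without showing its body. Below is a minimal, runnable sketch of a helper matching that comment ("returns first existing value for the slice of keys"); the generic signature and the example codes are assumptions, not the PR's actual implementation:

```go
package main

import "fmt"

// findFirstIn returns the value for the first key (in order) that exists in the map.
func findFirstIn[K comparable, V any](m map[K]V, keys []K) (V, bool) {
	for _, k := range keys {
		if v, ok := m[k]; ok {
			return v, true
		}
	}
	var zero V
	return zero, false
}

func main() {
	// Example in the spirit of aggregateTxResults; codes are plain ints here.
	resultsByCode := map[int][]error{
		2: {fmt.Errorf("fatal")},
	}
	// Look for a "successful" code first (1), then a "severe" one (2).
	if errs, ok := findFirstIn(resultsByCode, []int{1, 2}); ok {
		fmt.Println("first match:", errs[0])
	}
}
```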