BCI-2525: check all responses on transaction submission (#11599)
* sendTx: signal success if one of the nodes accepted transaction

* fix logger

* fix merge

* fix race

* fixed multinode tests race

* improve test coverage

* WIP: wait for 70% of nodes to reply on send TX

* tests

* Report invariant violation via prom metrics

* fixed sendTx tests

* address comments

* polish PR

* Describe implementation details in the comment to SendTransaction

* nit fixes

* more fixes

* use softTimeOut default value

* nit fix

* ensure all goroutines are done before Close

* refactor broadcast

* use sendTxSuccessfulCodes slice to identify if result is successful

---------

Co-authored-by: Prashant Yadav <[email protected]>
dhaidashenko and prashantkumar1982 authored Feb 9, 2024
1 parent de15206 commit 556a4f3
Showing 5 changed files with 454 additions and 108 deletions.
7 changes: 7 additions & 0 deletions common/client/models.go
@@ -18,8 +18,15 @@ const (
 	InsufficientFunds  // Tx was rejected due to insufficient funds.
 	ExceedsMaxFee      // Attempt's fee was higher than the node's limit and got rejected.
 	FeeOutOfValidRange // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range (mostly used by L2 chains). Retry by requesting a new suggested fee price.
+	sendTxReturnCodeLen // tracks the number of return codes. Must always be last
 )
 
+// sendTxSevereErrors - error codes which signal that the transaction would never be accepted in its current form by the node
+var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown}
+
+// sendTxSuccessfulCodes - error codes which signal that the transaction was accepted by the node
+var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown}
+
 type NodeTier int
 
 const (
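For orientation, a minimal runnable sketch of how these slices are consulted. The type and constants below are simplified stand-ins for the real definitions in models.go, not the actual values:

package main

import (
	"fmt"
	"slices"
)

// SendTxReturnCode is a stand-in for the real enum in common/client/models.go.
type SendTxReturnCode int

const (
	Successful SendTxReturnCode = iota
	TransactionAlreadyKnown
	Fatal
	Underpriced
)

var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown}

func main() {
	// TransactionAlreadyKnown counts as success: the node already holds the tx
	// in its mempool, so a duplicate broadcast still means the network has it.
	fmt.Println(slices.Contains(sendTxSuccessfulCodes, TransactionAlreadyKnown)) // true
	fmt.Println(slices.Contains(sendTxSuccessfulCodes, Underpriced))             // false
}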
223 changes: 189 additions & 34 deletions common/client/multi_node.go
Expand Up @@ -3,7 +3,9 @@ package client
import (
"context"
"fmt"
"math"
"math/big"
"slices"
"sync"
"time"

@@ -26,6 +28,11 @@ var (
 		Name: "multi_node_states",
 		Help: "The number of RPC nodes currently in the given state for the given chain",
 	}, []string{"network", "chainId", "state"})
+	// PromMultiNodeInvariantViolations reports violation of our assumptions
+	PromMultiNodeInvariantViolations = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "multi_node_invariant_violations",
+		Help: "The number of invariant violations",
+	}, []string{"network", "chainId", "invariant"})
 	ErroringNodeError = fmt.Errorf("no live nodes available")
 )

Expand Down Expand Up @@ -94,14 +101,15 @@ type multiNode[
leaseTicker *time.Ticker
chainFamily string
reportInterval time.Duration
sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation

activeMu sync.RWMutex
activeNode Node[CHAIN_ID, HEAD, RPC_CLIENT]

chStop services.StopChan
wg sync.WaitGroup

sendOnlyErrorParser func(err error) SendTxReturnCode
classifySendTxError func(tx TX, err error) SendTxReturnCode
}

func NewMultiNode[
@@ -127,13 +135,16 @@
 	chainID CHAIN_ID,
 	chainType config.ChainType,
 	chainFamily string,
-	sendOnlyErrorParser func(err error) SendTxReturnCode,
+	classifySendTxError func(tx TX, err error) SendTxReturnCode,
+	sendTxSoftTimeout time.Duration,
 ) MultiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT] {
 	nodeSelector := newNodeSelector(selectionMode, nodes)
 
 	// Prometheus' default interval is 15s, set this to under 7.5s to avoid
 	// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
 	const reportInterval = 6500 * time.Millisecond
+	if sendTxSoftTimeout == 0 {
+		sendTxSoftTimeout = QueryTimeout / 2
+	}
 	c := &multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]{
 		nodes:     nodes,
 		sendonlys: sendonlys,
@@ -146,8 +157,9 @@ func NewMultiNode[
 		chStop:        make(services.StopChan),
 		leaseDuration: leaseDuration,
 		chainFamily:   chainFamily,
-		sendOnlyErrorParser: sendOnlyErrorParser,
+		classifySendTxError: classifySendTxError,
 		reportInterval:      reportInterval,
+		sendTxSoftTimeout:   sendTxSoftTimeout,
 	}
 
 	c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode)
@@ -545,45 +557,188 @@
 	return n.RPC().SendEmptyTransaction(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress)
 }
 
-func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
-	main, nodeError := c.selectNode()
-	var all []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
-	for _, n := range c.nodes {
-		all = append(all, n)
-	}
-	all = append(all, c.sendonlys...)
-	for _, n := range all {
-		if n == main {
-			// main node is used at the end for the return value
-			continue
-		}
-		// Parallel send to all other nodes with ignored return value
-		// Async - we do not want to block the main thread with secondary nodes
-		// in case they are unreliable/slow.
-		// It is purely a "best effort" send.
-		// Resource is not unbounded because the default context has a timeout.
-		ok := c.IfNotStopped(func() {
-			// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
-			c.wg.Add(1)
-			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
-				defer c.wg.Done()
-
-				txErr := n.RPC().SendTransaction(ctx, tx)
-				c.lggr.Debugw("Sendonly node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
-				sendOnlyError := c.sendOnlyErrorParser(txErr)
-				if sendOnlyError != Successful {
-					c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
-				}
-			}(n)
-		})
-		if !ok {
-			c.lggr.Debug("Cannot send transaction on sendonly node; MultiNode is stopped", "node", n.String())
-		}
-	}
-	if nodeError != nil {
-		return nodeError
-	}
-	return main.RPC().SendTransaction(ctx, tx)
-}
+type sendTxResult struct {
+	Err        error
+	ResultCode SendTxReturnCode
+}
+
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context,
+	n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX) sendTxResult {
+	txErr := n.RPC().SendTransaction(ctx, tx)
+	c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
+	resultCode := c.classifySendTxError(tx, txErr)
+	if !slices.Contains(sendTxSuccessfulCodes, resultCode) {
+		c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
+	}
+
+	return sendTxResult{Err: txErr, ResultCode: resultCode}
+}

+// collectTxResults - refer to the SendTransaction comment for implementation details
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) error {
+	// combine context and stop channel to ensure we stop when a signal is received
+	ctx, cancel := c.chStop.Ctx(ctx)
+	defer cancel()
+	requiredResults := int(math.Ceil(float64(len(c.nodes)) * sendTxQuorum))
+	errorsByCode := map[SendTxReturnCode][]error{}
+	var softTimeoutChan <-chan time.Time
+	var resultsCount int
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			c.lggr.Debugw("Failed to collect all of the results before context was done", "tx", tx, "errorsByCode", errorsByCode)
+			return ctx.Err()
+		case result := <-txResults:
+			errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err)
+			resultsCount++
+			if slices.Contains(sendTxSuccessfulCodes, result.ResultCode) || resultsCount >= requiredResults {
+				break loop
+			}
+		case <-softTimeoutChan:
+			c.lggr.Debugw("Send Tx soft timeout expired - returning responses we've collected so far", "tx", tx, "resultsCount", resultsCount, "requiredResults", requiredResults)
+			break loop
+		}
+
+		if softTimeoutChan == nil {
+			tm := time.NewTimer(c.sendTxSoftTimeout)
+			softTimeoutChan = tm.C
+			// we are fine with stopping the timer at the end of the function
+			//nolint
+			defer tm.Stop()
+		}
+	}
+
+	// ignore the critical error as it's reported in reportSendTxAnomalies
+	result, _ := aggregateTxResults(errorsByCode)
+	return result
+}
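The select loop above arms the soft timeout only once the first reply has arrived, so stragglers delay the result by at most sendTxSoftTimeout. A self-contained sketch of that fan-in pattern, with made-up node names and timings:

package main

import (
	"fmt"
	"time"
)

func main() {
	replies := make(chan string, 3)
	go func() { replies <- "node-a: ok" }()
	go func() { time.Sleep(10 * time.Millisecond); replies <- "node-b: ok" }()
	// node-c never replies

	const softTimeout = 50 * time.Millisecond
	var softTimeoutChan <-chan time.Time
	var collected []string
loop:
	for {
		select {
		case r := <-replies:
			collected = append(collected, r)
			if len(collected) == 3 { // all replies received
				break loop
			}
		case <-softTimeoutChan:
			break loop // soft timeout: return what we have
		}
		if softTimeoutChan == nil {
			// arm the timer only after the first reply, as collectTxResults does
			tm := time.NewTimer(softTimeout)
			softTimeoutChan = tm.C
			defer tm.Stop()
		}
	}
	fmt.Println(collected) // [node-a: ok node-b: ok] after roughly 50ms
}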

+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) {
+	defer c.wg.Done()
+	resultsByCode := map[SendTxReturnCode][]error{}
+	// txResults eventually will be closed
+	for txResult := range txResults {
+		resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err)
+	}
+
+	_, criticalErr := aggregateTxResults(resultsByCode)
+	if criticalErr != nil {
+		c.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr)
+		c.SvcErrBuffer.Append(criticalErr)
+		PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), criticalErr.Error()).Inc()
+	}
+}

+func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) {
+	severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors)
+	successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes)
+	if hasSuccess {
+		// We assume that a primary node would never report a false-positive txResult for a transaction.
+		// Thus, if such a case occurs it's probably due to misconfiguration or a bug and requires manual intervention.
+		if hasSevereErrors {
+			const errMsg = "found contradictions in nodes replies on SendTransaction: got success and severe error"
+			// return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain
+			return successResults[0], fmt.Errorf(errMsg)
+		}
+
+		// other errors are temporary - we are safe to return success
+		return successResults[0], nil
+	}
+
+	if hasSevereErrors {
+		return severeErrors[0], nil
+	}
+
+	// return temporary error
+	for _, result := range resultsByCode {
+		return result[0], nil
+	}
+
+	err = fmt.Errorf("expected at least one response on SendTransaction")
+	return err, err
+}
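To make the precedence concrete: success beats severe errors (with the contradiction reported separately), and severe errors beat temporary ones. Below is a runnable sketch with simplified stand-in types; it mirrors, but is not, the generic code above:

package main

import (
	"errors"
	"fmt"
)

type SendTxReturnCode int

const (
	Successful SendTxReturnCode = iota
	TransactionAlreadyKnown
	Fatal
	Underpriced
)

var (
	successCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown}
	severeCodes  = []SendTxReturnCode{Fatal, Underpriced}
)

func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) {
	for _, k := range keys {
		if v, ok := set[k]; ok {
			return v, true
		}
	}
	var v V
	return v, false
}

func aggregate(results map[SendTxReturnCode][]error) (txResult error, invariant error) {
	severe, hasSevere := findFirstIn(results, severeCodes)
	success, hasSuccess := findFirstIn(results, successCodes)
	switch {
	case hasSuccess && hasSevere:
		// success wins, but the contradiction surfaces as a second error
		return success[0], errors.New("got success and severe error")
	case hasSuccess:
		return success[0], nil
	case hasSevere:
		return severe[0], nil
	}
	for _, r := range results {
		return r[0], nil // any temporary error
	}
	err := errors.New("expected at least one response")
	return err, err
}

func main() {
	res, inv := aggregate(map[SendTxReturnCode][]error{
		Successful: {nil},
		Fatal:      {errors.New("invalid chain id")},
	})
	fmt.Println(res, inv) // <nil> got success and severe error
}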

+const sendTxQuorum = 0.7
+
+// SendTransaction - broadcasts transaction to all the send-only and primary nodes regardless of their health.
+// A returned nil or error does not guarantee that the transaction will or won't be included. Additional checks must be
+// performed to determine the final state.
+//
+// Send-only nodes' results are ignored as they tend to return false-positive responses. Broadcast to them is necessary
+// to speed up the propagation of TX in the network.
+//
+// Handling of primary nodes' results consists of collection and aggregation.
+// In the collection step, we gather as many results as possible while minimizing waiting time. This operation succeeds
+// on one of the following conditions:
+// * Received at least one success
+// * Received at least one result and `sendTxSoftTimeout` expired
+// * Received results from the sufficient number of nodes defined by sendTxQuorum.
+// The aggregation is based on the following conditions:
+// * If there is at least one success - returns success
+// * If there is at least one terminal error - returns terminal error
+// * If there is both success and terminal error - returns success and reports invariant violation
+// * Otherwise, returns any (effectively random) of the errors.
+func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
+	if len(c.nodes) == 0 {
+		return ErroringNodeError
+	}
+
+	txResults := make(chan sendTxResult, len(c.nodes))
+	// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
+	ok := c.IfNotStopped(func() {
+		c.wg.Add(len(c.sendonlys))
+		// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
+		for _, n := range c.sendonlys {
+			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
+				defer c.wg.Done()
+				c.broadcastTxAsync(ctx, n, tx)
+			}(n)
+		}
+
+		var primaryBroadcastWg sync.WaitGroup
+		primaryBroadcastWg.Add(len(c.nodes))
+		txResultsToReport := make(chan sendTxResult, len(c.nodes))
+		for _, n := range c.nodes {
+			go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
+				defer primaryBroadcastWg.Done()
+				result := c.broadcastTxAsync(ctx, n, tx)
+				// both channels are sufficiently buffered, so we won't be locked
+				txResultsToReport <- result
+				txResults <- result
+			}(n)
+		}
+
+		c.wg.Add(1)
+		go func() {
+			// wait for primary nodes to finish the broadcast before closing the channel
+			primaryBroadcastWg.Wait()
+			close(txResultsToReport)
+			close(txResults)
+			c.wg.Done()
+		}()
+
+		c.wg.Add(1)
+		go c.reportSendTxAnomalies(tx, txResultsToReport)
+	})
+	if !ok {
+		return fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)
+	}
+
+	return c.collectTxResults(ctx, tx, txResults)
+}

+// findFirstIn - returns the first existing value for the slice of keys
+func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) {
+	for _, k := range keys {
+		if v, ok := set[k]; ok {
+			return v, true
+		}
+	}
+	var v V
+	return v, false
+}

 func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SequenceAt(ctx context.Context, account ADDR, blockNumber *big.Int) (s SEQ, err error) {
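As a sanity check on the quorum arithmetic used by collectTxResults above (node counts chosen for illustration):

package main

import (
	"fmt"
	"math"
)

const sendTxQuorum = 0.7

func main() {
	// Number of primary-node replies collectTxResults waits for, per node count.
	for _, n := range []int{1, 2, 3, 5, 10} {
		fmt.Printf("nodes=%d required=%d\n", n, int(math.Ceil(float64(n)*sendTxQuorum)))
	}
	// nodes=1 required=1, nodes=2 required=2, nodes=3 required=3,
	// nodes=5 required=4, nodes=10 required=7
}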
(The diff for the remaining three changed files did not load.)
