Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCI-2525: check all responses on transaction submission #11599

Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
41cf0be
sendTx: signal success if one of the nodes accepted transaction
dhaidashenko Dec 18, 2023
7bdce25
fix logger
dhaidashenko Dec 18, 2023
7274949
fix merge
dhaidashenko Dec 18, 2023
6c5915a
fix race
dhaidashenko Dec 18, 2023
86104a2
fixed multinode tests race
dhaidashenko Dec 19, 2023
ed88f18
Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a…
dhaidashenko Dec 19, 2023
6768c7e
improve test coverage
dhaidashenko Dec 19, 2023
fc41b06
Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a…
dhaidashenko Dec 19, 2023
e2767df
WIP: wait for 70% of nodes to reply on send TX
dhaidashenko Dec 21, 2023
b27afc9
Merge branch 'feature/BCI-2525-send-transaction-check-all-responses' …
dhaidashenko Dec 21, 2023
5db54d7
tests
dhaidashenko Dec 22, 2023
169a90b
Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a…
dhaidashenko Dec 22, 2023
6b0a7af
Report invariant violation via prom metrics
dhaidashenko Jan 8, 2024
16dd2b1
Merge branch 'feature/BCI-2525-send-transaction-check-all-responses' …
dhaidashenko Jan 8, 2024
70c7c2c
fixed sendTx tests
dhaidashenko Jan 8, 2024
ab5fa6c
address comments
dhaidashenko Jan 8, 2024
279f5f0
polish PR
dhaidashenko Jan 9, 2024
96f131e
Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a…
dhaidashenko Jan 9, 2024
d5d6ce2
Describe implementation details in the comment to SendTransaction
dhaidashenko Jan 11, 2024
d4268af
nit fixes
dhaidashenko Jan 11, 2024
d43dc50
more fixes
dhaidashenko Jan 11, 2024
5ed9ab7
use softTimeOut default value
dhaidashenko Jan 11, 2024
6369972
nit fix
dhaidashenko Jan 15, 2024
8256c5d
ensure all goroutines are done before Close
dhaidashenko Jan 16, 2024
e376873
refactor broadcast
dhaidashenko Jan 17, 2024
9a85e97
use sendTxSuccessfulCodes slice to identify if result is successful
dhaidashenko Feb 8, 2024
46f0fd3
Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a…
dhaidashenko Feb 8, 2024
0ee678f
Merge branch 'develop' into feature/BCI-2525-send-transaction-check-a…
prashantkumar1982 Feb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/client/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ const (
InsufficientFunds // Tx was rejected due to insufficient funds.
ExceedsMaxFee // Attempt's fee was higher than the node's limit and got rejected.
FeeOutOfValidRange // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range(mostly used by L2 chains). Retry by requesting a new suggested fee price.
sendTxReturnCodeLen // tracks the number of errors. Must always be last
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to be private?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we only need it to ensure that local unit tests cover all values

)

// sendTxSevereErrors - error codes which signal that transaction would never be accepted in its current form by the node
var sendTxSevereErrors = []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown}

// sendTxSuccessfulCodes - error codes which signal that transaction was accepted by the node
var sendTxSuccessfulCodes = []SendTxReturnCode{Successful, TransactionAlreadyKnown}

type NodeTier int

const (
Expand Down
242 changes: 205 additions & 37 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"fmt"
"math"
"math/big"
"sync"
"time"
Expand All @@ -26,6 +27,11 @@ var (
Name: "multi_node_states",
Help: "The number of RPC nodes currently in the given state for the given chain",
}, []string{"network", "chainId", "state"})
// PromMultiNodeInvariantViolations reports violation of our assumptions
PromMultiNodeInvariantViolations = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "multi_node_invariant_violations",
Help: "The number of invariant violations",
}, []string{"network", "chainId", "invariant"})
ErroringNodeError = fmt.Errorf("no live nodes available")
)

Expand Down Expand Up @@ -94,14 +100,15 @@ type multiNode[
leaseTicker *time.Ticker
chainFamily string
reportInterval time.Duration
sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation

activeMu sync.RWMutex
activeNode Node[CHAIN_ID, HEAD, RPC_CLIENT]

chStop services.StopChan
wg sync.WaitGroup

sendOnlyErrorParser func(err error) SendTxReturnCode
classifySendTxError func(tx TX, err error) SendTxReturnCode
}

func NewMultiNode[
Expand All @@ -127,13 +134,16 @@ func NewMultiNode[
chainID CHAIN_ID,
chainType config.ChainType,
chainFamily string,
sendOnlyErrorParser func(err error) SendTxReturnCode,
classifySendTxError func(tx TX, err error) SendTxReturnCode,
sendTxSoftTimeout time.Duration,
) MultiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT] {
nodeSelector := newNodeSelector(selectionMode, nodes)

// Prometheus' default interval is 15s, set this to under 7.5s to avoid
// aliasing (see: https://en.wikipedia.org/wiki/Nyquist_frequency)
const reportInterval = 6500 * time.Millisecond
if sendTxSoftTimeout == 0 {
sendTxSoftTimeout = QueryTimeout / 2
}
c := &multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]{
nodes: nodes,
sendonlys: sendonlys,
Expand All @@ -146,8 +156,9 @@ func NewMultiNode[
chStop: make(services.StopChan),
leaseDuration: leaseDuration,
chainFamily: chainFamily,
sendOnlyErrorParser: sendOnlyErrorParser,
classifySendTxError: classifySendTxError,
reportInterval: reportInterval,
sendTxSoftTimeout: sendTxSoftTimeout,
}

c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode)
Expand Down Expand Up @@ -545,45 +556,202 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
return n.RPC().SendEmptyTransaction(ctx, newTxAttempt, seq, gasLimit, fee, fromAddress)
}

type sendTxResult struct {
Err error
ResultCode SendTxReturnCode
}

// broadcastTxAsync - creates a goroutine that sends transaction to the node. Returns false, if MultiNode is Stopped
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context,
n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult, wg *sync.WaitGroup) {
defer wg.Done()

txErr := n.RPC().SendTransaction(ctx, tx)
c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
resultCode := c.classifySendTxError(tx, txErr)
if resultCode != Successful && resultCode != TransactionAlreadyKnown {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you just added a new list "sendTxSuccessfulCodes", this check should just change to checking if resultCode is in this list.

c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
}

// we expected txResults to have sufficient buffer, otherwise we are not interested in the response
// and can drop it
select {
case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}:
default:
}
}

func fanOut[T any](source chan T, destinations ...chan T) {
for t := range source {
for _, dest := range destinations {
dest <- t
}
}

for _, dest := range destinations {
close(dest)
}
}

// collectTxResults - refer to SendTransaction comment for implementation details,
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) error {
// combine context and stop channel to ensure we stop, when signal received
ctx, cancel := c.chStop.Ctx(ctx)
defer cancel()
requiredResults := int(math.Ceil(float64(len(c.nodes)) * sendTxQuorum))
errorsByCode := map[SendTxReturnCode][]error{}
var softTimeoutChan <-chan time.Time
var resultsCount int
loop:
for {
select {
case <-ctx.Done():
c.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode)
return ctx.Err()
case result := <-txResults:
errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err)
resultsCount++
if result.ResultCode == Successful || resultsCount >= requiredResults {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that if 70% of the nodes don't succeed, but another node succeeds at a later time, we won't be able to pick that up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if first 70% of nodes to reply marked transaction as failed, we need to retry even if another node succeeds after that

break loop
}
case <-softTimeoutChan:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we have 0 responses so far? This seems to be half of the value for a standard QueryTimeout

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is 0 responses, softTimeoutChan is nil and won't fire. We initialize it when we receive first reply

c.lggr.Debugw("Send Tx soft timeout expired - returning responses we've collected so far", "tx", tx, "resultsCount", resultsCount, "requiredResults", requiredResults)
break loop
}

if softTimeoutChan == nil {
tm := time.NewTimer(c.sendTxSoftTimeout)
softTimeoutChan = tm.C
// we are fine with stopping timer at the end of function
//nolint
defer tm.Stop()
}
}

// ignore critical error as it's reported in reportSendTxAnomalies
result, _ := aggregateTxResults(errorsByCode)
return result

}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) {
defer c.wg.Done()
resultsByCode := map[SendTxReturnCode][]error{}
// txResults eventually will be closed
for txResult := range txResults {
resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err)
}

_, criticalErr := aggregateTxResults(resultsByCode)
if criticalErr != nil {
c.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr)
c.SvcErrBuffer.Append(criticalErr)
PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), criticalErr.Error()).Inc()
}
}

func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) {
severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors)
successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes)
if hasSuccess {
// We assume that primary node would never report false positive txResult for a transaction.
// Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention.
if hasSevereErrors {
const errMsg = "found contradictions in nodes replies on SendTransaction: got success and severe error"
// return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain
return successResults[0], fmt.Errorf(errMsg)
}

// other errors are temporary - we are safe to return success
return successResults[0], nil
}

if hasSevereErrors {
return severeErrors[0], nil
}

// return temporary error
for _, result := range resultsByCode {
return result[0], nil
}

err = fmt.Errorf("expected at least one response on SendTransaction")
return err, err
}

const sendTxQuorum = 0.7
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on this const?


// SendTransaction - broadcasts transaction to all the send-only and primary nodes regardless of their health.
// A returned nil or error does not guarantee that the transaction will or won't be included. Additional checks must be
// performed to determine the final state.
//
// Send-only nodes' results are ignored as they tend to return false-positive responses. Broadcast to them is necessary
// to speed up the propagation of TX in the network.
//
// Handling of primary nodes' results consists of collection and aggregation.
// In the collection step, we gather as many results as possible while minimizing waiting time. This operation succeeds
// on one of the following conditions:
// * Received at least one success
// * Received at least one result and `sendTxSoftTimeout` expired
// * Received results from the sufficient number of nodes defined by sendTxQuorum.
// The aggregation is based on the following conditions:
// * If there is at least one success - returns success
// * If there is at least one terminal error - returns terminal error
// * If there is both success and terminal error - returns success and reports invariant violation
// * Otherwise, returns any (effectively random) of the errors.
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
main, nodeError := c.selectNode()
var all []SendOnlyNode[CHAIN_ID, RPC_CLIENT]
for _, n := range c.nodes {
all = append(all, n)
if len(c.nodes) == 0 {
return ErroringNodeError
}
all = append(all, c.sendonlys...)
for _, n := range all {
if n == main {
// main node is used at the end for the return value
continue

txResults := make(chan sendTxResult, len(c.nodes))
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
ok := c.IfNotStopped(func() {
c.wg.Add(len(c.sendonlys))
// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
for _, n := range c.sendonlys {
go c.broadcastTxAsync(ctx, n, tx, nil, &c.wg)
}
// Parallel send to all other nodes with ignored return value
// Async - we do not want to block the main thread with secondary nodes
// in case they are unreliable/slow.
// It is purely a "best effort" send.
// Resource is not unbounded because the default context has a timeout.
ok := c.IfNotStopped(func() {
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
c.wg.Add(1)
go func(n SendOnlyNode[CHAIN_ID, RPC_CLIENT]) {
defer c.wg.Done()

txErr := n.RPC().SendTransaction(ctx, tx)
c.lggr.Debugw("Sendonly node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
sendOnlyError := c.sendOnlyErrorParser(txErr)
if sendOnlyError != Successful {
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
}
}(n)
})
if !ok {
c.lggr.Debug("Cannot send transaction on sendonly node; MultiNode is stopped", "node", n.String())

// signal when all the primary nodes done broadcasting tx
inTxResults := make(chan sendTxResult, len(c.nodes))
var wg sync.WaitGroup
wg.Add(len(c.nodes))
c.wg.Add(1)
go func() {
wg.Wait()
c.wg.Done()
close(inTxResults)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better if you call close before c.wg.Done() since Close() shouldn't proceed unless this routine has fully returned.


}()

for _, n := range c.nodes {
go c.broadcastTxAsync(ctx, n, tx, inTxResults, &wg)
}

txResultsToReport := make(chan sendTxResult, len(c.nodes))
go fanOut(inTxResults, txResultsToReport, txResults)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if broadcastTxAsync is already over at this point and someone calls Close(), would we have a leak?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fanOut is not leaking, because if broadcastTxAsync is already over - inTxResults is closed. Both txResultsToReport, txResults are sufficiently buffered, so we won't stuck even if no one is reading the results.
Also Close can not complete before fanOut is done as reportSendTxAnomalies, that is protected by a wait group, waits for txResultsToReport to be closed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not suggesting that it will get stuck, I'm saying there is a rare case the multinode can close before the fanOut method returns. The fanOut method will still close eventually, perhaps almost instantly after the mutlinode close, but theoretically you can have a scenario where the parent routine exits before the child routine, which might get picked up by random tests, so perhaps it should be better if we do c.wg.Add(2) at the beginning.

Copy link
Collaborator Author

@dhaidashenko dhaidashenko Jan 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense. It might be better to make it more explicit


c.wg.Add(1)
go c.reportSendTxAnomalies(tx, txResultsToReport)

})
if !ok {
return fmt.Errorf("aborted while broadcasting tx - multiNode is stopped: %w", context.Canceled)
}
if nodeError != nil {
return nodeError

return c.collectTxResults(ctx, tx, txResults)
}

// findFirstIn - returns first existing value for the slice of keys
func findFirstIn[K comparable, V any](set map[K]V, keys []K) (V, bool) {
for _, k := range keys {
if v, ok := set[k]; ok {
return v, true
}
}
return main.RPC().SendTransaction(ctx, tx)
var v V
return v, false
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SequenceAt(ctx context.Context, account ADDR, blockNumber *big.Int) (s SEQ, err error) {
Expand Down
Loading
Loading