Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 8, 2024
1 parent 70c7c2c commit ab5fa6c
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 94 deletions.
146 changes: 89 additions & 57 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,42 +563,44 @@ type sendTxResult struct {

// broadcastTxAsync - sends the transaction to the node and signals wg when done. NOTE(review): the new signature no longer returns a bool or spawns its own goroutine — this comment should be updated to match.
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) broadcastTxAsync(ctx context.Context,
n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult) bool {
n SendOnlyNode[CHAIN_ID, RPC_CLIENT], tx TX, txResults chan sendTxResult, wg *sync.WaitGroup) {
defer wg.Done()

ok := c.IfNotStopped(func() {
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
c.wg.Add(1)
go func() {
defer c.wg.Done()

txErr := n.RPC().SendTransaction(ctx, tx)
c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
resultCode := c.classifySendTxError(tx, txErr)
if resultCode != Successful && resultCode != TransactionAlreadyKnown {
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
}

// we expected txResults to have sufficient buffer, otherwise we are not interested in the response
// and can drop it
select {
case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}:
default:
}
txErr := n.RPC().SendTransaction(ctx, tx)
c.lggr.Debugw("Node sent transaction", "name", n.String(), "tx", tx, "err", txErr)
resultCode := c.classifySendTxError(tx, txErr)
if resultCode != Successful && resultCode != TransactionAlreadyKnown {
c.lggr.Warnw("RPC returned error", "name", n.String(), "tx", tx, "err", txErr)
}

}()
})
if !ok {
c.lggr.Debugw("Cannot broadcast transaction to node; MultiNode is stopped", "node", n.String())
// we expect txResults to have a sufficient buffer, otherwise we are not interested in the response
// and can drop it
select {
case txResults <- sendTxResult{Err: txErr, ResultCode: resultCode}:
default:
}
}

func cloneChannel[T any](source <-chan T) chan T {
result := make(chan T, len(source))
go func() {
for t := range source {
result <- t
}
close(result)
}()

return ok
return result
}

// collectTxResults - reads send transaction results from the provided channel and groups them by `SendTxReturnCode.`
// waitTxResults - waits for sufficient number of sendTxResult to determine result of transaction submission.
// We balance the waiting time and the number of collected results. Our target is replies from 70% of nodes,
// but we won't wait longer than sendTxSoftTimeout since the first reply to avoid waiting
// for a timeout from slow/unhealthy nodes.
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) collectTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) (map[SendTxReturnCode][]error, error) {
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) waitTxResults(ctx context.Context, tx TX, txResults <-chan sendTxResult) (map[SendTxReturnCode][]error, error) {
// combine context and stop channel to ensure we stop, when signal received
ctx, cancel := c.chStop.Ctx(ctx)
defer cancel()
const quorum = 0.7
requiredResults := int(math.Ceil(float64(len(c.nodes)) * quorum))
errorsByCode := map[SendTxReturnCode][]error{}
Expand All @@ -611,6 +613,9 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
return nil, ctx.Err()
case result := <-txResults:
errorsByCode[result.ResultCode] = append(errorsByCode[result.ResultCode], result.Err)
if result.ResultCode == Successful {
return errorsByCode, nil
}
resultsCount++
if resultsCount >= requiredResults {
return errorsByCode, nil
Expand All @@ -630,71 +635,98 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP
}
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) aggregateTxResults(tx TX, resultsByCode map[SendTxReturnCode][]error) error {
func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) {
ok := c.IfNotStopped(func() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
resultsByCode := map[SendTxReturnCode][]error{}
for txResult := range txResults {
resultsByCode[txResult.ResultCode] = append(resultsByCode[txResult.ResultCode], txResult.Err)
}

_, criticalErr := aggregateTxResults(resultsByCode)
if criticalErr != nil {
c.SvcErrBuffer.Append(criticalErr)
PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), criticalErr.Error()).Inc()
}
}()
})
if !ok {
c.lggr.Debugw("Cannot report SendTransaction anomalies; MultiNode is stopped")
}
}

func aggregateTxResults(resultsByCode map[SendTxReturnCode][]error) (txResult error, err error) {
severeErrors, hasSevereErrors := findFirstIn(resultsByCode, []SendTxReturnCode{Fatal, Underpriced, Unsupported, ExceedsMaxFee, FeeOutOfValidRange, Unknown})
successResults, hasSuccess := findFirstIn(resultsByCode, []SendTxReturnCode{Successful, TransactionAlreadyKnown})
if hasSuccess {
// We assume that primary node would never report false positive result for a transaction.
// We assume that primary node would never report false positive txResult for a transaction.
// Thus, if such case occurs it's probably due to misconfiguration or a bug and requires manual intervention.
if hasSevereErrors {
const errMsg = "found contradictions in nodes replies on SendTransaction: got Successful and severe error"
c.lggr.Criticalw(errMsg, "tx", tx, "resultsByCode", resultsByCode)
err := fmt.Errorf(errMsg)
c.SvcErrBuffer.Append(err)
PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), errMsg).Inc()
// return success, since at least 1 node has accepted our broadcasted Tx, and thus it can now be included onchain
return successResults[0]
return successResults[0], fmt.Errorf(errMsg)
}

// other errors are temporary - we are safe to return success
return successResults[0]
return successResults[0], nil
}

if hasSevereErrors {
return severeErrors[0]
return severeErrors[0], nil
}

// return temporary error
for _, result := range resultsByCode {
return result[0]
return result[0], nil
}

const errMsg = "invariant violation: expected at least one response on SendTransaction"
c.lggr.Criticalw(errMsg, "tx", tx)
err := fmt.Errorf(errMsg)
c.SvcErrBuffer.Append(err)
PromMultiNodeInvariantViolations.WithLabelValues(c.chainFamily, c.chainID.String(), errMsg).Inc()
return err
err = fmt.Errorf("invariant violation: expected at least one response on SendTransaction")
return err, err
}

func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT]) SendTransaction(ctx context.Context, tx TX) error {
if len(c.nodes) == 0 {
return ErroringNodeError
}

for _, n := range c.sendonlys {
// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
if !c.broadcastTxAsync(ctx, n, tx, nil) {
return fmt.Errorf("aborted while broadcasting to sendonlys - multinode is stopped: %w", context.Canceled)
txResults := make(chan sendTxResult, len(c.nodes))
ok := c.IfNotStopped(func() {
// Must wrap inside IfNotStopped to avoid waitgroup racing with Close
var wg sync.WaitGroup
wg.Add(len(c.sendonlys) + len(c.nodes))
for _, n := range c.sendonlys {
// fire-n-forget, as sendOnlyNodes can not be trusted with result reporting
go c.broadcastTxAsync(ctx, n, tx, nil, &wg)
}
}

txResults := make(chan sendTxResult, len(c.nodes))
for _, n := range c.nodes {
if !c.broadcastTxAsync(ctx, n, tx, txResults) {
return fmt.Errorf("aborted while broadcasting to primary - multinode is stopped: %w", context.Canceled)
for _, n := range c.nodes {
go c.broadcastTxAsync(ctx, n, tx, txResults, &wg)
}

c.wg.Add(1)
go func() {
wg.Wait()
c.wg.Done()
close(txResults)

}()
})
if !ok {
return fmt.Errorf("aborted while broadcasting tx - multinode is stopped: %w", context.Canceled)
}

// combine context and stop channel to ensure we are not waiting for responses that won't arrive because MultiNode was stopped
ctx, cancel := c.chStop.Ctx(ctx)
defer cancel()
resultsByCode, err := c.collectTxResults(ctx, tx, txResults)
go c.reportSendTxAnomalies(tx, cloneChannel(txResults))

resultsByCode, err := c.waitTxResults(ctx, tx, txResults)
if err != nil {
return fmt.Errorf("failed to collect tx results: %w", err)
}

return c.aggregateTxResults(tx, resultsByCode)
// ignore the critical error as it's reported in reportSendTxAnomalies
result, _ := aggregateTxResults(resultsByCode)
return result
}

// findFirstIn - returns first existing value for the slice of keys
Expand Down
51 changes: 14 additions & 37 deletions common/client/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ func TestMultiNode_SendTransaction(t *testing.T) {
err := mn.SendTransaction(tests.Context(t), nil)
require.NoError(t, err)
})
t.Run("Fails when closed on sendonly broadcast", func(t *testing.T) {
t.Run("Fails when closed", func(t *testing.T) {
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
Expand All @@ -680,35 +680,12 @@ func TestMultiNode_SendTransaction(t *testing.T) {
require.NoError(t, err)
require.NoError(t, mn.Close())
err = mn.SendTransaction(tests.Context(t), nil)
require.EqualError(t, err, "aborted while broadcasting to sendonlys - multinode is stopped: context canceled")
})
t.Run("Fails when closed on nodes broadcast", func(t *testing.T) {
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
nodes: []Node[types.ID, types.Head[Hashable], multiNodeRPCClient]{newNode(t, nil, nil)},
sendonlys: nil,
classifySendTxError: classifySendTxError,
})
err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
require.NoError(t, err)
require.NoError(t, mn.Close())
err = mn.SendTransaction(tests.Context(t), nil)
require.EqualError(t, err, "aborted while broadcasting to primary - multinode is stopped: context canceled")
require.EqualError(t, err, "aborted while broadcasting tx - multinode is stopped: context canceled")
})
}

func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
t.Parallel()
mn := newTestMultiNode(t, multiNodeOpts{
selectionMode: NodeSelectionModeRoundRobin,
chainID: types.RandomID(),
})
err := mn.StartOnce("startedTestMultiNode", func() error { return nil })
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, mn.Close())
})
// ensure failure on new SendTxReturnCode
codesToCover := map[SendTxReturnCode]struct{}{}
for code := Successful; code < sendTxReturnCodeLen; code++ {
Expand All @@ -717,13 +694,13 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {

testCases := []struct {
Name string
ExpectedErr string
ExpectedTxResult string
ExpectedCriticalErr string
ResultsByCode map[SendTxReturnCode][]error
}{
{
Name: "Returns success and logs critical error on Success and Fatal",
ExpectedErr: "success",
ExpectedTxResult: "success",
ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got Successful and severe error",
ResultsByCode: map[SendTxReturnCode][]error{
Successful: {errors.New("success")},
Expand All @@ -732,7 +709,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
},
{
Name: "Returns TransactionAlreadyKnown and logs critical error on TransactionAlreadyKnown and Fatal",
ExpectedErr: "tx_already_known",
ExpectedTxResult: "tx_already_known",
ExpectedCriticalErr: "found contradictions in nodes replies on SendTransaction: got Successful and severe error",
ResultsByCode: map[SendTxReturnCode][]error{
TransactionAlreadyKnown: {errors.New("tx_already_known")},
Expand All @@ -741,7 +718,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
},
{
Name:                "Prefers severe error over temporary",
ExpectedErr: "underpriced",
ExpectedTxResult: "underpriced",
ExpectedCriticalErr: "",
ResultsByCode: map[SendTxReturnCode][]error{
Retryable: {errors.New("retryable")},
Expand All @@ -750,15 +727,15 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
},
{
Name: "Returns temporary error",
ExpectedErr: "retryable",
ExpectedTxResult: "retryable",
ExpectedCriticalErr: "",
ResultsByCode: map[SendTxReturnCode][]error{
Retryable: {errors.New("retryable")},
},
},
{
Name: "Insufficient funds is treated as error",
ExpectedErr: "",
ExpectedTxResult: "",
ExpectedCriticalErr: "",
ResultsByCode: map[SendTxReturnCode][]error{
Successful: {nil},
Expand All @@ -767,7 +744,7 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
},
{
Name: "Logs critical error on empty ResultsByCode",
ExpectedErr: "invariant violation: expected at least one response on SendTransaction",
ExpectedTxResult: "invariant violation: expected at least one response on SendTransaction",
ExpectedCriticalErr: "invariant violation: expected at least one response on SendTransaction",
ResultsByCode: map[SendTxReturnCode][]error{},
},
Expand All @@ -778,17 +755,17 @@ func TestMultiNode_SendTransaction_aggregateTxResults(t *testing.T) {
delete(codesToCover, code)
}
t.Run(testCase.Name, func(t *testing.T) {
err := mn.aggregateTxResults(nil, testCase.ResultsByCode)
if testCase.ExpectedErr == "" {
txResult, err := aggregateTxResults(testCase.ResultsByCode)
if testCase.ExpectedTxResult == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, testCase.ExpectedErr)
assert.EqualError(t, txResult, testCase.ExpectedTxResult)
}

if testCase.ExpectedCriticalErr == "" {
assert.NoError(t, mn.Healthy())
assert.NoError(t, err)
} else {
assert.EqualError(t, mn.Healthy(), testCase.ExpectedCriticalErr)
assert.EqualError(t, err, testCase.ExpectedCriticalErr)
}
})
}
Expand Down

0 comments on commit ab5fa6c

Please sign in to comment.