Skip to content

Commit

Permalink
Do not try to send tx result into collector if SendTransaction was al…
Browse files Browse the repository at this point in the history
…ready done
  • Loading branch information
dhaidashenko committed Nov 26, 2024
1 parent d77db32 commit e21d0bd
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
36 changes: 26 additions & 10 deletions common/client/transaction_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID types.ID, RPC SendT
// * Otherwise, returns any (effectively random) of the errors.
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT {
var result RESULT
ctx, cancel := txSender.chStop.Ctx(ctx)
defer cancel()
if !txSender.IfStarted(func() {
txResults := make(chan RESULT)
txResultsToReport := make(chan RESULT)
Expand All @@ -103,8 +105,6 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
if isSendOnly {
txSender.wg.Add(1)
go func(ctx context.Context) {
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
defer txSender.wg.Done()
// Send-only nodes' results are ignored as they tend to return false-positive responses.
// Broadcast to them is necessary to speed up the propagation of TX in the network.
Expand All @@ -117,8 +117,8 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
healthyNodesNum++
primaryNodeWg.Add(1)
go func(ctx context.Context) {
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
// Broadcasting transaction and results reporing are background jobs that should be detached from
// callers cancellation
defer primaryNodeWg.Done()
r := txSender.broadcastTxAsync(ctx, rpc, tx)
select {
Expand All @@ -128,6 +128,8 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
case txResults <- r:
}

ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
select {
case <-ctx.Done():
txSender.lggr.Debugw("Failed to send tx results to report", "err", ctx.Err())
Expand All @@ -151,8 +153,13 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
return
}

if healthyNodesNum == 0 {
result = txSender.newResult(ErroringNodeError)
return
}

txSender.wg.Add(1)
go txSender.reportSendTxAnomalies(ctx, tx, txResultsToReport)
go txSender.reportSendTxAnomalies(tx, txResultsToReport)

result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults)
}) {
Expand All @@ -163,6 +170,9 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT {
// broadcast is a background job, so always detach from caller's cancellation
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
result := rpc.SendTransaction(ctx, tx)
txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error())
if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil {
Expand All @@ -171,16 +181,25 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(c
return result
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(ctx context.Context, tx TX, txResults <-chan RESULT) {
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) {
defer txSender.wg.Done()
resultsByCode := sendTxResults[RESULT]{}
// txResults eventually will be closed
for txResult := range txResults {
resultsByCode[txResult.Code()] = append(resultsByCode[txResult.Code()], txResult)
}

select {
case <-txSender.chStop:
// it's ok to receive no results if txSender is closing. Return early to prevent false reporting of invariant violation.
if len(resultsByCode) == 0 {
return
}
default:
}

_, criticalErr := aggregateTxResults[RESULT](resultsByCode)
if criticalErr != nil && ctx.Err() == nil {
if criticalErr != nil {
txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr)
PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc()
}
Expand Down Expand Up @@ -218,9 +237,6 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan RESULT) RESULT {
if healthyNodesNum == 0 {
return txSender.newResult(ErroringNodeError)
}
requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum))
errorsByCode := sendTxResults[RESULT]{}
var softTimeoutChan <-chan time.Time
Expand Down
22 changes: 22 additions & 0 deletions common/client/transaction_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/common/types"
)

Expand Down Expand Up @@ -293,6 +294,27 @@ func TestTransactionSender_SendTransaction(t *testing.T) {
require.NoError(t, result.Error())
require.Equal(t, Successful, result.Code())
})
t.Run("All background jobs stop even if RPC returns result after soft timeout", func(t *testing.T) {
chainID := types.RandomID()
expectedError := errors.New("transaction failed")
fastNode := newNode(t, expectedError, nil)

// hold reply from the node till SendTransaction returns result
sendTxContext, sendTxCancel := context.WithCancel(tests.Context(t))
slowNode := newNode(t, errors.New("transaction failed"), func(_ mock.Arguments) {
<-sendTxContext.Done()
})

lggr := logger.Test(t)

_, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, TestSendTxRPCClient]{fastNode, slowNode}, nil)
result := txSender.SendTransaction(sendTxContext, nil)
sendTxCancel()
require.EqualError(t, result.Error(), expectedError.Error())
// TxSender should stop all background go routines after SendTransaction is done and before test is done.
// Otherwise, it signals that we have a goroutine leak.
txSender.wg.Wait()
})
}

func TestTransactionSender_SendTransaction_aggregateTxResults(t *testing.T) {
Expand Down

0 comments on commit e21d0bd

Please sign in to comment.