Skip to content

Commit

Permalink
Mark terminally stuck transaction pending task runs as failure (#14282)
Browse files Browse the repository at this point in the history
* Updated Confirmer to resume pending task runs with failure for terminally stuck txs

* Updated resume callback error message

* Added CallbackCompleted check before resuming pending task
  • Loading branch information
amit-momin authored Sep 4, 2024
1 parent f7fec9c commit 1a2b7b6
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/flat-emus-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Updated TXM Confirmer logic to resume pending task runs with failure if transaction is terminally stuck #internal
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
// Now we have an errored pipeline even though the tx succeeded. This case
// is relatively benign and probably nobody will ever run into it in
// practice, but something to be aware of.
if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil && etx.SignalCallback {
if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil && etx.SignalCallback && !etx.CallbackCompleted {
err := eb.resumeCallback(ctx, etx.PipelineTaskRunID.UUID, nil, fmt.Errorf("fatal error while sending transaction: %s", etx.Error.String))
if errors.Is(err, sql.ErrNoRows) {
lgr.Debugw("callback missing or already resumed", "etxID", etx.ID)
Expand Down
29 changes: 28 additions & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package txmgr

import (
"context"
"database/sql"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -514,6 +515,13 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro
errMu.Unlock()
return
}
// Resume pending task runs with failure for stuck transactions
if err := ec.resumeFailedTaskRuns(ctx, tx); err != nil {
errMu.Lock()
errorList = append(errorList, fmt.Errorf("failed to resume pending task run for transaction: %w", err))
errMu.Unlock()
return
}
}(tx)
}
wg.Wait()
Expand Down Expand Up @@ -584,7 +592,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet
return fmt.Errorf("saveFetchedReceipts failed: %w", err)
}
// Save the receipts but mark the associated transactions as Fatal Error since the original transaction was purged
if err := ec.txStore.SaveFetchedReceipts(ctx, purgeReceipts, TxFatalError, ec.stuckTxDetector.StuckTxFatalError(), ec.chainID); err != nil {
stuckTxFatalErrMsg := ec.stuckTxDetector.StuckTxFatalError()
if err := ec.txStore.SaveFetchedReceipts(ctx, purgeReceipts, TxFatalError, &stuckTxFatalErrMsg, ec.chainID); err != nil {
return fmt.Errorf("saveFetchedReceipts failed: %w", err)
}
promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(receipts)))
Expand Down Expand Up @@ -616,6 +625,24 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sep
return
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resumeFailedTaskRuns(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
if !etx.PipelineTaskRunID.Valid || ec.resumeCallback == nil || !etx.SignalCallback || etx.CallbackCompleted {
return nil
}
err := ec.resumeCallback(ctx, etx.PipelineTaskRunID.UUID, nil, errors.New(ec.stuckTxDetector.StuckTxFatalError()))
if errors.Is(err, sql.ErrNoRows) {
ec.lggr.Debugw("callback missing or already resumed", "etxID", etx.ID)
} else if err != nil {
return fmt.Errorf("failed to resume pipeline: %w", err)
} else {
// Mark tx as having completed callback
if err = ec.txStore.UpdateTxCallbackCompleted(ctx, etx.PipelineTaskRunID.UUID, ec.chainID); err != nil {
return err
}
}
return nil
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) getMinedSequenceForAddress(ctx context.Context, from ADDR) (SEQ, error) {
return ec.client.SequenceAt(ctx, from, nil)
}
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/stuck_tx_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ type StuckTxDetector[
// Sets the last purged block num after a transaction has been successfully purged with receipt
SetPurgeBlockNum(fromAddress ADDR, blockNum int64)
// Returns the error message to set in the transaction error field to mark it as terminally stuck
StuckTxFatalError() *string
StuckTxFatalError() string
}
9 changes: 8 additions & 1 deletion core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3235,6 +3235,11 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) {
stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), evmcfg.EVM().Transactions().AutoPurge(), feeEstimator, txStore, ethClient)
ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0)
ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database(), ethKeyStore, txBuilder, lggr, stuckTxDetector, ht)
fn := func(ctx context.Context, id uuid.UUID, result interface{}, err error) error {
require.ErrorContains(t, err, client.TerminallyStuckMsg)
return nil
}
ec.SetResumeCallback(fn)
servicetest.Run(t, ec)

ctx := tests.Context(t)
Expand All @@ -3246,7 +3251,8 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) {
// Create autoPurgeMinAttempts number of attempts to ensure the broadcast attempt count check is not being triggered
// Create attempts broadcasted autoPurgeThreshold block ago to ensure broadcast block num check is not being triggered
tx := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, nonce, fromAddress, autoPurgeMinAttempts, blockNum-int64(autoPurgeThreshold), marketGasPrice.Add(oneGwei))

// Update tx to signal callback once it is identified as terminally stuck
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, uuid.New(), tx.ID)
head := evmtypes.Head{
Hash: testutils.NewHash(),
Number: blockNum,
Expand Down Expand Up @@ -3307,6 +3313,7 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) {
require.NotNil(t, dbTx)
require.Equal(t, txmgrcommon.TxFatalError, dbTx.State)
require.Equal(t, client.TerminallyStuckMsg, dbTx.Error.String)
require.Equal(t, true, dbTx.CallbackCompleted)
})
}

Expand Down
5 changes: 2 additions & 3 deletions core/chains/evm/txmgr/stuck_tx_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ func (d *stuckTxDetector) SetPurgeBlockNum(fromAddress common.Address, blockNum
d.purgeBlockNumMap[fromAddress] = blockNum
}

func (d *stuckTxDetector) StuckTxFatalError() *string {
errorMsg := client.TerminallyStuckMsg
return &errorMsg
func (d *stuckTxDetector) StuckTxFatalError() string {
return client.TerminallyStuckMsg
}

0 comments on commit 1a2b7b6

Please sign in to comment.