diff --git a/.changeset/flat-emus-act.md b/.changeset/flat-emus-act.md new file mode 100644 index 00000000000..a4cbae12877 --- /dev/null +++ b/.changeset/flat-emus-act.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Updated TXM Confirmer logic to resume pending task runs with failure if transaction is terminally stuck #internal diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 86475c2e0e7..8ecd6dbf46b 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -756,7 +756,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save // Now we have an errored pipeline even though the tx succeeded. This case // is relatively benign and probably nobody will ever run into it in // practice, but something to be aware of. - if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil && etx.SignalCallback { + if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil && etx.SignalCallback && !etx.CallbackCompleted { err := eb.resumeCallback(ctx, etx.PipelineTaskRunID.UUID, nil, fmt.Errorf("fatal error while sending transaction: %s", etx.Error.String)) if errors.Is(err, sql.ErrNoRows) { lgr.Debugw("callback missing or already resumed", "etxID", etx.ID) diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index d67bd451228..bbf1d3b27b7 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -2,6 +2,7 @@ package txmgr import ( "context" + "database/sql" "encoding/hex" "errors" "fmt" @@ -514,6 +515,13 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro errMu.Unlock() return } + // Resume pending task runs with failure for stuck transactions + if err := ec.resumeFailedTaskRuns(ctx, tx); err != nil { + errMu.Lock() + errorList = append(errorList, fmt.Errorf("failed to resume pending task run for transaction: %w", err)) + errMu.Unlock() + return + } }(tx) } wg.Wait() @@ -584,7 +592,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet return fmt.Errorf("saveFetchedReceipts failed: %w", err) } // Save the receipts but mark the associated transactions as Fatal Error since the original transaction was purged - if err := ec.txStore.SaveFetchedReceipts(ctx, purgeReceipts, TxFatalError, ec.stuckTxDetector.StuckTxFatalError(), ec.chainID); err != nil { + stuckTxFatalErrMsg := ec.stuckTxDetector.StuckTxFatalError() + if err := ec.txStore.SaveFetchedReceipts(ctx, purgeReceipts, TxFatalError, &stuckTxFatalErrMsg, ec.chainID); err != nil { return fmt.Errorf("saveFetchedReceipts failed: %w", err) } promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(receipts))) @@ -616,6 +625,24 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sep return } +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resumeFailedTaskRuns(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if !etx.PipelineTaskRunID.Valid || ec.resumeCallback == nil || !etx.SignalCallback || etx.CallbackCompleted { + return nil + } + err := ec.resumeCallback(ctx, etx.PipelineTaskRunID.UUID, nil, errors.New(ec.stuckTxDetector.StuckTxFatalError())) + if errors.Is(err, sql.ErrNoRows) { + ec.lggr.Debugw("callback missing or already resumed", "etxID", etx.ID) + } else if err != nil { + return fmt.Errorf("failed to resume pipeline: %w", err) + } else { + // Mark tx as having completed callback + if err = ec.txStore.UpdateTxCallbackCompleted(ctx, etx.PipelineTaskRunID.UUID, ec.chainID); err != nil { + return err + } + } + return nil +} + func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) getMinedSequenceForAddress(ctx context.Context, from ADDR) (SEQ, error) { return ec.client.SequenceAt(ctx, from, nil) } diff --git a/common/txmgr/types/stuck_tx_detector.go b/common/txmgr/types/stuck_tx_detector.go index c4ca94b87f8..dc09eea9807 100644 --- a/common/txmgr/types/stuck_tx_detector.go +++ b/common/txmgr/types/stuck_tx_detector.go @@ -22,5 +22,5 @@ type StuckTxDetector[ // Sets the last purged block num after a transaction has been successfully purged with receipt SetPurgeBlockNum(fromAddress ADDR, blockNum int64) // Returns the error message to set in the transaction error field to mark it as terminally stuck - StuckTxFatalError() *string + StuckTxFatalError() string } diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index a9dec223bf8..dc8e30f5f4b 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -3235,6 +3235,11 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { stuckTxDetector := txmgr.NewStuckTxDetector(lggr, testutils.FixtureChainID, "", assets.NewWei(assets.NewEth(100).ToInt()), evmcfg.EVM().Transactions().AutoPurge(), feeEstimator, txStore, ethClient) ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0) ec := txmgr.NewEvmConfirmer(txStore, txmgr.NewEvmTxmClient(ethClient, nil), txmgr.NewEvmTxmConfig(evmcfg.EVM()), txmgr.NewEvmTxmFeeConfig(ge), evmcfg.EVM().Transactions(), cfg.Database(), ethKeyStore, txBuilder, lggr, stuckTxDetector, ht) + fn := func(ctx context.Context, id uuid.UUID, result interface{}, err error) error { + require.ErrorContains(t, err, client.TerminallyStuckMsg) + return nil + } + ec.SetResumeCallback(fn) servicetest.Run(t, ec) ctx := tests.Context(t) @@ -3246,7 +3251,8 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { // Create autoPurgeMinAttempts number of attempts to ensure the broadcast attempt count check is not being triggered // Create attempts broadcasted autoPurgeThreshold block ago to ensure broadcast block num check is not being triggered tx := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, nonce, fromAddress, autoPurgeMinAttempts, blockNum-int64(autoPurgeThreshold), marketGasPrice.Add(oneGwei)) - + // Update tx to signal callback once it is identified as terminally stuck + pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, uuid.New(), tx.ID) head := evmtypes.Head{ Hash: testutils.NewHash(), Number: blockNum, @@ -3307,6 +3313,7 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { require.NotNil(t, dbTx) require.Equal(t, txmgrcommon.TxFatalError, dbTx.State) require.Equal(t, client.TerminallyStuckMsg, dbTx.Error.String) + require.Equal(t, true, dbTx.CallbackCompleted) }) } diff --git a/core/chains/evm/txmgr/stuck_tx_detector.go b/core/chains/evm/txmgr/stuck_tx_detector.go index 362bb6c0a5b..26d7643c15a 100644 --- a/core/chains/evm/txmgr/stuck_tx_detector.go +++ b/core/chains/evm/txmgr/stuck_tx_detector.go @@ -409,7 +409,6 @@ func (d *stuckTxDetector) SetPurgeBlockNum(fromAddress common.Address, blockNum d.purgeBlockNumMap[fromAddress] = blockNum } -func (d *stuckTxDetector) StuckTxFatalError() *string { - errorMsg := client.TerminallyStuckMsg - return &errorMsg +func (d *stuckTxDetector) StuckTxFatalError() string { + return client.TerminallyStuckMsg }