diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 17d7dc0fe28..cfa5425d26a 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -302,6 +302,14 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete as._deleteTxs(txIDs...) } +// deleteAllTxAttempts removes all attempts for the given transactions. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteAllTxAttempts(txID int64) { + as.Lock() + defer as.Unlock() + + as._deleteAllTxAttempts(txID) +} + // deleteTxAttempt removes the attempt with a given ID from the transaction with the given ID. // It removes the attempts from the hash lookup map and from the transaction. // If an attempt is not found in the hash lookup map, it is ignored. @@ -558,6 +566,21 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findT return txs } +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteAllTxAttempts( + txID int64, +) { + tx, ok := as.allTxs[txID] + if !ok { + return + } + + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] + delete(as.attemptHashToTxAttempt, txAttempt.Hash) + } + tx.TxAttempts = nil +} + func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) { for _, txID := range txIDs { as._deleteTx(txID) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 7bcac570888..7133863f2d3 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -228,6 +228,36 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR // UpdateTxFatalError updates a transaction to fatal_error. func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) UpdateTxFatalError(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if tx.State != TxInProgress && tx.State != TxUnstarted { + return fmt.Errorf("update_tx_fatal_error: can only transition to fatal_error from in_progress, transaction is currently %s", tx.State) + } + if !tx.Error.Valid { + return fmt.Errorf("update_tx_fatal_error: expected error field to be set") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return fmt.Errorf("update_tx_fatal_error: %w", ErrAddressNotFound) + } + if !as.hasTx(tx.ID) { + return fmt.Errorf("update_tx_fatal_error: %w: %q", ErrTxnNotFound, tx.ID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.UpdateTxFatalError(ctx, tx); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + + // Update in memory store + // remove all tx attempts for the transaction + as.deleteAllTxAttempts(tx.ID) + // move the transaction to fatal_error state + if err := as.moveTxToFatalError(tx.ID, tx.Error); err != nil { + return fmt.Errorf("update_tx_fatal_error: %w", err) + } + return nil } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index ce3dcb59dfb..261680b1275 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -24,6 +24,52 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_UpdateTxFatalError(t *testing.T) { + t.Parallel() + + t.Run("successfully update transaction", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 13, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + inTx.Error = null.StringFrom("no more toilet paper") + err = inMemoryStore.UpdateTxFatalError(ctx, &inTx) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + + assertTxEqual(t, expTx, actTx) + assert.Equal(t, commontxmgr.TxFatalError, actTx.State) + }) +} + func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) { t.Parallel()