diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 0b82b7030c8..17d7dc0fe28 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -328,14 +328,35 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete } } +// addTxAttempt adds the given attempt to the transaction which matches its TxID. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxAttempt(txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as.Lock() + defer as.Unlock() + + tx, ok := as.allTxs[txAttempt.TxID] + if !ok || tx == nil { + return fmt.Errorf("no transaction with ID %d", txAttempt.TxID) + } + + // add the attempt to the transaction + if tx.TxAttempts == nil { + tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + } + tx.TxAttempts = append(tx.TxAttempts, txAttempt) + // add the attempt to the hash lookup map + as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt + + return nil +} + // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil } // peekInProgressTx returns the in-progress transaction without removing it from the in-progress state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - return nil, nil +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + return nil } // addTxToUnstarted adds the given transaction to the unstarted queue. diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 773c91bbea1..7bcac570888 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -190,6 +190,39 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveR oldAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { + if oldAttempt.State != txmgrtypes.TxAttemptInProgress || replacementAttempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("expected attempts to be in_progress") + } + if oldAttempt.ID == 0 { + return fmt.Errorf("expected oldAttempt to have an ID") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + for _, vas := range ms.addressStates { + if vas.hasTx(oldAttempt.TxID) { + as = vas + break + } + } + if as == nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w: %q", ErrTxnNotFound, oldAttempt.TxID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, replacementAttempt); err != nil { + return fmt.Errorf("save_replacement_in_progress_attempt: %w", err) + } + + // Update in memory store + // delete the old attempt + as.deleteTxAttempt(oldAttempt.TxID, oldAttempt.ID) + // add the new attempt + if err := as.addTxAttempt(*replacementAttempt); err != nil { + return fmt.Errorf("save_replacement_in_progress_attempt: failed to add a replacement transaction attempt: %w", err) + } + return nil } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 249e1317c3d..ce3dcb59dfb 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -24,6 +24,112 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_SaveReplacementInProgressAttempt(t *testing.T) { + t.Parallel() + + t.Run("successfully replace tx attempt", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) + err = inMemoryStore.SaveReplacementInProgressAttempt( + ctx, + oldAttempt, + &newAttempt, + ) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assertTxEqual(t, expTx, actTx) + assert.Equal(t, txmgrtypes.TxAttemptInProgress, actTx.TxAttempts[0].State) + assert.Equal(t, newAttempt.Hash, actTx.TxAttempts[0].Hash) + assert.NotEqual(t, oldAttempt.ID, actTx.TxAttempts[0].ID) + }) + + t.Run("error parity for in-memory vs persistent store", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + newAttempt := cltest.NewDynamicFeeEthTxAttempt(t, inTx.ID) + + t.Run("error when old attempt is not in progress", func(t *testing.T) { + oldAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when new attempt is not in progress", func(t *testing.T) { + newAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + newAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when old attempt id is 0", func(t *testing.T) { + originalID := oldAttempt.ID + oldAttempt.ID = 0 + expErr := persistentStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + actErr := inMemoryStore.SaveReplacementInProgressAttempt(ctx, oldAttempt, &newAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.ID = originalID + }) + }) +} + func TestInMemoryStore_DeleteInProgressAttempt(t *testing.T) { t.Parallel()