diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 995e344271b..0b82b7030c8 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -198,6 +198,15 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx return nil } +// hasTx returns true if the transaction with the given ID is in the address state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) hasTx(txID int64) bool { + as.RLock() + defer as.RUnlock() + + _, ok := as.allTxs[txID] + return ok +} + // findTxs returns all transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. @@ -293,6 +302,32 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete as._deleteTxs(txIDs...) } +// deleteTxAttempt removes the attempt with a given ID from the transaction with the given ID. +// It removes the attempts from the hash lookup map and from the transaction. +// If an attempt is not found in the hash lookup map, it is ignored. +// If a transaction is not found in the allTxs map, it is ignored. +// No error is returned if the transaction or attempt is not found. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxAttempt(txID, txAttemptID int64) { + as.Lock() + defer as.Unlock() + + tx, ok := as.allTxs[txID] + if !ok || tx == nil { + return + } + + for i := 0; i < len(tx.TxAttempts); i++ { + txAttempt := tx.TxAttempts[i] + if txAttempt.ID == txAttemptID { + // remove the attempt from the hash lookup map + delete(as.attemptHashToTxAttempt, txAttempt.Hash) + // remove the attempt from the transaction + tx.TxAttempts = append(tx.TxAttempts[:i], tx.TxAttempts[i+1:]...) + break + } + } +} + // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 87c9747f7fe..773c91bbea1 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -300,6 +300,35 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count } func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("DeleteInProgressAttempt: expected attempt state to be in_progress") + } + if attempt.ID == 0 { + return fmt.Errorf("DeleteInProgressAttempt: expected attempt to have an id") + } + + // Check if fromaddress enabled + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + for _, vas := range ms.addressStates { + if vas.hasTx(attempt.TxID) { + as = vas + break + } + } + if as == nil { + return fmt.Errorf("delete_in_progress_attempt: %w: %q", ErrTxnNotFound, attempt.TxID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.DeleteInProgressAttempt(ctx, attempt); err != nil { + return fmt.Errorf("delete_in_progress_attempt: %w", err) + } + + // Update in memory store + as.deleteTxAttempt(attempt.TxID, attempt.ID) + return nil } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 734ac5588ac..249e1317c3d 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -24,6 +24,96 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_DeleteInProgressAttempt(t *testing.T) { + t.Parallel() + + t.Run("successfully replace tx attempt", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 1, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + err = inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + assert.Equal(t, 0, len(expTx.TxAttempts)) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assertTxEqual(t, expTx, actTx) + }) + + t.Run("error parity for in-memory vs persistent store", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + t.Run("error when attempt is not in progress", func(t *testing.T) { + oldAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.DeleteInProgressAttempt(ctx, oldAttempt) + actErr := inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when attempt has 0 id", func(t *testing.T) { + originalID := oldAttempt.ID + oldAttempt.ID = 0 + expErr := persistentStore.DeleteInProgressAttempt(ctx, oldAttempt) + actErr := inMemoryStore.DeleteInProgressAttempt(ctx, oldAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.ID = originalID + }) + }) +} + func TestInMemoryStore_ReapTxHistory(t *testing.T) { t.Parallel()