diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 6739610f81a..ca644f844bc 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -163,6 +163,15 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx return nil } +// hasTx returns true if the transaction with the given ID is in the address state. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) hasTx(txID int64) bool { + as.RLock() + defer as.RUnlock() + + _, ok := as.allTxs[txID] + return ok +} + // findTxs returns all transactions that match the given filters. // If txIDs are provided, only the transactions with those IDs are considered. // If no txIDs are provided, all transactions are considered. @@ -239,7 +248,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxT // It returns an error if there is already a transaction in progress. // It returns an error if there is no unstarted transaction to move to in_progress. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress( - txID int64, seq SEQ, broadcastAt time.Time, initialBroadcastAt time.Time, + txID int64, seq *SEQ, broadcastAt *time.Time, initialBroadcastAt *time.Time, txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { as.Lock() @@ -255,9 +264,9 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUn } tx.State = TxInProgress tx.TxAttempts = []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{txAttempt} - tx.Sequence = &seq - tx.BroadcastAt = &broadcastAt - tx.InitialBroadcastAt = &initialBroadcastAt + tx.Sequence = seq + tx.BroadcastAt = broadcastAt + tx.InitialBroadcastAt = initialBroadcastAt as.attemptHashToTxAttempt[txAttempt.Hash] = &txAttempt as.inprogressTx = tx diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 7a1cc121b08..b54df545002 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -150,6 +150,36 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], ) error { + if tx.Sequence == nil { + return fmt.Errorf("in_progress transaction must have nonce") + } + if tx.State != TxUnstarted { + return fmt.Errorf("update_tx_unstarted_to_in_progress: can only transition to in_progress from unstarted, transaction is currently %s", tx.State) + } + if attempt.State != txmgrtypes.TxAttemptInProgress { + return fmt.Errorf("attempt state must be in_progress") + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[tx.FromAddress] + if !ok { + return nil + } + if !as.hasTx(tx.ID) { + return fmt.Errorf("update_tx_unstarted_to_in_progress: %w: %q", ErrTxnNotFound, tx.ID) + } + + // Persist to persistent storage + if err := ms.persistentTxStore.UpdateTxUnstartedToInProgress(ctx, tx, attempt); err != nil { + return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err) + } + + // Update in address state in memory + if err := as.moveUnstartedToInProgress(tx.ID, tx.Sequence, tx.BroadcastAt, tx.InitialBroadcastAt, *attempt); err != nil { + return fmt.Errorf("update_tx_unstarted_to_in_progress: %w", err) + } + return nil } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index eac8e176f22..0e12bff9108 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -24,6 +24,133 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_UpdateTxUnstartedToInProgress(t *testing.T) { + t.Parallel() + + t.Run("successfully updates unstarted tx to inprogress", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + nonce := evmtypes.Nonce(123) + // Insert a transaction into persistent store + inTx := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID) + inTx.Sequence = &nonce + inTxAttempt := cltest.NewLegacyEthTxAttempt(t, inTx.ID) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + // Update the transaction to in-progress + require.NoError(t, inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx, &inTxAttempt)) + + expTx, err := persistentStore.FindTxWithAttempts(ctx, inTx.ID) + require.NoError(t, err) + assert.Equal(t, commontxmgr.TxInProgress, expTx.State) + assert.Equal(t, 1, len(expTx.TxAttempts)) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assertTxEqual(t, expTx, actTx) + assert.Equal(t, commontxmgr.TxInProgress, actTx.State) + }) + + t.Run("wrong input error scenarios", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + nonce1 := evmtypes.Nonce(1) + nonce2 := evmtypes.Nonce(2) + // Insert a transaction into persistent store + inTx1 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID) + inTx2 := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID) + inTx1.Sequence = &nonce1 + inTx2.Sequence = &nonce2 + inTxAttempt1 := cltest.NewLegacyEthTxAttempt(t, inTx1.ID) + inTxAttempt2 := cltest.NewLegacyEthTxAttempt(t, inTx2.ID) + // Insert the transaction into the in-memory store + //inTx2 := cltest.NewEthTx(fromAddress) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx1)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx2)) + + // sequence nil + inTx1.Sequence = nil + inTx2.Sequence = nil + expErr := persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1) + actErr := inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2) + assert.Equal(t, expErr, actErr) + assert.Error(t, actErr) + assert.Error(t, expErr) + inTx1.Sequence = &nonce1 // reset + inTx2.Sequence = &nonce2 // reset + + // tx not in unstarted state + inTx1.State = commontxmgr.TxInProgress + inTx2.State = commontxmgr.TxInProgress + expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1) + actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2) + assert.Error(t, actErr) + assert.Error(t, expErr) + inTx1.State = commontxmgr.TxUnstarted // reset + inTx2.State = commontxmgr.TxUnstarted // reset + + // tx attempt not in in-progress state + inTxAttempt1.State = txmgrtypes.TxAttemptBroadcast + inTxAttempt2.State = txmgrtypes.TxAttemptBroadcast + expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1) + actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2) + assert.Equal(t, expErr, actErr) + assert.Error(t, actErr) + assert.Error(t, expErr) + inTxAttempt1.State = txmgrtypes.TxAttemptInProgress // reset + inTxAttempt2.State = txmgrtypes.TxAttemptInProgress // reset + + // wrong from address + inTx1.FromAddress = cltest.NewEIP55Address().Address() + inTx2.FromAddress = cltest.NewEIP55Address().Address() + expErr = persistentStore.UpdateTxUnstartedToInProgress(ctx, &inTx1, &inTxAttempt1) + actErr = inMemoryStore.UpdateTxUnstartedToInProgress(ctx, &inTx2, &inTxAttempt2) + assert.NoError(t, actErr) + assert.NoError(t, expErr) + inTx1.FromAddress = fromAddress // reset + inTx2.FromAddress = fromAddress // reset + }) +} + func TestInMemoryStore_SaveFetchedReceipts(t *testing.T) { t.Parallel()