From 82521c3d3710fb5f7421f40ce2ec4b30b994494d Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 13 Mar 2024 18:09:43 -0400 Subject: [PATCH] implement tests for DeleteInProgressAttempt --- common/txmgr/address_state.go | 25 +++++ common/txmgr/inmemory_store.go | 34 +++--- .../evm/txmgr/evm_inmemory_store_test.go | 103 ++++++++++++++++++ 3 files changed, 145 insertions(+), 17 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 791b16b2d42..e8a2f7c100d 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -244,6 +244,31 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) delete as._deleteTxs(txs...) } +// deleteTxAttempts removes the attempts with the given IDs from the address state. +// It removes the attempts from the hash lookup map and from the transaction. +// If an attempt is not found in the hash lookup map, it is ignored. +// If a transaction is not found in the allTxs map, it is ignored. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxAttempts(txAttempts ...txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + as.Lock() + defer as.Unlock() + + for _, txAttempt := range txAttempts { + // remove the attempt from the hash lookup map + delete(as.attemptHashToTxAttempt, txAttempt.Hash) + // remove the attempt from the transaction + if tx := as.allTxs[txAttempt.TxID]; tx != nil { + var removeIndex int + for i := 0; i < len(tx.TxAttempts); i++ { + if tx.TxAttempts[i].ID == txAttempt.ID { + removeIndex = i + break + } + } + tx.TxAttempts = append(tx.TxAttempts[:removeIndex], tx.TxAttempts[removeIndex+1:]...) + } + } +} + // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { return nil, nil diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 33d7185cf0f..97c0c753444 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -267,7 +267,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Count func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteInProgressAttempt(ctx context.Context, attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { if attempt.State != txmgrtypes.TxAttemptInProgress { - return fmt.Errorf("delete_in_progress_attempt: expected attempt to be in_progress") + return fmt.Errorf("DeleteInProgressAttempt: expected attempt state to be in_progress") } if attempt.ID == 0 { return fmt.Errorf("delete_in_progress_attempt: expected attempt to have an ID") @@ -276,9 +276,21 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Delet // Check if fromaddress enabled ms.addressStatesLock.RLock() defer ms.addressStatesLock.RUnlock() - as, ok := ms.addressStates[attempt.Tx.FromAddress] - if !ok { - return fmt.Errorf("delete_in_progress_attempt: %w", ErrAddressNotFound) + filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return true + } + var as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + var tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, vas := range ms.addressStates { + txs := vas.findTxs(nil, filter, attempt.TxID) + if len(txs) == 1 { + tx = &txs[0] + as = vas + break + } + } + if tx == nil { + return fmt.Errorf("delete_in_progress_attempt: %w", ErrTxnNotFound) } // Persist to persistent storage @@ -287,19 +299,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Delet } // Update in memory store - filter := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { - if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { - return false - } - - for _, a := range tx.TxAttempts { - if a.ID == attempt.ID { - return true - } - } - return false - } - as.deleteTxs(as.findTxs(nil, filter)...) + as.deleteTxAttempts(attempt) return nil } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..6513e7d3779 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,14 +1,117 @@ package txmgr_test import ( + "context" + "math/big" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_DeleteInProgressAttempt(t *testing.T) { + t.Parallel() + + t.Run("successfully replace tx attempt", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 1, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + err = inMemoryStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt) + require.NoError(t, err) + + expTx, err := persistentStore.FindTxWithAttempts(inTx.ID) + require.NoError(t, err) + assert.Equal(t, 0, len(expTx.TxAttempts)) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + actTxs := inMemoryStore.XXXTestFindTxs(nil, fn, inTx.ID) + require.Equal(t, 1, len(actTxs)) + actTx := actTxs[0] + assertTxEqual(t, expTx, actTx) + }) + + t.Run("error parity for in-memory vs persistent store", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 124, fromAddress) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + oldAttempt := inTx.TxAttempts[0] + t.Run("error when attempt is not in progress", func(t *testing.T) { + oldAttempt.State = txmgrtypes.TxAttemptBroadcast + expErr := persistentStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt) + actErr := inMemoryStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.State = txmgrtypes.TxAttemptInProgress + }) + + t.Run("error when attempt has 0 id", func(t *testing.T) { + originalID := oldAttempt.ID + oldAttempt.ID = 0 + expErr := persistentStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt) + actErr := inMemoryStore.DeleteInProgressAttempt(testutils.Context(t), oldAttempt) + assert.Equal(t, expErr, actErr) + oldAttempt.ID = originalID + }) + }) +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID)