From ef987f42fb73fd44dddb057d02720d9c30b84ec0 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 27 Feb 2024 15:05:29 -0500 Subject: [PATCH 1/5] implement ReapTxHistory --- common/txmgr/inmemory_store.go | 60 ++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..901b6eb09ce 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -259,6 +259,66 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prune } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("reap_tx_history: %w", ErrInvalidChainID) + } + + // Persist to persistent storage + if err := ms.txStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID); err != nil { + return err + } + + // Update in memory store + states := []txmgrtypes.TxState{TxConfirmed} + filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { + return false + } + for _, attempt := range tx.TxAttempts { + if attempt.Receipts == nil || len(attempt.Receipts) == 0 { + continue + } + if attempt.Receipts[0].GetBlockNumber() == nil { + continue + } + if attempt.Receipts[0].GetBlockNumber().Int64() >= minBlockNumberToKeep { + continue + } + if tx.CreatedAt.After(timeThreshold) { + continue + } + return tx.State == TxConfirmed + } + return false + } + + wg := sync.WaitGroup{} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + as.DeleteTxs(as.FindTxs(states, filterFn)...) + wg.Done() + }(as) + } + wg.Wait() + + filterFn = func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + return tx.State == TxFatalError && tx.CreatedAt.Before(timeThreshold) + } + states = []txmgrtypes.TxState{TxFatalError} + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + wg.Add(1) + go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + as.DeleteTxs(as.FindTxs(states, filterFn)...) + wg.Done() + }(as) + } + wg.Wait() + return nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CountTransactionsByState(_ context.Context, state txmgrtypes.TxState, chainID CHAIN_ID) (uint32, error) { From d22d62ca92ab9cf1e7f105d4f0a7a6892277f96c Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 21 Mar 2024 23:13:48 -0400 Subject: [PATCH 2/5] implement tests for ReapTxHistory --- common/txmgr/address_state.go | 85 ++++++++--- common/txmgr/inmemory_store.go | 48 ++---- .../evm/txmgr/evm_inmemory_store_test.go | 144 ++++++++++++++++++ 3 files changed, 225 insertions(+), 52 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 24ccf795eb0..3a960db8f4d 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -247,12 +247,51 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) { } +// reapConfirmedTxs removes confirmed transactions that are older than the given time threshold. +// It also removes confirmed transactions that are older than the given block number threshold. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapConfirmedTxs(minBlockNumberToKeep int64, timeThreshold time.Time) { + as.Lock() + defer as.Unlock() + + for _, tx := range as.confirmedTxs { + if len(tx.TxAttempts) == 0 { + continue + } + if tx.CreatedAt.After(timeThreshold) { + continue + } + + for i := 0; i < len(tx.TxAttempts); i++ { + if len(tx.TxAttempts[i].Receipts) == 0 { + continue + } + if tx.TxAttempts[i].Receipts[0].GetBlockNumber() == nil || tx.TxAttempts[i].Receipts[0].GetBlockNumber().Int64() >= minBlockNumberToKeep { + continue + } + as._deleteTx(tx.ID) + } + } +} + +// reapFatalErroredTxs removes fatal errored transactions that are older than the given time threshold. +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapFatalErroredTxs(timeThreshold time.Time) { + as.Lock() + defer as.Unlock() + + for _, tx := range as.fatalErroredTxs { + if tx.CreatedAt.After(timeThreshold) { + continue + } + as._deleteTx(tx.ID) + } +} + // deleteTxs removes the transactions with the given IDs from the address state. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txIDs ...int64) { as.Lock() defer as.Unlock() - as._deleteTxs(txs...) + as._deleteTxs(txIDs...) } // peekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. @@ -458,22 +497,34 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findT return txs } -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { - for _, tx := range txs { - if tx.IdempotencyKey != nil { - delete(as.idempotencyKeyToTx, *tx.IdempotencyKey) - } - txID := tx.ID - if as.inprogressTx != nil && as.inprogressTx.ID == txID { - as.inprogressTx = nil - } - delete(as.allTxs, txID) - delete(as.unconfirmedTxs, txID) - delete(as.confirmedMissingReceiptTxs, txID) - delete(as.confirmedTxs, txID) - delete(as.fatalErroredTxs, txID) - as.unstartedTxs.RemoveTxByID(txID) +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTxs(txIDs ...int64) { + for _, txID := range txIDs { + as._deleteTx(txID) + } +} + +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _deleteTx(txID int64) { + tx, ok := as.allTxs[txID] + if !ok { + return + } + + for i := 0; i < len(tx.TxAttempts); i++ { + txAttemptHash := tx.TxAttempts[i].Hash + delete(as.attemptHashToTxAttempt, txAttemptHash) + } + if tx.IdempotencyKey != nil { + delete(as.idempotencyKeyToTx, *tx.IdempotencyKey) + } + if as.inprogressTx != nil && as.inprogressTx.ID == txID { + as.inprogressTx = nil } + as.unstartedTxs.RemoveTxByID(txID) + delete(as.unconfirmedTxs, txID) + delete(as.confirmedMissingReceiptTxs, txID) + delete(as.confirmedTxs, txID) + delete(as.fatalErroredTxs, txID) + delete(as.allTxs, txID) } func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _moveTxToFatalError( diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 15cbaa23755..5d42071de23 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -81,11 +81,21 @@ func NewInMemoryStore[ ms.maxUnstarted = 10000 } + addressesToTxs := map[ADDR][]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + // populate all enabled addresses + enabledAddresses, err := keyStore.EnabledAddressesForChain(ctx, chainID) + if err != nil { + return nil, fmt.Errorf("new_in_memory_store: %w", err) + } + for _, addr := range enabledAddresses { + addressesToTxs[addr] = []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + } + txs, err := persistentTxStore.GetAllTransactions(ctx, chainID) if err != nil { return nil, fmt.Errorf("address_state: initialization: %w", err) } - addressesToTxs := map[ADDR][]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} + for _, tx := range txs { at, exists := addressesToTxs[tx.FromAddress] if !exists { @@ -276,43 +286,11 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapT } // Update in memory store - states := []txmgrtypes.TxState{TxConfirmed} - filterFn := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { - if tx.TxAttempts == nil || len(tx.TxAttempts) == 0 { - return false - } - for _, attempt := range tx.TxAttempts { - if attempt.Receipts == nil || len(attempt.Receipts) == 0 { - continue - } - if attempt.Receipts[0].GetBlockNumber() == nil { - continue - } - if attempt.Receipts[0].GetBlockNumber().Int64() >= minBlockNumberToKeep { - continue - } - if tx.CreatedAt.After(timeThreshold) { - continue - } - return tx.State == TxConfirmed - } - return false - } - - ms.addressStatesLock.RLock() - defer ms.addressStatesLock.RUnlock() - for _, as := range ms.addressStates { - as.deleteTxs(as.findTxs(states, filterFn)...) - } - - filterFn = func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { - return tx.State == TxFatalError && tx.CreatedAt.Before(timeThreshold) - } - states = []txmgrtypes.TxState{TxFatalError} ms.addressStatesLock.RLock() defer ms.addressStatesLock.RUnlock() for _, as := range ms.addressStates { - as.deleteTxs(as.findTxs(states, filterFn)...) + as.reapConfirmedTxs(minBlockNumberToKeep, timeThreshold) + as.reapFatalErroredTxs(timeThreshold) } return nil diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index b0860b5b0ed..43b43b4a41a 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -3,10 +3,12 @@ package txmgr_test import ( "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/logger" commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" @@ -15,12 +17,154 @@ import ( evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_ReapTxHistory(t *testing.T) { + t.Parallel() + + t.Run("reap all confirmed txs", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx_0 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 7, 1, fromAddress) + r_0 := mustInsertEthReceipt(t, persistentStore, 1, utils.NewHash(), inTx_0.TxAttempts[0].Hash) + inTx_0.TxAttempts[0].Receipts = append(inTx_0.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_0)) + inTx_1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 8, 2, fromAddress) + r_1 := mustInsertEthReceipt(t, persistentStore, 2, utils.NewHash(), inTx_1.TxAttempts[0].Hash) + inTx_1.TxAttempts[0].Receipts = append(inTx_1.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_1)) + inTx_2 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 9, 3, fromAddress) + r_2 := mustInsertEthReceipt(t, persistentStore, 3, utils.NewHash(), inTx_2.TxAttempts[0].Hash) + inTx_2.TxAttempts[0].Receipts = append(inTx_2.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_2)) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2)) + + minBlockNumberToKeep := int64(3) + timeThreshold := inTx_2.CreatedAt + expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + require.NoError(t, expErr) + require.NoError(t, actErr) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + // Check that the transactions were reaped in persistent store + expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_0.ID) + expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_1.ID) + // Check that the transactions were reaped in in-memory store + actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID) + require.Equal(t, 0, len(actTxs_0)) + actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID) + require.Equal(t, 0, len(actTxs_1)) + + // Check that the transaction was not reaped + expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID) + require.NoError(t, err) + require.Equal(t, inTx_2.ID, expTx_2.ID) + actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID) + require.Equal(t, 1, len(actTxs_2)) + assertTxEqual(t, expTx_2, actTxs_2[0]) + }) + t.Run("reap all fatal error txs", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store + inTx_0 := cltest.NewEthTx(fromAddress) + inTx_0.Error = null.StringFrom("something exploded") + inTx_0.State = commontxmgr.TxFatalError + inTx_0.CreatedAt = time.Unix(1000, 0) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx_0)) + inTx_1 := cltest.NewEthTx(fromAddress) + inTx_1.Error = null.StringFrom("something exploded") + inTx_1.State = commontxmgr.TxFatalError + inTx_1.CreatedAt = time.Unix(2000, 0) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx_1)) + inTx_2 := cltest.NewEthTx(fromAddress) + inTx_2.Error = null.StringFrom("something exploded") + inTx_2.State = commontxmgr.TxFatalError + inTx_2.CreatedAt = time.Unix(3000, 0) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx_2)) + // Insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_0)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_1)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx_2)) + + minBlockNumberToKeep := int64(3) + timeThreshold := time.Unix(2500, 0) // Only reap txs created before this time + expErr := persistentStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + actErr := inMemoryStore.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, chainID) + require.NoError(t, expErr) + require.NoError(t, actErr) + + fn := func(tx *evmtxmgr.Tx) bool { return true } + // Check that the transactions were reaped in persistent store + expTx_0, err := persistentStore.FindTxWithAttempts(ctx, inTx_0.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_0.ID) + expTx_1, err := persistentStore.FindTxWithAttempts(ctx, inTx_1.ID) + require.Error(t, err) + require.Equal(t, int64(0), expTx_1.ID) + // Check that the transactions were reaped in in-memory store + actTxs_0 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_0.ID) + require.Equal(t, 0, len(actTxs_0)) + actTxs_1 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_1.ID) + require.Equal(t, 0, len(actTxs_1)) + + // Check that the transaction was not reaped + expTx_2, err := persistentStore.FindTxWithAttempts(ctx, inTx_2.ID) + require.NoError(t, err) + require.Equal(t, inTx_2.ID, expTx_2.ID) + actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID) + require.Equal(t, 1, len(actTxs_2)) + assertTxEqual(t, expTx_2, actTxs_2[0]) + }) +} + func TestInMemoryStore_UpdateTxForRebroadcast(t *testing.T) { t.Parallel() From 6a0b5ad1f64331ec1709af02e9cc84675cefb474 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 21 Mar 2024 23:14:16 -0400 Subject: [PATCH 3/5] panic if wrong chain ID --- common/txmgr/inmemory_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 5d42071de23..b592fb48d04 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -277,7 +277,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Prune func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { if ms.chainID.String() != chainID.String() { - return fmt.Errorf("reap_tx_history: %w", ErrInvalidChainID) + panic("invalid chain ID") } // Persist to persistent storage From 26d432a77da1ecd6a9fd4803c3d6b99cd32c6614 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 22 Mar 2024 14:14:57 -0400 Subject: [PATCH 4/5] fix merge conflict --- .../evm/txmgr/evm_inmemory_store_test.go | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 42a777f82c4..734ac5588ac 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -24,19 +24,10 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) -<<<<<<< HEAD func TestInMemoryStore_ReapTxHistory(t *testing.T) { t.Parallel() t.Run("reap all confirmed txs", func(t *testing.T) { -======= -func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { - t.Parallel() - blockNum := int64(10) - finalityDepth := uint32(2) - - t.Run("successfully mark errored transaction", func(t *testing.T) { ->>>>>>> jtw/step-3-04 db := pgtest.NewSqlxDB(t) _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) persistentStore := cltest.NewTestTxStore(t, db) @@ -58,7 +49,6 @@ func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { require.NoError(t, err) // Insert a transaction into persistent store -<<<<<<< HEAD inTx_0 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, persistentStore, 7, 1, fromAddress) r_0 := mustInsertEthReceipt(t, persistentStore, 1, utils.NewHash(), inTx_0.TxAttempts[0].Hash) inTx_0.TxAttempts[0].Receipts = append(inTx_0.TxAttempts[0].Receipts, evmtxmgr.DbReceiptToEvmReceipt(&r_0)) @@ -172,7 +162,36 @@ func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { actTxs_2 := inMemoryStore.XXXTestFindTxs(nil, fn, inTx_2.ID) require.Equal(t, 1, len(actTxs_2)) assertTxEqual(t, expTx_2, actTxs_2[0]) -======= + }) +} + +func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { + t.Parallel() + blockNum := int64(10) + finalityDepth := uint32(2) + + t.Run("successfully mark errored transaction", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := testutils.Context(t) + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // Insert a transaction into persistent store inTx := mustInsertConfirmedMissingReceiptEthTxWithLegacyAttempt(t, persistentStore, 1, 7, time.Now(), fromAddress) // Insert the transaction into the in-memory store require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) @@ -191,7 +210,6 @@ func TestInMemoryStore_MarkOldTxesMissingReceiptAsErrored(t *testing.T) { assertTxEqual(t, expTx, actTx) assert.Equal(t, txmgrtypes.TxAttemptBroadcast, actTx.TxAttempts[0].State) assert.Equal(t, commontxmgr.TxFatalError, actTx.State) ->>>>>>> jtw/step-3-04 }) } From 6deab0d829bae0e2b1d26e5fa6b21dd0dad6ac27 Mon Sep 17 00:00:00 2001 From: Jim W Date: Mon, 1 Apr 2024 16:25:40 -0400 Subject: [PATCH 5/5] Update common/txmgr/address_state.go Co-authored-by: amit-momin <108959691+amit-momin@users.noreply.github.com> --- common/txmgr/address_state.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index a0809de7cb6..995e344271b 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -247,8 +247,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneUnstartedTxQueue(ids []int64) { } -// reapConfirmedTxs removes confirmed transactions that are older than the given time threshold. -// It also removes confirmed transactions that are older than the given block number threshold. +// reapConfirmedTxs removes confirmed transactions that are older than the given time threshold and have receipts older than the given block number threshold. func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reapConfirmedTxs(minBlockNumberToKeep int64, timeThreshold time.Time) { as.Lock() defer as.Unlock()