From dced7b74dd82f9bf0ffd43506c392ebb244c2d16 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 13:51:43 -0500 Subject: [PATCH 01/24] implement PruneUnstartedTxQueue --- common/txmgr/address_state.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 2bbb777c634..a81e9094c0d 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -166,6 +166,11 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchT // PruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) { + as.Lock() + defer as.Unlock() + + txs := as.unstartedTxs.PruneByTxIDs(ids) + as.deleteTxs(txs...) } // DeleteTxs removes the transactions with the given IDs from the address state. @@ -252,3 +257,21 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { return nil } + +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + for _, tx := range txs { + if tx.IdempotencyKey != nil { + delete(as.idempotencyKeyToTx, *tx.IdempotencyKey) + } + txID := tx.ID + if as.inprogressTx != nil && as.inprogressTx.ID == txID { + as.inprogressTx = nil + } + delete(as.allTxs, txID) + delete(as.unconfirmedTxs, txID) + delete(as.confirmedMissingReceiptTxs, txID) + delete(as.confirmedTxs, txID) + delete(as.fatalErroredTxs, txID) + as.unstartedTxs.RemoveTxByID(txID) + } +} From d537a89e9196c9feda17a0f7513a6cc61ed08972 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 13:52:23 -0500 Subject: [PATCH 02/24] implement DeleteTxs --- common/txmgr/address_state.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index a81e9094c0d..9f638d6c45b 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -175,6 +175,10 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneU // DeleteTxs removes the transactions with the given IDs from the address state. func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { + as.Lock() + defer as.Unlock() + + as.deleteTxs(txs...) } // PeekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. From 74d6b1e7938a3406754a2950f6a1d285e67a73df Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 13:56:36 -0500 Subject: [PATCH 03/24] implement PeekNextUnstartedTx --- common/txmgr/address_state.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 9f638d6c45b..ef1ec478293 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -182,8 +182,12 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Delete } // PeekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - return nil, nil +// If there are no unstarted transactions, nil is returned. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + as.RLock() + defer as.RUnlock() + + return as.unstartedTxs.PeekNextTx() } // PeekInProgressTx returns the in-progress transaction without removing it from the in-progress state. From d187dad0fce53fb00f35b237ac60dab3fdb6fd11 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 13:57:58 -0500 Subject: [PATCH 04/24] implement PeekInProgressTx --- common/txmgr/address_state.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index ef1ec478293..a6c2ef8df71 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -191,8 +191,12 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNe } // PeekInProgressTx returns the in-progress transaction without removing it from the in-progress state. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - return nil, nil +// If there is no in-progress transaction, nil is returned. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + as.RLock() + defer as.RUnlock() + + return as.inprogressTx } // AddTxToUnstarted adds the given transaction to the unstarted queue. From fd32104ff3dd7ccfa8c9025aa7a073c75050f69c Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 23 Feb 2024 15:29:32 -0500 Subject: [PATCH 05/24] implement AddTxToUnstartedQueue --- common/txmgr/address_state.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index a6c2ef8df71..b2821b013c3 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -1,6 +1,7 @@ package txmgr import ( + "fmt" "sync" "time" @@ -199,8 +200,27 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekIn return as.inprogressTx } -// AddTxToUnstarted adds the given transaction to the unstarted queue. -func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +// AddTxToUnstartedQueue adds the given transaction to the unstarted queue. +// If the queue is full, an error is returned. +// If the transaction was successfully added, nil is returned. +// If the transaction's idempotency key already exists, an error is returned. +func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstartedQueue(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { + as.Lock() + defer as.Unlock() + + if tx.IdempotencyKey != nil && as.idempotencyKeyToTx[*tx.IdempotencyKey] != nil { + return fmt.Errorf("add_tx_to_unstarted_queue: address %s idempotency key %s already exists", as.fromAddress, *tx.IdempotencyKey) + } + if as.unstartedTxs.Len() >= as.unstartedTxs.Cap() { + return fmt.Errorf("add_tx_to_unstarted_queue: address %s unstarted queue capacity has been reached", as.fromAddress) + } + + as.unstartedTxs.AddTx(tx) + as.allTxs[tx.ID] = tx + if tx.IdempotencyKey != nil { + as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx + } + return nil } From eb4d436f042e4bdcff413f5a62c6bdcb28293c27 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 26 Feb 2024 15:48:07 -0500 Subject: [PATCH 06/24] add CreateTransaction initial logic --- common/txmgr/inmemory_store.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index c57c295cb82..34d316adefd 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -102,7 +102,30 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error, ) { - return txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}, nil + if ms.chainID.String() != chainID.String() { + return tx, fmt.Errorf("create_transaction: %w", ErrInvalidChainID) + } + + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[txRequest.FromAddress] + if !ok { + return tx, fmt.Errorf("create_transaction: %w", ErrAddressNotFound) + } + + // Persist Transaction to persistent storage + tx, err = ms.txStore.CreateTransaction(ctx, txRequest, chainID) + if err != nil { + return tx, fmt.Errorf("create_transaction: %w", err) + } + + // Update in memory store + // Add the request to the Unstarted channel to be processed by the Broadcaster + if err := as.AddTxToUnstarted(&tx); err != nil { + return *ms.deepCopyTx(tx), fmt.Errorf("create_transaction: %w", err) + } + + return *ms.deepCopyTx(tx), nil } // FindTxWithIdempotencyKey returns a transaction with the given idempotency key From 154d51473f706262580962fc1fa9b0be1f4e0d8d Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 26 Feb 2024 16:34:09 -0500 Subject: [PATCH 07/24] cleanup --- common/txmgr/inmemory_store.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 2dd69894911..2c4cf5bb19c 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -103,6 +103,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error, ) { + tx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} if ms.chainID.String() != chainID.String() { return tx, fmt.Errorf("create_transaction: %w", ErrInvalidChainID) } @@ -115,14 +116,14 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat } // Persist Transaction to persistent storage - tx, err = ms.txStore.CreateTransaction(ctx, txRequest, chainID) + tx, err := ms.txStore.CreateTransaction(ctx, txRequest, chainID) if err != nil { return tx, fmt.Errorf("create_transaction: %w", err) } // Update in memory store // Add the request to the Unstarted channel to be processed by the Broadcaster - if err := as.AddTxToUnstarted(&tx); err != nil { + if err := as.AddTxToUnstartedQueue(&tx); err != nil { return *ms.deepCopyTx(tx), fmt.Errorf("create_transaction: %w", err) } From 362dbe1b1434951c42ead5fd52ce65ae5105c505 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 26 Feb 2024 16:53:44 -0500 Subject: [PATCH 08/24] add tests for create transaction --- .../evm/txmgr/evm_inmemory_store_test.go | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 core/chains/evm/txmgr/evm_inmemory_store_test.go diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go new file mode 100644 index 00000000000..9d51447f979 --- /dev/null +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -0,0 +1,89 @@ +package txmgr_test + +import ( + "context" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + evmassets "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" + evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" +) + +func TestInMemoryStore_CreateTransaction(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, nil) + persistentStore := cltest.NewTestTxStore(t, db, cfg.Database()) + kst := cltest.NewKeyStore(t, db, cfg.Database()) + + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + toAddress := testutils.NewAddress() + gasLimit := uint32(1000) + payload := []byte{1, 2, 3} + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore) + require.NoError(t, err) + + t.Run("with queue under capacity inserts eth_tx", func(t *testing.T) { + subject := uuid.New() + strategy := newMockTxStrategy(t) + strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) + actTx, err := inMemoryStore.CreateTransaction(testutils.Context(t), evmtxmgr.TxRequest{ + FromAddress: fromAddress, + ToAddress: toAddress, + EncodedPayload: payload, + FeeLimit: gasLimit, + Meta: nil, + Strategy: strategy, + }, chainID) + require.NoError(t, err) + + assert.Greater(t, actTx.ID, int64(0)) + assert.Equal(t, commontxmgr.TxUnstarted, actTx.State) + assert.Equal(t, gasLimit, actTx.FeeLimit) + assert.Equal(t, fromAddress, actTx.FromAddress) + assert.Equal(t, toAddress, actTx.ToAddress) + assert.Equal(t, payload, actTx.EncodedPayload) + assert.Equal(t, big.Int(evmassets.NewEthValue(0)), actTx.Value) + assert.Equal(t, subject, actTx.Subject.UUID) + + cltest.AssertCount(t, db, "evm.txes", 1) + + var dbEthTx evmtxmgr.DbEthTx + require.NoError(t, db.Get(&dbEthTx, `SELECT * FROM evm.txes ORDER BY id ASC LIMIT 1`)) + + assert.Equal(t, commontxmgr.TxUnstarted, dbEthTx.State) + assert.Equal(t, gasLimit, dbEthTx.GasLimit) + assert.Equal(t, fromAddress, dbEthTx.FromAddress) + assert.Equal(t, toAddress, dbEthTx.ToAddress) + assert.Equal(t, payload, dbEthTx.EncodedPayload) + assert.Equal(t, evmassets.NewEthValue(0), dbEthTx.Value) + assert.Equal(t, subject, dbEthTx.Subject.UUID) + }) +} From e8a2b419c2d55b054c0ac804e8ffd9a7787825c7 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 27 Feb 2024 14:39:05 -0500 Subject: [PATCH 09/24] implement PruneUnstartedTxQueue --- common/txmgr/inmemory_store.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..9a4c11a5447 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -255,7 +255,20 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { - return nil, nil + // Persist to persistent storage + ids, err := ms.txStore.PruneUnstartedTxQueue(ctx, queueSize, subject) + if err != nil { + return ids, err + } + + // Update in memory store + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + for _, as := range ms.addressStates { + as.PruneUnstartedTxQueue(ids) + } + + return ids, nil } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID CHAIN_ID) error { From ddf02a3c1d0cbb1b31889b471867bd581dd01675 Mon Sep 17 00:00:00 2001 From: James Walker Date: Tue, 27 Feb 2024 15:56:20 -0500 Subject: [PATCH 10/24] add panic if incorrect ChainID --- common/txmgr/inmemory_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 2c4cf5bb19c..8ddf3a23f0d 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -105,7 +105,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat ) { tx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} if ms.chainID.String() != chainID.String() { - return tx, fmt.Errorf("create_transaction: %w", ErrInvalidChainID) + panic(fmt.Sprintf("create_transaction: invalid chain ID: %s", chainID)) } ms.addressStatesLock.RLock() From 13aa9b9f7723fd050b404fe47ec0c76d099d2176 Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 28 Feb 2024 22:39:30 -0500 Subject: [PATCH 11/24] implement methods which are read only --- common/txmgr/inmemory_store.go | 35 +++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index cd783a25210..c05df50ab4f 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -148,7 +148,24 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat // GetTxInProgress returns the in_progress transaction for a given address. func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTxInProgress(ctx context.Context, fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - return nil, nil + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return nil, fmt.Errorf("get_tx_in_progress: %w", ErrAddressNotFound) + } + + tx := as.PeekInProgressTx() + if tx == nil { + return nil, fmt.Errorf("get_tx_in_progress: %w", ErrTxnNotFound) + } + + if len(tx.TxAttempts) != 1 || tx.TxAttempts[0].State != txmgrtypes.TxAttemptInProgress { + return nil, fmt.Errorf("get_tx_in_progress: invariant violation: expected in_progress transaction %v to have exactly one unsent attempt. "+ + "Your database is in an inconsistent state and this node will not function correctly until the problem is resolved", tx.ID) + } + + return ms.deepCopyTx(*tx), nil } // UpdateTxAttemptInProgressToBroadcast updates a transaction attempt from in_progress to broadcast. @@ -164,6 +181,22 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat // FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address. func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { + if ms.chainID.String() != chainID.String() { + return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrInvalidChainID) + } + ms.addressStatesLock.RLock() + defer ms.addressStatesLock.RUnlock() + as, ok := ms.addressStates[fromAddress] + if !ok { + return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrAddressNotFound) + } + + etx := as.PeekNextUnstartedTx() + if etx == nil { + return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrTxnNotFound) + } + tx = ms.deepCopyTx(*etx) + return nil } From 25775387406b3cbae0e16526edb7cfd9d6e2eee7 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 11 Mar 2024 10:25:26 -0400 Subject: [PATCH 12/24] clean up test for CreateTransaction --- common/txmgr/inmemory_store.go | 10 ++++--- .../evm/txmgr/evm_inmemory_store_test.go | 29 +++++++------------ 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 8d6a73777ee..f41b97954ad 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/big" + "sync" "time" "github.com/google/uuid" @@ -46,7 +47,8 @@ type inMemoryStore[ keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + addressStatesLock sync.RWMutex + addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] } // NewInMemoryStore returns a new inMemoryStore @@ -107,18 +109,18 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat defer ms.addressStatesLock.RUnlock() as, ok := ms.addressStates[txRequest.FromAddress] if !ok { - return tx, fmt.Errorf("create_transaction: %w", ErrAddressNotFound) + return tx, fmt.Errorf("create_transaction: %w: %q", ErrAddressNotFound, txRequest.FromAddress) } // Persist Transaction to persistent storage - tx, err := ms.txStore.CreateTransaction(ctx, txRequest, chainID) + tx, err := ms.persistentTxStore.CreateTransaction(ctx, txRequest, chainID) if err != nil { return tx, fmt.Errorf("create_transaction: %w", err) } // Update in memory store // Add the request to the Unstarted channel to be processed by the Broadcaster - if err := as.AddTxToUnstartedQueue(&tx); err != nil { + if err := as.addTxToUnstartedQueue(&tx); err != nil { return *ms.deepCopyTx(tx), fmt.Errorf("create_transaction: %w", err) } diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 6ae06e99414..4f53a51bf09 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -18,7 +18,6 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) @@ -27,14 +26,10 @@ func TestInMemoryStore_CreateTransaction(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - persistentStore := cltest.NewTestTxStore(t, db, cfg.Database()) - kst := cltest.NewKeyStore(t, db, cfg.Database()) - + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) - toAddress := testutils.NewAddress() - gasLimit := uint32(1000) - payload := []byte{1, 2, 3} ethClient := evmtest.NewEthClientMockWithDefaultChain(t) lggr := logger.TestSugared(t) @@ -47,9 +42,13 @@ func TestInMemoryStore_CreateTransaction(t *testing.T) { *evmtypes.Receipt, evmtypes.Nonce, evmgas.EvmFee, - ](ctx, lggr, chainID, kst.Eth(), persistentStore) + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) require.NoError(t, err) + toAddress := testutils.NewAddress() + gasLimit := uint32(1000) + payload := []byte{1, 2, 3} + t.Run("with queue under capacity inserts eth_tx", func(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) @@ -64,15 +63,7 @@ func TestInMemoryStore_CreateTransaction(t *testing.T) { }, chainID) require.NoError(t, err) - assert.Greater(t, actTx.ID, int64(0)) - assert.Equal(t, commontxmgr.TxUnstarted, actTx.State) - assert.Equal(t, gasLimit, actTx.FeeLimit) - assert.Equal(t, fromAddress, actTx.FromAddress) - assert.Equal(t, toAddress, actTx.ToAddress) - assert.Equal(t, payload, actTx.EncodedPayload) - assert.Equal(t, big.Int(evmassets.NewEthValue(0)), actTx.Value) - assert.Equal(t, subject, actTx.Subject.UUID) - + // check that the transaction was inserted into the persistent store cltest.AssertCount(t, db, "evm.txes", 1) var dbEthTx evmtxmgr.DbEthTx @@ -88,6 +79,8 @@ func TestInMemoryStore_CreateTransaction(t *testing.T) { var expTx evmtxmgr.Tx dbEthTx.ToTx(&expTx) + + // check that the in-memory store has the same transaction data as the persistent store assertTxEqual(t, expTx, actTx) }) } From 4e642a318d380ce6e6a63e4899d734ec73e1c74b Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 11 Mar 2024 11:30:11 -0400 Subject: [PATCH 13/24] add tests for GetTxInProgress --- common/txmgr/inmemory_store.go | 4 +- .../evm/txmgr/evm_inmemory_store_test.go | 85 ++++++++++++++++++- 2 files changed, 86 insertions(+), 3 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index e5cfb2874d4..9bf29cc498e 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -149,12 +149,12 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetTx defer ms.addressStatesLock.RUnlock() as, ok := ms.addressStates[fromAddress] if !ok { - return nil, fmt.Errorf("get_tx_in_progress: %w", ErrAddressNotFound) + return nil, nil } tx := as.peekInProgressTx() if tx == nil { - return nil, fmt.Errorf("get_tx_in_progress: %w", ErrTxnNotFound) + return nil, nil } if len(tx.TxAttempts) != 1 || tx.TxAttempts[0].State != txmgrtypes.TxAttemptInProgress { diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..34840de4199 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,14 +1,98 @@ package txmgr_test import ( + "context" + "math/big" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_GetTxInProgress(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + _, otherAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // insert the transaction into the persistent store + inTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 123, fromAddress) + require.NotNil(t, inTx) + // insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + // insert non in-progress transaction for another address + otherTx := cltest.NewEthTx(otherAddress) + require.NoError(t, persistentStore.InsertTx(&otherTx)) + require.NoError(t, inMemoryStore.XXXTestInsertTx(otherAddress, &otherTx)) + + tcs := []struct { + name string + fromAddress common.Address + + hasErr bool + hasTx bool + }{ + {"finds the correct inprogress transaction", fromAddress, false, true}, + {"wrong fromAddress", common.Address{}, false, false}, + {"no inprogress transaction", otherAddress, false, false}, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctx := testutils.Context(t) + actTx, actErr := inMemoryStore.GetTxInProgress(ctx, tc.fromAddress) + expTx, expErr := persistentStore.GetTxInProgress(ctx, tc.fromAddress) + if tc.hasErr { + require.NotNil(t, actErr) + require.NotNil(t, expErr) + require.Equal(t, expErr, actErr) + } else { + require.Nil(t, actErr) + require.Nil(t, expErr) + } + if tc.hasTx { + require.NotNil(t, actTx) + require.NotNil(t, expTx) + assertTxEqual(t, *expTx, *actTx) + } else { + require.Nil(t, actTx) + require.Nil(t, expTx) + } + }) + } +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID) @@ -42,7 +126,6 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { func assertTxAttemptEqual(t *testing.T, exp, act evmtxmgr.TxAttempt) { assert.Equal(t, exp.ID, act.ID) assert.Equal(t, exp.TxID, act.TxID) - assert.Equal(t, exp.Tx, act.Tx) assert.Equal(t, exp.TxFee, act.TxFee) assert.Equal(t, exp.ChainSpecificFeeLimit, act.ChainSpecificFeeLimit) assert.Equal(t, exp.SignedRawTx, act.SignedRawTx) From 523354d313199c792843b004b24ca11440509834 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 11 Mar 2024 15:10:53 -0400 Subject: [PATCH 14/24] add tests for FindNextUnstartedTransactionFromAddress --- common/txmgr/inmemory_store.go | 15 ++-- .../evm/txmgr/evm_inmemory_store_test.go | 71 +++++++++++++++++++ 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 284ca7b229d..9f6ed6e01e7 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -180,24 +180,27 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // FindNextUnstartedTransactionFromAddress returns the next unstarted transaction for a given address. -func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID) error { +func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindNextUnstartedTransactionFromAddress(_ context.Context, fromAddress ADDR, chainID CHAIN_ID) ( + *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + error, +) { if ms.chainID.String() != chainID.String() { - return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrInvalidChainID) + return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w: %q", ErrInvalidChainID, chainID) } ms.addressStatesLock.RLock() defer ms.addressStatesLock.RUnlock() as, ok := ms.addressStates[fromAddress] if !ok { - return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrAddressNotFound) + return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w: %q", ErrAddressNotFound, fromAddress) } etx := as.peekNextUnstartedTx() if etx == nil { - return fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrTxnNotFound) + return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w", ErrTxnNotFound) } - tx = ms.deepCopyTx(*etx) + tx := ms.deepCopyTx(*etx) - return nil + return tx, nil } // SaveReplacementInProgressAttempt saves a replacement attempt for a transaction that is in_progress. diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 34840de4199..06caa0549fa 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -93,6 +93,77 @@ func TestInMemoryStore_GetTxInProgress(t *testing.T) { } } +func TestInMemoryStore_FindNextUnstartedTransactionFromAddress(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + _, otherAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + // insert the transaction into the persistent store + inTx := mustCreateUnstartedGeneratedTx(t, persistentStore, fromAddress, chainID) + // insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + + // insert non in-progress transaction for another address + otherTx := mustInsertInProgressEthTxWithAttempt(t, persistentStore, 13, otherAddress) + require.NoError(t, inMemoryStore.XXXTestInsertTx(otherAddress, &otherTx)) + + tcs := []struct { + name string + fromAddress common.Address + chainID *big.Int + + hasErr bool + hasTx bool + }{ + {"finds the correct inprogress transaction", fromAddress, chainID, false, true}, + {"no unstarted transaction", otherAddress, chainID, true, false}, + {"wrong chainID", fromAddress, big.NewInt(123), true, false}, + {"unknown address", common.Address{}, chainID, true, false}, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + ctx := testutils.Context(t) + actTx, actErr := inMemoryStore.FindNextUnstartedTransactionFromAddress(ctx, tc.fromAddress, tc.chainID) + expTx, expErr := persistentStore.FindNextUnstartedTransactionFromAddress(ctx, tc.fromAddress, tc.chainID) + if tc.hasErr { + require.NotNil(t, actErr) + require.NotNil(t, expErr) + } else { + require.Nil(t, actErr) + require.Nil(t, expErr) + } + if tc.hasTx { + require.NotNil(t, actTx) + require.NotNil(t, expTx) + assertTxEqual(t, *expTx, *actTx) + } else { + require.Nil(t, actTx) + require.Nil(t, expTx) + } + }) + } +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID) From 36c252317d89d6c7154badeeb55bc7a9525c2827 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 11 Mar 2024 16:48:04 -0400 Subject: [PATCH 15/24] add test for pruning --- common/txmgr/address_state.go | 68 +++++++++++++++- .../evm/txmgr/evm_inmemory_store_test.go | 78 ++++++++++++++++++- 2 files changed, 143 insertions(+), 3 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 97521b1154c..7b96e7040a3 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -172,7 +172,43 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTx filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, txIDs ...int64, ) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { - return nil + as.RLock() + defer as.RUnlock() + + // if txStates is empty then apply the filter to only the as.allTransactions map + if len(txStates) == 0 { + return as._findTxs(as.allTxs, filter, txIDs...) + } + + var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + for _, txState := range txStates { + switch txState { + case TxUnstarted: + filter2 := func(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool { + if tx.State != TxUnstarted { + return false + } + return filter(tx) + } + txs = append(txs, as._findTxs(as.allTxs, filter2, txIDs...)...) + case TxInProgress: + if as.inprogressTx != nil && filter(as.inprogressTx) { + txs = append(txs, *as.inprogressTx) + } + case TxUnconfirmed: + txs = append(txs, as._findTxs(as.unconfirmedTxs, filter, txIDs...)...) + case TxConfirmedMissingReceipt: + txs = append(txs, as._findTxs(as.confirmedMissingReceiptTxs, filter, txIDs...)...) + case TxConfirmed: + txs = append(txs, as._findTxs(as.confirmedTxs, filter, txIDs...)...) + case TxFatalError: + txs = append(txs, as._findTxs(as.fatalErroredTxs, filter, txIDs...)...) + default: + panic("findTxs: unknown transaction state") + } + } + + return txs } // pruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue. @@ -181,7 +217,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneU defer as.Unlock() txs := as.unstartedTxs.PruneByTxIDs(ids) - as.deleteTxs(txs...) + as._deleteTxs(txs...) } // deleteTxs removes the transactions with the given IDs from the address state. @@ -304,3 +340,31 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _delet as.unstartedTxs.RemoveTxByID(txID) } } + +// This method is not concurrent safe and should only be called from within a lock +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _findTxs( + txIDsToTx map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + filter func(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) bool, + txIDs ...int64, +) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { + var txs []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + // if txIDs is not empty then only apply the filter to those transactions + if len(txIDs) > 0 { + for _, txID := range txIDs { + tx := txIDsToTx[txID] + if tx != nil && filter(tx) { + txs = append(txs, *tx) + } + } + return txs + } + + // if txIDs is empty then apply the filter to all transactions + for _, tx := range txIDsToTx { + if filter(tx) { + txs = append(txs, *tx) + } + } + + return txs +} diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index a102ee1c996..25e55aca186 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,14 +1,91 @@ package txmgr_test import ( + "context" + "math/big" + "sort" "testing" + "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr" + txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" + + evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) +func TestInMemoryStore_PruneUnstartedTxQueue(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) + persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + kst := cltest.NewKeyStore(t, db, dbcfg) + _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) + + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + lggr := logger.TestSugared(t) + chainID := ethClient.ConfiguredChainID() + ctx := context.Background() + + inMemoryStore, err := commontxmgr.NewInMemoryStore[ + *big.Int, + common.Address, common.Hash, common.Hash, + *evmtypes.Receipt, + evmtypes.Nonce, + evmgas.EvmFee, + ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) + require.NoError(t, err) + + t.Run("prunes unstarted transactions", func(t *testing.T) { + maxQueueSize := uint32(5) + nTxs := 5 + subject := uuid.NullUUID{UUID: uuid.New(), Valid: true} + strat := commontxmgr.NewDropOldestStrategy(subject.UUID, maxQueueSize, dbcfg.DefaultQueryTimeout()) + for i := 0; i < nTxs; i++ { + inTx := cltest.NewEthTx(fromAddress) + inTx.Subject = subject + // insert the transaction into the persistent store + require.NoError(t, persistentStore.InsertTx(&inTx)) + // insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + } + + ids, err := strat.PruneQueue(ctx, inMemoryStore) + require.NoError(t, err) + assert.Equal(t, int(nTxs)-int(maxQueueSize-1), len(ids)) + + AssertCountPerSubject(t, persistentStore, int64(maxQueueSize-1), subject.UUID) + fn := func(tx *evmtxmgr.Tx) bool { return true } + states := []txmgrtypes.TxState{commontxmgr.TxUnstarted} + actTxs := inMemoryStore.XXXTestFindTxs(states, fn) + expTxs, err := persistentStore.FindTxesByFromAddressAndState(ctx, fromAddress, "unstarted") + require.NoError(t, err) + require.Equal(t, len(expTxs), len(actTxs)) + + // sort by ID to ensure the order is the same for comparison + sort.SliceStable(actTxs, func(i, j int) bool { + return actTxs[i].ID < actTxs[j].ID + }) + sort.SliceStable(expTxs, func(i, j int) bool { + return expTxs[i].ID < expTxs[j].ID + }) + for i := 0; i < len(expTxs); i++ { + assertTxEqual(t, *expTxs[i], actTxs[i]) + } + }) + +} + // assertTxEqual asserts that two transactions are equal func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { assert.Equal(t, exp.ID, act.ID) @@ -42,7 +119,6 @@ func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) { func assertTxAttemptEqual(t *testing.T, exp, act evmtxmgr.TxAttempt) { assert.Equal(t, exp.ID, act.ID) assert.Equal(t, exp.TxID, act.TxID) - assert.Equal(t, exp.Tx, act.Tx) assert.Equal(t, exp.TxFee, act.TxFee) assert.Equal(t, exp.ChainSpecificFeeLimit, act.ChainSpecificFeeLimit) assert.Equal(t, exp.SignedRawTx, act.SignedRawTx) From 4d9577aa2e13154f8400db2bfc8bbd3297ee3d36 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 11 Mar 2024 16:50:42 -0400 Subject: [PATCH 16/24] add test for pruning --- .../evm/txmgr/evm_inmemory_store_test.go | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 25e55aca186..5aad20f8204 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -46,6 +46,43 @@ func TestInMemoryStore_PruneUnstartedTxQueue(t *testing.T) { ](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions()) require.NoError(t, err) + t.Run("doesnt prune unstarted transactions if under maxQueueSize", func(t *testing.T) { + maxQueueSize := uint32(5) + nTxs := 3 + subject := uuid.NullUUID{UUID: uuid.New(), Valid: true} + strat := commontxmgr.NewDropOldestStrategy(subject.UUID, maxQueueSize, dbcfg.DefaultQueryTimeout()) + for i := 0; i < nTxs; i++ { + inTx := cltest.NewEthTx(fromAddress) + inTx.Subject = subject + // insert the transaction into the persistent store + require.NoError(t, persistentStore.InsertTx(&inTx)) + // insert the transaction into the in-memory store + require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) + } + + ids, err := strat.PruneQueue(ctx, inMemoryStore) + require.NoError(t, err) + assert.Equal(t, 0, len(ids)) + + AssertCountPerSubject(t, persistentStore, int64(nTxs), subject.UUID) + fn := func(tx *evmtxmgr.Tx) bool { return true } + states := []txmgrtypes.TxState{commontxmgr.TxUnstarted} + actTxs := inMemoryStore.XXXTestFindTxs(states, fn) + expTxs, err := persistentStore.FindTxesByFromAddressAndState(ctx, fromAddress, "unstarted") + require.NoError(t, err) + require.Equal(t, len(expTxs), len(actTxs)) + + // sort by ID to ensure the order is the same for comparison + sort.SliceStable(actTxs, func(i, j int) bool { + return actTxs[i].ID < actTxs[j].ID + }) + sort.SliceStable(expTxs, func(i, j int) bool { + return expTxs[i].ID < expTxs[j].ID + }) + for i := 0; i < len(expTxs); i++ { + assertTxEqual(t, *expTxs[i], actTxs[i]) + } + }) t.Run("prunes unstarted transactions", func(t *testing.T) { maxQueueSize := uint32(5) nTxs := 5 From c55058054866a3b5f80ff77d977a939b0d0927a8 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 11 Mar 2024 17:00:40 -0400 Subject: [PATCH 17/24] fix delete method --- common/txmgr/address_state.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 97521b1154c..1271bd4b900 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -181,7 +181,7 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneU defer as.Unlock() txs := as.unstartedTxs.PruneByTxIDs(ids) - as.deleteTxs(txs...) + as._deleteTxs(txs...) } // deleteTxs removes the transactions with the given IDs from the address state. From b470da7194cc82a2ca6df88dcb1c1f655aa99bbb Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 20 Mar 2024 23:18:59 -0400 Subject: [PATCH 18/24] address comments --- common/txmgr/inmemory_store.go | 13 +++++++------ core/chains/evm/txmgr/evm_inmemory_store_test.go | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 8eaca33c49f..a380e40ba8a 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -46,6 +46,7 @@ type inMemoryStore[ keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + maxUnstarted uint64 addressStatesLock sync.RWMutex addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] @@ -75,9 +76,9 @@ func NewInMemoryStore[ addressStates: map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{}, } - maxUnstarted := config.MaxQueued() - if maxUnstarted <= 0 { - maxUnstarted = 10000 + ms.maxUnstarted = config.MaxQueued() + if ms.maxUnstarted <= 0 { + ms.maxUnstarted = 10000 } addresses, err := keyStore.EnabledAddressesForChain(ctx, chainID) if err != nil { @@ -88,7 +89,7 @@ func NewInMemoryStore[ if err != nil { return nil, fmt.Errorf("address_state: initialization: %w", err) } - ms.addressStates[fromAddr] = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, maxUnstarted, txs) + ms.addressStates[fromAddr] = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, ms.maxUnstarted, txs) } return &ms, nil @@ -105,14 +106,14 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat ) { tx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} if ms.chainID.String() != chainID.String() { - panic(fmt.Sprintf("create_transaction: invalid chain ID: %s", chainID)) + panic(fmt.Sprintf(ErrInvalidChainID.Error()+": %s", chainID.String())) } ms.addressStatesLock.RLock() defer ms.addressStatesLock.RUnlock() as, ok := ms.addressStates[txRequest.FromAddress] if !ok { - return tx, fmt.Errorf("create_transaction: %w: %q", ErrAddressNotFound, txRequest.FromAddress) + as = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](ms.lggr, chainID, txRequest.FromAddress, ms.maxUnstarted, nil) } // Persist Transaction to persistent storage diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 4f53a51bf09..9b280519c67 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -57,7 +57,7 @@ func TestInMemoryStore_CreateTransaction(t *testing.T) { FromAddress: fromAddress, ToAddress: toAddress, EncodedPayload: payload, - FeeLimit: gasLimit, + FeeLimit: uint64(gasLimit), Meta: nil, Strategy: strategy, }, chainID) From 395d35fe005c96ffb01b9962dadd14340689b11b Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 21 Mar 2024 14:49:22 -0400 Subject: [PATCH 19/24] fix address state issue if from address not available --- common/txmgr/inmemory_store.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index a380e40ba8a..d2af7415bfd 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -109,11 +109,12 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat panic(fmt.Sprintf(ErrInvalidChainID.Error()+": %s", chainID.String())) } - ms.addressStatesLock.RLock() - defer ms.addressStatesLock.RUnlock() + ms.addressStatesLock.Lock() + defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[txRequest.FromAddress] if !ok { as = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](ms.lggr, chainID, txRequest.FromAddress, ms.maxUnstarted, nil) + ms.addressStates[txRequest.FromAddress] = as } // Persist Transaction to persistent storage From bbc0c359075dcd87c1bdc93ddfa311c632c8e7fc Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 21 Mar 2024 15:27:21 -0400 Subject: [PATCH 20/24] simplify addTxToUnstartedQueue --- common/txmgr/address_state.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/common/txmgr/address_state.go b/common/txmgr/address_state.go index 1271bd4b900..179aacb867d 100644 --- a/common/txmgr/address_state.go +++ b/common/txmgr/address_state.go @@ -1,7 +1,6 @@ package txmgr import ( - "fmt" "sync" "time" @@ -211,27 +210,15 @@ func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekIn } // addTxToUnstartedQueue adds the given transaction to the unstarted queue. -// If the queue is full, an error is returned. -// If the transaction was successfully added, nil is returned. -// If the transaction's idempotency key already exists, an error is returned. -func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstartedQueue(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { +func (as *addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstartedQueue(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { as.Lock() defer as.Unlock() - if tx.IdempotencyKey != nil && as.idempotencyKeyToTx[*tx.IdempotencyKey] != nil { - return fmt.Errorf("add_tx_to_unstarted_queue: address %s idempotency key %s already exists", as.fromAddress, *tx.IdempotencyKey) - } - if as.unstartedTxs.Len() >= as.unstartedTxs.Cap() { - return fmt.Errorf("add_tx_to_unstarted_queue: address %s unstarted queue capacity has been reached", as.fromAddress) - } - as.unstartedTxs.AddTx(tx) as.allTxs[tx.ID] = tx if tx.IdempotencyKey != nil { as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx } - - return nil } // moveUnstartedToInProgress moves the next unstarted transaction to the in-progress state. From f7eb968f1013515ebfee043b62bb36af58f5d7d2 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 21 Mar 2024 15:36:20 -0400 Subject: [PATCH 21/24] address comments --- core/chains/evm/txmgr/evm_inmemory_store_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 9b280519c67..07ce523cb3f 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,7 +1,6 @@ package txmgr_test import ( - "context" "math/big" "testing" @@ -27,14 +26,14 @@ func TestInMemoryStore_CreateTransaction(t *testing.T) { db := pgtest.NewSqlxDB(t) _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) - persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + persistentStore := cltest.NewTestTxStore(t, db) kst := cltest.NewKeyStore(t, db, dbcfg) _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) lggr := logger.TestSugared(t) chainID := ethClient.ConfiguredChainID() - ctx := context.Background() + ctx := testutils.Context(t) inMemoryStore, err := commontxmgr.NewInMemoryStore[ *big.Int, @@ -53,7 +52,7 @@ func TestInMemoryStore_CreateTransaction(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - actTx, err := inMemoryStore.CreateTransaction(testutils.Context(t), evmtxmgr.TxRequest{ + actTx, err := inMemoryStore.CreateTransaction(ctx, evmtxmgr.TxRequest{ FromAddress: fromAddress, ToAddress: toAddress, EncodedPayload: payload, From 93900d528bb1da82ccd8295000aca8fc8cea0ddf Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 22 Mar 2024 15:04:24 -0400 Subject: [PATCH 22/24] cleanup context usage in tests --- core/chains/evm/txmgr/evm_inmemory_store_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/chains/evm/txmgr/evm_inmemory_store_test.go b/core/chains/evm/txmgr/evm_inmemory_store_test.go index 5aad20f8204..4d68ae47327 100644 --- a/core/chains/evm/txmgr/evm_inmemory_store_test.go +++ b/core/chains/evm/txmgr/evm_inmemory_store_test.go @@ -1,7 +1,6 @@ package txmgr_test import ( - "context" "math/big" "sort" "testing" @@ -19,6 +18,7 @@ import ( evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) @@ -28,14 +28,14 @@ func TestInMemoryStore_PruneUnstartedTxQueue(t *testing.T) { db := pgtest.NewSqlxDB(t) _, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t) - persistentStore := cltest.NewTestTxStore(t, db, dbcfg) + persistentStore := cltest.NewTestTxStore(t, db) kst := cltest.NewKeyStore(t, db, dbcfg) _, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth()) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) lggr := logger.TestSugared(t) chainID := ethClient.ConfiguredChainID() - ctx := context.Background() + ctx := testutils.Context(t) inMemoryStore, err := commontxmgr.NewInMemoryStore[ *big.Int, @@ -55,7 +55,7 @@ func TestInMemoryStore_PruneUnstartedTxQueue(t *testing.T) { inTx := cltest.NewEthTx(fromAddress) inTx.Subject = subject // insert the transaction into the persistent store - require.NoError(t, persistentStore.InsertTx(&inTx)) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx)) // insert the transaction into the in-memory store require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) } @@ -92,7 +92,7 @@ func TestInMemoryStore_PruneUnstartedTxQueue(t *testing.T) { inTx := cltest.NewEthTx(fromAddress) inTx.Subject = subject // insert the transaction into the persistent store - require.NoError(t, persistentStore.InsertTx(&inTx)) + require.NoError(t, persistentStore.InsertTx(ctx, &inTx)) // insert the transaction into the in-memory store require.NoError(t, inMemoryStore.XXXTestInsertTx(fromAddress, &inTx)) } From bffcc71024bcc69a8be4427b04730efe9c6ed5b4 Mon Sep 17 00:00:00 2001 From: James Walker Date: Mon, 1 Apr 2024 13:59:57 -0400 Subject: [PATCH 23/24] clean up --- common/txmgr/inmemory_store.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 63afb61e25a..3c8eb97b18d 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -112,16 +112,16 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat ) { tx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} if ms.chainID.String() != chainID.String() { - panic(fmt.Sprintf(ErrInvalidChainID.Error()+": %s", chainID.String())) + panic("invalid chain ID") } ms.addressStatesLock.Lock() - defer ms.addressStatesLock.Unlock() as, ok := ms.addressStates[txRequest.FromAddress] if !ok { as = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](ms.lggr, chainID, txRequest.FromAddress, ms.maxUnstarted, nil) ms.addressStates[txRequest.FromAddress] = as } + ms.addressStatesLock.Unlock() // Persist Transaction to persistent storage tx, err := ms.persistentTxStore.CreateTransaction(ctx, txRequest, chainID) @@ -132,7 +132,6 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat // Update in memory store // Add the request to the Unstarted channel to be processed by the Broadcaster as.addTxToUnstartedQueue(&tx) - return *ms.deepCopyTx(tx), nil } From 482f9a112c46a71351d214a3871df973e719f43a Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 4 Apr 2024 14:10:01 -0400 Subject: [PATCH 24/24] panic on chainID mismatch --- common/txmgr/inmemory_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index b850b1d519f..78bf33587ce 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -214,7 +214,7 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindN error, ) { if ms.chainID.String() != chainID.String() { - return nil, fmt.Errorf("find_next_unstarted_transaction_from_address: %w: %q", ErrInvalidChainID, chainID) + panic("invalid chain ID") } ms.addressStatesLock.RLock() defer ms.addressStatesLock.RUnlock()