diff --git a/common/txmgr/strategies.go b/common/txmgr/strategies.go index faba2ba97bc..3772e6d1d20 100644 --- a/common/txmgr/strategies.go +++ b/common/txmgr/strategies.go @@ -32,8 +32,8 @@ func NewSendEveryStrategy() txmgrtypes.TxStrategy { type SendEveryStrategy struct{} func (SendEveryStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{} } -func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (int64, error) { - return 0, nil +func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) ([]int64, error) { + return nil, nil } var _ txmgrtypes.TxStrategy = DropOldestStrategy{} @@ -56,14 +56,15 @@ func (s DropOldestStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{UUID: s.subject, Valid: true} } -func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (n int64, err error) { +func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, s.queryTimeout) defer cancel() - n, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject) + // NOTE: We prune one less than the queue size to prevent the queue from exceeding the max queue size. Which could occur if a new transaction is added to the queue right after we prune. + ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject) if err != nil { - return 0, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err) + return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err) } return } diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index c6676674fbf..3e3fa9a20db 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -82,13 +82,14 @@ type Txm[ FEE feetypes.Fee, ] struct { services.StateMachine - logger logger.SugaredLogger - txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - config txmgrtypes.TransactionManagerChainConfig - txConfig txmgrtypes.TransactionManagerTransactionsConfig - keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] - chainID CHAIN_ID - checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + logger logger.SugaredLogger + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + config txmgrtypes.TransactionManagerChainConfig + txConfig txmgrtypes.TransactionManagerTransactionsConfig + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] + chainID CHAIN_ID + checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + pruneQueueAndCreateLock sync.Mutex chHeads chan HEAD trigger chan ADDR @@ -522,7 +523,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran return tx, fmt.Errorf("Txm#CreateTransaction: %w", err) } - tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID) + tx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, b.chainID) if err != nil { return tx, err } @@ -562,7 +563,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative FeeLimit: gasLimit, Strategy: NewSendEveryStrategy(), } - etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID) + etx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, chainID) if err != nil { return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err) } @@ -682,3 +683,39 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) { return count, errors.New(n.ErrMsg) } + +func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueueAndCreateTxn( + ctx context.Context, + txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], + chainID CHAIN_ID, +) ( + tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], + err error, +) { + b.pruneQueueAndCreateLock.Lock() + defer b.pruneQueueAndCreateLock.Unlock() + + pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore) + if err != nil { + return tx, err + } + if len(pruned) > 0 { + b.logger.Warnw(fmt.Sprintf("Pruned %d old unstarted transactions", len(pruned)), + "subject", txRequest.Strategy.Subject(), + "pruned-tx-ids", pruned, + ) + } + + tx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID) + if err != nil { + return tx, err + } + b.logger.Debugw("Created transaction", + "fromAddress", txRequest.FromAddress, + "toAddress", txRequest.ToAddress, + "meta", txRequest.Meta, + "transactionID", tx.ID, + ) + + return tx, nil +} diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 16c20df31d7..353f398316d 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -937,22 +937,24 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PreloadTxes } // PruneUnstartedTxQueue provides a mock function with given fields: ctx, queueSize, subject -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (int64, error) { +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { ret := _m.Called(ctx, queueSize, subject) if len(ret) == 0 { panic("no return value specified for PruneUnstartedTxQueue") } - var r0 int64 + var r0 []int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) ([]int64, error)); ok { return rf(ctx, queueSize, subject) } - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) []int64); ok { r0 = rf(ctx, queueSize, subject) } else { - r0 = ret.Get(0).(int64) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } } if rf, ok := ret.Get(1).(func(context.Context, uint32, uuid.UUID) error); ok { diff --git a/common/txmgr/types/mocks/tx_strategy.go b/common/txmgr/types/mocks/tx_strategy.go index 7992c3fe05f..92d4b7da569 100644 --- a/common/txmgr/types/mocks/tx_strategy.go +++ b/common/txmgr/types/mocks/tx_strategy.go @@ -17,22 +17,24 @@ type TxStrategy struct { } // PruneQueue provides a mock function with given fields: ctx, pruneService -func (_m *TxStrategy) PruneQueue(ctx context.Context, pruneService types.UnstartedTxQueuePruner) (int64, error) { +func (_m *TxStrategy) PruneQueue(ctx context.Context, pruneService types.UnstartedTxQueuePruner) ([]int64, error) { ret := _m.Called(ctx, pruneService) if len(ret) == 0 { panic("no return value specified for PruneQueue") } - var r0 int64 + var r0 []int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) ([]int64, error)); ok { return rf(ctx, pruneService) } - if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context, types.UnstartedTxQueuePruner) []int64); ok { r0 = rf(ctx, pruneService) } else { - r0 = ret.Get(0).(int64) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } } if rf, ok := ret.Get(1).(func(context.Context, types.UnstartedTxQueuePruner) error); ok { diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index 0f5d651ae29..31f568b99a2 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -30,7 +30,7 @@ type TxStrategy interface { // PruneQueue is called after tx insertion // It accepts the service responsible for deleting // unstarted txs and deletion options - PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (n int64, err error) + PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (ids []int64, err error) } type TxAttemptState int8 diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 57ecf28d589..742a1740033 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -114,7 +114,7 @@ type TxHistoryReaper[CHAIN_ID types.ID] interface { } type UnstartedTxQueuePruner interface { - PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) + PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) } // R is the raw unparsed transaction receipt diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index add4d915809..ae986acee27 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1841,14 +1841,6 @@ RETURNING "txes".* if err != nil { return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx") } - var pruned int64 - pruned, err = txRequest.Strategy.PruneQueue(ctx, o) - if err != nil { - return pkgerrors.Wrap(err, "CreateEthTransaction failed to prune evm.txes") - } - if pruned > 0 { - o.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), "replacementID", dbEtx.ID) - } return nil }) var etx Tx @@ -1856,13 +1848,13 @@ RETURNING "txes".* return etx, err } -func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) { +func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) err = qq.Transaction(func(tx pg.Queryer) error { - res, err := qq.Exec(` + err := qq.Select(&ids, ` DELETE FROM evm.txes WHERE state = 'unstarted' AND subject = $1 AND id < ( @@ -1873,11 +1865,13 @@ id < ( ORDER BY id DESC LIMIT $3 ) numbers -)`, subject, subject, queueSize) +) RETURNING id`, subject, subject, queueSize) if err != nil { - return pkgerrors.Wrap(err, "DeleteUnstartedEthTx failed") + if errors.Is(err, sql.ErrNoRows) { + return nil + } + return fmt.Errorf("PruneUnstartedTxQueue failed: %w", err) } - n, err = res.RowsAffected() return err }) return diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index b5da5527448..35d684727d1 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -27,7 +27,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v4" ) @@ -1723,7 +1722,6 @@ func TestORM_CreateTransaction(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil) etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: fromAddress, ToAddress: toAddress, @@ -1780,7 +1778,6 @@ func TestORM_CreateTransaction(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil) etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: fromAddress, ToAddress: toAddress, @@ -1816,22 +1813,22 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) { evmtest.NewEthClientMockWithDefaultChain(t) _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) - t.Run("does not prune if queue has not exceeded capacity", func(t *testing.T) { + t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) { subject1 := uuid.New() strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout()) for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1)) } - AssertCountPerSubject(t, txStore, int64(5), subject1) + AssertCountPerSubject(t, txStore, int64(4), subject1) }) - t.Run("prunes if queue has exceeded capacity", func(t *testing.T) { + t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) { subject2 := uuid.New() strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout()) for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2)) } - AssertCountPerSubject(t, txStore, int64(3), subject2) + AssertCountPerSubject(t, txStore, int64(2), subject2) }) } diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index a9a7023ac1f..9690bf9728d 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -1058,22 +1058,24 @@ func (_m *EvmTxStore) PreloadTxes(ctx context.Context, attempts []types.TxAttemp } // PruneUnstartedTxQueue provides a mock function with given fields: ctx, queueSize, subject -func (_m *EvmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (int64, error) { +func (_m *EvmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) ([]int64, error) { ret := _m.Called(ctx, queueSize, subject) if len(ret) == 0 { panic("no return value specified for PruneUnstartedTxQueue") } - var r0 int64 + var r0 []int64 var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) (int64, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) ([]int64, error)); ok { return rf(ctx, queueSize, subject) } - if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context, uint32, uuid.UUID) []int64); ok { r0 = rf(ctx, queueSize, subject) } else { - r0 = ret.Get(0).(int64) + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } } if rf, ok := ret.Get(1).(func(context.Context, uint32, uuid.UUID) error); ok { diff --git a/core/chains/evm/txmgr/strategies_test.go b/core/chains/evm/txmgr/strategies_test.go index 765b43e78f2..19f5f197289 100644 --- a/core/chains/evm/txmgr/strategies_test.go +++ b/core/chains/evm/txmgr/strategies_test.go @@ -21,9 +21,9 @@ func Test_SendEveryStrategy(t *testing.T) { assert.Equal(t, uuid.NullUUID{}, s.Subject()) - n, err := s.PruneQueue(testutils.Context(t), nil) + ids, err := s.PruneQueue(testutils.Context(t), nil) assert.NoError(t, err) - assert.Equal(t, int64(0), n) + assert.Len(t, ids, 0) } func Test_DropOldestStrategy_Subject(t *testing.T) { @@ -47,9 +47,9 @@ func Test_DropOldestStrategy_PruneQueue(t *testing.T) { t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) { strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout) - mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return(int64(2), nil) - n, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore) + mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil) + ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore) require.NoError(t, err) - assert.Equal(t, int64(2), n) + assert.Equal(t, []int64{1, 2}, ids) }) } diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 6fafff1a5c1..0e28f2948ee 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -121,7 +121,7 @@ func TestTxm_CreateTransaction(t *testing.T) { subject := uuid.New() strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) evmConfig.MaxQueued = uint64(1) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: fromAddress, @@ -406,7 +406,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, otherKey.Address) strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: evmFromAddress, @@ -430,7 +430,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, thisKey.Address) strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: evmFromAddress, @@ -451,7 +451,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) { cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 0, 42, thisKey.Address) strategy := newMockTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{}) - strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil) evmConfig.MaxQueued = uint64(1) etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{ @@ -799,6 +799,10 @@ func mustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress c func mustCreateUnstartedTxFromEvmTxRequest(t testing.TB, txStore txmgr.EvmTxStore, txRequest txmgr.TxRequest, chainID *big.Int) (tx txmgr.Tx) { tx, err := txStore.CreateTransaction(testutils.Context(t), txRequest, chainID) require.NoError(t, err) + + _, err = txRequest.Strategy.PruneQueue(testutils.Context(t), txStore) + require.NoError(t, err) + return tx } diff --git a/core/web/eth_keys_controller_test.go b/core/web/eth_keys_controller_test.go index 739af5820c9..a9be5517bcc 100644 --- a/core/web/eth_keys_controller_test.go +++ b/core/web/eth_keys_controller_test.go @@ -401,7 +401,7 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) { subject := uuid.New() strategy := commontxmmocks.NewTxStrategy(t) strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true}) - strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil) + strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(nil, nil) _, err := chain.TxManager().CreateTransaction(testutils.Context(t), txmgr.TxRequest{ FromAddress: addr, ToAddress: testutils.NewAddress(),