Skip to content

Commit

Permalink
take pruning logic out of create_transaction logic (#11845)
Browse files Browse the repository at this point in the history
* take pruning logic out of create_transaction logic

* change prune queue function signature to return array of ids rather than count

* refactor txmgr a little

* change order of pruning and creating transaction

* Apply suggestions from code review

Co-authored-by: amit-momin <[email protected]>

* address comments

* fix tests

* address comments

* address comments

* add pruneQueueAndCreateLock

---------

Co-authored-by: amit-momin <[email protected]>
Co-authored-by: Prashant Yadav <[email protected]>
  • Loading branch information
3 people authored Jan 25, 2024
1 parent 9f70d30 commit e214014
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 61 deletions.
11 changes: 6 additions & 5 deletions common/txmgr/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func NewSendEveryStrategy() txmgrtypes.TxStrategy {
type SendEveryStrategy struct{}

func (SendEveryStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{} }
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (int64, error) {
return 0, nil
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) ([]int64, error) {
return nil, nil
}

var _ txmgrtypes.TxStrategy = DropOldestStrategy{}
Expand All @@ -56,14 +56,15 @@ func (s DropOldestStrategy) Subject() uuid.NullUUID {
return uuid.NullUUID{UUID: s.subject, Valid: true}
}

func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (n int64, err error) {
func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.queryTimeout)
defer cancel()

n, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject)
// NOTE: We prune one less than the queue size to prevent the queue from exceeding the max queue size. Which could occur if a new transaction is added to the queue right after we prune.
ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject)
if err != nil {
return 0, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
}
return
}
55 changes: 46 additions & 9 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ type Txm[
FEE feetypes.Fee,
] struct {
services.StateMachine
logger logger.SugaredLogger
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
config txmgrtypes.TransactionManagerChainConfig
txConfig txmgrtypes.TransactionManagerTransactionsConfig
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
chainID CHAIN_ID
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
logger logger.SugaredLogger
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
config txmgrtypes.TransactionManagerChainConfig
txConfig txmgrtypes.TransactionManagerTransactionsConfig
keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
chainID CHAIN_ID
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
pruneQueueAndCreateLock sync.Mutex

chHeads chan HEAD
trigger chan ADDR
Expand Down Expand Up @@ -522,7 +523,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
return tx, fmt.Errorf("Txm#CreateTransaction: %w", err)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID)
tx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, b.chainID)
if err != nil {
return tx, err
}
Expand Down Expand Up @@ -562,7 +563,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative
FeeLimit: gasLimit,
Strategy: NewSendEveryStrategy(),
}
etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
etx, err = b.pruneQueueAndCreateTxn(ctx, txRequest, chainID)
if err != nil {
return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err)
}
Expand Down Expand Up @@ -682,3 +683,39 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) {
return count, errors.New(n.ErrMsg)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pruneQueueAndCreateTxn(
ctx context.Context,
txRequest txmgrtypes.TxRequest[ADDR, TX_HASH],
chainID CHAIN_ID,
) (
tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
err error,
) {
b.pruneQueueAndCreateLock.Lock()
defer b.pruneQueueAndCreateLock.Unlock()

pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore)
if err != nil {
return tx, err
}
if len(pruned) > 0 {
b.logger.Warnw(fmt.Sprintf("Pruned %d old unstarted transactions", len(pruned)),
"subject", txRequest.Strategy.Subject(),
"pruned-tx-ids", pruned,
)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
if err != nil {
return tx, err
}
b.logger.Debugw("Created transaction",
"fromAddress", txRequest.FromAddress,
"toAddress", txRequest.ToAddress,
"meta", txRequest.Meta,
"transactionID", tx.ID,
)

return tx, nil
}
12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_strategy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type TxStrategy interface {
// PruneQueue is called after tx insertion
// It accepts the service responsible for deleting
// unstarted txs and deletion options
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (n int64, err error)
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (ids []int64, err error)
}

type TxAttemptState int8
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type TxHistoryReaper[CHAIN_ID types.ID] interface {
}

type UnstartedTxQueuePruner interface {
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error)
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error)
}

// R is the raw unparsed transaction receipt
Expand Down
20 changes: 7 additions & 13 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,28 +1841,20 @@ RETURNING "txes".*
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx")
}
var pruned int64
pruned, err = txRequest.Strategy.PruneQueue(ctx, o)
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to prune evm.txes")
}
if pruned > 0 {
o.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), "replacementID", dbEtx.ID)
}
return nil
})
var etx Tx
dbEtx.ToTx(&etx)
return etx, err
}

func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) {
func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
err = qq.Transaction(func(tx pg.Queryer) error {
res, err := qq.Exec(`
err := qq.Select(&ids, `
DELETE FROM evm.txes
WHERE state = 'unstarted' AND subject = $1 AND
id < (
Expand All @@ -1873,11 +1865,13 @@ id < (
ORDER BY id DESC
LIMIT $3
) numbers
)`, subject, subject, queueSize)
) RETURNING id`, subject, subject, queueSize)
if err != nil {
return pkgerrors.Wrap(err, "DeleteUnstartedEthTx failed")
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return fmt.Errorf("PruneUnstartedTxQueue failed: %w", err)
}
n, err = res.RowsAffected()
return err
})
return
Expand Down
11 changes: 4 additions & 7 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"
)
Expand Down Expand Up @@ -1723,7 +1722,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down Expand Up @@ -1780,7 +1778,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down Expand Up @@ -1816,22 +1813,22 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
evmtest.NewEthClientMockWithDefaultChain(t)
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)

t.Run("does not prune if queue has not exceeded capacity", func(t *testing.T) {
t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) {
subject1 := uuid.New()
strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout())
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1))
}
AssertCountPerSubject(t, txStore, int64(5), subject1)
AssertCountPerSubject(t, txStore, int64(4), subject1)
})

t.Run("prunes if queue has exceeded capacity", func(t *testing.T) {
t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) {
subject2 := uuid.New()
strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout())
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2))
}
AssertCountPerSubject(t, txStore, int64(3), subject2)
AssertCountPerSubject(t, txStore, int64(2), subject2)
})
}

Expand Down
12 changes: 7 additions & 5 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/strategies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func Test_SendEveryStrategy(t *testing.T) {

assert.Equal(t, uuid.NullUUID{}, s.Subject())

n, err := s.PruneQueue(testutils.Context(t), nil)
ids, err := s.PruneQueue(testutils.Context(t), nil)
assert.NoError(t, err)
assert.Equal(t, int64(0), n)
assert.Len(t, ids, 0)
}

func Test_DropOldestStrategy_Subject(t *testing.T) {
Expand All @@ -47,9 +47,9 @@ func Test_DropOldestStrategy_PruneQueue(t *testing.T) {

t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) {
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return(int64(2), nil)
n, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil)
ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
require.NoError(t, err)
assert.Equal(t, int64(2), n)
assert.Equal(t, []int64{1, 2}, ids)
})
}
12 changes: 8 additions & 4 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestTxm_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)
evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, otherKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -430,7 +430,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -451,7 +451,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 0, 42, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
Expand Down Expand Up @@ -799,6 +799,10 @@ func mustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress c
func mustCreateUnstartedTxFromEvmTxRequest(t testing.TB, txStore txmgr.EvmTxStore, txRequest txmgr.TxRequest, chainID *big.Int) (tx txmgr.Tx) {
tx, err := txStore.CreateTransaction(testutils.Context(t), txRequest, chainID)
require.NoError(t, err)

_, err = txRequest.Strategy.PruneQueue(testutils.Context(t), txStore)
require.NoError(t, err)

return tx
}

Expand Down
2 changes: 1 addition & 1 deletion core/web/eth_keys_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
subject := uuid.New()
strategy := commontxmmocks.NewTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(nil, nil)
_, err := chain.TxManager().CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: addr,
ToAddress: testutils.NewAddress(),
Expand Down

0 comments on commit e214014

Please sign in to comment.