Skip to content

Commit

Permalink
refactor TransactionStore interface
Browse files Browse the repository at this point in the history
  • Loading branch information
huangzhen1997 committed Aug 9, 2024
1 parent 82fde8d commit acc3a8a
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 31 deletions.
2 changes: 1 addition & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) For
ec.lggr.Infof("ForceRebroadcast: will rebroadcast transactions for all sequences between %v and %v", seqs[0], seqs[len(seqs)-1])

for _, seq := range seqs {
etx, err := ec.txStore.FindTxWithSequence(ctx, address, seq)
etx, err := ec.txStore.FindTxWithSequenceForRebroadcast(ctx, address, seq)
if err != nil {
return fmt.Errorf("ForceRebroadcast failed: %w", err)
}
Expand Down
82 changes: 71 additions & 11 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ type TransactionStore[
FindTxAttemptsRequiringResend(ctx context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the idempotencyKey and chainID
FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the fromAddress and sequence
FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx for rebroadcast using the fromAddress and sequence
FindTxWithSequenceForRebroadcast(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindFinalizedTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindNextUnstartedTransactionFromAddress(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)
FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error)
Expand Down
6 changes: 3 additions & 3 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3013,7 +3013,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
return
}
// Retrieve Tx to check if callback completed flag was set to true
updateTx, err3 := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce)
updateTx, err3 := txStore.FindFinalizedTxWithSequence(tests.Context(t), fromAddress, nonce)
if assert.NoError(t, err3) {
assert.Equal(t, true, updateTx.CallbackCompleted)
}
Expand Down Expand Up @@ -3067,7 +3067,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
return
}
// Retrieve Tx to check if callback completed flag was set to true
updateTx, err3 := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce)
updateTx, err3 := txStore.FindTxWithSequenceForRebroadcast(tests.Context(t), fromAddress, nonce)
if assert.NoError(t, err3) {
assert.Equal(t, true, updateTx.CallbackCompleted)
}
Expand Down Expand Up @@ -3103,7 +3103,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
updateTx, err := txStore.FindTxWithSequence(tests.Context(t), fromAddress, nonce)
updateTx, err := txStore.FindTxWithSequenceForRebroadcast(tests.Context(t), fromAddress, nonce)
require.NoError(t, err)
require.Equal(t, false, updateTx.CallbackCompleted)
})
Expand Down
26 changes: 25 additions & 1 deletion core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ func (o *evmTxStore) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKe
}

// FindTxWithSequence returns any broadcast ethtx with the given nonce
func (o *evmTxStore) FindTxWithSequence(ctx context.Context, fromAddress common.Address, nonce evmtypes.Nonce) (etx *Tx, err error) {
func (o *evmTxStore) FindTxWithSequenceForRebroadcast(ctx context.Context, fromAddress common.Address, nonce evmtypes.Nonce) (etx *Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
Expand All @@ -1110,6 +1110,30 @@ SELECT * FROM evm.txes WHERE from_address = $1 AND nonce = $2 AND state IN ('con
return
}

// FindFinalizedTxWithSequence returns finalized ethtx with the given nonce
func (o *evmTxStore) FindFinalizedTxWithSequence(ctx context.Context, fromAddress common.Address, nonce evmtypes.Nonce) (etx *Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()
etx = new(Tx)
err = o.Transact(ctx, true, func(orm *evmTxStore) error {
var dbEtx DbEthTx
err = orm.q.GetContext(ctx, &dbEtx, `
SELECT * FROM evm.txes WHERE from_address = $1 AND nonce = $2 AND state = 'finalized'
`, fromAddress, nonce.Int64())
if err != nil {
return pkgerrors.Wrap(err, "FindEthTxWithNonce failed to load evm.txes")
}
dbEtx.ToTx(etx)
err = orm.loadTxAttemptsAtomic(ctx, etx)
return pkgerrors.Wrap(err, "FindEthTxWithNonce failed to load evm.tx_attempts")
})
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return
}

func updateEthTxAttemptUnbroadcast(ctx context.Context, orm *evmTxStore, attempt TxAttempt) error {
if attempt.State != txmgrtypes.TxAttemptBroadcast {
return errors.New("expected eth_tx_attempt to be broadcast")
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func TestORM_FindTxWithSequence(t *testing.T) {
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)

t.Run("returns nil if no results", func(t *testing.T) {
etx, err := txStore.FindTxWithSequence(tests.Context(t), fromAddress, evmtypes.Nonce(777))
etx, err := txStore.FindTxWithSequenceForRebroadcast(tests.Context(t), fromAddress, evmtypes.Nonce(777))
require.NoError(t, err)
assert.Nil(t, etx)
})
Expand All @@ -739,7 +739,7 @@ func TestORM_FindTxWithSequence(t *testing.T) {
etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 777, 1, fromAddress)
require.Equal(t, evmtypes.Nonce(777), *etx.Sequence)

res, err := txStore.FindTxWithSequence(tests.Context(t), fromAddress, evmtypes.Nonce(777))
res, err := txStore.FindTxWithSequenceForRebroadcast(tests.Context(t), fromAddress, evmtypes.Nonce(777))
require.NoError(t, err)
assert.Equal(t, etx.Sequence, res.Sequence)
})
Expand Down
Loading

0 comments on commit acc3a8a

Please sign in to comment.