Skip to content

Commit

Permalink
Add RequestID field to Transactions (#10363)
Browse files Browse the repository at this point in the history
* Added request_id field to eth_txes to track external component requests

* Rearranged logic, added missing validation, and linted

* Fixed mocks

* Added new TxStore tests

* Addressed PR feedback

* Updated migration sequencing

* Changed RequestID name to IdempotencyKey and updated type to UUID

* Switched IdempotencyKey type back to string
  • Loading branch information
amit-momin authored Sep 5, 2023
1 parent 2afd9c2 commit c38b862
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 3 deletions.
15 changes: 15 additions & 0 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package txmgr

import (
"context"
"database/sql"
"fmt"
"math/big"
"sync"
Expand Down Expand Up @@ -435,6 +436,20 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad

// CreateTransaction inserts a new transaction
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], qs ...pg.QOpt) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing
// Skipping CreateTransaction to avoid double send
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(*txRequest.IdempotencyKey, b.chainID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return tx, errors.Wrap(err, "Failed to search for transaction with IdempotencyKey")
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
return *existingTx, nil
}
}

if err = b.checkEnabled(txRequest.FromAddress); err != nil {
return tx, err
}
Expand Down
26 changes: 26 additions & 0 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (s TxAttemptState) String() (str string) {
}

type TxRequest[ADDR types.Hashable, TX_HASH types.Hashable] struct {
// IdempotencyKey is a globally unique ID set by the caller, to prevent accidental creation of duplicated Txs during retries or crash recovery.
// If this field is set, the TXM will first search existing Txs with this field.
// If found, it will return the existing Tx, without creating a new one. TXM will not validate or ensure that existing Tx is same as the incoming TxRequest.
// If not found, TXM will create a new Tx.
// If IdempotencyKey is set to null, TXM will always create a new Tx.
// Since IdempotencyKey has to be globally unique, consider prepending the service or component's name it is being used by
// Such as {service}-{ID}. E.g vrf-12345
IdempotencyKey *string
FromAddress ADDR
ToAddress ADDR
EncodedPayload []byte
Expand Down Expand Up @@ -178,6 +186,7 @@ type Tx[
FEE feetypes.Fee,
] struct {
ID int64
IdempotencyKey *string
Sequence *SEQ
FromAddress ADDR
ToAddress ADDR
Expand Down
3 changes: 3 additions & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type TransactionStore[
FindTxAttemptsConfirmedMissingReceipt(chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxAttemptsRequiringReceiptFetch(chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxAttemptsRequiringResend(olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the idempotencyKey and chainID
FindTxWithIdempotencyKey(idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the fromAddress and sequence
FindTxWithSequence(fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindNextUnstartedTransactionFromAddress(etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID, qopts ...pg.QOpt) error
FindTransactionsConfirmedInBlockRange(highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Expand Down
23 changes: 20 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func toOnchainReceipt(rs []*evmtypes.Receipt) []rawOnchainReceipt {
// This is exported, as tests and other external code still directly reads DB using this schema.
type DbEthTx struct {
ID int64
IdempotencyKey *string
Nonce *int64
FromAddress common.Address
ToAddress common.Address
Expand Down Expand Up @@ -218,6 +219,7 @@ func DbEthTxToEthTx(dbEthTx DbEthTx, evmEthTx *Tx) {
n := evmtypes.Nonce(*dbEthTx.Nonce)
evmEthTx.Sequence = &n
}
evmEthTx.IdempotencyKey = dbEthTx.IdempotencyKey
evmEthTx.FromAddress = dbEthTx.FromAddress
evmEthTx.ToAddress = dbEthTx.ToAddress
evmEthTx.EncodedPayload = dbEthTx.EncodedPayload
Expand Down Expand Up @@ -906,6 +908,21 @@ func (o *evmTxStore) FindReceiptsPendingConfirmation(ctx context.Context, blockN
return
}

// FindTxWithIdempotencyKey returns any broadcast ethtx with the given idempotencyKey and chainID
func (o *evmTxStore) FindTxWithIdempotencyKey(idempotencyKey string, chainID *big.Int) (etx *Tx, err error) {
var dbEtx DbEthTx
err = o.q.Get(&dbEtx, `SELECT * FROM eth_txes WHERE idempotency_key = $1 and evm_chain_id = $2`, idempotencyKey, chainID.String())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load eth_txes")
}
etx = new(Tx)
DbEthTxToEthTx(dbEtx, etx)
return
}

// FindTxWithSequence returns any broadcast ethtx with the given nonce
func (o *evmTxStore) FindTxWithSequence(fromAddress common.Address, nonce evmtypes.Nonce) (etx *Tx, err error) {
etx = new(Tx)
Expand Down Expand Up @@ -1501,12 +1518,12 @@ func (o *evmTxStore) CreateTransaction(txRequest TxRequest, chainID *big.Int, qo
}
}
err = tx.Get(&dbEtx, `
INSERT INTO eth_txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker)
INSERT INTO eth_txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker, idempotency_key)
VALUES (
$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11
$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11,$12
)
RETURNING "eth_txes".*
`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker)
`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey)
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert eth_tx")
}
Expand Down
29 changes: 29 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,35 @@ func TestORM_FindReceiptsPendingConfirmation(t *testing.T) {
assert.Equal(t, tr.ID, receiptsPlus[0].ID)
}

func Test_FindTxWithIdempotencyKey(t *testing.T) {
t.Parallel()
db := pgtest.NewSqlxDB(t)
cfg := newTestChainScopedConfig(t)
txStore := cltest.NewTestTxStore(t, db, cfg.Database())
ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth()
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0)

t.Run("returns nil if no results", func(t *testing.T) {
idempotencyKey := "777"
etx, err := txStore.FindTxWithIdempotencyKey(idempotencyKey, big.NewInt(0))
require.NoError(t, err)
assert.Nil(t, etx)
})

t.Run("returns transaction if it exists", func(t *testing.T) {
idempotencyKey := "777"
cfg.EVM().ChainID()
etx := cltest.MustCreateUnstartedGeneratedTx(t, txStore, fromAddress, big.NewInt(0),
cltest.EvmTxRequestWithIdempotencyKey(idempotencyKey))
require.Equal(t, idempotencyKey, *etx.IdempotencyKey)

res, err := txStore.FindTxWithIdempotencyKey(idempotencyKey, big.NewInt(0))
require.NoError(t, err)
assert.Equal(t, etx.Sequence, res.Sequence)
require.Equal(t, idempotencyKey, *res.IdempotencyKey)
})
}

func TestORM_FindTxWithSequence(t *testing.T) {
t.Parallel()

Expand Down
26 changes: 26 additions & 0 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,51 @@ func TestTxm_CreateTransaction(t *testing.T) {
require.NotNil(t, m.FwdrDestAddress)
require.Equal(t, etx.ToAddress.String(), fwdrAddr.String())
})

t.Run("insert Tx successfully with a IdempotencyKey", func(t *testing.T) {
evmConfig.maxQueued = uint64(3)
id := uuid.New()
idempotencyKey := "1"
_, err := txm.CreateTransaction(txmgr.TxRequest{
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
ToAddress: testutils.NewAddress(),
EncodedPayload: []byte{1, 2, 3},
FeeLimit: 21000,
PipelineTaskRunID: &id,
Strategy: txmgrcommon.NewSendEveryStrategy(),
})
assert.NoError(t, err)
})

t.Run("doesn't insert eth_tx if a matching tx already exists for that IdempotencyKey", func(t *testing.T) {
evmConfig.maxQueued = uint64(3)
id := uuid.New()
idempotencyKey := "2"
tx1, err := txm.CreateTransaction(txmgr.TxRequest{
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
ToAddress: testutils.NewAddress(),
EncodedPayload: []byte{1, 2, 3},
FeeLimit: 21000,
PipelineTaskRunID: &id,
Strategy: txmgrcommon.NewSendEveryStrategy(),
})
assert.NoError(t, err)

tx2, err := txm.CreateTransaction(txmgr.TxRequest{
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
ToAddress: testutils.NewAddress(),
EncodedPayload: []byte{1, 2, 3},
FeeLimit: 21000,
PipelineTaskRunID: &id,
Strategy: txmgrcommon.NewSendEveryStrategy(),
})
assert.NoError(t, err)

assert.Equal(t, tx1.GetID(), tx2.GetID())
})
}

func newMockTxStrategy(t *testing.T) *commontxmmocks.TxStrategy {
Expand Down
6 changes: 6 additions & 0 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ func EvmTxRequestWithValue(value big.Int) func(*txmgr.TxRequest) {
}
}

func EvmTxRequestWithIdempotencyKey(idempotencyKey string) func(*txmgr.TxRequest) {
return func(tx *txmgr.TxRequest) {
tx.IdempotencyKey = &idempotencyKey
}
}

func MustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress common.Address, toAddress common.Address, encodedPayload []byte, gasLimit uint32, value big.Int, chainID *big.Int, opts ...interface{}) (tx txmgr.Tx) {
txRequest := txmgr.TxRequest{
FromAddress: fromAddress,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +goose Up

ALTER TABLE eth_txes ADD COLUMN idempotency_key varchar(2000) UNIQUE;

-- +goose Down

ALTER TABLE eth_txes DROP COLUMN idempotency_key;

0 comments on commit c38b862

Please sign in to comment.