Skip to content

Commit

Permalink
Merge branch 'develop' into ml/fix-persistence-manager-race
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-cll authored Sep 6, 2023
2 parents 25055ee + 5aadc9c commit 09a1c0d
Show file tree
Hide file tree
Showing 19 changed files with 251 additions and 74 deletions.
15 changes: 15 additions & 0 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package txmgr

import (
"context"
"database/sql"
"fmt"
"math/big"
"sync"
Expand Down Expand Up @@ -435,6 +436,20 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad

// CreateTransaction inserts a new transaction
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], qs ...pg.QOpt) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing
// Skipping CreateTransaction to avoid double send
if txRequest.IdempotencyKey != nil {
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(*txRequest.IdempotencyKey, b.chainID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return tx, errors.Wrap(err, "Failed to search for transaction with IdempotencyKey")
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
return *existingTx, nil
}
}

if err = b.checkEnabled(txRequest.FromAddress); err != nil {
return tx, err
}
Expand Down
26 changes: 26 additions & 0 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (s TxAttemptState) String() (str string) {
}

type TxRequest[ADDR types.Hashable, TX_HASH types.Hashable] struct {
// IdempotencyKey is a globally unique ID set by the caller, to prevent accidental creation of duplicated Txs during retries or crash recovery.
// If this field is set, the TXM will first search existing Txs with this field.
// If found, it will return the existing Tx, without creating a new one. TXM will not validate or ensure that existing Tx is same as the incoming TxRequest.
// If not found, TXM will create a new Tx.
// If IdempotencyKey is set to null, TXM will always create a new Tx.
// Since IdempotencyKey has to be globally unique, consider prepending the service or component's name it is being used by
// Such as {service}-{ID}. E.g vrf-12345
IdempotencyKey *string
FromAddress ADDR
ToAddress ADDR
EncodedPayload []byte
Expand Down Expand Up @@ -178,6 +186,7 @@ type Tx[
FEE feetypes.Fee,
] struct {
ID int64
IdempotencyKey *string
Sequence *SEQ
FromAddress ADDR
ToAddress ADDR
Expand Down
3 changes: 3 additions & 0 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type TransactionStore[
FindTxAttemptsConfirmedMissingReceipt(chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxAttemptsRequiringReceiptFetch(chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindTxAttemptsRequiringResend(olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the idempotencyKey and chainID
FindTxWithIdempotencyKey(idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
// Search for Tx using the fromAddress and sequence
FindTxWithSequence(fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
FindNextUnstartedTransactionFromAddress(etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID, qopts ...pg.QOpt) error
FindTransactionsConfirmedInBlockRange(highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
Expand Down
23 changes: 20 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func toOnchainReceipt(rs []*evmtypes.Receipt) []rawOnchainReceipt {
// This is exported, as tests and other external code still directly reads DB using this schema.
type DbEthTx struct {
ID int64
IdempotencyKey *string
Nonce *int64
FromAddress common.Address
ToAddress common.Address
Expand Down Expand Up @@ -218,6 +219,7 @@ func DbEthTxToEthTx(dbEthTx DbEthTx, evmEthTx *Tx) {
n := evmtypes.Nonce(*dbEthTx.Nonce)
evmEthTx.Sequence = &n
}
evmEthTx.IdempotencyKey = dbEthTx.IdempotencyKey
evmEthTx.FromAddress = dbEthTx.FromAddress
evmEthTx.ToAddress = dbEthTx.ToAddress
evmEthTx.EncodedPayload = dbEthTx.EncodedPayload
Expand Down Expand Up @@ -906,6 +908,21 @@ func (o *evmTxStore) FindReceiptsPendingConfirmation(ctx context.Context, blockN
return
}

// FindTxWithIdempotencyKey returns any broadcast ethtx with the given idempotencyKey and chainID
func (o *evmTxStore) FindTxWithIdempotencyKey(idempotencyKey string, chainID *big.Int) (etx *Tx, err error) {
var dbEtx DbEthTx
err = o.q.Get(&dbEtx, `SELECT * FROM eth_txes WHERE idempotency_key = $1 and evm_chain_id = $2`, idempotencyKey, chainID.String())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load eth_txes")
}
etx = new(Tx)
DbEthTxToEthTx(dbEtx, etx)
return
}

// FindTxWithSequence returns any broadcast ethtx with the given nonce
func (o *evmTxStore) FindTxWithSequence(fromAddress common.Address, nonce evmtypes.Nonce) (etx *Tx, err error) {
etx = new(Tx)
Expand Down Expand Up @@ -1501,12 +1518,12 @@ func (o *evmTxStore) CreateTransaction(txRequest TxRequest, chainID *big.Int, qo
}
}
err = tx.Get(&dbEtx, `
INSERT INTO eth_txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker)
INSERT INTO eth_txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker, idempotency_key)
VALUES (
$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11
$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11,$12
)
RETURNING "eth_txes".*
`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker)
`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey)
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert eth_tx")
}
Expand Down
29 changes: 29 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,35 @@ func TestORM_FindReceiptsPendingConfirmation(t *testing.T) {
assert.Equal(t, tr.ID, receiptsPlus[0].ID)
}

func Test_FindTxWithIdempotencyKey(t *testing.T) {
t.Parallel()
db := pgtest.NewSqlxDB(t)
cfg := newTestChainScopedConfig(t)
txStore := cltest.NewTestTxStore(t, db, cfg.Database())
ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth()
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0)

t.Run("returns nil if no results", func(t *testing.T) {
idempotencyKey := "777"
etx, err := txStore.FindTxWithIdempotencyKey(idempotencyKey, big.NewInt(0))
require.NoError(t, err)
assert.Nil(t, etx)
})

t.Run("returns transaction if it exists", func(t *testing.T) {
idempotencyKey := "777"
cfg.EVM().ChainID()
etx := cltest.MustCreateUnstartedGeneratedTx(t, txStore, fromAddress, big.NewInt(0),
cltest.EvmTxRequestWithIdempotencyKey(idempotencyKey))
require.Equal(t, idempotencyKey, *etx.IdempotencyKey)

res, err := txStore.FindTxWithIdempotencyKey(idempotencyKey, big.NewInt(0))
require.NoError(t, err)
assert.Equal(t, etx.Sequence, res.Sequence)
require.Equal(t, idempotencyKey, *res.IdempotencyKey)
})
}

func TestORM_FindTxWithSequence(t *testing.T) {
t.Parallel()

Expand Down
26 changes: 26 additions & 0 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,51 @@ func TestTxm_CreateTransaction(t *testing.T) {
require.NotNil(t, m.FwdrDestAddress)
require.Equal(t, etx.ToAddress.String(), fwdrAddr.String())
})

t.Run("insert Tx successfully with a IdempotencyKey", func(t *testing.T) {
evmConfig.maxQueued = uint64(3)
id := uuid.New()
idempotencyKey := "1"
_, err := txm.CreateTransaction(txmgr.TxRequest{
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
ToAddress: testutils.NewAddress(),
EncodedPayload: []byte{1, 2, 3},
FeeLimit: 21000,
PipelineTaskRunID: &id,
Strategy: txmgrcommon.NewSendEveryStrategy(),
})
assert.NoError(t, err)
})

t.Run("doesn't insert eth_tx if a matching tx already exists for that IdempotencyKey", func(t *testing.T) {
evmConfig.maxQueued = uint64(3)
id := uuid.New()
idempotencyKey := "2"
tx1, err := txm.CreateTransaction(txmgr.TxRequest{
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
ToAddress: testutils.NewAddress(),
EncodedPayload: []byte{1, 2, 3},
FeeLimit: 21000,
PipelineTaskRunID: &id,
Strategy: txmgrcommon.NewSendEveryStrategy(),
})
assert.NoError(t, err)

tx2, err := txm.CreateTransaction(txmgr.TxRequest{
IdempotencyKey: &idempotencyKey,
FromAddress: fromAddress,
ToAddress: testutils.NewAddress(),
EncodedPayload: []byte{1, 2, 3},
FeeLimit: 21000,
PipelineTaskRunID: &id,
Strategy: txmgrcommon.NewSendEveryStrategy(),
})
assert.NoError(t, err)

assert.Equal(t, tx1.GetID(), tx2.GetID())
})
}

func newMockTxStrategy(t *testing.T) *commontxmmocks.TxStrategy {
Expand Down
6 changes: 6 additions & 0 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ func EvmTxRequestWithValue(value big.Int) func(*txmgr.TxRequest) {
}
}

func EvmTxRequestWithIdempotencyKey(idempotencyKey string) func(*txmgr.TxRequest) {
return func(tx *txmgr.TxRequest) {
tx.IdempotencyKey = &idempotencyKey
}
}

func MustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress common.Address, toAddress common.Address, encodedPayload []byte, gasLimit uint32, value big.Int, chainID *big.Int, opts ...interface{}) (tx txmgr.Tx) {
txRequest := txmgr.TxRequest{
FromAddress: fromAddress,
Expand Down
12 changes: 6 additions & 6 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/ethereum/go-ethereum v1.12.0
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/google/uuid v1.3.1
github.com/manyminds/api2go v0.0.0-20171030193247-e7b693844a6f
github.com/montanaflynn/stats v0.7.1
github.com/olekukonko/tablewriter v0.0.5
Expand Down Expand Up @@ -52,7 +52,7 @@ require (
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/ava-labs/avalanchego v1.10.1 // indirect
github.com/avast/retry-go/v4 v4.3.4 // indirect
github.com/avast/retry-go/v4 v4.5.0 // indirect
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
github.com/benbjohnson/clock v1.3.4 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down Expand Up @@ -180,7 +180,7 @@ require (
github.com/ipfs/go-log v1.0.4 // indirect
github.com/ipfs/go-log/v2 v2.1.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.0 // indirect
github.com/jackc/pgconn v1.14.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
Expand Down Expand Up @@ -294,7 +294,7 @@ require (
github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/scylladb/go-reflectx v1.0.1 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/shirou/gopsutil/v3 v3.22.12 // indirect
github.com/shirou/gopsutil/v3 v3.23.8 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smartcontractkit/caigo v0.0.0-20230621050857-b29a4ca8c704 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20230831132059-42af68994512 // indirect
Expand All @@ -321,7 +321,7 @@ require (
github.com/tidwall/gjson v1.16.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
Expand Down Expand Up @@ -353,7 +353,7 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/term v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
Loading

0 comments on commit 09a1c0d

Please sign in to comment.