diff --git a/.changeset/cuddly-eels-lay.md b/.changeset/cuddly-eels-lay.md new file mode 100644 index 00000000000..25b38d3164d --- /dev/null +++ b/.changeset/cuddly-eels-lay.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Improve TXM performance by optimizing Confirmer and Finalizer queries to stop pulling EVM receipt. #internal diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 5489a57e636..3d874cc4366 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -79,6 +79,8 @@ type TransactionStore[ // Search for Tx using the fromAddress and sequence FindTxWithSequence(ctx context.Context, fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindNextUnstartedTransactionFromAddress(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) (*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) + + // FindTransactionsConfirmedInBlockRange retrieves tx with attempts and partial receipt values for optimization purpose FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID CHAIN_ID) (null.Time, error) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 4bdf191376b..fa2251168d9 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -669,10 +669,8 @@ func (o *evmTxStore) loadEthTxAttemptsReceipts(ctx context.Context, etx *Tx) (er return o.loadEthTxesAttemptsReceipts(ctx, []*Tx{etx}) } -func (o *evmTxStore) loadEthTxesAttemptsReceipts(ctx context.Context, etxs []*Tx) (err error) { - if len(etxs) == 0 { - return nil - } +// initEthTxesAttempts takes an input txes slice, return an initialized attempt map and attemptHashes slice +func initEthTxesAttempts(etxs []*Tx) (map[common.Hash]*TxAttempt, [][]byte) { attemptHashM := make(map[common.Hash]*TxAttempt, len(etxs)) // len here is lower bound attemptHashes := make([][]byte, len(etxs)) // len here is lower bound for _, etx := range etxs { @@ -681,6 +679,16 @@ func (o *evmTxStore) loadEthTxesAttemptsReceipts(ctx context.Context, etxs []*Tx attemptHashes = append(attemptHashes, attempt.Hash.Bytes()) } } + + return attemptHashM, attemptHashes +} + +func (o *evmTxStore) loadEthTxesAttemptsReceipts(ctx context.Context, etxs []*Tx) (err error) { + if len(etxs) == 0 { + return nil + } + + attemptHashM, attemptHashes := initEthTxesAttempts(etxs) var rs []DbReceipt if err = o.q.SelectContext(ctx, &rs, `SELECT * FROM evm.receipts WHERE tx_hash = ANY($1)`, pq.Array(attemptHashes)); err != nil { return pkgerrors.Wrap(err, "loadEthTxesAttemptsReceipts failed to load evm.receipts") @@ -697,6 +705,37 @@ func (o *evmTxStore) loadEthTxesAttemptsReceipts(ctx context.Context, etxs []*Tx return nil } +// loadEthTxesAttemptsWithPartialReceipts loads ethTxes with attempts and partial receipts values for optimization +func (o *evmTxStore) loadEthTxesAttemptsWithPartialReceipts(ctx context.Context, etxs []*Tx) (err error) { + if len(etxs) == 0 { + return nil + } + + attemptHashM, attemptHashes := initEthTxesAttempts(etxs) + var rs []DbReceipt + if err = o.q.SelectContext(ctx, &rs, `SELECT evm.receipts.block_hash, evm.receipts.block_number, evm.receipts.transaction_index, evm.receipts.tx_hash FROM evm.receipts WHERE tx_hash = ANY($1)`, pq.Array(attemptHashes)); err != nil { + return pkgerrors.Wrap(err, "loadEthTxesAttemptsReceipts failed to load evm.receipts") + } + + receipts := make([]*evmtypes.Receipt, len(rs)) + for i := 0; i < len(rs); i++ { + receipts[i] = &evmtypes.Receipt{ + BlockHash: rs[i].BlockHash, + BlockNumber: big.NewInt(rs[i].BlockNumber), + TransactionIndex: rs[i].TransactionIndex, + TxHash: rs[i].TxHash, + } + } + + for _, receipt := range receipts { + attempt := attemptHashM[receipt.TxHash] + // Although the attempts struct supports multiple receipts, the expectation for EVM is that there is only one receipt + // per tx and therefore attempt too. + attempt.Receipts = append(attempt.Receipts, receipt) + } + return nil +} + func loadConfirmedAttemptsReceipts(ctx context.Context, q sqlutil.DataSource, attempts []TxAttempt) error { byHash := make(map[string]*TxAttempt, len(attempts)) hashes := make([][]byte, len(attempts)) @@ -1172,7 +1211,9 @@ ORDER BY nonce ASC if err = orm.LoadTxesAttempts(ctx, etxs); err != nil { return pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed to load evm.tx_attempts") } - err = orm.loadEthTxesAttemptsReceipts(ctx, etxs) + + // retrieve tx with attempts and partial receipt values for optimization purpose + err = orm.loadEthTxesAttemptsWithPartialReceipts(ctx, etxs) return pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed to load evm.receipts") }) return etxs, pkgerrors.Wrap(err, "FindTransactionsConfirmedInBlockRange failed") @@ -2068,24 +2109,18 @@ func (o *evmTxStore) UpdateTxAttemptBroadcastBeforeBlockNum(ctx context.Context, return err } -// Returns all confirmed transactions with receipt block nums older than or equal to the finalized block number +// FindConfirmedTxesReceipts Returns all confirmed transactions with receipt block nums older than or equal to the finalized block number func (o *evmTxStore) FindConfirmedTxesReceipts(ctx context.Context, finalizedBlockNum int64, chainID *big.Int) (receipts []Receipt, err error) { var cancel context.CancelFunc ctx, cancel = o.stopCh.Ctx(ctx) defer cancel() - err = o.Transact(ctx, true, func(orm *evmTxStore) error { - sql := `SELECT evm.receipts.* FROM evm.receipts + + // note the receipts are partially loaded for performance reason + query := `SELECT evm.receipts.id, evm.receipts.tx_hash, evm.receipts.block_hash, evm.receipts.block_number FROM evm.receipts INNER JOIN evm.tx_attempts ON evm.tx_attempts.hash = evm.receipts.tx_hash INNER JOIN evm.txes ON evm.txes.id = evm.tx_attempts.eth_tx_id WHERE evm.txes.state = 'confirmed' AND evm.receipts.block_number <= $1 AND evm.txes.evm_chain_id = $2` - var dbReceipts []DbReceipt - err = o.q.SelectContext(ctx, &dbReceipts, sql, finalizedBlockNum, chainID.String()) - if len(dbReceipts) == 0 { - return nil - } - receipts = dbReceipts - return nil - }) + err = o.q.SelectContext(ctx, &receipts, query, finalizedBlockNum, chainID.String()) return receipts, err } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 992bd1f434c..c711c2788e8 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -816,6 +816,12 @@ func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { assert.Equal(t, etxes[0].Sequence, etx_8.Sequence) assert.Equal(t, etxes[1].Sequence, etx_9.Sequence) }) + + t.Run("return empty txes when no transactions in range found", func(t *testing.T) { + etxes, err := txStore.FindTransactionsConfirmedInBlockRange(tests.Context(t), 0, 0, ethClient.ConfiguredChainID()) + require.NoError(t, err) + assert.Len(t, etxes, 0) + }) } func TestORM_FindEarliestUnconfirmedBroadcastTime(t *testing.T) { diff --git a/core/chains/evm/txmgr/finalizer.go b/core/chains/evm/txmgr/finalizer.go index 6d5fb81782c..60744636159 100644 --- a/core/chains/evm/txmgr/finalizer.go +++ b/core/chains/evm/txmgr/finalizer.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" ) var _ Finalizer = (*evmFinalizer)(nil) @@ -174,7 +175,7 @@ func (f *evmFinalizer) processFinalizedHead(ctx context.Context, latestFinalized // Find transactions with receipt block nums older than the latest finalized block num and block hashes still in chain for _, receipt := range unfinalizedReceipts { // The tx store query ensures transactions have receipts but leaving this check here for a belts and braces approach - if receipt.Receipt.IsZero() || receipt.Receipt.IsUnmined() { + if receipt.TxHash == utils.EmptyHash || receipt.BlockHash == utils.EmptyHash { f.lggr.AssumptionViolationw("invalid receipt found for confirmed transaction", "receipt", receipt) continue }