From fbd4413692617abc4afc61cbe46d564b0e58e900 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Tue, 7 May 2024 13:46:20 +0200 Subject: [PATCH 1/2] Test --- core/chains/evm/txmgr/evm_tx_store.go | 156 ++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 8187a39087..82a3e4ed06 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -16,6 +16,8 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" pkgerrors "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" nullv4 "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -32,6 +34,36 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) +var ( + sqlLatencyBuckets = []float64{ + float64(1 * time.Millisecond), + float64(5 * time.Millisecond), + float64(10 * time.Millisecond), + float64(20 * time.Millisecond), + float64(30 * time.Millisecond), + float64(40 * time.Millisecond), + float64(50 * time.Millisecond), + float64(60 * time.Millisecond), + float64(70 * time.Millisecond), + float64(80 * time.Millisecond), + float64(90 * time.Millisecond), + float64(100 * time.Millisecond), + float64(200 * time.Millisecond), + float64(300 * time.Millisecond), + float64(400 * time.Millisecond), + float64(500 * time.Millisecond), + float64(750 * time.Millisecond), + float64(1 * time.Second), + float64(2 * time.Second), + float64(5 * time.Second), + } + txmgrQueryDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "txm_query_duration", + Help: "Measures duration of TXM's queries", + Buckets: sqlLatencyBuckets, + }, []string{"evmChainID", "query"}) +) + var ( ErrKeyNotUpdated = errors.New("evmTxStore: Key not updated") ErrInvalidQOpt = errors.New("evmTxStore: Invalid QOpt") @@ -705,6 +737,9 @@ func loadConfirmedAttemptsReceipts(q pg.Queryer, attempts []TxAttempt) error { // FindTxAttemptsRequiringResend returns the highest priced attempt for each // eth_tx that was last sent before or at the given time (up to limit) func (o *evmTxStore) FindTxAttemptsRequiringResend(ctx context.Context, olderThan time.Time, maxInFlightTransactions uint32, chainID *big.Int, address common.Address) (attempts []TxAttempt, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTxAttemptsRequiringResend", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -749,6 +784,9 @@ func (o *evmTxStore) UpdateBroadcastAts(ctx context.Context, now time.Time, etxI // current block number. This is safe no matter how old the head is because if // the attempt is already broadcast it _must_ have been before this head. func (o *evmTxStore) SetBroadcastBeforeBlockNum(ctx context.Context, blockNum int64, chainID *big.Int) error { + start := time.Now() + defer func() { reportQueryDuration("SetBroadcastBeforeBlockNum", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -765,6 +803,9 @@ AND evm.txes.id = evm.tx_attempts.eth_tx_id AND evm.txes.evm_chain_id = $2`, } func (o *evmTxStore) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, chainID *big.Int) (attempts []TxAttempt, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTxAttemptsConfirmedMissingReceipt", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -785,6 +826,9 @@ func (o *evmTxStore) FindTxAttemptsConfirmedMissingReceipt(ctx context.Context, } func (o *evmTxStore) UpdateTxsUnconfirmed(ctx context.Context, ids []int64) error { + start := time.Now() + defer func() { reportQueryDuration("UpdateTxsUnconfirmed", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -798,6 +842,9 @@ func (o *evmTxStore) UpdateTxsUnconfirmed(ctx context.Context, ids []int64) erro } func (o *evmTxStore) FindTxAttemptsRequiringReceiptFetch(ctx context.Context, chainID *big.Int) (attempts []TxAttempt, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTxAttemptsRequiringReceiptFetch", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -821,6 +868,9 @@ ORDER BY evm.txes.nonce ASC, evm.tx_attempts.gas_price DESC, evm.tx_attempts.gas } func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Receipt, chainID *big.Int) (err error) { + start := time.Now() + defer func() { reportQueryDuration("SaveFetchedReceipts", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -926,6 +976,9 @@ func (o *evmTxStore) SaveFetchedReceipts(ctx context.Context, r []*evmtypes.Rece // We will continue to try to fetch a receipt for these attempts until all // attempts are below the finality depth from current head. func (o *evmTxStore) MarkAllConfirmedMissingReceipt(ctx context.Context, chainID *big.Int) (err error) { + start := time.Now() + defer func() { reportQueryDuration("MarkAllConfirmedMissingReceipt", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -958,6 +1011,9 @@ WHERE state = 'unconfirmed' } func (o *evmTxStore) GetInProgressTxAttempts(ctx context.Context, address common.Address, chainID *big.Int) (attempts []TxAttempt, err error) { + start := time.Now() + defer func() { reportQueryDuration("GetInProgressTxAttempts", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -981,6 +1037,9 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e // Find confirmed txes requiring callback but have not yet been signaled func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTxesPendingCallback", chainID, start) }() + var rs []dbReceiptPlus var cancel context.CancelFunc @@ -1002,6 +1061,9 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64 // Update tx to mark that its callback has been signaled func (o *evmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunId uuid.UUID, chainId *big.Int) error { + start := time.Now() + defer func() { reportQueryDuration("UpdateTxCallbackCompleted", chainId, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1014,6 +1076,9 @@ func (o *evmTxStore) UpdateTxCallbackCompleted(ctx context.Context, pipelineTask } func (o *evmTxStore) FindLatestSequence(ctx context.Context, fromAddress common.Address, chainId *big.Int) (nonce evmtypes.Nonce, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindLatestSequence", chainId, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1025,6 +1090,9 @@ func (o *evmTxStore) FindLatestSequence(ctx context.Context, fromAddress common. // FindTxWithIdempotencyKey returns any broadcast ethtx with the given idempotencyKey and chainID func (o *evmTxStore) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKey string, chainID *big.Int) (etx *Tx, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTxWithIdempotencyKey", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1044,6 +1112,9 @@ func (o *evmTxStore) FindTxWithIdempotencyKey(ctx context.Context, idempotencyKe // FindTxWithSequence returns any broadcast ethtx with the given nonce func (o *evmTxStore) FindTxWithSequence(ctx context.Context, fromAddress common.Address, nonce evmtypes.Nonce) (etx *Tx, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTxWithSequence", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1068,6 +1139,9 @@ SELECT * FROM evm.txes WHERE from_address = $1 AND nonce = $2 AND state IN ('con } func updateEthTxAttemptUnbroadcast(q pg.Queryer, attempt TxAttempt) error { + start := time.Now() + defer func() { reportQueryDuration("updateEthTxAttemptUnbroadcast", nil, start) }() + if attempt.State != txmgrtypes.TxAttemptBroadcast { return errors.New("expected eth_tx_attempt to be broadcast") } @@ -1076,6 +1150,9 @@ func updateEthTxAttemptUnbroadcast(q pg.Queryer, attempt TxAttempt) error { } func updateEthTxUnconfirm(q pg.Queryer, etx Tx) error { + start := time.Now() + defer func() { reportQueryDuration("updateEthTxUnconfirm", nil, start) }() + if etx.State != txmgr.TxConfirmed { return errors.New("expected eth_tx state to be confirmed") } @@ -1084,6 +1161,9 @@ func updateEthTxUnconfirm(q pg.Queryer, etx Tx) error { } func deleteEthReceipts(q pg.Queryer, etxID int64) (err error) { + start := time.Now() + defer func() { reportQueryDuration("deleteEthReceipts", nil, start) }() + _, err = q.Exec(` DELETE FROM evm.receipts USING evm.tx_attempts @@ -1110,6 +1190,9 @@ func (o *evmTxStore) UpdateTxForRebroadcast(ctx context.Context, etx Tx, etxAtte } func (o *evmTxStore) FindTransactionsConfirmedInBlockRange(ctx context.Context, highBlockNumber, lowBlockNumber int64, chainID *big.Int) (etxs []*Tx, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTransactionsConfirmedInBlockRange", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1138,6 +1221,9 @@ ORDER BY nonce ASC } func (o *evmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, chainID *big.Int) (broadcastAt nullv4.Time, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindEarliestUnconfirmedBroadcastTime", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1152,6 +1238,9 @@ func (o *evmTxStore) FindEarliestUnconfirmedBroadcastTime(ctx context.Context, c } func (o *evmTxStore) FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID *big.Int) (earliestUnconfirmedTxBlock nullv4.Int, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindEarliestUnconfirmedTxAttemptBlock", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1171,6 +1260,9 @@ AND evm_chain_id = $1`, chainID.String()).Scan(&earliestUnconfirmedTxBlock) } func (o *evmTxStore) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID *big.Int) (finalized bool, err error) { + start := time.Now() + defer func() { reportQueryDuration("IsTxFinalized", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1205,6 +1297,9 @@ func saveAttemptWithNewState(ctx context.Context, q pg.Queryer, logger logger.Lo } func (o *evmTxStore) SaveInsufficientFundsAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt, broadcastAt time.Time) error { + start := time.Now() + defer func() { reportQueryDuration("SaveInsufficientFundsAttempt", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1229,6 +1324,9 @@ func saveSentAttempt(ctx context.Context, q pg.Queryer, timeout time.Duration, l } func (o *evmTxStore) SaveSentAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt, broadcastAt time.Time) error { + start := time.Now() + defer func() { reportQueryDuration("SaveSentAttempt", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1237,6 +1335,9 @@ func (o *evmTxStore) SaveSentAttempt(ctx context.Context, timeout time.Duration, } func (o *evmTxStore) SaveConfirmedMissingReceiptAttempt(ctx context.Context, timeout time.Duration, attempt *TxAttempt, broadcastAt time.Time) error { + start := time.Now() + defer func() { reportQueryDuration("SaveConfirmedMissingReceiptAttempt", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1255,6 +1356,9 @@ func (o *evmTxStore) SaveConfirmedMissingReceiptAttempt(ctx context.Context, tim } func (o *evmTxStore) DeleteInProgressAttempt(ctx context.Context, attempt TxAttempt) error { + start := time.Now() + defer func() { reportQueryDuration("DeleteInProgressAttempt", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1271,6 +1375,9 @@ func (o *evmTxStore) DeleteInProgressAttempt(ctx context.Context, attempt TxAtte // SaveInProgressAttempt inserts or updates an attempt func (o *evmTxStore) SaveInProgressAttempt(ctx context.Context, attempt *TxAttempt) error { + start := time.Now() + defer func() { reportQueryDuration("SaveInProgressAttempt", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1306,6 +1413,9 @@ func (o *evmTxStore) SaveInProgressAttempt(ctx context.Context, attempt *TxAttem } func (o *evmTxStore) GetNonFatalTransactions(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) { + start := time.Now() + defer func() { reportQueryDuration("GetNonFatalTransactions", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1329,6 +1439,9 @@ func (o *evmTxStore) GetNonFatalTransactions(ctx context.Context, chainID *big.I } func (o *evmTxStore) GetTxByID(ctx context.Context, id int64) (txe *Tx, err error) { + start := time.Now() + defer func() { reportQueryDuration("GetTxByID", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1365,6 +1478,9 @@ func (o *evmTxStore) FindTxsRequiringGasBump(ctx context.Context, address common if gasBumpThreshold == 0 { return } + start := time.Now() + defer func() { reportQueryDuration("FindTxsRequiringGasBump", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1393,6 +1509,9 @@ ORDER BY nonce ASC // that need to be re-sent because they hit an out-of-eth error on a previous // block func (o *evmTxStore) FindTxsRequiringResubmissionDueToInsufficientFunds(ctx context.Context, address common.Address, chainID *big.Int) (etxs []*Tx, err error) { + start := time.Now() + defer func() { reportQueryDuration("FindTxsRequiringResubmissionDueToInsufficientFunds", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1424,6 +1543,9 @@ ORDER BY nonce ASC // The job run will also be marked as errored in this case since we never got a // receipt and thus cannot pass on any transaction hash func (o *evmTxStore) MarkOldTxesMissingReceiptAsErrored(ctx context.Context, blockNum int64, finalityDepth uint32, chainID *big.Int) error { + start := time.Now() + defer func() { reportQueryDuration("MarkOldTxesMissingReceiptAsErrored", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1518,6 +1640,9 @@ GROUP BY e.id } func (o *evmTxStore) SaveReplacementInProgressAttempt(ctx context.Context, oldAttempt TxAttempt, replacementAttempt *TxAttempt) error { + start := time.Now() + defer func() { reportQueryDuration("SaveReplacementInProgressAttempt", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1546,6 +1671,9 @@ func (o *evmTxStore) SaveReplacementInProgressAttempt(ctx context.Context, oldAt // Finds earliest saved transaction that has yet to be broadcast from the given address func (o *evmTxStore) FindNextUnstartedTransactionFromAddress(ctx context.Context, fromAddress common.Address, chainID *big.Int) (*Tx, error) { + start := time.Now() + defer func() { reportQueryDuration("FindNextUnstartedTransactionFromAddress", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1562,6 +1690,9 @@ func (o *evmTxStore) FindNextUnstartedTransactionFromAddress(ctx context.Context } func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error { + start := time.Now() + defer func() { reportQueryDuration("UpdateTxFatalError", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1590,6 +1721,9 @@ func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error { // Updates eth attempt from in_progress to broadcast. Also updates the eth tx to unconfirmed. func (o *evmTxStore) UpdateTxAttemptInProgressToBroadcast(ctx context.Context, etx *Tx, attempt TxAttempt, NewAttemptState txmgrtypes.TxAttemptState) error { + start := time.Now() + defer func() { reportQueryDuration("UpdateTxAttemptInProgressToBroadcast", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1629,6 +1763,9 @@ func (o *evmTxStore) UpdateTxAttemptInProgressToBroadcast(ctx context.Context, e // Updates eth tx from unstarted to in_progress and inserts in_progress eth attempt func (o *evmTxStore) UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx, attempt *TxAttempt) error { + start := time.Now() + defer func() { reportQueryDuration("UpdateTxUnstartedToInProgress", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1702,6 +1839,9 @@ func (o *evmTxStore) UpdateTxUnstartedToInProgress(ctx context.Context, etx *Tx, // the node crashed in the middle of the ProcessUnstartedEthTxs loop. // It may or may not have been broadcast to an eth node. func (o *evmTxStore) GetTxInProgress(ctx context.Context, fromAddress common.Address) (etx *Tx, err error) { + start := time.Now() + defer func() { reportQueryDuration("GetTxInProgress", nil, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1734,6 +1874,9 @@ func (o *evmTxStore) GetTxInProgress(ctx context.Context, fromAddress common.Add } func (o *evmTxStore) HasInProgressTransaction(ctx context.Context, account common.Address, chainID *big.Int) (exists bool, err error) { + start := time.Now() + defer func() { reportQueryDuration("HasInProgressTransaction", chainID, start) }() + var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() @@ -1743,6 +1886,9 @@ func (o *evmTxStore) HasInProgressTransaction(ctx context.Context, account commo } func (o *evmTxStore) UpdateKeyNextSequence(newNextNonce, currentNextNonce evmtypes.Nonce, address common.Address, chainID *big.Int, qopts ...pg.QOpt) error { + start := time.Now() + defer func() { reportQueryDuration("UpdateKeyNextSequence", chainID, start) }() + qq := o.q.WithOpts(qopts...) return qq.Transaction(func(tx pg.Queryer) error { // We filter by next_nonce here as an optimistic lock to make sure it @@ -2088,3 +2234,13 @@ func (o *evmTxStore) mergeContexts(ctx context.Context) (context.Context, contex cancel(context.Canceled) } } + +func reportQueryDuration(query string, chainID *big.Int, queryStarted time.Time) { + stringChainId := "" + if chainID != nil { + stringChainId = chainID.String() + } + txmgrQueryDuration. + WithLabelValues(stringChainId, query). + Observe(time.Since(queryStarted).Seconds()) +} From 22dafbc52fbdd7acb60f97186c8e1e5bc9c7b216 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Tue, 7 May 2024 14:33:01 +0200 Subject: [PATCH 2/2] Test --- core/chains/evm/txmgr/evm_tx_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 82a3e4ed06..71510547ab 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -2242,5 +2242,5 @@ func reportQueryDuration(query string, chainID *big.Int, queryStarted time.Time) } txmgrQueryDuration. WithLabelValues(stringChainId, query). - Observe(time.Since(queryStarted).Seconds()) + Observe(float64(time.Since(queryStarted))) }