Skip to content

Commit

Permalink
Update TxStore to use parent context created at initialization (#10735)
Browse files Browse the repository at this point in the history
* Removed pg.Opts and updated TxStore methods to use both the provided and TxStore contexts

* Updated upstream contexts

* Updated TxStore to maintain one context in scope at a time

* Updated Fluxmonitor context usage

* Wired remaining TxStore methods to upstream contexts

---------

Co-authored-by: Prashant Yadav <[email protected]>
  • Loading branch information
amit-momin and prashantkumar1982 authored Oct 4, 2023
1 parent 7547e57 commit 61ccf8b
Show file tree
Hide file tree
Showing 40 changed files with 956 additions and 1,011 deletions.
18 changes: 11 additions & 7 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
for {
maxInFlightTransactions := eb.txConfig.MaxInFlight()
if maxInFlightTransactions > 0 {
nUnconfirmed, err := eb.txStore.CountUnconfirmedTransactions(fromAddress, eb.chainID)
nUnconfirmed, err := eb.txStore.CountUnconfirmedTransactions(ctx, fromAddress, eb.chainID)
if err != nil {
return true, errors.Wrap(err, "CountUnconfirmedTransactions failed")
}
if nUnconfirmed >= maxInFlightTransactions {
nUnstarted, err := eb.txStore.CountUnstartedTransactions(fromAddress, eb.chainID)
nUnstarted, err := eb.txStore.CountUnstartedTransactions(ctx, fromAddress, eb.chainID)
if err != nil {
return true, errors.Wrap(err, "CountUnstartedTransactions failed")
}
Expand Down Expand Up @@ -477,7 +477,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
return retryable, errors.Wrap(err, "processUnstartedTxs failed on NewAttempt")
}

if err := eb.txStore.UpdateTxUnstartedToInProgress(etx, &a); errors.Is(err, ErrTxRemoved) {
if err := eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &a); errors.Is(err, ErrTxRemoved) {
eb.logger.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject)
continue
} else if err != nil {
Expand All @@ -493,7 +493,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
// handleInProgressTx checks if there is any transaction
// in_progress and if so, finishes the job
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleAnyInProgressTx(ctx context.Context, fromAddress ADDR) (err error, retryable bool) {
etx, err := eb.txStore.GetTxInProgress(fromAddress)
etx, err := eb.txStore.GetTxInProgress(ctx, fromAddress)
if err != nil {
return errors.Wrap(err, "handleAnyInProgressTx failed"), true
}
Expand Down Expand Up @@ -668,8 +668,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// Finds next transaction in the queue, assigns a sequence, and moves it to "in_progress" state ready for broadcast.
// Returns nil if no transactions are in queue
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) nextUnstartedTransactionWithSequence(fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()
etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(etx, fromAddress, eb.chainID); err != nil {
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, eb.chainID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
// Finish. No more transactions left to process. Hoorah!
return nil, nil
Expand Down Expand Up @@ -722,14 +724,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint32) (err error, retyrable bool) {
if err = eb.txStore.SaveReplacementInProgressAttempt(attempt, &replacementAttempt); err != nil {
if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil {
return errors.Wrap(err, "tryAgainWithNewFee failed"), true
}
lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit)
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()
if etx.State != TxInProgress {
return errors.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State)
}
Expand All @@ -756,7 +760,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
return errors.Wrap(err, "failed to resume pipeline")
}
}
return eb.txStore.UpdateTxFatalError(etx)
return eb.txStore.UpdateTxFatalError(ctx, etx)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getNextSequence(address ADDR) (sequence SEQ, err error) {
Expand Down
37 changes: 18 additions & 19 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -279,7 +278,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

ec.lggr.Debugw("processHead start", "headNum", head.BlockNumber(), "id", "confirmer")

if err := ec.txStore.SetBroadcastBeforeBlockNum(head.BlockNumber(), ec.chainID); err != nil {
if err := ec.txStore.SetBroadcastBeforeBlockNum(ctx, head.BlockNumber(), ec.chainID); err != nil {
return errors.Wrap(err, "SetBroadcastBeforeBlockNum failed")
}
if err := ec.CheckConfirmedMissingReceipt(ctx); err != nil {
Expand Down Expand Up @@ -340,7 +339,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro
//
// This scenario might sound unlikely but has been observed to happen multiple times in the wild on Polygon.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckConfirmedMissingReceipt(ctx context.Context) (err error) {
attempts, err := ec.txStore.FindTxAttemptsConfirmedMissingReceipt(ec.chainID)
attempts, err := ec.txStore.FindTxAttemptsConfirmedMissingReceipt(ctx, ec.chainID)
if err != nil {
return err
}
Expand All @@ -351,7 +350,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
txCodes, txErrs, broadcastTime, txIDs, err := ec.client.BatchSendTransactions(ctx, attempts, int(ec.chainConfig.RPCDefaultBatchSize()), ec.lggr)
// update broadcast times before checking additional errors
if len(txIDs) > 0 {
if updateErr := ec.txStore.UpdateBroadcastAts(broadcastTime, txIDs); updateErr != nil {
if updateErr := ec.txStore.UpdateBroadcastAts(ctx, broadcastTime, txIDs); updateErr != nil {
err = fmt.Errorf("%w: failed to update broadcast time: %w", err, updateErr)
}
}
Expand All @@ -369,7 +368,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che

txIDsToUnconfirm = append(txIDsToUnconfirm, attempts[idx].TxID)
}
err = ec.txStore.UpdateTxsUnconfirmed(txIDsToUnconfirm)
err = ec.txStore.UpdateTxsUnconfirmed(ctx, txIDsToUnconfirm)

if err != nil {
return err
Expand All @@ -379,7 +378,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che

// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64) error {
attempts, err := ec.txStore.FindTxAttemptsRequiringReceiptFetch(ec.chainID)
attempts, err := ec.txStore.FindTxAttemptsRequiringReceiptFetch(ctx, ec.chainID)
if err != nil {
return errors.Wrap(err, "FindTxAttemptsRequiringReceiptFetch failed")
}
Expand Down Expand Up @@ -421,11 +420,11 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
}
}

if err := ec.txStore.MarkAllConfirmedMissingReceipt(ec.chainID); err != nil {
if err := ec.txStore.MarkAllConfirmedMissingReceipt(ctx, ec.chainID); err != nil {
return errors.Wrap(err, "unable to mark txes as 'confirmed_missing_receipt'")
}

if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(blockNum, ec.chainConfig.FinalityDepth(), ec.chainID); err != nil {
if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, ec.chainConfig.FinalityDepth(), ec.chainID); err != nil {
return errors.Wrap(err, "unable to confirm buried unconfirmed txes")
}
return nil
Expand Down Expand Up @@ -489,7 +488,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet
if err != nil {
return errors.Wrap(err, "batchFetchReceipts failed")
}
if err := ec.txStore.SaveFetchedReceipts(receipts, ec.chainID); err != nil {
if err := ec.txStore.SaveFetchedReceipts(ctx, receipts, ec.chainID); err != nil {
return errors.Wrap(err, "saveFetchedReceipts failed")
}
promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(receipts)))
Expand All @@ -511,7 +510,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) get
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) batchFetchReceipts(ctx context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], blockNum int64) (receipts []R, err error) {
// Metadata is required to determine whether a tx is forwarded or not.
if ec.txConfig.ForwardersEnabled() {
err = ec.txStore.PreloadTxes(attempts)
err = ec.txStore.PreloadTxes(ctx, attempts)
if err != nil {
return nil, errors.Wrap(err, "Confirmer#batchFetchReceipts error loading txs for attempts")
}
Expand Down Expand Up @@ -648,7 +647,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reb

lggr.Debugw("Rebroadcasting transaction", "nPreviousAttempts", len(etx.TxAttempts), "fee", attempt.TxFee)

if err := ec.txStore.SaveInProgressAttempt(&attempt); err != nil {
if err := ec.txStore.SaveInProgressAttempt(ctx, &attempt); err != nil {
return errors.Wrap(err, "saveInProgressAttempt failed")
}

Expand Down Expand Up @@ -687,7 +686,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address ADDR, blockNum, gasBumpThreshold, bumpDepth int64, maxInFlightTransactions uint32, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// NOTE: These two queries could be combined into one using union but it
// becomes harder to read and difficult to test in isolation. KISS principle
etxInsufficientFunds, err := ec.txStore.FindTxsRequiringResubmissionDueToInsufficientFunds(address, chainID, pg.WithParentCtx(ctx))
etxInsufficientFunds, err := ec.txStore.FindTxsRequiringResubmissionDueToInsufficientFunds(ctx, address, chainID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -825,7 +824,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
ec.lggr.Warnw("Got terminally underpriced error for gas bump, this should never happen unless the remote RPC node changed its configuration on the fly, or you are using multiple RPC nodes with different minimum gas price requirements. This is not recommended", "err", sendError, "attempt", attempt)
// "Lazily" load attempts here since the overwhelmingly common case is
// that we don't need them unless we enter this path
if err := ec.txStore.LoadTxAttempts(&etx, pg.WithParentCtx(ctx)); err != nil {
if err := ec.txStore.LoadTxAttempts(ctx, &etx); err != nil {
return errors.Wrap(err, "failed to load TxAttempts while bumping on terminally underpriced error")
}
if len(etx.TxAttempts) == 0 {
Expand All @@ -850,7 +849,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
"replacementAttempt", replacementAttempt,
).Errorf("gas price was rejected by the node for being too low. Node returned: '%s'", sendError.Error())

if err := ec.txStore.SaveReplacementInProgressAttempt(attempt, &replacementAttempt); err != nil {
if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil {
return errors.Wrap(err, "saveReplacementInProgressAttempt failed")
}
return ec.handleInProgressAttempt(ctx, lggr, etx, replacementAttempt, blockHeight)
Expand Down Expand Up @@ -882,11 +881,11 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
return ec.txStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, &attempt, now)
case clienttypes.InsufficientFunds:
timeout := ec.dbConfig.DefaultQueryTimeout()
return ec.txStore.SaveInsufficientFundsAttempt(timeout, &attempt, now)
return ec.txStore.SaveInsufficientFundsAttempt(ctx, timeout, &attempt, now)
case clienttypes.Successful:
lggr.Debugw("Successfully broadcast transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash.String())
timeout := ec.dbConfig.DefaultQueryTimeout()
return ec.txStore.SaveSentAttempt(timeout, &attempt, now)
return ec.txStore.SaveSentAttempt(ctx, timeout, &attempt, now)
case clienttypes.Unknown:
// Every error that doesn't fall under one of the above categories will be treated as Unknown.
fallthrough
Expand Down Expand Up @@ -921,7 +920,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens
} else {
ec.nConsecutiveBlocksChainTooShort = 0
}
etxs, err := ec.txStore.FindTransactionsConfirmedInBlockRange(head.BlockNumber(), head.EarliestHeadInChain().BlockNumber(), ec.chainID)
etxs, err := ec.txStore.FindTransactionsConfirmedInBlockRange(ctx, head.BlockNumber(), head.EarliestHeadInChain().BlockNumber(), ec.chainID)
if err != nil {
return errors.Wrap(err, "findTransactionsConfirmedInBlockRange failed")
}
Expand Down Expand Up @@ -1015,7 +1014,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
ec.lggr.Infow(fmt.Sprintf("Re-org detected. Rebroadcasting transaction %s which may have been re-org'd out of the main chain", attempt.Hash.String()), logValues...)

// Put it back in progress and delete all receipts (they do not apply to the new chain)
err := ec.txStore.UpdateTxForRebroadcast(etx, attempt)
err := ec.txStore.UpdateTxForRebroadcast(ec.ctx, etx, attempt)
return errors.Wrap(err, "markForRebroadcast failed")
}

Expand All @@ -1034,7 +1033,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) For

for _, seq := range seqs {

etx, err := ec.txStore.FindTxWithSequence(address, seq)
etx, err := ec.txStore.FindTxWithSequence(context.TODO(), address, seq)
if err != nil {
return errors.Wrap(err, "ForceRebroadcast failed")
}
Expand Down
45 changes: 18 additions & 27 deletions common/txmgr/mocks/tx_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func (r *Reaper[CHAIN_ID]) SetLatestBlockNum(latestBlockNum int64) {

// ReapTxes deletes old txes
func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error {
ctx, cancel := utils.StopChan(r.chStop).NewCtx()
defer cancel()
threshold := r.txConfig.ReaperThreshold()
if threshold == 0 {
r.log.Debug("Transactions.ReaperThreshold set to 0; skipping ReapTxes")
Expand All @@ -108,7 +110,7 @@ func (r *Reaper[CHAIN_ID]) ReapTxes(headNum int64) error {

r.log.Debugw(fmt.Sprintf("reaping old txes created before %s", timeThreshold.Format(time.RFC3339)), "ageThreshold", threshold, "timeThreshold", timeThreshold, "minBlockNumberToKeep", minBlockNumberToKeep)

if err := r.store.ReapTxHistory(minBlockNumberToKeep, timeThreshold, r.chainID); err != nil {
if err := r.store.ReapTxHistory(ctx, minBlockNumberToKeep, timeThreshold, r.chainID); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 61ccf8b

Please sign in to comment.