Skip to content

Commit

Permalink
Merge branch 'develop' into feature/BCF-2393-multichain-telemetry-sup…
Browse files Browse the repository at this point in the history
…port
  • Loading branch information
george-dorin authored Oct 4, 2023
2 parents af67041 + dd3a699 commit 6125405
Show file tree
Hide file tree
Showing 193 changed files with 4,654 additions and 2,203 deletions.
24 changes: 22 additions & 2 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ jobs:
- run: echo "this exists so we don't have to run anything else if the build is skipped"
if: needs.changes.outputs.src == 'false' || needs.solana-test-image-exists.outputs.exists == 'true'

solana-smoke-tests:
solana-smoke-tests-matrix:
if: ${{ !contains(join(github.event.pull_request.labels.*.name, ' '), 'skip-smoke-tests') }}
environment: integration
permissions:
Expand All @@ -674,7 +674,7 @@ jobs:
strategy:
matrix:
image:
- name: ""
- name: (legacy)
tag-suffix: ""
- name: (plugins)
tag-suffix: -plugins
Expand Down Expand Up @@ -732,6 +732,26 @@ jobs:
path: /tmp/gotest.log
retention-days: 7
continue-on-error: true

### Used to check the required checks box when the matrix completes
solana-smoke-tests:
if: always()
runs-on: ubuntu-latest
name: Solana Smoke Tests
needs: [solana-smoke-tests-matrix]
steps:
- name: Check smoke test matrix status
if: needs.solana-smoke-tests-matrix.result != 'success'
run: exit 1
- name: Collect Metrics
if: always()
id: collect-gha-metrics
uses: smartcontractkit/push-gha-metrics-action@d2c2b7bdc9012651230b2608a1bcb0c48538b6ec
with:
basic-auth: ${{ secrets.GRAFANA_CLOUD_BASIC_AUTH }}
hostname: ${{ secrets.GRAFANA_CLOUD_HOST }}
this-job-name: Solana Smoke Tests
continue-on-error: true
### End Solana Section

### Start Live Testnet Section
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint-gh-workflows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- name: Check out Code
uses: actions/checkout@f43a0e5ff2bd294095638e18286ca9a3d1956744 # v3.6.0
- name: Run actionlint
uses: reviewdog/action-actionlint@17ea0452ae2cd009a22ca629732a9ce7f49a55e6 # v1.39.0
uses: reviewdog/action-actionlint@82693e9e3b239f213108d6e412506f8b54003586 # v1.39.1
- name: Collect Metrics
if: always()
id: collect-gha-metrics
Expand Down
18 changes: 11 additions & 7 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
for {
maxInFlightTransactions := eb.txConfig.MaxInFlight()
if maxInFlightTransactions > 0 {
nUnconfirmed, err := eb.txStore.CountUnconfirmedTransactions(fromAddress, eb.chainID)
nUnconfirmed, err := eb.txStore.CountUnconfirmedTransactions(ctx, fromAddress, eb.chainID)
if err != nil {
return true, errors.Wrap(err, "CountUnconfirmedTransactions failed")
}
if nUnconfirmed >= maxInFlightTransactions {
nUnstarted, err := eb.txStore.CountUnstartedTransactions(fromAddress, eb.chainID)
nUnstarted, err := eb.txStore.CountUnstartedTransactions(ctx, fromAddress, eb.chainID)
if err != nil {
return true, errors.Wrap(err, "CountUnstartedTransactions failed")
}
Expand Down Expand Up @@ -477,7 +477,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
return retryable, errors.Wrap(err, "processUnstartedTxs failed on NewAttempt")
}

if err := eb.txStore.UpdateTxUnstartedToInProgress(etx, &a); errors.Is(err, ErrTxRemoved) {
if err := eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &a); errors.Is(err, ErrTxRemoved) {
eb.logger.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject)
continue
} else if err != nil {
Expand All @@ -493,7 +493,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
// handleInProgressTx checks if there is any transaction
// in_progress and if so, finishes the job
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleAnyInProgressTx(ctx context.Context, fromAddress ADDR) (err error, retryable bool) {
etx, err := eb.txStore.GetTxInProgress(fromAddress)
etx, err := eb.txStore.GetTxInProgress(ctx, fromAddress)
if err != nil {
return errors.Wrap(err, "handleAnyInProgressTx failed"), true
}
Expand Down Expand Up @@ -668,8 +668,10 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// Finds next transaction in the queue, assigns a sequence, and moves it to "in_progress" state ready for broadcast.
// Returns nil if no transactions are in queue
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) nextUnstartedTransactionWithSequence(fromAddress ADDR) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()
etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(etx, fromAddress, eb.chainID); err != nil {
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, eb.chainID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
// Finish. No more transactions left to process. Hoorah!
return nil, nil
Expand Down Expand Up @@ -722,14 +724,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) tryA
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveTryAgainAttempt(ctx context.Context, lgr logger.Logger, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], replacementAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time, newFee FEE, newFeeLimit uint32) (err error, retyrable bool) {
if err = eb.txStore.SaveReplacementInProgressAttempt(attempt, &replacementAttempt); err != nil {
if err = eb.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil {
return errors.Wrap(err, "tryAgainWithNewFee failed"), true
}
lgr.Debugw("Bumped fee on initial send", "oldFee", attempt.TxFee.String(), "newFee", newFee.String(), "newFeeLimit", newFeeLimit)
return eb.handleInProgressTx(ctx, etx, replacementAttempt, initialBroadcastAt)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()
if etx.State != TxInProgress {
return errors.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State)
}
Expand All @@ -756,7 +760,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
return errors.Wrap(err, "failed to resume pipeline")
}
}
return eb.txStore.UpdateTxFatalError(etx)
return eb.txStore.UpdateTxFatalError(ctx, etx)
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getNextSequence(address ADDR) (sequence SEQ, err error) {
Expand Down
37 changes: 18 additions & 19 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -279,7 +278,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

ec.lggr.Debugw("processHead start", "headNum", head.BlockNumber(), "id", "confirmer")

if err := ec.txStore.SetBroadcastBeforeBlockNum(head.BlockNumber(), ec.chainID); err != nil {
if err := ec.txStore.SetBroadcastBeforeBlockNum(ctx, head.BlockNumber(), ec.chainID); err != nil {
return errors.Wrap(err, "SetBroadcastBeforeBlockNum failed")
}
if err := ec.CheckConfirmedMissingReceipt(ctx); err != nil {
Expand Down Expand Up @@ -340,7 +339,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro
//
// This scenario might sound unlikely but has been observed to happen multiple times in the wild on Polygon.
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckConfirmedMissingReceipt(ctx context.Context) (err error) {
attempts, err := ec.txStore.FindTxAttemptsConfirmedMissingReceipt(ec.chainID)
attempts, err := ec.txStore.FindTxAttemptsConfirmedMissingReceipt(ctx, ec.chainID)
if err != nil {
return err
}
Expand All @@ -351,7 +350,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
txCodes, txErrs, broadcastTime, txIDs, err := ec.client.BatchSendTransactions(ctx, attempts, int(ec.chainConfig.RPCDefaultBatchSize()), ec.lggr)
// update broadcast times before checking additional errors
if len(txIDs) > 0 {
if updateErr := ec.txStore.UpdateBroadcastAts(broadcastTime, txIDs); updateErr != nil {
if updateErr := ec.txStore.UpdateBroadcastAts(ctx, broadcastTime, txIDs); updateErr != nil {
err = fmt.Errorf("%w: failed to update broadcast time: %w", err, updateErr)
}
}
Expand All @@ -369,7 +368,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che

txIDsToUnconfirm = append(txIDsToUnconfirm, attempts[idx].TxID)
}
err = ec.txStore.UpdateTxsUnconfirmed(txIDsToUnconfirm)
err = ec.txStore.UpdateTxsUnconfirmed(ctx, txIDsToUnconfirm)

if err != nil {
return err
Expand All @@ -379,7 +378,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che

// CheckForReceipts finds attempts that are still pending and checks to see if a receipt is present for the given block number
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CheckForReceipts(ctx context.Context, blockNum int64) error {
attempts, err := ec.txStore.FindTxAttemptsRequiringReceiptFetch(ec.chainID)
attempts, err := ec.txStore.FindTxAttemptsRequiringReceiptFetch(ctx, ec.chainID)
if err != nil {
return errors.Wrap(err, "FindTxAttemptsRequiringReceiptFetch failed")
}
Expand Down Expand Up @@ -421,11 +420,11 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
}
}

if err := ec.txStore.MarkAllConfirmedMissingReceipt(ec.chainID); err != nil {
if err := ec.txStore.MarkAllConfirmedMissingReceipt(ctx, ec.chainID); err != nil {
return errors.Wrap(err, "unable to mark txes as 'confirmed_missing_receipt'")
}

if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(blockNum, ec.chainConfig.FinalityDepth(), ec.chainID); err != nil {
if err := ec.txStore.MarkOldTxesMissingReceiptAsErrored(ctx, blockNum, ec.chainConfig.FinalityDepth(), ec.chainID); err != nil {
return errors.Wrap(err, "unable to confirm buried unconfirmed txes")
}
return nil
Expand Down Expand Up @@ -489,7 +488,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet
if err != nil {
return errors.Wrap(err, "batchFetchReceipts failed")
}
if err := ec.txStore.SaveFetchedReceipts(receipts, ec.chainID); err != nil {
if err := ec.txStore.SaveFetchedReceipts(ctx, receipts, ec.chainID); err != nil {
return errors.Wrap(err, "saveFetchedReceipts failed")
}
promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(receipts)))
Expand All @@ -511,7 +510,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) get
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) batchFetchReceipts(ctx context.Context, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], blockNum int64) (receipts []R, err error) {
// Metadata is required to determine whether a tx is forwarded or not.
if ec.txConfig.ForwardersEnabled() {
err = ec.txStore.PreloadTxes(attempts)
err = ec.txStore.PreloadTxes(ctx, attempts)
if err != nil {
return nil, errors.Wrap(err, "Confirmer#batchFetchReceipts error loading txs for attempts")
}
Expand Down Expand Up @@ -648,7 +647,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) reb

lggr.Debugw("Rebroadcasting transaction", "nPreviousAttempts", len(etx.TxAttempts), "fee", attempt.TxFee)

if err := ec.txStore.SaveInProgressAttempt(&attempt); err != nil {
if err := ec.txStore.SaveInProgressAttempt(ctx, &attempt); err != nil {
return errors.Wrap(err, "saveInProgressAttempt failed")
}

Expand Down Expand Up @@ -687,7 +686,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequiringRebroadcast(ctx context.Context, lggr logger.Logger, address ADDR, blockNum, gasBumpThreshold, bumpDepth int64, maxInFlightTransactions uint32, chainID CHAIN_ID) (etxs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) {
// NOTE: These two queries could be combined into one using union but it
// becomes harder to read and difficult to test in isolation. KISS principle
etxInsufficientFunds, err := ec.txStore.FindTxsRequiringResubmissionDueToInsufficientFunds(address, chainID, pg.WithParentCtx(ctx))
etxInsufficientFunds, err := ec.txStore.FindTxsRequiringResubmissionDueToInsufficientFunds(ctx, address, chainID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -825,7 +824,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
ec.lggr.Warnw("Got terminally underpriced error for gas bump, this should never happen unless the remote RPC node changed its configuration on the fly, or you are using multiple RPC nodes with different minimum gas price requirements. This is not recommended", "err", sendError, "attempt", attempt)
// "Lazily" load attempts here since the overwhelmingly common case is
// that we don't need them unless we enter this path
if err := ec.txStore.LoadTxAttempts(&etx, pg.WithParentCtx(ctx)); err != nil {
if err := ec.txStore.LoadTxAttempts(ctx, &etx); err != nil {
return errors.Wrap(err, "failed to load TxAttempts while bumping on terminally underpriced error")
}
if len(etx.TxAttempts) == 0 {
Expand All @@ -850,7 +849,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
"replacementAttempt", replacementAttempt,
).Errorf("gas price was rejected by the node for being too low. Node returned: '%s'", sendError.Error())

if err := ec.txStore.SaveReplacementInProgressAttempt(attempt, &replacementAttempt); err != nil {
if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &replacementAttempt); err != nil {
return errors.Wrap(err, "saveReplacementInProgressAttempt failed")
}
return ec.handleInProgressAttempt(ctx, lggr, etx, replacementAttempt, blockHeight)
Expand Down Expand Up @@ -882,11 +881,11 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han
return ec.txStore.SaveConfirmedMissingReceiptAttempt(ctx, timeout, &attempt, now)
case clienttypes.InsufficientFunds:
timeout := ec.dbConfig.DefaultQueryTimeout()
return ec.txStore.SaveInsufficientFundsAttempt(timeout, &attempt, now)
return ec.txStore.SaveInsufficientFundsAttempt(ctx, timeout, &attempt, now)
case clienttypes.Successful:
lggr.Debugw("Successfully broadcast transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash.String())
timeout := ec.dbConfig.DefaultQueryTimeout()
return ec.txStore.SaveSentAttempt(timeout, &attempt, now)
return ec.txStore.SaveSentAttempt(ctx, timeout, &attempt, now)
case clienttypes.Unknown:
// Every error that doesn't fall under one of the above categories will be treated as Unknown.
fallthrough
Expand Down Expand Up @@ -921,7 +920,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Ens
} else {
ec.nConsecutiveBlocksChainTooShort = 0
}
etxs, err := ec.txStore.FindTransactionsConfirmedInBlockRange(head.BlockNumber(), head.EarliestHeadInChain().BlockNumber(), ec.chainID)
etxs, err := ec.txStore.FindTransactionsConfirmedInBlockRange(ctx, head.BlockNumber(), head.EarliestHeadInChain().BlockNumber(), ec.chainID)
if err != nil {
return errors.Wrap(err, "findTransactionsConfirmedInBlockRange failed")
}
Expand Down Expand Up @@ -1015,7 +1014,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) mar
ec.lggr.Infow(fmt.Sprintf("Re-org detected. Rebroadcasting transaction %s which may have been re-org'd out of the main chain", attempt.Hash.String()), logValues...)

// Put it back in progress and delete all receipts (they do not apply to the new chain)
err := ec.txStore.UpdateTxForRebroadcast(etx, attempt)
err := ec.txStore.UpdateTxForRebroadcast(ec.ctx, etx, attempt)
return errors.Wrap(err, "markForRebroadcast failed")
}

Expand All @@ -1034,7 +1033,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) For

for _, seq := range seqs {

etx, err := ec.txStore.FindTxWithSequence(address, seq)
etx, err := ec.txStore.FindTxWithSequence(context.TODO(), address, seq)
if err != nil {
return errors.Wrap(err, "ForceRebroadcast failed")
}
Expand Down
Loading

0 comments on commit 6125405

Please sign in to comment.