diff --git a/core/services/ocr2/plugins/ccip/ccipexec/batching.go b/core/services/ocr2/plugins/ccip/ccipexec/batching.go index 141d5af8f8..3bade12465 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/batching.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/batching.go @@ -78,13 +78,13 @@ func (s *BestEffortBatchingStrategy) BuildBatch( batchBuilder := newBatchBuildContainer(len(batchCtx.report.sendRequestsWithMeta)) for _, msg := range batchCtx.report.sendRequestsWithMeta { msgLggr := batchCtx.lggr.With("messageID", hexutil.Encode(msg.MessageID[:]), "seqNr", msg.SequenceNumber) - shouldAdd, status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr) + status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr) if err != nil { return []ccip.ObservedMessage{}, []messageExecStatus{} } - if !shouldAdd { + if status.shouldBeSkipped() { batchBuilder.skip(msg, status) continue } @@ -143,13 +143,13 @@ func (bs ZKOverflowBatchingStrategy) BuildBatch( msgLggr.Infow("No final status found for message, adding to batch") } - shouldAdd, status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr) + status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr) if err != nil { return []ccip.ObservedMessage{}, []messageExecStatus{} } - if !shouldAdd { + if status.shouldBeSkipped() { batchBuilder.skip(msg, status) continue } @@ -168,15 +168,15 @@ func performCommonChecks( batchCtx *BatchContext, msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta, msgLggr logger.Logger, -) (bool, messageStatus, uint64, [][]byte, *big.Int, error) { +) (messageStatus, uint64, [][]byte, *big.Int, error) { if msg.Executed { msgLggr.Infow("Skipping message - already executed") - return false, AlreadyExecuted, 0, nil, nil, nil + return AlreadyExecuted, 0, nil, nil, nil } if len(msg.Data) > batchCtx.availableDataLen { msgLggr.Infow("Skipping message - insufficient remaining batch data length", "msgDataLen", len(msg.Data), "availableBatchDataLen", batchCtx.availableDataLen) - return false, InsufficientRemainingBatchDataLength, 0, nil, nil, nil + return InsufficientRemainingBatchDataLength, 0, nil, nil, nil } messageMaxGas, err1 := calculateMessageMaxGas( @@ -187,20 +187,20 @@ func performCommonChecks( ) if err1 != nil { msgLggr.Errorw("Skipping message - message max gas calculation error", "err", err1) - return false, MessageMaxGasCalcError, 0, nil, nil, nil + return MessageMaxGasCalcError, 0, nil, nil, nil } // Check sufficient gas in batch if batchCtx.availableGas < messageMaxGas { msgLggr.Infow("Skipping message - insufficient remaining batch gas limit", "availableGas", batchCtx.availableGas, "messageMaxGas", messageMaxGas) - return false, InsufficientRemainingBatchGas, 0, nil, nil, nil + return InsufficientRemainingBatchGas, 0, nil, nil, nil } if _, ok := batchCtx.expectedNonces[msg.Sender]; !ok { nonce, ok1 := batchCtx.sendersNonce[msg.Sender] if !ok1 { msgLggr.Errorw("Skipping message - missing nonce", "sender", msg.Sender) - return false, MissingNonce, 0, nil, nil, nil + return MissingNonce, 0, nil, nil, nil } batchCtx.expectedNonces[msg.Sender] = nonce + 1 } @@ -209,19 +209,19 @@ func performCommonChecks( // Sequenced messages have non-zero nonces. if msg.Nonce > 0 && msg.Nonce != batchCtx.expectedNonces[msg.Sender] { msgLggr.Warnw("Skipping message - invalid nonce", "have", msg.Nonce, "want", batchCtx.expectedNonces[msg.Sender]) - return false, InvalidNonce, 0, nil, nil, nil + return InvalidNonce, 0, nil, nil, nil } msgValue, err1 := aggregateTokenValue(batchCtx.lggr, batchCtx.destTokenPricesUSD, batchCtx.sourceToDestToken, msg.TokenAmounts) if err1 != nil { msgLggr.Errorw("Skipping message - aggregate token value compute error", "err", err1) - return false, AggregateTokenValueComputeError, 0, nil, nil, nil + return AggregateTokenValueComputeError, 0, nil, nil, nil } // if token limit is smaller than message value skip message if tokensLeft, hasCapacity := hasEnoughTokens(batchCtx.aggregateTokenLimit, msgValue, batchCtx.inflightAggregateValue); !hasCapacity { msgLggr.Warnw("Skipping message - aggregate token limit exceeded", "aggregateTokenLimit", tokensLeft.String(), "msgValue", msgValue.String()) - return false, AggregateTokenLimitExceeded, 0, nil, nil, nil + return AggregateTokenLimitExceeded, 0, nil, nil, nil } tokenData, elapsed, err1 := getTokenDataWithTimeout(ctx, msg, batchCtx.tokenDataRemainingDuration, batchCtx.tokenDataWorker) @@ -229,16 +229,16 @@ func performCommonChecks( if err1 != nil { if errors.Is(err1, tokendata.ErrNotReady) { msgLggr.Warnw("Skipping message - token data not ready", "err", err1) - return false, TokenDataNotReady, 0, nil, nil, nil + return TokenDataNotReady, 0, nil, nil, nil } msgLggr.Errorw("Skipping message - token data fetch error", "err", err1) - return false, TokenDataFetchError, 0, nil, nil, nil + return TokenDataFetchError, 0, nil, nil, nil } dstWrappedNativePrice, exists := batchCtx.destTokenPricesUSD[batchCtx.destWrappedNative] if !exists { msgLggr.Errorw("Skipping message - token not in destination token prices", "token", batchCtx.destWrappedNative) - return false, TokenNotInDestTokenPrices, 0, nil, nil, nil + return TokenNotInDestTokenPrices, 0, nil, nil, nil } // calculating the source chain fee, dividing by 1e18 for denomination. @@ -248,14 +248,14 @@ func performCommonChecks( sourceFeeTokenPrice, exists := batchCtx.sourceTokenPricesUSD[msg.FeeToken] if !exists { msgLggr.Errorw("Skipping message - token not in source token prices", "token", msg.FeeToken) - return false, TokenNotInSrcTokenPrices, 0, nil, nil, nil + return TokenNotInSrcTokenPrices, 0, nil, nil, nil } // Fee boosting execCostUsd, err1 := batchCtx.gasPriceEstimator.EstimateMsgCostUSD(batchCtx.gasPrice, dstWrappedNativePrice, msg) if err1 != nil { msgLggr.Errorw("Failed to estimate message cost USD", "err", err1) - return false, "", 0, nil, nil, errors.New("failed to estimate message cost USD") + return "", 0, nil, nil, errors.New("failed to estimate message cost USD") } availableFee := big.NewInt(0).Mul(msg.FeeTokenAmount, sourceFeeTokenPrice) @@ -270,10 +270,10 @@ func performCommonChecks( "waitTime", time.Since(msg.BlockTimestamp), "boost", batchCtx.offchainConfig.RelativeBoostPerWaitHour, ) - return false, InsufficientRemainingFee, 0, nil, nil, nil + return InsufficientRemainingFee, 0, nil, nil, nil } - return true, "", messageMaxGas, tokenData, msgValue, nil + return SuccesfullyValidated, messageMaxGas, tokenData, msgValue, nil } // getTokenDataWithCappedLatency gets the token data for the provided message. @@ -457,6 +457,7 @@ func getCommitReportForSeqNum(ctx context.Context, commitStoreReader ccipdata.Co type messageStatus string const ( + SuccesfullyValidated messageStatus = "successfully_validated" AlreadyExecuted messageStatus = "already_executed" SenderAlreadySkipped messageStatus = "sender_already_skipped" MessageMaxGasCalcError messageStatus = "message_max_gas_calc_error" @@ -477,6 +478,10 @@ const ( SkippedInflight messageStatus = "skipped_inflight" ) +func (m messageStatus) shouldBeSkipped() bool { + return m != SuccesfullyValidated +} + type messageExecStatus struct { SeqNr uint64 MessageId string