Skip to content

Commit

Permalink
simplifying performCommonChecks
Browse files Browse the repository at this point in the history
  • Loading branch information
0xnogo committed Jul 12, 2024
1 parent 87ea544 commit d11a858
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions core/services/ocr2/plugins/ccip/ccipexec/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ func (s *BestEffortBatchingStrategy) BuildBatch(
batchBuilder := newBatchBuildContainer(len(batchCtx.report.sendRequestsWithMeta))
for _, msg := range batchCtx.report.sendRequestsWithMeta {
msgLggr := batchCtx.lggr.With("messageID", hexutil.Encode(msg.MessageID[:]), "seqNr", msg.SequenceNumber)
shouldAdd, status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr)
status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr)

if err != nil {
return []ccip.ObservedMessage{}, []messageExecStatus{}
}

if !shouldAdd {
if status.shouldBeSkipped() {
batchBuilder.skip(msg, status)
continue
}
Expand Down Expand Up @@ -143,13 +143,13 @@ func (bs ZKOverflowBatchingStrategy) BuildBatch(
msgLggr.Infow("No final status found for message, adding to batch")
}

shouldAdd, status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr)
status, messageMaxGas, tokenData, msgValue, err := performCommonChecks(ctx, batchCtx, msg, msgLggr)

if err != nil {
return []ccip.ObservedMessage{}, []messageExecStatus{}
}

if !shouldAdd {
if status.shouldBeSkipped() {
batchBuilder.skip(msg, status)
continue
}
Expand All @@ -168,15 +168,15 @@ func performCommonChecks(
batchCtx *BatchContext,
msg cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta,
msgLggr logger.Logger,
) (bool, messageStatus, uint64, [][]byte, *big.Int, error) {
) (messageStatus, uint64, [][]byte, *big.Int, error) {
if msg.Executed {
msgLggr.Infow("Skipping message - already executed")
return false, AlreadyExecuted, 0, nil, nil, nil
return AlreadyExecuted, 0, nil, nil, nil
}

if len(msg.Data) > batchCtx.availableDataLen {
msgLggr.Infow("Skipping message - insufficient remaining batch data length", "msgDataLen", len(msg.Data), "availableBatchDataLen", batchCtx.availableDataLen)
return false, InsufficientRemainingBatchDataLength, 0, nil, nil, nil
return InsufficientRemainingBatchDataLength, 0, nil, nil, nil
}

messageMaxGas, err1 := calculateMessageMaxGas(
Expand All @@ -187,20 +187,20 @@ func performCommonChecks(
)
if err1 != nil {
msgLggr.Errorw("Skipping message - message max gas calculation error", "err", err1)
return false, MessageMaxGasCalcError, 0, nil, nil, nil
return MessageMaxGasCalcError, 0, nil, nil, nil
}

// Check sufficient gas in batch
if batchCtx.availableGas < messageMaxGas {
msgLggr.Infow("Skipping message - insufficient remaining batch gas limit", "availableGas", batchCtx.availableGas, "messageMaxGas", messageMaxGas)
return false, InsufficientRemainingBatchGas, 0, nil, nil, nil
return InsufficientRemainingBatchGas, 0, nil, nil, nil
}

if _, ok := batchCtx.expectedNonces[msg.Sender]; !ok {
nonce, ok1 := batchCtx.sendersNonce[msg.Sender]
if !ok1 {
msgLggr.Errorw("Skipping message - missing nonce", "sender", msg.Sender)
return false, MissingNonce, 0, nil, nil, nil
return MissingNonce, 0, nil, nil, nil
}
batchCtx.expectedNonces[msg.Sender] = nonce + 1
}
Expand All @@ -209,36 +209,36 @@ func performCommonChecks(
// Sequenced messages have non-zero nonces.
if msg.Nonce > 0 && msg.Nonce != batchCtx.expectedNonces[msg.Sender] {
msgLggr.Warnw("Skipping message - invalid nonce", "have", msg.Nonce, "want", batchCtx.expectedNonces[msg.Sender])
return false, InvalidNonce, 0, nil, nil, nil
return InvalidNonce, 0, nil, nil, nil
}

msgValue, err1 := aggregateTokenValue(batchCtx.lggr, batchCtx.destTokenPricesUSD, batchCtx.sourceToDestToken, msg.TokenAmounts)
if err1 != nil {
msgLggr.Errorw("Skipping message - aggregate token value compute error", "err", err1)
return false, AggregateTokenValueComputeError, 0, nil, nil, nil
return AggregateTokenValueComputeError, 0, nil, nil, nil
}

// if token limit is smaller than message value skip message
if tokensLeft, hasCapacity := hasEnoughTokens(batchCtx.aggregateTokenLimit, msgValue, batchCtx.inflightAggregateValue); !hasCapacity {
msgLggr.Warnw("Skipping message - aggregate token limit exceeded", "aggregateTokenLimit", tokensLeft.String(), "msgValue", msgValue.String())
return false, AggregateTokenLimitExceeded, 0, nil, nil, nil
return AggregateTokenLimitExceeded, 0, nil, nil, nil
}

tokenData, elapsed, err1 := getTokenDataWithTimeout(ctx, msg, batchCtx.tokenDataRemainingDuration, batchCtx.tokenDataWorker)
batchCtx.tokenDataRemainingDuration -= elapsed
if err1 != nil {
if errors.Is(err1, tokendata.ErrNotReady) {
msgLggr.Warnw("Skipping message - token data not ready", "err", err1)
return false, TokenDataNotReady, 0, nil, nil, nil
return TokenDataNotReady, 0, nil, nil, nil
}
msgLggr.Errorw("Skipping message - token data fetch error", "err", err1)
return false, TokenDataFetchError, 0, nil, nil, nil
return TokenDataFetchError, 0, nil, nil, nil
}

dstWrappedNativePrice, exists := batchCtx.destTokenPricesUSD[batchCtx.destWrappedNative]
if !exists {
msgLggr.Errorw("Skipping message - token not in destination token prices", "token", batchCtx.destWrappedNative)
return false, TokenNotInDestTokenPrices, 0, nil, nil, nil
return TokenNotInDestTokenPrices, 0, nil, nil, nil
}

// calculating the source chain fee, dividing by 1e18 for denomination.
Expand All @@ -248,14 +248,14 @@ func performCommonChecks(
sourceFeeTokenPrice, exists := batchCtx.sourceTokenPricesUSD[msg.FeeToken]
if !exists {
msgLggr.Errorw("Skipping message - token not in source token prices", "token", msg.FeeToken)
return false, TokenNotInSrcTokenPrices, 0, nil, nil, nil
return TokenNotInSrcTokenPrices, 0, nil, nil, nil
}

// Fee boosting
execCostUsd, err1 := batchCtx.gasPriceEstimator.EstimateMsgCostUSD(batchCtx.gasPrice, dstWrappedNativePrice, msg)
if err1 != nil {
msgLggr.Errorw("Failed to estimate message cost USD", "err", err1)
return false, "", 0, nil, nil, errors.New("failed to estimate message cost USD")
return "", 0, nil, nil, errors.New("failed to estimate message cost USD")
}

availableFee := big.NewInt(0).Mul(msg.FeeTokenAmount, sourceFeeTokenPrice)
Expand All @@ -270,10 +270,10 @@ func performCommonChecks(
"waitTime", time.Since(msg.BlockTimestamp),
"boost", batchCtx.offchainConfig.RelativeBoostPerWaitHour,
)
return false, InsufficientRemainingFee, 0, nil, nil, nil
return InsufficientRemainingFee, 0, nil, nil, nil
}

return true, "", messageMaxGas, tokenData, msgValue, nil
return SuccesfullyValidated, messageMaxGas, tokenData, msgValue, nil
}

// getTokenDataWithCappedLatency gets the token data for the provided message.
Expand Down Expand Up @@ -457,6 +457,7 @@ func getCommitReportForSeqNum(ctx context.Context, commitStoreReader ccipdata.Co
type messageStatus string

const (
SuccesfullyValidated messageStatus = "successfully_validated"
AlreadyExecuted messageStatus = "already_executed"
SenderAlreadySkipped messageStatus = "sender_already_skipped"
MessageMaxGasCalcError messageStatus = "message_max_gas_calc_error"
Expand All @@ -477,6 +478,10 @@ const (
SkippedInflight messageStatus = "skipped_inflight"
)

func (m messageStatus) shouldBeSkipped() bool {
return m != SuccesfullyValidated
}

type messageExecStatus struct {
SeqNr uint64
MessageId string
Expand Down

0 comments on commit d11a858

Please sign in to comment.