Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rm offchain check for token pool rate limits #690

Merged
merged 2 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-rings-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ccip": patch
---

remove checking for token pool rate limits
161 changes: 7 additions & 154 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
r.inflightReports.expire(lggr)
inFlight := r.inflightReports.getAll()

executableObservations, err := r.getExecutableObservations(ctx, lggr, timestamp, inFlight)
executableObservations, err := r.getExecutableObservations(ctx, lggr, inFlight)
if err != nil {
return nil, err
}
Expand All @@ -138,7 +138,7 @@ func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp ty
return ccip.NewExecutionObservation(executableObservations).Marshal()
}

func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, timestamp types.ReportTimestamp, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, error) {
func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context, lggr logger.Logger, inflight []InflightInternalExecutionReport) ([]ccip.ObservedMessage, error) {
unexpiredReports, err := r.getUnexpiredCommitReports(
ctx,
r.commitStoreReader,
Expand Down Expand Up @@ -166,14 +166,6 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return nil, err
}

getDestPoolRateLimits := cache.LazyFetch(func() (map[cciptypes.Address]*big.Int, error) {
tokenExecData, err1 := getExecTokenData()
if err1 != nil {
return nil, err1
}
return r.destPoolRateLimits(ctx, unexpiredReportsWithSendReqs, tokenExecData.sourceToDestTokens)
})

for _, unexpiredReport := range unexpiredReportsWithSendReqs {
r.tokenDataWorker.AddJobsFromMsgs(ctx, unexpiredReport.sendRequestsWithMeta)
}
Expand Down Expand Up @@ -218,11 +210,6 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return nil, err
}

destPoolRateLimits, err := getDestPoolRateLimits()
if err != nil {
return nil, err
}

batch := r.buildBatch(
ctx,
rootLggr,
Expand All @@ -232,8 +219,7 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
tokenExecData.sourceTokenPrices,
tokenExecData.destTokenPrices,
tokenExecData.gasPrice,
tokenExecData.sourceToDestTokens,
destPoolRateLimits)
tokenExecData.sourceToDestTokens)
if len(batch) != 0 {
return batch, nil
}
Expand All @@ -243,71 +229,6 @@ func (r *ExecutionReportingPlugin) getExecutableObservations(ctx context.Context
return []ccip.ObservedMessage{}, nil
}

// destPoolRateLimits returns a map that consists of the rate limits of each destination token of the provided reports.
// If a token is missing from the returned map it either means that token was not found or token pool is disabled for this token.
func (r *ExecutionReportingPlugin) destPoolRateLimits(ctx context.Context, commitReports []commitReportWithSendRequests, sourceToDestToken map[cciptypes.Address]cciptypes.Address) (map[cciptypes.Address]*big.Int, error) {
tokens, err := r.offRampReader.GetTokens(ctx)
if err != nil {
return nil, fmt.Errorf("get cached token pools: %w", err)
}

dstTokenToPool := make(map[cciptypes.Address]cciptypes.Address)
dstPoolToToken := make(map[cciptypes.Address]cciptypes.Address)
dstPoolAddresses := make([]cciptypes.Address, 0)

for _, msg := range commitReports {
for _, req := range msg.sendRequestsWithMeta {
for _, tk := range req.TokenAmounts {
dstToken, exists := sourceToDestToken[tk.Token]
if !exists {
r.lggr.Warnw("token not found on destination chain", "sourceToken", tk)
continue
}

// another message with the same token exists in the report
// we skip it since we don't want to query for the rate limit twice
if _, seen := dstTokenToPool[dstToken]; seen {
continue
}

poolAddress, exists := tokens.DestinationPool[dstToken]
if !exists {
return nil, fmt.Errorf("pool for token '%s' does not exist", dstToken)
}

if tokenAddr, seen := dstPoolToToken[poolAddress]; seen {
return nil, fmt.Errorf("pool is already seen for token %s", tokenAddr)
}

dstTokenToPool[dstToken] = poolAddress
dstPoolToToken[poolAddress] = dstToken
dstPoolAddresses = append(dstPoolAddresses, poolAddress)
}
}
}

rateLimits, err := r.tokenPoolBatchedReader.GetInboundTokenPoolRateLimits(ctx, dstPoolAddresses)
if err != nil {
return nil, fmt.Errorf("fetch pool rate limits: %w", err)
}

res := make(map[cciptypes.Address]*big.Int, len(dstTokenToPool))
for i, rateLimit := range rateLimits {
// if the rate limit is disabled for this token pool then we omit it from the result
if !rateLimit.IsEnabled {
continue
}

tokenAddr, exists := dstPoolToToken[dstPoolAddresses[i]]
if !exists {
return nil, fmt.Errorf("pool to token mapping does not contain %s", dstPoolAddresses[i])
}
res[tokenAddr] = rateLimit.Tokens
}

return res, nil
}

// Calculates a map that indicates whether a sequence number has already been executed.
// It doesn't matter if the execution succeeded, since we don't retry previous
// attempts even if they failed. Value in the map indicates whether the log is finalized or not.
Expand Down Expand Up @@ -341,9 +262,8 @@ func (r *ExecutionReportingPlugin) buildBatch(
destTokenPricesUSD map[cciptypes.Address]*big.Int,
gasPrice *big.Int,
sourceToDestToken map[cciptypes.Address]cciptypes.Address,
destTokenPoolRateLimits map[cciptypes.Address]*big.Int,
) (executableMessages []ccip.ObservedMessage) {
inflightAggregateValue, inflightTokenAmounts, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken)
inflightAggregateValue, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken)
if err != nil {
lggr.Errorw("Unexpected error computing inflight values", "err", err)
return []ccip.ObservedMessage{}
Expand Down Expand Up @@ -385,11 +305,6 @@ func (r *ExecutionReportingPlugin) buildBatch(
continue
}

if !r.isRateLimitEnoughForTokenPool(destTokenPoolRateLimits, msg.TokenAmounts, inflightTokenAmounts, sourceToDestToken) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove that function (isRateLimitEnoughForTokenPool) from the codebase

msgLggr.Warnw("Skipping message token pool rate limit hit")
continue
}

msgValue, err := aggregateTokenValue(destTokenPricesUSD, sourceToDestToken, msg.TokenAmounts)
if err != nil {
msgLggr.Errorw("Skipping message unable to compute aggregate value", "err", err)
Expand Down Expand Up @@ -470,16 +385,6 @@ func (r *ExecutionReportingPlugin) buildBatch(
}
availableGas -= messageMaxGas
aggregateTokenLimit.Sub(aggregateTokenLimit, msgValue)
for _, tk := range msg.TokenAmounts {
dstToken, exists := sourceToDestToken[tk.Token]
if !exists {
msgLggr.Warnw("destination token does not exist", "token", tk.Token)
continue
}
if rl, exists := destTokenPoolRateLimits[dstToken]; exists {
destTokenPoolRateLimits[dstToken] = rl.Sub(rl, tk.Amount)
}
}

msgLggr.Infow("Adding msg to batch", "seqNr", msg.SequenceNumber, "nonce", msg.Nonce,
"value", msgValue, "aggregateTokenLimit", aggregateTokenLimit)
Expand Down Expand Up @@ -513,49 +418,6 @@ func (r *ExecutionReportingPlugin) getTokenDataWithTimeout(
return tokenData, tDur, err
}

func (r *ExecutionReportingPlugin) isRateLimitEnoughForTokenPool(
destTokenPoolRateLimits map[cciptypes.Address]*big.Int,
sourceTokenAmounts []cciptypes.TokenAmount,
inflightTokenAmounts map[cciptypes.Address]*big.Int,
sourceToDestToken map[cciptypes.Address]cciptypes.Address,
) bool {
rateLimitsCopy := make(map[cciptypes.Address]*big.Int)
for destToken, rl := range destTokenPoolRateLimits {
rateLimitsCopy[destToken] = new(big.Int).Set(rl)
}

for sourceToken, amount := range inflightTokenAmounts {
if destToken, exists := sourceToDestToken[sourceToken]; exists {
if rl, exists := rateLimitsCopy[destToken]; exists {
rateLimitsCopy[destToken] = rl.Sub(rl, amount)
}
}
}

for _, sourceToken := range sourceTokenAmounts {
destToken, exists := sourceToDestToken[sourceToken.Token]
if !exists {
r.lggr.Warnw("dest token not found", "sourceToken", sourceToken.Token)
continue
}

rl, exists := rateLimitsCopy[destToken]
if !exists {
r.lggr.Debugw("rate limit not applied to token", "token", destToken)
continue
}

if rl.Cmp(sourceToken.Amount) < 0 {
r.lggr.Warnw("token pool rate limit reached",
"token", sourceToken.Token, "destToken", destToken, "amount", sourceToken.Amount, "rateLimit", rl)
return false
}
rateLimitsCopy[destToken] = rl.Sub(rl, sourceToken.Amount)
}

return true
}

func hasEnoughTokens(tokenLimit *big.Int, msgValue *big.Int, inflightValue *big.Int) (*big.Int, bool) {
tokensLeft := big.NewInt(0).Sub(tokenLimit, inflightValue)
return tokensLeft, tokensLeft.Cmp(msgValue) >= 0
Expand Down Expand Up @@ -917,28 +779,19 @@ func inflightAggregates(
inflight []InflightInternalExecutionReport,
destTokenPrices map[cciptypes.Address]*big.Int,
sourceToDest map[cciptypes.Address]cciptypes.Address,
) (*big.Int, map[cciptypes.Address]*big.Int, error) {
) (*big.Int, error) {
inflightAggregateValue := big.NewInt(0)
inflightTokenAmounts := make(map[cciptypes.Address]*big.Int)

for _, rep := range inflight {
for _, message := range rep.messages {
msgValue, err := aggregateTokenValue(destTokenPrices, sourceToDest, message.TokenAmounts)
if err != nil {
return nil, nil, err
return nil, err
}
inflightAggregateValue.Add(inflightAggregateValue, msgValue)

for _, tk := range message.TokenAmounts {
if rl, exists := inflightTokenAmounts[tk.Token]; exists {
inflightTokenAmounts[tk.Token] = rl.Add(rl, tk.Amount)
} else {
inflightTokenAmounts[tk.Token] = new(big.Int).Set(tk.Amount)
}
}
}
}
return inflightAggregateValue, inflightTokenAmounts, nil
return inflightAggregateValue, nil
}

// getTokensPrices returns token prices of the given price registry,
Expand Down
Loading
Loading