From a4b734a13217d341219432a747630ecfef6e518c Mon Sep 17 00:00:00 2001 From: Chunkai Yang Date: Mon, 12 Aug 2024 10:12:28 -0400 Subject: [PATCH] aggregate token and gas price heartbeat updates into batches (#1282) ## Motivation Price reporting frequencies have been turned to following values on Eth mainnet (where most price update costs is) ``` GasHeartBeat: 2hr ExecGasPriceDeviation: 400% DAPriceDeviation: 400% TokenPriceHeartBeat: 12hr TokenPriceDeviation: 20% ``` As a result, updates are mostly heartbeat driven as opposed to deviation driven. A simple trick to improve leader-lane batching is to always batch report all heartbeat prices. In clam environments where prices are heartbeat driven, this absolutely minimizes number of price reports. In flux environments where good number of prices are deviation driven while some remain heartbeat driven, e.g. in case of Eth blob base fee spike, the penalty we take for including each additional prices is low (~6k gas) v.s the OCR tx overhead (~110k gas) --- .../ocr2/plugins/ccip/ccipcommit/ocr2.go | 76 ++++++--- .../ocr2/plugins/ccip/ccipcommit/ocr2_test.go | 150 ++++++++++++++++-- 2 files changed, 193 insertions(+), 33 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go index 2f0fc4e795..0ffe224df8 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go @@ -461,24 +461,42 @@ func (r *CommitReportingPlugin) selectPriceUpdates(ctx context.Context, now time // The returned latestGasPrice and latestTokenPrices should not contain nil values. func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs map[uint64][]*big.Int, tokenPriceObs map[cciptypes.Address][]*big.Int, latestGasPrice map[uint64]update, latestTokenPrices map[cciptypes.Address]update) ([]cciptypes.GasPrice, []cciptypes.TokenPrice, error) { var tokenPriceUpdates []cciptypes.TokenPrice + // Token prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows: + // If any token requires heartbeat update, include all token prices in the report. + // Otherwise, only include token prices that exceed deviation threshold. + needTokenHeartbeat := false + for token := range tokenPriceObs { + latestTokenPrice, exists := latestTokenPrices[token] + if !exists || time.Since(latestTokenPrice.timestamp) >= r.offchainConfig.TokenPriceHeartBeat { + r.lggr.Infow("Token requires heartbeat update", "token", token) + needTokenHeartbeat = true + break + } + } + for token, tokenPriceObservations := range tokenPriceObs { medianPrice := ccipcalc.BigIntSortedMiddle(tokenPriceObservations) + if needTokenHeartbeat { + r.lggr.Debugw("Token price update included due to heartbeat", "token", token, "newPrice", medianPrice) + tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{ + Token: token, + Value: medianPrice, + }) + continue + } + latestTokenPrice, exists := latestTokenPrices[token] if exists { - tokenPriceUpdatedRecently := time.Since(latestTokenPrice.timestamp) < r.offchainConfig.TokenPriceHeartBeat - tokenPriceNotChanged := !ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB)) - if tokenPriceUpdatedRecently && tokenPriceNotChanged { - r.lggr.Debugw("token price was updated recently, skipping the update", + if ccipcalc.Deviates(medianPrice, latestTokenPrice.value, int64(r.offchainConfig.TokenPriceDeviationPPB)) { + r.lggr.Debugw("Token price update included due to deviation", "token", token, "newPrice", medianPrice, "existingPrice", latestTokenPrice.value) - continue // skip the update if we recently had a price update close to the new value + tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{ + Token: token, + Value: medianPrice, + }) } } - - tokenPriceUpdates = append(tokenPriceUpdates, cciptypes.TokenPrice{ - Token: token, - Value: medianPrice, - }) } // Determinism required. @@ -487,31 +505,49 @@ func (r *CommitReportingPlugin) calculatePriceUpdates(gasPriceObs map[uint64][]* }) var gasPriceUpdate []cciptypes.GasPrice + // Gas prices are mostly heartbeat driven. To maximize heartbeat batching, the price inclusion rule is as follows: + // If any source chain gas price requires heartbeat update, include all gas prices in the report. + // Otherwise, only include gas prices that exceed deviation threshold. + needGasHeartbeat := false + for chainSelector := range gasPriceObs { + latestGasPrice, exists := latestGasPrice[chainSelector] + if !exists || latestGasPrice.value == nil || time.Since(latestGasPrice.timestamp) >= r.offchainConfig.GasPriceHeartBeat { + r.lggr.Infow("Chain gas price requires heartbeat update", "chainSelector", chainSelector) + needGasHeartbeat = true + break + } + } + for chainSelector, gasPriceObservations := range gasPriceObs { newGasPrice, err := r.gasPriceEstimator.Median(gasPriceObservations) // Compute the median price if err != nil { return nil, nil, fmt.Errorf("failed to calculate median gas price for chain selector %d: %w", chainSelector, err) } - // Default to updating so that we update if there are no prior updates. + if needGasHeartbeat { + r.lggr.Debugw("Gas price update included due to heartbeat", "chainSelector", chainSelector) + gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{ + DestChainSelector: chainSelector, + Value: newGasPrice, + }) + continue + } + latestGasPrice, exists := latestGasPrice[chainSelector] if exists && latestGasPrice.value != nil { - gasPriceUpdatedRecently := time.Since(latestGasPrice.timestamp) < r.offchainConfig.GasPriceHeartBeat gasPriceDeviated, err := r.gasPriceEstimator.Deviates(newGasPrice, latestGasPrice.value) if err != nil { return nil, nil, err } - if gasPriceUpdatedRecently && !gasPriceDeviated { - r.lggr.Debugw("gas price was updated recently and not deviated sufficiently, skipping the update", + if gasPriceDeviated { + r.lggr.Debugw("Gas price update included due to deviation", "chainSelector", chainSelector, "newPrice", newGasPrice, "existingPrice", latestGasPrice.value) - continue + gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{ + DestChainSelector: chainSelector, + Value: newGasPrice, + }) } } - - gasPriceUpdate = append(gasPriceUpdate, cciptypes.GasPrice{ - DestChainSelector: chainSelector, - Value: newGasPrice, - }) } sort.Slice(gasPriceUpdate, func(i, j int) bool { diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go index 6cf7e4bec7..f1ca4d91cd 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go @@ -1132,7 +1132,7 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: val1e18(20)}}, }, { - name: "multichain gas prices", + name: "multi-chain gas price updates due to heartbeat", commitObservations: []ccip.CommitObservation{ {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(1)}}, {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(11)}}, @@ -1162,9 +1162,47 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { f: 1, expGasUpdates: []cciptypes.GasPrice{ {DestChainSelector: defaultSourceChainSelector, Value: val1e18(2)}, + {DestChainSelector: defaultSourceChainSelector + 1, Value: val1e18(22)}, {DestChainSelector: defaultSourceChainSelector + 2, Value: val1e18(222)}, }, }, + { + name: "multi-chain gas prices but only one updates due to deviation", + commitObservations: []ccip.CommitObservation{ + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(1)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(11)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 2: val1e18(111)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(2)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(22)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 2: val1e18(222)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(3)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 1: val1e18(33)}}, + {SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector + 2: val1e18(333)}}, + }, + gasPriceHeartBeat: *config.MustNewDuration(time.Hour), + daGasPriceDeviationPPB: 20e7, + execGasPriceDeviationPPB: 20e7, + tokenPriceHeartBeat: *config.MustNewDuration(time.Hour), + tokenPriceDeviationPPB: 20e7, + latestGasPrice: map[uint64]update{ + defaultSourceChainSelector: { + timestamp: time.Now().Add(-30 * time.Minute), // recent + value: val1e18(9), // median deviates + }, + defaultSourceChainSelector + 1: { + timestamp: time.Now().Add(-30 * time.Minute), // recent + value: val1e18(20), // median does not deviate + }, + defaultSourceChainSelector + 2: { + timestamp: time.Now().Add(-30 * time.Minute), // recent + value: val1e18(220), // median does not deviate + }, + }, + f: 1, + expGasUpdates: []cciptypes.GasPrice{ + {DestChainSelector: defaultSourceChainSelector, Value: val1e18(2)}, + }, + }, { name: "median one token", commitObservations: []ccip.CommitObservation{ @@ -1205,14 +1243,14 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}}, }, { - name: "token price update skipped because it is close to the latest", + name: "token price update skipped because it does not deviate and are recent", commitObservations: []ccip.CommitObservation{ { - TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11)}, + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(11)}, SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)}, }, { - TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12)}, + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12), feeToken2: val1e18(12)}, SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)}, }, }, @@ -1227,10 +1265,81 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { timestamp: time.Now().Add(-30 * time.Minute), value: val1e18(10), }, + feeToken2: { + timestamp: time.Now().Add(-30 * time.Minute), + value: val1e18(10), + }, }, // We expect a gas update because no latest expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}}, }, + { + name: "multiple token price update due to staleness", + commitObservations: []ccip.CommitObservation{ + { + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(11)}, + SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)}, + }, + { + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12), feeToken2: val1e18(12)}, + SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)}, + }, + }, + f: 1, + gasPriceHeartBeat: *config.MustNewDuration(time.Hour), + daGasPriceDeviationPPB: 20e7, + execGasPriceDeviationPPB: 20e7, + tokenPriceHeartBeat: *config.MustNewDuration(time.Hour), + tokenPriceDeviationPPB: 20e7, + latestTokenPrices: map[cciptypes.Address]update{ + feeToken1: { + timestamp: time.Now().Add(-90 * time.Minute), + value: val1e18(10), + }, + feeToken2: { + timestamp: time.Now().Add(-30 * time.Minute), + value: val1e18(10), + }, + }, + expTokenUpdates: []cciptypes.TokenPrice{ + {Token: feeToken1, Value: val1e18(12)}, + {Token: feeToken2, Value: val1e18(12)}, + }, + expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}}, + }, + { + name: "multiple token exist but only one updates due to deviation", + commitObservations: []ccip.CommitObservation{ + { + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(13)}, + SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)}, + }, + { + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(12), feeToken2: val1e18(14)}, + SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(0)}, + }, + }, + f: 1, + gasPriceHeartBeat: *config.MustNewDuration(time.Hour), + daGasPriceDeviationPPB: 20e7, + execGasPriceDeviationPPB: 20e7, + tokenPriceHeartBeat: *config.MustNewDuration(time.Hour), + tokenPriceDeviationPPB: 20e7, + latestTokenPrices: map[cciptypes.Address]update{ + feeToken1: { + timestamp: time.Now().Add(-30 * time.Minute), + value: val1e18(10), + }, + feeToken2: { + timestamp: time.Now().Add(-30 * time.Minute), + value: val1e18(10), + }, + }, + expTokenUpdates: []cciptypes.TokenPrice{ + {Token: feeToken2, Value: val1e18(14)}, + }, + expGasUpdates: []cciptypes.GasPrice{{DestChainSelector: defaultSourceChainSelector, Value: big.NewInt(0)}}, + }, { name: "gas price and token price both included because they are not close to the latest", commitObservations: []ccip.CommitObservation{ @@ -1331,12 +1440,18 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { name: "gas price included because it deviates from latest and token price skipped because it does not deviate", commitObservations: []ccip.CommitObservation{ { - TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(20)}, - SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(10)}, + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(20)}, + SourceGasPriceUSDPerChain: map[uint64]*big.Int{ + defaultSourceChainSelector: val1e18(10), + defaultSourceChainSelector + 1: val1e18(20), + }, }, { - TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(21)}, - SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(11)}, + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(21)}, + SourceGasPriceUSDPerChain: map[uint64]*big.Int{ + defaultSourceChainSelector: val1e18(11), + defaultSourceChainSelector + 1: val1e18(21), + }, }, }, f: 1, @@ -1347,8 +1462,12 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { tokenPriceDeviationPPB: 200e7, latestGasPrice: map[uint64]update{ defaultSourceChainSelector: { - timestamp: time.Now().Add(-90 * time.Minute), - value: val1e18(9), + timestamp: time.Now().Add(-30 * time.Minute), + value: val1e18(8), + }, + defaultSourceChainSelector + 1: { + timestamp: time.Now().Add(-30 * time.Minute), + value: val1e18(21), }, }, latestTokenPrices: map[cciptypes.Address]update{ @@ -1363,11 +1482,11 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { name: "gas price skipped because it does not deviate and token price included because it has not been updated recently", commitObservations: []ccip.CommitObservation{ { - TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(20)}, + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(10), feeToken2: val1e18(20)}, SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(10)}, }, { - TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(21)}, + TokenPricesUSD: map[cciptypes.Address]*big.Int{feeToken1: val1e18(11), feeToken2: val1e18(21)}, SourceGasPriceUSDPerChain: map[uint64]*big.Int{defaultSourceChainSelector: val1e18(11)}, }, }, @@ -1386,11 +1505,16 @@ func TestCommitReportingPlugin_calculatePriceUpdates(t *testing.T) { latestTokenPrices: map[cciptypes.Address]update{ feeToken1: { timestamp: time.Now().Add(-4 * time.Hour), + value: val1e18(11), + }, + feeToken2: { + timestamp: time.Now().Add(-1 * time.Hour), value: val1e18(21), }, }, expTokenUpdates: []cciptypes.TokenPrice{ - {Token: feeToken1, Value: val1e18(21)}, + {Token: feeToken1, Value: val1e18(11)}, + {Token: feeToken2, Value: val1e18(21)}, }, expGasUpdates: nil, },