From a937d5c577d8ba13dc7542a757359339442ae33f Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Tue, 20 Aug 2024 11:43:01 +0200 Subject: [PATCH] Separate token price reporting schedule (#14154) * Separate token price reporting schedule (#1278) ## Motivation Static price removal job rollout will be delayed to after 1.5 release. To unblock db load concerns in 1.4.21 which writes prices to db, we want to reduce number of token-price related insertions in db. ## Solution Separate gas price and token price insertion frequency, insert every 10 minutes for token price. 10-min resolution for token price is accurate enough for our use case. * Changeset --------- Co-authored-by: Chunkai Yang --- .changeset/late-stingrays-promise.md | 5 + .../ccip/internal/ccipdb/price_service.go | 249 ++++++++------ .../internal/ccipdb/price_service_test.go | 320 ++++++++++++------ 3 files changed, 363 insertions(+), 211 deletions(-) create mode 100644 .changeset/late-stingrays-promise.md diff --git a/.changeset/late-stingrays-promise.md b/.changeset/late-stingrays-promise.md new file mode 100644 index 00000000000..39ca570f581 --- /dev/null +++ b/.changeset/late-stingrays-promise.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Separate price updates schedule for token prices in CCIP #updated diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index 7d7d5bda3ad..2118d5832da 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -42,23 +42,28 @@ type PriceService interface { var _ PriceService = (*priceService)(nil) const ( - // Prices should expire after 10 minutes in DB. Prices should be fresh in the Commit plugin. - // 10 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while + // Gas prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. + gasPriceUpdateInterval = 1 * time.Minute + // Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show + // their prices are stable, 10-minute resolution is accurate enough. + tokenPriceUpdateInterval = 10 * time.Minute + + // Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin. + // 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while // surfacing price update outages quickly enough. - priceExpireSec = 600 - // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. - // 10 minutes should result in 40 rows being cleaned up per job, it is not a heavy load on DB, so there is no need - // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireSec`. - priceCleanupInterval = 600 * time.Second + priceExpireThreshold = 25 * time.Minute - // Prices are refreshed every 1 minute, they are sufficiently accurate, and consistent with Commit OCR round time. - priceUpdateInterval = 60 * time.Second + // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. + // 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need + // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`. + priceCleanupInterval = 10 * time.Minute ) type priceService struct { - priceExpireSec int - cleanupInterval time.Duration - updateInterval time.Duration + priceExpireThreshold time.Duration + cleanupInterval time.Duration + gasUpdateInterval time.Duration + tokenUpdateInterval time.Duration lggr logger.Logger orm cciporm.ORM @@ -93,9 +98,10 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireSec: priceExpireSec, - cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time - updateInterval: utils.WithJitter(priceUpdateInterval), + priceExpireThreshold: priceExpireThreshold, + cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time + gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval), + tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval), lggr: lggr, orm: orm, @@ -135,10 +141,14 @@ func (p *priceService) Close() error { func (p *priceService) run() { cleanupTicker := time.NewTicker(p.cleanupInterval) - updateTicker := time.NewTicker(p.updateInterval) + gasUpdateTicker := time.NewTicker(p.gasUpdateInterval) + tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval) go func() { defer p.wg.Done() + defer cleanupTicker.Stop() + defer gasUpdateTicker.Stop() + defer tokenUpdateTicker.Stop() for { select { @@ -149,10 +159,15 @@ func (p *priceService) run() { if err != nil { p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err) } - case <-updateTicker.C: - err := p.runUpdate(p.backgroundCtx) + case <-gasUpdateTicker.C: + err := p.runGasPriceUpdate(p.backgroundCtx) if err != nil { - p.lggr.Errorw("Error when updating prices in the background", "err", err) + p.lggr.Errorw("Error when updating gas prices in the background", "err", err) + } + case <-tokenUpdateTicker.C: + err := p.runTokenPriceUpdate(p.backgroundCtx) + if err != nil { + p.lggr.Errorw("Error when updating token prices in the background", "err", err) } } } @@ -167,8 +182,11 @@ func (p *priceService) UpdateDynamicConfig(ctx context.Context, gasPriceEstimato // Config update may substantially change the prices, refresh the prices immediately, this also makes testing easier // for not having to wait to the full update interval. - if err := p.runUpdate(ctx); err != nil { - p.lggr.Errorw("Error when updating prices after dynamic config update", "err", err) + if err := p.runGasPriceUpdate(ctx); err != nil { + p.lggr.Errorw("Error when updating gas prices after dynamic config update", "err", err) + } + if err := p.runTokenPriceUpdate(ctx); err != nil { + p.lggr.Errorw("Error when updating token prices after dynamic config update", "err", err) } return nil @@ -224,7 +242,7 @@ func (p *priceService) runCleanup(ctx context.Context) error { eg := new(errgroup.Group) eg.Go(func() error { - err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) + err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) if err != nil { return fmt.Errorf("error clearing gas prices: %w", err) } @@ -232,7 +250,7 @@ func (p *priceService) runCleanup(ctx context.Context) error { }) eg.Go(func() error { - err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, p.priceExpireSec) + err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) if err != nil { return fmt.Errorf("error clearing token prices: %w", err) } @@ -242,84 +260,134 @@ func (p *priceService) runCleanup(ctx context.Context) error { return eg.Wait() } -func (p *priceService) runUpdate(ctx context.Context) error { +func (p *priceService) runGasPriceUpdate(ctx context.Context) error { // Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader` - // Price updates happen infrequently - once every `priceUpdateInterval` seconds. + // Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds. // It does not happen on any code path that is performance sensitive. // We can afford to have non-performant unlocks here that is simple and safe. p.dynamicConfigMu.RLock() defer p.dynamicConfigMu.RUnlock() // There may be a period of time between service is started and dynamic config is updated - if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { - p.lggr.Info("Skipping price update due to gasPriceEstimator and/or destPriceRegistry not ready") + if p.gasPriceEstimator == nil { + p.lggr.Info("Skipping gas price update due to gasPriceEstimator not ready") + return nil + } + + sourceGasPriceUSD, err := p.observeGasPriceUpdates(ctx, p.lggr) + if err != nil { + return fmt.Errorf("failed to observe gas price updates: %w", err) + } + + err = p.writeGasPricesToDB(ctx, sourceGasPriceUSD) + if err != nil { + return fmt.Errorf("failed to write gas prices to db: %w", err) + } + + return nil +} + +func (p *priceService) runTokenPriceUpdate(ctx context.Context) error { + // Protect against concurrent updates of `tokenPriceEstimator` and `destPriceRegistryReader` + // Price updates happen infrequently - once every `tokenPriceUpdateInterval` seconds. + p.dynamicConfigMu.RLock() + defer p.dynamicConfigMu.RUnlock() + + // There may be a period of time between service is started and dynamic config is updated + if p.destPriceRegistryReader == nil { + p.lggr.Info("Skipping token price update due to destPriceRegistry not ready") return nil } - sourceGasPriceUSD, tokenPricesUSD, err := p.observePriceUpdates(ctx, p.lggr) + tokenPricesUSD, err := p.observeTokenPriceUpdates(ctx, p.lggr) if err != nil { - return fmt.Errorf("failed to observe price updates: %w", err) + return fmt.Errorf("failed to observe token price updates: %w", err) } - err = p.writePricesToDB(ctx, sourceGasPriceUSD, tokenPricesUSD) + err = p.writeTokenPricesToDB(ctx, tokenPricesUSD) if err != nil { - return fmt.Errorf("failed to write prices to db: %w", err) + return fmt.Errorf("failed to write token prices to db: %w", err) } return nil } -func (p *priceService) observePriceUpdates( +func (p *priceService) observeGasPriceUpdates( ctx context.Context, lggr logger.Logger, -) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { - if p.gasPriceEstimator == nil || p.destPriceRegistryReader == nil { - return nil, nil, fmt.Errorf("gasPriceEstimator and/or destPriceRegistry is not set yet") +) (sourceGasPriceUSD *big.Int, err error) { + if p.gasPriceEstimator == nil { + return nil, fmt.Errorf("gasPriceEstimator is not set yet") } - sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) + // Include wrapped native to identify the source native USD price, notice USD is in 1e18 scale, i.e. $1 = 1e18 + rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, []cciptypes.Address{p.sourceNative}) + if err != nil { + return nil, fmt.Errorf("failed to fetch source native price (%s): %w", p.sourceNative, err) + } - lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) + sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] + if !exists { + return nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) + } + sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) + if err != nil { + return nil, err + } + if sourceGasPrice == nil { + return nil, fmt.Errorf("missing gas price") + } + sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) if err != nil { - return nil, nil, fmt.Errorf("get destination tokens: %w", err) + return nil, err } - return p.generatePriceUpdates(ctx, lggr, sortedLaneTokens) + lggr.Infow("PriceService observed latest gas price", + "sourceChainSelector", p.sourceChainSelector, + "destChainSelector", p.destChainSelector, + "sourceNative", p.sourceNative, + "gasPriceWei", sourceGasPrice, + "sourceNativePriceUSD", sourceNativePriceUSD, + "sourceGasPriceUSD", sourceGasPriceUSD, + ) + return sourceGasPriceUSD, nil } // All prices are USD ($1=1e18) denominated. All prices must be not nil. // Return token prices should contain the exact same tokens as in tokenDecimals. -func (p *priceService) generatePriceUpdates( +func (p *priceService) observeTokenPriceUpdates( ctx context.Context, lggr logger.Logger, - sortedLaneTokens []cciptypes.Address, -) (sourceGasPriceUSD *big.Int, tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { - // Include wrapped native in our token query as way to identify the source native USD price. - // notice USD is in 1e18 scale, i.e. $1 = 1e18 - queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{p.sourceNative}, sortedLaneTokens) +) (tokenPricesUSD map[cciptypes.Address]*big.Int, err error) { + if p.destPriceRegistryReader == nil { + return nil, fmt.Errorf("destPriceRegistry is not set yet") + } + + sortedLaneTokens, filteredLaneTokens, err := ccipcommon.GetFilteredSortedLaneTokens(ctx, p.offRampReader, p.destPriceRegistryReader, p.priceGetter) + if err != nil { + return nil, fmt.Errorf("get destination tokens: %w", err) + } + lggr.Debugw("Filtered bridgeable tokens with no configured price getter", "filteredLaneTokens", filteredLaneTokens) + + queryTokens := ccipcommon.FlattenUniqueSlice(sortedLaneTokens) rawTokenPricesUSD, err := p.priceGetter.TokenPricesUSD(ctx, queryTokens) if err != nil { - return nil, nil, err + return nil, fmt.Errorf("failed to fetch token prices (%v): %w", queryTokens, err) } lggr.Infow("Raw token prices", "rawTokenPrices", rawTokenPricesUSD) // make sure that we got prices for all the tokens of our query for _, token := range queryTokens { if rawTokenPricesUSD[token] == nil { - return nil, nil, fmt.Errorf("missing token price: %+v", token) + return nil, fmt.Errorf("missing token price: %+v", token) } } - sourceNativePriceUSD, exists := rawTokenPricesUSD[p.sourceNative] - if !exists { - return nil, nil, fmt.Errorf("missing source native (%s) price", p.sourceNative) - } - destTokensDecimals, err := p.destPriceRegistryReader.GetTokensDecimals(ctx, sortedLaneTokens) if err != nil { - return nil, nil, fmt.Errorf("get tokens decimals: %w", err) + return nil, fmt.Errorf("get tokens decimals: %w", err) } tokenPricesUSD = make(map[cciptypes.Address]*big.Int, len(rawTokenPricesUSD)) @@ -327,68 +395,47 @@ func (p *priceService) generatePriceUpdates( tokenPricesUSD[token] = calculateUsdPer1e18TokenAmount(rawTokenPricesUSD[token], destTokensDecimals[i]) } - sourceGasPrice, err := p.gasPriceEstimator.GetGasPrice(ctx) - if err != nil { - return nil, nil, err - } - if sourceGasPrice == nil { - return nil, nil, fmt.Errorf("missing gas price") - } - sourceGasPriceUSD, err = p.gasPriceEstimator.DenoteInUSD(sourceGasPrice, sourceNativePriceUSD) - if err != nil { - return nil, nil, err - } - - lggr.Infow("PriceService observed latest price", + lggr.Infow("PriceService observed latest token prices", "sourceChainSelector", p.sourceChainSelector, "destChainSelector", p.destChainSelector, - "gasPriceWei", sourceGasPrice, - "sourceNativePriceUSD", sourceNativePriceUSD, - "sourceGasPriceUSD", sourceGasPriceUSD, "tokenPricesUSD", tokenPricesUSD, ) - return sourceGasPriceUSD, tokenPricesUSD, nil + return tokenPricesUSD, nil } -func (p *priceService) writePricesToDB( - ctx context.Context, - sourceGasPriceUSD *big.Int, - tokenPricesUSD map[cciptypes.Address]*big.Int, -) (err error) { - eg := new(errgroup.Group) - - if sourceGasPriceUSD != nil { - eg.Go(func() error { - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ - { - SourceChainSelector: p.sourceChainSelector, - GasPrice: assets.NewWei(sourceGasPriceUSD), - }, - }) - }) +func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) { + if sourceGasPriceUSD == nil { + return nil } - if tokenPricesUSD != nil { - var tokenPrices []cciporm.TokenPriceUpdate + return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ + { + SourceChainSelector: p.sourceChainSelector, + GasPrice: assets.NewWei(sourceGasPriceUSD), + }, + }) +} - for token, price := range tokenPricesUSD { - tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ - TokenAddr: string(token), - TokenPrice: assets.NewWei(price), - }) - } +func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) { + if tokenPricesUSD == nil { + return nil + } - // Sort token by addr to make price updates ordering deterministic, easier to testing and debugging - sort.Slice(tokenPrices, func(i, j int) bool { - return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr - }) + var tokenPrices []cciporm.TokenPriceUpdate - eg.Go(func() error { - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) + for token, price := range tokenPricesUSD { + tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ + TokenAddr: string(token), + TokenPrice: assets.NewWei(price), }) } - return eg.Wait() + // Sort token by addr to make price updates ordering deterministic, easier for testing and debugging + sort.Slice(tokenPrices, func(i, j int) bool { + return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr + }) + + return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) } // Input price is USD per full token, with 18 decimal precision diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index 0bea8af9a19..26721bdf8e4 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -82,8 +82,8 @@ func TestPriceService_priceCleanup(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, priceExpireSec).Return(gasPricesError).Once() - mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, priceExpireSec).Return(tokenPricesError).Once() + mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(gasPricesError).Once() + mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(tokenPricesError).Once() priceService := NewPriceService( lggr, @@ -105,17 +105,13 @@ func TestPriceService_priceCleanup(t *testing.T) { } } -func TestPriceService_priceWrite(t *testing.T) { +func TestPriceService_writeGasPrices(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) sourceChainSelector := uint64(67890) gasPrice := big.NewInt(1e18) - tokenPrices := map[cciptypes.Address]*big.Int{ - "0x123": big.NewInt(2e18), - "0x234": big.NewInt(3e18), - } expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ { @@ -123,6 +119,67 @@ func TestPriceService_priceWrite(t *testing.T) { GasPrice: assets.NewWei(gasPrice), }, } + + testCases := []struct { + name string + gasPriceError bool + expectedErr bool + }{ + { + name: "ORM called successfully", + gasPriceError: false, + expectedErr: false, + }, + { + name: "gasPrice clear failed", + gasPriceError: true, + expectedErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := tests.Context(t) + + var gasPricesError error + if tc.gasPriceError { + gasPricesError = fmt.Errorf("gas prices error") + } + + mockOrm := ccipmocks.NewORM(t) + mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() + + priceService := NewPriceService( + lggr, + mockOrm, + jobId, + destChainSelector, + sourceChainSelector, + "", + nil, + nil, + ).(*priceService) + err := priceService.writeGasPricesToDB(ctx, gasPrice) + if tc.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestPriceService_writeTokenPrices(t *testing.T) { + lggr := logger.TestLogger(t) + jobId := int32(1) + destChainSelector := uint64(12345) + sourceChainSelector := uint64(67890) + + tokenPrices := map[cciptypes.Address]*big.Int{ + "0x123": big.NewInt(2e18), + "0x234": big.NewInt(3e18), + } + expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ { TokenAddr: "0x123", @@ -136,31 +193,16 @@ func TestPriceService_priceWrite(t *testing.T) { testCases := []struct { name string - gasPriceError bool tokenPriceError bool expectedErr bool }{ { name: "ORM called successfully", - gasPriceError: false, tokenPriceError: false, expectedErr: false, }, - { - name: "gasPrice clear failed", - gasPriceError: true, - tokenPriceError: false, - expectedErr: true, - }, { name: "tokenPrice clear failed", - gasPriceError: false, - tokenPriceError: true, - expectedErr: true, - }, - { - name: "both ORM calls failed", - gasPriceError: true, tokenPriceError: true, expectedErr: true, }, @@ -170,17 +212,12 @@ func TestPriceService_priceWrite(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx := tests.Context(t) - var gasPricesError error var tokenPricesError error - if tc.gasPriceError { - gasPricesError = fmt.Errorf("gas prices error") - } if tc.tokenPriceError { tokenPricesError = fmt.Errorf("token prices error") } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() priceService := NewPriceService( @@ -193,7 +230,7 @@ func TestPriceService_priceWrite(t *testing.T) { nil, nil, ).(*priceService) - err := priceService.writePricesToDB(ctx, gasPrice, tokenPrices) + err := priceService.writeTokenPricesToDB(ctx, tokenPrices) if tc.expectedErr { assert.Error(t, err) } else { @@ -203,22 +240,15 @@ func TestPriceService_priceWrite(t *testing.T) { } } -func TestPriceService_generatePriceUpdates(t *testing.T) { +func TestPriceService_observeGasPriceUpdates(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) sourceChainSelector := uint64(67890) - - const nTokens = 10 - tokens := make([]cciptypes.Address, nTokens) - for i := range tokens { - tokens[i] = cciptypes.Address(utils.RandomAddress().String()) - } - sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] }) + sourceNativeToken := cciptypes.Address(utils.RandomAddress().String()) testCases := []struct { name string - tokenDecimals map[cciptypes.Address]uint8 sourceNativeToken cciptypes.Address priceGetterRespData map[cciptypes.Address]*big.Int priceGetterRespErr error @@ -226,108 +256,179 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { feeEstimatorRespErr error maxGasPrice uint64 expSourceGasPriceUSD *big.Int - expTokenPricesUSD map[cciptypes.Address]*big.Int expErr bool }{ { - name: "base", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 12, - }, - sourceNativeToken: tokens[0], + name: "base", + sourceNativeToken: sourceNativeToken, priceGetterRespData: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), - tokens[1]: val1e18(200), - tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) + sourceNativeToken: val1e18(100), }, priceGetterRespErr: nil, feeEstimatorRespFee: big.NewInt(10), feeEstimatorRespErr: nil, maxGasPrice: 1e18, expSourceGasPriceUSD: big.NewInt(1000), - expTokenPricesUSD: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), - tokens[1]: val1e18(200 * 1e6), - }, - expErr: false, + expErr: false, }, { - name: "price getter returned an error", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, - }, - sourceNativeToken: tokens[0], + name: "price getter returned an error", + sourceNativeToken: sourceNativeToken, priceGetterRespData: nil, priceGetterRespErr: fmt.Errorf("some random network error"), expErr: true, }, { - name: "price getter skipped a requested price", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, - }, - sourceNativeToken: tokens[0], + name: "price getter did not return source native gas price", + sourceNativeToken: sourceNativeToken, priceGetterRespData: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), + "0x1": val1e18(100), }, priceGetterRespErr: nil, expErr: true, }, { - name: "price getter skipped source native price", - tokenDecimals: map[cciptypes.Address]uint8{ - tokens[0]: 18, - tokens[1]: 18, + name: "dynamic fee cap overrides legacy", + sourceNativeToken: sourceNativeToken, + priceGetterRespData: map[cciptypes.Address]*big.Int{ + sourceNativeToken: val1e18(100), }, - sourceNativeToken: tokens[2], + priceGetterRespErr: nil, + feeEstimatorRespFee: big.NewInt(20), + feeEstimatorRespErr: nil, + maxGasPrice: 1e18, + expSourceGasPriceUSD: big.NewInt(2000), + expErr: false, + }, + { + name: "nil gas price", + sourceNativeToken: sourceNativeToken, priceGetterRespData: map[cciptypes.Address]*big.Int{ - tokens[0]: val1e18(100), - tokens[1]: val1e18(200), + sourceNativeToken: val1e18(100), }, - priceGetterRespErr: nil, - expErr: true, + feeEstimatorRespFee: nil, + maxGasPrice: 1e18, + expErr: true, }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + priceGetter := pricegetter.NewMockPriceGetter(t) + defer priceGetter.AssertExpectations(t) + + gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) + defer gasPriceEstimator.AssertExpectations(t) + + priceGetter.On("TokenPricesUSD", mock.Anything, []cciptypes.Address{tc.sourceNativeToken}).Return(tc.priceGetterRespData, tc.priceGetterRespErr) + + if tc.maxGasPrice > 0 { + gasPriceEstimator.On("GetGasPrice", mock.Anything).Return(tc.feeEstimatorRespFee, tc.feeEstimatorRespErr) + if tc.feeEstimatorRespFee != nil { + pUSD := ccipcalc.CalculateUsdPerUnitGas(tc.feeEstimatorRespFee, tc.priceGetterRespData[tc.sourceNativeToken]) + gasPriceEstimator.On("DenoteInUSD", mock.Anything, mock.Anything).Return(pUSD, nil) + } + } + + priceService := NewPriceService( + lggr, + nil, + jobId, + destChainSelector, + sourceChainSelector, + tc.sourceNativeToken, + priceGetter, + nil, + ).(*priceService) + priceService.gasPriceEstimator = gasPriceEstimator + + sourceGasPriceUSD, err := priceService.observeGasPriceUpdates(context.Background(), lggr) + if tc.expErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.True(t, tc.expSourceGasPriceUSD.Cmp(sourceGasPriceUSD) == 0) + }) + } +} + +func TestPriceService_observeTokenPriceUpdates(t *testing.T) { + lggr := logger.TestLogger(t) + jobId := int32(1) + destChainSelector := uint64(12345) + sourceChainSelector := uint64(67890) + + const nTokens = 10 + tokens := make([]cciptypes.Address, nTokens) + for i := range tokens { + tokens[i] = cciptypes.Address(utils.RandomAddress().String()) + } + sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] }) + + testCases := []struct { + name string + tokenDecimals map[cciptypes.Address]uint8 + filterOutTokens []cciptypes.Address + priceGetterRespData map[cciptypes.Address]*big.Int + priceGetterRespErr error + expTokenPricesUSD map[cciptypes.Address]*big.Int + expErr bool + }{ { - name: "dynamic fee cap overrides legacy", + name: "base", tokenDecimals: map[cciptypes.Address]uint8{ tokens[0]: 18, - tokens[1]: 18, + tokens[1]: 12, }, - sourceNativeToken: tokens[0], + filterOutTokens: []cciptypes.Address{tokens[2]}, priceGetterRespData: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), tokens[1]: val1e18(200), tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) }, - priceGetterRespErr: nil, - feeEstimatorRespFee: big.NewInt(20), - feeEstimatorRespErr: nil, - maxGasPrice: 1e18, - expSourceGasPriceUSD: big.NewInt(2000), + priceGetterRespErr: nil, expTokenPricesUSD: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), - tokens[1]: val1e18(200), + tokens[1]: val1e18(200 * 1e6), }, expErr: false, }, { - name: "nil gas price", + name: "price getter returned an error", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 18, + }, + priceGetterRespData: nil, + priceGetterRespErr: fmt.Errorf("some random network error"), + expErr: true, + }, + { + name: "price getter skipped a requested price", tokenDecimals: map[cciptypes.Address]uint8{ tokens[0]: 18, tokens[1]: 18, }, - sourceNativeToken: tokens[0], priceGetterRespData: map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(100), + }, + priceGetterRespErr: nil, + expErr: true, + }, + { + name: "nil token price", + tokenDecimals: map[cciptypes.Address]uint8{ + tokens[0]: 18, + tokens[1]: 18, + }, + filterOutTokens: []cciptypes.Address{tokens[2]}, + priceGetterRespData: map[cciptypes.Address]*big.Int{ + tokens[0]: nil, tokens[1]: val1e18(200), - tokens[2]: val1e18(300), // price getter returned a price for this token even though we didn't request it (should be skipped) + tokens[2]: val1e18(300), }, - feeEstimatorRespFee: nil, - maxGasPrice: 1e18, - expErr: true, + expErr: true, }, } @@ -336,9 +437,6 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { priceGetter := pricegetter.NewMockPriceGetter(t) defer priceGetter.AssertExpectations(t) - gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) - defer gasPriceEstimator.AssertExpectations(t) - var destTokens []cciptypes.Address for tk := range tc.tokenDecimals { destTokens = append(destTokens, tk) @@ -351,22 +449,21 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { destDecimals = append(destDecimals, tc.tokenDecimals[token]) } - queryTokens := ccipcommon.FlattenUniqueSlice([]cciptypes.Address{tc.sourceNativeToken}, destTokens) + queryTokens := ccipcommon.FlattenUniqueSlice(destTokens) if len(queryTokens) > 0 { priceGetter.On("TokenPricesUSD", mock.Anything, queryTokens).Return(tc.priceGetterRespData, tc.priceGetterRespErr) + priceGetter.On("FilterConfiguredTokens", mock.Anything, mock.Anything).Return(destTokens, tc.filterOutTokens, nil) } - if tc.maxGasPrice > 0 { - gasPriceEstimator.On("GetGasPrice", mock.Anything).Return(tc.feeEstimatorRespFee, tc.feeEstimatorRespErr) - if tc.feeEstimatorRespFee != nil { - pUSD := ccipcalc.CalculateUsdPerUnitGas(tc.feeEstimatorRespFee, tc.expTokenPricesUSD[tc.sourceNativeToken]) - gasPriceEstimator.On("DenoteInUSD", mock.Anything, mock.Anything).Return(pUSD, nil) - } - } + offRampReader := ccipdatamocks.NewOffRampReader(t) + offRampReader.On("GetTokens", mock.Anything).Return(cciptypes.OffRampTokens{ + DestinationTokens: destTokens, + }, nil).Maybe() destPriceReg := ccipdatamocks.NewPriceRegistryReader(t) destPriceReg.On("GetTokensDecimals", mock.Anything, destTokens).Return(destDecimals, nil).Maybe() + destPriceReg.On("GetFeeTokens", mock.Anything).Return([]cciptypes.Address{destTokens[0]}, nil).Maybe() priceService := NewPriceService( lggr, @@ -374,20 +471,18 @@ func TestPriceService_generatePriceUpdates(t *testing.T) { jobId, destChainSelector, sourceChainSelector, - tc.sourceNativeToken, + "0x123", priceGetter, - nil, + offRampReader, ).(*priceService) - priceService.gasPriceEstimator = gasPriceEstimator priceService.destPriceRegistryReader = destPriceReg - sourceGasPriceUSD, tokenPricesUSD, err := priceService.generatePriceUpdates(context.Background(), lggr, destTokens) + tokenPricesUSD, err := priceService.observeTokenPriceUpdates(context.Background(), lggr) if tc.expErr { assert.Error(t, err) return } assert.NoError(t, err) - assert.True(t, tc.expSourceGasPriceUSD.Cmp(sourceGasPriceUSD) == 0) assert.True(t, reflect.DeepEqual(tc.expTokenPricesUSD, tokenPricesUSD)) }) } @@ -680,8 +775,10 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { gasPriceEstimator := prices.NewMockGasPriceEstimatorCommit(t) defer gasPriceEstimator.AssertExpectations(t) - priceGetter.On("TokenPricesUSD", mock.Anything, tokens).Return(map[cciptypes.Address]*big.Int{ + priceGetter.On("TokenPricesUSD", mock.Anything, tokens[:1]).Return(map[cciptypes.Address]*big.Int{ tokens[0]: val1e18(tokenPrices[0]), + }, nil) + priceGetter.On("TokenPricesUSD", mock.Anything, tokens[1:]).Return(map[cciptypes.Address]*big.Int{ tokens[1]: val1e18(tokenPrices[1]), tokens[2]: val1e18(tokenPrices[2]), }, nil) @@ -711,15 +808,18 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { offRampReader, ).(*priceService) - updateInterval := 2000 * time.Millisecond + gasUpdateInterval := 2000 * time.Millisecond + tokenUpdateInterval := 5000 * time.Millisecond cleanupInterval := 3000 * time.Millisecond - // run write task every 2 second - priceService.updateInterval = updateInterval + // run gas price task every 2 second + priceService.gasUpdateInterval = gasUpdateInterval + // run token price task every 5 second + priceService.tokenUpdateInterval = tokenUpdateInterval // run cleanup every 3 seconds priceService.cleanupInterval = cleanupInterval // expire all prices during every cleanup - priceService.priceExpireSec = 0 + priceService.priceExpireThreshold = time.Duration(0) // initially, db is empty assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0))