From ce09bc0b408b5f60f50107ddbc635e85257714f8 Mon Sep 17 00:00:00 2001 From: Jean Arnaud <jean.arnaud@smartcontract.com> Date: Wed, 11 Oct 2023 16:47:49 +0200 Subject: [PATCH 1/7] CCIP-1147 - Cache last update --- .../ocr2/plugins/ccip/commit_inflight.go | 2 +- .../plugins/ccip/commit_reporting_plugin.go | 16 ++++++++-- .../ccip/commit_reporting_plugin_test.go | 1 + .../ocr2/plugins/ccip/price_updates_cache.go | 31 +++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 core/services/ocr2/plugins/ccip/price_updates_cache.go diff --git a/core/services/ocr2/plugins/ccip/commit_inflight.go b/core/services/ocr2/plugins/ccip/commit_inflight.go index 726bbf6020..670bfed6ca 100644 --- a/core/services/ocr2/plugins/ccip/commit_inflight.go +++ b/core/services/ocr2/plugins/ccip/commit_inflight.go @@ -68,7 +68,7 @@ func (c *inflightCommitReportsContainer) maxInflightSeqNr() uint64 { return max } -// latestInflightGasPriceUpdates returns a map of the latest gas price updates. +// latestInflightGasPriceUpdates returns a map of the latest gas price updates indexed by chain selector. func (c *inflightCommitReportsContainer) latestInflightGasPriceUpdates() map[uint64]update { c.locker.RLock() defer c.locker.RUnlock() diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index bb24464574..37d51d736f 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -81,6 +81,8 @@ type CommitReportingPlugin struct { priceGetter pricegetter.PriceGetter // State inflightReports *inflightCommitReportsContainer + // Cache + priceUpdatesCache *priceUpdatesCache } type CommitReportingPluginFactory struct { @@ -160,6 +162,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + priceUpdatesCache: newPriceUpdatesCache(), }, types.ReportingPluginInfo{ Name: "CCIPCommit", @@ -413,17 +416,22 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now } } - // If there are no price updates inflight, check latest prices onchain + // If there are no price updates inflight, check the latest prices onchain. + + // Query from the last cached update timestamp, if any. + queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat) + if last := r.priceUpdatesCache.get(); last.timestamp.After(queryFrom) { + queryFrom = last.timestamp + } gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter( ctx, r.sourceChainSelector, - now.Add(-r.offchainConfig.GasPriceHeartBeat), + queryFrom, 0, ) if err != nil { return update{}, err } - for _, priceUpdate := range gasPriceUpdates { // Ordered by ascending timestamps timestamp := time.Unix(priceUpdate.Data.Timestamp.Int64(), 0) @@ -432,6 +440,8 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now timestamp: timestamp, value: priceUpdate.Data.Value, } + // Update the cache. + r.priceUpdatesCache.updateCache(gasUpdate) } } diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index d47d2bd123..6fec381ba5 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -1389,6 +1389,7 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { p.lggr = lggr destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = destPriceRegistry + p.priceUpdatesCache = newPriceUpdatesCache() if tc.inflightGasPriceUpdate != nil { p.inflightReports.inFlightPriceUpdates = append( diff --git a/core/services/ocr2/plugins/ccip/price_updates_cache.go b/core/services/ocr2/plugins/ccip/price_updates_cache.go new file mode 100644 index 0000000000..fa3e01fceb --- /dev/null +++ b/core/services/ocr2/plugins/ccip/price_updates_cache.go @@ -0,0 +1,31 @@ +package ccip + +import ( + "time" +) + +type priceUpdatesCache struct { + lastUpdate update +} + +func newPriceUpdatesCache() *priceUpdatesCache { + return &priceUpdatesCache{ + lastUpdate: update{}, + } +} + +func (c *priceUpdatesCache) containsData() bool { + return c.lastUpdate.timestamp != time.Time{} +} + +func (c *priceUpdatesCache) lastCheckpoint() time.Time { + return c.lastUpdate.timestamp +} + +func (c *priceUpdatesCache) get() update { + return c.lastUpdate +} + +func (c *priceUpdatesCache) updateCache(update update) { + c.lastUpdate = update +} From c7c6b8ab1bbfd28a92d182a5698f9adb695953a1 Mon Sep 17 00:00:00 2001 From: Jean Arnaud <jean.arnaud@smartcontract.com> Date: Thu, 12 Oct 2023 09:52:25 +0200 Subject: [PATCH 2/7] CCIP-1147 - Update cache only with most recent data --- core/services/ocr2/plugins/ccip/price_updates_cache.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ccip/price_updates_cache.go b/core/services/ocr2/plugins/ccip/price_updates_cache.go index fa3e01fceb..c6639ffd76 100644 --- a/core/services/ocr2/plugins/ccip/price_updates_cache.go +++ b/core/services/ocr2/plugins/ccip/price_updates_cache.go @@ -27,5 +27,7 @@ func (c *priceUpdatesCache) get() update { } func (c *priceUpdatesCache) updateCache(update update) { - c.lastUpdate = update + if update.timestamp.After(c.lastUpdate.timestamp) { + c.lastUpdate = update + } } From ea8c8933beb573b20f9627f76b074379e4b5a323 Mon Sep 17 00:00:00 2001 From: Jean Arnaud <jean.arnaud@smartcontract.com> Date: Thu, 12 Oct 2023 10:31:04 +0200 Subject: [PATCH 3/7] CCIP-1147 - Remove unused methods --- .../ocr2/plugins/ccip/price_updates_cache.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/price_updates_cache.go b/core/services/ocr2/plugins/ccip/price_updates_cache.go index c6639ffd76..087c7c4ed2 100644 --- a/core/services/ocr2/plugins/ccip/price_updates_cache.go +++ b/core/services/ocr2/plugins/ccip/price_updates_cache.go @@ -1,9 +1,5 @@ package ccip -import ( - "time" -) - type priceUpdatesCache struct { lastUpdate update } @@ -14,14 +10,6 @@ func newPriceUpdatesCache() *priceUpdatesCache { } } -func (c *priceUpdatesCache) containsData() bool { - return c.lastUpdate.timestamp != time.Time{} -} - -func (c *priceUpdatesCache) lastCheckpoint() time.Time { - return c.lastUpdate.timestamp -} - func (c *priceUpdatesCache) get() update { return c.lastUpdate } From 0d701bfc5caaba3498cce400590c6a25b4a21c19 Mon Sep 17 00:00:00 2001 From: Jean Arnaud <jean.arnaud@smartcontract.com> Date: Thu, 12 Oct 2023 11:05:43 +0200 Subject: [PATCH 4/7] CCIP-1147 - Fix test setup --- core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index 6fec381ba5..239548eda1 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -170,6 +170,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.lggr = logger.TestLogger(t) p.tokenDecimalsCache = tokenDecimalsCache p.F = 1 + p.priceUpdatesCache = newPriceUpdatesCache() o := CommitObservation{Interval: ccipdata.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)} obs, err := o.Marshal() @@ -299,6 +300,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration() p.commitStoreReader = commitStoreReader p.F = tc.f + p.priceUpdatesCache = newPriceUpdatesCache() aos := make([]types.AttributedObservation, 0, len(tc.observations)) for _, o := range tc.observations { From f5cb16d40e9a72913d55c23c52fef86d0442216c Mon Sep 17 00:00:00 2001 From: Jean Arnaud <jean.arnaud@smartcontract.com> Date: Thu, 12 Oct 2023 15:16:38 +0200 Subject: [PATCH 5/7] CCIP-1147 - Test scenario for cache usage --- .../ccip/commit_reporting_plugin_test.go | 104 +++++++++++++----- 1 file changed, 78 insertions(+), 26 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go index 239548eda1..89064ba407 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go @@ -1343,18 +1343,21 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { chainSelector := uint64(1234) testCases := []struct { - name string - checkInflight bool - inflightGasPriceUpdate *update - destGasPriceUpdates []update - expUpdate update - expErr bool + name string + checkInflight bool + inflightGasPriceUpdate *update + destGasPriceUpdates []update + cacheValue update + expUpdate update + expErr bool + expStartQuery time.Time + mockPriceRegistryReader *ccipdata.MockPriceRegistryReader }{ { name: "only inflight gas price", checkInflight: true, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, - expUpdate: update{timestamp: now, value: big.NewInt(1000)}, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + expUpdate: update{timestamp: now, value: big.NewInt(4000)}, expErr: false, }, { @@ -1362,22 +1365,53 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { checkInflight: true, inflightGasPriceUpdate: nil, destGasPriceUpdates: []update{ - {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, - {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, }, - expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, expErr: false, }, { - name: "inflight updates are skipped", + name: "inflight updates skipped and cache empty", checkInflight: false, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, destGasPriceUpdates: []update{ - {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, - {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, }, - expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, - expErr: false, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: time.Time{}, + }, + { + name: "inflight updates skipped and cache not up to date", + checkInflight: false, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + destGasPriceUpdates: []update{ + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + }, + cacheValue: update{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: now.Add(-2 * time.Minute), + }, + { + name: "inflight updates skipped and cache up to date", + checkInflight: false, + inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)}, + destGasPriceUpdates: []update{ + {timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)}, + {timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)}, + {timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + }, + cacheValue: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)}, + expErr: false, + expStartQuery: now.Add(-1 * time.Minute), }, } @@ -1392,6 +1426,8 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t) p.destPriceRegistryReader = destPriceRegistry p.priceUpdatesCache = newPriceUpdatesCache() + p.priceUpdatesCache.updateCache(tc.cacheValue) + p.offchainConfig.GasPriceHeartBeat = 5 * time.Minute if tc.inflightGasPriceUpdate != nil { p.inflightReports.inFlightPriceUpdates = append( @@ -1406,20 +1442,26 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { ) } + // Build mocked result of GetGasPriceUpdatesCreatedAfter. + destReader := ccipdata.NewMockPriceRegistryReader(t) if len(tc.destGasPriceUpdates) > 0 { var events []ccipdata.Event[ccipdata.GasPriceUpdate] for _, u := range tc.destGasPriceUpdates { - events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{ - Data: ccipdata.GasPriceUpdate{ - GasPrice: ccipdata.GasPrice{Value: u.value}, - Timestamp: big.NewInt(u.timestamp.Unix()), - }, - }) + if tc.cacheValue.timestamp.IsZero() || !u.timestamp.Before(tc.cacheValue.timestamp) { + events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{ + Data: ccipdata.GasPriceUpdate{ + GasPrice: ccipdata.GasPrice{ + DestChainSelector: chainSelector, + Value: u.value}, + Timestamp: big.NewInt(u.timestamp.Unix()), + }, + }) + } } - destReader := ccipdata.NewMockPriceRegistryReader(t) - destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0).Return(events, nil) - p.destPriceRegistryReader = destReader + destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, mock.Anything).Return(events, nil) } + p.destPriceRegistryReader = destReader + tc.mockPriceRegistryReader = destReader priceUpdate, err := p.getLatestGasPriceUpdate(ctx, time.Now(), tc.checkInflight) if tc.expErr { @@ -1430,6 +1472,16 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { assert.NoError(t, err) assert.Equal(t, tc.expUpdate.timestamp.Truncate(time.Second), priceUpdate.timestamp.Truncate(time.Second)) assert.Equal(t, tc.expUpdate.value.Uint64(), priceUpdate.value.Uint64()) + + // Verify proper cache usage: if the cache is used, we shouldn't query the full range. + reader := tc.mockPriceRegistryReader + if tc.checkInflight && tc.inflightGasPriceUpdate != nil { + reader.AssertNotCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0) + } else if tc.expStartQuery.IsZero() { + reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0) + } else { + reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, tc.expStartQuery, 0) + } }) } } From 48eea0c845f488bce116df4339c59b1d44e6ebbb Mon Sep 17 00:00:00 2001 From: Jean Arnaud <jean.arnaud@smartcontract.com> Date: Thu, 12 Oct 2023 16:32:49 +0200 Subject: [PATCH 6/7] CCIP-1147 - Remove useless comment --- core/services/ocr2/plugins/ccip/commit_reporting_plugin.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 37d51d736f..ed5bd4ea39 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -440,7 +440,6 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now timestamp: timestamp, value: priceUpdate.Data.Value, } - // Update the cache. r.priceUpdatesCache.updateCache(gasUpdate) } } From 040a801f64c27abab83827b8faae38fc76883139 Mon Sep 17 00:00:00 2001 From: dimkouv <dimitrios.kouveris@smartcontract.com> Date: Thu, 12 Oct 2023 18:23:53 +0300 Subject: [PATCH 7/7] run gofmt --- core/services/ocr2/plugins/ccip/commit_reporting_plugin.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 702b73f6d1..ea8ab2e429 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -82,7 +82,7 @@ type CommitReportingPlugin struct { // State inflightReports *inflightCommitReportsContainer // Cache - priceUpdatesCache *priceUpdatesCache + priceUpdatesCache *priceUpdatesCache tokenPriceUpdatesCache *tokenPriceUpdatesCache } @@ -162,8 +162,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin rf.config.destClient, int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), ), - gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), - priceUpdatesCache: newPriceUpdatesCache(), + gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + priceUpdatesCache: newPriceUpdatesCache(), tokenPriceUpdatesCache: newTokenPriceUpdatesCache(), }, types.ReportingPluginInfo{