Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-1147 - Cache last update #198

Merged
merged 9 commits into from
Oct 12, 2023
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/commit_inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *inflightCommitReportsContainer) maxInflightSeqNr() uint64 {
return max
}

// latestInflightGasPriceUpdates returns a map of the latest gas price updates.
// latestInflightGasPriceUpdates returns a map of the latest gas price updates indexed by chain selector.
func (c *inflightCommitReportsContainer) latestInflightGasPriceUpdates() map[uint64]update {
c.locker.RLock()
defer c.locker.RUnlock()
Expand Down
17 changes: 13 additions & 4 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ type CommitReportingPlugin struct {
// Offchain
priceGetter pricegetter.PriceGetter
// State
inflightReports *inflightCommitReportsContainer
inflightReports *inflightCommitReportsContainer
// Cache
priceUpdatesCache *priceUpdatesCache
tokenPriceUpdatesCache *tokenPriceUpdatesCache
}

Expand Down Expand Up @@ -161,6 +163,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth),
),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
priceUpdatesCache: newPriceUpdatesCache(),
tokenPriceUpdatesCache: newTokenPriceUpdatesCache(),
},
types.ReportingPluginInfo{
Expand Down Expand Up @@ -417,17 +420,22 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
}
}

// If there are no price updates inflight, check latest prices onchain
// If there are no price updates inflight, check the latest prices onchain.

// Query from the last cached update timestamp, if any.
queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat)
if last := r.priceUpdatesCache.get(); last.timestamp.After(queryFrom) {
queryFrom = last.timestamp
}
gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter(
ctx,
r.sourceChainSelector,
now.Add(-r.offchainConfig.GasPriceHeartBeat),
queryFrom,
0,
)
if err != nil {
return update{}, err
}

for _, priceUpdate := range gasPriceUpdates {
// Ordered by ascending timestamps
timestamp := time.Unix(priceUpdate.Data.Timestamp.Int64(), 0)
Expand All @@ -436,6 +444,7 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
timestamp: timestamp,
value: priceUpdate.Data.Value,
}
r.priceUpdatesCache.updateCache(gasUpdate)
}
}

Expand Down
107 changes: 81 additions & 26 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.lggr = logger.TestLogger(t)
p.tokenDecimalsCache = tokenDecimalsCache
p.F = 1
p.priceUpdatesCache = newPriceUpdatesCache()

o := CommitObservation{Interval: ccipdata.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)}
obs, err := o.Marshal()
Expand Down Expand Up @@ -300,6 +301,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.commitStoreReader = commitStoreReader
p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache()
p.F = tc.f
p.priceUpdatesCache = newPriceUpdatesCache()

aos := make([]types.AttributedObservation, 0, len(tc.observations))
for _, o := range tc.observations {
Expand Down Expand Up @@ -1342,41 +1344,75 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
chainSelector := uint64(1234)

testCases := []struct {
name string
checkInflight bool
inflightGasPriceUpdate *update
destGasPriceUpdates []update
expUpdate update
expErr bool
name string
checkInflight bool
inflightGasPriceUpdate *update
destGasPriceUpdates []update
cacheValue update
expUpdate update
expErr bool
expStartQuery time.Time
mockPriceRegistryReader *ccipdata.MockPriceRegistryReader
}{
{
name: "only inflight gas price",
checkInflight: true,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)},
expUpdate: update{timestamp: now, value: big.NewInt(1000)},
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
expUpdate: update{timestamp: now, value: big.NewInt(4000)},
expErr: false,
},
{
name: "inflight price is nil",
checkInflight: true,
inflightGasPriceUpdate: nil,
destGasPriceUpdates: []update{
{timestamp: now.Add(time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
},
{
name: "inflight updates are skipped",
name: "inflight updates skipped and cache empty",
checkInflight: false,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)},
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
destGasPriceUpdates: []update{
{timestamp: now.Add(time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expStartQuery: time.Time{},
},
{
name: "inflight updates skipped and cache not up to date",
checkInflight: false,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
destGasPriceUpdates: []update{
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
cacheValue: update{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expStartQuery: now.Add(-2 * time.Minute),
},
{
name: "inflight updates skipped and cache up to date",
checkInflight: false,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
destGasPriceUpdates: []update{
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
cacheValue: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expStartQuery: now.Add(-1 * time.Minute),
},
}

Expand All @@ -1390,6 +1426,9 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
p.lggr = lggr
destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t)
p.destPriceRegistryReader = destPriceRegistry
p.priceUpdatesCache = newPriceUpdatesCache()
p.priceUpdatesCache.updateCache(tc.cacheValue)
p.offchainConfig.GasPriceHeartBeat = 5 * time.Minute

if tc.inflightGasPriceUpdate != nil {
p.inflightReports.inFlightPriceUpdates = append(
Expand All @@ -1404,20 +1443,26 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
)
}

// Build mocked result of GetGasPriceUpdatesCreatedAfter.
destReader := ccipdata.NewMockPriceRegistryReader(t)
if len(tc.destGasPriceUpdates) > 0 {
var events []ccipdata.Event[ccipdata.GasPriceUpdate]
for _, u := range tc.destGasPriceUpdates {
events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{
Data: ccipdata.GasPriceUpdate{
GasPrice: ccipdata.GasPrice{Value: u.value},
Timestamp: big.NewInt(u.timestamp.Unix()),
},
})
if tc.cacheValue.timestamp.IsZero() || !u.timestamp.Before(tc.cacheValue.timestamp) {
events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{
Data: ccipdata.GasPriceUpdate{
GasPrice: ccipdata.GasPrice{
DestChainSelector: chainSelector,
Value: u.value},
Timestamp: big.NewInt(u.timestamp.Unix()),
},
})
}
}
destReader := ccipdata.NewMockPriceRegistryReader(t)
destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0).Return(events, nil)
p.destPriceRegistryReader = destReader
destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, mock.Anything).Return(events, nil)
}
p.destPriceRegistryReader = destReader
tc.mockPriceRegistryReader = destReader

priceUpdate, err := p.getLatestGasPriceUpdate(ctx, time.Now(), tc.checkInflight)
if tc.expErr {
Expand All @@ -1428,6 +1473,16 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, tc.expUpdate.timestamp.Truncate(time.Second), priceUpdate.timestamp.Truncate(time.Second))
assert.Equal(t, tc.expUpdate.value.Uint64(), priceUpdate.value.Uint64())

// Verify proper cache usage: if the cache is used, we shouldn't query the full range.
reader := tc.mockPriceRegistryReader
if tc.checkInflight && tc.inflightGasPriceUpdate != nil {
reader.AssertNotCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0)
} else if tc.expStartQuery.IsZero() {
reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0)
} else {
reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, tc.expStartQuery, 0)
}
})
}
}
Expand Down
21 changes: 21 additions & 0 deletions core/services/ocr2/plugins/ccip/price_updates_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package ccip

type priceUpdatesCache struct {
lastUpdate update
}

func newPriceUpdatesCache() *priceUpdatesCache {
return &priceUpdatesCache{
lastUpdate: update{},
}
}

func (c *priceUpdatesCache) get() update {
return c.lastUpdate
}

func (c *priceUpdatesCache) updateCache(update update) {
if update.timestamp.After(c.lastUpdate.timestamp) {
c.lastUpdate = update
}
}