Skip to content

Commit

Permalink
Merge pull request #198 from smartcontractkit/CCIP-1147
Browse files Browse the repository at this point in the history
CCIP-1147 - Cache last update
  • Loading branch information
jarnaud authored Oct 12, 2023
2 parents 7ac836e + 040a801 commit 134da68
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 31 deletions.
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/ccip/commit_inflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *inflightCommitReportsContainer) maxInflightSeqNr() uint64 {
return max
}

// latestInflightGasPriceUpdates returns a map of the latest gas price updates.
// latestInflightGasPriceUpdates returns a map of the latest gas price updates indexed by chain selector.
func (c *inflightCommitReportsContainer) latestInflightGasPriceUpdates() map[uint64]update {
c.locker.RLock()
defer c.locker.RUnlock()
Expand Down
17 changes: 13 additions & 4 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ type CommitReportingPlugin struct {
// Offchain
priceGetter pricegetter.PriceGetter
// State
inflightReports *inflightCommitReportsContainer
inflightReports *inflightCommitReportsContainer
// Cache
priceUpdatesCache *priceUpdatesCache
tokenPriceUpdatesCache *tokenPriceUpdatesCache
}

Expand Down Expand Up @@ -161,6 +163,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth),
),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
priceUpdatesCache: newPriceUpdatesCache(),
tokenPriceUpdatesCache: newTokenPriceUpdatesCache(),
},
types.ReportingPluginInfo{
Expand Down Expand Up @@ -417,17 +420,22 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
}
}

// If there are no price updates inflight, check latest prices onchain
// If there are no price updates inflight, check the latest prices onchain.

// Query from the last cached update timestamp, if any.
queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat)
if last := r.priceUpdatesCache.get(); last.timestamp.After(queryFrom) {
queryFrom = last.timestamp
}
gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter(
ctx,
r.sourceChainSelector,
now.Add(-r.offchainConfig.GasPriceHeartBeat),
queryFrom,
0,
)
if err != nil {
return update{}, err
}

for _, priceUpdate := range gasPriceUpdates {
// Ordered by ascending timestamps
timestamp := time.Unix(priceUpdate.Data.Timestamp.Int64(), 0)
Expand All @@ -436,6 +444,7 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
timestamp: timestamp,
value: priceUpdate.Data.Value,
}
r.priceUpdatesCache.updateCache(gasUpdate)
}
}

Expand Down
107 changes: 81 additions & 26 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.lggr = logger.TestLogger(t)
p.tokenDecimalsCache = tokenDecimalsCache
p.F = 1
p.priceUpdatesCache = newPriceUpdatesCache()

o := CommitObservation{Interval: ccipdata.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)}
obs, err := o.Marshal()
Expand Down Expand Up @@ -300,6 +301,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.commitStoreReader = commitStoreReader
p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache()
p.F = tc.f
p.priceUpdatesCache = newPriceUpdatesCache()

aos := make([]types.AttributedObservation, 0, len(tc.observations))
for _, o := range tc.observations {
Expand Down Expand Up @@ -1342,41 +1344,75 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
chainSelector := uint64(1234)

testCases := []struct {
name string
checkInflight bool
inflightGasPriceUpdate *update
destGasPriceUpdates []update
expUpdate update
expErr bool
name string
checkInflight bool
inflightGasPriceUpdate *update
destGasPriceUpdates []update
cacheValue update
expUpdate update
expErr bool
expStartQuery time.Time
mockPriceRegistryReader *ccipdata.MockPriceRegistryReader
}{
{
name: "only inflight gas price",
checkInflight: true,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)},
expUpdate: update{timestamp: now, value: big.NewInt(1000)},
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
expUpdate: update{timestamp: now, value: big.NewInt(4000)},
expErr: false,
},
{
name: "inflight price is nil",
checkInflight: true,
inflightGasPriceUpdate: nil,
destGasPriceUpdates: []update{
{timestamp: now.Add(time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
},
{
name: "inflight updates are skipped",
name: "inflight updates skipped and cache empty",
checkInflight: false,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)},
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
destGasPriceUpdates: []update{
{timestamp: now.Add(time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expStartQuery: time.Time{},
},
{
name: "inflight updates skipped and cache not up to date",
checkInflight: false,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
destGasPriceUpdates: []update{
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
cacheValue: update{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expStartQuery: now.Add(-2 * time.Minute),
},
{
name: "inflight updates skipped and cache up to date",
checkInflight: false,
inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(4000)},
destGasPriceUpdates: []update{
{timestamp: now.Add(-3 * time.Minute), value: big.NewInt(1000)},
{timestamp: now.Add(-2 * time.Minute), value: big.NewInt(2000)},
{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
},
cacheValue: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expUpdate: update{timestamp: now.Add(-1 * time.Minute), value: big.NewInt(3000)},
expErr: false,
expStartQuery: now.Add(-1 * time.Minute),
},
}

Expand All @@ -1390,6 +1426,9 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
p.lggr = lggr
destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t)
p.destPriceRegistryReader = destPriceRegistry
p.priceUpdatesCache = newPriceUpdatesCache()
p.priceUpdatesCache.updateCache(tc.cacheValue)
p.offchainConfig.GasPriceHeartBeat = 5 * time.Minute

if tc.inflightGasPriceUpdate != nil {
p.inflightReports.inFlightPriceUpdates = append(
Expand All @@ -1404,20 +1443,26 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
)
}

// Build mocked result of GetGasPriceUpdatesCreatedAfter.
destReader := ccipdata.NewMockPriceRegistryReader(t)
if len(tc.destGasPriceUpdates) > 0 {
var events []ccipdata.Event[ccipdata.GasPriceUpdate]
for _, u := range tc.destGasPriceUpdates {
events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{
Data: ccipdata.GasPriceUpdate{
GasPrice: ccipdata.GasPrice{Value: u.value},
Timestamp: big.NewInt(u.timestamp.Unix()),
},
})
if tc.cacheValue.timestamp.IsZero() || !u.timestamp.Before(tc.cacheValue.timestamp) {
events = append(events, ccipdata.Event[ccipdata.GasPriceUpdate]{
Data: ccipdata.GasPriceUpdate{
GasPrice: ccipdata.GasPrice{
DestChainSelector: chainSelector,
Value: u.value},
Timestamp: big.NewInt(u.timestamp.Unix()),
},
})
}
}
destReader := ccipdata.NewMockPriceRegistryReader(t)
destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0).Return(events, nil)
p.destPriceRegistryReader = destReader
destReader.On("GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, mock.Anything).Return(events, nil)
}
p.destPriceRegistryReader = destReader
tc.mockPriceRegistryReader = destReader

priceUpdate, err := p.getLatestGasPriceUpdate(ctx, time.Now(), tc.checkInflight)
if tc.expErr {
Expand All @@ -1428,6 +1473,16 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, tc.expUpdate.timestamp.Truncate(time.Second), priceUpdate.timestamp.Truncate(time.Second))
assert.Equal(t, tc.expUpdate.value.Uint64(), priceUpdate.value.Uint64())

// Verify proper cache usage: if the cache is used, we shouldn't query the full range.
reader := tc.mockPriceRegistryReader
if tc.checkInflight && tc.inflightGasPriceUpdate != nil {
reader.AssertNotCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0)
} else if tc.expStartQuery.IsZero() {
reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, mock.Anything, 0)
} else {
reader.AssertCalled(t, "GetGasPriceUpdatesCreatedAfter", ctx, chainSelector, tc.expStartQuery, 0)
}
})
}
}
Expand Down
21 changes: 21 additions & 0 deletions core/services/ocr2/plugins/ccip/price_updates_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package ccip

type priceUpdatesCache struct {
lastUpdate update
}

func newPriceUpdatesCache() *priceUpdatesCache {
return &priceUpdatesCache{
lastUpdate: update{},
}
}

func (c *priceUpdatesCache) get() update {
return c.lastUpdate
}

func (c *priceUpdatesCache) updateCache(update update) {
if update.timestamp.After(c.lastUpdate.timestamp) {
c.lastUpdate = update
}
}

0 comments on commit 134da68

Please sign in to comment.