Skip to content

Commit

Permalink
use single cache instance
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkouv committed Oct 13, 2023
1 parent 617640a commit 7ff9ba8
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 60 deletions.
50 changes: 34 additions & 16 deletions core/services/ocr2/plugins/ccip/commit_price_updates_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,57 +8,75 @@ import (
"github.com/ethereum/go-ethereum/common"
)

type tokenPriceUpdatesCache struct {
mem map[common.Address]update
mu *sync.RWMutex
type priceUpdatesCache struct {
tokenPriceUpdates map[common.Address]update
gasPriceUpdate update
mu *sync.RWMutex
}

func newTokenPriceUpdatesCache() *tokenPriceUpdatesCache {
c := &tokenPriceUpdatesCache{
mem: make(map[common.Address]update),
mu: &sync.RWMutex{},
func newPriceUpdatesCache() *priceUpdatesCache {
c := &priceUpdatesCache{
tokenPriceUpdates: make(map[common.Address]update),
mu: &sync.RWMutex{},
}
return c
}

func (c *tokenPriceUpdatesCache) mostRecentTs() time.Time {
func (c *priceUpdatesCache) mostRecentTokenPriceUpdate() time.Time {
c.mu.RLock()
defer c.mu.RUnlock()

ts := time.Time{}
for _, upd := range c.mem {
for _, upd := range c.tokenPriceUpdates {
if upd.timestamp.After(ts) {
ts = upd.timestamp
}
}
return ts
}

func (c *tokenPriceUpdatesCache) updateIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool {
func (c *priceUpdatesCache) updateTokenPriceIfMoreRecent(ts time.Time, tk common.Address, val *big.Int) bool {
c.mu.RLock()
v, exists := c.mem[tk]
v, exists := c.tokenPriceUpdates[tk]
c.mu.RUnlock()

if !exists || v.timestamp.Before(ts) {
c.mu.Lock()
c.mem[tk] = update{timestamp: ts, value: val}
c.tokenPriceUpdates[tk] = update{timestamp: ts, value: val}
c.mu.Unlock()
return true
}

return false
}

// get returns all the price updates with timestamp greater than or equal to the provided
func (c *tokenPriceUpdatesCache) get(minTs time.Time) map[common.Address]update {
// getTokenPriceUpdates returns all the price updates with timestamp greater than or equal to the provided
func (c *priceUpdatesCache) getTokenPriceUpdates(minTs time.Time) map[common.Address]update {
c.mu.RLock()
defer c.mu.RUnlock()
cp := make(map[common.Address]update, len(c.mem))
for k, v := range c.mem {
cp := make(map[common.Address]update, len(c.tokenPriceUpdates))
for k, v := range c.tokenPriceUpdates {
if v.timestamp.Before(minTs) {
continue
}
cp[k] = v
}
return cp
}

func (c *priceUpdatesCache) getGasPriceUpdate() update {
c.mu.RLock()
defer c.mu.RUnlock()
return c.gasPriceUpdate
}

func (c *priceUpdatesCache) updateGasPriceIfMoreRecent(update update) bool {
c.mu.Lock()
defer c.mu.Unlock()
if update.timestamp.After(c.gasPriceUpdate.timestamp) {
c.gasPriceUpdate = update
return true
}

return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ func Test_tokenPriceUpdatesCache(t *testing.T) {
tk := common.HexToAddress("1")
ts := time.Now().Truncate(time.Second)

c := newTokenPriceUpdatesCache()
assert.Equal(t, time.Time{}, c.mostRecentTs())
c := newPriceUpdatesCache()
assert.Equal(t, time.Time{}, c.mostRecentTokenPriceUpdate())

c.updateIfMoreRecent(ts, tk, big.NewInt(100))
assert.Equal(t, ts, c.mostRecentTs())
v := c.get(time.Time{})
c.updateTokenPriceIfMoreRecent(ts, tk, big.NewInt(100))
assert.Equal(t, ts, c.mostRecentTokenPriceUpdate())
v := c.getTokenPriceUpdates(time.Time{})
assert.Equal(t, big.NewInt(100), v[tk].value)

// should not get updated if ts is older
c.updateIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101))
v = c.get(time.Time{})
c.updateTokenPriceIfMoreRecent(ts.Add(-1*time.Minute), tk, big.NewInt(101))
v = c.getTokenPriceUpdates(time.Time{})
assert.Equal(t, big.NewInt(100), v[tk].value)

// should not get anything when the provided timestamp is recent
v = c.get(time.Now())
v = c.getTokenPriceUpdates(time.Now())
assert.Len(t, v, 0)
}
18 changes: 8 additions & 10 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ type CommitReportingPlugin struct {
// State
inflightReports *inflightCommitReportsContainer
// Cache
priceUpdatesCache *priceUpdatesCache
tokenPriceUpdatesCache *tokenPriceUpdatesCache
priceUpdatesCache *priceUpdatesCache
}

type CommitReportingPluginFactory struct {
Expand Down Expand Up @@ -162,9 +161,8 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
rf.config.destClient,
int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth),
),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
priceUpdatesCache: newPriceUpdatesCache(),
tokenPriceUpdatesCache: newTokenPriceUpdatesCache(),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
priceUpdatesCache: newPriceUpdatesCache(),
},
types.ReportingPluginInfo{
Name: "CCIPCommit",
Expand Down Expand Up @@ -366,7 +364,7 @@ func calculateUsdPer1e18TokenAmount(price *big.Int, decimals uint8) *big.Int {
// The updates returned by this function are guaranteed to not contain nil values.
func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time, checkInflight bool) (map[common.Address]update, error) {
ts := now.Add(-r.offchainConfig.TokenPriceHeartBeat)
if mostRecentCachedTs := r.tokenPriceUpdatesCache.mostRecentTs(); mostRecentCachedTs.After(ts) {
if mostRecentCachedTs := r.priceUpdatesCache.mostRecentTokenPriceUpdate(); mostRecentCachedTs.After(ts) {
ts = mostRecentCachedTs
}

Expand All @@ -380,14 +378,14 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context,
continue
}

r.tokenPriceUpdatesCache.updateIfMoreRecent(
r.priceUpdatesCache.updateTokenPriceIfMoreRecent(
time.Unix(upd.Data.Timestamp.Int64(), 0),
upd.Data.Token,
upd.Data.Value,
)
}

latestUpdates := r.tokenPriceUpdatesCache.get(now.Add(-r.offchainConfig.TokenPriceHeartBeat))
latestUpdates := r.priceUpdatesCache.getTokenPriceUpdates(now.Add(-r.offchainConfig.TokenPriceHeartBeat))
if !checkInflight {
return latestUpdates, nil
}
Expand Down Expand Up @@ -424,7 +422,7 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now

// Query from the last cached update timestamp, if any.
queryFrom := now.Add(-r.offchainConfig.GasPriceHeartBeat)
if last := r.priceUpdatesCache.get(); last.timestamp.After(queryFrom) {
if last := r.priceUpdatesCache.getGasPriceUpdate(); last.timestamp.After(queryFrom) {
queryFrom = last.timestamp
}
gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter(
Expand All @@ -444,7 +442,7 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
timestamp: timestamp,
value: priceUpdate.Data.Value,
}
r.priceUpdatesCache.updateCache(gasUpdate)
r.priceUpdatesCache.updateGasPriceIfMoreRecent(gasUpdate)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,8 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.gasPriceEstimator = gasPriceEstimator
p.offchainConfig.GasPriceHeartBeat = gasPriceHeartBeat.Duration()
p.commitStoreReader = commitStoreReader
p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache()
p.F = tc.f
p.priceUpdatesCache = newPriceUpdatesCache()
p.F = tc.f

aos := make([]types.AttributedObservation, 0, len(tc.observations))
for _, o := range tc.observations {
Expand Down Expand Up @@ -1427,7 +1426,7 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) {
destPriceRegistry := ccipdata.NewMockPriceRegistryReader(t)
p.destPriceRegistryReader = destPriceRegistry
p.priceUpdatesCache = newPriceUpdatesCache()
p.priceUpdatesCache.updateCache(tc.cacheValue)
p.priceUpdatesCache.updateGasPriceIfMoreRecent(tc.cacheValue)
p.offchainConfig.GasPriceHeartBeat = 5 * time.Minute

if tc.inflightGasPriceUpdate != nil {
Expand Down Expand Up @@ -1564,7 +1563,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) {
//_, priceRegAddr := testhelpers.NewFakePriceRegistry(t)
priceReg := ccipdata.NewMockPriceRegistryReader(t)
p.destPriceRegistryReader = priceReg
p.tokenPriceUpdatesCache = newTokenPriceUpdatesCache()
p.priceUpdatesCache = newPriceUpdatesCache()

//destReader := ccipdata.NewMockReader(t)
var events []ccipdata.Event[ccipdata.TokenPriceUpdate]
Expand Down Expand Up @@ -1608,7 +1607,7 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates_cache(t *testing.T) {
ctx := testutils.Context(t)
priceReg := ccipdata.NewMockPriceRegistryReader(t)
p := &CommitReportingPlugin{
tokenPriceUpdatesCache: newTokenPriceUpdatesCache(),
priceUpdatesCache: newPriceUpdatesCache(),
destPriceRegistryReader: priceReg,
offchainConfig: ccipdata.CommitOffchainConfig{
TokenPriceHeartBeat: 12 * time.Hour,
Expand Down
21 changes: 0 additions & 21 deletions core/services/ocr2/plugins/ccip/price_updates_cache.go

This file was deleted.

0 comments on commit 7ff9ba8

Please sign in to comment.