Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Jul 11, 2024
1 parent 5e7f182 commit efcfe7c
Show file tree
Hide file tree
Showing 5 changed files with 442 additions and 25 deletions.
14 changes: 10 additions & 4 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
mockOnRampPriceRegistryProvider.On("NewPriceRegistryReader", ctx, sourcePriceRegistryAddress).Return(sourcePriceRegReader, nil).Maybe()
p.sourcePriceRegistryProvider = mockOnRampPriceRegistryProvider

p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), time.Minute, time.Minute)
p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), commitStoreReader, time.Minute, time.Minute)
p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, mockOnRampReader, commitStoreReader)

_, err = p.Observation(ctx, types.ReportTimestamp{}, types.Query{})
Expand Down Expand Up @@ -943,19 +943,25 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) {
Return(tc.onchainEvents, nil).Maybe()
p.onRampReader = sourceReader

finalized := make(map[uint64]bool)
finalized := make(map[uint64]cciptypes.FinalizedStatus)
for _, r := range tc.expReports {
for _, s := range r.sendRequestsWithMeta {
finalized[s.SequenceNumber] = s.Finalized
finalized[s.SequenceNumber] = cciptypes.FinalizedStatusNotFinalized
if s.Finalized {
finalized[s.SequenceNumber] = cciptypes.FinalizedStatusFinalized
}
}
}

var executedEvents []cciptypes.ExecutionStateChangedWithTxMeta
for _, executedSeqNum := range tc.destExecutedSeqNums {

executedEvents = append(executedEvents, cciptypes.ExecutionStateChangedWithTxMeta{
ExecutionStateChanged: cciptypes.ExecutionStateChanged{
SequenceNumber: executedSeqNum,
Finalized: finalized[executedSeqNum],
},
TxMeta: cciptypes.TxMeta{
Finalized: finalized[executedSeqNum],
},
})
}
Expand Down
91 changes: 73 additions & 18 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cache

import (
"context"
"encoding/hex"
"slices"
"sync"
"time"
Expand All @@ -15,6 +14,8 @@ import (
)

const (
// EvictionGracePeriod defines how long after the messageVisibilityInterval a root is still kept in the cache
EvictionGracePeriod = 1 * time.Hour
// CleanupInterval defines how often roots cache is scanned to evict stale roots
CleanupInterval = 30 * time.Minute
)
Expand All @@ -37,6 +38,7 @@ func NewCommitRootsCache(
messageVisibilityInterval,
rootSnoozeTime,
CleanupInterval,
EvictionGracePeriod,
)
}

Expand All @@ -46,13 +48,17 @@ func newCommitRootsCache(
messageVisibilityInterval time.Duration,
rootSnoozeTime time.Duration,
cleanupInterval time.Duration,
evictionGracePeriod time.Duration,
) *commitRootsCache {
snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval)
executedRoots := cache.New(messageVisibilityInterval+evictionGracePeriod, cleanupInterval)

return &commitRootsCache{
lggr: lggr,
reader: reader,
rootSnoozeTime: rootSnoozeTime,
finalizedRoots: orderedmap.New[string, ccip.CommitStoreReportWithTxMeta](),
executedRoots: executedRoots,
snoozedRoots: snoozedRoots,
messageVisibilityInterval: messageVisibilityInterval,
latestFinalizedCommitRootTs: time.Now().Add(-messageVisibilityInterval),
Expand All @@ -64,39 +70,65 @@ type commitRootsCache struct {
lggr logger.Logger
reader ccip.CommitStoreReader
messageVisibilityInterval time.Duration

// Mutable state
cacheMu sync.RWMutex
finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta]
snoozedRoots *cache.Cache
rootSnoozeTime time.Duration

// Mutable state. finalizedRoots is thread-safe by default, but updating latestFinalizedCommitRootTs and finalizedRoots requires locking.
cacheMu sync.RWMutex
// finalizedRoots is a map of merkleRoot -> CommitStoreReportWithTxMeta. It stores all the CommitReports that are
// marked as finalized by LogPoller, but not executed yet. Keeping only finalized reports doesn't require any state sync between LP and the cache.
// In order to keep this map size under control, we evict stale items every time we fetch new logs from the database.
// Also, ccip.CommitStoreReportWithTxMeta is a very tiny entity with almost fixed size, so it's not a big deal to keep it in memory.
// In case of high memory footprint caused by storing roots, we can make these even more lightweight by removing token/gas price updates.
// Whenever the root is executed (all messages executed and ExecutionStateChange events are finalized), we remove the root from the map.
finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta]
// snoozedRoots used only for temporary snoozing roots. It's a cache with TTL (usually around 5 minutes, but this configuration is set up on chain using rootSnoozeTime)
snoozedRoots *cache.Cache
executedRoots *cache.Cache
// latestFinalizedCommitRootTs is the timestamp of the latest finalized commit root (youngest in terms of timestamp).
// It's used get only the logs that were considered as unfinalized in a previous run.
// This way we limit database scans to the minimum and keep polling "unfinalized" part of the ReportAccepted events queue.
latestFinalizedCommitRootTs time.Time
}

func (r *commitRootsCache) RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error) {
// 1. Fetch all the logs from the database after the latest finalized commit root timestamp
// 1. Fetch all the logs from the database after the latest finalized commit root timestamp.
// If this is a first run, it will fetch all the logs based on the messageVisibilityInterval.
// Worst case scenario, it will fetch around 480 reports (OCR Commit 60 seconds (fast chains default) * messageVisibilityInterval set to 8 hours (mainnet default))
// Even with the larger messageVisibilityInterval window (e.g. 24 hours) it should be acceptable (around 1500 logs).
// Keep in mind that this potentially heavy operation happens only once during the plugin boot and it's no different from the previous implementation.
logs, err := r.fetchLogsFromCommitStore(ctx)
if err != nil {
return nil, err
}

// 2. Iterate over the logs and check if the root is finalized or not. Return finalized and unfinalized reports
// It promotes finalized roots to the finalizedRoots map and evicts stale roots.
finalizedReports, unfinalizedReports := r.updateFinalizedRoots(logs)

// 3. Join finalized commit reports with unfinalized reports and outfilter snoozed roots.
// Return only the reports that are not snoozed.
return r.pickReadyToExecute(finalizedReports, unfinalizedReports), nil

}

// MarkAsExecuted marks the root as executed. It means that all the messages from the root were executed and the ExecutionStateChange event was finalized.
// Executed roots are removed from the cache.
func (r *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) {
prettyMerkleRoot := merkleRootToString(merkleRoot)
r.lggr.Infow("Marking root as executed and removing entirely from cache", "merkleRoot", prettyMerkleRoot)

r.cacheMu.Lock()
defer r.cacheMu.Unlock()

prettyMerkleRoot := merkleRootToString(merkleRoot)
r.finalizedRoots.Delete(prettyMerkleRoot)
r.executedRoots.SetDefault(prettyMerkleRoot, struct{}{})
}

// Snooze temporarily snoozes the root. It means that the root is not eligible for execution for a certain period of time.
// Snoozed roots are skipped when calling RootsEligibleForExecution
func (r *commitRootsCache) Snooze(merkleRoot [32]byte) {
r.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{})
prettyMerkleRoot := merkleRootToString(merkleRoot)
r.lggr.Infow("Snoozing root temporarily", "merkleRoot", prettyMerkleRoot, "rootSnoozeTime", r.rootSnoozeTime)
r.snoozedRoots.SetDefault(prettyMerkleRoot, struct{}{})
}

func (r *commitRootsCache) isSnoozed(merkleRoot [32]byte) bool {
Expand All @@ -114,6 +146,7 @@ func (r *commitRootsCache) fetchLogsFromCommitStore(ctx context.Context) ([]ccip
r.cacheMu.Unlock()

// IO operation, release lock before!
r.lggr.Infow("Fetching Commit Reports with timestamp greater than or equal to", "blockTimestamp", commitRootsFilterTimestamp)
return r.reader.GetAcceptedCommitReportsGteTimestamp(ctx, commitRootsFilterTimestamp, 0)
}

Expand All @@ -124,21 +157,31 @@ func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWit
// Assuming logs are properly ordered by block_timestamp, log_index
var unfinalizedReports []ccip.CommitStoreReportWithTxMeta
for _, log := range logs {
if _, finalized := r.finalizedRoots.Get(merkleRootToString(log.MerkleRoot)); finalized {
r.finalizedRoots.Store(merkleRootToString(log.MerkleRoot), log)
prettyMerkleRoot := merkleRootToString(log.MerkleRoot)
// Defensive check, if something is marked as executed, never allow it to come back to the cache
if _, executed := r.executedRoots.Get(prettyMerkleRoot); executed {
r.lggr.Debugw("Ignoring root marked as executed", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli)
continue
}

if log.IsFinalized() {
r.lggr.Debugw("Adding finalized root to cache", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli)
r.finalizedRoots.Store(prettyMerkleRoot, log)
} else {
r.lggr.Debugw("Bypassing unfinalized root", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli)
unfinalizedReports = append(unfinalizedReports, log)
}
}

if r.finalizedRoots.Newest() != nil {
r.latestFinalizedCommitRootTs = time.UnixMilli(r.finalizedRoots.Newest().Value.BlockTimestampUnixMilli)
if newest := r.finalizedRoots.Newest(); newest != nil {
r.latestFinalizedCommitRootTs = time.UnixMilli(newest.Value.BlockTimestampUnixMilli)
}

var finalizedRoots []ccip.CommitStoreReportWithTxMeta
messageVisibilityWindow := time.Now().Add(-r.messageVisibilityInterval)
for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() {
// Evict stale items
if time.UnixMilli(pair.Value.BlockTimestampUnixMilli).Before(time.Now().Add(-r.messageVisibilityInterval)) {
if time.UnixMilli(pair.Value.BlockTimestampUnixMilli).Before(messageVisibilityWindow) {
r.finalizedRoots.Delete(pair.Key)
continue
}
Expand All @@ -156,13 +199,25 @@ func (r *commitRootsCache) pickReadyToExecute(r1 []ccip.CommitStoreReportWithTxM
}
eligibleReports = append(eligibleReports, report.CommitStoreReport)
}
// safety check
// safety check, probably not needed
slices.SortFunc(eligibleReports, func(i, j ccip.CommitStoreReport) int {
return int(i.Interval.Min - j.Interval.Min)
})
return eligibleReports
}

func merkleRootToString(merkleRoot [32]byte) string {
return hex.EncodeToString(merkleRoot[:])
// internal use only for testing
func (r *commitRootsCache) finalizedCachedLogs() []ccip.CommitStoreReport {
r.cacheMu.RLock()
defer r.cacheMu.RUnlock()

var finalizedRoots []ccip.CommitStoreReport
for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() {
finalizedRoots = append(finalizedRoots, pair.Value.CommitStoreReport)
}
return finalizedRoots
}

func merkleRootToString(merkleRoot ccip.Hash) string {
return merkleRoot.String()
}
Loading

0 comments on commit efcfe7c

Please sign in to comment.