From 92e43821e0ed87194f294ad9e2c311ab00a58999 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 11 Jul 2024 12:18:19 +0200 Subject: [PATCH] Tests --- .../ccip/internal/cache/commit_roots.go | 86 ++++++++--- .../ccip/internal/cache/commit_roots_test.go | 142 +++++++++++++++++- 2 files changed, 210 insertions(+), 18 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go index 12c99f3e150..8da9d45df7d 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go @@ -2,7 +2,6 @@ package cache import ( "context" - "encoding/hex" "slices" "sync" "time" @@ -15,6 +14,8 @@ import ( ) const ( + // EvictionGracePeriod defines how long after the messageVisibilityInterval a root is still kept in the cache + EvictionGracePeriod = 1 * time.Hour // CleanupInterval defines how often roots cache is scanned to evict stale roots CleanupInterval = 30 * time.Minute ) @@ -48,11 +49,14 @@ func newCommitRootsCache( cleanupInterval time.Duration, ) *commitRootsCache { snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval) + executedRoots := cache.New(messageVisibilityInterval+EvictionGracePeriod, cleanupInterval) return &commitRootsCache{ lggr: lggr, reader: reader, + rootSnoozeTime: rootSnoozeTime, finalizedRoots: orderedmap.New[string, ccip.CommitStoreReportWithTxMeta](), + executedRoots: executedRoots, snoozedRoots: snoozedRoots, messageVisibilityInterval: messageVisibilityInterval, latestFinalizedCommitRootTs: time.Now().Add(-messageVisibilityInterval), @@ -64,39 +68,65 @@ type commitRootsCache struct { lggr logger.Logger reader ccip.CommitStoreReader messageVisibilityInterval time.Duration - - // Mutable state - cacheMu sync.RWMutex - finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta] - snoozedRoots *cache.Cache + rootSnoozeTime time.Duration + + // Mutable state. finalizedRoots is thread-safe by default, but updating latestFinalizedCommitRootTs and finalizedRoots requires locking. + cacheMu sync.RWMutex + // finalizedRoots is a map of merkleRoot -> CommitStoreReportWithTxMeta. It stores all the CommitReports that are + // marked as finalized by LogPoller, but not executed yet. Keeping only finalized reports doesn't require any state sync between LP and the cache. + // In order to keep this map size under control, we evict stale items every time we fetch new logs from the database. + // Also, ccip.CommitStoreReportWithTxMeta is a very tiny entity with almost fixed size, so it's not a big deal to keep it in memory. + // In case of high memory footprint caused by storing roots, we can make these even more lightweight by removing token/gas price updates. + // Whenever the root is executed (all messages executed and ExecutionStateChange events are finalized), we remove the root from the map. + finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta] + // snoozedRoots used only for temporary snoozing roots. It's a cache with TTL (usually around 5 minutes, but this configuration is set up on chain using rootSnoozeTime) + snoozedRoots *cache.Cache + executedRoots *cache.Cache + // latestFinalizedCommitRootTs is the timestamp of the latest finalized commit root (youngest in terms of timestamp). + // It's used get only the logs that were considered as unfinalized in a previous run. + // This way we limit database scans to the minimum and keep polling "unfinalized" part of the ReportAccepted events queue. latestFinalizedCommitRootTs time.Time } func (r *commitRootsCache) RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error) { - // 1. Fetch all the logs from the database after the latest finalized commit root timestamp + // 1. Fetch all the logs from the database after the latest finalized commit root timestamp. + // If this is a first run, it will fetch all the logs based on the messageVisibilityInterval. + // Worst case scenario, it will fetch around 480 reports (OCR Commit 60 seconds (fast chains default) * messageVisibilityInterval set to 8 hours (mainnet default)) + // Even with the larger messageVisibilityInterval window (e.g. 24 hours) it should be acceptable (around 1500 logs). + // Keep in mind that this potentially heavy operation happens only once during the plugin boot and it's no different from the previous implementation. logs, err := r.fetchLogsFromCommitStore(ctx) if err != nil { return nil, err } // 2. Iterate over the logs and check if the root is finalized or not. Return finalized and unfinalized reports + // It promotes finalized roots to the finalizedRoots map and evicts stale roots. finalizedReports, unfinalizedReports := r.updateFinalizedRoots(logs) // 3. Join finalized commit reports with unfinalized reports and outfilter snoozed roots. + // Return only the reports that are not snoozed. return r.pickReadyToExecute(finalizedReports, unfinalizedReports), nil } +// MarkAsExecuted marks the root as executed. It means that all the messages from the root were executed and the ExecutionStateChange event was finalized. +// Executed roots are removed from the cache. func (r *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) { + prettyMerkleRoot := merkleRootToString(merkleRoot) + r.lggr.Infow("Marking root as executed and removing entirely from cache", "merkleRoot", prettyMerkleRoot) + r.cacheMu.Lock() defer r.cacheMu.Unlock() - - prettyMerkleRoot := merkleRootToString(merkleRoot) r.finalizedRoots.Delete(prettyMerkleRoot) + r.executedRoots.SetDefault(prettyMerkleRoot, struct{}{}) } +// Snooze temporarily snoozes the root. It means that the root is not eligible for execution for a certain period of time. +// Snoozed roots are skipped when calling RootsEligibleForExecution func (r *commitRootsCache) Snooze(merkleRoot [32]byte) { - r.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{}) + prettyMerkleRoot := merkleRootToString(merkleRoot) + r.lggr.Infow("Snoozing root temporarily", "merkleRoot", prettyMerkleRoot, "rootSnoozeTime", r.rootSnoozeTime) + r.snoozedRoots.SetDefault(prettyMerkleRoot, struct{}{}) } func (r *commitRootsCache) isSnoozed(merkleRoot [32]byte) bool { @@ -114,6 +144,7 @@ func (r *commitRootsCache) fetchLogsFromCommitStore(ctx context.Context) ([]ccip r.cacheMu.Unlock() // IO operation, release lock before! + r.lggr.Infow("Fetching Commit Reports with timestamp greater than or equal to", "timestamp", commitRootsFilterTimestamp) return r.reader.GetAcceptedCommitReportsGteTimestamp(ctx, commitRootsFilterTimestamp, 0) } @@ -124,15 +155,24 @@ func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWit // Assuming logs are properly ordered by block_timestamp, log_index var unfinalizedReports []ccip.CommitStoreReportWithTxMeta for _, log := range logs { - if _, finalized := r.finalizedRoots.Get(merkleRootToString(log.MerkleRoot)); finalized { - r.finalizedRoots.Store(merkleRootToString(log.MerkleRoot), log) + prettyMerkleRoot := merkleRootToString(log.MerkleRoot) + // Defensive check, if something is marked as executed, never allow it to come back to the cache + if _, executed := r.executedRoots.Get(prettyMerkleRoot); executed { + r.lggr.Debugw("Ignoring root marked as executed", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli) + continue + } + + if log.IsFinalized() { + r.lggr.Debugw("Adding finalized root to cache", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli) + r.finalizedRoots.Store(prettyMerkleRoot, log) } else { + r.lggr.Debugw("Bypassing unfinalized root", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli) unfinalizedReports = append(unfinalizedReports, log) } } - if r.finalizedRoots.Newest() != nil { - r.latestFinalizedCommitRootTs = time.UnixMilli(r.finalizedRoots.Newest().Value.BlockTimestampUnixMilli) + if newest := r.finalizedRoots.Newest(); newest != nil { + r.latestFinalizedCommitRootTs = time.UnixMilli(newest.Value.BlockTimestampUnixMilli) } var finalizedRoots []ccip.CommitStoreReportWithTxMeta @@ -156,13 +196,25 @@ func (r *commitRootsCache) pickReadyToExecute(r1 []ccip.CommitStoreReportWithTxM } eligibleReports = append(eligibleReports, report.CommitStoreReport) } - // safety check + // safety check, probably not needed slices.SortFunc(eligibleReports, func(i, j ccip.CommitStoreReport) int { return int(i.Interval.Min - j.Interval.Min) }) return eligibleReports } -func merkleRootToString(merkleRoot [32]byte) string { - return hex.EncodeToString(merkleRoot[:]) +// internal use only for testing +func (r *commitRootsCache) finalizedCachedLogs() []ccip.CommitStoreReport { + r.cacheMu.RLock() + defer r.cacheMu.RUnlock() + + var finalizedRoots []ccip.CommitStoreReport + for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() { + finalizedRoots = append(finalizedRoots, pair.Value.CommitStoreReport) + } + return finalizedRoots +} + +func merkleRootToString(merkleRoot ccip.Hash) string { + return merkleRoot.String() } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go index c1ecf413de7..3f5e1ac591f 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go @@ -1,3 +1,143 @@ package cache -// Implement me from scratch! +import ( + "testing" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" +) + +func Test_CacheIsInitializedWithFirstCall(t *testing.T) { + commitStoreReader := mocks.NewCommitStoreReader(t) + cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, time.Hour, time.Hour, time.Hour) + commitStoreReader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, mock.Anything, mock.Anything).Return([]ccip.CommitStoreReportWithTxMeta{}, nil) + + roots, err := cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assert.Len(t, roots, 0) +} + +func Test_CacheProgression_Internal(t *testing.T) { + ts1 := time.Now().Add(-5 * time.Hour).Truncate(time.Millisecond) + ts2 := time.Now().Add(-3 * time.Hour).Truncate(time.Millisecond) + ts3 := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + + root1 := utils.RandomBytes32() + root2 := utils.RandomBytes32() + root3 := utils.RandomBytes32() + + commitStoreReader := mocks.NewCommitStoreReader(t) + + cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, 10*time.Hour, time.Hour, time.Hour) + + // Empty cache, no results from the reader + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{}) + roots, err := cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots) + assertRoots(t, cache.finalizedCachedLogs()) + + // Single unfinalized root returned + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{createCommitStoreEntry(root1, ts1, false)}) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root1) + assertRoots(t, cache.finalizedCachedLogs()) + + // Finalized and unfinalized roots returned + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root1, ts1, true), + createCommitStoreEntry(root2, ts2, false), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root1, root2) + assertRoots(t, cache.finalizedCachedLogs(), root1) + + // Returning the same data should not impact cache state (no duplicates) + mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root1, ts1, true), + createCommitStoreEntry(root2, ts2, false), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root1, root2) + assertRoots(t, cache.finalizedCachedLogs(), root1) + + // Snoozing oldest root + cache.Snooze(root1) + mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root2, ts2, false), + createCommitStoreEntry(root3, ts3, false), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root2, root3) + assertRoots(t, cache.finalizedCachedLogs(), root1) + + // Snoozing everything + cache.Snooze(root2) + cache.Snooze(root3) + mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root2, ts2, true), + createCommitStoreEntry(root3, ts3, true), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots) + assertRoots(t, cache.finalizedCachedLogs(), root1, root2, root3) + + // Marking everything as executed removes it entirely, even if root is returned from the CommitStore + cache.MarkAsExecuted(root1) + cache.MarkAsExecuted(root2) + cache.MarkAsExecuted(root3) + mockCommitStoreReader(commitStoreReader, ts3, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root2, ts2, true), + createCommitStoreEntry(root3, ts3, true), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots) + assertRoots(t, cache.finalizedCachedLogs()) +} + +func assertRoots(t *testing.T, reports []ccip.CommitStoreReport, expectedRoots ...[32]byte) { + require.Len(t, reports, len(expectedRoots)) + for i, report := range reports { + assert.Equal(t, expectedRoots[i], report.MerkleRoot) + } +} + +func mockCommitStoreReader(reader *mocks.CommitStoreReader, blockTimestamp time.Time, roots []ccip.CommitStoreReportWithTxMeta) { + if blockTimestamp.IsZero() { + reader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, mock.Anything, mock.Anything). + Return(roots, nil).Once() + } else { + reader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, blockTimestamp, mock.Anything). + Return(roots, nil).Once() + } +} + +func createCommitStoreEntry(root [32]byte, ts time.Time, finalized bool) ccip.CommitStoreReportWithTxMeta { + status := ccip.FinalizedStatusNotFinalized + if finalized { + status = ccip.FinalizedStatusFinalized + } + return ccip.CommitStoreReportWithTxMeta{ + CommitStoreReport: ccip.CommitStoreReport{ + MerkleRoot: root, + }, + TxMeta: ccip.TxMeta{ + BlockTimestampUnixMilli: ts.UnixMilli(), + Finalized: status, + }, + } +}