diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go index 35b7d6b076..bf33ff92c1 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go @@ -215,7 +215,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) { mockOnRampPriceRegistryProvider.On("NewPriceRegistryReader", ctx, sourcePriceRegistryAddress).Return(sourcePriceRegReader, nil).Maybe() p.sourcePriceRegistryProvider = mockOnRampPriceRegistryProvider - p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), time.Minute, time.Minute) + p.commitRootsCache = cache.NewCommitRootsCache(logger.TestLogger(t), commitStoreReader, time.Minute, time.Minute) p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, mockOnRampReader, commitStoreReader) _, err = p.Observation(ctx, types.ReportTimestamp{}, types.Query{}) @@ -943,19 +943,25 @@ func TestExecutionReportingPlugin_getReportsWithSendRequests(t *testing.T) { Return(tc.onchainEvents, nil).Maybe() p.onRampReader = sourceReader - finalized := make(map[uint64]bool) + finalized := make(map[uint64]cciptypes.FinalizedStatus) for _, r := range tc.expReports { for _, s := range r.sendRequestsWithMeta { - finalized[s.SequenceNumber] = s.Finalized + finalized[s.SequenceNumber] = cciptypes.FinalizedStatusNotFinalized + if s.Finalized { + finalized[s.SequenceNumber] = cciptypes.FinalizedStatusFinalized + } } } var executedEvents []cciptypes.ExecutionStateChangedWithTxMeta for _, executedSeqNum := range tc.destExecutedSeqNums { + executedEvents = append(executedEvents, cciptypes.ExecutionStateChangedWithTxMeta{ ExecutionStateChanged: cciptypes.ExecutionStateChanged{ SequenceNumber: executedSeqNum, - Finalized: finalized[executedSeqNum], + }, + TxMeta: cciptypes.TxMeta{ + Finalized: finalized[executedSeqNum], }, }) } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go index 12c99f3e15..65451a49e1 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go @@ -2,7 +2,6 @@ package cache import ( "context" - "encoding/hex" "slices" "sync" "time" @@ -15,6 +14,8 @@ import ( ) const ( + // EvictionGracePeriod defines how long after the messageVisibilityInterval a root is still kept in the cache + EvictionGracePeriod = 1 * time.Hour // CleanupInterval defines how often roots cache is scanned to evict stale roots CleanupInterval = 30 * time.Minute ) @@ -37,6 +38,7 @@ func NewCommitRootsCache( messageVisibilityInterval, rootSnoozeTime, CleanupInterval, + EvictionGracePeriod, ) } @@ -46,13 +48,17 @@ func newCommitRootsCache( messageVisibilityInterval time.Duration, rootSnoozeTime time.Duration, cleanupInterval time.Duration, + evictionGracePeriod time.Duration, ) *commitRootsCache { snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval) + executedRoots := cache.New(messageVisibilityInterval+evictionGracePeriod, cleanupInterval) return &commitRootsCache{ lggr: lggr, reader: reader, + rootSnoozeTime: rootSnoozeTime, finalizedRoots: orderedmap.New[string, ccip.CommitStoreReportWithTxMeta](), + executedRoots: executedRoots, snoozedRoots: snoozedRoots, messageVisibilityInterval: messageVisibilityInterval, latestFinalizedCommitRootTs: time.Now().Add(-messageVisibilityInterval), @@ -64,39 +70,65 @@ type commitRootsCache struct { lggr logger.Logger reader ccip.CommitStoreReader messageVisibilityInterval time.Duration - - // Mutable state - cacheMu sync.RWMutex - finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta] - snoozedRoots *cache.Cache + rootSnoozeTime time.Duration + + // Mutable state. finalizedRoots is thread-safe by default, but updating latestFinalizedCommitRootTs and finalizedRoots requires locking. + cacheMu sync.RWMutex + // finalizedRoots is a map of merkleRoot -> CommitStoreReportWithTxMeta. It stores all the CommitReports that are + // marked as finalized by LogPoller, but not executed yet. Keeping only finalized reports doesn't require any state sync between LP and the cache. + // In order to keep this map size under control, we evict stale items every time we fetch new logs from the database. + // Also, ccip.CommitStoreReportWithTxMeta is a very tiny entity with almost fixed size, so it's not a big deal to keep it in memory. + // In case of high memory footprint caused by storing roots, we can make these even more lightweight by removing token/gas price updates. + // Whenever the root is executed (all messages executed and ExecutionStateChange events are finalized), we remove the root from the map. + finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta] + // snoozedRoots used only for temporary snoozing roots. It's a cache with TTL (usually around 5 minutes, but this configuration is set up on chain using rootSnoozeTime) + snoozedRoots *cache.Cache + executedRoots *cache.Cache + // latestFinalizedCommitRootTs is the timestamp of the latest finalized commit root (youngest in terms of timestamp). + // It's used get only the logs that were considered as unfinalized in a previous run. + // This way we limit database scans to the minimum and keep polling "unfinalized" part of the ReportAccepted events queue. latestFinalizedCommitRootTs time.Time } func (r *commitRootsCache) RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error) { - // 1. Fetch all the logs from the database after the latest finalized commit root timestamp + // 1. Fetch all the logs from the database after the latest finalized commit root timestamp. + // If this is a first run, it will fetch all the logs based on the messageVisibilityInterval. + // Worst case scenario, it will fetch around 480 reports (OCR Commit 60 seconds (fast chains default) * messageVisibilityInterval set to 8 hours (mainnet default)) + // Even with the larger messageVisibilityInterval window (e.g. 24 hours) it should be acceptable (around 1500 logs). + // Keep in mind that this potentially heavy operation happens only once during the plugin boot and it's no different from the previous implementation. logs, err := r.fetchLogsFromCommitStore(ctx) if err != nil { return nil, err } // 2. Iterate over the logs and check if the root is finalized or not. Return finalized and unfinalized reports + // It promotes finalized roots to the finalizedRoots map and evicts stale roots. finalizedReports, unfinalizedReports := r.updateFinalizedRoots(logs) // 3. Join finalized commit reports with unfinalized reports and outfilter snoozed roots. + // Return only the reports that are not snoozed. return r.pickReadyToExecute(finalizedReports, unfinalizedReports), nil } +// MarkAsExecuted marks the root as executed. It means that all the messages from the root were executed and the ExecutionStateChange event was finalized. +// Executed roots are removed from the cache. func (r *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) { + prettyMerkleRoot := merkleRootToString(merkleRoot) + r.lggr.Infow("Marking root as executed and removing entirely from cache", "merkleRoot", prettyMerkleRoot) + r.cacheMu.Lock() defer r.cacheMu.Unlock() - - prettyMerkleRoot := merkleRootToString(merkleRoot) r.finalizedRoots.Delete(prettyMerkleRoot) + r.executedRoots.SetDefault(prettyMerkleRoot, struct{}{}) } +// Snooze temporarily snoozes the root. It means that the root is not eligible for execution for a certain period of time. +// Snoozed roots are skipped when calling RootsEligibleForExecution func (r *commitRootsCache) Snooze(merkleRoot [32]byte) { - r.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{}) + prettyMerkleRoot := merkleRootToString(merkleRoot) + r.lggr.Infow("Snoozing root temporarily", "merkleRoot", prettyMerkleRoot, "rootSnoozeTime", r.rootSnoozeTime) + r.snoozedRoots.SetDefault(prettyMerkleRoot, struct{}{}) } func (r *commitRootsCache) isSnoozed(merkleRoot [32]byte) bool { @@ -114,6 +146,7 @@ func (r *commitRootsCache) fetchLogsFromCommitStore(ctx context.Context) ([]ccip r.cacheMu.Unlock() // IO operation, release lock before! + r.lggr.Infow("Fetching Commit Reports with timestamp greater than or equal to", "blockTimestamp", commitRootsFilterTimestamp) return r.reader.GetAcceptedCommitReportsGteTimestamp(ctx, commitRootsFilterTimestamp, 0) } @@ -124,21 +157,31 @@ func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWit // Assuming logs are properly ordered by block_timestamp, log_index var unfinalizedReports []ccip.CommitStoreReportWithTxMeta for _, log := range logs { - if _, finalized := r.finalizedRoots.Get(merkleRootToString(log.MerkleRoot)); finalized { - r.finalizedRoots.Store(merkleRootToString(log.MerkleRoot), log) + prettyMerkleRoot := merkleRootToString(log.MerkleRoot) + // Defensive check, if something is marked as executed, never allow it to come back to the cache + if _, executed := r.executedRoots.Get(prettyMerkleRoot); executed { + r.lggr.Debugw("Ignoring root marked as executed", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli) + continue + } + + if log.IsFinalized() { + r.lggr.Debugw("Adding finalized root to cache", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli) + r.finalizedRoots.Store(prettyMerkleRoot, log) } else { + r.lggr.Debugw("Bypassing unfinalized root", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli) unfinalizedReports = append(unfinalizedReports, log) } } - if r.finalizedRoots.Newest() != nil { - r.latestFinalizedCommitRootTs = time.UnixMilli(r.finalizedRoots.Newest().Value.BlockTimestampUnixMilli) + if newest := r.finalizedRoots.Newest(); newest != nil { + r.latestFinalizedCommitRootTs = time.UnixMilli(newest.Value.BlockTimestampUnixMilli) } var finalizedRoots []ccip.CommitStoreReportWithTxMeta + messageVisibilityWindow := time.Now().Add(-r.messageVisibilityInterval) for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() { // Evict stale items - if time.UnixMilli(pair.Value.BlockTimestampUnixMilli).Before(time.Now().Add(-r.messageVisibilityInterval)) { + if time.UnixMilli(pair.Value.BlockTimestampUnixMilli).Before(messageVisibilityWindow) { r.finalizedRoots.Delete(pair.Key) continue } @@ -156,13 +199,25 @@ func (r *commitRootsCache) pickReadyToExecute(r1 []ccip.CommitStoreReportWithTxM } eligibleReports = append(eligibleReports, report.CommitStoreReport) } - // safety check + // safety check, probably not needed slices.SortFunc(eligibleReports, func(i, j ccip.CommitStoreReport) int { return int(i.Interval.Min - j.Interval.Min) }) return eligibleReports } -func merkleRootToString(merkleRoot [32]byte) string { - return hex.EncodeToString(merkleRoot[:]) +// internal use only for testing +func (r *commitRootsCache) finalizedCachedLogs() []ccip.CommitStoreReport { + r.cacheMu.RLock() + defer r.cacheMu.RUnlock() + + var finalizedRoots []ccip.CommitStoreReport + for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() { + finalizedRoots = append(finalizedRoots, pair.Value.CommitStoreReport) + } + return finalizedRoots +} + +func merkleRootToString(merkleRoot ccip.Hash) string { + return merkleRoot.String() } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go index c1ecf413de..ff1812bf1c 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go @@ -1,3 +1,186 @@ -package cache +package cache_test -// Implement me from scratch! +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/commit_store_1_2_0" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0" +) + +func Test_RootsEligibleForExecution(t *testing.T) { + ctx := testutils.Context(t) + chainID := testutils.NewRandomEVMChainID() + orm := logpoller.NewORM(chainID, pgtest.NewSqlxDB(t), logger.TestLogger(t)) + lpOpts := logpoller.Opts{ + PollPeriod: time.Hour, + FinalityDepth: 2, + BackfillBatchSize: 20, + RpcBatchSize: 10, + KeepFinalizedBlocksDepth: 1000, + } + lp := logpoller.NewLogPoller(orm, nil, logger.TestLogger(t), nil, lpOpts) + + commitStoreAddr := utils.RandomAddress() + + block1 := time.Now().Add(-8 * time.Hour) + block2 := time.Now().Add(-5 * time.Hour) + block25 := time.Now().Add(-4 * time.Hour) + block3 := time.Now().Add(-1 * time.Hour) + block4 := time.Now() + + root1 := utils.RandomBytes32() + root2 := utils.RandomBytes32() + root3 := utils.RandomBytes32() + root4 := utils.RandomBytes32() + root5 := utils.RandomBytes32() + + inputLogs := []logpoller.Log{ + createReportAcceptedLog(t, chainID, commitStoreAddr, 2, 1, root1, block1), + createReportAcceptedLog(t, chainID, commitStoreAddr, 2, 2, root2, block1), + } + require.NoError(t, orm.InsertLogsWithBlock(ctx, inputLogs, logpoller.NewLogPollerBlock(utils.RandomBytes32(), 2, time.Now(), 1))) + + commitStore, err := v1_2_0.NewCommitStore(logger.TestLogger(t), commitStoreAddr, nil, lp) + require.NoError(t, err) + + rootsCache := cache.NewCommitRootsCache(logger.TestLogger(t), commitStore, 10*time.Hour, time.Second) + + roots, err := rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots, root1, root2) + + rootsCache.Snooze(root1) + rootsCache.Snooze(root2) + + // Roots are snoozed + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots) + + // Roots are unsnoozed + require.Eventually(t, func() bool { + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + return len(roots) == 2 + }, 5*time.Second, 1*time.Second) + + // Marking root as executed doesn't ignore other roots from the same block + rootsCache.MarkAsExecuted(root1) + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots, root2) + + // Finality progress, mark all roots as finalized + require.NoError(t, orm.InsertBlock(ctx, utils.RandomBytes32(), 3, time.Now(), 3)) + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots, root2) + + inputLogs = []logpoller.Log{ + createReportAcceptedLog(t, chainID, commitStoreAddr, 3, 1, root3, block2), + createReportAcceptedLog(t, chainID, commitStoreAddr, 4, 1, root4, block3), + createReportAcceptedLog(t, chainID, commitStoreAddr, 5, 1, root5, block4), + } + require.NoError(t, orm.InsertLogsWithBlock(ctx, inputLogs, logpoller.NewLogPollerBlock(utils.RandomBytes32(), 5, time.Now(), 3))) + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots, root2, root3, root4, root5) + + // Mark root in the middle as executed but keep the oldest one still waiting + rootsCache.MarkAsExecuted(root3) + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots, root2, root4, root5) + + // Simulate reorg by removing all unfinalized blocks + require.NoError(t, orm.DeleteLogsAndBlocksAfter(ctx, 4)) + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots, root2) + + // Root4 comes back but in the + inputLogs = []logpoller.Log{ + createReportAcceptedLog(t, chainID, commitStoreAddr, 4, 1, root4, block25), + } + require.NoError(t, orm.InsertLogsWithBlock(ctx, inputLogs, logpoller.NewLogPollerBlock(utils.RandomBytes32(), 5, time.Now(), 3))) + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots, root2, root4) + + // Mark everything as executed + rootsCache.MarkAsExecuted(root2) + rootsCache.MarkAsExecuted(root4) + roots, err = rootsCache.RootsEligibleForExecution(ctx) + require.NoError(t, err) + assertRoots(t, roots) +} + +func assertRoots(t *testing.T, roots []cciptypes.CommitStoreReport, root ...[32]byte) { + require.Len(t, roots, len(root)) + for i, r := range root { + require.Equal(t, r, roots[i].MerkleRoot) + } +} + +func createReportAcceptedLog(t testing.TB, chainID *big.Int, address common.Address, blockNumber int64, logIndex int64, merkleRoot common.Hash, blockTimestamp time.Time) logpoller.Log { + tAbi, err := commit_store_1_2_0.CommitStoreMetaData.GetAbi() + require.NoError(t, err) + eseEvent, ok := tAbi.Events["ReportAccepted"] + require.True(t, ok) + + gasPriceUpdates := make([]commit_store_1_2_0.InternalGasPriceUpdate, 100) + tokenPriceUpdates := make([]commit_store_1_2_0.InternalTokenPriceUpdate, 100) + + for i := 0; i < 100; i++ { + gasPriceUpdates[i] = commit_store_1_2_0.InternalGasPriceUpdate{ + DestChainSelector: uint64(i), + UsdPerUnitGas: big.NewInt(int64(i)), + } + tokenPriceUpdates[i] = commit_store_1_2_0.InternalTokenPriceUpdate{ + SourceToken: utils.RandomAddress(), + UsdPerToken: big.NewInt(int64(i)), + } + } + + message := commit_store_1_2_0.CommitStoreCommitReport{ + PriceUpdates: commit_store_1_2_0.InternalPriceUpdates{ + TokenPriceUpdates: tokenPriceUpdates, + GasPriceUpdates: gasPriceUpdates, + }, + Interval: commit_store_1_2_0.CommitStoreInterval{Min: 1, Max: 10}, + MerkleRoot: merkleRoot, + } + + logData, err := eseEvent.Inputs.Pack(message) + require.NoError(t, err) + + topic0 := commit_store_1_2_0.CommitStoreReportAccepted{}.Topic() + + return logpoller.Log{ + Topics: [][]byte{ + topic0[:], + }, + Data: logData, + LogIndex: logIndex, + BlockHash: utils.RandomBytes32(), + BlockNumber: blockNumber, + BlockTimestamp: blockTimestamp, + EventSig: topic0, + Address: address, + TxHash: utils.RandomBytes32(), + EvmChainId: ubig.New(chainID), + } +} diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go new file mode 100644 index 0000000000..55f9df7fe8 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_unit_test.go @@ -0,0 +1,173 @@ +package cache + +import ( + "testing" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" +) + +func Test_CacheIsInitializedWithFirstCall(t *testing.T) { + commitStoreReader := mocks.NewCommitStoreReader(t) + cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, time.Hour, time.Hour, time.Hour, time.Hour) + commitStoreReader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, mock.Anything, mock.Anything).Return([]ccip.CommitStoreReportWithTxMeta{}, nil) + + roots, err := cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots) +} + +func Test_CacheExpiration(t *testing.T) { + ts1 := time.Now().Add(-5 * time.Millisecond).Truncate(time.Millisecond) + ts2 := time.Now().Add(-3 * time.Millisecond).Truncate(time.Millisecond) + ts3 := time.Now().Add(-1 * time.Millisecond).Truncate(time.Millisecond) + + root1 := utils.RandomBytes32() + root2 := utils.RandomBytes32() + root3 := utils.RandomBytes32() + + commitStoreReader := mocks.NewCommitStoreReader(t) + cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, time.Second, time.Hour, time.Hour, time.Hour) + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root1, ts1, true), + createCommitStoreEntry(root2, ts2, true), + createCommitStoreEntry(root3, ts3, false), + }) + roots, err := cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root1, root2, root3) + + require.Eventually(t, func() bool { + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root3, ts3, false), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + return len(roots) == 1 && roots[0].MerkleRoot == root3 + }, 5*time.Second, 1*time.Second) +} + +func Test_CacheProgression_Internal(t *testing.T) { + ts1 := time.Now().Add(-5 * time.Hour).Truncate(time.Millisecond) + ts2 := time.Now().Add(-3 * time.Hour).Truncate(time.Millisecond) + ts3 := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond) + + root1 := utils.RandomBytes32() + root2 := utils.RandomBytes32() + root3 := utils.RandomBytes32() + + commitStoreReader := mocks.NewCommitStoreReader(t) + + cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, 10*time.Hour, time.Hour, time.Hour, time.Hour) + + // Empty cache, no results from the reader + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{}) + roots, err := cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots) + assertRoots(t, cache.finalizedCachedLogs()) + + // Single unfinalized root returned + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{createCommitStoreEntry(root1, ts1, false)}) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root1) + assertRoots(t, cache.finalizedCachedLogs()) + + // Finalized and unfinalized roots returned + mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root1, ts1, true), + createCommitStoreEntry(root2, ts2, false), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root1, root2) + assertRoots(t, cache.finalizedCachedLogs(), root1) + + // Returning the same data should not impact cache state (no duplicates) + mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root1, ts1, true), + createCommitStoreEntry(root2, ts2, false), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root1, root2) + assertRoots(t, cache.finalizedCachedLogs(), root1) + + // Snoozing oldest root + cache.Snooze(root1) + mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root2, ts2, false), + createCommitStoreEntry(root3, ts3, false), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots, root2, root3) + assertRoots(t, cache.finalizedCachedLogs(), root1) + + // Snoozing everything + cache.Snooze(root2) + cache.Snooze(root3) + mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root2, ts2, true), + createCommitStoreEntry(root3, ts3, true), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots) + assertRoots(t, cache.finalizedCachedLogs(), root1, root2, root3) + + // Marking everything as executed removes it entirely, even if root is returned from the CommitStore + cache.MarkAsExecuted(root1) + cache.MarkAsExecuted(root2) + cache.MarkAsExecuted(root3) + mockCommitStoreReader(commitStoreReader, ts3, []ccip.CommitStoreReportWithTxMeta{ + createCommitStoreEntry(root2, ts2, true), + createCommitStoreEntry(root3, ts3, true), + }) + roots, err = cache.RootsEligibleForExecution(tests.Context(t)) + require.NoError(t, err) + assertRoots(t, roots) + assertRoots(t, cache.finalizedCachedLogs()) +} + +func assertRoots(t *testing.T, reports []ccip.CommitStoreReport, expectedRoots ...[32]byte) { + require.Len(t, reports, len(expectedRoots)) + for i, report := range reports { + assert.Equal(t, expectedRoots[i], report.MerkleRoot) + } +} + +func mockCommitStoreReader(reader *mocks.CommitStoreReader, blockTimestamp time.Time, roots []ccip.CommitStoreReportWithTxMeta) { + if blockTimestamp.IsZero() { + reader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, mock.Anything, mock.Anything). + Return(roots, nil).Once() + } else { + reader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, blockTimestamp, mock.Anything). + Return(roots, nil).Once() + } +} + +func createCommitStoreEntry(root [32]byte, ts time.Time, finalized bool) ccip.CommitStoreReportWithTxMeta { + status := ccip.FinalizedStatusNotFinalized + if finalized { + status = ccip.FinalizedStatusFinalized + } + return ccip.CommitStoreReportWithTxMeta{ + CommitStoreReport: ccip.CommitStoreReport{ + MerkleRoot: root, + }, + TxMeta: ccip.TxMeta{ + BlockTimestampUnixMilli: ts.UnixMilli(), + Finalized: status, + }, + } +} diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_reader_unit_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_reader_unit_test.go index a0f6e5e6cd..f8b1dc4e61 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_reader_unit_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_reader_unit_test.go @@ -203,7 +203,7 @@ func Test_LogsAreProperlyMarkedAsFinalized(t *testing.T) { assert.Len(t, logs, len(inputLogs)) for _, log := range logs { - assert.Equal(t, slices.Contains(tt.expectedFinalizedSequenceNr, log.SequenceNumber), log.Finalized) + assert.Equal(t, slices.Contains(tt.expectedFinalizedSequenceNr, log.SequenceNumber), log.IsFinalized()) } }) }