diff --git a/core/services/ocr2/plugins/ccip/ccipexec/factory.go b/core/services/ocr2/plugins/ccip/ccipexec/factory.go index 1a18793a833..27c9d01fbff 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/factory.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" @@ -136,7 +137,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep offRampReader: rf.config.offRampReader, tokenPoolBatchedReader: rf.config.tokenPoolBatchedReader, inflightReports: newInflightExecReportsContainer(offchainConfig.InflightCacheExpiry.Duration()), - commitRootsCache: cache.NewCommitRootsCache(lggr, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()), + commitRootsCache: cache.NewCommitRootsCache(lggr, rf.config.commitStoreReader, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()), metricsCollector: rf.config.metricsCollector, chainHealthcheck: rf.config.chainHealthcheck, } diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go index 3f594a423b8..df95cb4416c 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go @@ -865,39 +865,13 @@ func (r *ExecutionReportingPlugin) getUnexpiredCommitReports( commitStoreReader ccipdata.CommitStoreReader, lggr logger.Logger, ) ([]cciptypes.CommitStoreReport, error) { - createdAfterTimestamp := r.commitRootsCache.OldestRootTimestamp() - lggr.Infow("Fetching unexpired commit roots from database", "createdAfterTimestamp", createdAfterTimestamp) - acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteTimestamp( - ctx, - createdAfterTimestamp, - 0, - ) + eligibleRootsForExec, err := r.commitRootsCache.RootsEligibleForExecution(ctx) if err != nil { return nil, err } - var reports []cciptypes.CommitStoreReport - for _, acceptedReport := range acceptedReports { - reports = append(reports, acceptedReport.CommitStoreReport) - r.commitRootsCache.AppendUnexecutedRoot(acceptedReport.MerkleRoot, time.UnixMilli(acceptedReport.TxMeta.BlockTimestampUnixMilli)) - } - - notSnoozedReports := make([]cciptypes.CommitStoreReport, 0) - for _, report := range reports { - if r.commitRootsCache.IsSkipped(report.MerkleRoot) { - lggr.Debugw("Skipping snoozed root", - "minSeqNr", report.Interval.Min, - "maxSeqNr", report.Interval.Max, - "root", hex.EncodeToString(report.MerkleRoot[:]), - ) - continue - } - notSnoozedReports = append(notSnoozedReports, report) - } - - r.metricsCollector.UnexpiredCommitRoots(len(notSnoozedReports)) - lggr.Infow("Unexpired roots", "all", len(reports), "notSnoozed", len(notSnoozedReports)) - return notSnoozedReports, nil + r.metricsCollector.UnexpiredCommitRoots(len(eligibleRootsForExec)) + return eligibleRootsForExec, nil } type execTokenData struct { diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go index ae5fe8fcde0..830617013c8 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go @@ -1,195 +1,169 @@ package cache import ( + "context" "encoding/hex" + "slices" "sync" "time" "github.com/patrickmn/go-cache" + "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" orderedmap "github.com/wk8/go-ordered-map/v2" "github.com/smartcontractkit/chainlink/v2/core/logger" ) const ( - // EvictionGracePeriod defines how long after the messageVisibilityInterval a root is still kept in the cache - EvictionGracePeriod = 1 * time.Hour // CleanupInterval defines how often roots cache is scanned to evict stale roots CleanupInterval = 30 * time.Minute ) type CommitsRootsCache interface { - // IsSkipped returns true if the root is either executed or snoozed. Snoozing can be temporary based on the configuration - IsSkipped(merkleRoot [32]byte) bool + RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error) MarkAsExecuted(merkleRoot [32]byte) Snooze(merkleRoot [32]byte) - - // OldestRootTimestamp returns the oldest root timestamp that is not executed yet (minus 1 second). - // If there are no roots in the queue, it returns the messageVisibilityInterval - OldestRootTimestamp() time.Time - // AppendUnexecutedRoot appends the root to the unexecuted roots queue to keep track of the roots that are not executed yet - // Roots has to be added in the order they are fetched from the database - AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time) } -type commitRootsCache struct { - lggr logger.Logger - // executedRoots is used to keep track of the roots that are executed. Roots that are considered as executed - // when all messages are executed on the dest and matching execution state change logs are finalized - executedRoots *cache.Cache - // snoozedRoots is used to keep track of the roots that are temporary snoozed - snoozedRoots *cache.Cache - // unexecutedRootsQueue is used to keep track of the unexecuted roots in the order they are fetched from database (should be ordered by block_number, log_index) - // First run of Exec will fill the queue with all the roots that are not executed yet within the [now-messageVisibilityInterval, now] window. - // When a root is executed, it is removed from the queue. Next database query instead of using entire messageVisibilityInterval window - // will use oldestRootTimestamp as the lower bound filter for block_timestamp. - // This way we can reduce the number of database rows fetched with every OCR round. - // We do it this way because roots for most of the cases are executed sequentially. - // Instead of skipping snoozed roots after we fetch them from the database, we do that on the db level by narrowing the search window. - // - // Example - // messageVisibilityInterval - 10 days, now - 2010-10-15 - // We fetch all the roots that within the [2010-10-05, 2010-10-15] window and load them to the queue - // [0xA - 2010-10-10, 0xB - 2010-10-11, 0xC - 2010-10-12] -> 0xA is the oldest root - // We executed 0xA and a couple of rounds later, we mark 0xA as executed and snoozed that forever which removes it from the queue. - // [0xB - 2010-10-11, 0xC - 2010-10-12] - // Now the search filter wil be 0xA timestamp -> [2010-10-11, 20-10-15] - // If roots are executed out of order, it's not going to change anything. However, for most of the cases we have sequential root execution and that is - // a huge improvement because we don't need to fetch all the roots from the database in every round. - unexecutedRootsQueue *orderedmap.OrderedMap[string, time.Time] - oldestRootTimestamp time.Time - rootsQueueMu sync.RWMutex - - // Both rootSnoozedTime and messageVisibilityInterval can be kept in the commitRootsCache without need to be updated. - // Those config properties are populates via onchain/offchain config. When changed, OCR plugin will be restarted and cache initialized with new config. - rootSnoozedTime time.Duration - messageVisibilityInterval time.Duration +func NewCommitRootsCache( + lggr logger.Logger, + reader ccip.CommitStoreReader, + messageVisibilityInterval time.Duration, + rootSnoozeTime time.Duration, +) CommitsRootsCache { + return newCommitRootsCache( + lggr, + reader, + messageVisibilityInterval, + rootSnoozeTime, + CleanupInterval, + ) } func newCommitRootsCache( lggr logger.Logger, + reader ccip.CommitStoreReader, messageVisibilityInterval time.Duration, rootSnoozeTime time.Duration, - evictionGracePeriod time.Duration, cleanupInterval time.Duration, ) *commitRootsCache { - executedRoots := cache.New(messageVisibilityInterval+evictionGracePeriod, cleanupInterval) snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval) return &commitRootsCache{ - lggr: lggr, - executedRoots: executedRoots, - snoozedRoots: snoozedRoots, - unexecutedRootsQueue: orderedmap.New[string, time.Time](), - rootSnoozedTime: rootSnoozeTime, - messageVisibilityInterval: messageVisibilityInterval, + lggr: lggr, + reader: reader, + finalizedRoots: orderedmap.New[string, ccip.CommitStoreReportWithTxMeta](), + snoozedRoots: snoozedRoots, + messageVisibilityInterval: messageVisibilityInterval, + latestFinalizedCommitRootTs: time.Now().Add(-messageVisibilityInterval), + cacheMu: sync.RWMutex{}, } } -func NewCommitRootsCache( - lggr logger.Logger, - messageVisibilityInterval time.Duration, - rootSnoozeTime time.Duration, -) *commitRootsCache { - return newCommitRootsCache( - lggr, - messageVisibilityInterval, - rootSnoozeTime, - EvictionGracePeriod, - CleanupInterval, - ) -} +type commitRootsCache struct { + lggr logger.Logger + reader ccip.CommitStoreReader + messageVisibilityInterval time.Duration -func (s *commitRootsCache) IsSkipped(merkleRoot [32]byte) bool { - _, snoozed := s.snoozedRoots.Get(merkleRootToString(merkleRoot)) - _, executed := s.executedRoots.Get(merkleRootToString(merkleRoot)) - return snoozed || executed + // Mutable state + cacheMu sync.RWMutex + finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta] + snoozedRoots *cache.Cache + latestFinalizedCommitRootTs time.Time } -func (s *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) { - prettyMerkleRoot := merkleRootToString(merkleRoot) - s.executedRoots.SetDefault(prettyMerkleRoot, struct{}{}) +func (r *commitRootsCache) RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error) { + r.cacheMu.Lock() + defer r.cacheMu.Unlock() - s.rootsQueueMu.Lock() - defer s.rootsQueueMu.Unlock() - // if there is only one root in the queue, we put its block_timestamp as oldestRootTimestamp - if s.unexecutedRootsQueue.Len() == 1 { - s.oldestRootTimestamp = s.unexecutedRootsQueue.Oldest().Value - } - s.unexecutedRootsQueue.Delete(prettyMerkleRoot) - if head := s.unexecutedRootsQueue.Oldest(); head != nil { - s.oldestRootTimestamp = head.Value + // 1. Fetch all the logs from the database after the latest finalized commit root timestamp + logs, err := r.fetchLogsFromCommitStore(ctx) + if err != nil { + return nil, err } - s.lggr.Debugw("Deleting executed root from the queue", - "merkleRoot", prettyMerkleRoot, - "oldestRootTimestamp", s.oldestRootTimestamp, - ) + + // 2. Iterate over the logs and check if the root is finalized or not. Return finalized and unfinalized reports + finalizedReports, unfinalizedReports := r.updateFinalizedRoots(logs) + + // 3. Join finalized commit reports with unfinalized reports and outfilter snoozed roots. + return r.pickReadyToExecute(finalizedReports, unfinalizedReports), nil + } -func (s *commitRootsCache) Snooze(merkleRoot [32]byte) { - s.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{}) +func (r *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) { + r.cacheMu.Lock() + defer r.cacheMu.Unlock() + + prettyMerkleRoot := merkleRootToString(merkleRoot) + r.finalizedRoots.Delete(prettyMerkleRoot) } -func (s *commitRootsCache) OldestRootTimestamp() time.Time { - return time.Now().Add(-s.messageVisibilityInterval) - // TODO we can't rely on block timestamps, because in case of re-org they can change and therefore affect - // the logic in the case. In the meantime, always fallback to the default behaviour and use permissionlessThresholdWindow - //timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval) - // - //if ok { - // return timestamp - //} - // - //s.rootsQueueMu.Lock() - //defer s.rootsQueueMu.Unlock() - // - //// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed - //// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch. - //s.unexecutedRootsQueue = orderedmap.New[string, time.Time]() - //return messageVisibilityInterval +func (r *commitRootsCache) Snooze(merkleRoot [32]byte) { + r.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{}) } -//func (s *commitRootsCache) pickOldestRootBlockTimestamp(messageVisibilityInterval time.Time) (time.Time, bool) { -// s.rootsQueueMu.RLock() -// defer s.rootsQueueMu.RUnlock() -// -// // If there are no roots in the queue, we can return the messageVisibilityInterval -// if s.oldestRootTimestamp.IsZero() { -// return messageVisibilityInterval, true -// } -// -// if s.oldestRootTimestamp.After(messageVisibilityInterval) { -// // Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp) -// // so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results -// return s.oldestRootTimestamp.Add(-time.Second), true -// } -// return time.Time{}, false -//} - -func (s *commitRootsCache) AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time) { - prettyMerkleRoot := merkleRootToString(merkleRoot) +func (r *commitRootsCache) isSnoozed(merkleRoot [32]byte) bool { + _, snoozed := r.snoozedRoots.Get(merkleRootToString(merkleRoot)) + return snoozed +} + +func (r *commitRootsCache) fetchLogsFromCommitStore(ctx context.Context) ([]ccip.CommitStoreReportWithTxMeta, error) { + r.cacheMu.Lock() + messageVisibilityWindow := time.Now().Add(-r.messageVisibilityInterval) + if r.latestFinalizedCommitRootTs.Before(messageVisibilityWindow) { + r.latestFinalizedCommitRootTs = messageVisibilityWindow + } + commitRootsFilterTimestamp := r.latestFinalizedCommitRootTs + r.cacheMu.Unlock() - s.rootsQueueMu.Lock() - defer s.rootsQueueMu.Unlock() + // IO operation, release lock before! + return r.reader.GetAcceptedCommitReportsGteTimestamp(ctx, commitRootsFilterTimestamp, 0) +} - // If the root is already in the queue, we must not add it to the queue - if _, found := s.unexecutedRootsQueue.Get(prettyMerkleRoot); found { - return +func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWithTxMeta) ([]ccip.CommitStoreReportWithTxMeta, []ccip.CommitStoreReportWithTxMeta) { + r.cacheMu.Lock() + defer r.cacheMu.Unlock() + + // Assuming logs are properly ordered by block_timestamp, log_index + var unfinalizedReports []ccip.CommitStoreReportWithTxMeta + for _, log := range logs { + if _, finalized := r.finalizedRoots.Get(merkleRootToString(log.MerkleRoot)); finalized { + r.finalizedRoots.Store(merkleRootToString(log.MerkleRoot), log) + } else { + unfinalizedReports = append(unfinalizedReports, log) + } } - // If the root is already executed, we must not add it to the queue - if _, executed := s.executedRoots.Get(prettyMerkleRoot); executed { - return + + if r.finalizedRoots.Newest() != nil { + r.latestFinalizedCommitRootTs = time.UnixMilli(r.finalizedRoots.Newest().Value.BlockTimestampUnixMilli) } - // Initialize the search filter with the first root that is added to the queue - if s.unexecutedRootsQueue.Len() == 0 { - s.oldestRootTimestamp = blockTimestamp + + var finalizedRoots []ccip.CommitStoreReportWithTxMeta + for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() { + // Evict stale items + if time.UnixMilli(pair.Value.BlockTimestampUnixMilli).Before(time.Now().Add(-r.messageVisibilityInterval)) { + r.finalizedRoots.Delete(pair.Key) + continue + } + finalizedRoots = append(finalizedRoots, pair.Value) } - s.unexecutedRootsQueue.Set(prettyMerkleRoot, blockTimestamp) - s.lggr.Debugw("Adding unexecuted root to the queue", - "merkleRoot", prettyMerkleRoot, - "blockTimestamp", blockTimestamp, - "oldestRootTimestamp", s.oldestRootTimestamp, - ) + return finalizedRoots, unfinalizedReports +} + +func (r *commitRootsCache) pickReadyToExecute(r1 []ccip.CommitStoreReportWithTxMeta, r2 []ccip.CommitStoreReportWithTxMeta) []ccip.CommitStoreReport { + allReports := append(r1, r2...) + eligibleReports := make([]ccip.CommitStoreReport, 0, len(allReports)) + for _, report := range allReports { + if r.isSnoozed(report.MerkleRoot) { + continue + } + eligibleReports = append(eligibleReports, report.CommitStoreReport) + } + // safety check + slices.SortFunc(eligibleReports, func(i, j ccip.CommitStoreReport) int { + return int(i.Interval.Min - j.Interval.Min) + }) + return eligibleReports } func merkleRootToString(merkleRoot [32]byte) string { diff --git a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go index f79e365d0c3..c1ecf413de7 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go @@ -1,247 +1,3 @@ package cache -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/smartcontractkit/chainlink/v2/core/logger" -) - -func TestSnoozedRoots(t *testing.T) { - c := NewCommitRootsCache(logger.TestLogger(t), 1*time.Minute, 1*time.Minute) - - k1 := [32]byte{1} - k2 := [32]byte{2} - - // return false for non existing element - snoozed := c.IsSkipped(k1) - assert.False(t, snoozed) - - // after an element is marked as executed it should be snoozed - c.MarkAsExecuted(k1) - snoozed = c.IsSkipped(k1) - assert.True(t, snoozed) - - // after snoozing an element it should be snoozed - c.Snooze(k2) - snoozed = c.IsSkipped(k2) - assert.True(t, snoozed) -} - -func TestEvictingElements(t *testing.T) { - c := newCommitRootsCache(logger.TestLogger(t), 1*time.Hour, 1*time.Millisecond, 1*time.Millisecond, 1*time.Millisecond) - - k1 := [32]byte{1} - c.Snooze(k1) - - time.Sleep(10 * time.Millisecond) - - assert.False(t, c.IsSkipped(k1)) -} - -func Test_UnexecutedRoots(t *testing.T) { - type rootWithTs struct { - root [32]byte - ts time.Time - } - - r1 := [32]byte{1} - r2 := [32]byte{2} - r3 := [32]byte{3} - - t1 := time.Now().Add(-4 * time.Hour) - t2 := time.Now().Add(-3 * time.Hour) - t3 := time.Now().Add(-2 * time.Hour) - - tests := []struct { - name string - roots []rootWithTs - executedRoots [][32]byte - permissionLessThreshold time.Duration - expectedTimestamp time.Time - }{ - { - name: "empty", - roots: []rootWithTs{}, - permissionLessThreshold: 1 * time.Hour, - }, - //{ - // name: "returns first root when all are not executed", - // roots: []rootWithTs{ - // {r1, t1}, - // {r2, t2}, - // {r3, t3}, - // }, - // permissionLessThreshold: 10 * time.Hour, - // expectedTimestamp: t1, - //}, - //{ - // name: "returns first root when tail of queue is executed", - // roots: []rootWithTs{ - // {r1, t1}, - // {r2, t2}, - // {r3, t3}, - // }, - // executedRoots: [][32]byte{r2, r3}, - // permissionLessThreshold: 10 * time.Hour, - // expectedTimestamp: t1, - //}, - //{ - // name: "returns first not executed root", - // roots: []rootWithTs{ - // {r1, t1}, - // {r2, t2}, - // {r3, t3}, - // }, - // executedRoots: [][32]byte{r1, r2}, - // permissionLessThreshold: 10 * time.Hour, - // expectedTimestamp: t3, - //}, - //{ - // name: "returns r2 timestamp when r1 and r3 are executed", - // roots: []rootWithTs{ - // {r1, t1}, - // {r2, t2}, - // {r3, t3}, - // }, - // executedRoots: [][32]byte{r1, r3}, - // permissionLessThreshold: 10 * time.Hour, - // expectedTimestamp: t2, - //}, - //{ - // name: "returns oldest root even when all are executed", - // roots: []rootWithTs{ - // {r1, t1}, - // {r2, t2}, - // {r3, t3}, - // }, - // executedRoots: [][32]byte{r1, r2, r3}, - // permissionLessThreshold: 10 * time.Hour, - // expectedTimestamp: t3, - //}, - { - name: "returns permissionLessThreshold when all roots ale older that threshold", - roots: []rootWithTs{ - {r1, t1}, - {r2, t2}, - {r3, t3}, - }, - permissionLessThreshold: 1 * time.Minute, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := newCommitRootsCache(logger.TestLogger(t), tt.permissionLessThreshold, 1*time.Hour, 1*time.Millisecond, 1*time.Millisecond) - - for _, r := range tt.roots { - c.AppendUnexecutedRoot(r.root, r.ts) - } - - for _, r := range tt.executedRoots { - c.MarkAsExecuted(r) - } - - commitTs := c.OldestRootTimestamp() - if tt.expectedTimestamp.IsZero() { - assert.True(t, commitTs.Before(time.Now().Add(-tt.permissionLessThreshold))) - } else { - assert.Equal(t, tt.expectedTimestamp.Add(-time.Second), commitTs) - } - }) - } -} - -func Test_UnexecutedRootsScenario(t *testing.T) { - permissionLessThreshold := 10 * time.Hour - c := newCommitRootsCache(logger.TestLogger(t), permissionLessThreshold, 1*time.Hour, 1*time.Millisecond, 1*time.Millisecond) - - k1 := [32]byte{1} - k2 := [32]byte{2} - k3 := [32]byte{3} - //k4 := [32]byte{4} - - t1 := time.Now().Add(-4 * time.Hour) - t2 := time.Now().Add(-3 * time.Hour) - t3 := time.Now().Add(-2 * time.Hour) - //t4 := time.Now().Add(-1 * time.Hour) - - // First check should return permissionLessThreshold window - commitTs := c.OldestRootTimestamp() - assert.True(t, commitTs.Before(time.Now().Add(-permissionLessThreshold))) - - c.AppendUnexecutedRoot(k1, t1) - c.AppendUnexecutedRoot(k2, t2) - c.AppendUnexecutedRoot(k3, t3) - - commitTs = c.OldestRootTimestamp() - assert.True(t, commitTs.Before(time.Now().Add(-permissionLessThreshold))) - - //// After loading roots it should return the first one - //commitTs = c.OldestRootTimestamp() - //assert.Equal(t, t1.Add(-time.Second), commitTs) - // - //// Marking root in the middle as executed shouldn't change the commitTs - //c.MarkAsExecuted(k2) - //commitTs = c.OldestRootTimestamp() - //assert.Equal(t, t1.Add(-time.Second), commitTs) - // - //// Marking k1 as executed when k2 is already executed should return timestamp of k3 - //c.MarkAsExecuted(k1) - //commitTs = c.OldestRootTimestamp() - //assert.Equal(t, t3.Add(-time.Second), commitTs) - // - //// Marking all as executed should return timestamp of the latest - //c.MarkAsExecuted(k3) - //commitTs = c.OldestRootTimestamp() - //assert.Equal(t, t3.Add(-time.Second), commitTs) - // - //// Adding k4 should return timestamp of k4 - //c.AppendUnexecutedRoot(k4, t4) - //commitTs = c.OldestRootTimestamp() - //assert.Equal(t, t4.Add(-time.Second), commitTs) - // - //c.MarkAsExecuted(k4) - //commitTs = c.OldestRootTimestamp() - //assert.Equal(t, t4.Add(-time.Second), commitTs) - // - //// Appending already executed roots should be ignored - //c.AppendUnexecutedRoot(k1, t1) - //c.AppendUnexecutedRoot(k2, t2) - //commitTs = c.OldestRootTimestamp() - //assert.Equal(t, t4.Add(-time.Second), commitTs) -} - -func Test_UnexecutedRootsStaleQueue(t *testing.T) { - t.Skip("This test needs caching to properly handle re-orgs") - - permissionLessThreshold := 5 * time.Hour - c := newCommitRootsCache(logger.TestLogger(t), permissionLessThreshold, 1*time.Hour, 1*time.Millisecond, 1*time.Millisecond) - - k1 := [32]byte{1} - k2 := [32]byte{2} - k3 := [32]byte{3} - - t1 := time.Now().Add(-4 * time.Hour) - t2 := time.Now().Add(-3 * time.Hour) - t3 := time.Now().Add(-2 * time.Hour) - - c.AppendUnexecutedRoot(k1, t1) - c.AppendUnexecutedRoot(k2, t2) - c.AppendUnexecutedRoot(k3, t3) - - // First check should return permissionLessThreshold window - commitTs := c.OldestRootTimestamp() - assert.Equal(t, t1.Add(-time.Second), commitTs) - - // Reducing permissionLessExecutionThreshold works as speeding the clock - c.messageVisibilityInterval = 1 * time.Hour - - commitTs = c.OldestRootTimestamp() - assert.True(t, commitTs.Before(time.Now().Add(-1*time.Hour))) - assert.True(t, commitTs.After(t1)) - assert.True(t, commitTs.After(t2)) - assert.True(t, commitTs.After(t3)) -} +// Implement me from scratch! diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go index a21781e84c6..f4ee87f669c 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go @@ -343,6 +343,11 @@ func (c *CommitStore) GetCommitReportMatchingSeqNum(ctx context.Context, seqNr u } func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confs int) ([]cciptypes.CommitStoreReportWithTxMeta, error) { + latestBlock, err := c.lp.LatestBlock(ctx) + if err != nil { + return nil, err + } + logs, err := c.lp.LogsCreatedAfter( ctx, c.reportAcceptedSig, @@ -362,7 +367,7 @@ func (c *CommitStore) GetAcceptedCommitReportsGteTimestamp(ctx context.Context, res := make([]cciptypes.CommitStoreReportWithTxMeta, 0, len(parsedLogs)) for _, log := range parsedLogs { res = append(res, cciptypes.CommitStoreReportWithTxMeta{ - TxMeta: log.TxMeta, + TxMeta: log.TxMeta.UpdateFinalityStatus(uint64(latestBlock.FinalizedBlockNumber)), CommitStoreReport: log.Data, }) }