Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Jul 11, 2024
1 parent 5e7f182 commit 92e4382
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 18 deletions.
86 changes: 69 additions & 17 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cache

import (
"context"
"encoding/hex"
"slices"
"sync"
"time"
Expand All @@ -15,6 +14,8 @@ import (
)

const (
// EvictionGracePeriod defines how long after the messageVisibilityInterval a root is still kept in the cache
EvictionGracePeriod = 1 * time.Hour
// CleanupInterval defines how often roots cache is scanned to evict stale roots
CleanupInterval = 30 * time.Minute
)
Expand Down Expand Up @@ -48,11 +49,14 @@ func newCommitRootsCache(
cleanupInterval time.Duration,
) *commitRootsCache {
snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval)
executedRoots := cache.New(messageVisibilityInterval+EvictionGracePeriod, cleanupInterval)

return &commitRootsCache{
lggr: lggr,
reader: reader,
rootSnoozeTime: rootSnoozeTime,
finalizedRoots: orderedmap.New[string, ccip.CommitStoreReportWithTxMeta](),
executedRoots: executedRoots,
snoozedRoots: snoozedRoots,
messageVisibilityInterval: messageVisibilityInterval,
latestFinalizedCommitRootTs: time.Now().Add(-messageVisibilityInterval),
Expand All @@ -64,39 +68,65 @@ type commitRootsCache struct {
lggr logger.Logger
reader ccip.CommitStoreReader
messageVisibilityInterval time.Duration

// Mutable state
cacheMu sync.RWMutex
finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta]
snoozedRoots *cache.Cache
rootSnoozeTime time.Duration

// Mutable state. finalizedRoots is thread-safe by default, but updating latestFinalizedCommitRootTs and finalizedRoots requires locking.
cacheMu sync.RWMutex
// finalizedRoots is a map of merkleRoot -> CommitStoreReportWithTxMeta. It stores all the CommitReports that are
// marked as finalized by LogPoller, but not executed yet. Keeping only finalized reports doesn't require any state sync between LP and the cache.
// In order to keep this map size under control, we evict stale items every time we fetch new logs from the database.
// Also, ccip.CommitStoreReportWithTxMeta is a very tiny entity with almost fixed size, so it's not a big deal to keep it in memory.
// In case of high memory footprint caused by storing roots, we can make these even more lightweight by removing token/gas price updates.
// Whenever the root is executed (all messages executed and ExecutionStateChange events are finalized), we remove the root from the map.
finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta]
// snoozedRoots used only for temporary snoozing roots. It's a cache with TTL (usually around 5 minutes, but this configuration is set up on chain using rootSnoozeTime)
snoozedRoots *cache.Cache
executedRoots *cache.Cache
// latestFinalizedCommitRootTs is the timestamp of the latest finalized commit root (youngest in terms of timestamp).
// It's used get only the logs that were considered as unfinalized in a previous run.
// This way we limit database scans to the minimum and keep polling "unfinalized" part of the ReportAccepted events queue.
latestFinalizedCommitRootTs time.Time
}

func (r *commitRootsCache) RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error) {
// 1. Fetch all the logs from the database after the latest finalized commit root timestamp
// 1. Fetch all the logs from the database after the latest finalized commit root timestamp.
// If this is a first run, it will fetch all the logs based on the messageVisibilityInterval.
// Worst case scenario, it will fetch around 480 reports (OCR Commit 60 seconds (fast chains default) * messageVisibilityInterval set to 8 hours (mainnet default))
// Even with the larger messageVisibilityInterval window (e.g. 24 hours) it should be acceptable (around 1500 logs).
// Keep in mind that this potentially heavy operation happens only once during the plugin boot and it's no different from the previous implementation.
logs, err := r.fetchLogsFromCommitStore(ctx)
if err != nil {
return nil, err
}

// 2. Iterate over the logs and check if the root is finalized or not. Return finalized and unfinalized reports
// It promotes finalized roots to the finalizedRoots map and evicts stale roots.
finalizedReports, unfinalizedReports := r.updateFinalizedRoots(logs)

// 3. Join finalized commit reports with unfinalized reports and outfilter snoozed roots.
// Return only the reports that are not snoozed.
return r.pickReadyToExecute(finalizedReports, unfinalizedReports), nil

}

// MarkAsExecuted marks the root as executed. It means that all the messages from the root were executed and the ExecutionStateChange event was finalized.
// Executed roots are removed from the cache.
func (r *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) {
prettyMerkleRoot := merkleRootToString(merkleRoot)
r.lggr.Infow("Marking root as executed and removing entirely from cache", "merkleRoot", prettyMerkleRoot)

r.cacheMu.Lock()
defer r.cacheMu.Unlock()

prettyMerkleRoot := merkleRootToString(merkleRoot)
r.finalizedRoots.Delete(prettyMerkleRoot)
r.executedRoots.SetDefault(prettyMerkleRoot, struct{}{})
}

// Snooze temporarily snoozes the root. It means that the root is not eligible for execution for a certain period of time.
// Snoozed roots are skipped when calling RootsEligibleForExecution
func (r *commitRootsCache) Snooze(merkleRoot [32]byte) {
r.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{})
prettyMerkleRoot := merkleRootToString(merkleRoot)
r.lggr.Infow("Snoozing root temporarily", "merkleRoot", prettyMerkleRoot, "rootSnoozeTime", r.rootSnoozeTime)
r.snoozedRoots.SetDefault(prettyMerkleRoot, struct{}{})
}

func (r *commitRootsCache) isSnoozed(merkleRoot [32]byte) bool {
Expand All @@ -114,6 +144,7 @@ func (r *commitRootsCache) fetchLogsFromCommitStore(ctx context.Context) ([]ccip
r.cacheMu.Unlock()

// IO operation, release lock before!
r.lggr.Infow("Fetching Commit Reports with timestamp greater than or equal to", "timestamp", commitRootsFilterTimestamp)
return r.reader.GetAcceptedCommitReportsGteTimestamp(ctx, commitRootsFilterTimestamp, 0)
}

Expand All @@ -124,15 +155,24 @@ func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWit
// Assuming logs are properly ordered by block_timestamp, log_index
var unfinalizedReports []ccip.CommitStoreReportWithTxMeta
for _, log := range logs {
if _, finalized := r.finalizedRoots.Get(merkleRootToString(log.MerkleRoot)); finalized {
r.finalizedRoots.Store(merkleRootToString(log.MerkleRoot), log)
prettyMerkleRoot := merkleRootToString(log.MerkleRoot)
// Defensive check, if something is marked as executed, never allow it to come back to the cache
if _, executed := r.executedRoots.Get(prettyMerkleRoot); executed {
r.lggr.Debugw("Ignoring root marked as executed", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli)
continue
}

if log.IsFinalized() {
r.lggr.Debugw("Adding finalized root to cache", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli)
r.finalizedRoots.Store(prettyMerkleRoot, log)
} else {
r.lggr.Debugw("Bypassing unfinalized root", "merkleRoot", prettyMerkleRoot, "blockTimestamp", log.BlockTimestampUnixMilli)
unfinalizedReports = append(unfinalizedReports, log)
}
}

if r.finalizedRoots.Newest() != nil {
r.latestFinalizedCommitRootTs = time.UnixMilli(r.finalizedRoots.Newest().Value.BlockTimestampUnixMilli)
if newest := r.finalizedRoots.Newest(); newest != nil {
r.latestFinalizedCommitRootTs = time.UnixMilli(newest.Value.BlockTimestampUnixMilli)
}

var finalizedRoots []ccip.CommitStoreReportWithTxMeta
Expand All @@ -156,13 +196,25 @@ func (r *commitRootsCache) pickReadyToExecute(r1 []ccip.CommitStoreReportWithTxM
}
eligibleReports = append(eligibleReports, report.CommitStoreReport)
}
// safety check
// safety check, probably not needed
slices.SortFunc(eligibleReports, func(i, j ccip.CommitStoreReport) int {
return int(i.Interval.Min - j.Interval.Min)
})
return eligibleReports
}

func merkleRootToString(merkleRoot [32]byte) string {
return hex.EncodeToString(merkleRoot[:])
// internal use only for testing
func (r *commitRootsCache) finalizedCachedLogs() []ccip.CommitStoreReport {
r.cacheMu.RLock()
defer r.cacheMu.RUnlock()

var finalizedRoots []ccip.CommitStoreReport
for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() {
finalizedRoots = append(finalizedRoots, pair.Value.CommitStoreReport)
}
return finalizedRoots
}

func merkleRootToString(merkleRoot ccip.Hash) string {
return merkleRoot.String()
}
142 changes: 141 additions & 1 deletion core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,143 @@
package cache

// Implement me from scratch!
import (
"testing"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks"
)

func Test_CacheIsInitializedWithFirstCall(t *testing.T) {
commitStoreReader := mocks.NewCommitStoreReader(t)
cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, time.Hour, time.Hour, time.Hour)
commitStoreReader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, mock.Anything, mock.Anything).Return([]ccip.CommitStoreReportWithTxMeta{}, nil)

roots, err := cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assert.Len(t, roots, 0)
}

func Test_CacheProgression_Internal(t *testing.T) {
ts1 := time.Now().Add(-5 * time.Hour).Truncate(time.Millisecond)
ts2 := time.Now().Add(-3 * time.Hour).Truncate(time.Millisecond)
ts3 := time.Now().Add(-1 * time.Hour).Truncate(time.Millisecond)

root1 := utils.RandomBytes32()
root2 := utils.RandomBytes32()
root3 := utils.RandomBytes32()

commitStoreReader := mocks.NewCommitStoreReader(t)

cache := newCommitRootsCache(logger.TestLogger(t), commitStoreReader, 10*time.Hour, time.Hour, time.Hour)

// Empty cache, no results from the reader
mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{})
roots, err := cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assertRoots(t, roots)
assertRoots(t, cache.finalizedCachedLogs())

// Single unfinalized root returned
mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{createCommitStoreEntry(root1, ts1, false)})
roots, err = cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assertRoots(t, roots, root1)
assertRoots(t, cache.finalizedCachedLogs())

// Finalized and unfinalized roots returned
mockCommitStoreReader(commitStoreReader, time.Time{}, []ccip.CommitStoreReportWithTxMeta{
createCommitStoreEntry(root1, ts1, true),
createCommitStoreEntry(root2, ts2, false),
})
roots, err = cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assertRoots(t, roots, root1, root2)
assertRoots(t, cache.finalizedCachedLogs(), root1)

// Returning the same data should not impact cache state (no duplicates)
mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{
createCommitStoreEntry(root1, ts1, true),
createCommitStoreEntry(root2, ts2, false),
})
roots, err = cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assertRoots(t, roots, root1, root2)
assertRoots(t, cache.finalizedCachedLogs(), root1)

// Snoozing oldest root
cache.Snooze(root1)
mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{
createCommitStoreEntry(root2, ts2, false),
createCommitStoreEntry(root3, ts3, false),
})
roots, err = cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assertRoots(t, roots, root2, root3)
assertRoots(t, cache.finalizedCachedLogs(), root1)

// Snoozing everything
cache.Snooze(root2)
cache.Snooze(root3)
mockCommitStoreReader(commitStoreReader, ts1, []ccip.CommitStoreReportWithTxMeta{
createCommitStoreEntry(root2, ts2, true),
createCommitStoreEntry(root3, ts3, true),
})
roots, err = cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assertRoots(t, roots)
assertRoots(t, cache.finalizedCachedLogs(), root1, root2, root3)

// Marking everything as executed removes it entirely, even if root is returned from the CommitStore
cache.MarkAsExecuted(root1)
cache.MarkAsExecuted(root2)
cache.MarkAsExecuted(root3)
mockCommitStoreReader(commitStoreReader, ts3, []ccip.CommitStoreReportWithTxMeta{
createCommitStoreEntry(root2, ts2, true),
createCommitStoreEntry(root3, ts3, true),
})
roots, err = cache.RootsEligibleForExecution(tests.Context(t))
require.NoError(t, err)
assertRoots(t, roots)
assertRoots(t, cache.finalizedCachedLogs())
}

func assertRoots(t *testing.T, reports []ccip.CommitStoreReport, expectedRoots ...[32]byte) {
require.Len(t, reports, len(expectedRoots))
for i, report := range reports {
assert.Equal(t, expectedRoots[i], report.MerkleRoot)
}
}

func mockCommitStoreReader(reader *mocks.CommitStoreReader, blockTimestamp time.Time, roots []ccip.CommitStoreReportWithTxMeta) {
if blockTimestamp.IsZero() {
reader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, mock.Anything, mock.Anything).
Return(roots, nil).Once()
} else {
reader.On("GetAcceptedCommitReportsGteTimestamp", mock.Anything, blockTimestamp, mock.Anything).
Return(roots, nil).Once()
}
}

func createCommitStoreEntry(root [32]byte, ts time.Time, finalized bool) ccip.CommitStoreReportWithTxMeta {
status := ccip.FinalizedStatusNotFinalized
if finalized {
status = ccip.FinalizedStatusFinalized
}
return ccip.CommitStoreReportWithTxMeta{
CommitStoreReport: ccip.CommitStoreReport{
MerkleRoot: root,
},
TxMeta: ccip.TxMeta{
BlockTimestampUnixMilli: ts.UnixMilli(),
Finalized: status,
},
}
}

0 comments on commit 92e4382

Please sign in to comment.