Skip to content

Commit

Permalink
Better caching of CommitRoots
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Jul 10, 2024
1 parent dc2a6b2 commit 769c60d
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 415 deletions.
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
Expand Down Expand Up @@ -136,7 +137,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.Rep
offRampReader: rf.config.offRampReader,
tokenPoolBatchedReader: rf.config.tokenPoolBatchedReader,
inflightReports: newInflightExecReportsContainer(offchainConfig.InflightCacheExpiry.Duration()),
commitRootsCache: cache.NewCommitRootsCache(lggr, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()),
commitRootsCache: cache.NewCommitRootsCache(lggr, rf.config.commitStoreReader, msgVisibilityInterval, offchainConfig.RootSnoozeTime.Duration()),
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: rf.config.chainHealthcheck,
}
Expand Down
32 changes: 3 additions & 29 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,39 +865,13 @@ func (r *ExecutionReportingPlugin) getUnexpiredCommitReports(
commitStoreReader ccipdata.CommitStoreReader,
lggr logger.Logger,
) ([]cciptypes.CommitStoreReport, error) {
createdAfterTimestamp := r.commitRootsCache.OldestRootTimestamp()
lggr.Infow("Fetching unexpired commit roots from database", "createdAfterTimestamp", createdAfterTimestamp)
acceptedReports, err := commitStoreReader.GetAcceptedCommitReportsGteTimestamp(
ctx,
createdAfterTimestamp,
0,
)
eligibleRootsForExec, err := r.commitRootsCache.RootsEligibleForExecution(ctx)
if err != nil {
return nil, err
}

var reports []cciptypes.CommitStoreReport
for _, acceptedReport := range acceptedReports {
reports = append(reports, acceptedReport.CommitStoreReport)
r.commitRootsCache.AppendUnexecutedRoot(acceptedReport.MerkleRoot, time.UnixMilli(acceptedReport.TxMeta.BlockTimestampUnixMilli))
}

notSnoozedReports := make([]cciptypes.CommitStoreReport, 0)
for _, report := range reports {
if r.commitRootsCache.IsSkipped(report.MerkleRoot) {
lggr.Debugw("Skipping snoozed root",
"minSeqNr", report.Interval.Min,
"maxSeqNr", report.Interval.Max,
"root", hex.EncodeToString(report.MerkleRoot[:]),
)
continue
}
notSnoozedReports = append(notSnoozedReports, report)
}

r.metricsCollector.UnexpiredCommitRoots(len(notSnoozedReports))
lggr.Infow("Unexpired roots", "all", len(reports), "notSnoozed", len(notSnoozedReports))
return notSnoozedReports, nil
r.metricsCollector.UnexpiredCommitRoots(len(eligibleRootsForExec))
return eligibleRootsForExec, nil
}

type execTokenData struct {
Expand Down
252 changes: 113 additions & 139 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
@@ -1,195 +1,169 @@
package cache

import (
"context"
"encoding/hex"
"slices"
"sync"
"time"

"github.com/patrickmn/go-cache"
"github.com/smartcontractkit/chainlink-common/pkg/types/ccip"
orderedmap "github.com/wk8/go-ordered-map/v2"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const (
// EvictionGracePeriod defines how long after the messageVisibilityInterval a root is still kept in the cache
EvictionGracePeriod = 1 * time.Hour
// CleanupInterval defines how often roots cache is scanned to evict stale roots
CleanupInterval = 30 * time.Minute
)

type CommitsRootsCache interface {
// IsSkipped returns true if the root is either executed or snoozed. Snoozing can be temporary based on the configuration
IsSkipped(merkleRoot [32]byte) bool
RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error)
MarkAsExecuted(merkleRoot [32]byte)
Snooze(merkleRoot [32]byte)

// OldestRootTimestamp returns the oldest root timestamp that is not executed yet (minus 1 second).
// If there are no roots in the queue, it returns the messageVisibilityInterval
OldestRootTimestamp() time.Time
// AppendUnexecutedRoot appends the root to the unexecuted roots queue to keep track of the roots that are not executed yet
// Roots has to be added in the order they are fetched from the database
AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time)
}

type commitRootsCache struct {
lggr logger.Logger
// executedRoots is used to keep track of the roots that are executed. Roots that are considered as executed
// when all messages are executed on the dest and matching execution state change logs are finalized
executedRoots *cache.Cache
// snoozedRoots is used to keep track of the roots that are temporary snoozed
snoozedRoots *cache.Cache
// unexecutedRootsQueue is used to keep track of the unexecuted roots in the order they are fetched from database (should be ordered by block_number, log_index)
// First run of Exec will fill the queue with all the roots that are not executed yet within the [now-messageVisibilityInterval, now] window.
// When a root is executed, it is removed from the queue. Next database query instead of using entire messageVisibilityInterval window
// will use oldestRootTimestamp as the lower bound filter for block_timestamp.
// This way we can reduce the number of database rows fetched with every OCR round.
// We do it this way because roots for most of the cases are executed sequentially.
// Instead of skipping snoozed roots after we fetch them from the database, we do that on the db level by narrowing the search window.
//
// Example
// messageVisibilityInterval - 10 days, now - 2010-10-15
// We fetch all the roots that within the [2010-10-05, 2010-10-15] window and load them to the queue
// [0xA - 2010-10-10, 0xB - 2010-10-11, 0xC - 2010-10-12] -> 0xA is the oldest root
// We executed 0xA and a couple of rounds later, we mark 0xA as executed and snoozed that forever which removes it from the queue.
// [0xB - 2010-10-11, 0xC - 2010-10-12]
// Now the search filter will be the 0xB timestamp -> [2010-10-11, 2010-10-15]
// If roots are executed out of order, it's not going to change anything. However, for most of the cases we have sequential root execution and that is
// a huge improvement because we don't need to fetch all the roots from the database in every round.
unexecutedRootsQueue *orderedmap.OrderedMap[string, time.Time]
oldestRootTimestamp time.Time
rootsQueueMu sync.RWMutex

// Both rootSnoozedTime and messageVisibilityInterval can be kept in the commitRootsCache without need to be updated.
// Those config properties are populated via onchain/offchain config. When changed, the OCR plugin will be restarted and the cache initialized with the new config.
rootSnoozedTime time.Duration
messageVisibilityInterval time.Duration
// NewCommitRootsCache returns a CommitsRootsCache backed by the given commit
// store reader. It delegates to newCommitRootsCache, forwarding the logger,
// reader, messageVisibilityInterval and rootSnoozeTime unchanged and supplying
// the package-level CleanupInterval as the final argument.
func NewCommitRootsCache(
	lggr logger.Logger,
	reader ccip.CommitStoreReader,
	messageVisibilityInterval time.Duration,
	rootSnoozeTime time.Duration,
) CommitsRootsCache {
	return newCommitRootsCache(lggr, reader, messageVisibilityInterval, rootSnoozeTime, CleanupInterval)
}

func newCommitRootsCache(
lggr logger.Logger,
reader ccip.CommitStoreReader,
messageVisibilityInterval time.Duration,
rootSnoozeTime time.Duration,
evictionGracePeriod time.Duration,
cleanupInterval time.Duration,
) *commitRootsCache {
executedRoots := cache.New(messageVisibilityInterval+evictionGracePeriod, cleanupInterval)
snoozedRoots := cache.New(rootSnoozeTime, cleanupInterval)

return &commitRootsCache{
lggr: lggr,
executedRoots: executedRoots,
snoozedRoots: snoozedRoots,
unexecutedRootsQueue: orderedmap.New[string, time.Time](),
rootSnoozedTime: rootSnoozeTime,
messageVisibilityInterval: messageVisibilityInterval,
lggr: lggr,
reader: reader,
finalizedRoots: orderedmap.New[string, ccip.CommitStoreReportWithTxMeta](),
snoozedRoots: snoozedRoots,
messageVisibilityInterval: messageVisibilityInterval,
latestFinalizedCommitRootTs: time.Now().Add(-messageVisibilityInterval),
cacheMu: sync.RWMutex{},
}
}

func NewCommitRootsCache(
lggr logger.Logger,
messageVisibilityInterval time.Duration,
rootSnoozeTime time.Duration,
) *commitRootsCache {
return newCommitRootsCache(
lggr,
messageVisibilityInterval,
rootSnoozeTime,
EvictionGracePeriod,
CleanupInterval,
)
}
type commitRootsCache struct {
lggr logger.Logger
reader ccip.CommitStoreReader
messageVisibilityInterval time.Duration

func (s *commitRootsCache) IsSkipped(merkleRoot [32]byte) bool {
_, snoozed := s.snoozedRoots.Get(merkleRootToString(merkleRoot))
_, executed := s.executedRoots.Get(merkleRootToString(merkleRoot))
return snoozed || executed
// Mutable state
cacheMu sync.RWMutex
finalizedRoots *orderedmap.OrderedMap[string, ccip.CommitStoreReportWithTxMeta]
snoozedRoots *cache.Cache
latestFinalizedCommitRootTs time.Time
}

func (s *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) {
prettyMerkleRoot := merkleRootToString(merkleRoot)
s.executedRoots.SetDefault(prettyMerkleRoot, struct{}{})
func (r *commitRootsCache) RootsEligibleForExecution(ctx context.Context) ([]ccip.CommitStoreReport, error) {
r.cacheMu.Lock()
defer r.cacheMu.Unlock()

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()
// if there is only one root in the queue, we put its block_timestamp as oldestRootTimestamp
if s.unexecutedRootsQueue.Len() == 1 {
s.oldestRootTimestamp = s.unexecutedRootsQueue.Oldest().Value
}
s.unexecutedRootsQueue.Delete(prettyMerkleRoot)
if head := s.unexecutedRootsQueue.Oldest(); head != nil {
s.oldestRootTimestamp = head.Value
// 1. Fetch all the logs from the database after the latest finalized commit root timestamp
logs, err := r.fetchLogsFromCommitStore(ctx)
if err != nil {
return nil, err
}
s.lggr.Debugw("Deleting executed root from the queue",
"merkleRoot", prettyMerkleRoot,
"oldestRootTimestamp", s.oldestRootTimestamp,
)

// 2. Iterate over the logs and check if the root is finalized or not. Return finalized and unfinalized reports
finalizedReports, unfinalizedReports := r.updateFinalizedRoots(logs)

// 3. Join finalized commit reports with unfinalized reports and filter out snoozed roots.
return r.pickReadyToExecute(finalizedReports, unfinalizedReports), nil

}

func (s *commitRootsCache) Snooze(merkleRoot [32]byte) {
s.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{})
// MarkAsExecuted removes the entry keyed by the given merkle root from
// finalizedRoots. The removal is performed while holding the cacheMu write lock.
func (r *commitRootsCache) MarkAsExecuted(merkleRoot [32]byte) {
	r.cacheMu.Lock()
	defer r.cacheMu.Unlock()

	r.finalizedRoots.Delete(merkleRootToString(merkleRoot))
}

func (s *commitRootsCache) OldestRootTimestamp() time.Time {
return time.Now().Add(-s.messageVisibilityInterval)
// TODO we can't rely on block timestamps, because in case of re-org they can change and therefore affect
// the logic in the case. In the meantime, always fallback to the default behaviour and use permissionlessThresholdWindow
//timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval)
//
//if ok {
// return timestamp
//}
//
//s.rootsQueueMu.Lock()
//defer s.rootsQueueMu.Unlock()
//
//// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed
//// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch.
//s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
//return messageVisibilityInterval
func (r *commitRootsCache) Snooze(merkleRoot [32]byte) {
r.snoozedRoots.SetDefault(merkleRootToString(merkleRoot), struct{}{})
}

//func (s *commitRootsCache) pickOldestRootBlockTimestamp(messageVisibilityInterval time.Time) (time.Time, bool) {
// s.rootsQueueMu.RLock()
// defer s.rootsQueueMu.RUnlock()
//
// // If there are no roots in the queue, we can return the messageVisibilityInterval
// if s.oldestRootTimestamp.IsZero() {
// return messageVisibilityInterval, true
// }
//
// if s.oldestRootTimestamp.After(messageVisibilityInterval) {
// // Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// // so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
// return s.oldestRootTimestamp.Add(-time.Second), true
// }
// return time.Time{}, false
//}

func (s *commitRootsCache) AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time) {
prettyMerkleRoot := merkleRootToString(merkleRoot)
// isSnoozed reports whether snoozedRoots currently holds an entry for the
// given merkle root.
func (r *commitRootsCache) isSnoozed(merkleRoot [32]byte) bool {
	key := merkleRootToString(merkleRoot)
	_, found := r.snoozedRoots.Get(key)
	return found
}

func (r *commitRootsCache) fetchLogsFromCommitStore(ctx context.Context) ([]ccip.CommitStoreReportWithTxMeta, error) {
r.cacheMu.Lock()
messageVisibilityWindow := time.Now().Add(-r.messageVisibilityInterval)
if r.latestFinalizedCommitRootTs.Before(messageVisibilityWindow) {
r.latestFinalizedCommitRootTs = messageVisibilityWindow
}
commitRootsFilterTimestamp := r.latestFinalizedCommitRootTs
r.cacheMu.Unlock()

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()
// IO operation, release lock before!
return r.reader.GetAcceptedCommitReportsGteTimestamp(ctx, commitRootsFilterTimestamp, 0)
}

// If the root is already in the queue, we must not add it to the queue
if _, found := s.unexecutedRootsQueue.Get(prettyMerkleRoot); found {
return
func (r *commitRootsCache) updateFinalizedRoots(logs []ccip.CommitStoreReportWithTxMeta) ([]ccip.CommitStoreReportWithTxMeta, []ccip.CommitStoreReportWithTxMeta) {
r.cacheMu.Lock()
defer r.cacheMu.Unlock()

// Assuming logs are properly ordered by block_timestamp, log_index
var unfinalizedReports []ccip.CommitStoreReportWithTxMeta
for _, log := range logs {
if _, finalized := r.finalizedRoots.Get(merkleRootToString(log.MerkleRoot)); finalized {
r.finalizedRoots.Store(merkleRootToString(log.MerkleRoot), log)
} else {
unfinalizedReports = append(unfinalizedReports, log)
}
}
// If the root is already executed, we must not add it to the queue
if _, executed := s.executedRoots.Get(prettyMerkleRoot); executed {
return

if r.finalizedRoots.Newest() != nil {
r.latestFinalizedCommitRootTs = time.UnixMilli(r.finalizedRoots.Newest().Value.BlockTimestampUnixMilli)
}
// Initialize the search filter with the first root that is added to the queue
if s.unexecutedRootsQueue.Len() == 0 {
s.oldestRootTimestamp = blockTimestamp

var finalizedRoots []ccip.CommitStoreReportWithTxMeta
for pair := r.finalizedRoots.Oldest(); pair != nil; pair = pair.Next() {
// Evict stale items
if time.UnixMilli(pair.Value.BlockTimestampUnixMilli).Before(time.Now().Add(-r.messageVisibilityInterval)) {
r.finalizedRoots.Delete(pair.Key)
continue
}
finalizedRoots = append(finalizedRoots, pair.Value)
}
s.unexecutedRootsQueue.Set(prettyMerkleRoot, blockTimestamp)
s.lggr.Debugw("Adding unexecuted root to the queue",
"merkleRoot", prettyMerkleRoot,
"blockTimestamp", blockTimestamp,
"oldestRootTimestamp", s.oldestRootTimestamp,
)
return finalizedRoots, unfinalizedReports
}

// pickReadyToExecute merges the two report lists, drops every report whose
// merkle root is currently snoozed, and returns the remaining commit reports
// sorted in ascending order of Interval.Min.
//
// r1 and r2 are only read; neither slice (nor its backing array) is modified.
// The returned slice is always non-nil, and is empty when nothing is eligible.
func (r *commitRootsCache) pickReadyToExecute(r1 []ccip.CommitStoreReportWithTxMeta, r2 []ccip.CommitStoreReportWithTxMeta) []ccip.CommitStoreReport {
	eligibleReports := make([]ccip.CommitStoreReport, 0, len(r1)+len(r2))
	// Walk both inputs in place instead of append(r1, r2...), which could write
	// into spare capacity of r1's backing array.
	for _, reports := range [][]ccip.CommitStoreReportWithTxMeta{r1, r2} {
		for _, report := range reports {
			if r.isSnoozed(report.MerkleRoot) {
				continue
			}
			eligibleReports = append(eligibleReports, report.CommitStoreReport)
		}
	}
	// safety check: enforce ordering by the lower bound of the sequence interval.
	// Compare explicitly rather than subtracting and casting to int, which can
	// wrap around or truncate and yield the wrong sign.
	slices.SortFunc(eligibleReports, func(a, b ccip.CommitStoreReport) int {
		switch {
		case a.Interval.Min < b.Interval.Min:
			return -1
		case a.Interval.Min > b.Interval.Min:
			return 1
		default:
			return 0
		}
	})
	return eligibleReports
}

func merkleRootToString(merkleRoot [32]byte) string {
Expand Down
Loading

0 comments on commit 769c60d

Please sign in to comment.