Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Returning messageVisibilityInterval always from commit roots cache (… #1157

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 32 additions & 29 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,38 +131,41 @@ func (s *commitRootsCache) Snooze(merkleRoot [32]byte) {
}

func (s *commitRootsCache) OldestRootTimestamp() time.Time {
permissionlessExecWindow := time.Now().Add(-s.permissionLessExecutionThresholdDuration)
timestamp, ok := s.pickOldestRootBlockTimestamp(permissionlessExecWindow)

if ok {
return timestamp
}

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()

// If rootsSearchFilter is before permissionlessExecWindow, it means that we have roots that are stuck forever and will never be executed
// In that case, we wipe out the entire queue. Next round should start from the permissionlessExecThreshold and rebuild cache from scratch.
s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
return permissionlessExecWindow
return time.Now().Add(-s.permissionLessExecutionThresholdDuration)
// TODO we can't rely on block timestamps, because in case of re-org they can change and therefore affect
// the logic in the case. In the meantime, always fallback to the default behaviour and use permissionlessThresholdWindow
//timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval)
//
//if ok {
// return timestamp
//}
//
//s.rootsQueueMu.Lock()
//defer s.rootsQueueMu.Unlock()
//
//// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed
//// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch.
//s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
//return messageVisibilityInterval
}

func (s *commitRootsCache) pickOldestRootBlockTimestamp(permissionlessExecWindow time.Time) (time.Time, bool) {
s.rootsQueueMu.RLock()
defer s.rootsQueueMu.RUnlock()

// If there are no roots in the queue, we can return the permissionlessExecWindow
if s.oldestRootTimestamp.IsZero() {
return permissionlessExecWindow, true
}
//func (s *commitRootsCache) pickOldestRootBlockTimestamp(permissionlessExecWindow time.Time) (time.Time, bool) {
// s.rootsQueueMu.RLock()
// defer s.rootsQueueMu.RUnlock()
//
// // If there are no roots in the queue, we can return the permissionlessExecWindow
// if s.oldestRootTimestamp.IsZero() {
// return permissionlessExecWindow, true
// }
//
// if s.oldestRootTimestamp.After(messageVisibilityInterval) {
// // Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// // so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
// return s.oldestRootTimestamp.Add(-time.Second), true
// }
// return time.Time{}, false
//}

if s.oldestRootTimestamp.After(permissionlessExecWindow) {
// Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
return s.oldestRootTimestamp.Add(-time.Second), true
}
return time.Time{}, false
}
func (s *commitRootsCache) AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time) {
prettyMerkleRoot := merkleRootToString(merkleRoot)

Expand Down
179 changes: 92 additions & 87 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,60 +67,60 @@ func Test_UnexecutedRoots(t *testing.T) {
roots: []rootWithTs{},
permissionLessThreshold: 1 * time.Hour,
},
{
name: "returns first root when all are not executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t1,
},
{
name: "returns first root when tail of queue is executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r2, r3},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t1,
},
{
name: "returns first not executed root",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r1, r2},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t3,
},
{
name: "returns r2 timestamp when r1 and r3 are executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r1, r3},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t2,
},
{
name: "returns oldest root even when all are executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r1, r2, r3},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t3,
},
//{
// name: "returns first root when all are not executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t1,
//},
//{
// name: "returns first root when tail of queue is executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r2, r3},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t1,
//},
//{
// name: "returns first not executed root",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r1, r2},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t3,
//},
//{
// name: "returns r2 timestamp when r1 and r3 are executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r1, r3},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t2,
//},
//{
// name: "returns oldest root even when all are executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r1, r2, r3},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t3,
//},
{
name: "returns permissionLessThreshold when all roots ale older that threshold",
roots: []rootWithTs{
Expand Down Expand Up @@ -161,12 +161,12 @@ func Test_UnexecutedRootsScenario(t *testing.T) {
k1 := [32]byte{1}
k2 := [32]byte{2}
k3 := [32]byte{3}
k4 := [32]byte{4}
//k4 := [32]byte{4}

t1 := time.Now().Add(-4 * time.Hour)
t2 := time.Now().Add(-3 * time.Hour)
t3 := time.Now().Add(-2 * time.Hour)
t4 := time.Now().Add(-1 * time.Hour)
//t4 := time.Now().Add(-1 * time.Hour)

// First check should return permissionLessThreshold window
commitTs := c.OldestRootTimestamp()
Expand All @@ -176,42 +176,47 @@ func Test_UnexecutedRootsScenario(t *testing.T) {
c.AppendUnexecutedRoot(k2, t2)
c.AppendUnexecutedRoot(k3, t3)

// After loading roots it should return the first one
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t1.Add(-time.Second), commitTs)

// Marking root in the middle as executed shouldn't change the commitTs
c.MarkAsExecuted(k2)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t1.Add(-time.Second), commitTs)

// Marking k1 as executed when k2 is already executed should return timestamp of k3
c.MarkAsExecuted(k1)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t3.Add(-time.Second), commitTs)

// Marking all as executed should return timestamp of the latest
c.MarkAsExecuted(k3)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t3.Add(-time.Second), commitTs)

// Adding k4 should return timestamp of k4
c.AppendUnexecutedRoot(k4, t4)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t4.Add(-time.Second), commitTs)

c.MarkAsExecuted(k4)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t4.Add(-time.Second), commitTs)
assert.True(t, commitTs.Before(time.Now().Add(-permissionLessThreshold)))

// Appending already executed roots should be ignored
c.AppendUnexecutedRoot(k1, t1)
c.AppendUnexecutedRoot(k2, t2)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t4.Add(-time.Second), commitTs)
//// After loading roots it should return the first one
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t1.Add(-time.Second), commitTs)
//
//// Marking root in the middle as executed shouldn't change the commitTs
//c.MarkAsExecuted(k2)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t1.Add(-time.Second), commitTs)
//
//// Marking k1 as executed when k2 is already executed should return timestamp of k3
//c.MarkAsExecuted(k1)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t3.Add(-time.Second), commitTs)
//
//// Marking all as executed should return timestamp of the latest
//c.MarkAsExecuted(k3)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t3.Add(-time.Second), commitTs)
//
//// Adding k4 should return timestamp of k4
//c.AppendUnexecutedRoot(k4, t4)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t4.Add(-time.Second), commitTs)
//
//c.MarkAsExecuted(k4)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t4.Add(-time.Second), commitTs)
//
//// Appending already executed roots should be ignored
//c.AppendUnexecutedRoot(k1, t1)
//c.AppendUnexecutedRoot(k2, t2)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t4.Add(-time.Second), commitTs)
}

func Test_UnexecutedRootsStaleQueue(t *testing.T) {
t.Skip("This test needs caching to properly handle re-orgs")

permissionLessThreshold := 5 * time.Hour
c := newCommitRootsCache(logger.TestLogger(t), permissionLessThreshold, 1*time.Hour, 1*time.Millisecond, 1*time.Millisecond)

Expand Down
Loading