Skip to content

Commit

Permalink
Returning messageVisibilityInterval always
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Jul 8, 2024
1 parent 54127b1 commit 4ff6465
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 116 deletions.
61 changes: 32 additions & 29 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,38 +131,41 @@ func (s *commitRootsCache) Snooze(merkleRoot [32]byte) {
}

func (s *commitRootsCache) OldestRootTimestamp() time.Time {
messageVisibilityInterval := time.Now().Add(-s.messageVisibilityInterval)
timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval)

if ok {
return timestamp
}

s.rootsQueueMu.Lock()
defer s.rootsQueueMu.Unlock()

// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed
// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch.
s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
return messageVisibilityInterval
return time.Now().Add(-s.messageVisibilityInterval)
// TODO we can't rely on block timestamps, because in case of re-org they can change and therefore affect
// the logic in the case. In the meantime, always fallback to the default behaviour and use permissionlessThresholdWindow
//timestamp, ok := s.pickOldestRootBlockTimestamp(messageVisibilityInterval)
//
//if ok {
// return timestamp
//}
//
//s.rootsQueueMu.Lock()
//defer s.rootsQueueMu.Unlock()
//
//// If rootsSearchFilter is before messageVisibilityInterval, it means that we have roots that are stuck forever and will never be executed
//// In that case, we wipe out the entire queue. Next round should start from the messageVisibilityInterval and rebuild cache from scratch.
//s.unexecutedRootsQueue = orderedmap.New[string, time.Time]()
//return messageVisibilityInterval
}

func (s *commitRootsCache) pickOldestRootBlockTimestamp(messageVisibilityInterval time.Time) (time.Time, bool) {
s.rootsQueueMu.RLock()
defer s.rootsQueueMu.RUnlock()

// If there are no roots in the queue, we can return the messageVisibilityInterval
if s.oldestRootTimestamp.IsZero() {
return messageVisibilityInterval, true
}
//func (s *commitRootsCache) pickOldestRootBlockTimestamp(messageVisibilityInterval time.Time) (time.Time, bool) {
// s.rootsQueueMu.RLock()
// defer s.rootsQueueMu.RUnlock()
//
// // If there are no roots in the queue, we can return the messageVisibilityInterval
// if s.oldestRootTimestamp.IsZero() {
// return messageVisibilityInterval, true
// }
//
// if s.oldestRootTimestamp.After(messageVisibilityInterval) {
// // Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// // so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
// return s.oldestRootTimestamp.Add(-time.Second), true
// }
// return time.Time{}, false
//}

if s.oldestRootTimestamp.After(messageVisibilityInterval) {
// Query used for fetching roots from the database is exclusive (block_timestamp > :timestamp)
// so we need to subtract 1 second from the head timestamp to make sure that this root is included in the results
return s.oldestRootTimestamp.Add(-time.Second), true
}
return time.Time{}, false
}
func (s *commitRootsCache) AppendUnexecutedRoot(merkleRoot [32]byte, blockTimestamp time.Time) {
prettyMerkleRoot := merkleRootToString(merkleRoot)

Expand Down
179 changes: 92 additions & 87 deletions core/services/ocr2/plugins/ccip/internal/cache/commit_roots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,60 +67,60 @@ func Test_UnexecutedRoots(t *testing.T) {
roots: []rootWithTs{},
permissionLessThreshold: 1 * time.Hour,
},
{
name: "returns first root when all are not executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t1,
},
{
name: "returns first root when tail of queue is executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r2, r3},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t1,
},
{
name: "returns first not executed root",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r1, r2},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t3,
},
{
name: "returns r2 timestamp when r1 and r3 are executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r1, r3},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t2,
},
{
name: "returns oldest root even when all are executed",
roots: []rootWithTs{
{r1, t1},
{r2, t2},
{r3, t3},
},
executedRoots: [][32]byte{r1, r2, r3},
permissionLessThreshold: 10 * time.Hour,
expectedTimestamp: t3,
},
//{
// name: "returns first root when all are not executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t1,
//},
//{
// name: "returns first root when tail of queue is executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r2, r3},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t1,
//},
//{
// name: "returns first not executed root",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r1, r2},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t3,
//},
//{
// name: "returns r2 timestamp when r1 and r3 are executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r1, r3},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t2,
//},
//{
// name: "returns oldest root even when all are executed",
// roots: []rootWithTs{
// {r1, t1},
// {r2, t2},
// {r3, t3},
// },
// executedRoots: [][32]byte{r1, r2, r3},
// permissionLessThreshold: 10 * time.Hour,
// expectedTimestamp: t3,
//},
{
name: "returns permissionLessThreshold when all roots ale older that threshold",
roots: []rootWithTs{
Expand Down Expand Up @@ -161,12 +161,12 @@ func Test_UnexecutedRootsScenario(t *testing.T) {
k1 := [32]byte{1}
k2 := [32]byte{2}
k3 := [32]byte{3}
k4 := [32]byte{4}
//k4 := [32]byte{4}

t1 := time.Now().Add(-4 * time.Hour)
t2 := time.Now().Add(-3 * time.Hour)
t3 := time.Now().Add(-2 * time.Hour)
t4 := time.Now().Add(-1 * time.Hour)
//t4 := time.Now().Add(-1 * time.Hour)

// First check should return permissionLessThreshold window
commitTs := c.OldestRootTimestamp()
Expand All @@ -176,42 +176,47 @@ func Test_UnexecutedRootsScenario(t *testing.T) {
c.AppendUnexecutedRoot(k2, t2)
c.AppendUnexecutedRoot(k3, t3)

// After loading roots it should return the first one
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t1.Add(-time.Second), commitTs)

// Marking root in the middle as executed shouldn't change the commitTs
c.MarkAsExecuted(k2)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t1.Add(-time.Second), commitTs)

// Marking k1 as executed when k2 is already executed should return timestamp of k3
c.MarkAsExecuted(k1)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t3.Add(-time.Second), commitTs)

// Marking all as executed should return timestamp of the latest
c.MarkAsExecuted(k3)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t3.Add(-time.Second), commitTs)

// Adding k4 should return timestamp of k4
c.AppendUnexecutedRoot(k4, t4)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t4.Add(-time.Second), commitTs)

c.MarkAsExecuted(k4)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t4.Add(-time.Second), commitTs)
assert.True(t, commitTs.Before(time.Now().Add(-permissionLessThreshold)))

// Appending already executed roots should be ignored
c.AppendUnexecutedRoot(k1, t1)
c.AppendUnexecutedRoot(k2, t2)
commitTs = c.OldestRootTimestamp()
assert.Equal(t, t4.Add(-time.Second), commitTs)
//// After loading roots it should return the first one
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t1.Add(-time.Second), commitTs)
//
//// Marking root in the middle as executed shouldn't change the commitTs
//c.MarkAsExecuted(k2)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t1.Add(-time.Second), commitTs)
//
//// Marking k1 as executed when k2 is already executed should return timestamp of k3
//c.MarkAsExecuted(k1)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t3.Add(-time.Second), commitTs)
//
//// Marking all as executed should return timestamp of the latest
//c.MarkAsExecuted(k3)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t3.Add(-time.Second), commitTs)
//
//// Adding k4 should return timestamp of k4
//c.AppendUnexecutedRoot(k4, t4)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t4.Add(-time.Second), commitTs)
//
//c.MarkAsExecuted(k4)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t4.Add(-time.Second), commitTs)
//
//// Appending already executed roots should be ignored
//c.AppendUnexecutedRoot(k1, t1)
//c.AppendUnexecutedRoot(k2, t2)
//commitTs = c.OldestRootTimestamp()
//assert.Equal(t, t4.Add(-time.Second), commitTs)
}

func Test_UnexecutedRootsStaleQueue(t *testing.T) {
t.Skip("This test needs caching to properly handle re-orgs")

permissionLessThreshold := 5 * time.Hour
c := newCommitRootsCache(logger.TestLogger(t), permissionLessThreshold, 1*time.Hour, 1*time.Millisecond, 1*time.Millisecond)

Expand Down

0 comments on commit 4ff6465

Please sign in to comment.