diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go
index c66c4350c02..c5cc6d2d6eb 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go
@@ -115,7 +115,9 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
 		blockThreshold = 1
 	}
 
-	return buf.enqueue(blockThreshold, logs...)
+	b.dequeueCoordinator.Clean(blockThreshold, b.opts.blockRate.Load())
+
+	return buf.enqueue(b.dequeueCoordinator, b.opts.blockRate.Load(), blockThreshold, logs...)
 }
 
 func (b *logBuffer) latestBlockNumber(logs ...logpoller.Log) int64 {
@@ -342,7 +344,7 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,
 	for _, blockNumber := range q.blockNumbers {
 		if blockNumber >= start && blockNumber <= end {
 			updatedLogs := make([]logpoller.Log, 0)
-
+			blockResults := 0
 			for _, l := range q.logs[blockNumber] {
 				if len(results) < limit {
 					results = append(results, l)
@@ -351,13 +353,16 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,
 						s.state = logTriggerStateDequeued
 						q.states[lid] = s
 					}
+					blockResults++
 				} else {
 					remaining++
 					updatedLogs = append(updatedLogs, l)
 				}
 			}
 
-			q.logs[blockNumber] = updatedLogs
+			if blockResults > 0 {
+				q.logs[blockNumber] = updatedLogs
+			}
 		}
 	}
 
@@ -373,11 +378,13 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,
 // enqueue adds logs to the buffer and might also drop logs if the limit for the
 // given upkeep was exceeded. Additionally, it will drop logs that are older than blockThreshold.
 // Returns the number of logs that were added and number of logs that were dropped.
-func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Log) (int, int) {
+func (q *upkeepLogQueue) enqueue(coordinator *dequeueCoordinator, blockRate uint32, blockThreshold int64, logsToAdd ...logpoller.Log) (int, int) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	var added int
+
+	logsAdded := map[int64]int{}
 	for _, log := range logsToAdd {
 		if log.BlockNumber < blockThreshold {
 			// q.lggr.Debugw("Skipping log from old block", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex)
@@ -390,6 +397,13 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo
 		}
 		q.states[lid] = logTriggerStateEntry{state: logTriggerStateEnqueued, block: log.BlockNumber}
 		added++
+
+		if count, ok := logsAdded[log.BlockNumber]; ok {
+			logsAdded[log.BlockNumber] = count + 1
+		} else {
+			logsAdded[log.BlockNumber] = 1
+		}
+
 		if logList, ok := q.logs[log.BlockNumber]; ok {
 			logList = append(logList, log)
 			q.logs[log.BlockNumber] = logList
@@ -400,6 +414,10 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo
 		}
 	}
 
+	for blockNumber, logsAddedForBlock := range logsAdded {
+		coordinator.CountEnqueuedLogsForWindow(blockNumber, blockRate, logsAddedForBlock)
+	}
+
 	var dropped int
 	if added > 0 {
 		q.orderLogs()
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go
index dac05958bf0..47c54ba06ed 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go
@@ -504,10 +504,12 @@ func TestLogEventBufferV1_Enqueue(t *testing.T) {
 }
 
 func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
 
+	dequeueCoordinator := NewDequeueCoordinator()
+
 	t.Run("enqueue dequeue", func(t *testing.T) {
 		q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))
-		added, dropped := q.enqueue(10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
+		added, dropped := q.enqueue(dequeueCoordinator, 1, 10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
 		require.Equal(t, 0, dropped)
 		require.Equal(t, 1, added)
 		require.Equal(t, 1, q.sizeOfRange(1, 20))
@@ -519,7 +521,7 @@ func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
 
 	t.Run("enqueue with limits", func(t *testing.T) {
 		q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))
-		added, dropped := q.enqueue(10,
+		added, dropped := q.enqueue(dequeueCoordinator, 1, 10,
 			createDummyLogSequence(15, 0, 20, common.HexToHash("0x20"))...,
 		)
 		require.Equal(t, 5, dropped)
@@ -529,7 +531,7 @@ func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
 
 	t.Run("dequeue with limits", func(t *testing.T) {
 		q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 3))
-		added, dropped := q.enqueue(10,
+		added, dropped := q.enqueue(dequeueCoordinator, 1, 10,
 			logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0},
 			logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 1},
 			logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 10},
@@ -544,6 +546,8 @@ func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
 }
 
 func TestLogEventBufferV1_UpkeepQueue_sizeOfRange(t *testing.T) {
 
+	dequeueCoordinator := NewDequeueCoordinator()
+
 	t.Run("empty", func(t *testing.T) {
 		q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))
@@ -553,7 +557,7 @@ func TestLogEventBufferV1_UpkeepQueue_sizeOfRange(t *testing.T) {
 
 	t.Run("happy path", func(t *testing.T) {
 		q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))
-		added, dropped := q.enqueue(10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
+		added, dropped := q.enqueue(dequeueCoordinator, 1, 10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
 		require.Equal(t, 0, dropped)
 		require.Equal(t, 1, added)
 		require.Equal(t, 0, q.sizeOfRange(1, 10))
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/dequeue_coordinator.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/dequeue_coordinator.go
index 6134d414bc6..48ad33345cd 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/dequeue_coordinator.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/dequeue_coordinator.go
@@ -1,55 +1,54 @@
 package logprovider
 
 import (
-	"math/big"
 	"sync"
 )
 
 type DequeueCoordinator interface {
-	// DequeueBlockWindow identifies a block window ready for processing between the given start and latest block numbers.
+	// CountEnqueuedLogsForWindow tracks how many logs are added for a particular block during the enqueue process.
+	CountEnqueuedLogsForWindow(block int64, blockRate uint32, added int)
+	// GetDequeueBlockWindow identifies a block window ready for processing between the given start and latest block numbers.
 	// It prioritizes windows that need to have the minimum guaranteed logs dequeued before considering windows with
 	// remaining logs to be dequeued, as a best effort.
-	DequeueBlockWindow(start int64, latestBlock int64, blockRate int) (int64, int64, bool)
-	// GetUpkeepSelector returns a function that accepts an upkeep ID, and performs a modulus against the number of
-	// iterations, and compares the result against the current iteration. When this comparison returns true, the
-	// upkeep is selected for the dequeuing. This means that, for a given set of upkeeps, a different subset of
-	// upkeeps will be dequeued for each iteration once only, and, across all iterations, all upkeeps will be
-	// dequeued once.
-	GetUpkeepSelector(startWindow int64, logLimitLow, iterations, currentIteration int) func(id *big.Int) bool
-	// TrackUpkeeps tracks how many times an upkeep has been dequeued for a given block window.
-	TrackUpkeeps(startWindow int64, upkeepID *big.Int)
-	// UpdateBlockWindow updates the status of a block window based on the number of logs dequeued,
+	GetDequeueBlockWindow(start int64, latestBlock int64, blockRate int, minGuarantee int) (int64, int64, bool)
+	// CountDequeuedLogsForWindow updates the status of a block window based on the number of logs dequeued,
 	// remaining logs, and the number of upkeeps. This function tracks remaining and dequeued logs for the specified
 	// block window, determines if a block window has had the minimum number of guaranteed logs dequeued, and marks a
 	// window as not ready if there are not yet any logs available to dequeue from the window.
-	UpdateBlockWindow(startWindow int64, logs, remaining, numberOfUpkeeps, logLimitLow int)
+	CountDequeuedLogsForWindow(startWindow int64, logs, minGuaranteedLogs int)
 	// MarkReorg handles the detection of a reorg by resetting the state of the affected block window. It ensures that
 	// upkeeps within the specified block window are marked as not having the minimum number of guaranteed logs dequeued.
 	MarkReorg(block int64, blockRate uint32)
+	// Clean removes any data older than the block window containing blockThreshold from the dequeueCoordinator.
+	Clean(blockThreshold int64, blockRate uint32)
 }
 
 type dequeueCoordinator struct {
 	dequeuedMinimum map[int64]bool
-	notReady        map[int64]bool
-	remainingLogs   map[int64]int
+	enqueuedLogs    map[int64]int
 	dequeuedLogs    map[int64]int
 	completeWindows map[int64]bool
-	dequeuedUpkeeps map[int64]map[string]int
 	mu              sync.Mutex
 }
 
 func NewDequeueCoordinator() *dequeueCoordinator {
 	return &dequeueCoordinator{
 		dequeuedMinimum: map[int64]bool{},
-		notReady:        map[int64]bool{},
-		remainingLogs:   map[int64]int{},
+		enqueuedLogs:    map[int64]int{},
 		dequeuedLogs:    map[int64]int{},
 		completeWindows: map[int64]bool{},
-		dequeuedUpkeeps: map[int64]map[string]int{},
 	}
 }
 
-func (c *dequeueCoordinator) DequeueBlockWindow(start int64, latestBlock int64, blockRate int) (int64, int64, bool) {
+func (c *dequeueCoordinator) CountEnqueuedLogsForWindow(block int64, blockRate uint32, added int) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	startWindow, _ := getBlockWindow(block, int(blockRate))
+	c.enqueuedLogs[startWindow] += added
+}
+
+func (c *dequeueCoordinator) GetDequeueBlockWindow(start int64, latestBlock int64, blockRate int, minGuarantee int) (int64, int64, bool) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
@@ -58,11 +57,14 @@ func (c *dequeueCoordinator) DequeueBlockWindow(start int64, latestBlock int64,
 		startWindow, end := getBlockWindow(i, blockRate)
 		if latestBlock >= end {
 			c.completeWindows[startWindow] = true
-		} else if c.notReady[startWindow] { // the window is incomplete and has no logs to provide as of yet
+		} else if c.enqueuedLogs[startWindow] <= 0 { // the latest window is incomplete and has no logs to provide yet
 			break
 		}
 
-		if hasDequeued, ok := c.dequeuedMinimum[startWindow]; !ok || !hasDequeued {
+		enqueuedLogs := c.enqueuedLogs[startWindow]
+		dequeuedLogs := c.dequeuedLogs[startWindow]
+
+		if enqueuedLogs > 0 && dequeuedLogs < minGuarantee {
 			return startWindow, end, true
 		}
 	}
@@ -71,76 +73,29 @@ func (c *dequeueCoordinator) DequeueBlockWindow(start int64, latestBlock int64,
 	for i := start; i < latestBlock; i += int64(blockRate) {
 		startWindow, end := getBlockWindow(i, blockRate)
 
-		if remainingLogs, ok := c.remainingLogs[startWindow]; ok {
-			if remainingLogs > 0 {
-				return startWindow, end, true
-			}
+		if remainingLogs, ok := c.enqueuedLogs[startWindow]; ok && remainingLogs > 0 {
+			return startWindow, end, true
 		}
 	}
 
 	return 0, 0, false
 }
 
-func (c *dequeueCoordinator) GetUpkeepSelector(startWindow int64, logLimitLow, iterations, currentIteration int) func(id *big.Int) bool {
+func (c *dequeueCoordinator) CountDequeuedLogsForWindow(startWindow int64, logs, minGuaranteedLogs int) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	bestEffort := false
-
-	if hasDequeued, ok := c.dequeuedMinimum[startWindow]; ok {
-		if hasDequeued {
-			bestEffort = true
-		}
-	}
-
-	return func(id *big.Int) bool {
-		// query the map of block number to upkeep ID for dequeued count here when the block window is incomplete
-		dequeueUpkeep := true
-		if !bestEffort {
-			if windowUpkeeps, ok := c.dequeuedUpkeeps[startWindow]; ok {
-				if windowUpkeeps[id.String()] >= logLimitLow {
-					dequeueUpkeep = false
-				}
-			}
-		}
-		return dequeueUpkeep && id.Int64()%int64(iterations) == int64(currentIteration)
-	}
-}
-
-func (c *dequeueCoordinator) TrackUpkeeps(startWindow int64, upkeepID *big.Int) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	if windowUpkeeps, ok := c.dequeuedUpkeeps[startWindow]; ok {
-		windowUpkeeps[upkeepID.String()] = windowUpkeeps[upkeepID.String()] + 1
-		c.dequeuedUpkeeps[startWindow] = windowUpkeeps
-	} else {
-		c.dequeuedUpkeeps[startWindow] = map[string]int{
-			upkeepID.String(): 1,
-		}
-	}
-}
-
-func (c *dequeueCoordinator) UpdateBlockWindow(startWindow int64, logs, remaining, numberOfUpkeeps, logLimitLow int) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	c.remainingLogs[startWindow] = remaining
+	c.enqueuedLogs[startWindow] -= logs
 	c.dequeuedLogs[startWindow] += logs
 
-	if isComplete, ok := c.completeWindows[startWindow]; ok {
-		if isComplete {
+	if c.completeWindows[startWindow] {
+		if c.enqueuedLogs[startWindow] <= 0 || c.dequeuedLogs[startWindow] >= minGuaranteedLogs {
 			// if the window is complete, and there are no more logs, then we have to consider this as min dequeued, even if no logs were dequeued
-			if c.remainingLogs[startWindow] == 0 || c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow {
-				c.dequeuedMinimum[startWindow] = true
-			}
-		} else if c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow { // this assumes we don't dequeue the same upkeeps more than logLimitLow in min commitment
 			c.dequeuedMinimum[startWindow] = true
 		}
-	} else if c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow { // this assumes we don't dequeue the same upkeeps more than logLimitLow in min commitment
+	} else if c.dequeuedLogs[startWindow] >= minGuaranteedLogs {
+		// if the window is not complete, but we were able to dequeue min guaranteed logs from the blocks that were available
 		c.dequeuedMinimum[startWindow] = true
-	} else if logs == 0 && remaining == 0 {
-		c.notReady[startWindow] = true
 	}
 }
 
@@ -150,8 +105,22 @@ func (c *dequeueCoordinator) MarkReorg(block int64, blockRate uint32) {
 	startWindow, _ := getBlockWindow(block, int(blockRate))
 	c.dequeuedMinimum[startWindow] = false
 
-	// TODO instead of wiping the count for all upkeeps, should we wipe for upkeeps only impacted by the reorg?
-	for upkeepID := range c.dequeuedUpkeeps[startWindow] {
-		c.dequeuedUpkeeps[startWindow][upkeepID] = 0
+	c.enqueuedLogs[startWindow] = 0
+	c.dequeuedLogs[startWindow] = 0
+}
+
+func (c *dequeueCoordinator) Clean(blockThreshold int64, blockRate uint32) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	blockThresholdStartWindow, _ := getBlockWindow(blockThreshold, int(blockRate))
+
+	for block := range c.enqueuedLogs {
+		if blockThresholdStartWindow > block {
+			delete(c.enqueuedLogs, block)
+			delete(c.dequeuedLogs, block)
+			delete(c.dequeuedMinimum, block)
+			delete(c.completeWindows, block)
+		}
 	}
 }
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/dequeue_coordinator_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/dequeue_coordinator_test.go
new file mode 100644
index 00000000000..eae892720dc
--- /dev/null
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/dequeue_coordinator_test.go
@@ -0,0 +1,403 @@
+package logprovider
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDequeueCoordinator_DequeueBlockWindow(t *testing.T) {
+	t.Run("an empty dequeue coordinator should tell us not to dequeue", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 10, 1, 10)
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(0), end)
+		assert.Equal(t, false, canDequeue)
+	})
+
+	t.Run("a populated dequeue coordinator should tell us to dequeue the first window with logs", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(3, 1, 0)
+		c.CountEnqueuedLogsForWindow(4, 1, 10)
+		c.CountEnqueuedLogsForWindow(5, 1, 10)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 is the first block with no logs dequeued yet
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+	})
+
+	t.Run("a populated dequeue coordinator should tell us to dequeue the next window with logs", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(3, 1, 0)
+		c.CountEnqueuedLogsForWindow(4, 1, 10)
+		c.CountEnqueuedLogsForWindow(5, 1, 10)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 is the first block with no logs dequeued yet
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 has been dequeued, so block 5 is the next window to dequeue
+		assert.Equal(t, int64(5), start)
+		assert.Equal(t, int64(5), end)
+		assert.Equal(t, true, canDequeue)
+	})
+
+	t.Run("a populated dequeue coordinator with minimum dequeue met should tell us to dequeue the next window with logs as best effort", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(3, 1, 0)
+		c.CountEnqueuedLogsForWindow(4, 1, 20)
+		c.CountEnqueuedLogsForWindow(5, 1, 20)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 is the first block with no logs dequeued yet
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 has been dequeued, so block 5 is the next window to dequeue
+		assert.Equal(t, int64(5), start)
+		assert.Equal(t, int64(5), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// all windows have had minimum logs dequeued, so we go back to block 4 to dequeue as best effort
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+	})
+
+	t.Run("a fully exhausted dequeue coordinator should not tell us to dequeue", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(3, 1, 0)
+		c.CountEnqueuedLogsForWindow(4, 1, 20)
+		c.CountEnqueuedLogsForWindow(5, 1, 20)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 is the first block with no logs dequeued yet
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 has been dequeued, so block 5 is the next window to dequeue
+		assert.Equal(t, int64(5), start)
+		assert.Equal(t, int64(5), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// all windows have had minimum logs dequeued, so we go back to block 4 to dequeue as best effort
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 has been fully dequeued, so we dequeue block 5
+		assert.Equal(t, int64(5), start)
+		assert.Equal(t, int64(5), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// all block windows have been fully dequeued so the coordinator tells us not to dequeue
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(0), end)
+		assert.Equal(t, false, canDequeue)
+	})
+
+	t.Run("an incomplete latest window without logs to dequeue gets passed over and best effort is executed", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(0, 4, 10)
+		c.CountEnqueuedLogsForWindow(1, 4, 10)
+		c.CountEnqueuedLogsForWindow(2, 4, 10)
+		c.CountEnqueuedLogsForWindow(3, 4, 10)
+		c.CountEnqueuedLogsForWindow(4, 4, 0)
+		c.CountEnqueuedLogsForWindow(5, 4, 0)
+		c.CountEnqueuedLogsForWindow(6, 4, 0)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+
+		// multiple dequeues in best effort now exhaust block window 0
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(0), end)
+		assert.Equal(t, false, canDequeue)
+	})
+
+	t.Run("an incomplete latest window with min logs to dequeue gets dequeued", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(0, 4, 10)
+		c.CountEnqueuedLogsForWindow(1, 4, 10)
+		c.CountEnqueuedLogsForWindow(2, 4, 10)
+		c.CountEnqueuedLogsForWindow(3, 4, 10)
+		c.CountEnqueuedLogsForWindow(4, 4, 10)
+		c.CountEnqueuedLogsForWindow(5, 4, 0)
+		c.CountEnqueuedLogsForWindow(6, 4, 0)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(7), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+	})
+
+	t.Run("an incomplete latest window with less than min logs to dequeue gets dequeued", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(0, 4, 10)
+		c.CountEnqueuedLogsForWindow(1, 4, 10)
+		c.CountEnqueuedLogsForWindow(2, 4, 10)
+		c.CountEnqueuedLogsForWindow(3, 4, 10)
+		c.CountEnqueuedLogsForWindow(4, 4, 5)
+		c.CountEnqueuedLogsForWindow(5, 4, 0)
+		c.CountEnqueuedLogsForWindow(6, 4, 0)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(7), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 5, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+
+		// now that the second block window is complete and has enough logs to meet min dequeue, we revert to min guaranteed dequeue
+		c.CountEnqueuedLogsForWindow(7, 4, 5)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(7), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 5, 10)
+
+		// now we revert to best effort dequeue for the first block window
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 6, 4, 10)
+
+		assert.Equal(t, int64(0), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+	})
+
+	t.Run("a reorg causes us to revert to min guaranteed log dequeue", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(3, 1, 0)
+		c.CountEnqueuedLogsForWindow(4, 1, 20)
+		c.CountEnqueuedLogsForWindow(5, 1, 20)
+		c.CountEnqueuedLogsForWindow(6, 1, 20)
+		c.CountEnqueuedLogsForWindow(7, 1, 20)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 is the first block with no logs dequeued yet
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 4 has been dequeued, so block 5 is the next window to dequeue
+		assert.Equal(t, int64(5), start)
+		assert.Equal(t, int64(5), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// block 5 has been dequeued, so block 6 is the next window to dequeue
+		assert.Equal(t, int64(6), start)
+		assert.Equal(t, int64(6), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		// a reorg happens and only block 4 has been reorged
+		c.MarkReorg(4, 1)
+
+		c.CountEnqueuedLogsForWindow(4, 1, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// we now have to go back to block 4 to dequeue minimum guaranteed logs
+		assert.Equal(t, int64(4), start)
+		assert.Equal(t, int64(4), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// now that block 4 has been min dequeued, we jump forward to block 7 to continue min dequeue
+		assert.Equal(t, int64(7), start)
+		assert.Equal(t, int64(7), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(start, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		// now that all block windows have had min logs dequeued, we go back to the earliest block window with remaining logs to dequeue best effort, i.e. block window 5
+		assert.Equal(t, int64(5), start)
+		assert.Equal(t, int64(5), end)
+		assert.Equal(t, true, canDequeue)
+	})
+
+	t.Run("cleaning deletes data from the coordinator older than the block window of block threshold", func(t *testing.T) {
+		c := NewDequeueCoordinator()
+
+		c.CountEnqueuedLogsForWindow(1, 1, 20)
+		c.CountEnqueuedLogsForWindow(2, 1, 20)
+		c.CountEnqueuedLogsForWindow(3, 1, 20)
+		c.CountEnqueuedLogsForWindow(4, 1, 20)
+		c.CountEnqueuedLogsForWindow(5, 1, 20)
+		c.CountEnqueuedLogsForWindow(6, 1, 20)
+		c.CountEnqueuedLogsForWindow(7, 1, 20)
+		c.CountEnqueuedLogsForWindow(8, 1, 20)
+		c.CountEnqueuedLogsForWindow(9, 1, 20)
+
+		start, end, canDequeue := c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		assert.Equal(t, int64(1), start)
+		assert.Equal(t, int64(1), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(1, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		assert.Equal(t, int64(2), start)
+		assert.Equal(t, int64(2), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(2, 10, 10)
+
+		start, end, canDequeue = c.GetDequeueBlockWindow(1, 10, 1, 10)
+
+		assert.Equal(t, int64(3), start)
+		assert.Equal(t, int64(3), end)
+		assert.Equal(t, true, canDequeue)
+
+		c.CountDequeuedLogsForWindow(3, 10, 10)
+
+		assert.Equal(t, 10, c.enqueuedLogs[1])
+		assert.Equal(t, 10, c.enqueuedLogs[2])
+		assert.Equal(t, 10, c.enqueuedLogs[3])
+
+		assert.Equal(t, 10, c.dequeuedLogs[1])
+		assert.Equal(t, 10, c.dequeuedLogs[2])
+		assert.Equal(t, 10, c.dequeuedLogs[3])
+
+		assert.Equal(t, true, c.dequeuedMinimum[1])
+		assert.Equal(t, true, c.dequeuedMinimum[2])
+		assert.Equal(t, true, c.dequeuedMinimum[3])
+
+		assert.Equal(t, true, c.completeWindows[1])
+		assert.Equal(t, true, c.completeWindows[2])
+		assert.Equal(t, true, c.completeWindows[3])
+
+		c.Clean(3, 1)
+
+		assert.Equal(t, 0, c.enqueuedLogs[1])
+		assert.Equal(t, 0, c.enqueuedLogs[2])
+		assert.Equal(t, 10, c.enqueuedLogs[3])
+
+		assert.Equal(t, 0, c.dequeuedLogs[1])
+		assert.Equal(t, 0, c.dequeuedLogs[2])
+		assert.Equal(t, 10, c.dequeuedLogs[3])
+
+		assert.Equal(t, false, c.dequeuedMinimum[1])
+		assert.Equal(t, false, c.dequeuedMinimum[2])
+		assert.Equal(t, true, c.dequeuedMinimum[3])
+
+		assert.Equal(t, false, c.completeWindows[1])
+		assert.Equal(t, false, c.completeWindows[2])
+		assert.Equal(t, true, c.completeWindows[3])
+	})
+}
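For orientation, a minimal usage sketch of the coordinator API exercised above, written against the DequeueCoordinator interface from dequeue_coordinator.go inside package logprovider. It is illustrative only and not part of the patch; the helper name and the literal block numbers, rate, and guarantee values are assumptions rather than identifiers from this change.

// exampleDequeueRound sketches the enqueue-side accounting followed by one
// dequeue round, assuming a block rate of 1 and a minimum guarantee of 10 logs
// per window (in the provider this guarantee is numOfUpkeeps*logLimitLow).
func exampleDequeueRound(coord DequeueCoordinator, latestBlock int64) {
	const blockRate uint32 = 1
	minGuarantee := 10

	// Enqueue side: prune state older than the threshold's block window, then
	// record how many logs were just added for a block (here, 20 logs at block 5).
	coord.Clean(latestBlock-10, blockRate)
	coord.CountEnqueuedLogsForWindow(5, blockRate, 20)

	// Dequeue side: ask for the next window to work on (minimum guarantee first,
	// best effort after), then report how many logs were actually taken from it.
	if startWindow, _, ok := coord.GetDequeueBlockWindow(1, latestBlock, int(blockRate), minGuarantee); ok {
		coord.CountDequeuedLogsForWindow(startWindow, minGuarantee, minGuarantee)
	}
}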
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
index a9a0f2a1561..8c09ead1129 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
@@ -311,27 +311,29 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
 		}
 
 		for len(payloads) < maxResults {
-			startWindow, end, canDequeue := p.dequeueCoordinator.DequeueBlockWindow(start, latestBlock, blockRate)
+			startWindow, end, canDequeue := p.dequeueCoordinator.GetDequeueBlockWindow(start, latestBlock, blockRate, numOfUpkeeps*logLimitLow)
 			if !canDequeue {
 				p.lggr.Debugw("Nothing to dequeue", "start", start, "latestBlock", latestBlock)
 				break
 			}
 
-			upkeepSelectorFn := p.dequeueCoordinator.GetUpkeepSelector(startWindow, logLimitLow, p.iterations, p.currentIteration)
+			upkeepSelectorFn := func(id *big.Int) bool {
+				return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
+			}
 			logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn)
 			if len(logs) > 0 {
-				p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
+				p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining)
+
 				for _, l := range logs {
 					payload, err := p.createPayload(l.ID, l.Log)
 					if err == nil {
 						payloads = append(payloads, payload)
 					}
-					p.dequeueCoordinator.TrackUpkeeps(startWindow, l.ID)
 				}
 			}
 
-			p.dequeueCoordinator.UpdateBlockWindow(startWindow, len(logs), remaining, numOfUpkeeps, logLimitLow)
+			p.dequeueCoordinator.CountDequeuedLogsForWindow(startWindow, len(logs), numOfUpkeeps*logLimitLow)
 		}
 
 		p.currentIteration++
 	default:
@@ -344,6 +346,8 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
 		}
 	}
 
+	p.lggr.Debugw("getLogsFromBuffer returning payloads", "payloads", len(payloads))
+
 	return payloads
 }
 
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
index 47d3e5e4af6..b1a71a10fb0 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
@@ -1609,7 +1609,7 @@ func TestLogEventProvider_GetLatestPayloads(t *testing.T) {
 		dequeueCoordinator := provider.dequeueCoordinator.(*dequeueCoordinator)
 
 		assert.Equal(t, false, dequeueCoordinator.dequeuedMinimum[0])
-		assert.Equal(t, true, dequeueCoordinator.notReady[0])
+		//assert.Equal(t, true, dequeueCoordinator.notReady[0])
 
 		blockWindowCounts = map[int64]int{}
 
@@ -1638,8 +1638,7 @@ func TestLogEventProvider_GetLatestPayloads(t *testing.T) {
 
 		assert.Equal(t, 0, len(payloads))
 
-		assert.Equal(t, true, dequeueCoordinator.dequeuedMinimum[0]) // now that the window is complete, it should be marked as dequeued minimum
-		assert.Equal(t, true, dequeueCoordinator.notReady[0])
+		//assert.Equal(t, true, dequeueCoordinator.dequeuedMinimum[0]) // now that the window is complete, it should be marked as dequeued minimum
 
 		provider.poller = &mockLogPoller{
 			LatestBlockFn: func(ctx context.Context) (int64, error) {
@@ -1720,7 +1719,7 @@ func TestLogEventProvider_GetLatestPayloads(t *testing.T) {
 		assert.Equal(t, 180, blockWindowCounts[4])
 		assert.Equal(t, 190, blockWindowCounts[8])
 
-		assert.Equal(t, true, dequeueCoordinator.dequeuedMinimum[0])
+		assert.Equal(t, false, dequeueCoordinator.dequeuedMinimum[0])
 		assert.Equal(t, true, dequeueCoordinator.dequeuedMinimum[4])
 		assert.Equal(t, true, dequeueCoordinator.dequeuedMinimum[8])
 	})
@@ -2300,6 +2299,168 @@ func TestLogEventProvider_GetLatestPayloads(t *testing.T) {
 		assert.Equal(t, 40, blockWindowCounts[101])
 		assert.Equal(t, 40, blockWindowCounts[102])
 	})
+
+	t.Run("sparsely populated blocks", func(t *testing.T) {
+		oldMaxPayloads := MaxPayloads
+		MaxPayloads = 10
+		defer func() {
+			MaxPayloads = oldMaxPayloads
+		}()
+
+		upkeepIDs := []*big.Int{
+			big.NewInt(1),
+			big.NewInt(2),
+			big.NewInt(3),
+			big.NewInt(4),
+			big.NewInt(5),
+		}
+
+		filterStore := NewUpkeepFilterStore()
+
+		upkeepOmittedOnBlocks := map[int][]int64{
+			1: {5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100}, // upkeep 1 won't have logs on 20 blocks
+			2: {2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100}, // upkeep 2 won't have logs on 50 blocks
+			3: {3, 13, 23, 33, 43, 53, 63, 73, 83, 93}, // upkeep 3 won't appear on 10 blocks
+			4: {1, 25, 50, 75, 100}, // upkeep 4 won't appear on 5 blocks
+			5: {}, // upkeep 5 appears on all blocks
+		}
+
+		callCount := 0
+		// this gets called once per upkeep ID
+		logGenerator := func(start, end int64) []logpoller.Log {
+			callCount++
+			var res []logpoller.Log
+		outer:
+			for i := start; i < end; i++ {
+				for _, skip := range upkeepOmittedOnBlocks[callCount] {
+					if skip == i+1 {
+						continue outer
+					}
+				}
+				res = append(res, logpoller.Log{
+					LogIndex:    i,
+					BlockHash:   common.HexToHash(fmt.Sprintf("%d", i+1)),
+					BlockNumber: i + 1,
+				})
+			}
+			return res
+		}
+
+		// use a log poller that will create logs for the queried block range
+		logPoller := &mockLogPoller{
+			LatestBlockFn: func(ctx context.Context) (int64, error) {
+				return 100, nil
+			},
+			LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) {
+				return logGenerator(start, end), nil
+			},
+		}
+
+		// prepare the filter store with upkeeps
+		for _, upkeepID := range upkeepIDs {
+			filterStore.AddActiveUpkeeps(
+				upkeepFilter{
+					addr:     []byte(upkeepID.String()),
+					upkeepID: upkeepID,
+					topics: []common.Hash{
+						common.HexToHash(upkeepID.String()),
+					},
+				},
+			)
+		}
+
+		opts := NewOptions(200, big.NewInt(1))
+		opts.BufferVersion = "v1"
+
+		provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts)
+
+		ctx := context.Background()
+
+		err := provider.ReadLogs(ctx, upkeepIDs...)
+		assert.NoError(t, err)
+
+		assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps())
+
+		bufV1 := provider.bufferV1.(*logBuffer)
+
+		blockWindowCounts := map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 4, blockWindowCounts[1])
+		assert.Equal(t, 4, blockWindowCounts[2])
+		assert.Equal(t, 4, blockWindowCounts[3])
+		assert.Equal(t, 4, blockWindowCounts[4])
+		assert.Equal(t, 4, blockWindowCounts[5])
+		assert.Equal(t, 4, blockWindowCounts[6])
+		assert.Equal(t, 5, blockWindowCounts[7]) // block 7 is the first block to contain 1 log for all upkeeps
+
+		assert.Equal(t, 80, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 50, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 90, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 95, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 100, countLogs(bufV1.queues["5"].logs))
+
+		payloads, err := provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps based on availability
+		assert.Equal(t, 77, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 48, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 88, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 94, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 98, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 0, blockWindowCounts[1])
+		assert.Equal(t, 0, blockWindowCounts[2])
+		assert.Equal(t, 2, blockWindowCounts[3])
+		assert.Equal(t, 4, blockWindowCounts[4])
+		assert.Equal(t, 4, blockWindowCounts[5])
+		assert.Equal(t, 4, blockWindowCounts[6])
+		assert.Equal(t, 5, blockWindowCounts[7]) // block 7 is the first block to contain 1 log for all upkeeps
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps based on availability
+		assert.Equal(t, 76, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 47, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 86, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 91, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 95, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 0, blockWindowCounts[1])
+		assert.Equal(t, 0, blockWindowCounts[2])
+		assert.Equal(t, 0, blockWindowCounts[3])
+		assert.Equal(t, 0, blockWindowCounts[4])
+		assert.Equal(t, 0, blockWindowCounts[5])
+		assert.Equal(t, 4, blockWindowCounts[6])
+		assert.Equal(t, 5, blockWindowCounts[7]) // block 7 is the first block to contain 1 log for all upkeeps
+	})
 }
 
 type mockedPacker struct {