diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index fbc1da075df..7f0edffcb79 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -26,7 +26,7 @@ type LogBuffer interface { // It also accepts a function to select upkeeps. // Returns logs (associated to upkeeps) and the number of remaining // logs in that window for the involved upkeeps. - Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) + Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) // SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep. SetConfig(lookback, blockRate, logLimit uint32) // NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer. @@ -110,11 +110,10 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { // Dequeue greedly pulls logs from the buffers. // Returns logs and the number of remaining logs in the buffer. -func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { +func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { b.lock.RLock() defer b.lock.RUnlock() - start, end := getBlockWindow(block, blockRate) return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector) } @@ -126,11 +125,14 @@ func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { var result []BufferedLog var remainingLogs int + selectedUpkeeps := []string{} + numLogs := 0 for _, q := range b.queues { if !upkeepSelector(q.id) { // if the upkeep is not selected, skip it continue } + selectedUpkeeps = append(selectedUpkeeps, q.id.String()) logsInRange := q.sizeOfRange(start, end) if logsInRange == 0 { // if there are no logs in the range, skip the upkeep @@ -150,8 +152,10 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS result = append(result, BufferedLog{ID: q.id, Log: l}) capacity-- } + numLogs += len(logs) remainingLogs += remaining } + b.lggr.Debugw("dequeued logs for upkeeps", "numUpkeeps", len(selectedUpkeeps), "numLogs", numLogs) return result, remainingLogs } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 19f806d35b9..c630c3fdc14 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -141,7 +141,8 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) { added, dropped := buf.Enqueue(id, logs...) require.Equal(t, len(logs), added+dropped) } - results, remaining := buf.Dequeue(tc.args.block, tc.args.blockRate, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector) + start, end := getBlockWindow(tc.args.block, tc.args.blockRate) + results, remaining := buf.Dequeue(start, end, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector) require.Equal(t, len(tc.results), len(results)) require.Equal(t, tc.remaining, remaining) }) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index b07b08d3354..91ddc42854c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -7,6 +7,7 @@ import ( "fmt" "hash" "io" + "math" "math/big" "runtime" "sync" @@ -114,6 +115,10 @@ type logEventProvider struct { currentPartitionIdx uint64 chainID *big.Int + + currentIteration int + iterations int + previousStartWindow *int64 } func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big.Int, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { @@ -290,7 +295,34 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up // in v1, we use a greedy approach - we keep dequeuing logs until we reach the max results or cover the entire range. blockRate, logLimitLow, maxResults, _ := p.getBufferDequeueArgs() for len(payloads) < maxResults && start <= latestBlock { - logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector) + startWindow, end := getBlockWindow(start, blockRate) + + if p.previousStartWindow == nil { + p.previousStartWindow = &startWindow + + } else if p.previousStartWindow != nil && startWindow != *p.previousStartWindow { + p.lggr.Debugw("new block window", "windowStart", startWindow) + p.currentIteration = 0 + p.previousStartWindow = &startWindow + } + + if p.currentIteration == p.iterations { + p.currentIteration = 0 + } + + if p.currentIteration == 0 { + p.iterations = int(math.Ceil(float64(p.bufferV1.NumOfUpkeeps()*logLimitLow) / float64(maxResults))) + if p.iterations == 0 { + p.iterations = 1 + } + p.lggr.Debugw("calculated iterations", "iterations", p.iterations, "upkeeps", p.bufferV1.NumOfUpkeeps(), "logLimitLow", logLimitLow, "maxResults", maxResults) + } + + upkeepSelectorFn := func(id *big.Int) bool { + return id.Int64()%int64(p.iterations) == int64(p.currentIteration) + } + + logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn) if len(logs) > 0 { p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs)) }