Skip to content

Commit

Permalink
Iterate over upkeeps
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Apr 22, 2024
1 parent 9d359de commit e399dee
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LogBuffer interface {
// It also accepts a function to select upkeeps.
// Returns logs (associated to upkeeps) and the number of remaining
// logs in that window for the involved upkeeps.
Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
// SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep.
SetConfig(lookback, blockRate, logLimit uint32)
// NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer.
Expand Down Expand Up @@ -110,11 +110,10 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {

// Dequeue greedly pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
b.lock.RLock()
defer b.lock.RUnlock()

start, end := getBlockWindow(block, blockRate)
return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector)
}

Expand All @@ -126,11 +125,14 @@ func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int,
func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
var result []BufferedLog
var remainingLogs int
selectedUpkeeps := []string{}
numLogs := 0
for _, q := range b.queues {
if !upkeepSelector(q.id) {
// if the upkeep is not selected, skip it
continue
}
selectedUpkeeps = append(selectedUpkeeps, q.id.String())
logsInRange := q.sizeOfRange(start, end)
if logsInRange == 0 {
// if there are no logs in the range, skip the upkeep
Expand All @@ -150,8 +152,10 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS
result = append(result, BufferedLog{ID: q.id, Log: l})
capacity--
}
numLogs += len(logs)
remainingLogs += remaining
}
b.lggr.Debugw("dequeued logs for upkeeps", "numUpkeeps", len(selectedUpkeeps), "numLogs", numLogs)
return result, remainingLogs
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) {
added, dropped := buf.Enqueue(id, logs...)
require.Equal(t, len(logs), added+dropped)
}
results, remaining := buf.Dequeue(tc.args.block, tc.args.blockRate, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
start, end := getBlockWindow(tc.args.block, tc.args.blockRate)
results, remaining := buf.Dequeue(start, end, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
require.Equal(t, len(tc.results), len(results))
require.Equal(t, tc.remaining, remaining)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"hash"
"io"
"math"
"math/big"
"runtime"
"sync"
Expand Down Expand Up @@ -114,6 +115,10 @@ type logEventProvider struct {
currentPartitionIdx uint64

chainID *big.Int

currentIteration int
iterations int
previousStartWindow *int64
}

func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big.Int, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
Expand Down Expand Up @@ -290,7 +295,34 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
// in v1, we use a greedy approach - we keep dequeuing logs until we reach the max results or cover the entire range.
blockRate, logLimitLow, maxResults, _ := p.getBufferDequeueArgs()
for len(payloads) < maxResults && start <= latestBlock {
logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector)
startWindow, end := getBlockWindow(start, blockRate)

if p.previousStartWindow == nil {
p.previousStartWindow = &startWindow

} else if p.previousStartWindow != nil && startWindow != *p.previousStartWindow {
p.lggr.Debugw("new block window", "windowStart", startWindow)
p.currentIteration = 0
p.previousStartWindow = &startWindow
}

if p.currentIteration == p.iterations {
p.currentIteration = 0
}

if p.currentIteration == 0 {
p.iterations = int(math.Ceil(float64(p.bufferV1.NumOfUpkeeps()*logLimitLow) / float64(maxResults)))
if p.iterations == 0 {
p.iterations = 1
}
p.lggr.Debugw("calculated iterations", "iterations", p.iterations, "upkeeps", p.bufferV1.NumOfUpkeeps(), "logLimitLow", logLimitLow, "maxResults", maxResults)
}

upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
}

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
}
Expand Down

0 comments on commit e399dee

Please sign in to comment.