Skip to content

Commit

Permalink
Extract dequeue_coordinator into its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Jun 16, 2024
1 parent 2ef29fa commit 2b3c03a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 110 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package logprovider

import "math/big"

// dequeueCoordinator tracks per-block-window dequeue progress so the log
// provider can service windows that have not yet met their minimum dequeue
// commitment before spending effort on best-effort dequeues.
// All maps are keyed by the start block of a block window.
// NOTE(review): no locking here — callers are presumed to serialize access;
// confirm against the owning provider.
type dequeueCoordinator struct {
	dequeuedMinimum map[int64]bool           // window has met its minimum dequeue commitment
	notReady        map[int64]bool           // incomplete window with no logs to provide yet
	remainingLogs   map[int64]int            // logs still waiting in the buffer for this window
	dequeuedLogs    map[int64]int            // total logs dequeued from this window so far
	completeWindows map[int64]bool           // window end is at or below the latest block
	dequeuedUpkeeps map[int64]map[string]int // per-window: upkeep ID -> logs dequeued for it
}

// dequeueBlockWindow picks the next block window to dequeue from, walking
// windows from start up to latestBlock in blockRate steps. Windows that have
// not yet met their minimum dequeue commitment take priority; only when every
// window has met it does the coordinator fall back to best-effort dequeues
// from windows that still have logs remaining. Returns (windowStart,
// windowEnd, true) when a window is selected, or (0, 0, false) otherwise.
func (c *dequeueCoordinator) dequeueBlockWindow(start int64, latestBlock int64, blockRate int) (int64, int64, bool) {
	step := int64(blockRate)

	// Pass 1: look for a window that still owes its minimum dequeue.
	for block := start; block <= latestBlock; block += step {
		windowStart, windowEnd := getBlockWindow(block, blockRate)

		switch {
		case latestBlock >= windowEnd:
			// The whole window is at or below the latest block.
			c.completeWindows[windowStart] = true
		case c.notReady[windowStart]:
			// Incomplete window with nothing to provide yet; stop entirely.
			return 0, 0, false
		}

		if dequeued, seen := c.dequeuedMinimum[windowStart]; !seen || !dequeued {
			return windowStart, windowEnd, true
		}
	}

	// Pass 2: best effort — any window with logs left over.
	// NOTE(review): this pass iterates with < latestBlock while pass 1 uses
	// <= — confirm the asymmetry is intentional.
	for block := start; block < latestBlock; block += step {
		windowStart, windowEnd := getBlockWindow(block, blockRate)

		// A missing key yields 0, matching the original comma-ok check.
		if c.remainingLogs[windowStart] > 0 {
			return windowStart, windowEnd, true
		}
	}

	return 0, 0, false
}

// getUpkeepSelector returns a predicate over upkeep IDs for the given block
// window. An upkeep is selected when its ID modulo iterations equals the
// current iteration, so across all iterations every upkeep is selected
// exactly once. While the window has not yet met its minimum dequeue
// commitment, upkeeps that have already had logLimitLow logs dequeued are
// excluded; once the minimum is met (best effort), that cap no longer
// applies.
func (c *dequeueCoordinator) getUpkeepSelector(startWindow int64, logLimitLow, iterations, currentIteration int) func(id *big.Int) bool {
	// Missing key reads as false, matching the original comma-ok + branch.
	bestEffort := c.dequeuedMinimum[startWindow]

	return func(id *big.Int) bool {
		if !bestEffort {
			// Enforce the per-upkeep cap only while the window is still
			// working toward its minimum commitment.
			if counts, ok := c.dequeuedUpkeeps[startWindow]; ok && counts[id.String()] >= logLimitLow {
				return false
			}
		}
		return id.Int64()%int64(iterations) == int64(currentIteration)
	}
}

// trackUpkeeps increments the dequeue count for upkeepID within the block
// window starting at startWindow, lazily creating the per-window map on
// first use.
func (c *dequeueCoordinator) trackUpkeeps(startWindow int64, upkeepID *big.Int) {
	windowUpkeeps, ok := c.dequeuedUpkeeps[startWindow]
	if !ok {
		windowUpkeeps = make(map[string]int, 1)
		c.dequeuedUpkeeps[startWindow] = windowUpkeeps
	}
	// Maps are reference types: mutating windowUpkeeps updates the map
	// already stored in c.dequeuedUpkeeps, so no write-back is needed
	// (the original's re-assignment was a no-op).
	windowUpkeeps[upkeepID.String()]++
}

// updateBlockWindow records the result of a dequeue attempt for the window
// starting at startWindow: logs were just dequeued, remaining are left in
// the buffer. It then re-evaluates the window's state: marks the minimum
// commitment met when enough logs (numberOfUpkeeps*logLimitLow) have been
// dequeued — or, for a complete window, when the buffer has drained — and
// marks an untracked, empty window as not ready.
func (c *dequeueCoordinator) updateBlockWindow(startWindow int64, logs, remaining, numberOfUpkeeps, logLimitLow int) {
	c.remainingLogs[startWindow] = remaining
	c.dequeuedLogs[startWindow] += logs

	// This assumes we don't dequeue the same upkeeps more than logLimitLow
	// times in min commitment.
	minCommitmentMet := c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow
	isComplete, tracked := c.completeWindows[startWindow]

	switch {
	case tracked && isComplete:
		// A complete window with no logs left must count as min dequeued,
		// even if nothing was actually dequeued.
		if remaining == 0 || minCommitmentMet {
			c.dequeuedMinimum[startWindow] = true
		}
	case tracked:
		// Tracked but recorded as incomplete.
		if minCommitmentMet {
			c.dequeuedMinimum[startWindow] = true
		}
	case minCommitmentMet:
		c.dequeuedMinimum[startWindow] = true
	case logs == 0 && remaining == 0:
		// Untracked window with nothing to offer yet.
		c.notReady[startWindow] = true
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,107 +133,6 @@ func newDequeueCoordinator() *dequeueCoordinator {
}
}

// dequeueCoordinator tracks per-block-window dequeue progress so the log
// provider can service windows that have not yet met their minimum dequeue
// commitment before spending effort on best-effort dequeues.
// All maps are keyed by the start block of a block window.
// NOTE(review): no locking here — callers are presumed to serialize access;
// confirm against the owning provider.
type dequeueCoordinator struct {
	dequeuedMinimum map[int64]bool           // window has met its minimum dequeue commitment
	notReady        map[int64]bool           // incomplete window with no logs to provide yet
	remainingLogs   map[int64]int            // logs still waiting in the buffer for this window
	dequeuedLogs    map[int64]int            // total logs dequeued from this window so far
	completeWindows map[int64]bool           // window end is at or below the latest block
	dequeuedUpkeeps map[int64]map[string]int // per-window: upkeep ID -> logs dequeued for it
}

// dequeueBlockWindow picks the next block window to dequeue from, walking
// windows from start up to latestBlock in blockRate steps. Windows that have
// not yet met their minimum dequeue commitment take priority; only then does
// it fall back to best-effort dequeues from windows with logs remaining.
// Returns (windowStart, windowEnd, true) on selection, (0, 0, false) otherwise.
func (c *dequeueCoordinator) dequeueBlockWindow(start int64, latestBlock int64, blockRate int) (int64, int64, bool) {
	// check if minimum logs have been dequeued
	for i := start; i <= latestBlock; i += int64(blockRate) {
		startWindow, end := getBlockWindow(i, blockRate)
		if latestBlock >= end {
			// the whole window is at or below the latest block
			c.completeWindows[startWindow] = true
		} else if c.notReady[startWindow] { // the window is incomplete and has no logs to provide as of yet
			return 0, 0, false
		}

		// Select any window whose minimum commitment is unmet or untracked.
		if hasDequeued, ok := c.dequeuedMinimum[startWindow]; ok {
			if !hasDequeued {
				return startWindow, end, true
			}
		} else {
			return startWindow, end, true
		}
	}

	// check best effort dequeue
	// NOTE(review): this loop uses < latestBlock while the one above uses
	// <= — confirm the asymmetry is intentional.
	for i := start; i < latestBlock; i += int64(blockRate) {
		startWindow, end := getBlockWindow(i, blockRate)

		if remainingLogs, ok := c.remainingLogs[startWindow]; ok {
			if remainingLogs > 0 {
				return startWindow, end, true
			}
		}
	}

	return 0, 0, false
}

// getUpkeepSelector returns a function that accepts an upkeep ID, and performs a modulus against the number of
// iterations, and compares the result against the current iteration. When this comparison returns true, the
// upkeep is selected for the dequeuing. This means that, for a given set of upkeeps, a different subset of
// upkeeps will be dequeued for each iteration once only, and, across all iterations, all upkeeps will be
// dequeued once. While the window has not met its minimum commitment, upkeeps
// already at logLimitLow dequeued logs are additionally excluded.
func (c *dequeueCoordinator) getUpkeepSelector(startWindow int64, logLimitLow, iterations, currentIteration int) func(id *big.Int) bool {
	bestEffort := false

	// Once the window has met its minimum commitment, the per-upkeep cap
	// below no longer applies.
	if hasDequeued, ok := c.dequeuedMinimum[startWindow]; ok {
		if hasDequeued {
			bestEffort = true
		}
	}

	return func(id *big.Int) bool {
		// query the map of block number to upkeep ID for dequeued count here when the block window is incomplete
		dequeueUpkeep := true
		if !bestEffort {
			if windowUpkeeps, ok := c.dequeuedUpkeeps[startWindow]; ok {
				if windowUpkeeps[id.String()] >= logLimitLow {
					dequeueUpkeep = false
				}
			}
		}
		return dequeueUpkeep && id.Int64()%int64(iterations) == int64(currentIteration)
	}
}

// trackUpkeeps increments the dequeue count for upkeepID within the block
// window starting at startWindow, creating the per-window map on first use.
func (c *dequeueCoordinator) trackUpkeeps(startWindow int64, upkeepID *big.Int) {
	if windowUpkeeps, ok := c.dequeuedUpkeeps[startWindow]; ok {
		windowUpkeeps[upkeepID.String()] = windowUpkeeps[upkeepID.String()] + 1
		// Maps are reference types, so this write-back is redundant but harmless.
		c.dequeuedUpkeeps[startWindow] = windowUpkeeps
	} else {
		// first dequeue tracked for this window
		c.dequeuedUpkeeps[startWindow] = map[string]int{
			upkeepID.String(): 1,
		}
	}
}

// updateBlockWindow records the result of a dequeue attempt for the window
// starting at startWindow (logs just dequeued, remaining left in the buffer),
// then re-evaluates the window: the minimum commitment is met when enough
// logs (numberOfUpkeeps*logLimitLow) have been dequeued — or, for a complete
// window, when the buffer has drained — and an untracked, empty window is
// marked not ready.
func (c *dequeueCoordinator) updateBlockWindow(startWindow int64, logs, remaining, numberOfUpkeeps, logLimitLow int) {
	c.remainingLogs[startWindow] = remaining
	c.dequeuedLogs[startWindow] += logs

	if isComplete, ok := c.completeWindows[startWindow]; ok {
		if isComplete {
			// if the window is complete, and there are no more logs, then we have to consider this as min dequeued, even if no logs were dequeued
			if c.remainingLogs[startWindow] == 0 || c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow {
				c.dequeuedMinimum[startWindow] = true
			}
		} else if c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow { // this assumes we don't dequeue the same upkeeps more than logLimitLow in min commitment
			c.dequeuedMinimum[startWindow] = true
		}
	} else if c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow { // this assumes we don't dequeue the same upkeeps more than logLimitLow in min commitment
		c.dequeuedMinimum[startWindow] = true
	} else if logs == 0 && remaining == 0 {
		// untracked window with nothing dequeued and nothing buffered
		c.notReady[startWindow] = true
	}
}

func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big.Int, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
return &logEventProvider{
threadCtrl: utils.NewThreadControl(),
Expand Down Expand Up @@ -456,15 +355,7 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
}
}

p.lggr.With("where", "getLogsFromBuffer").Infow("built payloads", "payloadsBuilt", payloadsBuilt)
if remaining > 0 {
p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining)
// TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times
continue
}

start += int64(blockRate)
p.lggr.With("where", "getLogsFromBuffer").Infow("advancing window", "start", start)
p.dequeueCoordinator.updateBlockWindow(startWindow, len(logs), remaining, numOfUpkeeps, logLimitLow)
}
p.currentIteration++
p.lggr.With("where", "getLogsFromBuffer").Infow("advanced iteration", "p.currentIteration", p.currentIteration)
Expand Down

0 comments on commit 2b3c03a

Please sign in to comment.