Simplify the dequeue coordinator
Track enqueued logs
Add tests
ferglor committed Jun 21, 2024
1 parent 4c73949 commit 8ec492e
Showing 6 changed files with 656 additions and 97 deletions.
@@ -115,7 +115,9 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
blockThreshold = 1
}

return buf.enqueue(blockThreshold, logs...)
b.dequeueCoordinator.Clean(blockThreshold, b.opts.blockRate.Load())

return buf.enqueue(b.dequeueCoordinator, b.opts.blockRate.Load(), blockThreshold, logs...)
}

func (b *logBuffer) latestBlockNumber(logs ...logpoller.Log) int64 {
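With this change, Enqueue first asks the dequeue coordinator to drop tracking state that has fallen behind the block threshold, then passes the coordinator and the block rate down to the queue so that per-window enqueued counts stay in step with the logs actually stored. The coordinator keys its bookkeeping by the start of a block window via the getBlockWindow helper, which is referenced throughout this diff but not changed by it; the sketch below is not part of the commit and only illustrates the window alignment the rest of the code is assumed to rely on.

// Illustrative sketch only, not part of this commit: aligns a block number to
// its window, assuming windows are fixed, non-overlapping ranges of blockRate
// blocks starting at multiples of blockRate.
func getBlockWindowSketch(block int64, blockRate int) (start int64, end int64) {
	size := int64(blockRate)
	if size <= 1 {
		return block, block // a block rate of at most 1 makes every block its own window
	}
	start = block - (block % size) // e.g. block 22 with blockRate 4 -> window start 20
	end = start + size - 1         // ...and window end 23
	return start, end
}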
@@ -342,7 +344,7 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,
for _, blockNumber := range q.blockNumbers {
if blockNumber >= start && blockNumber <= end {
updatedLogs := make([]logpoller.Log, 0)

blockResults := 0
for _, l := range q.logs[blockNumber] {
if len(results) < limit {
results = append(results, l)
@@ -351,13 +353,16 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,
s.state = logTriggerStateDequeued
q.states[lid] = s
}
blockResults++
} else {
remaining++
updatedLogs = append(updatedLogs, l)
}
}

q.logs[blockNumber] = updatedLogs
if blockResults > 0 {
q.logs[blockNumber] = updatedLogs
}
}
}

@@ -373,11 +378,13 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,
// enqueue adds logs to the buffer and might also drop logs if the limit for the
// given upkeep was exceeded. Additionally, it will drop logs that are older than blockThreshold.
// Returns the number of logs that were added and number of logs that were dropped.
func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Log) (int, int) {
func (q *upkeepLogQueue) enqueue(coordinator *dequeueCoordinator, blockRate uint32, blockThreshold int64, logsToAdd ...logpoller.Log) (int, int) {
q.lock.Lock()
defer q.lock.Unlock()

var added int

logsAdded := map[int64]int{}
for _, log := range logsToAdd {
if log.BlockNumber < blockThreshold {
// q.lggr.Debugw("Skipping log from old block", "blockThreshold", blockThreshold, "logBlock", log.BlockNumber, "logIndex", log.LogIndex)
@@ -390,6 +397,13 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo
}
q.states[lid] = logTriggerStateEntry{state: logTriggerStateEnqueued, block: log.BlockNumber}
added++

if count, ok := logsAdded[log.BlockNumber]; ok {
logsAdded[log.BlockNumber] = count + 1
} else {
logsAdded[log.BlockNumber] = 1
}

if logList, ok := q.logs[log.BlockNumber]; ok {
logList = append(logList, log)
q.logs[log.BlockNumber] = logList
@@ -400,6 +414,10 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo
}
}

for blockNumber, logsAddedForBlock := range logsAdded {
coordinator.CountEnqueuedLogsForWindow(blockNumber, blockRate, logsAddedForBlock)
}

var dropped int
if added > 0 {
q.orderLogs()
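The enqueue path now counts how many logs were accepted per block and, after the batch is processed, reports each block's count to the coordinator, which folds it into that block's window. When blockRate is greater than 1, several blocks share a window, so their counts accumulate on the same window entry. A small sketch of that aggregation (not part of this commit; the function name is made up for illustration):

// Illustrative only: with blockRate = 4, blocks 20 through 23 all map to the
// window starting at 20, so their per-block counts land on one window entry.
func exampleEnqueueCounting() {
	coord := NewDequeueCoordinator()
	coord.CountEnqueuedLogsForWindow(20, 4, 2) // 2 logs enqueued at block 20
	coord.CountEnqueuedLogsForWindow(23, 4, 1) // 1 log enqueued at block 23
	// the window starting at 20 is now expected to track 3 enqueued logs
}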
@@ -504,10 +504,12 @@ func TestLogEventBufferV1_Enqueue(t *testing.T) {
}

func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
dequeueCoordinator := NewDequeueCoordinator()

t.Run("enqueue dequeue", func(t *testing.T) {
q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))

added, dropped := q.enqueue(10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
added, dropped := q.enqueue(dequeueCoordinator, 1, 10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
require.Equal(t, 0, dropped)
require.Equal(t, 1, added)
require.Equal(t, 1, q.sizeOfRange(1, 20))
@@ -519,7 +521,7 @@ func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
t.Run("enqueue with limits", func(t *testing.T) {
q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))

added, dropped := q.enqueue(10,
added, dropped := q.enqueue(dequeueCoordinator, 1, 10,
createDummyLogSequence(15, 0, 20, common.HexToHash("0x20"))...,
)
require.Equal(t, 5, dropped)
@@ -529,7 +531,7 @@ func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
t.Run("dequeue with limits", func(t *testing.T) {
q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 3))

added, dropped := q.enqueue(10,
added, dropped := q.enqueue(dequeueCoordinator, 1, 10,
logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 1},
logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 10},
@@ -544,6 +546,8 @@ func TestLogEventBufferV1_UpkeepQueue(t *testing.T) {
}

func TestLogEventBufferV1_UpkeepQueue_sizeOfRange(t *testing.T) {
dequeueCoordinator := NewDequeueCoordinator()

t.Run("empty", func(t *testing.T) {
q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))

@@ -553,7 +557,7 @@ func TestLogEventBufferV1_UpkeepQueue_sizeOfRange(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
q := newUpkeepLogQueue(logger.TestLogger(t), big.NewInt(1), newLogBufferOptions(10, 1, 1))

added, dropped := q.enqueue(10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
added, dropped := q.enqueue(dequeueCoordinator, 1, 10, logpoller.Log{BlockNumber: 20, TxHash: common.HexToHash("0x1"), LogIndex: 0})
require.Equal(t, 0, dropped)
require.Equal(t, 1, added)
require.Equal(t, 0, q.sizeOfRange(1, 10))
@@ -1,55 +1,54 @@
package logprovider

import (
"math/big"
"sync"
)

type DequeueCoordinator interface {
// DequeueBlockWindow identifies a block window ready for processing between the given start and latest block numbers.
// CountEnqueuedLogsForWindow tracks how many logs are added for a particular block during the enqueue process.
CountEnqueuedLogsForWindow(block int64, blockRate uint32, added int)
// GetDequeueBlockWindow identifies a block window ready for processing between the given start and latest block numbers.
// It prioritizes windows that need to have the minimum guaranteed logs dequeued before considering windows with
// remaining logs to be dequeued, as a best effort.
DequeueBlockWindow(start int64, latestBlock int64, blockRate int) (int64, int64, bool)
// GetUpkeepSelector returns a function that accepts an upkeep ID, and performs a modulus against the number of
// iterations, and compares the result against the current iteration. When this comparison returns true, the
// upkeep is selected for the dequeuing. This means that, for a given set of upkeeps, a different subset of
// upkeeps will be dequeued for each iteration once only, and, across all iterations, all upkeeps will be
// dequeued once.
GetUpkeepSelector(startWindow int64, logLimitLow, iterations, currentIteration int) func(id *big.Int) bool
// TrackUpkeeps tracks how many times an upkeep has been dequeued for a given block window.
TrackUpkeeps(startWindow int64, upkeepID *big.Int)
// UpdateBlockWindow updates the status of a block window based on the number of logs dequeued,
GetDequeueBlockWindow(start int64, latestBlock int64, blockRate int, minGuarantee int) (int64, int64, bool)
// CountDequeuedLogsForWindow updates the status of a block window based on the number of logs dequeued,
// and the minimum number of guaranteed logs. This function tracks remaining and dequeued logs for the specified
// block window and determines whether the window has had the minimum number of guaranteed logs dequeued.
UpdateBlockWindow(startWindow int64, logs, remaining, numberOfUpkeeps, logLimitLow int)
CountDequeuedLogsForWindow(startWindow int64, logs, minGuaranteedLogs int)
// MarkReorg handles the detection of a reorg by resetting the state of the affected block window. It ensures that
// upkeeps within the specified block window are marked as not having the minimum number of guaranteed logs dequeued.
MarkReorg(block int64, blockRate uint32)
// Clean removes any data that is older than the block window of the blockThreshold from the dequeueCoordinator
Clean(blockThreshold int64, blockRate uint32)
}
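Read together, the reshaped interface describes a per-window lifecycle: logs are counted into a window as they are enqueued, windows are handed out for dequeuing until their minimum guarantee is met, the dequeued counts are fed back, and reorged or stale windows are reset or cleaned. The usage sketch below is not part of this commit; the numbers are arbitrary and only the method signatures come from the interface above.

// Illustrative lifecycle sketch only; blockRate, thresholds and counts are
// arbitrary values chosen for the example.
func exampleCoordinatorLifecycle() {
	var coord DequeueCoordinator = NewDequeueCoordinator()

	// enqueue time: record how many logs landed in a block's window
	coord.CountEnqueuedLogsForWindow(101, 4, 5)

	// dequeue time: pick a window, preferring those below the minimum guarantee
	if start, _, ok := coord.GetDequeueBlockWindow(100, 120, 4, 10); ok {
		// ...dequeue logs for this window from the buffer, then report back
		coord.CountDequeuedLogsForWindow(start, 5, 10)
	}

	// on a reorg at block 101, reset that window's counters
	coord.MarkReorg(101, 4)

	// as the block threshold advances, drop windows that fell behind it
	coord.Clean(110, 4)
}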

type dequeueCoordinator struct {
dequeuedMinimum map[int64]bool
notReady map[int64]bool
remainingLogs map[int64]int
enqueuedLogs map[int64]int
dequeuedLogs map[int64]int
completeWindows map[int64]bool
dequeuedUpkeeps map[int64]map[string]int
mu sync.Mutex
}

func NewDequeueCoordinator() *dequeueCoordinator {
return &dequeueCoordinator{
dequeuedMinimum: map[int64]bool{},
notReady: map[int64]bool{},
remainingLogs: map[int64]int{},
enqueuedLogs: map[int64]int{},
dequeuedLogs: map[int64]int{},
completeWindows: map[int64]bool{},
dequeuedUpkeeps: map[int64]map[string]int{},
}
}

func (c *dequeueCoordinator) DequeueBlockWindow(start int64, latestBlock int64, blockRate int) (int64, int64, bool) {
func (c *dequeueCoordinator) CountEnqueuedLogsForWindow(block int64, blockRate uint32, added int) {
c.mu.Lock()
defer c.mu.Unlock()

startWindow, _ := getBlockWindow(block, int(blockRate))
c.enqueuedLogs[startWindow] += added
}

func (c *dequeueCoordinator) GetDequeueBlockWindow(start int64, latestBlock int64, blockRate int, minGuarantee int) (int64, int64, bool) {
c.mu.Lock()
defer c.mu.Unlock()

@@ -58,11 +57,14 @@ func (c *dequeueCoordinator) DequeueBlockWindow(start int64, latestBlock int64,
startWindow, end := getBlockWindow(i, blockRate)
if latestBlock >= end {
c.completeWindows[startWindow] = true
} else if c.notReady[startWindow] { // the window is incomplete and has no logs to provide as of yet
} else if c.enqueuedLogs[startWindow] <= 0 { // the latest window is incomplete and has no logs to provide yet
break
}

if hasDequeued, ok := c.dequeuedMinimum[startWindow]; !ok || !hasDequeued {
enqueuedLogs := c.enqueuedLogs[startWindow]
dequeuedLogs := c.dequeuedLogs[startWindow]

if enqueuedLogs > 0 && dequeuedLogs < minGuarantee {
return startWindow, end, true
}
}
@@ -71,76 +73,29 @@ func (c *dequeueCoordinator) DequeueBlockWindow(start int64, latestBlock int64,
for i := start; i < latestBlock; i += int64(blockRate) {
startWindow, end := getBlockWindow(i, blockRate)

if remainingLogs, ok := c.remainingLogs[startWindow]; ok {
if remainingLogs > 0 {
return startWindow, end, true
}
if remainingLogs, ok := c.enqueuedLogs[startWindow]; ok && remainingLogs > 0 {
return startWindow, end, true
}
}

return 0, 0, false
}
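With the upkeep-level tracking gone, window selection runs off the two counters alone: the first pass returns the oldest window that still has enqueued logs and has not yet reached minGuarantee dequeued logs (stopping at an incomplete window with nothing enqueued), and the second pass falls back, best effort, to any window with leftover enqueued logs. The worked example below is illustrative only and not part of the commit.

// Illustrative only: assumes blockRate 4 and a minimum guarantee of 10 logs
// per window.
func exampleWindowSelection() {
	coord := NewDequeueCoordinator()
	coord.CountEnqueuedLogsForWindow(100, 4, 12) // 12 logs land in window [100, 103]

	// first pass: the window has enqueued logs and has not met the guarantee
	start, end, ok := coord.GetDequeueBlockWindow(100, 110, 4, 10) // expect 100, 103, true

	// report 10 dequeued logs; the window now counts as minimum-dequeued
	coord.CountDequeuedLogsForWindow(start, 10, 10)

	// the same window is returned again, but via the best-effort pass,
	// because 2 enqueued logs remain
	start, end, ok = coord.GetDequeueBlockWindow(100, 110, 4, 10)
	_, _, _ = start, end, ok
}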

func (c *dequeueCoordinator) GetUpkeepSelector(startWindow int64, logLimitLow, iterations, currentIteration int) func(id *big.Int) bool {
func (c *dequeueCoordinator) CountDequeuedLogsForWindow(startWindow int64, logs, minGuaranteedLogs int) {
c.mu.Lock()
defer c.mu.Unlock()

bestEffort := false

if hasDequeued, ok := c.dequeuedMinimum[startWindow]; ok {
if hasDequeued {
bestEffort = true
}
}

return func(id *big.Int) bool {
// query the map of block number to upkeep ID for dequeued count here when the block window is incomplete
dequeueUpkeep := true
if !bestEffort {
if windowUpkeeps, ok := c.dequeuedUpkeeps[startWindow]; ok {
if windowUpkeeps[id.String()] >= logLimitLow {
dequeueUpkeep = false
}
}
}
return dequeueUpkeep && id.Int64()%int64(iterations) == int64(currentIteration)
}
}

func (c *dequeueCoordinator) TrackUpkeeps(startWindow int64, upkeepID *big.Int) {
c.mu.Lock()
defer c.mu.Unlock()

if windowUpkeeps, ok := c.dequeuedUpkeeps[startWindow]; ok {
windowUpkeeps[upkeepID.String()] = windowUpkeeps[upkeepID.String()] + 1
c.dequeuedUpkeeps[startWindow] = windowUpkeeps
} else {
c.dequeuedUpkeeps[startWindow] = map[string]int{
upkeepID.String(): 1,
}
}
}

func (c *dequeueCoordinator) UpdateBlockWindow(startWindow int64, logs, remaining, numberOfUpkeeps, logLimitLow int) {
c.mu.Lock()
defer c.mu.Unlock()

c.remainingLogs[startWindow] = remaining
c.enqueuedLogs[startWindow] -= logs
c.dequeuedLogs[startWindow] += logs

if isComplete, ok := c.completeWindows[startWindow]; ok {
if isComplete {
if c.completeWindows[startWindow] {
if c.enqueuedLogs[startWindow] <= 0 || c.dequeuedLogs[startWindow] >= minGuaranteedLogs {
// if the window is complete, and there are no more logs, then we have to consider this as min dequeued, even if no logs were dequeued
if c.remainingLogs[startWindow] == 0 || c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow {
c.dequeuedMinimum[startWindow] = true
}
} else if c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow { // this assumes we don't dequeue the same upkeeps more than logLimitLow in min commitment
c.dequeuedMinimum[startWindow] = true
}
} else if c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow { // this assumes we don't dequeue the same upkeeps more than logLimitLow in min commitment
} else if c.dequeuedLogs[startWindow] >= minGuaranteedLogs {
// if the window is not complete, but we were able to dequeue min guaranteed logs from the blocks that were available
c.dequeuedMinimum[startWindow] = true
} else if logs == 0 && remaining == 0 {
c.notReady[startWindow] = true
}
}

@@ -150,8 +105,22 @@ func (c *dequeueCoordinator) MarkReorg(block int64, blockRate uint32) {

startWindow, _ := getBlockWindow(block, int(blockRate))
c.dequeuedMinimum[startWindow] = false
// TODO instead of wiping the count for all upkeeps, should we wipe for upkeeps only impacted by the reorg?
for upkeepID := range c.dequeuedUpkeeps[startWindow] {
c.dequeuedUpkeeps[startWindow][upkeepID] = 0
c.enqueuedLogs[startWindow] = 0
c.dequeuedLogs[startWindow] = 0
}

func (c *dequeueCoordinator) Clean(blockThreshold int64, blockRate uint32) {
c.mu.Lock()
defer c.mu.Unlock()

blockThresholdStartWindow, _ := getBlockWindow(blockThreshold, int(blockRate))

for block := range c.enqueuedLogs {
if blockThresholdStartWindow > block {
delete(c.enqueuedLogs, block)
delete(c.dequeuedLogs, block)
delete(c.dequeuedMinimum, block)
delete(c.completeWindows, block)
}
}
}
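Clean aligns the block threshold to the start of its window and deletes bookkeeping for every window that starts before it, which keeps the coordinator's maps bounded as Enqueue advances the threshold. A short sketch (not part of this commit; the function name is made up):

// Illustrative only: a threshold of 105 with blockRate 4 aligns to the window
// starting at 104, so tracking data for windows 100 and older is dropped.
func exampleClean() {
	coord := NewDequeueCoordinator()
	coord.CountEnqueuedLogsForWindow(98, 4, 3)  // window [96, 99]
	coord.CountEnqueuedLogsForWindow(107, 4, 4) // window [104, 107]

	coord.Clean(105, 4)
	// window 96 is no longer tracked; window 104 and newer are kept
}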