WIP use a dequeue coordinator
WIP only dequeue upkeeps under the low limit

Update tests

WIP tests

Add tests

Switch to a new structure of storing logs by block number, update tests

Update tests, track reorgs by comparing block hashes, mark as missing min dequeue in coordinator

Add tests showing reorg handling
ferglor committed Jun 10, 2024
1 parent b1a9f33 commit e8eed1e
Showing 6 changed files with 838 additions and 376 deletions.
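
Of the six changed files, only one is rendered in this excerpt; in particular, the dequeueCoordinator that the buffer now receives in its constructor and calls into on reorgs lives in a file that is not shown. The sketch below is only an inference from what is visible here — the markReorg(blockNumber, blockRate) call and the commit message's "mark as missing min dequeue in coordinator" — so the package placement, field names, and window bookkeeping are assumptions, not the commit's actual code.

```go
package logprovider // assumed package, matching the buffer shown below

import "sync"

// dequeueCoordinator (sketch): tracks per-window dequeue state so the
// provider only dequeues upkeeps under the low limit and can revisit
// windows whose logs were evicted by a reorg. Inferred shape only.
type dequeueCoordinator struct {
	mu sync.Mutex
	// dequeuedMinimum records, per block-window start, whether the window's
	// guaranteed minimum dequeue has already happened (assumed field).
	dequeuedMinimum map[int64]bool
}

// markReorg flags the window containing blockNumber as missing its minimum
// dequeue again, so the coordinator will revisit it after the reorged logs
// have been evicted from the buffer.
func (c *dequeueCoordinator) markReorg(blockNumber int64, blockRate uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// getBlockWindow is the package's existing helper for bucketing blocks
	// into windows of blockRate blocks.
	windowStart, _ := getBlockWindow(blockNumber, int(blockRate))
	c.dequeuedMinimum[windowStart] = false
}
```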
@@ -79,19 +79,23 @@ type logBuffer struct {
lock sync.RWMutex

// map for the number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
queueIDs []string
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
queueIDs []string
blockHashes map[int64]string
dequeueCoordinator *dequeueCoordinator
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32, dequeueCoordinator *dequeueCoordinator) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
blockHashes: map[int64]string{},
dequeueCoordinator: dequeueCoordinator,
}
}

@@ -107,7 +111,11 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
b.setUpkeepQueue(uid, buf)
}

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
latestLogBlock, uniqueBlocks, reorgBlocks := b.blockStatistics(logs...)
if len(reorgBlocks) > 0 {
b.evictReorgdLogs(reorgBlocks)
}

if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
Expand All @@ -126,6 +134,17 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
return buf.enqueue(blockThreshold, logs...)
}

func (b *logBuffer) evictReorgdLogs(reorgBlocks map[int64]bool) {
for _, queue := range b.queues {
for blockNumber := range reorgBlocks {
if _, ok := queue.logs[blockNumber]; ok {
queue.logs[blockNumber] = []logpoller.Log{}
b.dequeueCoordinator.markReorg(blockNumber, b.opts.blockRate.Load())
}
}
}
}

func (b *logBuffer) cleanupEnqueuedBlocks(blockThreshold int64) {
b.enqueuedBlockLock.Lock()
defer b.enqueuedBlockLock.Unlock()
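
The reorg detection itself happens in b.blockStatistics, which this excerpt does not show (it was previously a free function; here it becomes a method so it can consult the buffer's new blockHashes map). The following is a sketch of one way the visible pieces fit together — the return types are inferred from how Enqueue and evictReorgdLogs use them, and the commit's real implementation may differ.

```go
// blockStatistics (sketch): scans the incoming logs, records the latest block
// number and the set of unique blocks, and flags any block whose hash differs
// from the hash previously stored for that block number (i.e. a reorg).
// Locking around blockHashes is elided for brevity.
func (b *logBuffer) blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool, map[int64]bool) {
	var latestBlock int64
	uniqueBlocks := map[int64]bool{}
	reorgBlocks := map[int64]bool{}

	for _, l := range logs {
		if l.BlockNumber > latestBlock {
			latestBlock = l.BlockNumber
		}
		uniqueBlocks[l.BlockNumber] = true
		// A different hash for an already-seen block number means the chain
		// reorganised under us; remember the block so its logs get evicted.
		if hash, seen := b.blockHashes[l.BlockNumber]; seen && hash != l.BlockHash.String() {
			reorgBlocks[l.BlockNumber] = true
		}
		b.blockHashes[l.BlockNumber] = l.BlockHash.String()
	}

	return latestBlock, uniqueBlocks, reorgBlocks
}
```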
@@ -309,21 +328,23 @@ type upkeepLogQueue struct {
opts *logBufferOptions

// logs is the buffer of logs for the upkeep
logs []logpoller.Log
logs map[int64][]logpoller.Log
// states keeps track of the state of the logs that are known to the queue
// and the block number they were seen at
states map[string]logTriggerStateEntry
lock sync.RWMutex
blockNumbers []int64
states map[string]logTriggerStateEntry
lock sync.RWMutex
}

func newUpkeepLogQueue(lggr logger.Logger, id *big.Int, opts *logBufferOptions) *upkeepLogQueue {
maxLogs := int(opts.windowLimit.Load()) * opts.windows() // limit per window * windows
//maxLogs := int(opts.windowLimit.Load()) * opts.windows() // limit per window * windows
return &upkeepLogQueue{
lggr: lggr.With("upkeepID", id.String()),
id: id,
opts: opts,
logs: make([]logpoller.Log, 0, maxLogs),
states: make(map[string]logTriggerStateEntry),
lggr: lggr.With("upkeepID", id.String()),
id: id,
opts: opts,
logs: map[int64][]logpoller.Log{},
blockNumbers: make([]int64, 0),
states: make(map[string]logTriggerStateEntry),
}
}

@@ -333,9 +354,9 @@ func (q *upkeepLogQueue) sizeOfRange(start, end int64) int {
defer q.lock.RUnlock()

size := 0
for _, l := range q.logs {
if l.BlockNumber >= start && l.BlockNumber <= end {
size++
for blockNumber, logs := range q.logs {
if blockNumber >= start && blockNumber <= end {
size += len(logs)
}
}
return size
@@ -353,25 +374,30 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,

var results []logpoller.Log
var remaining int
updatedLogs := make([]logpoller.Log, 0)
for _, l := range q.logs {
if l.BlockNumber >= start && l.BlockNumber <= end {
if len(results) < limit {
results = append(results, l)
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDequeued
q.states[lid] = s

for _, blockNumber := range q.blockNumbers {
if blockNumber >= start && blockNumber <= end {
updatedLogs := make([]logpoller.Log, 0)

for _, l := range q.logs[blockNumber] {
if len(results) < limit {
results = append(results, l)
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDequeued
q.states[lid] = s
}
} else {
remaining++
updatedLogs = append(updatedLogs, l)
}
continue
}
remaining++

q.logs[blockNumber] = updatedLogs
}
updatedLogs = append(updatedLogs, l)
}

if len(results) > 0 {
q.logs = updatedLogs
q.lggr.Debugw("Dequeued logs", "start", start, "end", end, "limit", limit, "results", len(results), "remaining", remaining)
}

Expand Down Expand Up @@ -401,7 +427,13 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo
}
q.states[lid] = logTriggerStateEntry{state: logTriggerStateEnqueued, block: log.BlockNumber}
added++
logs = append(logs, log)
if logList, ok := q.logs[log.BlockNumber]; ok {
logList = append(logList, log)
q.logs[log.BlockNumber] = logList
} else {
q.logs[log.BlockNumber] = []logpoller.Log{log}
q.blockNumbers = append(q.blockNumbers, log.BlockNumber)
}
}
q.logs = logs
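
Stepping back from the hunk above: the structural change running through this file replaces the queue's single sorted slice of logs with a map keyed by block number plus a slice of the block numbers seen so far. A small standalone illustration of that pattern follows; it uses plain strings instead of the actual upkeepLogQueue and logpoller types, and it sorts the key slice on insert for clarity, whereas the queue above appends block numbers in arrival order.

```go
package main

import (
	"fmt"
	"sort"
)

// blockKeyedBuffer is a minimal stand-in for the per-upkeep queue: payloads
// grouped by block number, with a separate slice tracking which block
// numbers are present.
type blockKeyedBuffer struct {
	logs         map[int64][]string
	blockNumbers []int64
}

func (b *blockKeyedBuffer) add(block int64, log string) {
	if _, ok := b.logs[block]; !ok {
		// First log for this block: remember the block number.
		b.blockNumbers = append(b.blockNumbers, block)
		sort.Slice(b.blockNumbers, func(i, j int) bool { return b.blockNumbers[i] < b.blockNumbers[j] })
	}
	b.logs[block] = append(b.logs[block], log)
}

// drain visits blocks in ascending order, mirroring how dequeue walks
// blockNumbers and collects logs inside the [start, end] range.
func (b *blockKeyedBuffer) drain(start, end int64) []string {
	var out []string
	for _, bn := range b.blockNumbers {
		if bn >= start && bn <= end {
			out = append(out, b.logs[bn]...)
		}
	}
	return out
}

func main() {
	buf := &blockKeyedBuffer{logs: map[int64][]string{}}
	buf.add(102, "log-b")
	buf.add(101, "log-a")
	buf.add(102, "log-c")
	fmt.Println(buf.drain(100, 110)) // [log-a log-b log-c]
}
```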

@@ -424,9 +456,13 @@ func (q *upkeepLogQueue) orderLogs() {
// sort logs by block number, tx hash and log index
// to keep the q sorted and to ensure that logs can be
// grouped by block windows for the cleanup
sort.SliceStable(q.logs, func(i, j int) bool {
return LogSorter(q.logs[i], q.logs[j])
})
for _, blockNumber := range q.blockNumbers {
toSort := q.logs[blockNumber]
sort.SliceStable(toSort, func(i, j int) bool {
return LogSorter(toSort[i], toSort[j])
})
q.logs[blockNumber] = toSort
}
}

// clean removes logs that are older than blockThreshold and drops logs if the limit for the
@@ -436,46 +472,49 @@ func (q *upkeepLogQueue) clean(blockThreshold int64) int {
var dropped, expired int
blockRate := int(q.opts.blockRate.Load())
windowLimit := int(q.opts.windowLimit.Load())
updated := make([]logpoller.Log, 0)
// helper variables to keep track of the current window capacity
currentWindowCapacity, currentWindowStart := 0, int64(0)
for _, l := range q.logs {
if blockThreshold > l.BlockNumber { // old log, removed
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc()
// q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
logid := logID(l)
delete(q.states, logid)
expired++
continue
}
start, _ := getBlockWindow(l.BlockNumber, blockRate)
if start != currentWindowStart {
// new window, reset capacity
currentWindowStart = start
currentWindowCapacity = 0
}
currentWindowCapacity++
// if capacity has been reached, drop the log
if currentWindowCapacity > windowLimit {
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDropped
q.states[lid] = s
for blockNumber, logs := range q.logs {
updated := make([]logpoller.Log, 0)

for _, l := range logs {
if blockThreshold > l.BlockNumber { // old log, removed
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc()
// q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
logid := logID(l)
delete(q.states, logid)
expired++
continue
}
dropped++
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc()
q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber,
"blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated),
"currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity,
"maxLogsPerWindow", windowLimit, "blockRate", blockRate)
continue
start, _ := getBlockWindow(l.BlockNumber, blockRate)
if start != currentWindowStart {
// new window, reset capacity
currentWindowStart = start
currentWindowCapacity = 0
}
currentWindowCapacity++
// if capacity has been reached, drop the log
if currentWindowCapacity > windowLimit {
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDropped
q.states[lid] = s
}
dropped++
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc()
q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber,
"blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated),
"currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity,
"maxLogsPerWindow", windowLimit, "blockRate", blockRate)
continue
}
updated = append(updated, l)
}
updated = append(updated, l)
}

if dropped > 0 || expired > 0 {
q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs))
q.logs = updated
if dropped > 0 || expired > 0 {
q.logs[blockNumber] = updated
q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs))
}
}

q.cleanStates(blockThreshold)
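
The window bookkeeping in clean (and in markReorg on the coordinator) relies on the package's getBlockWindow helper, which is not part of this excerpt. A plausible shape, consistent with how it is called here (block number plus block rate in, window start and end out), is sketched below; treat it as an assumption rather than the commit's code.

```go
// getBlockWindow (assumed): buckets block numbers into fixed-size windows of
// blockRate blocks, returning the first and last block of the window.
func getBlockWindow(block int64, blockRate int) (start int64, end int64) {
	size := int64(blockRate)
	if size == 0 {
		size = 1
	}
	start = block - (block % size)
	end = start + size - 1
	return start, end
}
```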