Handle reorgs in the provider - rework #13480

Closed
wants to merge 15 commits
4 changes: 4 additions & 0 deletions .changeset/modern-ghosts-hang.md
@@ -0,0 +1,4 @@
---
"chainlink": patch
---
Iterate over upkeeps using an upkeep selector #changed
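
The selector mentioned in the changeset is the `func(id *big.Int) bool` argument that `Dequeue` takes in the buffer changes below. As a rough, self-contained illustration of the pattern (none of this is the PR's code; the names are made up), a selector simply decides which upkeep IDs participate in a given dequeue pass:

```go
package main

import (
	"fmt"
	"math/big"
)

// upkeepSelector mirrors the selector shape used by Dequeue below:
// it decides whether a given upkeep ID is included in this pass.
type upkeepSelector func(id *big.Int) bool

// filterUpkeeps is an illustrative helper, not part of the PR.
func filterUpkeeps(ids []*big.Int, selector upkeepSelector) []*big.Int {
	var selected []*big.Int
	for _, id := range ids {
		if selector(id) {
			selected = append(selected, id)
		}
	}
	return selected
}

func main() {
	ids := []*big.Int{big.NewInt(10), big.NewInt(11), big.NewInt(12)}
	// Example selector: keep even upkeep IDs only.
	even := func(id *big.Int) bool { return id.Bit(0) == 0 }
	fmt.Println(filterUpkeeps(ids, even)) // prints [10 12]
}
```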
@@ -1,7 +1,6 @@
package logprovider

import (
"math"
"math/big"
"sort"
"sync"
@@ -26,7 +25,7 @@ type LogBuffer interface {
// It also accepts a function to select upkeeps.
// Returns logs (associated to upkeeps) and the number of remaining
// logs in that window for the involved upkeeps.
Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
// SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep.
SetConfig(lookback, blockRate, logLimit uint32)
// NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer.
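
With the new signature, callers hand `Dequeue` an explicit `[start, end]` block range instead of a block number plus `blockRate`; the window is computed at the call site (e.g. via `getBlockWindow`, as in the removed line further down). A minimal call-site sketch under that assumption; everything except `Dequeue`, `getBlockWindow`, `LogBuffer` and `BufferedLog` is illustrative:

```go
// Illustrative call-site sketch (not part of the PR), written as if inside the
// logprovider package so it can use LogBuffer, BufferedLog and getBlockWindow.
func dequeueWindow(buf LogBuffer, block int64, blockRate, upkeepLimit, maxResults int) []BufferedLog {
	// Compute the inclusive [start, end] bounds of the block window at the call site.
	start, end := getBlockWindow(block, blockRate)

	// Select every upkeep; a production selector could shard upkeeps between nodes.
	selectAll := func(id *big.Int) bool { return true }

	logs, remaining := buf.Dequeue(start, end, upkeepLimit, maxResults, selectAll)
	_ = remaining // logs still buffered in this window for the selected upkeeps
	return logs
}
```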
@@ -65,10 +64,6 @@ func (o *logBufferOptions) override(lookback, blockRate, logLimit uint32) {
o.blockRate.Store(blockRate)
}

func (o *logBufferOptions) windows() int {
return int(math.Ceil(float64(o.lookback.Load()) / float64(o.blockRate.Load())))
}

type logBuffer struct {
lggr logger.Logger
opts *logBufferOptions
@@ -79,17 +74,23 @@ type logBuffer struct {
lock sync.RWMutex

// map for the number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
queueIDs []string
blockHashes map[int64]string
dequeueCoordinator *dequeueCoordinator
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32, dequeueCoordinator *dequeueCoordinator) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
blockHashes: map[int64]string{},
dequeueCoordinator: dequeueCoordinator,
}
}

@@ -105,7 +106,11 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
b.setUpkeepQueue(uid, buf)
}

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
latestLogBlock, uniqueBlocks, reorgBlocks := b.blockStatistics(logs...)
if len(reorgBlocks) > 0 {
b.evictReorgdLogs(reorgBlocks)
}

if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
Expand All @@ -124,6 +129,17 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
return buf.enqueue(blockThreshold, logs...)
}

func (b *logBuffer) evictReorgdLogs(reorgBlocks map[int64]bool) {
for _, queue := range b.queues {
for blockNumber := range reorgBlocks {
if _, ok := queue.logs[blockNumber]; ok {
queue.logs[blockNumber] = []logpoller.Log{}
b.dequeueCoordinator.markReorg(blockNumber, b.opts.blockRate.Load())
}
}
}
}
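
`blockStatistics` becomes a method that also reports re-orged block numbers, which `evictReorgdLogs` then clears from every queue. The method body is not part of this hunk; one way it could work, consistent with the new `blockHashes` field, is to compare each incoming log's block hash with the hash previously recorded for that height. A hypothetical sketch (not the PR's implementation):

```go
// Hypothetical sketch only: the real blockStatistics body is not shown in this diff.
func (b *logBuffer) blockStatisticsSketch(logs ...logpoller.Log) (int64, map[int64]bool, map[int64]bool) {
	var latest int64
	uniqueBlocks := map[int64]bool{}
	reorgBlocks := map[int64]bool{}
	for _, l := range logs {
		if l.BlockNumber > latest {
			latest = l.BlockNumber
		}
		uniqueBlocks[l.BlockNumber] = true
		hash := l.BlockHash.String()
		// Same height seen before with a different hash => that block was re-orged.
		if prev, ok := b.blockHashes[l.BlockNumber]; ok && prev != hash {
			reorgBlocks[l.BlockNumber] = true
		}
		b.blockHashes[l.BlockNumber] = hash
	}
	return latest, uniqueBlocks, reorgBlocks
}
```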

func (b *logBuffer) cleanupEnqueuedBlocks(blockThreshold int64) {
b.enqueuedBlockLock.Lock()
defer b.enqueuedBlockLock.Unlock()
@@ -167,11 +183,10 @@ func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[in

// Dequeue greedily pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
b.lock.RLock()
defer b.lock.RUnlock()

start, end := getBlockWindow(block, blockRate)
return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector)
}

@@ -183,11 +198,14 @@ func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int,
func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
var result []BufferedLog
var remainingLogs int
for _, q := range b.queues {
var selectedUpkeeps int
numLogs := 0
for _, qid := range b.queueIDs {
q := b.queues[qid]
if !upkeepSelector(q.id) {
// if the upkeep is not selected, skip it
continue
}
selectedUpkeeps++
logsInRange := q.sizeOfRange(start, end)
if logsInRange == 0 {
// if there are no logs in the range, skip the upkeep
@@ -207,8 +225,10 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS
result = append(result, BufferedLog{ID: q.id, Log: l})
capacity--
}
numLogs += len(logs)
remainingLogs += remaining
}
b.lggr.Debugw("dequeued logs for upkeeps", "selectedUpkeeps", selectedUpkeeps, "numLogs", numLogs)
return result, remainingLogs
}

@@ -230,12 +250,18 @@ func (b *logBuffer) SyncFilters(filterStore UpkeepFilterStore) error {
b.lock.Lock()
defer b.lock.Unlock()

for upkeepID := range b.queues {
for _, upkeepID := range b.queueIDs {
uid := new(big.Int)
_, ok := uid.SetString(upkeepID, 10)
if ok && !filterStore.Has(uid) {
// remove upkeep that is not in the filter store
delete(b.queues, upkeepID)
for i, v := range b.queueIDs {
if v == upkeepID {
b.queueIDs = append(b.queueIDs[:i], b.queueIDs[i+1:]...)
break
}
}
}
}

@@ -254,6 +280,16 @@ func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) {
b.lock.Lock()
defer b.lock.Unlock()

found := false
for _, id := range b.queueIDs {
if id == uid.String() {
found = true
break
}
}
if !found {
b.queueIDs = append(b.queueIDs, uid.String())
}
b.queues[uid.String()] = buf
}

@@ -287,21 +323,23 @@ type upkeepLogQueue struct {
opts *logBufferOptions

// logs is the buffer of logs for the upkeep
logs []logpoller.Log
logs map[int64][]logpoller.Log
// states keeps track of the state of the logs that are known to the queue
// and the block number they were seen at
states map[string]logTriggerStateEntry
lock sync.RWMutex
blockNumbers []int64
states map[string]logTriggerStateEntry
lock sync.RWMutex
}

func newUpkeepLogQueue(lggr logger.Logger, id *big.Int, opts *logBufferOptions) *upkeepLogQueue {
maxLogs := int(opts.windowLimit.Load()) * opts.windows() // limit per window * windows
//maxLogs := int(opts.windowLimit.Load()) * opts.windows() // limit per window * windows
return &upkeepLogQueue{
lggr: lggr.With("upkeepID", id.String()),
id: id,
opts: opts,
logs: make([]logpoller.Log, 0, maxLogs),
states: make(map[string]logTriggerStateEntry),
lggr: lggr.With("upkeepID", id.String()),
id: id,
opts: opts,
logs: map[int64][]logpoller.Log{},
blockNumbers: make([]int64, 0),
states: make(map[string]logTriggerStateEntry),
}
}
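
The queue's storage changes from one flat slice to a map keyed by block number plus a parallel `blockNumbers` slice. The map makes evicting a single re-orged block a single write (as `evictReorgdLogs` does above), while the slice gives a stable iteration order, since Go randomizes map iteration. A small standalone illustration of that pattern, using made-up data rather than the PR's types:

```go
package main

import "fmt"

func main() {
	// Logs grouped by block number: evicting one block is a single map write.
	logsByBlock := map[int64][]string{}
	// Parallel slice of block numbers to keep a deterministic iteration order.
	var blockNumbers []int64

	add := func(block int64, log string) {
		if _, ok := logsByBlock[block]; !ok {
			blockNumbers = append(blockNumbers, block)
		}
		logsByBlock[block] = append(logsByBlock[block], log)
	}

	add(100, "log-a")
	add(101, "log-b")
	add(100, "log-c")

	// Evict block 100 (e.g. after a reorg) without touching other blocks.
	logsByBlock[100] = nil

	for _, bn := range blockNumbers {
		fmt.Println(bn, logsByBlock[bn])
	}
}
```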

@@ -311,9 +349,9 @@ func (q *upkeepLogQueue) sizeOfRange(start, end int64) int {
defer q.lock.RUnlock()

size := 0
for _, l := range q.logs {
if l.BlockNumber >= start && l.BlockNumber <= end {
size++
for blockNumber, logs := range q.logs {
if blockNumber >= start && blockNumber <= end {
size += len(logs)
}
}
return size
@@ -331,25 +369,30 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log,

var results []logpoller.Log
var remaining int
updatedLogs := make([]logpoller.Log, 0)
for _, l := range q.logs {
if l.BlockNumber >= start && l.BlockNumber <= end {
if len(results) < limit {
results = append(results, l)
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDequeued
q.states[lid] = s

for _, blockNumber := range q.blockNumbers {
if blockNumber >= start && blockNumber <= end {
updatedLogs := make([]logpoller.Log, 0)

for _, l := range q.logs[blockNumber] {
if len(results) < limit {
results = append(results, l)
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDequeued
q.states[lid] = s
}
} else {
remaining++
updatedLogs = append(updatedLogs, l)
}
continue
}
remaining++

q.logs[blockNumber] = updatedLogs
}
updatedLogs = append(updatedLogs, l)
}

if len(results) > 0 {
q.logs = updatedLogs
q.lggr.Debugw("Dequeued logs", "start", start, "end", end, "limit", limit, "results", len(results), "remaining", remaining)
}

@@ -379,7 +422,13 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo
}
q.states[lid] = logTriggerStateEntry{state: logTriggerStateEnqueued, block: log.BlockNumber}
added++
logs = append(logs, log)
if logList, ok := q.logs[log.BlockNumber]; ok {
logList = append(logList, log)
q.logs[log.BlockNumber] = logList
} else {
q.logs[log.BlockNumber] = []logpoller.Log{log}
q.blockNumbers = append(q.blockNumbers, log.BlockNumber)
}
}
q.logs = logs

@@ -402,9 +451,13 @@ func (q *upkeepLogQueue) orderLogs() {
// sort logs by block number, tx hash and log index
// to keep the q sorted and to ensure that logs can be
// grouped by block windows for the cleanup
sort.SliceStable(q.logs, func(i, j int) bool {
return LogSorter(q.logs[i], q.logs[j])
})
for _, blockNumber := range q.blockNumbers {
toSort := q.logs[blockNumber]
sort.SliceStable(toSort, func(i, j int) bool {
return LogSorter(toSort[i], toSort[j])
})
q.logs[blockNumber] = toSort
}
}

// clean removes logs that are older than blockThreshold and drops logs if the limit for the
Expand All @@ -414,46 +467,49 @@ func (q *upkeepLogQueue) clean(blockThreshold int64) int {
var dropped, expired int
blockRate := int(q.opts.blockRate.Load())
windowLimit := int(q.opts.windowLimit.Load())
updated := make([]logpoller.Log, 0)
// helper variables to keep track of the current window capacity
currentWindowCapacity, currentWindowStart := 0, int64(0)
for _, l := range q.logs {
if blockThreshold > l.BlockNumber { // old log, removed
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc()
// q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
logid := logID(l)
delete(q.states, logid)
expired++
continue
}
start, _ := getBlockWindow(l.BlockNumber, blockRate)
if start != currentWindowStart {
// new window, reset capacity
currentWindowStart = start
currentWindowCapacity = 0
}
currentWindowCapacity++
// if capacity has been reached, drop the log
if currentWindowCapacity > windowLimit {
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDropped
q.states[lid] = s
for blockNumber, logs := range q.logs {
updated := make([]logpoller.Log, 0)

for _, l := range logs {
if blockThreshold > l.BlockNumber { // old log, removed
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc()
// q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
logid := logID(l)
delete(q.states, logid)
expired++
continue
}
dropped++
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc()
q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber,
"blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated),
"currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity,
"maxLogsPerWindow", windowLimit, "blockRate", blockRate)
continue
start, _ := getBlockWindow(l.BlockNumber, blockRate)
if start != currentWindowStart {
// new window, reset capacity
currentWindowStart = start
currentWindowCapacity = 0
}
currentWindowCapacity++
// if capacity has been reached, drop the log
if currentWindowCapacity > windowLimit {
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDropped
q.states[lid] = s
}
dropped++
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc()
q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber,
"blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated),
"currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity,
"maxLogsPerWindow", windowLimit, "blockRate", blockRate)
continue
}
updated = append(updated, l)
}
updated = append(updated, l)
}

if dropped > 0 || expired > 0 {
q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs))
q.logs = updated
if dropped > 0 || expired > 0 {
q.logs[blockNumber] = updated
q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs))
}
}

q.cleanStates(blockThreshold)
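
Zooming out, the rework threads a shared `dequeueCoordinator` into `NewLogBuffer` so that windows affected by a reorg can be marked (via `markReorg`) and revisited later. A rough wiring sketch; the coordinator's constructor is not shown in this diff, so `newDequeueCoordinator` and the config values below are assumptions for illustration only:

```go
// Hypothetical wiring (not part of the PR): construct the buffer with a
// coordinator and the existing options, then enqueue and dequeue as before.
func newBufferSketch(lggr logger.Logger) LogBuffer {
	coordinator := newDequeueCoordinator() // assumed constructor; not shown in this diff
	// lookback=200 blocks, blockRate=4, logLimit=5 are illustrative values only.
	return NewLogBuffer(lggr, 200, 4, 5, coordinator)
}
```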