Skip to content

Commit

Permalink
Update tests, track reorgs by comparing block hashes, mark as missing…
Browse files Browse the repository at this point in the history
… min dequeue in coordinator
  • Loading branch information
ferglor committed Jun 8, 2024
1 parent ba0bae5 commit 18d653c
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,23 @@ type logBuffer struct {
lock sync.RWMutex

// map for then number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
queueIDs []string
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
queueIDs []string
blockHashes map[int64]string
dequeueCoordinator *dequeueCoordinator
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32, dequeueCoordinator *dequeueCoordinator) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
blockHashes: map[int64]string{},
dequeueCoordinator: dequeueCoordinator,
}
}

Expand All @@ -107,7 +111,11 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
b.setUpkeepQueue(uid, buf)
}

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
latestLogBlock, uniqueBlocks, reorgBlocks := b.blockStatistics(logs...)
if len(reorgBlocks) > 0 {
b.evictReorgdLogs(reorgBlocks)
}

if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
Expand All @@ -126,6 +134,17 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
return buf.enqueue(blockThreshold, logs...)
}

func (b *logBuffer) evictReorgdLogs(reorgBlocks map[int64]bool) {
for _, queue := range b.queues {
for blockNumber := range reorgBlocks {
if _, ok := queue.logs[blockNumber]; ok {
queue.logs[blockNumber] = []logpoller.Log{}
b.dequeueCoordinator.markReorg(blockNumber, b.opts.blockRate.Load())
}
}
}
}

func (b *logBuffer) cleanupEnqueuedBlocks(blockThreshold int64) {
b.enqueuedBlockLock.Lock()
defer b.enqueuedBlockLock.Unlock()
Expand Down Expand Up @@ -453,59 +472,51 @@ func (q *upkeepLogQueue) clean(blockThreshold int64) int {
var dropped, expired int
blockRate := int(q.opts.blockRate.Load())
windowLimit := int(q.opts.windowLimit.Load())
updated := make([]logpoller.Log, 0)
blockNumbers := make([]int64, 0)
// helper variables to keep track of the current window capacity
currentWindowCapacity, currentWindowStart := 0, int64(0)
for _, blockNumber := range q.blockNumbers {
logs := q.logs[blockNumber]
for blockNumber, logs := range q.logs {
updated := make([]logpoller.Log, 0)

if blockThreshold > blockNumber { // old log, removed
for _, l := range logs {
for _, l := range logs {
if blockThreshold > l.BlockNumber { // old log, removed
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc()
// q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex)
logid := logID(l)
delete(q.states, logid)
expired++
continue
}
delete(q.logs, blockNumber)
} else {
blockNumbers = append(blockNumbers, blockNumber)

start, _ := getBlockWindow(blockNumber, blockRate)
start, _ := getBlockWindow(l.BlockNumber, blockRate)
if start != currentWindowStart {
// new window, reset capacity
currentWindowStart = start
currentWindowCapacity = 0
}

for _, l := range logs {
currentWindowCapacity++
// if capacity has been reached, drop the log
if currentWindowCapacity > windowLimit {
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDropped
q.states[lid] = s
}
dropped++
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc()
q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber,
"blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated),
"currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity,
"maxLogsPerWindow", windowLimit, "blockRate", blockRate)
} else {
updated = append(updated, l)
currentWindowCapacity++
// if capacity has been reached, drop the log
if currentWindowCapacity > windowLimit {
lid := logID(l)
if s, ok := q.states[lid]; ok {
s.state = logTriggerStateDropped
q.states[lid] = s
}
dropped++
prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc()
q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber,
"blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated),
"currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity,
"maxLogsPerWindow", windowLimit, "blockRate", blockRate)
continue
}
updated = append(updated, l)
}

if dropped > 0 || expired > 0 {
q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs))
q.logs[blockNumber] = updated
q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs))
}
}

q.blockNumbers = blockNumbers
q.cleanStates(blockThreshold)

return dropped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestLogEventBufferV1(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1)
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1, newDequeueCoordinator())

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
Expand All @@ -35,7 +35,7 @@ func TestLogEventBufferV1(t *testing.T) {
}

func TestLogEventBufferV1_SyncFilters(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1)
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1, newDequeueCoordinator())

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)
logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1, newDequeueCoordinator())

buf := logBufferV1.(*logBuffer)

Expand Down Expand Up @@ -118,7 +118,7 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)
logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1, newDequeueCoordinator())

buf := logBufferV1.(*logBuffer)

Expand Down Expand Up @@ -229,7 +229,7 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), uint32(tc.lookback), uint32(tc.args.blockRate), uint32(tc.args.upkeepLimit))
buf := NewLogBuffer(logger.TestLogger(t), uint32(tc.lookback), uint32(tc.args.blockRate), uint32(tc.args.upkeepLimit), newDequeueCoordinator())
for id, logs := range tc.logsInBuffer {
added, dropped := buf.Enqueue(id, logs...)
require.Equal(t, len(logs), added+dropped)
Expand All @@ -247,7 +247,7 @@ func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
lookback := uint32(20)
blockRate := uint32(1)
logLimit := uint32(1)
buf := NewLogBuffer(logger.TestLogger(t), lookback, blockRate, logLimit)
buf := NewLogBuffer(logger.TestLogger(t), lookback, blockRate, logLimit, newDequeueCoordinator())

upkeepIDs := []*big.Int{
big.NewInt(1),
Expand Down Expand Up @@ -281,11 +281,11 @@ func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
assert.Equal(t, 5, len(bufV1.queues))

// each queue should have 100 logs
assert.Equal(t, 100, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 100, len(bufV1.queues["5"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["1"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["2"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["3"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["4"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["5"].logs))

maxResults := 5
iterations := int(math.Ceil(float64(numUpkeeps*5) / float64(maxResults)))
Expand All @@ -302,11 +302,11 @@ func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 100, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["1"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["2"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["3"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["4"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(1) // on this dequeue attempt, current iteration will be 1
Expand All @@ -318,11 +318,11 @@ func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["1"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["2"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["3"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["4"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(2) // on this dequeue attempt, current iteration will be 2
Expand All @@ -334,11 +334,11 @@ func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["1"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["2"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["3"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["4"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(3) // on this dequeue attempt, current iteration will be 3
Expand All @@ -350,11 +350,11 @@ func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 95, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["1"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["2"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["3"].logs))
assert.Equal(t, 100, countLogs(bufV1.queues["4"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(4) // on this dequeue attempt, current iteration will be 4
Expand All @@ -366,11 +366,11 @@ func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 95, len(bufV1.queues["3"].logs))
assert.Equal(t, 95, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["1"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["2"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["3"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["4"].logs))
assert.Equal(t, 95, countLogs(bufV1.queues["5"].logs))
})
}

Expand Down Expand Up @@ -474,7 +474,7 @@ func TestLogEventBufferV1_Enqueue(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), tc.lookback, tc.blockRate, tc.upkeepLimit)
buf := NewLogBuffer(logger.TestLogger(t), tc.lookback, tc.blockRate, tc.upkeepLimit, newDequeueCoordinator())
for id, logs := range tc.logsToAdd {
added, dropped := buf.Enqueue(id, logs...)
sid := id.String()
Expand Down Expand Up @@ -562,7 +562,7 @@ func TestLogEventBufferV1_UpkeepQueue_clean(t *testing.T) {
})

t.Run("happy path", func(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), 10, 5, 1)
buf := NewLogBuffer(logger.TestLogger(t), 10, 5, 1, newDequeueCoordinator())

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
Expand Down Expand Up @@ -698,7 +698,7 @@ func createDummyLogSequence(n, startIndex int, block int64, tx common.Hash) []lo
}

func Test_trackBlockNumbersForUpkeep(t *testing.T) {
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1)
buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1, newDequeueCoordinator())

logBuffer := buf.(*logBuffer)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,23 @@ func logID(l logpoller.Log) string {
}

// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers
func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) {
func (b *logBuffer) blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool, map[int64]bool) {
var latest int64
uniqueBlocks := map[int64]bool{}
reorgBlocks := map[int64]bool{}

for _, l := range logs {
if l.BlockNumber > latest {
latest = l.BlockNumber
}
uniqueBlocks[l.BlockNumber] = true
if hash, ok := b.blockHashes[l.BlockNumber]; ok {
if hash != l.BlockHash.String() {
reorgBlocks[l.BlockNumber] = true
}
}
b.blockHashes[l.BlockNumber] = l.BlockHash.String()
}

return latest, uniqueBlocks
return latest, uniqueBlocks, reorgBlocks
}
Loading

0 comments on commit 18d653c

Please sign in to comment.