Add logs to identify when assumptions of log queuing behaviour are violated #12846
Changeset entry:

```diff
@@ -0,0 +1,5 @@
+---
+"chainlink": patch
+---
+
+Add logs for when the assumptions of how the log buffer will be used are violated #internal
```
Buffer implementation (`logBuffer`):

```diff
@@ -76,38 +76,80 @@ type logBuffer struct {
 	lastBlockSeen *atomic.Int64
 	// map of upkeep id to its queue
 	queues map[string]*upkeepLogQueue
-	lock   sync.RWMutex
+	// map for the number of times we have enqueued logs for a block number
+	enqueuedBlocks map[int64]map[string]int
+	lock           sync.RWMutex
 }

 func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
 	return &logBuffer{
-		lggr:          lggr.Named("KeepersRegistry.LogEventBufferV1"),
-		opts:          newLogBufferOptions(lookback, blockRate, logLimit),
-		lastBlockSeen: new(atomic.Int64),
-		queues:        make(map[string]*upkeepLogQueue),
+		lggr:           lggr.Named("KeepersRegistry.LogEventBufferV1"),
+		opts:           newLogBufferOptions(lookback, blockRate, logLimit),
+		lastBlockSeen:  new(atomic.Int64),
+		enqueuedBlocks: map[int64]map[string]int{},
+		queues:         make(map[string]*upkeepLogQueue),
 	}
 }

 // Enqueue adds logs to the buffer and might also drop logs if the limit for the
 // given upkeep was exceeded. It will create a new buffer if it does not exist.
 // Logs are expected to be enqueued in increasing order of block number.
 // All logs for an upkeep on a particular block will be enqueued in a single Enqueue call.
 // Returns the number of logs that were added and number of logs that were dropped.
 func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
 	buf, ok := b.getUpkeepQueue(uid)
 	if !ok || buf == nil {
 		buf = newUpkeepLogQueue(b.lggr, uid, b.opts)
 		b.setUpkeepQueue(uid, buf)
 	}
-	latestBlock := latestBlockNumber(logs...)
-	if b.lastBlockSeen.Load() < latestBlock {
-		b.lastBlockSeen.Store(latestBlock)
+
+	latestLogBlock, uniqueBlocks := blockStatistics(logs...)
+	if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
+		b.lastBlockSeen.Store(latestLogBlock)
+	} else if latestLogBlock < lastBlockSeen {
+		b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen)
 	}
+
+	b.trackBlockNumbersForUpkeep(uid, uniqueBlocks)
+
 	blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load())
 	if blockThreshold <= 0 {
 		blockThreshold = 1
 	}
+
+	// clean up enqueued block counts
+	for block := range b.enqueuedBlocks {
+		if block < blockThreshold-reorgBuffer {
+			delete(b.enqueuedBlocks, block)
+		}
+	}
+
 	return buf.enqueue(blockThreshold, logs...)
 }

+// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep,
+// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a
+// specific block number once, i.e. all logs for an upkeep for a block will be enqueued in a single
+// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once,
+// we log a message.
+func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) {
+	for blockNumber := range uniqueBlocks {
+		if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok {
+			if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok {
+				blockNumbers[uid.String()] = upkeepBlockInstances + 1
+				b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", upkeepBlockInstances+1, "upkeepID", uid.String())
+			} else {
+				blockNumbers[uid.String()] = 1
+			}
+			b.enqueuedBlocks[blockNumber] = blockNumbers
+		} else {
+			b.enqueuedBlocks[blockNumber] = map[string]int{
+				uid.String(): 1,
+			}
+		}
+	}
+}
+
 // Dequeue greedily pulls logs from the buffers.
 // Returns logs and the number of remaining logs in the buffer.
 func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
```

Review thread on the `Debugw` call in `trackBlockNumbersForUpkeep` (lines +139 to +140):

> **Reviewer:** i think this logic will change with reorg handling?
>
> **Author:** Yes; based on what I have in the spike for reorgs, we should have a set of block numbers available to us to perform this check against 👍
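For intuition, here is a minimal standalone sketch of the bookkeeping `trackBlockNumbersForUpkeep` performs (the `track` helper, package layout, and IDs below are illustrative, not from the PR): each block number maps to a per-upkeep enqueue count, and a count above one is exactly the violation the debug log flags.

```go
package main

import "fmt"

// enqueuedBlocks mirrors the buffer's bookkeeping: block number -> upkeep ID -> enqueue count.
var enqueuedBlocks = map[int64]map[string]int{}

// track records one Enqueue call for the given upkeep and block, and reports
// whether this block was already enqueued for that upkeep (the violation case).
func track(upkeepID string, block int64) bool {
	byUpkeep, ok := enqueuedBlocks[block]
	if !ok {
		enqueuedBlocks[block] = map[string]int{upkeepID: 1}
		return false
	}
	byUpkeep[upkeepID]++
	return byUpkeep[upkeepID] > 1
}

func main() {
	fmt.Println(track("upkeep-1", 3)) // false: first enqueue for block 3
	fmt.Println(track("upkeep-2", 3)) // false: a different upkeep on the same block is fine
	fmt.Println(track("upkeep-1", 3)) // true: second enqueue for upkeep-1 at block 3
}
```

Because Go maps are reference types, mutating `byUpkeep` updates the entry stored in `enqueuedBlocks` without reassignment; the PR's version reassigns the inner map explicitly, which is equivalent.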
Tests:

```diff
@@ -5,6 +5,7 @@ import (
 	"testing"

+	"github.com/ethereum/go-ethereum/common"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

 	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
@@ -50,6 +51,96 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) {
 	require.Equal(t, 1, buf.NumOfUpkeeps())
 }

+type readableLogger struct {
+	logger.Logger
+	DebugwFn func(msg string, keysAndValues ...interface{})
+	NamedFn  func(name string) logger.Logger
+	WithFn   func(args ...interface{}) logger.Logger
+}
+
+func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) {
+	l.DebugwFn(msg, keysAndValues...)
+}
+
+func (l *readableLogger) Named(name string) logger.Logger {
+	return l
+}
+
+func (l *readableLogger) With(args ...interface{}) logger.Logger {
+	return l
+}
+
+func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
+	t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) {
+		logReceived := false
+		readableLogger := &readableLogger{
+			DebugwFn: func(msg string, keysAndValues ...interface{}) {
+				if msg == "enqueuing logs from a block older than latest seen block" {
+					logReceived = true
+					assert.Equal(t, "logBlock", keysAndValues[0])
+					assert.Equal(t, int64(1), keysAndValues[1])
+					assert.Equal(t, "lastBlockSeen", keysAndValues[2])
+					assert.Equal(t, int64(2), keysAndValues[3])
+				}
+			},
+		}
+
+		logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)
+
+		buf := logBufferV1.(*logBuffer)
+
+		buf.Enqueue(big.NewInt(1),
+			logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
+			logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1},
+		)
+		buf.Enqueue(big.NewInt(2),
+			logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0},
+		)
+
+		assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"])
+		assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"])
+		assert.True(t, logReceived)
+	})
+
+	t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) {
+		logReceived := false
+		readableLogger := &readableLogger{
+			DebugwFn: func(msg string, keysAndValues ...interface{}) {
+				if msg == "enqueuing logs again for a previously seen block for this upkeep" {
+					logReceived = true
+					assert.Equal(t, "blockNumber", keysAndValues[0])
+					assert.Equal(t, int64(3), keysAndValues[1])
+					assert.Equal(t, "numberOfEnqueues", keysAndValues[2])
+					assert.Equal(t, 2, keysAndValues[3])
+				}
+			},
+		}
+
+		logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)
+
+		buf := logBufferV1.(*logBuffer)
+
+		buf.Enqueue(big.NewInt(1),
+			logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
+			logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
+		)
+		buf.Enqueue(big.NewInt(2),
+			logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
+		)
+		buf.Enqueue(big.NewInt(3),
+			logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0},
+		)
+		buf.Enqueue(big.NewInt(3),
+			logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0},
+		)
+
+		assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"])
+		assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"])
+		assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"])
+		assert.True(t, logReceived)
+	})
+}
+
 func TestLogEventBufferV1_Dequeue(t *testing.T) {
 	tests := []struct {
 		name string
```

Review thread on the `readableLogger` type:

> **Reviewer:** Why do we need this?
>
> **Author:** Just for the purposes of catching and validating log messages within the tests
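The `readableLogger` above relies on a common Go test-double pattern: embedding the `logger.Logger` interface satisfies the whole interface, while concrete methods like `Debugw` shadow the promoted ones. A rough self-contained illustration under a toy `Logger` interface (the real `logger.Logger` has many more methods):

```go
package main

import "fmt"

type Logger interface {
	Debugw(msg string, keysAndValues ...interface{})
	Infow(msg string, keysAndValues ...interface{})
}

// spyLogger embeds Logger so it satisfies the whole interface,
// but overrides Debugw to run a test-supplied callback.
type spyLogger struct {
	Logger   // methods we don't override are promoted from here
	DebugwFn func(msg string, keysAndValues ...interface{})
}

func (l *spyLogger) Debugw(msg string, keysAndValues ...interface{}) {
	l.DebugwFn(msg, keysAndValues...)
}

func main() {
	var captured string
	spy := &spyLogger{DebugwFn: func(msg string, _ ...interface{}) { captured = msg }}

	var lggr Logger = spy // compiles: the embedded interface fills in Infow
	lggr.Debugw("hello")
	fmt.Println(captured) // hello
}
```

Only the overridden methods are safe to call; invoking a promoted method on the nil embedded interface would panic, which is acceptable for a narrowly scoped test spy.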
Review comment on the cleanup loop in `Enqueue`:

> **Reviewer:** not fully sure why we need reorgBuffer here? i think the other functions in buffer simply work on blockThreshold-lookback to blockThreshold range, reorg handling is done at the provider level