diff --git a/.changeset/heavy-mails-rule.md b/.changeset/heavy-mails-rule.md new file mode 100644 index 00000000000..fdb6b3929b3 --- /dev/null +++ b/.changeset/heavy-mails-rule.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add logs for when the assumptions of how the log buffer will be used are violated #internal diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index fbc1da075df..fe15e962e53 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -77,19 +77,26 @@ type logBuffer struct { // map of upkeep id to its queue queues map[string]*upkeepLogQueue lock sync.RWMutex + + // map for then number of times we have enqueued logs for a block number + enqueuedBlocks map[int64]map[string]int + enqueuedBlockLock sync.RWMutex } func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer { return &logBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), - opts: newLogBufferOptions(lookback, blockRate, logLimit), - lastBlockSeen: new(atomic.Int64), - queues: make(map[string]*upkeepLogQueue), + lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), + opts: newLogBufferOptions(lookback, blockRate, logLimit), + lastBlockSeen: new(atomic.Int64), + enqueuedBlocks: map[int64]map[string]int{}, + queues: make(map[string]*upkeepLogQueue), } } // Enqueue adds logs to the buffer and might also drop logs if the limit for the // given upkeep was exceeded. It will create a new buffer if it does not exist. +// Logs are expected to be enqueued in increasing order of block number. +// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call. // Returns the number of logs that were added and number of logs that were dropped. func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf, ok := b.getUpkeepQueue(uid) @@ -97,17 +104,67 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf = newUpkeepLogQueue(b.lggr, uid, b.opts) b.setUpkeepQueue(uid, buf) } - latestBlock := latestBlockNumber(logs...) - if b.lastBlockSeen.Load() < latestBlock { - b.lastBlockSeen.Store(latestBlock) + + latestLogBlock, uniqueBlocks := blockStatistics(logs...) + if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock { + b.lastBlockSeen.Store(latestLogBlock) + } else if latestLogBlock < lastBlockSeen { + b.lggr.Debugw("enqueuing logs with a latest block older older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen) } + + b.trackBlockNumbersForUpkeep(uid, uniqueBlocks) + blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load()) if blockThreshold <= 0 { blockThreshold = 1 } + + b.cleanupEnqueuedBlocks(blockThreshold) + return buf.enqueue(blockThreshold, logs...) } +func (b *logBuffer) cleanupEnqueuedBlocks(blockThreshold int64) { + b.enqueuedBlockLock.Lock() + defer b.enqueuedBlockLock.Unlock() + // clean up enqueued block counts + for block := range b.enqueuedBlocks { + if block < blockThreshold { + delete(b.enqueuedBlocks, block) + } + } +} + +// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep, +// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a +// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single +// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once, +// we log a message. +func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) { + b.enqueuedBlockLock.Lock() + defer b.enqueuedBlockLock.Unlock() + + if uid == nil { + return + } + + for blockNumber := range uniqueBlocks { + if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok { + if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok { + blockNumbers[uid.String()] = upkeepBlockInstances + 1 + b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String()) + } else { + blockNumbers[uid.String()] = 1 + } + b.enqueuedBlocks[blockNumber] = blockNumbers + } else { + b.enqueuedBlocks[blockNumber] = map[string]int{ + uid.String(): 1, + } + } + } +} + // Dequeue greedly pulls logs from the buffers. // Returns logs and the number of remaining logs in the buffer. func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 19f806d35b9..c41dd3d9bcc 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -50,6 +51,96 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) { require.Equal(t, 1, buf.NumOfUpkeeps()) } +type readableLogger struct { + logger.Logger + DebugwFn func(msg string, keysAndValues ...interface{}) + NamedFn func(name string) logger.Logger + WithFn func(args ...interface{}) logger.Logger +} + +func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) { + l.DebugwFn(msg, keysAndValues...) +} + +func (l *readableLogger) Named(name string) logger.Logger { + return l +} + +func (l *readableLogger) With(args ...interface{}) logger.Logger { + return l +} + +func TestLogEventBufferV1_EnqueueViolations(t *testing.T) { + t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) { + logReceived := false + readableLogger := &readableLogger{ + DebugwFn: func(msg string, keysAndValues ...interface{}) { + if msg == "enqueuing logs from a block older than latest seen block" { + logReceived = true + assert.Equal(t, "logBlock", keysAndValues[0]) + assert.Equal(t, int64(1), keysAndValues[1]) + assert.Equal(t, "lastBlockSeen", keysAndValues[2]) + assert.Equal(t, int64(2), keysAndValues[3]) + } + }, + } + + logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1) + + buf := logBufferV1.(*logBuffer) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + ) + buf.Enqueue(big.NewInt(2), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0}, + ) + + assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"]) + assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"]) + assert.True(t, true, logReceived) + }) + + t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) { + logReceived := false + readableLogger := &readableLogger{ + DebugwFn: func(msg string, keysAndValues ...interface{}) { + if msg == "enqueuing logs again for a previously seen block" { + logReceived = true + assert.Equal(t, "blockNumber", keysAndValues[0]) + assert.Equal(t, int64(3), keysAndValues[1]) + assert.Equal(t, "numberOfEnqueues", keysAndValues[2]) + assert.Equal(t, 2, keysAndValues[3]) + } + }, + } + + logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1) + + buf := logBufferV1.(*logBuffer) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + ) + buf.Enqueue(big.NewInt(2), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, + ) + buf.Enqueue(big.NewInt(3), + logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0}, + ) + buf.Enqueue(big.NewInt(3), + logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0}, + ) + + assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"]) + assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"]) + assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"]) + assert.True(t, true, logReceived) + }) +} + func TestLogEventBufferV1_Dequeue(t *testing.T) { tests := []struct { name string @@ -470,3 +561,107 @@ func createDummyLogSequence(n, startIndex int, block int64, tx common.Hash) []lo } return logs } + +func Test_trackBlockNumbersForUpkeep(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1) + + logBuffer := buf.(*logBuffer) + + for _, tc := range []struct { + uid *big.Int + uniqueBlocks map[int64]bool + wantEnqueuedBlocks map[int64]map[string]int + }{ + { + uid: big.NewInt(1), + uniqueBlocks: map[int64]bool{ + 1: true, + 2: true, + 3: true, + }, + wantEnqueuedBlocks: map[int64]map[string]int{ + 1: { + "1": 1, + }, + 2: { + "1": 1, + }, + 3: { + "1": 1, + }, + }, + }, + { + uid: big.NewInt(2), + uniqueBlocks: map[int64]bool{ + 1: true, + 2: true, + 3: true, + }, + wantEnqueuedBlocks: map[int64]map[string]int{ + 1: { + "1": 1, + "2": 1, + }, + 2: { + "1": 1, + "2": 1, + }, + 3: { + "1": 1, + "2": 1, + }, + }, + }, + { + uid: big.NewInt(2), + uniqueBlocks: map[int64]bool{ + 3: true, + 4: true, + }, + wantEnqueuedBlocks: map[int64]map[string]int{ + 1: { + "1": 1, + "2": 1, + }, + 2: { + "1": 1, + "2": 1, + }, + 3: { + "1": 1, + "2": 2, + }, + 4: { + "2": 1, + }, + }, + }, + { + uniqueBlocks: map[int64]bool{ + 3: true, + 4: true, + }, + wantEnqueuedBlocks: map[int64]map[string]int{ + 1: { + "1": 1, + "2": 1, + }, + 2: { + "1": 1, + "2": 1, + }, + 3: { + "1": 1, + "2": 2, + }, + 4: { + "2": 1, + }, + }, + }, + } { + logBuffer.trackBlockNumbersForUpkeep(tc.uid, tc.uniqueBlocks) + assert.Equal(t, tc.wantEnqueuedBlocks, logBuffer.enqueuedBlocks) + } +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go index 9156e341688..9603d6da5be 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go @@ -8,7 +8,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" ) -// LogSorter sorts the logs based on block number, tx hash and log index. +// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash. // returns true if b should come before a. func LogSorter(a, b logpoller.Log) bool { return LogComparator(a, b) > 0 @@ -57,13 +57,17 @@ func logID(l logpoller.Log) string { return hex.EncodeToString(ext.LogIdentifier()) } -// latestBlockNumber returns the latest block number from the given logs -func latestBlockNumber(logs ...logpoller.Log) int64 { +// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers +func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) { var latest int64 + uniqueBlocks := map[int64]bool{} + for _, l := range logs { if l.BlockNumber > latest { latest = l.BlockNumber } + uniqueBlocks[l.BlockNumber] = true } - return latest + + return latest, uniqueBlocks }