diff --git a/.changeset/heavy-mails-rule.md b/.changeset/heavy-mails-rule.md deleted file mode 100644 index fdb6b3929b3..00000000000 --- a/.changeset/heavy-mails-rule.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chainlink": patch ---- - -Add logs for when the assumptions of how the log buffer will be used are violated #internal diff --git a/.changeset/moody-candles-compare.md b/.changeset/moody-candles-compare.md new file mode 100644 index 00000000000..b235b284a1e --- /dev/null +++ b/.changeset/moody-candles-compare.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Revert block number tracking #changed diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index b556b142984..fbc1da075df 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -76,25 +76,20 @@ type logBuffer struct { lastBlockSeen *atomic.Int64 // map of upkeep id to its queue queues map[string]*upkeepLogQueue - // map for then number of times we have enqueued logs for a block number - enqueuedBlocks map[int64]map[string]int - lock sync.RWMutex + lock sync.RWMutex } func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer { return &logBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), - opts: newLogBufferOptions(lookback, blockRate, logLimit), - lastBlockSeen: new(atomic.Int64), - enqueuedBlocks: map[int64]map[string]int{}, - queues: make(map[string]*upkeepLogQueue), + lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), + opts: newLogBufferOptions(lookback, blockRate, logLimit), + lastBlockSeen: new(atomic.Int64), + queues: make(map[string]*upkeepLogQueue), } } // Enqueue adds logs to the buffer and might also drop logs if the limit for the // given upkeep was exceeded. It will create a new buffer if it does not exist. -// Logs are expected to be enqueued in increasing order of block number. -// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call. // Returns the number of logs that were added and number of logs that were dropped. func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf, ok := b.getUpkeepQueue(uid) @@ -102,54 +97,17 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf = newUpkeepLogQueue(b.lggr, uid, b.opts) b.setUpkeepQueue(uid, buf) } - - latestLogBlock, uniqueBlocks := blockStatistics(logs...) - if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock { - b.lastBlockSeen.Store(latestLogBlock) - } else if latestLogBlock < lastBlockSeen { - b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen) + latestBlock := latestBlockNumber(logs...) + if b.lastBlockSeen.Load() < latestBlock { + b.lastBlockSeen.Store(latestBlock) } - - b.trackBlockNumbersForUpkeep(uid, uniqueBlocks) - blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load()) if blockThreshold <= 0 { blockThreshold = 1 } - - // clean up enqueued block counts - for block := range b.enqueuedBlocks { - if block < blockThreshold { - delete(b.enqueuedBlocks, block) - } - } - return buf.enqueue(blockThreshold, logs...) } -// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep, -// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a -// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single -// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once, -// we log a message. -func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) { - for blockNumber := range uniqueBlocks { - if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok { - if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok { - blockNumbers[uid.String()] = upkeepBlockInstances + 1 - b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String()) - } else { - blockNumbers[uid.String()] = 1 - } - b.enqueuedBlocks[blockNumber] = blockNumbers - } else { - b.enqueuedBlocks[blockNumber] = map[string]int{ - uid.String(): 1, - } - } - } -} - // Dequeue greedly pulls logs from the buffers. // Returns logs and the number of remaining logs in the buffer. func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index d6b4f43ac6e..19f806d35b9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -51,96 +50,6 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) { require.Equal(t, 1, buf.NumOfUpkeeps()) } -type readableLogger struct { - logger.Logger - DebugwFn func(msg string, keysAndValues ...interface{}) - NamedFn func(name string) logger.Logger - WithFn func(args ...interface{}) logger.Logger -} - -func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) { - l.DebugwFn(msg, keysAndValues...) -} - -func (l *readableLogger) Named(name string) logger.Logger { - return l -} - -func (l *readableLogger) With(args ...interface{}) logger.Logger { - return l -} - -func TestLogEventBufferV1_EnqueueViolations(t *testing.T) { - t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) { - logReceived := false - readableLogger := &readableLogger{ - DebugwFn: func(msg string, keysAndValues ...interface{}) { - if msg == "enqueuing logs from a block older than latest seen block" { - logReceived = true - assert.Equal(t, "logBlock", keysAndValues[0]) - assert.Equal(t, int64(1), keysAndValues[1]) - assert.Equal(t, "lastBlockSeen", keysAndValues[2]) - assert.Equal(t, int64(2), keysAndValues[3]) - } - }, - } - - logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1) - - buf := logBufferV1.(*logBuffer) - - buf.Enqueue(big.NewInt(1), - logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0}, - logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1}, - ) - buf.Enqueue(big.NewInt(2), - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0}, - ) - - assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"]) - assert.True(t, true, logReceived) - }) - - t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) { - logReceived := false - readableLogger := &readableLogger{ - DebugwFn: func(msg string, keysAndValues ...interface{}) { - if msg == "enqueuing logs again for a previously seen block" { - logReceived = true - assert.Equal(t, "blockNumber", keysAndValues[0]) - assert.Equal(t, int64(3), keysAndValues[1]) - assert.Equal(t, "numberOfEnqueues", keysAndValues[2]) - assert.Equal(t, 2, keysAndValues[3]) - } - }, - } - - logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1) - - buf := logBufferV1.(*logBuffer) - - buf.Enqueue(big.NewInt(1), - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, - ) - buf.Enqueue(big.NewInt(2), - logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, - ) - buf.Enqueue(big.NewInt(3), - logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0}, - ) - buf.Enqueue(big.NewInt(3), - logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0}, - ) - - assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"]) - assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"]) - assert.True(t, true, logReceived) - }) -} - func TestLogEventBufferV1_Dequeue(t *testing.T) { tests := []struct { name string diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go index 9603d6da5be..9156e341688 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go @@ -8,7 +8,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" ) -// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash. +// LogSorter sorts the logs based on block number, tx hash and log index. // returns true if b should come before a. func LogSorter(a, b logpoller.Log) bool { return LogComparator(a, b) > 0 @@ -57,17 +57,13 @@ func logID(l logpoller.Log) string { return hex.EncodeToString(ext.LogIdentifier()) } -// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers -func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) { +// latestBlockNumber returns the latest block number from the given logs +func latestBlockNumber(logs ...logpoller.Log) int64 { var latest int64 - uniqueBlocks := map[int64]bool{} - for _, l := range logs { if l.BlockNumber > latest { latest = l.BlockNumber } - uniqueBlocks[l.BlockNumber] = true } - - return latest, uniqueBlocks + return latest }