diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index dbd131af919..bd7d2b66570 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -283,7 +283,7 @@ func (ub *upkeepLogBuffer) enqueue(blockThreshold int64, logsToAdd ...logpoller. ub.q = logs dropped := ub.clean(blockThreshold) - ub.lggr.Debugf("Enqueued %d logs, dropped %d", added, dropped) + ub.lggr.Debugf("Enqueued %d logs, dropped %d with blockThreshold %d", added, dropped, blockThreshold) prommetrics.AutomationLogsInLogBuffer.Add(float64(added)) return added, dropped @@ -318,6 +318,9 @@ func (ub *upkeepLogBuffer) clean(blockThreshold int64) int { delete(ub.visited, logid) } } + + ub.lggr.Debugw("Cleaned logs", "dropped", dropped, "blockThreshold", blockThreshold, "len updated", len(updated), "len ub.q", len(ub.q), "maxLogs", maxLogs) + ub.q = updated for lid, block := range ub.visited { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index ac6a8b83548..1b63a3ece45 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -68,6 +68,40 @@ func TestLogEventBufferV2_EnqueueDequeue(t *testing.T) { require.Equal(t, 2, upkeepBuf.size()) }) + t.Run("enqueue upkeeps limits", func(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), 3, 2) + + added, dropped := buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 9, TxHash: common.HexToHash("0x9"), LogIndex: 0}, + logpoller.Log{BlockNumber: 9, TxHash: common.HexToHash("0x9"), LogIndex: 1}, + logpoller.Log{BlockNumber: 10, TxHash: common.HexToHash("0x10"), LogIndex: 0}, + logpoller.Log{BlockNumber: 10, TxHash: common.HexToHash("0x10"), LogIndex: 1}, + logpoller.Log{BlockNumber: 11, TxHash: common.HexToHash("0x11"), LogIndex: 1}, + logpoller.Log{BlockNumber: 11, TxHash: common.HexToHash("0x11"), LogIndex: 2}, + logpoller.Log{BlockNumber: 11, TxHash: common.HexToHash("0x11"), LogIndex: 3}, + ) + require.Equal(t, 7, added) + require.Equal(t, 1, dropped) + upkeepBuf, ok := buf.(*logBuffer).getUpkeepBuffer(big.NewInt(1)) + require.True(t, ok) + require.Equal(t, 6, upkeepBuf.size()) + }) + + t.Run("enqueue out of block range", func(t *testing.T) { + buf := NewLogBuffer(logger.TestLogger(t), 5, 4) + + added, dropped := buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x10"), LogIndex: 0}, + logpoller.Log{BlockNumber: 10, TxHash: common.HexToHash("0x10"), LogIndex: 1}, + logpoller.Log{BlockNumber: 11, TxHash: common.HexToHash("0x11"), LogIndex: 1}, + ) + require.Equal(t, 2, added) + require.Equal(t, 0, dropped) + upkeepBuf, ok := buf.(*logBuffer).getUpkeepBuffer(big.NewInt(1)) + require.True(t, ok) + require.Equal(t, 2, upkeepBuf.size()) + }) + t.Run("enqueue dequeue", func(t *testing.T) { buf := NewLogBuffer(logger.TestLogger(t), 10, 10)