From f6c05eb8133b91f1673434aaec2e6f8ce946e0a0 Mon Sep 17 00:00:00 2001 From: amirylm Date: Mon, 15 Jan 2024 20:33:25 +0200 Subject: [PATCH 1/6] Align log buffer to work with logs limits config: - Used FastExecLogsHigh - limit the number of logs we save in a block for an upkeep - Used FastExecLogsHigh * NumOfLogUpkeeps - limit the amount of logs we save in a block for all upkeeps - Ensured these values can be set dynamically for the buffer, as the configs might change over time --- .../evmregistry/v21/logprovider/buffer.go | 33 +++++++++++-------- .../evmregistry/v21/logprovider/provider.go | 2 +- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index f2c58fd30c1..4bca2bfbf9e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -15,10 +15,12 @@ import ( ) var ( - // maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block. - maxLogsPerUpkeepInBlock = 32 - // maxLogsPerBlock is the maximum number of blocks in the buffer. - maxLogsPerBlock = 1024 + // defaultFastExecLogsHigh is the default upper bound / maximum number of logs that Automation is committed to process for each upkeep, + // based on available capacity, i.e. if there are no logs from other upkeeps. + // Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block + defaultFastExecLogsHigh = 10 + // defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry. + defaultNumOfLogUpkeeps = 50 ) // fetchedLog holds the log and the ID of the upkeep @@ -143,20 +145,20 @@ type logEventBuffer struct { // size is the number of blocks supported by the buffer size int32 - maxBlockLogs, maxUpkeepLogsPerBlock int + numOfLogUpkeeps, fastExecLogsHigh uint32 // blocks is the circular buffer of fetched blocks blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 } -func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer { +func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { return &logEventBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), - size: int32(size), - blocks: make([]fetchedBlock, size), - maxBlockLogs: maxBlockLogs, - maxUpkeepLogsPerBlock: maxUpkeepLogsPerBlock, + lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), + size: int32(size), + blocks: make([]fetchedBlock, size), + numOfLogUpkeeps: uint32(numOfLogUpkeeps), + fastExecLogsHigh: uint32(fastExecLogsHigh), } } @@ -168,6 +170,11 @@ func (b *logEventBuffer) bufferSize() int { return int(atomic.LoadInt32(&b.size)) } +func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) { + atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps)) + atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh)) +} + // enqueue adds logs (if not exist) to the buffer, returning the number of logs added // minus the number of logs dropped. func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { @@ -176,8 +183,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr := b.lggr.With("id", id.String()) - maxBlockLogs := b.maxBlockLogs - maxUpkeepLogs := b.maxUpkeepLogsPerBlock + maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps)) + maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh)) latestBlock := b.latestBlockSeen() added, dropped := 0, 0 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 2dabcc82671..71db451f759 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa threadCtrl: utils.NewThreadControl(), lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, - buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock), + buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), poller: poller, opts: opts, filterStore: filterStore, From f422a54af75bd112b9a21838a78d881778446385 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 16 Jan 2024 11:06:49 +0200 Subject: [PATCH 2/6] reduce numOfLogUpkeeps --- .../evmregistry/v21/logprovider/buffer.go | 22 +--- .../v21/logprovider/buffer_test.go | 104 +++++++----------- .../evmregistry/v21/logprovider/provider.go | 2 +- 3 files changed, 47 insertions(+), 81 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index 4bca2bfbf9e..6e8fe299653 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -19,8 +19,6 @@ var ( // based on available capacity, i.e. if there are no logs from other upkeeps. // Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block defaultFastExecLogsHigh = 10 - // defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry. - defaultNumOfLogUpkeeps = 50 ) // fetchedLog holds the log and the ID of the upkeep @@ -54,7 +52,7 @@ type fetchedBlock struct { visited []fetchedLog } -func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) { +func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxUpkeepLogs int) (fetchedLog, bool) { has, upkeepLogs := b.has(fl.upkeepID, fl.log) if has { // Skipping known logs @@ -79,13 +77,6 @@ func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, m } b.logs = currentLogs return dropped, true - } else if len(b.logs)+len(b.visited) > maxBlockLogs { - // in case we have logs overflow in the buffer level, we drop a log based on - // shared, random (per block) order of the logs in the block. - b.Sort() - dropped := b.logs[0] - b.logs = b.logs[1:] - return dropped, true } return fetchedLog{}, true @@ -145,19 +136,18 @@ type logEventBuffer struct { // size is the number of blocks supported by the buffer size int32 - numOfLogUpkeeps, fastExecLogsHigh uint32 + fastExecLogsHigh uint32 // blocks is the circular buffer of fetched blocks blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 } -func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { +func newLogEventBuffer(lggr logger.Logger, size, fastExecLogsHigh int) *logEventBuffer { return &logEventBuffer{ lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), size: int32(size), blocks: make([]fetchedBlock, size), - numOfLogUpkeeps: uint32(numOfLogUpkeeps), fastExecLogsHigh: uint32(fastExecLogsHigh), } } @@ -170,8 +160,7 @@ func (b *logEventBuffer) bufferSize() int { return int(atomic.LoadInt32(&b.size)) } -func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) { - atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps)) +func (b *logEventBuffer) SetLimits(fastExecLogsHigh int) { atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh)) } @@ -183,7 +172,6 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr := b.lggr.With("id", id.String()) - maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps)) maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh)) latestBlock := b.latestBlockSeen() @@ -206,7 +194,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber) continue } - droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs) + droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxUpkeepLogs) if !ok { // Skipping known logs continue diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go index 046c93a428a..acd779fd98c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go @@ -19,7 +19,7 @@ import ( func TestLogEventBuffer_GetBlocksInRange(t *testing.T) { size := 3 maxSeenBlock := int64(4) - buf := newLogEventBuffer(logger.TestLogger(t), size, 10, 10) + buf := newLogEventBuffer(logger.TestLogger(t), size, 10) buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -104,7 +104,7 @@ func TestLogEventBuffer_GetBlocksInRange(t *testing.T) { func TestLogEventBuffer_GetBlocksInRange_Circular(t *testing.T) { size := 4 - buf := newLogEventBuffer(logger.TestLogger(t), size, 10, 10) + buf := newLogEventBuffer(logger.TestLogger(t), size, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -164,7 +164,7 @@ func TestLogEventBuffer_GetBlocksInRange_Circular(t *testing.T) { func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { t.Run("dequeue empty", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) results := buf.peekRange(int64(1), int64(2)) require.Equal(t, 0, len(results)) @@ -173,32 +173,33 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("enqueue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, ) buf.lock.Lock() + defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) - buf.lock.Unlock() }) t.Run("enqueue logs overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 2, 2) require.Equal(t, 2, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, )) buf.lock.Lock() + defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) - buf.lock.Unlock() }) t.Run("enqueue block overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 2, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) require.Equal(t, 5, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -208,26 +209,12 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 1}, )) buf.lock.Lock() + defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) - buf.lock.Unlock() - }) - - t.Run("enqueue upkeep block overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 10, 10, 2) - - require.Equal(t, 2, buf.enqueue(big.NewInt(1), - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, - )) - buf.lock.Lock() - require.Equal(t, 2, len(buf.blocks[0].logs)) - buf.lock.Unlock() }) t.Run("peek range after dequeue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -247,7 +234,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("enqueue peek and dequeue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 4, 10, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 4, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -263,14 +250,14 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { removed := buf.dequeueRange(1, 3, 5, 5) require.Equal(t, 4, len(removed)) buf.lock.Lock() + defer buf.lock.Unlock() require.Equal(t, 0, len(buf.blocks[0].logs)) require.Equal(t, int64(2), buf.blocks[1].blockNumber) require.Equal(t, 1, len(buf.blocks[1].visited)) - buf.lock.Unlock() }) t.Run("enqueue and peek range circular", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -291,7 +278,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("doesnt enqueue old blocks", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -308,7 +295,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("dequeue with limits returns latest block logs", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -332,7 +319,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("dequeue doesn't return same logs again", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -351,9 +338,9 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { type appendArgs struct { - fl fetchedLog - maxBlockLogs, maxUpkeepLogs int - added, dropped bool + fl fetchedLog + fastExecLogsHigh int + added, dropped bool } tests := []struct { @@ -380,9 +367,8 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 10, - maxUpkeepLogs: 2, - added: true, + fastExecLogsHigh: 2, + added: true, }, }, expected: []fetchedLog{ @@ -420,9 +406,8 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 10, - maxUpkeepLogs: 2, - added: false, + fastExecLogsHigh: 2, + added: false, }, }, expected: []fetchedLog{ @@ -460,9 +445,8 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 10, - maxUpkeepLogs: 2, - added: false, + fastExecLogsHigh: 2, + added: false, }, }, expected: []fetchedLog{}, @@ -482,9 +466,8 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 10, - maxUpkeepLogs: 2, - added: true, + fastExecLogsHigh: 2, + added: true, }, { fl: fetchedLog{ @@ -495,9 +478,8 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 10, - maxUpkeepLogs: 2, - added: true, + fastExecLogsHigh: 2, + added: true, }, { fl: fetchedLog{ @@ -508,10 +490,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 10, - maxUpkeepLogs: 2, - added: true, - dropped: true, + fastExecLogsHigh: 2, + added: true, + dropped: true, }, }, expected: []fetchedLog{ @@ -548,9 +529,8 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 2, - maxUpkeepLogs: 4, - added: true, + fastExecLogsHigh: 4, + added: true, }, { fl: fetchedLog{ @@ -561,9 +541,8 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 2, - maxUpkeepLogs: 4, - added: true, + fastExecLogsHigh: 4, + added: true, }, { fl: fetchedLog{ @@ -574,10 +553,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - maxBlockLogs: 2, - maxUpkeepLogs: 4, - added: true, - dropped: true, + fastExecLogsHigh: 4, + added: true, + dropped: true, }, }, expected: []fetchedLog{ @@ -613,7 +591,7 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { copy(b.visited, tc.visited) for _, args := range tc.toAdd { - dropped, added := b.Append(lggr, args.fl, args.maxBlockLogs, args.maxUpkeepLogs) + dropped, added := b.Append(lggr, args.fl, args.fastExecLogsHigh) require.Equal(t, args.added, added) if args.dropped { require.NotNil(t, dropped.upkeepID) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 71db451f759..967216927a8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa threadCtrl: utils.NewThreadControl(), lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, - buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), + buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultFastExecLogsHigh), poller: poller, opts: opts, filterStore: filterStore, From 13388728fc8f3b28aa2f7b35370c00ad3798c9ee Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 16 Jan 2024 11:13:45 +0200 Subject: [PATCH 3/6] added test for dynamic limits --- .../v21/logprovider/buffer_test.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go index acd779fd98c..87b313d853e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go @@ -198,6 +198,29 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { require.Equal(t, 2, len(buf.blocks[0].logs)) }) + t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) { + buf := newLogEventBuffer(logger.TestLogger(t), 2, 2) + + require.Equal(t, 2, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, + )) + buf.SetLimits(3) + require.Equal(t, 3, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3}, + )) + + buf.lock.Lock() + defer buf.lock.Unlock() + require.Equal(t, 2, len(buf.blocks[0].logs)) + require.Equal(t, 3, len(buf.blocks[1].logs)) + }) + t.Run("enqueue block overflow", func(t *testing.T) { buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) From ca65be124099092c55301065084e97df422e58c6 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 16 Jan 2024 11:58:53 +0200 Subject: [PATCH 4/6] remove redundant test --- .../v21/logprovider/buffer_test.go | 63 ------------------- 1 file changed, 63 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go index 87b313d853e..8b53a4adcd6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go @@ -537,69 +537,6 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, }, }, - { - name: "block log limits", - blockNumber: 1, - logs: []fetchedLog{}, - visited: []fetchedLog{}, - toAdd: []appendArgs{ - { - fl: fetchedLog{ - log: logpoller.Log{ - BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 0, - }, - upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), - }, - fastExecLogsHigh: 4, - added: true, - }, - { - fl: fetchedLog{ - log: logpoller.Log{ - BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 1, - }, - upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), - }, - fastExecLogsHigh: 4, - added: true, - }, - { - fl: fetchedLog{ - log: logpoller.Log{ - BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 2, - }, - upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), - }, - fastExecLogsHigh: 4, - added: true, - dropped: true, - }, - }, - expected: []fetchedLog{ - { - log: logpoller.Log{ - BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 1, - }, - upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), - }, - { - log: logpoller.Log{ - BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 2, - }, - upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), - }, - }, - }, } for _, tc := range tests { From 0cdfa09a0dd3be103d9938032198b900c44a4a7f Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 18 Jan 2024 18:40:34 +0200 Subject: [PATCH 5/6] increase default FastExecLogsHigh (32 instead of 10) --- .../plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index 6e8fe299653..af88cd85e8e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -18,7 +18,7 @@ var ( // defaultFastExecLogsHigh is the default upper bound / maximum number of logs that Automation is committed to process for each upkeep, // based on available capacity, i.e. if there are no logs from other upkeeps. // Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block - defaultFastExecLogsHigh = 10 + defaultFastExecLogsHigh = 32 ) // fetchedLog holds the log and the ID of the upkeep From 382fcaaf03bb550f4a1eddf827547e5f92438500 Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 18 Jan 2024 18:48:12 +0200 Subject: [PATCH 6/6] Revert "reduce numOfLogUpkeeps" This reverts commit f422a54af75bd112b9a21838a78d881778446385. --- .../evmregistry/v21/logprovider/buffer.go | 22 ++- .../v21/logprovider/buffer_test.go | 178 ++++++++++++++---- .../evmregistry/v21/logprovider/provider.go | 2 +- 3 files changed, 160 insertions(+), 42 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index af88cd85e8e..8a97763151d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -19,6 +19,8 @@ var ( // based on available capacity, i.e. if there are no logs from other upkeeps. // Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block defaultFastExecLogsHigh = 32 + // defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry. + defaultNumOfLogUpkeeps = 50 ) // fetchedLog holds the log and the ID of the upkeep @@ -52,7 +54,7 @@ type fetchedBlock struct { visited []fetchedLog } -func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxUpkeepLogs int) (fetchedLog, bool) { +func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) { has, upkeepLogs := b.has(fl.upkeepID, fl.log) if has { // Skipping known logs @@ -77,6 +79,13 @@ func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxUpkeepLogs i } b.logs = currentLogs return dropped, true + } else if len(b.logs)+len(b.visited) > maxBlockLogs { + // in case we have logs overflow in the buffer level, we drop a log based on + // shared, random (per block) order of the logs in the block. + b.Sort() + dropped := b.logs[0] + b.logs = b.logs[1:] + return dropped, true } return fetchedLog{}, true @@ -136,18 +145,19 @@ type logEventBuffer struct { // size is the number of blocks supported by the buffer size int32 - fastExecLogsHigh uint32 + numOfLogUpkeeps, fastExecLogsHigh uint32 // blocks is the circular buffer of fetched blocks blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 } -func newLogEventBuffer(lggr logger.Logger, size, fastExecLogsHigh int) *logEventBuffer { +func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { return &logEventBuffer{ lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), size: int32(size), blocks: make([]fetchedBlock, size), + numOfLogUpkeeps: uint32(numOfLogUpkeeps), fastExecLogsHigh: uint32(fastExecLogsHigh), } } @@ -160,7 +170,8 @@ func (b *logEventBuffer) bufferSize() int { return int(atomic.LoadInt32(&b.size)) } -func (b *logEventBuffer) SetLimits(fastExecLogsHigh int) { +func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) { + atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps)) atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh)) } @@ -172,6 +183,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr := b.lggr.With("id", id.String()) + maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps)) maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh)) latestBlock := b.latestBlockSeen() @@ -194,7 +206,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber) continue } - droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxUpkeepLogs) + droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs) if !ok { // Skipping known logs continue diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go index 8b53a4adcd6..2b2e2e09846 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go @@ -19,7 +19,7 @@ import ( func TestLogEventBuffer_GetBlocksInRange(t *testing.T) { size := 3 maxSeenBlock := int64(4) - buf := newLogEventBuffer(logger.TestLogger(t), size, 10) + buf := newLogEventBuffer(logger.TestLogger(t), size, 10, 10) buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -104,7 +104,7 @@ func TestLogEventBuffer_GetBlocksInRange(t *testing.T) { func TestLogEventBuffer_GetBlocksInRange_Circular(t *testing.T) { size := 4 - buf := newLogEventBuffer(logger.TestLogger(t), size, 10) + buf := newLogEventBuffer(logger.TestLogger(t), size, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -164,7 +164,7 @@ func TestLogEventBuffer_GetBlocksInRange_Circular(t *testing.T) { func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { t.Run("dequeue empty", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) results := buf.peekRange(int64(1), int64(2)) require.Equal(t, 0, len(results)) @@ -173,33 +173,54 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("enqueue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, ) buf.lock.Lock() - defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() }) t.Run("enqueue logs overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 2, 2) + buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 2) require.Equal(t, 2, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, )) + buf.lock.Lock() + require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() + }) + + t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) { + buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2) + + require.Equal(t, 2, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + )) + buf.SetLimits(10, 3) + require.Equal(t, 3, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3}, + )) + buf.lock.Lock() defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) + require.Equal(t, 3, len(buf.blocks[1].logs)) }) t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 2, 2) + buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2) require.Equal(t, 2, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -207,7 +228,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, )) - buf.SetLimits(3) + buf.SetLimits(10, 3) require.Equal(t, 3, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1}, @@ -218,11 +239,10 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { buf.lock.Lock() defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) - require.Equal(t, 3, len(buf.blocks[1].logs)) }) t.Run("enqueue block overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 2, 10) require.Equal(t, 5, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -232,12 +252,26 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 1}, )) buf.lock.Lock() - defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() + }) + + t.Run("enqueue upkeep block overflow", func(t *testing.T) { + buf := newLogEventBuffer(logger.TestLogger(t), 10, 10, 2) + + require.Equal(t, 2, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, + )) + buf.lock.Lock() + require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() }) t.Run("peek range after dequeue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -257,7 +291,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("enqueue peek and dequeue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 4, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 4, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -273,14 +307,14 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { removed := buf.dequeueRange(1, 3, 5, 5) require.Equal(t, 4, len(removed)) buf.lock.Lock() - defer buf.lock.Unlock() require.Equal(t, 0, len(buf.blocks[0].logs)) require.Equal(t, int64(2), buf.blocks[1].blockNumber) require.Equal(t, 1, len(buf.blocks[1].visited)) + buf.lock.Unlock() }) t.Run("enqueue and peek range circular", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -301,7 +335,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("doesnt enqueue old blocks", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -318,7 +352,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("dequeue with limits returns latest block logs", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -342,7 +376,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("dequeue doesn't return same logs again", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -361,9 +395,9 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { type appendArgs struct { - fl fetchedLog - fastExecLogsHigh int - added, dropped bool + fl fetchedLog + maxBlockLogs, maxUpkeepLogs int + added, dropped bool } tests := []struct { @@ -390,8 +424,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, }, }, expected: []fetchedLog{ @@ -429,8 +464,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: false, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: false, }, }, expected: []fetchedLog{ @@ -468,8 +504,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: false, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: false, }, }, expected: []fetchedLog{}, @@ -489,8 +526,75 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + dropped: true, + }, + }, + expected: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + }, + { + name: "block log limits", + blockNumber: 1, + logs: []fetchedLog{}, + visited: []fetchedLog{}, + toAdd: []appendArgs{ + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, }, { fl: fetchedLog{ @@ -501,8 +605,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, }, { fl: fetchedLog{ @@ -513,9 +618,10 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, - dropped: true, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, + dropped: true, }, }, expected: []fetchedLog{ @@ -551,7 +657,7 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { copy(b.visited, tc.visited) for _, args := range tc.toAdd { - dropped, added := b.Append(lggr, args.fl, args.fastExecLogsHigh) + dropped, added := b.Append(lggr, args.fl, args.maxBlockLogs, args.maxUpkeepLogs) require.Equal(t, args.added, added) if args.dropped { require.NotNil(t, dropped.upkeepID) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 967216927a8..71db451f759 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa threadCtrl: utils.NewThreadControl(), lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, - buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultFastExecLogsHigh), + buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), poller: poller, opts: opts, filterStore: filterStore,