diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index afdb882cee5..9f11a1fca01 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -15,10 +15,12 @@ import ( ) var ( - // maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block. - maxLogsPerUpkeepInBlock = 32 - // maxLogsPerBlock is the maximum number of blocks in the buffer. - maxLogsPerBlock = 1024 + // defaultFastExecLogsHigh is the default upper bound / maximum number of logs that Automation is committed to process for each upkeep, + // based on available capacity, i.e. if there are no logs from other upkeeps. + // Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block + defaultFastExecLogsHigh = 32 + // defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry. + defaultNumOfLogUpkeeps = 50 ) // fetchedLog holds the log and the ID of the upkeep @@ -143,20 +145,20 @@ type logEventBuffer struct { // size is the number of blocks supported by the buffer size int32 - maxBlockLogs, maxUpkeepLogsPerBlock int + numOfLogUpkeeps, fastExecLogsHigh uint32 // blocks is the circular buffer of fetched blocks blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 } -func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer { +func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { return &logEventBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), - size: int32(size), - blocks: make([]fetchedBlock, size), - maxBlockLogs: maxBlockLogs, - maxUpkeepLogsPerBlock: maxUpkeepLogsPerBlock, + lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), + size: int32(size), + blocks: make([]fetchedBlock, size), + numOfLogUpkeeps: uint32(numOfLogUpkeeps), + fastExecLogsHigh: uint32(fastExecLogsHigh), } } @@ -168,6 +170,11 @@ func (b *logEventBuffer) bufferSize() int { return int(atomic.LoadInt32(&b.size)) } +func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) { + atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps)) + atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh)) +} + // enqueue adds logs (if not exist) to the buffer, returning the number of logs added // minus the number of logs dropped. func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { @@ -176,8 +183,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr := b.lggr.With("id", id.String()) - maxBlockLogs := b.maxBlockLogs - maxUpkeepLogs := b.maxUpkeepLogsPerBlock + maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps)) + maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh)) latestBlock := b.latestBlockSeen() added, dropped := 0, 0 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go index c90a0f34a1b..dca43ca14ac 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go @@ -187,7 +187,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("enqueue logs overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 2) require.Equal(t, 2, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -199,6 +199,50 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { buf.lock.Unlock() }) + t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) { + buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2) + + require.Equal(t, 2, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + )) + buf.SetLimits(10, 3) + require.Equal(t, 3, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3}, + )) + + buf.lock.Lock() + defer buf.lock.Unlock() + require.Equal(t, 2, len(buf.blocks[0].logs)) + require.Equal(t, 3, len(buf.blocks[1].logs)) + }) + + t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) { + buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2) + + require.Equal(t, 2, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, + )) + buf.SetLimits(10, 3) + require.Equal(t, 3, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3}, + )) + + buf.lock.Lock() + defer buf.lock.Unlock() + require.Equal(t, 2, len(buf.blocks[0].logs)) + }) + t.Run("enqueue block overflow", func(t *testing.T) { buf := newLogEventBuffer(logger.TestLogger(t), 3, 2, 10) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 4840fa10fa1..d1360faaf6d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa threadCtrl: utils.NewThreadControl(), lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, - buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock), + buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), poller: poller, opts: opts, filterStore: filterStore,