diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index af88cd85e8e..8a97763151d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -19,6 +19,8 @@ var ( // based on available capacity, i.e. if there are no logs from other upkeeps. // Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block defaultFastExecLogsHigh = 32 + // defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry. + defaultNumOfLogUpkeeps = 50 ) // fetchedLog holds the log and the ID of the upkeep @@ -52,7 +54,7 @@ type fetchedBlock struct { visited []fetchedLog } -func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxUpkeepLogs int) (fetchedLog, bool) { +func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) { has, upkeepLogs := b.has(fl.upkeepID, fl.log) if has { // Skipping known logs @@ -77,6 +79,13 @@ func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxUpkeepLogs i } b.logs = currentLogs return dropped, true + } else if len(b.logs)+len(b.visited) > maxBlockLogs { + // in case we have logs overflow in the buffer level, we drop a log based on + // shared, random (per block) order of the logs in the block. + b.Sort() + dropped := b.logs[0] + b.logs = b.logs[1:] + return dropped, true } return fetchedLog{}, true @@ -136,18 +145,19 @@ type logEventBuffer struct { // size is the number of blocks supported by the buffer size int32 - fastExecLogsHigh uint32 + numOfLogUpkeeps, fastExecLogsHigh uint32 // blocks is the circular buffer of fetched blocks blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 } -func newLogEventBuffer(lggr logger.Logger, size, fastExecLogsHigh int) *logEventBuffer { +func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { return &logEventBuffer{ lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), size: int32(size), blocks: make([]fetchedBlock, size), + numOfLogUpkeeps: uint32(numOfLogUpkeeps), fastExecLogsHigh: uint32(fastExecLogsHigh), } } @@ -160,7 +170,8 @@ func (b *logEventBuffer) bufferSize() int { return int(atomic.LoadInt32(&b.size)) } -func (b *logEventBuffer) SetLimits(fastExecLogsHigh int) { +func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) { + atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps)) atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh)) } @@ -172,6 +183,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr := b.lggr.With("id", id.String()) + maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps)) maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh)) latestBlock := b.latestBlockSeen() @@ -194,7 +206,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber) continue } - droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxUpkeepLogs) + droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs) if !ok { // Skipping known logs continue diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go index 8b53a4adcd6..2b2e2e09846 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_test.go @@ -19,7 +19,7 @@ import ( func TestLogEventBuffer_GetBlocksInRange(t *testing.T) { size := 3 maxSeenBlock := int64(4) - buf := newLogEventBuffer(logger.TestLogger(t), size, 10) + buf := newLogEventBuffer(logger.TestLogger(t), size, 10, 10) buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -104,7 +104,7 @@ func TestLogEventBuffer_GetBlocksInRange(t *testing.T) { func TestLogEventBuffer_GetBlocksInRange_Circular(t *testing.T) { size := 4 - buf := newLogEventBuffer(logger.TestLogger(t), size, 10) + buf := newLogEventBuffer(logger.TestLogger(t), size, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -164,7 +164,7 @@ func TestLogEventBuffer_GetBlocksInRange_Circular(t *testing.T) { func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { t.Run("dequeue empty", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) results := buf.peekRange(int64(1), int64(2)) require.Equal(t, 0, len(results)) @@ -173,33 +173,54 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("enqueue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, ) buf.lock.Lock() - defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() }) t.Run("enqueue logs overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 2, 2) + buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 2) require.Equal(t, 2, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, - logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, )) + buf.lock.Lock() + require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() + }) + + t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) { + buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2) + + require.Equal(t, 2, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + )) + buf.SetLimits(10, 3) + require.Equal(t, 3, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3}, + )) + buf.lock.Lock() defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) + require.Equal(t, 3, len(buf.blocks[1].logs)) }) t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 2, 2) + buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2) require.Equal(t, 2, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -207,7 +228,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, )) - buf.SetLimits(3) + buf.SetLimits(10, 3) require.Equal(t, 3, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1}, @@ -218,11 +239,10 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { buf.lock.Lock() defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) - require.Equal(t, 3, len(buf.blocks[1].logs)) }) t.Run("enqueue block overflow", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 2, 10) require.Equal(t, 5, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -232,12 +252,26 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 1}, )) buf.lock.Lock() - defer buf.lock.Unlock() require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() + }) + + t.Run("enqueue upkeep block overflow", func(t *testing.T) { + buf := newLogEventBuffer(logger.TestLogger(t), 10, 10, 2) + + require.Equal(t, 2, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3}, + )) + buf.lock.Lock() + require.Equal(t, 2, len(buf.blocks[0].logs)) + buf.lock.Unlock() }) t.Run("peek range after dequeue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -257,7 +291,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("enqueue peek and dequeue", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 4, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 4, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -273,14 +307,14 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { removed := buf.dequeueRange(1, 3, 5, 5) require.Equal(t, 4, len(removed)) buf.lock.Lock() - defer buf.lock.Unlock() require.Equal(t, 0, len(buf.blocks[0].logs)) require.Equal(t, int64(2), buf.blocks[1].blockNumber) require.Equal(t, 1, len(buf.blocks[1].visited)) + buf.lock.Unlock() }) t.Run("enqueue and peek range circular", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 10, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, @@ -301,7 +335,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("doesnt enqueue old blocks", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) require.Equal(t, buf.enqueue(big.NewInt(10), logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x1"), LogIndex: 10}, @@ -318,7 +352,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("dequeue with limits returns latest block logs", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -342,7 +376,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) t.Run("dequeue doesn't return same logs again", func(t *testing.T) { - buf := newLogEventBuffer(logger.TestLogger(t), 3, 10) + buf := newLogEventBuffer(logger.TestLogger(t), 3, 5, 10) require.Equal(t, buf.enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, @@ -361,9 +395,9 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { type appendArgs struct { - fl fetchedLog - fastExecLogsHigh int - added, dropped bool + fl fetchedLog + maxBlockLogs, maxUpkeepLogs int + added, dropped bool } tests := []struct { @@ -390,8 +424,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, }, }, expected: []fetchedLog{ @@ -429,8 +464,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: false, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: false, }, }, expected: []fetchedLog{ @@ -468,8 +504,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: false, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: false, }, }, expected: []fetchedLog{}, @@ -489,8 +526,75 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + dropped: true, + }, + }, + expected: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + }, + { + name: "block log limits", + blockNumber: 1, + logs: []fetchedLog{}, + visited: []fetchedLog{}, + toAdd: []appendArgs{ + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, }, { fl: fetchedLog{ @@ -501,8 +605,9 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, }, { fl: fetchedLog{ @@ -513,9 +618,10 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, - fastExecLogsHigh: 2, - added: true, - dropped: true, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, + dropped: true, }, }, expected: []fetchedLog{ @@ -551,7 +657,7 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { copy(b.visited, tc.visited) for _, args := range tc.toAdd { - dropped, added := b.Append(lggr, args.fl, args.fastExecLogsHigh) + dropped, added := b.Append(lggr, args.fl, args.maxBlockLogs, args.maxUpkeepLogs) require.Equal(t, args.added, added) if args.dropped { require.NotNil(t, dropped.upkeepID) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 967216927a8..71db451f759 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa threadCtrl: utils.NewThreadControl(), lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, - buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultFastExecLogsHigh), + buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), poller: poller, opts: opts, filterStore: filterStore,