Skip to content

Commit

Permalink
Align log buffer to work with logs limits config (#11781)
Browse files Browse the repository at this point in the history
* Align log buffer to work with logs limits config:

- Used FastExecLogsHigh - limit the number of logs we save in a block for an upkeep
- Used FastExecLogsHigh * NumOfLogUpkeeps - limit the amount of logs we save in a block for all upkeeps
- Ensured these values can be set dynamically for the buffer, as the configs might change over time

* reduce numOfLogUpkeeps

* added test for dynamic limits

* remove redundant test

* increase default FastExecLogsHigh (32 instead of 10)

* Revert "reduce numOfLogUpkeeps"

This reverts commit f422a54.
  • Loading branch information
amirylm authored Jan 19, 2024
1 parent 9c22655 commit d805141
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
)

var (
// maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block.
maxLogsPerUpkeepInBlock = 32
// maxLogsPerBlock is the maximum number of blocks in the buffer.
maxLogsPerBlock = 1024
// defaultFastExecLogsHigh is the default upper bound / maximum number of logs that Automation is committed to process for each upkeep,
// based on available capacity, i.e. if there are no logs from other upkeeps.
// Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block
defaultFastExecLogsHigh = 32
// defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry.
defaultNumOfLogUpkeeps = 50
)

// fetchedLog holds the log and the ID of the upkeep
Expand Down Expand Up @@ -143,20 +145,20 @@ type logEventBuffer struct {
// size is the number of blocks supported by the buffer
size int32

maxBlockLogs, maxUpkeepLogsPerBlock int
numOfLogUpkeeps, fastExecLogsHigh uint32
// blocks is the circular buffer of fetched blocks
blocks []fetchedBlock
// latestBlock is the latest block number seen
latestBlock int64
}

func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer {
func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer {
return &logEventBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
maxBlockLogs: maxBlockLogs,
maxUpkeepLogsPerBlock: maxUpkeepLogsPerBlock,
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
numOfLogUpkeeps: uint32(numOfLogUpkeeps),
fastExecLogsHigh: uint32(fastExecLogsHigh),
}
}

Expand All @@ -168,6 +170,11 @@ func (b *logEventBuffer) bufferSize() int {
return int(atomic.LoadInt32(&b.size))
}

func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) {
atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps))
atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh))
}

// enqueue adds logs (if not exist) to the buffer, returning the number of logs added
// minus the number of logs dropped.
func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
Expand All @@ -176,8 +183,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {

lggr := b.lggr.With("id", id.String())

maxBlockLogs := b.maxBlockLogs
maxUpkeepLogs := b.maxUpkeepLogsPerBlock
maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps))
maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh))

latestBlock := b.latestBlockSeen()
added, dropped := 0, 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
})

t.Run("enqueue logs overflow", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 10)
buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 2)

require.Equal(t, 2, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
Expand All @@ -199,6 +199,50 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
buf.lock.Unlock()
})

t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2)

require.Equal(t, 2, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2},
))
buf.SetLimits(10, 3)
require.Equal(t, 3, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3},
))

buf.lock.Lock()
defer buf.lock.Unlock()
require.Equal(t, 2, len(buf.blocks[0].logs))
require.Equal(t, 3, len(buf.blocks[1].logs))
})

t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2)

require.Equal(t, 2, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3},
))
buf.SetLimits(10, 3)
require.Equal(t, 3, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3},
))

buf.lock.Lock()
defer buf.lock.Unlock()
require.Equal(t, 2, len(buf.blocks[0].logs))
})

t.Run("enqueue block overflow", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 3, 2, 10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
poller: poller,
opts: opts,
filterStore: filterStore,
Expand Down

0 comments on commit d805141

Please sign in to comment.