Skip to content

Commit

Permalink
Align log buffer to work with logs limits config:
Browse files Browse the repository at this point in the history
- Used FastExecLogsHigh - limit the number of logs we save in a block for an upkeep
- Used FastExecLogsHigh * NumOfLogUpkeeps - limit the amount of logs we save in a block for all upkeeps
- Ensured these values can be set dynamically for the buffer, as the configs might change over time
  • Loading branch information
amirylm committed Jan 15, 2024
1 parent c32efca commit f6c05eb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
)

var (
// maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block.
maxLogsPerUpkeepInBlock = 32
// maxLogsPerBlock is the maximum number of blocks in the buffer.
maxLogsPerBlock = 1024
// defaultFastExecLogsHigh is the default upper bound / maximum number of logs that Automation is committed to process for each upkeep,
// based on available capacity, i.e. if there are no logs from other upkeeps.
// Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block
defaultFastExecLogsHigh = 10
// defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry.
defaultNumOfLogUpkeeps = 50
)

// fetchedLog holds the log and the ID of the upkeep
Expand Down Expand Up @@ -143,20 +145,20 @@ type logEventBuffer struct {
// size is the number of blocks supported by the buffer
size int32

maxBlockLogs, maxUpkeepLogsPerBlock int
numOfLogUpkeeps, fastExecLogsHigh uint32
// blocks is the circular buffer of fetched blocks
blocks []fetchedBlock
// latestBlock is the latest block number seen
latestBlock int64
}

func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer {
func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer {
return &logEventBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
maxBlockLogs: maxBlockLogs,
maxUpkeepLogsPerBlock: maxUpkeepLogsPerBlock,
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
numOfLogUpkeeps: uint32(numOfLogUpkeeps),
fastExecLogsHigh: uint32(fastExecLogsHigh),
}
}

Expand All @@ -168,6 +170,11 @@ func (b *logEventBuffer) bufferSize() int {
return int(atomic.LoadInt32(&b.size))
}

func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) {
atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps))
atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh))
}

// enqueue adds logs (if not exist) to the buffer, returning the number of logs added
// minus the number of logs dropped.
func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
Expand All @@ -176,8 +183,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {

lggr := b.lggr.With("id", id.String())

maxBlockLogs := b.maxBlockLogs
maxUpkeepLogs := b.maxUpkeepLogsPerBlock
maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps))
maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh))

latestBlock := b.latestBlockSeen()
added, dropped := 0, 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
poller: poller,
opts: opts,
filterStore: filterStore,
Expand Down

0 comments on commit f6c05eb

Please sign in to comment.