Skip to content

Commit

Permalink
Revert "reduce numOfLogUpkeeps"
Browse files Browse the repository at this point in the history
This reverts commit f422a54.
  • Loading branch information
amirylm committed Jan 18, 2024
1 parent 0cdfa09 commit 382fcaa
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var (
// based on available capacity, i.e. if there are no logs from other upkeeps.
// Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block
defaultFastExecLogsHigh = 32
// defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry.
defaultNumOfLogUpkeeps = 50
)

// fetchedLog holds the log and the ID of the upkeep
Expand Down Expand Up @@ -52,7 +54,7 @@ type fetchedBlock struct {
visited []fetchedLog
}

func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxUpkeepLogs int) (fetchedLog, bool) {
func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) {
has, upkeepLogs := b.has(fl.upkeepID, fl.log)
if has {
// Skipping known logs
Expand All @@ -77,6 +79,13 @@ func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxUpkeepLogs i
}
b.logs = currentLogs
return dropped, true
} else if len(b.logs)+len(b.visited) > maxBlockLogs {
// in case we have logs overflow in the buffer level, we drop a log based on
// shared, random (per block) order of the logs in the block.
b.Sort()
dropped := b.logs[0]
b.logs = b.logs[1:]
return dropped, true
}

return fetchedLog{}, true
Expand Down Expand Up @@ -136,18 +145,19 @@ type logEventBuffer struct {
// size is the number of blocks supported by the buffer
size int32

fastExecLogsHigh uint32
numOfLogUpkeeps, fastExecLogsHigh uint32
// blocks is the circular buffer of fetched blocks
blocks []fetchedBlock
// latestBlock is the latest block number seen
latestBlock int64
}

func newLogEventBuffer(lggr logger.Logger, size, fastExecLogsHigh int) *logEventBuffer {
func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer {
return &logEventBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
numOfLogUpkeeps: uint32(numOfLogUpkeeps),
fastExecLogsHigh: uint32(fastExecLogsHigh),
}
}
Expand All @@ -160,7 +170,8 @@ func (b *logEventBuffer) bufferSize() int {
return int(atomic.LoadInt32(&b.size))
}

func (b *logEventBuffer) SetLimits(fastExecLogsHigh int) {
func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) {
atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps))
atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh))
}

Expand All @@ -172,6 +183,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {

lggr := b.lggr.With("id", id.String())

maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps))
maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh))

latestBlock := b.latestBlockSeen()
Expand All @@ -194,7 +206,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber)
continue
}
droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxUpkeepLogs)
droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs)
if !ok {
// Skipping known logs
continue
Expand Down
Loading

0 comments on commit 382fcaa

Please sign in to comment.