Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align log buffer to work with logs limits config #11781

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
)

var (
// maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block.
maxLogsPerUpkeepInBlock = 32
// maxLogsPerBlock is the maximum number of blocks in the buffer.
maxLogsPerBlock = 1024
// defaultFastExecLogsHigh is the default upper bound / maximum number of logs that Automation is committed to process for each upkeep,
// based on available capacity, i.e. if there are no logs from other upkeeps.
// Used by Log buffer to limit the number of logs we are saving in memory for each upkeep in a block
defaultFastExecLogsHigh = 32
// defaultNumOfLogUpkeeps is the default number of log upkeeps supported by the registry.
defaultNumOfLogUpkeeps = 50
)

// fetchedLog holds the log and the ID of the upkeep
Expand Down Expand Up @@ -143,20 +145,20 @@ type logEventBuffer struct {
// size is the number of blocks supported by the buffer
size int32

maxBlockLogs, maxUpkeepLogsPerBlock int
numOfLogUpkeeps, fastExecLogsHigh uint32
// blocks is the circular buffer of fetched blocks
blocks []fetchedBlock
// latestBlock is the latest block number seen
latestBlock int64
}

func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer {
func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer {
return &logEventBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
maxBlockLogs: maxBlockLogs,
maxUpkeepLogsPerBlock: maxUpkeepLogsPerBlock,
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
numOfLogUpkeeps: uint32(numOfLogUpkeeps),
fastExecLogsHigh: uint32(fastExecLogsHigh),
}
}

Expand All @@ -168,6 +170,11 @@ func (b *logEventBuffer) bufferSize() int {
return int(atomic.LoadInt32(&b.size))
}

func (b *logEventBuffer) SetLimits(numOfLogUpkeeps, fastExecLogsHigh int) {
atomic.StoreUint32(&b.numOfLogUpkeeps, uint32(numOfLogUpkeeps))
atomic.StoreUint32(&b.fastExecLogsHigh, uint32(fastExecLogsHigh))
}

// enqueue adds logs (if not exist) to the buffer, returning the number of logs added
// minus the number of logs dropped.
func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {
Expand All @@ -176,8 +183,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int {

lggr := b.lggr.With("id", id.String())

maxBlockLogs := b.maxBlockLogs
maxUpkeepLogs := b.maxUpkeepLogsPerBlock
maxBlockLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh) * atomic.LoadUint32(&b.numOfLogUpkeeps))
maxUpkeepLogs := int(atomic.LoadUint32(&b.fastExecLogsHigh))

latestBlock := b.latestBlockSeen()
added, dropped := 0, 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
})

t.Run("enqueue logs overflow", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 10)
buf := newLogEventBuffer(logger.TestLogger(t), 2, 2, 2)

require.Equal(t, 2, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
Expand All @@ -199,6 +199,50 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) {
buf.lock.Unlock()
})

t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2)

require.Equal(t, 2, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2},
))
buf.SetLimits(10, 3)
require.Equal(t, 3, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3},
))

buf.lock.Lock()
defer buf.lock.Unlock()
require.Equal(t, 2, len(buf.blocks[0].logs))
require.Equal(t, 3, len(buf.blocks[1].logs))
})

t.Run("enqueue logs overflow with dynamic limits", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 2, 10, 2)

require.Equal(t, 2, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 2},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 3},
))
buf.SetLimits(10, 3)
require.Equal(t, 3, buf.enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 1},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 2},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x21"), LogIndex: 3},
))

buf.lock.Lock()
defer buf.lock.Unlock()
require.Equal(t, 2, len(buf.blocks[0].logs))
})

t.Run("enqueue block overflow", func(t *testing.T) {
buf := newLogEventBuffer(logger.TestLogger(t), 3, 2, 10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
poller: poller,
opts: opts,
filterStore: filterStore,
Expand Down
Loading