Skip to content

Commit

Permalink
config and handle remaining
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Mar 14, 2024
1 parent 418db92 commit 39f9766
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@ type LogTriggersOptions struct {
// Finality depth is the number of blocks to wait before considering a block final.
FinalityDepth int64

// v1 config

BufferVersion string

LogLimitLow int32

LogLimitHigh int32

BlockRate int64
}

func NewOptions(finalityDepth int64) LogTriggersOptions {
Expand Down Expand Up @@ -72,4 +80,13 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) {
if o.FinalityDepth == 0 {
o.FinalityDepth = finalityDepth
}
if o.BlockRate == 0 {
o.BlockRate = 1
}
if o.LogLimitLow == 0 {
o.LogLimitLow = 5
}
if o.LogLimitHigh == 0 {
o.LogLimitHigh = o.LogLimitLow * 2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
bufferV1: NewLogBuffer(lggr, int(opts.LookbackBlocks), defaultLogLimitHigh),
bufferV1: NewLogBuffer(lggr, int(opts.LookbackBlocks), int(opts.LogLimitHigh)),
poller: poller,
opts: opts,
filterStore: filterStore,
Expand Down Expand Up @@ -169,7 +169,7 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers
payloads := p.getPayloadsFromBuffer(latest.BlockNumber)

if len(payloads) > 0 {
p.lggr.Debugw("Fetched payloads from buffer xxx", "latestBlock", latest.BlockNumber, "payloads", len(payloads))
p.lggr.Debugw("Fetched payloads from buffer", "latestBlock", latest.BlockNumber, "payloads", len(payloads))
}

return payloads, nil
Expand Down Expand Up @@ -200,18 +200,23 @@ func (p *logEventProvider) getPayloadsFromBuffer(latestBlock int64) []ocr2keeper

switch p.opts.BufferVersion {
case "v1":
blockRate, upkeepLowLimit, maxResults := 4, 6, MaxPayloads // TODO: use config
for len(payloads) < maxResults && start < latestBlock {
logs, _ := p.bufferV1.Dequeue(start, blockRate, upkeepLowLimit, maxResults-len(payloads), DefaultUpkeepSelector)
blockRate, logLimitLow, maxResults := int(p.opts.BlockRate), int(p.opts.LogLimitLow), MaxPayloads
for len(payloads) < maxResults && start <= latestBlock {
logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs xxx", "start", start, "latestBlock", latestBlock, "logs", len(logs))
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
}
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
}
}
if remaining > 0 {
p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining)
// TODO: handle remaining logs in a better way than consuming the entire window
continue
}
start += int64(blockRate)
}
default:
Expand Down

0 comments on commit 39f9766

Please sign in to comment.