From 39f97669455c475a7fa93ba239688f8de3e0a2f8 Mon Sep 17 00:00:00 2001 From: amirylm Date: Thu, 14 Mar 2024 11:54:45 +0200 Subject: [PATCH] config and handle remaining --- .../evmregistry/v21/logprovider/factory.go | 17 +++++++++++++++++ .../evmregistry/v21/logprovider/provider.go | 17 +++++++++++------ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go index 1d5ed590603..2a5f4f11aad 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go @@ -41,7 +41,15 @@ type LogTriggersOptions struct { // Finality depth is the number of blocks to wait before considering a block final. FinalityDepth int64 + // v1 config + BufferVersion string + + LogLimitLow int32 + + LogLimitHigh int32 + + BlockRate int64 } func NewOptions(finalityDepth int64) LogTriggersOptions { @@ -72,4 +80,13 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) { if o.FinalityDepth == 0 { o.FinalityDepth = finalityDepth } + if o.BlockRate == 0 { + o.BlockRate = 1 + } + if o.LogLimitLow == 0 { + o.LogLimitLow = 5 + } + if o.LogLimitHigh == 0 { + o.LogLimitHigh = o.LogLimitLow * 2 + } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index eaefdee2250..9650003316a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -112,7 +112,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa lggr: lggr.Named("KeepersRegistry.LogEventProvider"), packer: packer, buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh), - bufferV1: NewLogBuffer(lggr, int(opts.LookbackBlocks), defaultLogLimitHigh), + bufferV1: NewLogBuffer(lggr, int(opts.LookbackBlocks), int(opts.LogLimitHigh)), poller: poller, opts: opts, filterStore: filterStore, @@ -169,7 +169,7 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers payloads := p.getPayloadsFromBuffer(latest.BlockNumber) if len(payloads) > 0 { - p.lggr.Debugw("Fetched payloads from buffer xxx", "latestBlock", latest.BlockNumber, "payloads", len(payloads)) + p.lggr.Debugw("Fetched payloads from buffer", "latestBlock", latest.BlockNumber, "payloads", len(payloads)) } return payloads, nil @@ -200,11 +200,11 @@ func (p *logEventProvider) getPayloadsFromBuffer(latestBlock int64) []ocr2keeper switch p.opts.BufferVersion { case "v1": - blockRate, upkeepLowLimit, maxResults := 4, 6, MaxPayloads // TODO: use config - for len(payloads) < maxResults && start < latestBlock { - logs, _ := p.bufferV1.Dequeue(start, blockRate, upkeepLowLimit, maxResults-len(payloads), DefaultUpkeepSelector) + blockRate, logLimitLow, maxResults := int(p.opts.BlockRate), int(p.opts.LogLimitLow), MaxPayloads + for len(payloads) < maxResults && start <= latestBlock { + logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector) if len(logs) > 0 { - p.lggr.Debugw("Dequeued logs xxx", "start", start, "latestBlock", latestBlock, "logs", len(logs)) + p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs)) } for _, l := range logs { payload, err := p.createPayload(l.ID, l.Log) @@ -212,6 +212,11 @@ func (p *logEventProvider) getPayloadsFromBuffer(latestBlock int64) []ocr2keeper payloads = append(payloads, payload) } } + if remaining > 0 { + p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining) + // TODO: handle remaining logs in a better way than consuming the entire window + continue + } start += int64(blockRate) } default: