From a003bfbbed7e014880873c6544e6b7719b1a1554 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Sun, 28 Apr 2024 22:03:53 +0100 Subject: [PATCH] USe default selector, but map ordering --- .../evmregistry/v21/logprovider/buffer_v1.go | 35 +++++++++---------- .../evmregistry/v21/logprovider/provider.go | 8 ++--- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index 43ed3f18edb..1fb0a416f1c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -78,9 +78,9 @@ type logBuffer struct { // last block number seen by the buffer lastBlockSeen *atomic.Int64 // map of upkeep id to its queue - queueIDs []string - queues map[string]*upkeepLogQueue - lock sync.RWMutex + //queueIDs []string + queues map[string]*upkeepLogQueue + lock sync.RWMutex } func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer { @@ -88,8 +88,8 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), opts: newLogBufferOptions(lookback, blockRate, logLimit), lastBlockSeen: new(atomic.Int64), - queueIDs: []string{}, - queues: make(map[string]*upkeepLogQueue), + //queueIDs: []string{}, + queues: make(map[string]*upkeepLogQueue), } } @@ -132,8 +132,7 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS var remainingLogs int selectedUpkeeps := []string{} numLogs := 0 - for _, qid := range b.queueIDs { - q := b.queues[qid] + for _, q := range b.queues { if !upkeepSelector(q.id) { continue } @@ -189,7 +188,7 @@ func (b *logBuffer) SyncFilters(filterStore UpkeepFilterStore) error { b.lock.Lock() defer b.lock.Unlock() - for _, upkeepID := range b.queueIDs { + for upkeepID := range b.queues { uid := new(big.Int) _, ok := uid.SetString(upkeepID, 10) if ok && !filterStore.Has(uid) { @@ -213,16 +212,16 @@ func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) { b.lock.Lock() defer b.lock.Unlock() - found := false - for _, id := range b.queueIDs { - if id == uid.String() { - found = true - break - } - } - if !found { - b.queueIDs = append(b.queueIDs, uid.String()) - } + //found := false + //for _, id := range b.queueIDs { + // if id == uid.String() { + // found = true + // break + // } + //} + //if !found { + // b.queueIDs = append(b.queueIDs, uid.String()) + //} b.queues[uid.String()] = buf } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 5a853815905..9e4745ff908 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -305,14 +305,14 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up p.iterations = int(math.Ceil(float64(p.bufferV1.NumOfUpkeeps()*logLimitLow) / float64(maxResults))) } - //upkeepSelectorFn := func(id *big.Int) bool { - // return id.Int64()%int64(p.iterations) == int64(p.currentIteration) - //} + upkeepSelectorFn := func(id *big.Int) bool { + return id.Int64()%int64(p.iterations) == int64(p.currentIteration) + } for len(payloads) < maxResults && start <= latestBlock { startWindow, end := getBlockWindow(start, blockRate) - logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector) + logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn) if len(logs) > 0 { p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs)) }