Skip to content

Commit

Permalink
USe default selector, but map ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Apr 28, 2024
1 parent 93bbc1e commit a003bfb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ type logBuffer struct {
// last block number seen by the buffer
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
queueIDs []string
queues map[string]*upkeepLogQueue
lock sync.RWMutex
//queueIDs []string
queues map[string]*upkeepLogQueue
lock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
//queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
}
}

Expand Down Expand Up @@ -132,8 +132,7 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS
var remainingLogs int
selectedUpkeeps := []string{}
numLogs := 0
for _, qid := range b.queueIDs {
q := b.queues[qid]
for _, q := range b.queues {
if !upkeepSelector(q.id) {
continue
}
Expand Down Expand Up @@ -189,7 +188,7 @@ func (b *logBuffer) SyncFilters(filterStore UpkeepFilterStore) error {
b.lock.Lock()
defer b.lock.Unlock()

for _, upkeepID := range b.queueIDs {
for upkeepID := range b.queues {
uid := new(big.Int)
_, ok := uid.SetString(upkeepID, 10)
if ok && !filterStore.Has(uid) {
Expand All @@ -213,16 +212,16 @@ func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) {
b.lock.Lock()
defer b.lock.Unlock()

found := false
for _, id := range b.queueIDs {
if id == uid.String() {
found = true
break
}
}
if !found {
b.queueIDs = append(b.queueIDs, uid.String())
}
//found := false
//for _, id := range b.queueIDs {
// if id == uid.String() {
// found = true
// break
// }
//}
//if !found {
// b.queueIDs = append(b.queueIDs, uid.String())
//}
b.queues[uid.String()] = buf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,14 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
p.iterations = int(math.Ceil(float64(p.bufferV1.NumOfUpkeeps()*logLimitLow) / float64(maxResults)))
}

//upkeepSelectorFn := func(id *big.Int) bool {
// return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
//}
upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
}

for len(payloads) < maxResults && start <= latestBlock {
startWindow, end := getBlockWindow(start, blockRate)

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector)
logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
}
Expand Down

0 comments on commit a003bfb

Please sign in to comment.