Skip to content

Commit

Permalink
Try to collect stats on dequeued logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Jul 18, 2024
1 parent 3b43302 commit fa76ced
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ type logBuffer struct {
// last block number seen by the buffer
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
queues map[string]*upkeepLogQueue
queueIDs []string
blockHashes map[int64]string
queues map[string]*upkeepLogQueue
queueIDs []string
blockHashes map[int64]string
availableLogs map[string]map[int64]int
dequeuedLogs map[string]map[int64]int

lock sync.RWMutex
}
Expand All @@ -85,6 +87,8 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB
queueIDs: []string{},
blockHashes: map[int64]string{},
queues: make(map[string]*upkeepLogQueue),
availableLogs: map[string]map[int64]int{},
dequeuedLogs: map[string]map[int64]int{},
}
}

Expand Down Expand Up @@ -121,7 +125,9 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
blockThreshold = 1
}

return buf.enqueue(blockThreshold, logs...)
added, dropped := buf.enqueue(blockThreshold, logs...)

return added, dropped
}

// blockStatistics returns the latest block number from the given logs, and updates any blocks that have been reorgd
Expand Down Expand Up @@ -200,6 +206,18 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B
}
queuesInRange++

upkeepAvailableLogs, ok := b.availableLogs[qid]
if !ok {
upkeepAvailableLogs = map[int64]int{}
}

_, ok2 := upkeepAvailableLogs[start]
if !ok2 {
upkeepAvailableLogs[start] = logsInRange
}

b.availableLogs[qid] = upkeepAvailableLogs

if capacity == 0 {
// if there is no more capacity for results, just count the remaining logs
remainingLogs += logsInRange
Expand All @@ -224,6 +242,20 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B

// update the buffer with how many logs we have dequeued for this window
q.dequeued[start] += len(logs)

dequeuedLogs, ok := b.dequeuedLogs[qid]
if !ok {
dequeuedLogs = map[int64]int{}
}

_, ok3 := dequeuedLogs[start]
if !ok3 {
dequeuedLogs[start] = len(logs)
} else {
dequeuedLogs[start] += len(logs)
}

b.dequeuedLogs[qid] = dequeuedLogs
}
b.lggr.Debugw("minimum commitment logs dequeued", "start", start, "end", end, "numUpkeeps", len(b.queues), "numUpkeepIDs", len(b.queueIDs), "minimumDequeueMet", minimumDequeueMet, "logLimit", logLimit, "queuesQueried", queuesQueried, "queuesInRange", queuesInRange, "queuesIterated", queuesIterated)
return result, remainingLogs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,14 @@ func (p *logEventProvider) Start(context.Context) error {
func (p *logEventProvider) Close() error {
return p.StopOnce(LogProviderServiceName, func() error {
p.threadCtrl.Close()

bufV1 := p.buffer.(*logBuffer)
availableLogsJSON, _ := json.Marshal(bufV1.availableLogs)
dequeuedLogsJSON, _ := json.Marshal(bufV1.dequeuedLogs)
p.lggr.Debugw("shutting down LogProvider", "availableLogsJSON", availableLogsJSON, "dequeuedLogsJSON", dequeuedLogsJSON)
return nil
})

}

Check failure on line 205 in core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

func (p *logEventProvider) HealthReport() map[string]error {
Expand Down

0 comments on commit fa76ced

Please sign in to comment.