diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index c5e3be20012..8019a010635 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -70,9 +70,11 @@ type logBuffer struct { // last block number seen by the buffer lastBlockSeen *atomic.Int64 // map of upkeep id to its queue - queues map[string]*upkeepLogQueue - queueIDs []string - blockHashes map[int64]string + queues map[string]*upkeepLogQueue + queueIDs []string + blockHashes map[int64]string + availableLogs map[string]map[int64]int + dequeuedLogs map[string]map[int64]int lock sync.RWMutex } @@ -85,6 +87,8 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB queueIDs: []string{}, blockHashes: map[int64]string{}, queues: make(map[string]*upkeepLogQueue), + availableLogs: map[string]map[int64]int{}, + dequeuedLogs: map[string]map[int64]int{}, } } @@ -121,7 +125,9 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { blockThreshold = 1 } - return buf.enqueue(blockThreshold, logs...) + added, dropped := buf.enqueue(blockThreshold, logs...) + + return added, dropped } // blockStatistics returns the latest block number from the given logs, and updates any blocks that have been reorgd @@ -200,6 +206,18 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B } queuesInRange++ + upkeepAvailableLogs, ok := b.availableLogs[qid] + if !ok { + upkeepAvailableLogs = map[int64]int{} + } + + _, ok2 := upkeepAvailableLogs[start] + if !ok2 { + upkeepAvailableLogs[start] = logsInRange + } + + b.availableLogs[qid] = upkeepAvailableLogs + if capacity == 0 { // if there is no more capacity for results, just count the remaining logs remainingLogs += logsInRange @@ -224,6 +242,20 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B // update the buffer with how many logs we have dequeued for this window q.dequeued[start] += len(logs) + + dequeuedLogs, ok := b.dequeuedLogs[qid] + if !ok { + dequeuedLogs = map[int64]int{} + } + + _, ok3 := dequeuedLogs[start] + if !ok3 { + dequeuedLogs[start] = len(logs) + } else { + dequeuedLogs[start] += len(logs) + } + + b.dequeuedLogs[qid] = dequeuedLogs } b.lggr.Debugw("minimum commitment logs dequeued", "start", start, "end", end, "numUpkeeps", len(b.queues), "numUpkeepIDs", len(b.queueIDs), "minimumDequeueMet", minimumDequeueMet, "logLimit", logLimit, "queuesQueried", queuesQueried, "queuesInRange", queuesInRange, "queuesIterated", queuesIterated) return result, remainingLogs diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 57bd76171b0..e74d3bcb026 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -194,8 +194,14 @@ func (p *logEventProvider) Start(context.Context) error { func (p *logEventProvider) Close() error { return p.StopOnce(LogProviderServiceName, func() error { p.threadCtrl.Close() + + bufV1 := p.buffer.(*logBuffer) + availableLogsJSON, _ := json.Marshal(bufV1.availableLogs) + dequeuedLogsJSON, _ := json.Marshal(bufV1.dequeuedLogs) + p.lggr.Debugw("shutting down LogProvider", "availableLogsJSON", availableLogsJSON, "dequeuedLogsJSON", dequeuedLogsJSON) return nil }) + } func (p *logEventProvider) HealthReport() map[string]error {