From c6065d48eae5b824a5817ba042e827fb1e9b8af4 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 18 Jul 2024 11:24:07 +0100 Subject: [PATCH] Append as time series --- .../evmregistry/v21/logprovider/buffer_v1.go | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index 8019a010635..c03c65481d1 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -73,8 +73,8 @@ type logBuffer struct { queues map[string]*upkeepLogQueue queueIDs []string blockHashes map[int64]string - availableLogs map[string]map[int64]int - dequeuedLogs map[string]map[int64]int + availableLogs map[string]map[int64][]int + dequeuedLogs map[string]map[int64][]int lock sync.RWMutex } @@ -87,8 +87,8 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB queueIDs: []string{}, blockHashes: map[int64]string{}, queues: make(map[string]*upkeepLogQueue), - availableLogs: map[string]map[int64]int{}, - dequeuedLogs: map[string]map[int64]int{}, + availableLogs: map[string]map[int64][]int{}, + dequeuedLogs: map[string]map[int64][]int{}, } } @@ -208,14 +208,18 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B upkeepAvailableLogs, ok := b.availableLogs[qid] if !ok { - upkeepAvailableLogs = map[int64]int{} + upkeepAvailableLogs = map[int64][]int{} } - _, ok2 := upkeepAvailableLogs[start] - if !ok2 { - upkeepAvailableLogs[start] = logsInRange + series2, ok3 := upkeepAvailableLogs[start] + if !ok3 { + series2 = []int{logsInRange} + } else { + series2 = append(series2, logsInRange) } + upkeepAvailableLogs[start] = series2 + b.availableLogs[qid] = upkeepAvailableLogs if capacity == 0 { @@ -245,16 +249,18 @@ func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]B dequeuedLogs, ok := b.dequeuedLogs[qid] if !ok { - dequeuedLogs = map[int64]int{} + dequeuedLogs = map[int64][]int{} } - _, ok3 := dequeuedLogs[start] + series, ok3 := dequeuedLogs[start] if !ok3 { - dequeuedLogs[start] = len(logs) + series = []int{len(logs)} } else { - dequeuedLogs[start] += len(logs) + series = append(series, len(logs)) } + dequeuedLogs[start] = series + b.dequeuedLogs[qid] = dequeuedLogs } b.lggr.Debugw("minimum commitment logs dequeued", "start", start, "end", end, "numUpkeeps", len(b.queues), "numUpkeepIDs", len(b.queueIDs), "minimumDequeueMet", minimumDequeueMet, "logLimit", logLimit, "queuesQueried", queuesQueried, "queuesInRange", queuesInRange, "queuesIterated", queuesIterated)