From 214b0c60391d99ea12d67d439be1dd6de262b35f Mon Sep 17 00:00:00 2001
From: Fergal Gribben
Date: Sat, 8 Jun 2024 00:42:46 +0100
Subject: [PATCH] Add tests

---
 .../evmregistry/v21/logprovider/provider.go   |   12 +-
 .../v21/logprovider/provider_test.go          | 1158 ++++++++++++++++-
 2 files changed, 1163 insertions(+), 7 deletions(-)

diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
index 92b7f0848da..13a361adeff 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
@@ -125,6 +125,7 @@ type logEventProvider struct {
 func newDequeueCoordinator() *dequeueCoordinator {
 	return &dequeueCoordinator{
 		dequeuedMinimum: map[int64]bool{},
+		notReady:        map[int64]bool{},
 		remainingLogs:   map[int64]int{},
 		dequeuedLogs:    map[int64]int{},
 		completeWindows: map[int64]bool{},
@@ -134,6 +135,7 @@ type logEventProvider struct {
 
 type dequeueCoordinator struct {
 	dequeuedMinimum map[int64]bool
+	notReady        map[int64]bool
 	remainingLogs   map[int64]int
 	dequeuedLogs    map[int64]int
 	completeWindows map[int64]bool
@@ -142,10 +144,12 @@ type dequeueCoordinator struct {
 
 func (c *dequeueCoordinator) dequeueBlockWindow(start int64, latestBlock int64, blockRate int) (int64, int64, bool) {
 	// check if minimum logs have been dequeued
-	for i := start; i < latestBlock; i += int64(blockRate) {
-		startWindow, end := getBlockWindow(start, blockRate)
+	for i := start; i <= latestBlock; i += int64(blockRate) {
+		startWindow, end := getBlockWindow(i, blockRate)
 		if latestBlock >= end {
 			c.completeWindows[startWindow] = true
+		} else if c.notReady[startWindow] { // the window is incomplete and has no logs to provide as of yet
+			return 0, 0, false
 		}
 
 		if hasDequeued, ok := c.dequeuedMinimum[startWindow]; ok {
@@ -159,7 +163,7 @@ func (c *dequeueCoordinator) dequeueBlockWindow(start int64, latestBlock int64,
 
 	// check best effort dequeue
 	for i := start; i < latestBlock; i += int64(blockRate) {
-		startWindow, end := getBlockWindow(start, blockRate)
+		startWindow, end := getBlockWindow(i, blockRate)
 
 		if remainingLogs, ok := c.remainingLogs[startWindow]; ok {
 			if remainingLogs > 0 {
@@ -220,6 +224,8 @@ func (c *dequeueCoordinator) updateBlockWindow(startWindow int64, logs, remainin
 		}
 	} else if c.dequeuedLogs[startWindow] >= numberOfUpkeeps*logLimitLow { // this assumes we don't dequeue the same upkeeps more than logLimitLow in min commitment
 		c.dequeuedMinimum[startWindow] = true
+	} else if logs == 0 && remaining == 0 {
+		c.notReady[startWindow] = true
 	}
 }
 
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
index 59716c2e3bc..cce694a48dd 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
@@ -754,10 +754,1160 @@ func TestLogEventProvider_GetLatestPayloads(t *testing.T) {
 		assert.Equal(t, 2999300, remainingLogs)
 	})
 
-	// complete windows, dequeues min oldest to newest, then best effort oldest to newest
-	// complete window, no logs, dq min is true after
-	// incomplete window, dequeues but only considered minium dq when correct number of logs dequeued
-	// incomplete window, but with no logs, considered minium dq when window becomes complete
+
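Editorial note: the tests that follow exercise the new notReady bookkeeping and the corrected per-iteration window lookup (getBlockWindow(i, blockRate) instead of getBlockWindow(start, blockRate)). The repository's getBlockWindow implementation is not part of this patch, so the standalone sketch below only illustrates the window arithmetic the tests appear to rely on; getBlockWindowSketch is a name invented here, not the real function.

package main

import "fmt"

// getBlockWindowSketch is an illustrative stand-in for the provider's getBlockWindow
// (whose implementation is not shown in this patch). Based on how it is called above,
// it is assumed to round a block number down to the start of its window and return
// the window's inclusive [start, end] bounds for the given blockRate.
func getBlockWindowSketch(block int64, blockRate int) (start, end int64) {
	size := int64(blockRate)
	if size == 0 {
		size = 1
	}
	start = block - (block % size)
	end = start + size - 1
	return start, end
}

func main() {
	// With blockRate = 4 and latest block 102 (as in the incomplete-window test),
	// block 102 falls in window [100, 103], which is not yet complete.
	start, end := getBlockWindowSketch(102, 4)
	latestBlock := int64(102)
	fmt.Println(start, end, latestBlock >= end) // 100 103 false

	// Per the patch above, updateBlockWindow marks such a window notReady when it has
	// seen zero logs and zero remaining, and dequeueBlockWindow then skips it instead
	// of treating it as having met its minimum dequeue commitment.
}

With a block rate of 1, every block is its own window, which lines up with the first test's expectation of 50 logs per block window (5 upkeeps, 10 logs per block).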
t.Run("minimum guaranteed for all windows followed by best effort", func(t *testing.T) { + oldMaxPayloads := MaxPayloads + MaxPayloads = 10 + defer func() { + MaxPayloads = oldMaxPayloads + }() + + upkeepIDs := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + big.NewInt(4), + big.NewInt(5), + } + + filterStore := NewUpkeepFilterStore() + + logGenerator := func(start, end int64) []logpoller.Log { + var res []logpoller.Log + for i := start; i < end; i++ { + for j := 0; j < 10; j++ { + res = append(res, logpoller.Log{ + LogIndex: int64(j), + BlockHash: common.HexToHash(fmt.Sprintf("%d", i+1)), + BlockNumber: i + 1, + }) + } + } + return res + } + + // use a log poller that will create logs for the queried block range + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + // prepare the filter store with upkeeps + for _, upkeepID := range upkeepIDs { + filterStore.AddActiveUpkeeps( + upkeepFilter{ + addr: []byte(upkeepID.String()), + upkeepID: upkeepID, + topics: []common.Hash{ + common.HexToHash(upkeepID.String()), + }, + }, + ) + } + + opts := NewOptions(200, big.NewInt(1)) + opts.BufferVersion = "v1" + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) + + ctx := context.Background() + + err := provider.ReadLogs(ctx, upkeepIDs...) + assert.NoError(t, err) + + assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) + + bufV1 := provider.bufferV1.(*logBuffer) + + // each upkeep should have 10 logs * 100 blocks = 1000 logs + assert.Equal(t, 1000, len(bufV1.queues["1"].logs)) + assert.Equal(t, 1000, len(bufV1.queues["2"].logs)) + assert.Equal(t, 1000, len(bufV1.queues["3"].logs)) + assert.Equal(t, 1000, len(bufV1.queues["4"].logs)) + assert.Equal(t, 1000, len(bufV1.queues["5"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 998, len(bufV1.queues["1"].logs)) + assert.Equal(t, 998, len(bufV1.queues["2"].logs)) + assert.Equal(t, 998, len(bufV1.queues["3"].logs)) + assert.Equal(t, 998, len(bufV1.queues["4"].logs)) + assert.Equal(t, 998, len(bufV1.queues["5"].logs)) + + blockWindowCounts := map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the first block window + assert.Equal(t, 40, blockWindowCounts[1]) + assert.Equal(t, 50, blockWindowCounts[2]) + assert.Equal(t, 50, blockWindowCounts[3]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 996, len(bufV1.queues["1"].logs)) + assert.Equal(t, 996, len(bufV1.queues["2"].logs)) + assert.Equal(t, 996, len(bufV1.queues["3"].logs)) + assert.Equal(t, 996, len(bufV1.queues["4"].logs)) + assert.Equal(t, 996, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the second 
block window, since the first block window has met it's minimum commitment + assert.Equal(t, 40, blockWindowCounts[1]) + assert.Equal(t, 40, blockWindowCounts[2]) + assert.Equal(t, 50, blockWindowCounts[3]) + + for i := 0; i < 97; i++ { + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + } + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 802, len(bufV1.queues["1"].logs)) + assert.Equal(t, 802, len(bufV1.queues["2"].logs)) + assert.Equal(t, 802, len(bufV1.queues["3"].logs)) + assert.Equal(t, 802, len(bufV1.queues["4"].logs)) + assert.Equal(t, 802, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment + assert.Equal(t, 40, blockWindowCounts[1]) + assert.Equal(t, 40, blockWindowCounts[2]) + assert.Equal(t, 40, blockWindowCounts[3]) + assert.Equal(t, 40, blockWindowCounts[99]) + assert.Equal(t, 50, blockWindowCounts[100]) + + // at this point, all block windows except for the latest block window will have been dequeued + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 800, len(bufV1.queues["1"].logs)) + assert.Equal(t, 800, len(bufV1.queues["2"].logs)) + assert.Equal(t, 800, len(bufV1.queues["3"].logs)) + assert.Equal(t, 800, len(bufV1.queues["4"].logs)) + assert.Equal(t, 800, len(bufV1.queues["5"].logs)) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 798, len(bufV1.queues["1"].logs)) + assert.Equal(t, 798, len(bufV1.queues["2"].logs)) + assert.Equal(t, 798, len(bufV1.queues["3"].logs)) + assert.Equal(t, 798, len(bufV1.queues["4"].logs)) + assert.Equal(t, 798, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment + assert.Equal(t, 30, blockWindowCounts[1]) + assert.Equal(t, 40, blockWindowCounts[2]) + assert.Equal(t, 40, blockWindowCounts[3]) + assert.Equal(t, 40, blockWindowCounts[100]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 796, len(bufV1.queues["1"].logs)) + assert.Equal(t, 796, len(bufV1.queues["2"].logs)) + assert.Equal(t, 796, len(bufV1.queues["3"].logs)) + assert.Equal(t, 796, len(bufV1.queues["4"].logs)) + assert.Equal(t, 796, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment + assert.Equal(t, 20, blockWindowCounts[1]) + 
assert.Equal(t, 40, blockWindowCounts[2]) + assert.Equal(t, 40, blockWindowCounts[3]) + assert.Equal(t, 40, blockWindowCounts[100]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 794, len(bufV1.queues["1"].logs)) + assert.Equal(t, 794, len(bufV1.queues["2"].logs)) + assert.Equal(t, 794, len(bufV1.queues["3"].logs)) + assert.Equal(t, 794, len(bufV1.queues["4"].logs)) + assert.Equal(t, 794, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment + assert.Equal(t, 10, blockWindowCounts[1]) + assert.Equal(t, 40, blockWindowCounts[2]) + assert.Equal(t, 40, blockWindowCounts[3]) + assert.Equal(t, 40, blockWindowCounts[100]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 792, len(bufV1.queues["1"].logs)) + assert.Equal(t, 792, len(bufV1.queues["2"].logs)) + assert.Equal(t, 792, len(bufV1.queues["3"].logs)) + assert.Equal(t, 792, len(bufV1.queues["4"].logs)) + assert.Equal(t, 792, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment + assert.Equal(t, 0, blockWindowCounts[1]) + assert.Equal(t, 40, blockWindowCounts[2]) + assert.Equal(t, 40, blockWindowCounts[3]) + assert.Equal(t, 40, blockWindowCounts[100]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 790, len(bufV1.queues["1"].logs)) + assert.Equal(t, 790, len(bufV1.queues["2"].logs)) + assert.Equal(t, 790, len(bufV1.queues["3"].logs)) + assert.Equal(t, 790, len(bufV1.queues["4"].logs)) + assert.Equal(t, 790, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + blockWindowCounts[l.BlockNumber]++ + } + } + + // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment + assert.Equal(t, 0, blockWindowCounts[1]) + assert.Equal(t, 30, blockWindowCounts[2]) + assert.Equal(t, 40, blockWindowCounts[3]) + assert.Equal(t, 40, blockWindowCounts[100]) + }) + + t.Run("minimum guaranteed for all windows including an incomplete window followed by best effort", func(t *testing.T) { + oldMaxPayloads := MaxPayloads + MaxPayloads = 10 + defer func() { + MaxPayloads = oldMaxPayloads + }() + + upkeepIDs := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + big.NewInt(4), + big.NewInt(5), + } + + filterStore := NewUpkeepFilterStore() + + logGenerator := func(start, end int64) []logpoller.Log { + var res []logpoller.Log + for i := start; i <= end; i++ { + logsToAdd := 10 + if i >= 100 { + logsToAdd = 1 + } + for j := 0; j < logsToAdd; j++ { 
+ res = append(res, logpoller.Log{ + LogIndex: int64(j), + BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), + BlockNumber: i, + }) + } + } + return res + } + + // use a log poller that will create logs for the queried block range + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 102, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + // prepare the filter store with upkeeps + for _, upkeepID := range upkeepIDs { + filterStore.AddActiveUpkeeps( + upkeepFilter{ + addr: []byte(upkeepID.String()), + upkeepID: upkeepID, + topics: []common.Hash{ + common.HexToHash(upkeepID.String()), + }, + }, + ) + } + + opts := NewOptions(200, big.NewInt(1)) + opts.BufferVersion = "v1" + opts.BlockRate = 4 // block window will be 4 blocks big + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) + + ctx := context.Background() + + err := provider.ReadLogs(ctx, upkeepIDs...) + assert.NoError(t, err) + + assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) + + blockWindowCounts := map[int64]int{} + + bufV1 := provider.bufferV1.(*logBuffer) + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + // all 10 logs should have been dequeued from the first block window + assert.Equal(t, 150, blockWindowCounts[0]) // block 0 is outside the block threshold of 1 and is not enqueued + assert.Equal(t, 200, blockWindowCounts[4]) + assert.Equal(t, 200, blockWindowCounts[8]) + assert.Equal(t, 15, blockWindowCounts[100]) // the block window starting at block 100 is only 3/4 complete as of block 102 + + // each upkeep should have 10 logs * 102 blocks = 1020 logs + assert.Equal(t, 993, len(bufV1.queues["1"].logs)) + assert.Equal(t, 993, len(bufV1.queues["2"].logs)) + assert.Equal(t, 993, len(bufV1.queues["3"].logs)) + assert.Equal(t, 993, len(bufV1.queues["4"].logs)) + assert.Equal(t, 993, len(bufV1.queues["5"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 991, len(bufV1.queues["1"].logs)) + assert.Equal(t, 991, len(bufV1.queues["2"].logs)) + assert.Equal(t, 991, len(bufV1.queues["3"].logs)) + assert.Equal(t, 991, len(bufV1.queues["4"].logs)) + assert.Equal(t, 991, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + // all 10 logs should have been dequeued from the first block window + assert.Equal(t, 140, blockWindowCounts[0]) + assert.Equal(t, 200, blockWindowCounts[4]) + assert.Equal(t, 200, blockWindowCounts[8]) + assert.Equal(t, 15, blockWindowCounts[100]) // the block window starting at block 100 is only 3/4 complete as of block 102 + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 989, len(bufV1.queues["1"].logs)) + assert.Equal(t, 989, len(bufV1.queues["2"].logs)) + 
assert.Equal(t, 989, len(bufV1.queues["3"].logs)) + assert.Equal(t, 989, len(bufV1.queues["4"].logs)) + assert.Equal(t, 989, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + // all 10 logs should have been dequeued from the second block window, since the first block window has met it's minimum commitment + assert.Equal(t, 140, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + assert.Equal(t, 200, blockWindowCounts[8]) + + for i := 0; i < 23; i++ { + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + } + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 943, len(bufV1.queues["1"].logs)) + assert.Equal(t, 943, len(bufV1.queues["2"].logs)) + assert.Equal(t, 943, len(bufV1.queues["3"].logs)) + assert.Equal(t, 943, len(bufV1.queues["4"].logs)) + assert.Equal(t, 943, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 140, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + assert.Equal(t, 190, blockWindowCounts[8]) + assert.Equal(t, 190, blockWindowCounts[96]) + assert.Equal(t, 15, blockWindowCounts[100]) // still not been dequeued at this point + + // at this point, all block windows except for the latest block window will have been dequeued + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 941, len(bufV1.queues["1"].logs)) + assert.Equal(t, 941, len(bufV1.queues["2"].logs)) + assert.Equal(t, 941, len(bufV1.queues["3"].logs)) + assert.Equal(t, 941, len(bufV1.queues["4"].logs)) + assert.Equal(t, 941, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 140, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + assert.Equal(t, 190, blockWindowCounts[8]) + assert.Equal(t, 190, blockWindowCounts[96]) + assert.Equal(t, 5, blockWindowCounts[100]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 939, len(bufV1.queues["1"].logs)) + assert.Equal(t, 939, len(bufV1.queues["2"].logs)) + assert.Equal(t, 939, len(bufV1.queues["3"].logs)) + assert.Equal(t, 939, len(bufV1.queues["4"].logs)) + assert.Equal(t, 939, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 130, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + assert.Equal(t, 190, blockWindowCounts[8]) + assert.Equal(t, 190, blockWindowCounts[96]) + assert.Equal(t, 5, blockWindowCounts[100]) + + payloads, err = 
provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 937, len(bufV1.queues["1"].logs)) + assert.Equal(t, 937, len(bufV1.queues["2"].logs)) + assert.Equal(t, 937, len(bufV1.queues["3"].logs)) + assert.Equal(t, 937, len(bufV1.queues["4"].logs)) + assert.Equal(t, 937, len(bufV1.queues["5"].logs)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 120, blockWindowCounts[0]) // first block window is repeatedly dequeued as best effort + assert.Equal(t, 190, blockWindowCounts[4]) + assert.Equal(t, 190, blockWindowCounts[8]) + assert.Equal(t, 190, blockWindowCounts[96]) + assert.Equal(t, 5, blockWindowCounts[100]) + + provider.poller = &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 103, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + // we dequeue a maximum of 10 logs + assert.Equal(t, 10, len(payloads)) + + // the dequeue is evenly distributed across the 5 upkeeps + assert.Equal(t, 935, len(bufV1.queues["1"].logs)) + assert.Equal(t, 935, len(bufV1.queues["2"].logs)) + assert.Equal(t, 935, len(bufV1.queues["3"].logs)) + assert.Equal(t, 935, len(bufV1.queues["4"].logs)) + assert.Equal(t, 935, len(bufV1.queues["5"].logs)) + + for i := 0; i < 467; i++ { + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + } + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 0, blockWindowCounts[0]) // first block window is repeatedly dequeued as best effort + assert.Equal(t, 0, blockWindowCounts[4]) + assert.Equal(t, 0, blockWindowCounts[8]) + assert.Equal(t, 0, blockWindowCounts[96]) + assert.Equal(t, 5, blockWindowCounts[100]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 5, len(payloads)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 0, blockWindowCounts[0]) + assert.Equal(t, 0, blockWindowCounts[4]) + assert.Equal(t, 0, blockWindowCounts[8]) + assert.Equal(t, 0, blockWindowCounts[96]) + assert.Equal(t, 0, blockWindowCounts[100]) + + }) + + t.Run("a complete window with no logs present is immediately marked as having the min logs dequeued, logs are dequeued from the next window", func(t *testing.T) { + oldMaxPayloads := MaxPayloads + MaxPayloads = 10 + defer func() { + MaxPayloads = oldMaxPayloads + }() + + upkeepIDs := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + big.NewInt(4), + big.NewInt(5), + } + + filterStore := NewUpkeepFilterStore() + + logGenerator := func(start, end int64) []logpoller.Log { + var res []logpoller.Log + for i := start + 4; i <= end; i++ { + logsToAdd := 10 + if i >= 100 { + logsToAdd = 1 + } + for j := 0; j < logsToAdd; j++ { + 
res = append(res, logpoller.Log{ + LogIndex: int64(j), + BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), + BlockNumber: i, + }) + } + } + return res + } + + // use a log poller that will create logs for the queried block range + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 99, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + // prepare the filter store with upkeeps + for _, upkeepID := range upkeepIDs { + filterStore.AddActiveUpkeeps( + upkeepFilter{ + addr: []byte(upkeepID.String()), + upkeepID: upkeepID, + topics: []common.Hash{ + common.HexToHash(upkeepID.String()), + }, + }, + ) + } + + opts := NewOptions(200, big.NewInt(1)) + opts.BufferVersion = "v1" + opts.BlockRate = 4 // block window will be 4 blocks big + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) + + ctx := context.Background() + + err := provider.ReadLogs(ctx, upkeepIDs...) + assert.NoError(t, err) + + assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) + + bufV1 := provider.bufferV1.(*logBuffer) + + blockWindowCounts := map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + // all 10 logs should have been dequeued from the first block window + assert.Equal(t, 0, blockWindowCounts[0]) + assert.Equal(t, 200, blockWindowCounts[4]) + assert.Equal(t, 200, blockWindowCounts[8]) + assert.Equal(t, 200, blockWindowCounts[96]) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + // the first block window does not contain any logs, so it automatically gets marked as having the minimum dequeued + assert.True(t, true, provider.dequeueCoordinator.dequeuedMinimum[0]) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + // all 10 logs should have been dequeued from the second block window + assert.Equal(t, 0, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + assert.Equal(t, 200, blockWindowCounts[8]) + assert.Equal(t, 200, blockWindowCounts[96]) + }) + + t.Run("an incomplete window with no logs present is marked as not ready then min dequeued when the window is complete", func(t *testing.T) { + oldMaxPayloads := MaxPayloads + MaxPayloads = 10 + defer func() { + MaxPayloads = oldMaxPayloads + }() + + upkeepIDs := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + big.NewInt(4), + big.NewInt(5), + } + + filterStore := NewUpkeepFilterStore() + + logGenerator := func(start, end int64) []logpoller.Log { + var res []logpoller.Log + for i := start + 4; i <= end; i++ { + logsToAdd := 10 + if i >= 100 { + logsToAdd = 1 + } + for j := 0; j < logsToAdd; j++ { + res = append(res, logpoller.Log{ + LogIndex: int64(j), + BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), + BlockNumber: i, + }) + } + } + return res + } + + // use a log poller that will create logs for the queried block range + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 2, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx 
context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + // prepare the filter store with upkeeps + for _, upkeepID := range upkeepIDs { + filterStore.AddActiveUpkeeps( + upkeepFilter{ + addr: []byte(upkeepID.String()), + upkeepID: upkeepID, + topics: []common.Hash{ + common.HexToHash(upkeepID.String()), + }, + }, + ) + } + + opts := NewOptions(200, big.NewInt(1)) + opts.BufferVersion = "v1" + opts.BlockRate = 4 // block window will be 4 blocks big + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts) + + ctx := context.Background() + + err := provider.ReadLogs(ctx, upkeepIDs...) + assert.NoError(t, err) + + assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) + + bufV1 := provider.bufferV1.(*logBuffer) + + blockWindowCounts := map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 0, blockWindowCounts[0]) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 0, len(payloads)) + + assert.Equal(t, false, provider.dequeueCoordinator.dequeuedMinimum[0]) + assert.Equal(t, true, provider.dequeueCoordinator.notReady[0]) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + // all 10 logs should have been dequeued from the second block window + assert.Equal(t, 0, blockWindowCounts[0]) + + provider.poller = &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 3, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 0, len(payloads)) + + assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[0]) // now that the window is complete, it should be marked as dequeued minimum + assert.Equal(t, true, provider.dequeueCoordinator.notReady[0]) + + provider.poller = &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 7, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + err = provider.ReadLogs(ctx, upkeepIDs...) 
+ assert.NoError(t, err) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 0, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 0, blockWindowCounts[0]) + assert.Equal(t, 180, blockWindowCounts[4]) + + provider.poller = &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 11, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + err = provider.ReadLogs(ctx, upkeepIDs...) + assert.NoError(t, err) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 0, blockWindowCounts[0]) + assert.Equal(t, 180, blockWindowCounts[4]) + assert.Equal(t, 190, blockWindowCounts[8]) + + assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[0]) + assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[4]) + assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[8]) + + }) + + t.Run("an incomplete window with minimum logs already present is marked as min dequeued", func(t *testing.T) { + oldMaxPayloads := MaxPayloads + MaxPayloads = 10 + defer func() { + MaxPayloads = oldMaxPayloads + }() + + upkeepIDs := []*big.Int{ + big.NewInt(1), + big.NewInt(2), + big.NewInt(3), + big.NewInt(4), + big.NewInt(5), + } + + filterStore := NewUpkeepFilterStore() + + logGenerator := func(start, end int64) []logpoller.Log { + var res []logpoller.Log + for i := start; i <= end; i++ { + logsToAdd := 10 + for j := 0; j < logsToAdd; j++ { + res = append(res, logpoller.Log{ + LogIndex: int64(j), + BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), + BlockNumber: i, + }) + } + } + return res + } + + // use a log poller that will create logs for the queried block range + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 2, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + // prepare the filter store with upkeeps + for _, upkeepID := range upkeepIDs { + filterStore.AddActiveUpkeeps( + upkeepFilter{ + addr: []byte(upkeepID.String()), + upkeepID: upkeepID, + topics: []common.Hash{ + common.HexToHash(upkeepID.String()), + }, + }, + ) + } + + opts := NewOptions(200, big.NewInt(1)) + opts.BufferVersion = "v1" + opts.BlockRate = 4 // block window will be 4 blocks big + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), 
&mockedPacker{}, filterStore, opts) + + ctx := context.Background() + + err := provider.ReadLogs(ctx, upkeepIDs...) + assert.NoError(t, err) + + assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps()) + + bufV1 := provider.bufferV1.(*logBuffer) + + blockWindowCounts := map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 100, blockWindowCounts[0]) // 100 logs because blocks 0, 1, 2 exist, 0 is omitted in enqueue, so blocks 1 and 2 have 10x5 logs each + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[0]) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + // all 10 logs should have been dequeued from the first block window + assert.Equal(t, 90, blockWindowCounts[0]) + + logGenerator = func(start, end int64) []logpoller.Log { + var res []logpoller.Log + for i := start + 4; i <= end; i++ { + logsToAdd := 10 + for j := 0; j < logsToAdd; j++ { + res = append(res, logpoller.Log{ + LogIndex: int64(j), + BlockHash: common.HexToHash(fmt.Sprintf("%d", i)), + BlockNumber: i, + }) + } + } + return res + } + + provider.poller = &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 7, nil // make the latest window incomplete + }, + LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) { + return logGenerator(start, end), nil + }, + } + + err = provider.ReadLogs(ctx, upkeepIDs...) + assert.NoError(t, err) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 90, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + + assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[0]) + assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[4]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 80, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + + assert.Equal(t, 10, len(payloads)) + + blockWindowCounts = map[int64]int{} + + for _, q := range bufV1.queues { + for _, l := range q.logs { + startWindow, _ := getBlockWindow(l.BlockNumber, 4) + + blockWindowCounts[startWindow]++ + } + } + + assert.Equal(t, 70, blockWindowCounts[0]) + assert.Equal(t, 190, blockWindowCounts[4]) + }) + } type mockedPacker struct {
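Editorial note: each subtest above rebuilds a blockWindowCounts map by walking bufV1.queues inline. A small helper in the test package could express that tally once; the sketch below uses simplified stand-in types and an invented name, countLogsPerWindow, so it is a pattern illustration rather than code from the repository.

package main

import "fmt"

// Simplified stand-in for the test's per-upkeep queues; the real queues hold
// logpoller.Log values inside the logBuffer type in the chainlink repository.
type log struct{ BlockNumber int64 }

// getBlockWindow repeats the assumed window arithmetic from the earlier sketch.
func getBlockWindow(block int64, blockRate int) (start, end int64) {
	size := int64(blockRate)
	start = block - (block % size)
	return start, start + size - 1
}

// countLogsPerWindow mirrors the blockWindowCounts maps built inline in each subtest:
// it groups every queued log by the start block of its window.
func countLogsPerWindow(queues map[string][]log, blockRate int) map[int64]int {
	counts := map[int64]int{}
	for _, logs := range queues {
		for _, l := range logs {
			start, _ := getBlockWindow(l.BlockNumber, blockRate)
			counts[start]++
		}
	}
	return counts
}

func main() {
	queues := map[string][]log{
		"1": {{BlockNumber: 1}, {BlockNumber: 2}, {BlockNumber: 5}},
		"2": {{BlockNumber: 6}, {BlockNumber: 9}},
	}
	fmt.Println(countLogsPerWindow(queues, 4)) // map[0:2 4:2 8:1]
}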