From f186ddc4553dddb7fbbb999cb2da36e59df949fe Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 11:50:56 +0300 Subject: [PATCH 01/21] update ocr2keepers --- core/scripts/go.mod | 2 +- core/scripts/go.sum | 2 ++ go.mod | 2 +- go.sum | 2 ++ integration-tests/go.mod | 2 +- integration-tests/go.sum | 2 ++ 6 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 8966ac2d2f4..8fe0de46f40 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -19,7 +19,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 - github.com/smartcontractkit/ocr2keepers v0.7.19 + github.com/smartcontractkit/ocr2keepers v0.7.20 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb github.com/spf13/cobra v1.6.1 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 1a4bba2fccc..77cdf6659d6 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1386,6 +1386,8 @@ github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoS github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= github.com/smartcontractkit/ocr2keepers v0.7.19 h1:w9CMs1V8pmxdRX6ME98goIRPMuN9DOkfMmZHeDPDQXY= github.com/smartcontractkit/ocr2keepers v0.7.19/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= +github.com/smartcontractkit/ocr2keepers v0.7.20 h1:1FOeJ0p4mWHqJiX01v/J2S1C1LhU8iqJM1hDcD38aZU= +github.com/smartcontractkit/ocr2keepers v0.7.20/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM= github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A= diff --git a/go.mod b/go.mod index a2007f5bac3..76ff887c970 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230831134610-680240b97aca github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20230901115736-bbabe542a918 github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 - github.com/smartcontractkit/ocr2keepers v0.7.19 + github.com/smartcontractkit/ocr2keepers v0.7.20 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230829114801-14bf715f805e diff --git a/go.sum b/go.sum index 4deab41bc40..0ff08b535b9 100644 --- a/go.sum +++ b/go.sum @@ -1386,6 +1386,8 @@ github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoS github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= github.com/smartcontractkit/ocr2keepers v0.7.19 h1:w9CMs1V8pmxdRX6ME98goIRPMuN9DOkfMmZHeDPDQXY= github.com/smartcontractkit/ocr2keepers v0.7.19/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= +github.com/smartcontractkit/ocr2keepers v0.7.20 h1:1FOeJ0p4mWHqJiX01v/J2S1C1LhU8iqJM1hDcD38aZU= +github.com/smartcontractkit/ocr2keepers v0.7.20/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM= github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 32e13bbd80e..7bd8efe005b 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -23,7 +23,7 @@ require ( github.com/smartcontractkit/chainlink-testing-framework v1.16.1-0.20230829222228-4afd1b3d385c github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 - github.com/smartcontractkit/ocr2keepers v0.7.19 + github.com/smartcontractkit/ocr2keepers v0.7.20 github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230829114801-14bf715f805e github.com/smartcontractkit/wasp v0.3.0 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index b5948436181..b685bd42d55 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -2262,6 +2262,8 @@ github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoS github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= github.com/smartcontractkit/ocr2keepers v0.7.19 h1:w9CMs1V8pmxdRX6ME98goIRPMuN9DOkfMmZHeDPDQXY= github.com/smartcontractkit/ocr2keepers v0.7.19/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= +github.com/smartcontractkit/ocr2keepers v0.7.20 h1:1FOeJ0p4mWHqJiX01v/J2S1C1LhU8iqJM1hDcD38aZU= +github.com/smartcontractkit/ocr2keepers v0.7.20/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687/go.mod h1:YYZq52t4wcHoMQeITksYsorD+tZcOyuVU5+lvot3VFM= github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A= From c6aa9454b41e13b37d9a3c5ea190eb49dd89f5e9 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 15:23:30 +0300 Subject: [PATCH 02/21] go mod tidy --- core/scripts/go.sum | 2 -- go.sum | 2 -- integration-tests/go.sum | 2 -- 3 files changed, 6 deletions(-) diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 77cdf6659d6..f23f8b32317 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1384,8 +1384,6 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoScs9VHGnyCKF7AoQEuUfwJnzcKmGIfaczeanA= github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= -github.com/smartcontractkit/ocr2keepers v0.7.19 h1:w9CMs1V8pmxdRX6ME98goIRPMuN9DOkfMmZHeDPDQXY= -github.com/smartcontractkit/ocr2keepers v0.7.19/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2keepers v0.7.20 h1:1FOeJ0p4mWHqJiX01v/J2S1C1LhU8iqJM1hDcD38aZU= github.com/smartcontractkit/ocr2keepers v0.7.20/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= diff --git a/go.sum b/go.sum index 0ff08b535b9..991d436e3c4 100644 --- a/go.sum +++ b/go.sum @@ -1384,8 +1384,6 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoScs9VHGnyCKF7AoQEuUfwJnzcKmGIfaczeanA= github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= -github.com/smartcontractkit/ocr2keepers v0.7.19 h1:w9CMs1V8pmxdRX6ME98goIRPMuN9DOkfMmZHeDPDQXY= -github.com/smartcontractkit/ocr2keepers v0.7.19/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2keepers v0.7.20 h1:1FOeJ0p4mWHqJiX01v/J2S1C1LhU8iqJM1hDcD38aZU= github.com/smartcontractkit/ocr2keepers v0.7.20/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= diff --git a/integration-tests/go.sum b/integration-tests/go.sum index b685bd42d55..0cea85f0421 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -2260,8 +2260,6 @@ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f h1:hgJ github.com/smartcontractkit/grpc-proxy v0.0.0-20230731113816-f1be6620749f/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5 h1:rzbqGoScs9VHGnyCKF7AoQEuUfwJnzcKmGIfaczeanA= github.com/smartcontractkit/libocr v0.0.0-20230816220705-665e93233ae5/go.mod h1:2lyRkw/qLQgUWlrWWmq5nj0y90rWeO6Y+v+fCakRgb0= -github.com/smartcontractkit/ocr2keepers v0.7.19 h1:w9CMs1V8pmxdRX6ME98goIRPMuN9DOkfMmZHeDPDQXY= -github.com/smartcontractkit/ocr2keepers v0.7.19/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2keepers v0.7.20 h1:1FOeJ0p4mWHqJiX01v/J2S1C1LhU8iqJM1hDcD38aZU= github.com/smartcontractkit/ocr2keepers v0.7.20/go.mod h1:AjcIEKeNnU7NRlvnuMCTjBIQ1kpW0YHhlFdeDa/3hs0= github.com/smartcontractkit/ocr2vrf v0.0.0-20230804151440-2f1eb1e20687 h1:NwC3SOc25noBTe1KUQjt45fyTIuInhoE2UfgcHAdihM= From ae2f43ba964a285df935672d107f067e8b0d671d Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 14:19:38 +0300 Subject: [PATCH 03/21] buffer: drop logs by seed-order --- .../ocr2keeper/evm21/logprovider/buffer.go | 94 +++++++++++++++---- 1 file changed, 77 insertions(+), 17 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index ad0ae5e1024..6889a314ece 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -1,6 +1,7 @@ package logprovider import ( + "encoding/hex" "math/big" "sort" "sync" @@ -8,6 +9,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/ocr2keepers/pkg/v3/random" + ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types" ) var ( @@ -33,9 +36,46 @@ type fetchedBlock struct { visited []fetchedLog } +func (currentBlock *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) { + has, upkeepLogs := currentBlock.has(fl.upkeepID, fl.log) + if has { + // Skipping known logs + return fetchedLog{}, false + } + // lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id) + currentBlock.logs = append(currentBlock.logs, fl) + + // drop logs if we reached limits. + if upkeepLogs+1 > maxUpkeepLogs { + // in case we have logs overflow for a particular upkeep, we drop a log of that upkeep, + // based on shared, random (per block) order of the logs in the block. + currentBlock.Sort() + var dropped fetchedLog + currentLogs := make([]fetchedLog, 0, len(currentBlock.logs)-1) + for _, l := range currentBlock.logs { + if dropped.upkeepID == nil && l.upkeepID.Cmp(fl.upkeepID) == 0 { + dropped = l + continue + } + currentLogs = append(currentLogs, l) + } + currentBlock.logs = currentLogs + return dropped, true + } else if len(currentBlock.logs) > maxBlockLogs { + // in case we have logs overflow in the buffer level, we drop a log based on + // shared, random (per block) order of the logs in the block. + currentBlock.Sort() + dropped := currentBlock.logs[0] + currentBlock.logs = currentBlock.logs[1:] + return dropped, true + } + + return fetchedLog{}, true +} + // Has returns true if the block has the log, // and the number of logs for that upkeep in the block. -func (b fetchedBlock) Has(id *big.Int, log logpoller.Log) (bool, int) { +func (b fetchedBlock) has(id *big.Int, log logpoller.Log) (bool, int) { allLogs := append(b.logs, b.visited...) upkeepLogs := 0 for _, l := range allLogs { @@ -62,6 +102,27 @@ func (b fetchedBlock) Clone() fetchedBlock { } } +// Sort by log identifiers, shuffled using a pseduorandom souce that is shared across all nodes +// for a given block. +func (b *fetchedBlock) Sort() { + randSeed := random.GetRandomKeySource(nil, uint64(b.blockNumber)) + + shuffledLogIDs := make([]string, len(b.logs)) + for i, log := range b.logs { + ext := ocr2keepers.LogTriggerExtension{ + TxHash: log.log.TxHash, + Index: uint32(log.log.LogIndex), + BlockHash: log.log.BlockHash, + } + logID := hex.EncodeToString(ext.LogIdentifier()) + shuffledLogIDs[i] = random.ShuffleString(logID, randSeed) + } + + sort.Slice(b.logs, func(i, j int) bool { + return shuffledLogIDs[i] < shuffledLogIDs[j] + }) +} + // logEventBuffer is a circular/ring buffer of fetched logs. // Each entry in the buffer represents a block, // and holds the logs fetched for that block. @@ -97,6 +158,7 @@ func (b *logEventBuffer) bufferSize() int { } // enqueue adds logs (if not exist) to the buffer, returning the number of logs added +// minus the number of logs dropped. func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { b.lock.Lock() defer b.lock.Unlock() @@ -107,7 +169,8 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { maxUpkeepLogs := int(b.maxUpkeepLogsPerBlock) latestBlock := b.latestBlockSeen() - added := 0 + added, dropped := 0, 0 + for _, log := range logs { if log.BlockNumber == 0 { // invalid log @@ -125,23 +188,20 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { lggr.Debugw("Skipping log from old block", "currentBlock", currentBlock.blockNumber, "newBlock", log.BlockNumber) continue } - if len(currentBlock.logs)+1 > maxBlockLogs { - lggr.Debugw("Reached max logs number per block, dropping log", "blockNumber", log.BlockNumber, - "blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex) + droppedLog, ok := currentBlock.Append(lggr, fetchedLog{upkeepID: id, log: log}, maxBlockLogs, maxUpkeepLogs) + if !ok { + // Skipping known logs continue } - if has, upkeepLogs := currentBlock.Has(id, log); has { - // Skipping existing log - continue - } else if upkeepLogs+1 > maxUpkeepLogs { - lggr.Debugw("Reached max logs number per upkeep, dropping log", "blockNumber", log.BlockNumber, - "blockHash", log.BlockHash, "txHash", log.TxHash, "logIndex", log.LogIndex) - continue + if droppedLog.upkeepID != nil { + dropped++ + lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", droppedLog.log.BlockNumber, + "blockHash", droppedLog.log.BlockHash, "txHash", droppedLog.log.TxHash, "logIndex", droppedLog.log.LogIndex, + "upkeepID", droppedLog.upkeepID.String()) } - // lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id) - currentBlock.logs = append(currentBlock.logs, fetchedLog{upkeepID: id, log: log}) - b.blocks[i] = currentBlock added++ + b.blocks[i] = currentBlock + if log.BlockNumber > latestBlock { latestBlock = log.BlockNumber } @@ -151,10 +211,10 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { atomic.StoreInt64(&b.latestBlock, latestBlock) } if added > 0 { - lggr.Debugw("Added logs to buffer", "addedLogs", added, "latestBlock", latestBlock) + lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock) } - return added + return added - dropped } // peek returns the logs in range [latestBlock-blocks, latestBlock] From dcf2ec5b1a3a40641fd08f0d0fbcf3ceeb385ef5 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 14:19:51 +0300 Subject: [PATCH 04/21] comment out noisy log --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index 8fbbb1e0a9d..78f8d58d52e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -318,7 +318,7 @@ func (p *logEventProvider) updateFiltersLastPoll(entries []upkeepFilter) { p.filterStore.UpdateFilters(func(orig, f upkeepFilter) upkeepFilter { if f.lastPollBlock > orig.lastPollBlock { orig.lastPollBlock = f.lastPollBlock - p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID) + // p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID) } return orig }, entries...) From 77b2c21f5828d86b651507451cfb00a229572b73 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 14:39:21 +0300 Subject: [PATCH 05/21] ensure order for provider.GetLatestPayloads() --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index 6889a314ece..820c85ea943 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -280,6 +280,9 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetch if block.blockNumber < start || block.blockNumber > end { continue } + // Sort the logs in random order that is shared across all nodes. + // This ensures that nodes across the network will process the same logs. + block.Sort() var remainingLogs, blockResults []fetchedLog for _, log := range block.logs { if logsCount[log.upkeepID.String()] >= upkeepLimit { From dd934dbbfa5aef6f8a3df8cba9ac8c754321c5e9 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 14:39:44 +0300 Subject: [PATCH 06/21] ensure order for recoverer.GetRecoveryProposals() --- .../ocr2keeper/evm21/logprovider/recoverer.go | 23 +++++++++++++++++++ .../evm21/logprovider/recoverer_test.go | 4 +++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 5acfa1ee0c8..7566b1734f3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -13,6 +13,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/smartcontractkit/ocr2keepers/pkg/v3/random" ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -243,6 +244,11 @@ func (r *logRecoverer) getLogTriggerCheckData(ctx context.Context, proposal ocr2 } func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) { + latestBlock, err := r.poller.LatestBlock(pg.WithParentCtx(ctx)) + if err != nil { + return nil, fmt.Errorf("%w: %s", ErrHeadNotAvailable, err) + } + r.lock.Lock() defer r.lock.Unlock() @@ -252,6 +258,8 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. logsCount := map[string]int{} + r.sortPending(uint64(latestBlock)) + var results, pending []ocr2keepers.UpkeepPayload for _, payload := range r.pending { uid := payload.UpkeepID.String() @@ -596,3 +604,18 @@ func (r *logRecoverer) removePending(workID string) { } r.pending = updated } + +// sortPending sorts the pending list by a random order based on the latest block. +// NOTE: the lock must be held before calling this function. +func (r *logRecoverer) sortPending(latestBlock uint64) { + randSeed := random.GetRandomKeySource(nil, latestBlock) + + shuffledIDs := make([]string, len(r.pending)) + for i, p := range r.pending { + shuffledIDs[i] = random.ShuffleString(p.WorkID, randSeed) + } + + sort.Slice(r.pending, func(i, j int) bool { + return shuffledIDs[i] < shuffledIDs[j] + }) +} diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go index 7f01434c2b9..4121064935a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go @@ -31,7 +31,9 @@ import ( func TestLogRecoverer_GetRecoverables(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200)) + lp := &lpmocks.LogPoller{} + lp.On("LatestBlock", mock.Anything).Return(int64(100), nil) + r := NewLogRecoverer(logger.TestLogger(t), lp, nil, nil, nil, nil, NewOptions(200)) tests := []struct { name string From 62d0722b07be7133194d73b4bf164cc7a683a809 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 15:36:48 +0300 Subject: [PATCH 07/21] use a normalized value of latestBlock --- .../plugins/ocr2keeper/evm21/logprovider/recoverer.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 7566b1734f3..b2a56466631 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -605,10 +605,15 @@ func (r *logRecoverer) removePending(workID string) { r.pending = updated } -// sortPending sorts the pending list by a random order based on the latest block. +// sortPending sorts the pending list by a random order based on the normalized latest block number. +// Divided by 10 to ensure that nodes with similar block numbers won't end up with different order. // NOTE: the lock must be held before calling this function. func (r *logRecoverer) sortPending(latestBlock uint64) { - randSeed := random.GetRandomKeySource(nil, latestBlock) + normalized := latestBlock / 10 + if normalized == 0 { + normalized = 1 + } + randSeed := random.GetRandomKeySource(nil, normalized) shuffledIDs := make([]string, len(r.pending)) for i, p := range r.pending { From 7007c6e37cc7f981b2a8ac3311d843b3ab725cc3 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 15:48:03 +0300 Subject: [PATCH 08/21] set overall limit for recovery proposals (MaxProposals) value TBD, currently set to 50 --- .../evm21/logprovider/integration_test.go | 6 ++--- .../ocr2keeper/evm21/logprovider/recoverer.go | 27 ++++++++++++++----- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go index 30994543eb6..b5f229f6015 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go @@ -478,10 +478,10 @@ func TestIntegration_LogRecoverer_Backfill(t *testing.T) { } lp, ethClient, utilsABI := setupDependencies(t, db, backend) filterStore := logprovider.NewUpkeepFilterStore() - origDefaultRecoveryInterval := logprovider.DefaultRecoveryInterval - logprovider.DefaultRecoveryInterval = time.Millisecond * 200 + origDefaultRecoveryInterval := logprovider.RecoveryInterval + logprovider.RecoveryInterval = time.Millisecond * 200 defer func() { - logprovider.DefaultRecoveryInterval = origDefaultRecoveryInterval + logprovider.RecoveryInterval = origDefaultRecoveryInterval }() provider, recoverer := setup(logger.TestLogger(t), lp, nil, utilsABI, &mockUpkeepStateStore{}, filterStore, opts) logProvider := provider.(logprovider.LogEventProviderTest) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index b2a56466631..7562efc7202 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -25,11 +25,17 @@ import ( ) var ( - DefaultRecoveryInterval = 5 * time.Second - RecoveryCacheTTL = 10*time.Minute - time.Second - GCInterval = RecoveryCacheTTL - - recoveryBatchSize = 10 + // RecoveryInterval is the interval at which the recovery scanning processing is triggered + RecoveryInterval = 5 * time.Second + // RecoveryCacheTTL is the time to live for the recovery cache + RecoveryCacheTTL = 10 * time.Minute + // GCInterval is the interval at which the recovery cache is cleaned up + GCInterval = RecoveryCacheTTL - time.Second + // MaxProposals is the maximum number of proposals that can be returned by GetRecoveryProposals + MaxProposals = 50 + // recoveryBatchSize is the number of filters to recover in a single batch + recoveryBatchSize = 10 + // recoveryLogsBuffer is the number of blocks to be used as a safety buffer when reading logs recoveryLogsBuffer = int64(50) ) @@ -256,20 +262,29 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. return nil, nil } + allLogsCounter := 0 logsCount := map[string]int{} r.sortPending(uint64(latestBlock)) var results, pending []ocr2keepers.UpkeepPayload for _, payload := range r.pending { + if allLogsCounter >= MaxProposals { + // we have enough proposals, pushed the rest are pushed back to pending + pending = append(pending, payload) + continue + } uid := payload.UpkeepID.String() if logsCount[uid] >= AllowedLogsPerUpkeep { + // we have enough proposals for this upkeep, the rest are pushed back to pending pending = append(pending, payload) continue } - logsCount[uid]++ results = append(results, payload) + logsCount[uid]++ + allLogsCounter++ } + r.pending = pending r.lggr.Debugf("found %d pending payloads", len(pending)) From 4818acc8c21c5b257845c3460d3bf12e0472d982 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 15:57:19 +0300 Subject: [PATCH 09/21] set max proposals to 20 (was 50) --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 7562efc7202..c0daf3eceb2 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -32,7 +32,7 @@ var ( // GCInterval is the interval at which the recovery cache is cleaned up GCInterval = RecoveryCacheTTL - time.Second // MaxProposals is the maximum number of proposals that can be returned by GetRecoveryProposals - MaxProposals = 50 + MaxProposals = 20 // recoveryBatchSize is the number of filters to recover in a single batch recoveryBatchSize = 10 // recoveryLogsBuffer is the number of blocks to be used as a safety buffer when reading logs From 62380f2576c216c4304a99560653e4e3c6618190 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 15:59:50 +0300 Subject: [PATCH 10/21] apply total limits when dequeing for payloads MaxPayloads was set to 100 --- .../ocr2keeper/evm21/logprovider/buffer.go | 16 +++++++++++++--- .../ocr2keeper/evm21/logprovider/buffer_test.go | 15 ++++++++++----- .../ocr2keeper/evm21/logprovider/provider.go | 4 +++- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index 820c85ea943..2348adee739 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -256,7 +256,7 @@ func (b *logEventBuffer) peekRange(start, end int64) []fetchedLog { } // dequeueRange returns the logs between start and end inclusive. -func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetchedLog { +func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit int) []fetchedLog { b.lock.Lock() defer b.lock.Unlock() @@ -274,23 +274,33 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit int) []fetch }) logsCount := map[string]int{} + totalCount := 0 var results []fetchedLog for _, block := range fetchedBlocks { - // double checking that we don't have any gaps in the range if block.blockNumber < start || block.blockNumber > end { + // double checking that we don't have any gaps in the range continue } + if totalCount >= totalLimit { + // reached total limit, no need to process more blocks + break + } // Sort the logs in random order that is shared across all nodes. // This ensures that nodes across the network will process the same logs. block.Sort() var remainingLogs, blockResults []fetchedLog for _, log := range block.logs { + if totalCount >= totalLimit { + remainingLogs = append(remainingLogs, log) + continue + } if logsCount[log.upkeepID.String()] >= upkeepLimit { remainingLogs = append(remainingLogs, log) continue } - logsCount[log.upkeepID.String()]++ blockResults = append(blockResults, log) + logsCount[log.upkeepID.String()]++ + totalCount++ } if len(blockResults) == 0 { continue diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go index 18eecb748a5..733209c4166 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go @@ -236,7 +236,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { results := buf.peekRange(int64(1), int64(2)) require.Equal(t, 2, len(results)) verifyBlockNumbers(t, results, 1, 2) - removed := buf.dequeueRange(int64(1), int64(2), 2) + removed := buf.dequeueRange(int64(1), int64(2), 2, 10) require.Equal(t, 2, len(removed)) results = buf.peekRange(int64(1), int64(2)) require.Equal(t, 0, len(results)) @@ -256,7 +256,7 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { results := buf.peek(8) require.Equal(t, 4, len(results)) verifyBlockNumbers(t, results, 1, 2, 3, 3) - removed := buf.dequeueRange(1, 3, 5) + removed := buf.dequeueRange(1, 3, 5, 5) require.Equal(t, 4, len(removed)) buf.lock.Lock() require.Equal(t, 0, len(buf.blocks[0].logs)) @@ -313,7 +313,12 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { logpoller.Log{BlockNumber: 5, TxHash: common.HexToHash("0x5"), LogIndex: 0}, ), 5) - logs := buf.dequeueRange(1, 5, 2) + logs := buf.dequeueRange(1, 5, 2, 10) + require.Equal(t, 2, len(logs)) + require.Equal(t, int64(5), logs[0].log.BlockNumber) + require.Equal(t, int64(4), logs[1].log.BlockNumber) + + logs = buf.dequeueRange(1, 5, 3, 2) require.Equal(t, 2, len(logs)) require.Equal(t, int64(5), logs[0].log.BlockNumber) require.Equal(t, int64(4), logs[1].log.BlockNumber) @@ -327,11 +332,11 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3"), LogIndex: 0}, ), 3) - logs := buf.dequeueRange(3, 3, 2) + logs := buf.dequeueRange(3, 3, 2, 10) fmt.Println(logs) require.Equal(t, 1, len(logs)) - logs = buf.dequeueRange(3, 3, 2) + logs = buf.dequeueRange(3, 3, 2, 10) fmt.Println(logs) require.Equal(t, 0, len(logs)) }) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index 78f8d58d52e..57356606ff5 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -30,6 +30,8 @@ var ( // AllowedLogsPerUpkeep is the maximum number of logs allowed per upkeep every single call. AllowedLogsPerUpkeep = 5 + // MaxPayloads is the maximum number of payloads to return per call. + MaxPayloads = 100 readJobQueueSize = 64 readLogsTimeout = 10 * time.Second @@ -177,7 +179,7 @@ func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers if start <= 0 { start = 1 } - logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep) + logs := p.buffer.dequeueRange(start, latest, AllowedLogsPerUpkeep, MaxPayloads) // p.lggr.Debugw("got latest logs from buffer", "latest", latest, "diff", diff, "logs", len(logs)) From a48fd561af6d4b4a77b17617c14f4ff956d62ab7 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 16:03:01 +0300 Subject: [PATCH 11/21] fix test --- .../plugins/ocr2keeper/evm21/logprovider/buffer_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go index 733209c4166..91f23654255 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go @@ -318,10 +318,13 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { require.Equal(t, int64(5), logs[0].log.BlockNumber) require.Equal(t, int64(4), logs[1].log.BlockNumber) + require.Equal(t, buf.enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 1}, + logpoller.Log{BlockNumber: 5, TxHash: common.HexToHash("0x5"), LogIndex: 1}, + ), 2) + logs = buf.dequeueRange(1, 5, 3, 2) require.Equal(t, 2, len(logs)) - require.Equal(t, int64(5), logs[0].log.BlockNumber) - require.Equal(t, int64(4), logs[1].log.BlockNumber) }) t.Run("dequeue doesn't return same logs again", func(t *testing.T) { From 1cebe48a51eb3d5269d75e78874797600a719028 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 16:38:14 +0300 Subject: [PATCH 12/21] fix max block logs --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index 2348adee739..5ff06af6a63 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -61,7 +61,7 @@ func (currentBlock *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxB } currentBlock.logs = currentLogs return dropped, true - } else if len(currentBlock.logs) > maxBlockLogs { + } else if len(currentBlock.logs)+len(currentBlock.visited) > maxBlockLogs { // in case we have logs overflow in the buffer level, we drop a log based on // shared, random (per block) order of the logs in the block. currentBlock.Sort() From 0903e8f5d4582088d7ff643473d4058aa55f4d3f Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 16:39:45 +0300 Subject: [PATCH 13/21] protect log spamming --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index 57356606ff5..a34cc1a4d01 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -320,7 +320,10 @@ func (p *logEventProvider) updateFiltersLastPoll(entries []upkeepFilter) { p.filterStore.UpdateFilters(func(orig, f upkeepFilter) upkeepFilter { if f.lastPollBlock > orig.lastPollBlock { orig.lastPollBlock = f.lastPollBlock - // p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID) + if f.lastPollBlock%10 == 0 { + // print log occasionally to avoid spamming logs + p.lggr.Debugw("Updated lastPollBlock", "lastPollBlock", f.lastPollBlock, "upkeepID", f.upkeepID) + } } return orig }, entries...) From 320c532f5216f5f5cef9e687a991990fa87c9759 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 16:42:33 +0300 Subject: [PATCH 14/21] renaming --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go | 8 ++++---- .../ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index 5ff06af6a63..d74c33e5494 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -14,10 +14,10 @@ import ( ) var ( - // allowedLogsPerBlock is the maximum number of logs allowed per upkeep in a block. - allowedLogsPerBlock = 128 - // bufferMaxBlockSize is the maximum number of blocks in the buffer. - bufferMaxBlockSize = 1024 + // maxLogsPerUpkeepInBlock is the maximum number of logs allowed per upkeep in a block. + maxLogsPerUpkeepInBlock = 32 + // maxLogsPerBlock is the maximum number of blocks in the buffer. + maxLogsPerBlock = 1024 ) // fetchedLog holds the log and the ID of the upkeep diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index a34cc1a4d01..6b89dfd0e72 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -101,7 +101,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa return &logEventProvider{ packer: packer, lggr: lggr.Named("KeepersRegistry.LogEventProvider"), - buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), bufferMaxBlockSize, allowedLogsPerBlock), + buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock), poller: poller, opts: opts, filterStore: filterStore, From ddf86e80f4cebdf6bc2e14ff659ff0bfbc248265 Mon Sep 17 00:00:00 2001 From: amirylm Date: Tue, 5 Sep 2023 18:14:22 +0300 Subject: [PATCH 15/21] lint --- .../ocr2keeper/evm21/logprovider/buffer.go | 27 ++++++++++--------- .../ocr2/plugins/ocr2keeper/evm21/registry.go | 1 + 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index d74c33e5494..d8ca3e4b52c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -7,10 +7,11 @@ import ( "sync" "sync/atomic" - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/ocr2keepers/pkg/v3/random" ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) var ( @@ -36,37 +37,37 @@ type fetchedBlock struct { visited []fetchedLog } -func (currentBlock *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) { - has, upkeepLogs := currentBlock.has(fl.upkeepID, fl.log) +func (b *fetchedBlock) Append(lggr logger.Logger, fl fetchedLog, maxBlockLogs, maxUpkeepLogs int) (fetchedLog, bool) { + has, upkeepLogs := b.has(fl.upkeepID, fl.log) if has { // Skipping known logs return fetchedLog{}, false } // lggr.Debugw("Adding log", "i", i, "blockBlock", currentBlock.blockNumber, "logBlock", log.BlockNumber, "id", id) - currentBlock.logs = append(currentBlock.logs, fl) + b.logs = append(b.logs, fl) // drop logs if we reached limits. if upkeepLogs+1 > maxUpkeepLogs { // in case we have logs overflow for a particular upkeep, we drop a log of that upkeep, // based on shared, random (per block) order of the logs in the block. - currentBlock.Sort() + b.Sort() var dropped fetchedLog - currentLogs := make([]fetchedLog, 0, len(currentBlock.logs)-1) - for _, l := range currentBlock.logs { + currentLogs := make([]fetchedLog, 0, len(b.logs)-1) + for _, l := range b.logs { if dropped.upkeepID == nil && l.upkeepID.Cmp(fl.upkeepID) == 0 { dropped = l continue } currentLogs = append(currentLogs, l) } - currentBlock.logs = currentLogs + b.logs = currentLogs return dropped, true - } else if len(currentBlock.logs)+len(currentBlock.visited) > maxBlockLogs { + } else if len(b.logs)+len(b.visited) > maxBlockLogs { // in case we have logs overflow in the buffer level, we drop a log based on // shared, random (per block) order of the logs in the block. - currentBlock.Sort() - dropped := currentBlock.logs[0] - currentBlock.logs = currentBlock.logs[1:] + b.Sort() + dropped := b.logs[0] + b.logs = b.logs[1:] return dropped, true } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index 849463e53a2..1c54bf553d9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -283,6 +283,7 @@ func (r *EvmRegistry) refreshActiveUpkeeps() error { switch core.GetUpkeepType(*uid) { case ocr2keepers.LogTrigger: logTriggerIDs = append(logTriggerIDs, id) + default: } } From 81a2aa7d269ac6d291864bde28ef90aa771659d4 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 6 Sep 2023 10:51:22 +0300 Subject: [PATCH 16/21] set offset to 100 --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index c0daf3eceb2..c3286fa11b9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -624,7 +624,7 @@ func (r *logRecoverer) removePending(workID string) { // Divided by 10 to ensure that nodes with similar block numbers won't end up with different order. // NOTE: the lock must be held before calling this function. func (r *logRecoverer) sortPending(latestBlock uint64) { - normalized := latestBlock / 10 + normalized := latestBlock / 100 if normalized == 0 { normalized = 1 } From 6143ccaa4de282efd6561073766a03c42aa05531 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 6 Sep 2023 10:51:55 +0300 Subject: [PATCH 17/21] added tests --- .../evm21/logprovider/buffer_test.go | 454 ++++++++++++++++++ 1 file changed, 454 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go index 91f23654255..d48aca48f62 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go @@ -1,15 +1,18 @@ package logprovider import ( + "encoding/hex" "fmt" "math/big" "testing" "github.com/ethereum/go-ethereum/common" + ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/core" ) func TestLogEventBuffer_GetBlocksInRange(t *testing.T) { @@ -345,9 +348,460 @@ func TestLogEventBuffer_EnqueueDequeue(t *testing.T) { }) } +func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { + type appendArgs struct { + fl fetchedLog + maxBlockLogs, maxUpkeepLogs int + added, dropped bool + } + + tests := []struct { + name string + blockNumber int64 + logs []fetchedLog + visited []fetchedLog + toAdd []appendArgs + expected []fetchedLog + added bool + }{ + { + name: "empty block", + blockNumber: 1, + logs: []fetchedLog{}, + visited: []fetchedLog{}, + toAdd: []appendArgs{ + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + }, + }, + expected: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + }, + { + name: "existing log", + blockNumber: 1, + logs: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + visited: []fetchedLog{}, + toAdd: []appendArgs{ + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: false, + }, + }, + expected: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + }, + { + name: "visited log", + blockNumber: 1, + logs: []fetchedLog{}, + visited: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + toAdd: []appendArgs{ + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: false, + }, + }, + expected: []fetchedLog{}, + }, + { + name: "upkeep log limits", + blockNumber: 1, + logs: []fetchedLog{}, + visited: []fetchedLog{}, + toAdd: []appendArgs{ + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 10, + maxUpkeepLogs: 2, + added: true, + dropped: true, + }, + }, + expected: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + }, + { + name: "block log limits", + blockNumber: 1, + logs: []fetchedLog{}, + visited: []fetchedLog{}, + toAdd: []appendArgs{ + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, + }, + { + fl: fetchedLog{ + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + maxBlockLogs: 2, + maxUpkeepLogs: 4, + added: true, + dropped: true, + }, + }, + expected: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + lggr := logger.TestLogger(t) + b := fetchedBlock{ + blockNumber: tc.blockNumber, + logs: make([]fetchedLog, len(tc.logs)), + visited: make([]fetchedLog, len(tc.visited)), + } + copy(b.logs, tc.logs) + copy(b.visited, tc.visited) + + for _, args := range tc.toAdd { + dropped, added := b.Append(lggr, args.fl, args.maxBlockLogs, args.maxUpkeepLogs) + require.Equal(t, args.added, added) + if args.dropped { + require.NotNil(t, dropped.upkeepID) + } else { + require.Nil(t, dropped.upkeepID) + } + } + + require.Equal(t, tc.expected, b.logs) + }) + } +} +func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { + tests := []struct { + name string + blockNumber int64 + logs []fetchedLog + beforeSort []string + afterSort []string + iterations int + }{ + { + name: "no logs", + blockNumber: 10, + logs: []fetchedLog{}, + beforeSort: []string{}, + afterSort: []string{}, + }, + { + name: "single log", + blockNumber: 1, + logs: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + }, + }, + beforeSort: []string{ + "000000000000000000000000000000000000000000000000000000000000000100000000", + }, + afterSort: []string{ + "000000000000000000000000000000000000000000000000000000000000000100000000", + }, + }, + { + name: "multiple logs with 10 iterations", + blockNumber: 1, + logs: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 4, + }, + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 3, + }, + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 5, + }, + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 1, + }, + }, + }, + beforeSort: []string{ + "000000000000000000000000000000000000000000000000000000000000000100000000", + "000000000000000000000000000000000000000000000000000000000000000100000002", + "000000000000000000000000000000000000000000000000000000000000000100000004", + "000000000000000000000000000000000000000000000000000000000000000100000003", + "000000000000000000000000000000000000000000000000000000000000000100000005", + "000000000000000000000000000000000000000000000000000000000000000100000001", + }, + afterSort: []string{ + "000000000000000000000000000000000000000000000000000000000000000100000000", + "000000000000000000000000000000000000000000000000000000000000000100000002", + "000000000000000000000000000000000000000000000000000000000000000100000003", + "000000000000000000000000000000000000000000000000000000000000000100000004", + "000000000000000000000000000000000000000000000000000000000000000100000001", + "000000000000000000000000000000000000000000000000000000000000000100000005", + }, + iterations: 10, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + b := fetchedBlock{ + blockNumber: tc.blockNumber, + logs: make([]fetchedLog, len(tc.logs)), + } + if tc.iterations == 0 { + tc.iterations = 1 + } + // performing the same multiple times should yield the same result + // default is one iteration + for i := 0; i < tc.iterations; i++ { + copy(b.logs, tc.logs) + logIDs := getLogIds(b) + require.Equal(t, len(tc.beforeSort), len(logIDs)) + require.Equal(t, tc.beforeSort, logIDs) + b.Sort() + logIDsAfterSort := getLogIds(b) + require.Equal(t, len(tc.afterSort), len(logIDsAfterSort)) + require.Equal(t, tc.afterSort, logIDsAfterSort) + } + }) + } +} + +func TestLogEventBuffer_FetchedBlock_Clone(t *testing.T) { + b1 := fetchedBlock{ + blockNumber: 1, + logs: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 2, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, + }, + } + + b2 := b1.Clone() + require.Equal(t, b1.blockNumber, b2.blockNumber) + require.Equal(t, len(b1.logs), len(b2.logs)) + require.Equal(t, b1.logs[0].log.BlockNumber, b2.logs[0].log.BlockNumber) + + b1.blockNumber = 2 + b1.logs[0].log.BlockNumber = 2 + require.NotEqual(t, b1.blockNumber, b2.blockNumber) + require.NotEqual(t, b1.logs[0].log.BlockNumber, b2.logs[0].log.BlockNumber) +} + func verifyBlockNumbers(t *testing.T, logs []fetchedLog, bns ...int64) { require.Equal(t, len(bns), len(logs), "expected length mismatch") for i, log := range logs { require.Equal(t, bns[i], log.log.BlockNumber, "wrong block number") } } + +func getLogIds(b fetchedBlock) []string { + logIDs := make([]string, len(b.logs)) + for i, l := range b.logs { + ext := ocr2keepers.LogTriggerExtension{ + TxHash: l.log.TxHash, + Index: uint32(l.log.LogIndex), + BlockHash: l.log.BlockHash, + } + logIDs[i] = hex.EncodeToString(ext.LogIdentifier()) + } + return logIDs +} From a4ad4a2329c281155acd4c082e11dceeb776aaae Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 6 Sep 2023 11:22:16 +0300 Subject: [PATCH 18/21] use maps when sorting --- .../ocr2keeper/evm21/logprovider/buffer.go | 32 ++++++++++++------- .../evm21/logprovider/buffer_test.go | 28 +++++++++++++++- .../ocr2keeper/evm21/logprovider/recoverer.go | 10 +++--- 3 files changed, 53 insertions(+), 17 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index d8ca3e4b52c..0524a151839 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -2,6 +2,7 @@ package logprovider import ( "encoding/hex" + "fmt" "math/big" "sort" "sync" @@ -25,6 +26,20 @@ var ( type fetchedLog struct { upkeepID *big.Int log logpoller.Log + + logId string +} + +func (l fetchedLog) getLogID() string { + if len(l.logId) == 0 { + ext := ocr2keepers.LogTriggerExtension{ + TxHash: l.log.TxHash, + Index: uint32(l.log.LogIndex), + BlockHash: l.log.BlockHash, + } + l.logId = hex.EncodeToString(ext.LogIdentifier()) + } + return l.logId } // fetchedBlock holds the logs fetched for a block @@ -108,19 +123,14 @@ func (b fetchedBlock) Clone() fetchedBlock { func (b *fetchedBlock) Sort() { randSeed := random.GetRandomKeySource(nil, uint64(b.blockNumber)) - shuffledLogIDs := make([]string, len(b.logs)) - for i, log := range b.logs { - ext := ocr2keepers.LogTriggerExtension{ - TxHash: log.log.TxHash, - Index: uint32(log.log.LogIndex), - BlockHash: log.log.BlockHash, - } - logID := hex.EncodeToString(ext.LogIdentifier()) - shuffledLogIDs[i] = random.ShuffleString(logID, randSeed) + shuffledLogIDs := make(map[string]string, len(b.logs)) + for _, log := range b.logs { + logID := log.getLogID() + shuffledLogIDs[logID] = random.ShuffleString(fmt.Sprintf("%s:%s", log.upkeepID, logID), randSeed) } - sort.Slice(b.logs, func(i, j int) bool { - return shuffledLogIDs[i] < shuffledLogIDs[j] + sort.SliceStable(b.logs, func(i, j int) bool { + return shuffledLogIDs[b.logs[i].getLogID()] < shuffledLogIDs[b.logs[j].getLogID()] }) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go index d48aca48f62..6ee0ed65a4a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go @@ -664,12 +664,21 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { name: "multiple logs with 10 iterations", blockNumber: 1, logs: []fetchedLog{ + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x2"), + LogIndex: 0, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + }, { log: logpoller.Log{ BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0, }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "222").BigInt(), }, { log: logpoller.Log{ @@ -677,6 +686,7 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { TxHash: common.HexToHash("0x1"), LogIndex: 2, }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ @@ -684,6 +694,7 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { TxHash: common.HexToHash("0x1"), LogIndex: 4, }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ @@ -691,6 +702,15 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { TxHash: common.HexToHash("0x1"), LogIndex: 3, }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "222").BigInt(), + }, + { + log: logpoller.Log{ + BlockNumber: 1, + TxHash: common.HexToHash("0x1"), + LogIndex: 3, + }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ @@ -698,6 +718,7 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { TxHash: common.HexToHash("0x1"), LogIndex: 5, }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ @@ -705,23 +726,28 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { TxHash: common.HexToHash("0x1"), LogIndex: 1, }, + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, }, beforeSort: []string{ + "000000000000000000000000000000000000000000000000000000000000000200000000", "000000000000000000000000000000000000000000000000000000000000000100000000", "000000000000000000000000000000000000000000000000000000000000000100000002", "000000000000000000000000000000000000000000000000000000000000000100000004", "000000000000000000000000000000000000000000000000000000000000000100000003", + "000000000000000000000000000000000000000000000000000000000000000100000003", "000000000000000000000000000000000000000000000000000000000000000100000005", "000000000000000000000000000000000000000000000000000000000000000100000001", }, afterSort: []string{ "000000000000000000000000000000000000000000000000000000000000000100000000", + "000000000000000000000000000000000000000000000000000000000000000100000001", "000000000000000000000000000000000000000000000000000000000000000100000002", "000000000000000000000000000000000000000000000000000000000000000100000003", + "000000000000000000000000000000000000000000000000000000000000000100000003", "000000000000000000000000000000000000000000000000000000000000000100000004", - "000000000000000000000000000000000000000000000000000000000000000100000001", "000000000000000000000000000000000000000000000000000000000000000100000005", + "000000000000000000000000000000000000000000000000000000000000000200000000", }, iterations: 10, }, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index c3286fa11b9..9a92c58d904 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -630,12 +630,12 @@ func (r *logRecoverer) sortPending(latestBlock uint64) { } randSeed := random.GetRandomKeySource(nil, normalized) - shuffledIDs := make([]string, len(r.pending)) - for i, p := range r.pending { - shuffledIDs[i] = random.ShuffleString(p.WorkID, randSeed) + shuffledIDs := make(map[string]string, len(r.pending)) + for _, p := range r.pending { + shuffledIDs[p.WorkID] = random.ShuffleString(p.WorkID, randSeed) } - sort.Slice(r.pending, func(i, j int) bool { - return shuffledIDs[i] < shuffledIDs[j] + sort.SliceStable(r.pending, func(i, j int) bool { + return shuffledIDs[r.pending[i].WorkID] < shuffledIDs[r.pending[j].WorkID] }) } From 8cab91401c28d85131ddec2427353976d767e6df Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 6 Sep 2023 14:56:41 +0300 Subject: [PATCH 19/21] temporary added blockhash to log id --- .../ocr2keeper/evm21/logprovider/buffer.go | 13 ++-- .../evm21/logprovider/buffer_test.go | 71 +++++++++++-------- 2 files changed, 47 insertions(+), 37 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index 0524a151839..257430e1312 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -2,7 +2,6 @@ package logprovider import ( "encoding/hex" - "fmt" "math/big" "sort" "sync" @@ -33,11 +32,13 @@ type fetchedLog struct { func (l fetchedLog) getLogID() string { if len(l.logId) == 0 { ext := ocr2keepers.LogTriggerExtension{ - TxHash: l.log.TxHash, - Index: uint32(l.log.LogIndex), - BlockHash: l.log.BlockHash, + Index: uint32(l.log.LogIndex), } - l.logId = hex.EncodeToString(ext.LogIdentifier()) + copy(ext.TxHash[:], l.log.TxHash[:]) + // TODO: uncomment avoid block hash bytes once log identifier func + // changes in ocr2keepers to work with block hash + // copy(ext.BlockHash[:], l.log.BlockHash[:]) + l.logId = hex.EncodeToString(append(l.log.BlockHash.Bytes(), ext.LogIdentifier()...)) } return l.logId } @@ -126,7 +127,7 @@ func (b *fetchedBlock) Sort() { shuffledLogIDs := make(map[string]string, len(b.logs)) for _, log := range b.logs { logID := log.getLogID() - shuffledLogIDs[logID] = random.ShuffleString(fmt.Sprintf("%s:%s", log.upkeepID, logID), randSeed) + shuffledLogIDs[logID] = random.ShuffleString(logID, randSeed) } sort.SliceStable(b.logs, func(i, j int) bool { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go index 6ee0ed65a4a..6998fb29613 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go @@ -647,6 +647,7 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { logs: []fetchedLog{ { log: logpoller.Log{ + BlockHash: common.HexToHash("0x111"), BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0, @@ -667,7 +668,8 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x2"), + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xb711bd1103927611ee41152aa8ae27f3330"), LogIndex: 0, }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), @@ -675,7 +677,8 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x1"), + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"), LogIndex: 0, }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "222").BigInt(), @@ -683,71 +686,77 @@ func TestLogEventBuffer_FetchedBlock_Sort(t *testing.T) { { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 2, + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"), + LogIndex: 4, }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 4, + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"), + LogIndex: 3, }, - upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "222").BigInt(), }, { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 3, + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"), + LogIndex: 2, }, - upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "222").BigInt(), + upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 3, + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"), + LogIndex: 5, }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x1"), - LogIndex: 5, + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"), + LogIndex: 3, }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, { log: logpoller.Log{ BlockNumber: 1, - TxHash: common.HexToHash("0x1"), + BlockHash: common.HexToHash("0xa25ebae1099f3fbae2525ebae279f3ae25e"), + TxHash: common.HexToHash("0xa651bd1109922111ee411525ebae27f3fb6"), LogIndex: 1, }, upkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "111").BigInt(), }, }, beforeSort: []string{ - "000000000000000000000000000000000000000000000000000000000000000200000000", - "000000000000000000000000000000000000000000000000000000000000000100000000", - "000000000000000000000000000000000000000000000000000000000000000100000002", - "000000000000000000000000000000000000000000000000000000000000000100000004", - "000000000000000000000000000000000000000000000000000000000000000100000003", - "000000000000000000000000000000000000000000000000000000000000000100000003", - "000000000000000000000000000000000000000000000000000000000000000100000005", - "000000000000000000000000000000000000000000000000000000000000000100000001", + "00000000000000000000000000000b711bd1103927611ee41152aa8ae27f333000000000", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000000", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000004", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000002", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000005", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000001", }, afterSort: []string{ - "000000000000000000000000000000000000000000000000000000000000000100000000", - "000000000000000000000000000000000000000000000000000000000000000100000001", - "000000000000000000000000000000000000000000000000000000000000000100000002", - "000000000000000000000000000000000000000000000000000000000000000100000003", - "000000000000000000000000000000000000000000000000000000000000000100000003", - "000000000000000000000000000000000000000000000000000000000000000100000004", - "000000000000000000000000000000000000000000000000000000000000000100000005", - "000000000000000000000000000000000000000000000000000000000000000200000000", + "00000000000000000000000000000b711bd1103927611ee41152aa8ae27f333000000000", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000000", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000001", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000002", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000003", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000004", + "00000000000000000000000000000a651bd1109922111ee411525ebae27f3fb600000005", }, iterations: 10, }, From 4ea0536cf76728f98e3adbcc1795e772cc0f8e56 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 6 Sep 2023 17:06:31 +0300 Subject: [PATCH 20/21] lint --- .../plugins/ocr2keeper/evm21/logprovider/buffer.go | 13 +++++++------ .../ocr2keeper/evm21/logprovider/buffer_test.go | 5 ++++- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index 257430e1312..05f205fe767 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -25,12 +25,13 @@ var ( type fetchedLog struct { upkeepID *big.Int log logpoller.Log - - logId string + // cachedLogID is the cached log identifier, used for sorting. + // It is calculated lazily, and cached for performance. + cachedLogID string } -func (l fetchedLog) getLogID() string { - if len(l.logId) == 0 { +func (l *fetchedLog) getLogID() string { + if len(l.cachedLogID) == 0 { ext := ocr2keepers.LogTriggerExtension{ Index: uint32(l.log.LogIndex), } @@ -38,9 +39,9 @@ func (l fetchedLog) getLogID() string { // TODO: uncomment avoid block hash bytes once log identifier func // changes in ocr2keepers to work with block hash // copy(ext.BlockHash[:], l.log.BlockHash[:]) - l.logId = hex.EncodeToString(append(l.log.BlockHash.Bytes(), ext.LogIdentifier()...)) + l.cachedLogID = hex.EncodeToString(append(l.log.BlockHash.Bytes(), ext.LogIdentifier()...)) } - return l.logId + return l.cachedLogID } // fetchedBlock holds the logs fetched for a block diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go index 6998fb29613..0f389d0d418 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer_test.go @@ -620,7 +620,10 @@ func TestLogEventBuffer_FetchedBlock_Append(t *testing.T) { require.Nil(t, dropped.upkeepID) } } - + // clear cached logIDs + for i := range b.logs { + b.logs[i].cachedLogID = "" + } require.Equal(t, tc.expected, b.logs) }) } From e4e64bd4deddda032c394587e9fb81af39bd7ee2 Mon Sep 17 00:00:00 2001 From: amirylm Date: Wed, 6 Sep 2023 18:57:33 +0300 Subject: [PATCH 21/21] remove todo from log id func --- .../ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index 05f205fe767..1835ac69f09 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -36,10 +36,8 @@ func (l *fetchedLog) getLogID() string { Index: uint32(l.log.LogIndex), } copy(ext.TxHash[:], l.log.TxHash[:]) - // TODO: uncomment avoid block hash bytes once log identifier func - // changes in ocr2keepers to work with block hash - // copy(ext.BlockHash[:], l.log.BlockHash[:]) - l.cachedLogID = hex.EncodeToString(append(l.log.BlockHash.Bytes(), ext.LogIdentifier()...)) + copy(ext.BlockHash[:], l.log.BlockHash[:]) + l.cachedLogID = hex.EncodeToString(ext.LogIdentifier()) } return l.cachedLogID }