diff --git a/.changeset/cold-seals-listen.md b/.changeset/cold-seals-listen.md new file mode 100644 index 0000000000..efc52933f2 --- /dev/null +++ b/.changeset/cold-seals-listen.md @@ -0,0 +1,5 @@ +--- +"ccip": patch +--- + +Fetching nonces from chain instead of relying on inflight cache values diff --git a/.changeset/green-fishes-fix.md b/.changeset/green-fishes-fix.md new file mode 100644 index 0000000000..1b85e80174 --- /dev/null +++ b/.changeset/green-fishes-fix.md @@ -0,0 +1,5 @@ +--- +"ccip": patch +--- + +remove inflight cache from commit plugin diff --git a/VERSION b/VERSION index 98651bfee6..a5e3eb5055 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.9.1-ccip1.4.8-release \ No newline at end of file +2.9.1-ccip1.4.9-release \ No newline at end of file diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go index e29cbebbad..b79f87bc2b 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go @@ -93,7 +93,6 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin priceGetter: rf.config.priceGetter, F: config.F, lggr: lggr, - inflightReports: newInflightCommitReportsContainer(rf.config.commitStore.OffchainConfig().InflightCacheExpiry), destPriceRegistryReader: rf.destPriceRegReader, offRampReader: rf.config.offRamp, gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/inflight.go b/core/services/ocr2/plugins/ccip/ccipcommit/inflight.go deleted file mode 100644 index 32f76d2abd..0000000000 --- a/core/services/ocr2/plugins/ccip/ccipcommit/inflight.go +++ /dev/null @@ -1,176 +0,0 @@ -package ccipcommit - -import ( - "sync" - "time" - - "github.com/ethereum/go-ethereum/common/hexutil" - - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" -) - -const ( - PRICE_EXPIRY_MULTIPLIER = 3 // Keep price update cache longer and use it as source of truth -) - -// InflightCommitReport represents a commit report which has been submitted -// to the transaction manager and we expect to be included in the chain. -// By keeping track of the inflight reports, we are able to build subsequent -// reports "on top" of the inflight ones for improved throughput - for example -// if seqNrs=[1,2] are inflight, we can build and send [3,4] while [1,2] is still confirming -// and optimistically assume they will complete in order. If for whatever reason (re-org or -// RPC timing) leads to [3,4] arriving before [1,2], we'll revert onchain. Once the cache -// expires we'll then build from the onchain state again and retries. In this manner, -// we are able to obtain high throughput during happy path yet still naturally recover -// if a reorg or issue causes onchain reverts. -type InflightCommitReport struct { - report cciptypes.CommitStoreReport - createdAt time.Time -} - -type InflightPriceUpdate struct { - gasPrices []cciptypes.GasPrice - tokenPrices []cciptypes.TokenPrice - createdAt time.Time - epochAndRound uint64 -} - -// inflightExecReportsContainer holds existing inflight reports. -// it provides a thread-safe access as it is called from multiple goroutines, -// e.g. reporting and transmission protocols. -type inflightCommitReportsContainer struct { - locker sync.RWMutex - inFlight map[[32]byte]InflightCommitReport - inFlightPriceUpdates []InflightPriceUpdate - cacheExpiry time.Duration -} - -func newInflightCommitReportsContainer(inflightCacheExpiry time.Duration) *inflightCommitReportsContainer { - return &inflightCommitReportsContainer{ - locker: sync.RWMutex{}, - inFlight: make(map[[32]byte]InflightCommitReport), - inFlightPriceUpdates: []InflightPriceUpdate{}, - cacheExpiry: inflightCacheExpiry, - } -} - -func (c *inflightCommitReportsContainer) maxInflightSeqNr() uint64 { - c.locker.RLock() - defer c.locker.RUnlock() - var maxSeqNr uint64 - for _, report := range c.inFlight { - if report.report.Interval.Max >= maxSeqNr { - maxSeqNr = report.report.Interval.Max - } - } - return maxSeqNr -} - -// latestInflightGasPriceUpdates returns a map of the latest gas price updates. -func (c *inflightCommitReportsContainer) latestInflightGasPriceUpdates() map[uint64]update { - c.locker.RLock() - defer c.locker.RUnlock() - latestGasPriceUpdates := make(map[uint64]update) - latestEpochAndRounds := make(map[uint64]uint64) - - for _, inflight := range c.inFlightPriceUpdates { - for _, inflightGasUpdate := range inflight.gasPrices { - _, ok := latestGasPriceUpdates[inflightGasUpdate.DestChainSelector] - if !ok || inflight.epochAndRound > latestEpochAndRounds[inflightGasUpdate.DestChainSelector] { - latestGasPriceUpdates[inflightGasUpdate.DestChainSelector] = update{ - value: inflightGasUpdate.Value, - timestamp: inflight.createdAt, - } - latestEpochAndRounds[inflightGasUpdate.DestChainSelector] = inflight.epochAndRound - } - } - } - - return latestGasPriceUpdates -} - -// latestInflightTokenPriceUpdates returns a map of the latest token price updates -func (c *inflightCommitReportsContainer) latestInflightTokenPriceUpdates() map[cciptypes.Address]update { - c.locker.RLock() - defer c.locker.RUnlock() - latestTokenPriceUpdates := make(map[cciptypes.Address]update) - latestEpochAndRounds := make(map[cciptypes.Address]uint64) - for _, inflight := range c.inFlightPriceUpdates { - for _, inflightTokenUpdate := range inflight.tokenPrices { - _, ok := latestTokenPriceUpdates[inflightTokenUpdate.Token] - if !ok || inflight.epochAndRound > latestEpochAndRounds[inflightTokenUpdate.Token] { - latestTokenPriceUpdates[inflightTokenUpdate.Token] = update{ - value: inflightTokenUpdate.Value, - timestamp: inflight.createdAt, - } - latestEpochAndRounds[inflightTokenUpdate.Token] = inflight.epochAndRound - } - } - } - return latestTokenPriceUpdates -} - -func (c *inflightCommitReportsContainer) reset(lggr logger.Logger) { - lggr.Infow("Inflight report reset") - c.locker.Lock() - defer c.locker.Unlock() - c.inFlight = make(map[[32]byte]InflightCommitReport) - c.inFlightPriceUpdates = []InflightPriceUpdate{} -} - -func (c *inflightCommitReportsContainer) expire(lggr logger.Logger) { - c.locker.Lock() - defer c.locker.Unlock() - // Reap any expired entries from inflight. - for root, inFlightReport := range c.inFlight { - if time.Since(inFlightReport.createdAt) > c.cacheExpiry { - // Happy path: inflight report was successfully transmitted onchain, we remove it from inflight and onchain state reflects inflight. - // Sad path: inflight report reverts onchain, we remove it from inflight, onchain state does not reflect the chains so we retry. - lggr.Infow("Inflight report expired", "rootOfRoots", hexutil.Encode(inFlightReport.report.MerkleRoot[:])) - delete(c.inFlight, root) - } - } - - lggr.Infow("Inflight expire with price count", "count", len(c.inFlightPriceUpdates)) - - var stillInflight []InflightPriceUpdate - for _, inFlightFeeUpdate := range c.inFlightPriceUpdates { - timeSinceUpdate := time.Since(inFlightFeeUpdate.createdAt) - // If time passed since the price update is greater than multiplier * cache expiry, we remove it from the inflight list. - if timeSinceUpdate > c.cacheExpiry*PRICE_EXPIRY_MULTIPLIER { - // Happy path: inflight report was successfully transmitted onchain, we remove it from inflight and onchain state reflects inflight. - // Sad path: inflight report reverts onchain, we remove it from inflight, onchain state does not reflect the chains, so we retry. - lggr.Infow("Inflight price update expired", "gasPrices", inFlightFeeUpdate.gasPrices, "tokenPrices", inFlightFeeUpdate.tokenPrices) - } else { - // If the update is still valid, we keep it in the inflight list. - stillInflight = append(stillInflight, inFlightFeeUpdate) - } - } - c.inFlightPriceUpdates = stillInflight -} - -func (c *inflightCommitReportsContainer) add(lggr logger.Logger, report cciptypes.CommitStoreReport, epochAndRound uint64) error { - c.locker.Lock() - defer c.locker.Unlock() - - if report.MerkleRoot != [32]byte{} { - // Set new inflight ones as pending - lggr.Infow("Adding to inflight report", "rootOfRoots", hexutil.Encode(report.MerkleRoot[:])) - c.inFlight[report.MerkleRoot] = InflightCommitReport{ - report: report, - createdAt: time.Now(), - } - } - - if len(report.GasPrices) != 0 || len(report.TokenPrices) != 0 { - lggr.Infow("Adding to inflight fee updates", "gasPrices", report.GasPrices, "tokenPrices", report.TokenPrices) - c.inFlightPriceUpdates = append(c.inFlightPriceUpdates, InflightPriceUpdate{ - gasPrices: report.GasPrices, - tokenPrices: report.TokenPrices, - createdAt: time.Now(), - epochAndRound: epochAndRound, - }) - } - return nil -} diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/inflight_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/inflight_test.go deleted file mode 100644 index 0dd11aca8e..0000000000 --- a/core/services/ocr2/plugins/ccip/ccipcommit/inflight_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package ccipcommit - -import ( - "math/big" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" -) - -func TestCommitInflight(t *testing.T) { - lggr := logger.TestLogger(t) - c := newInflightCommitReportsContainer(time.Hour) - - c.inFlightPriceUpdates = append(c.inFlightPriceUpdates, InflightPriceUpdate{ - gasPrices: []cciptypes.GasPrice{}, - createdAt: time.Now(), - epochAndRound: ccipcalc.MergeEpochAndRound(2, 4), - }) - - // Initially should be empty - inflightGasUpdates := c.latestInflightGasPriceUpdates() - assert.Equal(t, 0, len(inflightGasUpdates)) - assert.Equal(t, uint64(0), c.maxInflightSeqNr()) - - epochAndRound := uint64(1) - - // Add a single report inflight - root1 := utils.Keccak256Fixed(hexutil.MustDecode("0xaa")) - require.NoError(t, c.add(lggr, cciptypes.CommitStoreReport{ - Interval: cciptypes.CommitStoreInterval{Min: 1, Max: 2}, - MerkleRoot: root1, - GasPrices: []cciptypes.GasPrice{ - {DestChainSelector: 123, Value: big.NewInt(999)}, - }, - }, epochAndRound)) - inflightGasUpdates = c.latestInflightGasPriceUpdates() - assert.Equal(t, 1, len(inflightGasUpdates)) - assert.Equal(t, big.NewInt(999), inflightGasUpdates[123].value) - assert.Equal(t, uint64(2), c.maxInflightSeqNr()) - epochAndRound++ - - // Add another price report - root2 := utils.Keccak256Fixed(hexutil.MustDecode("0xab")) - require.NoError(t, c.add(lggr, cciptypes.CommitStoreReport{ - Interval: cciptypes.CommitStoreInterval{Min: 3, Max: 4}, - MerkleRoot: root2, - GasPrices: []cciptypes.GasPrice{ - {DestChainSelector: 321, Value: big.NewInt(888)}, - }, - }, epochAndRound)) - inflightGasUpdates = c.latestInflightGasPriceUpdates() - assert.Equal(t, 2, len(inflightGasUpdates)) - assert.Equal(t, big.NewInt(999), inflightGasUpdates[123].value) - assert.Equal(t, big.NewInt(888), inflightGasUpdates[321].value) - assert.Equal(t, uint64(4), c.maxInflightSeqNr()) - epochAndRound++ - - // Add gas price updates - require.NoError(t, c.add(lggr, cciptypes.CommitStoreReport{ - GasPrices: []cciptypes.GasPrice{ - { - DestChainSelector: uint64(1), - Value: big.NewInt(1), - }, - }}, epochAndRound)) - - inflightGasUpdates = c.latestInflightGasPriceUpdates() - assert.Equal(t, 3, len(inflightGasUpdates)) - assert.Equal(t, big.NewInt(999), inflightGasUpdates[123].value) - assert.Equal(t, big.NewInt(888), inflightGasUpdates[321].value) - assert.Equal(t, big.NewInt(1), inflightGasUpdates[1].value) - assert.Equal(t, uint64(4), c.maxInflightSeqNr()) - epochAndRound++ - - // Add a token price update - token := common.HexToAddress("0xa") - require.NoError(t, c.add(lggr, cciptypes.CommitStoreReport{ - TokenPrices: []cciptypes.TokenPrice{ - { - Token: ccipcalc.EvmAddrToGeneric(token), - Value: big.NewInt(10), - }, - }, - GasPrices: []cciptypes.GasPrice{}, - }, epochAndRound)) - // Apply cache price to existing - latestInflightTokenPriceUpdates := c.latestInflightTokenPriceUpdates() - require.Equal(t, len(latestInflightTokenPriceUpdates), 1) - assert.Equal(t, big.NewInt(10), latestInflightTokenPriceUpdates[ccipcalc.EvmAddrToGeneric(token)].value) - - // larger epoch and round overrides existing price update - c.inFlightPriceUpdates = append(c.inFlightPriceUpdates, InflightPriceUpdate{ - tokenPrices: []cciptypes.TokenPrice{ - {Token: ccipcalc.EvmAddrToGeneric(token), Value: big.NewInt(9999)}, - }, - gasPrices: []cciptypes.GasPrice{ - { - DestChainSelector: uint64(1), - Value: big.NewInt(999), - }, - }, - createdAt: time.Now(), - epochAndRound: ccipcalc.MergeEpochAndRound(999, 99), - }) - latestInflightTokenPriceUpdates = c.latestInflightTokenPriceUpdates() - require.Equal(t, len(latestInflightTokenPriceUpdates), 1) - assert.Equal(t, big.NewInt(9999), latestInflightTokenPriceUpdates[ccipcalc.EvmAddrToGeneric(token)].value) - inflightGasUpdates = c.latestInflightGasPriceUpdates() - assert.Equal(t, 3, len(inflightGasUpdates)) - assert.Equal(t, big.NewInt(999), inflightGasUpdates[123].value) - assert.Equal(t, big.NewInt(888), inflightGasUpdates[321].value) - assert.Equal(t, big.NewInt(999), inflightGasUpdates[1].value) -} - -func Test_inflightCommitReportsContainer_expire(t *testing.T) { - c := &inflightCommitReportsContainer{ - cacheExpiry: time.Minute, - inFlight: map[[32]byte]InflightCommitReport{ - common.HexToHash("1"): { - report: cciptypes.CommitStoreReport{}, - createdAt: time.Now().Add(-5 * time.Minute), - }, - common.HexToHash("2"): { - report: cciptypes.CommitStoreReport{}, - createdAt: time.Now().Add(-10 * time.Second), - }, - }, - inFlightPriceUpdates: []InflightPriceUpdate{ - { - gasPrices: []cciptypes.GasPrice{{DestChainSelector: 100, Value: big.NewInt(0)}}, - createdAt: time.Now().Add(-PRICE_EXPIRY_MULTIPLIER * time.Minute), - epochAndRound: ccipcalc.MergeEpochAndRound(10, 5), - }, - { - gasPrices: []cciptypes.GasPrice{{DestChainSelector: 200, Value: big.NewInt(0)}}, - createdAt: time.Now().Add(-PRICE_EXPIRY_MULTIPLIER * time.Second), - epochAndRound: ccipcalc.MergeEpochAndRound(20, 5), - }, - }, - } - c.expire(logger.NullLogger) - - assert.Len(t, c.inFlight, 1) - _, exists := c.inFlight[common.HexToHash("2")] - assert.True(t, exists) - - assert.Len(t, c.inFlightPriceUpdates, 1) - assert.Equal(t, ccipcalc.MergeEpochAndRound(20, 5), c.inFlightPriceUpdates[0].epochAndRound) -} diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go index 4fcf2cb564..f8feea20e7 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go @@ -14,8 +14,6 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" @@ -87,7 +85,6 @@ type CommitReportingPlugin struct { priceGetter pricegetter.PriceGetter metricsCollector ccip.PluginMetricsCollector // State - inflightReports *inflightCommitReportsContainer chainHealthcheck cache.ChainHealthcheck } @@ -107,7 +104,6 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t } else if !healthy { return nil, ccip.ErrChainIsNotHealthy } - r.inflightReports.expire(lggr) // Will return 0,0 if no messages are found. This is a valid case as the report could // still contain fee updates. @@ -150,17 +146,17 @@ func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound t } func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Context, lggr logger.Logger) (uint64, uint64, []cciptypes.Hash, error) { - nextInflightMin, _, err := r.nextMinSeqNum(ctx, lggr) + nextSeqNum, err := r.commitStoreReader.GetExpectedNextSequenceNumber(ctx) if err != nil { return 0, 0, []cciptypes.Hash{}, err } - msgRequests, err := r.onRampReader.GetSendRequestsBetweenSeqNums(ctx, nextInflightMin, nextInflightMin+OnRampMessagesScanLimit, true) + msgRequests, err := r.onRampReader.GetSendRequestsBetweenSeqNums(ctx, nextSeqNum, nextSeqNum+OnRampMessagesScanLimit, true) if err != nil { return 0, 0, []cciptypes.Hash{}, err } if len(msgRequests) == 0 { - lggr.Infow("No new requests", "minSeqNr", nextInflightMin) + lggr.Infow("No new requests", "nextSeqNum", nextSeqNum) return 0, 0, []cciptypes.Hash{}, nil } @@ -173,10 +169,10 @@ func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Conte minSeqNr := seqNrs[0] maxSeqNr := seqNrs[len(seqNrs)-1] - if minSeqNr != nextInflightMin { + if minSeqNr != nextSeqNum { // Still report the observation as even partial reports have value e.g. all nodes are // missing a single, different log each, they would still be able to produce a valid report. - lggr.Warnf("Missing sequence number range [%d-%d]", nextInflightMin, minSeqNr) + lggr.Warnf("Missing sequence number range [%d-%d]", nextSeqNum, minSeqNr) } if !ccipcalc.ContiguousReqs(lggr, minSeqNr, maxSeqNr, seqNrs) { return 0, 0, []cciptypes.Hash{}, errors.New("unexpected gap in seq nums") @@ -184,32 +180,6 @@ func (r *CommitReportingPlugin) calculateMinMaxSequenceNumbers(ctx context.Conte return minSeqNr, maxSeqNr, messageIDs, nil } -func (r *CommitReportingPlugin) nextMinSeqNum(ctx context.Context, lggr logger.Logger) (inflightMin, onChainMin uint64, err error) { - nextMinOnChain, err := r.commitStoreReader.GetExpectedNextSequenceNumber(ctx) - if err != nil { - return 0, 0, err - } - // There are several scenarios to consider here for nextMin and inflight intervals. - // 1. nextMin=2, inflight=[[2,3],[4,5]]. Node is waiting for [2,3] and [4,5] to be included, should return 6 to build on top. - // 2. nextMin=2, inflight=[[4,5]]. [2,3] is expired but not yet visible onchain (means our cache expiry - // was too low). In this case still want to return 6. - // 3. nextMin=2, inflight=[] but other nodes have inflight=[2,3]. Say our node restarted and lost its cache. In this case - // we still return the chain's nextMin, other oracles will ignore our observation. Other nodes however will build [4,5] - // and then we'll add that to our cache in ShouldAcceptFinalizedReport, putting us into the previous position at which point - // we can start contributing again. - // 4. nextMin=4, inflight=[[2,3],[4,5]]. We see the onchain update, but haven't expired from our cache yet. Should happen - // regularly and we just return 6. - // 5. nextMin=2, inflight=[[4,5]]. [2,3] failed to get onchain for some reason. We'll return 6 and continue building even though - // subsequent reports will revert, but eventually they will all expire OR we'll hit MaxInflightSeqNumGap and forcibly - // expire them all. This scenario can also occur if there is a reorg which reorders the reports such that one reverts. - maxInflight := r.inflightReports.maxInflightSeqNr() - if (maxInflight > nextMinOnChain) && ((maxInflight - nextMinOnChain) > MaxInflightSeqNumGap) { - r.inflightReports.reset(lggr) - return nextMinOnChain, nextMinOnChain, nil - } - return mathutil.Max(nextMinOnChain, maxInflight+1), nextMinOnChain, nil -} - // All prices are USD ($1=1e18) denominated. All prices must be not nil. // Return token prices should contain the exact same tokens as in tokenDecimals. func (r *CommitReportingPlugin) generatePriceUpdates( @@ -277,7 +247,7 @@ func calculateUsdPer1e18TokenAmount(price *big.Int, decimals uint8) *big.Int { // Gets the latest token price updates based on logs within the heartbeat // The updates returned by this function are guaranteed to not contain nil values. -func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time, checkInflight bool) (map[cciptypes.Address]update, error) { +func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, now time.Time) (map[cciptypes.Address]update, error) { tokenPriceUpdates, err := r.destPriceRegistryReader.GetTokenPriceUpdatesCreatedAfter( ctx, now.Add(-r.offchainConfig.TokenPriceHeartBeat), @@ -299,38 +269,13 @@ func (r *CommitReportingPlugin) getLatestTokenPriceUpdates(ctx context.Context, } } } - if !checkInflight { - return latestUpdates, nil - } - - // todo this comparison is faulty, as a previously-sent update's onchain timestamp can be higher than inflight timestamp - // to properly fix, need a solution to map from onchain request to offchain timestamp - // leaving it as is, as token prices are updated infrequently, so this should not cause many issues - latestInflightTokenPriceUpdates := r.inflightReports.latestInflightTokenPriceUpdates() - for inflightToken, latestInflightUpdate := range latestInflightTokenPriceUpdates { - if latestInflightUpdate.timestamp.After(latestUpdates[inflightToken].timestamp) && latestInflightUpdate.value != nil { - latestUpdates[inflightToken] = latestInflightUpdate - } - } return latestUpdates, nil } // getLatestGasPriceUpdate returns the latest gas price update based on logs within the heartbeat. // If an update is found, it is not expected to contain a nil value. If no updates found, empty update with nil value is returned. -func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time, checkInflight bool) (gasUpdate update, error error) { - if checkInflight { - latestInflightGasPriceUpdates := r.inflightReports.latestInflightGasPriceUpdates() - if inflightUpdate, exists := latestInflightGasPriceUpdates[r.sourceChainSelector]; exists { - gasUpdate = inflightUpdate - r.lggr.Infow("Latest gas price from inflight", "gasPriceUpdateVal", gasUpdate.value, "gasPriceUpdateTs", gasUpdate.timestamp) - - // Gas price can fluctuate frequently, many updates may be in flight. - // If there is gas price update inflight, use it as source of truth, no need to check onchain. - return gasUpdate, nil - } - } - +func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now time.Time) (gasUpdate update, error error) { // If there are no price updates inflight, check latest prices onchain gasPriceUpdates, err := r.destPriceRegistryReader.GetGasPriceUpdatesCreatedAfter( ctx, @@ -390,12 +335,12 @@ func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types. return false, nil, err } - latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, now, true) + latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, now) if err != nil { return false, nil, err } - latestTokenPrices, err := r.getLatestTokenPriceUpdates(ctx, now, true) + latestTokenPrices, err := r.getLatestTokenPriceUpdates(ctx, now) if err != nil { return false, nil, err } @@ -681,15 +626,11 @@ func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context, return false, ccip.ErrChainIsNotHealthy } - if r.isStaleReport(ctx, lggr, parsedReport, true, reportTimestamp) { + if r.isStaleReport(ctx, lggr, parsedReport, reportTimestamp) { lggr.Infow("Rejecting stale report") return false, nil } - epochAndRound := ccipcalc.MergeEpochAndRound(reportTimestamp.Epoch, reportTimestamp.Round) - if err := r.inflightReports.add(lggr, parsedReport, epochAndRound); err != nil { - return false, err - } r.metricsCollector.SequenceNumber(parsedReport.Interval.Max) lggr.Infow("Accepting finalized report", "merkleRoot", hexutil.Encode(parsedReport.MerkleRoot[:])) return true, nil @@ -710,7 +651,7 @@ func (r *CommitReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context // If report is not stale we transmit. // When the commitTransmitter enqueues the tx for tx manager, // we mark it as fulfilled, effectively removing it from the set of inflight messages. - shouldTransmit := !r.isStaleReport(ctx, lggr, parsedReport, false, reportTimestamp) + shouldTransmit := !r.isStaleReport(ctx, lggr, parsedReport, reportTimestamp) lggr.Infow("ShouldTransmitAcceptedReport", "shouldTransmit", shouldTransmit, @@ -731,10 +672,10 @@ func (r *CommitReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context // If there is no merkle root but there is a gas update, only this gas update is used for staleness checks. // If only price updates are included, the price updates are used to check for staleness // If nothing is included the report is always considered stale. -func (r *CommitReportingPlugin) isStaleReport(ctx context.Context, lggr logger.Logger, report cciptypes.CommitStoreReport, checkInflight bool, reportTimestamp types.ReportTimestamp) bool { +func (r *CommitReportingPlugin) isStaleReport(ctx context.Context, lggr logger.Logger, report cciptypes.CommitStoreReport, reportTimestamp types.ReportTimestamp) bool { // If there is a merkle root, ignore all other staleness checks and only check for sequence number staleness if report.MerkleRoot != [32]byte{} { - return r.isStaleMerkleRoot(ctx, lggr, report.Interval, checkInflight) + return r.isStaleMerkleRoot(ctx, lggr, report.Interval) } hasGasPriceUpdate := len(report.GasPrices) > 0 @@ -752,8 +693,8 @@ func (r *CommitReportingPlugin) isStaleReport(ctx context.Context, lggr logger.L } // We consider a price update as stale when, there isn't an update or there is an update that is stale. - gasPriceStale := !hasGasPriceUpdate || r.isStaleGasPrice(ctx, lggr, report.GasPrices[0], checkInflight) - tokenPricesStale := !hasTokenPriceUpdates || r.isStaleTokenPrices(ctx, lggr, report.TokenPrices, checkInflight) + gasPriceStale := !hasGasPriceUpdate || r.isStaleGasPrice(ctx, lggr, report.GasPrices[0]) + tokenPricesStale := !hasTokenPriceUpdates || r.isStaleTokenPrices(ctx, lggr, report.TokenPrices) if gasPriceStale && tokenPricesStale { return true @@ -770,26 +711,16 @@ func (r *CommitReportingPlugin) isStaleReport(ctx context.Context, lggr logger.L return lastPriceEpochAndRound >= thisEpochAndRound } -func (r *CommitReportingPlugin) isStaleMerkleRoot(ctx context.Context, lggr logger.Logger, reportInterval cciptypes.CommitStoreInterval, checkInflight bool) bool { - nextInflightMin, nextOnChainMin, err := r.nextMinSeqNum(ctx, lggr) +func (r *CommitReportingPlugin) isStaleMerkleRoot(ctx context.Context, lggr logger.Logger, reportInterval cciptypes.CommitStoreInterval) bool { + nextSeqNum, err := r.commitStoreReader.GetExpectedNextSequenceNumber(ctx) if err != nil { // Assume it's a transient issue getting the last report and try again on the next round return true } - if checkInflight && nextInflightMin != reportInterval.Min { - // There are sequence numbers missing between the commitStoreReader/inflight txs and the proposed report. - // The report will fail onchain unless the inflight cache is in an incorrect state. A state like this - // could happen for various reasons, e.g. a reboot of the node emptying the caches, and should be self-healing. - // We do not submit a tx and wait for the protocol to self-heal by updating the caches or invalidating - // inflight caches over time. - lggr.Errorw("Next inflight min is not equal to the proposed min of the report", "nextInflightMin", nextInflightMin) - return true - } - - if !checkInflight && nextOnChainMin > reportInterval.Min { + if nextSeqNum > reportInterval.Min { // If the next min is already greater than this reports min, this report is stale. - lggr.Infow("Report is stale because of root", "onchain min", nextOnChainMin, "report min", reportInterval.Min) + lggr.Infow("Report is stale because of root", "onchain min", nextSeqNum, "report min", reportInterval.Min) return true } @@ -797,8 +728,8 @@ func (r *CommitReportingPlugin) isStaleMerkleRoot(ctx context.Context, lggr logg return false } -func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger.Logger, gasPrice cciptypes.GasPrice, checkInflight bool) bool { - latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, time.Now(), checkInflight) +func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger.Logger, gasPrice cciptypes.GasPrice) bool { + latestGasPrice, err := r.getLatestGasPriceUpdate(ctx, time.Now()) if err != nil { lggr.Errorw("Report is stale because getLatestGasPriceUpdate failed", "err", err) return true @@ -823,10 +754,10 @@ func (r *CommitReportingPlugin) isStaleGasPrice(ctx context.Context, lggr logger return false } -func (r *CommitReportingPlugin) isStaleTokenPrices(ctx context.Context, lggr logger.Logger, priceUpdates []cciptypes.TokenPrice, checkInflight bool) bool { +func (r *CommitReportingPlugin) isStaleTokenPrices(ctx context.Context, lggr logger.Logger, priceUpdates []cciptypes.TokenPrice) bool { // getting the last price updates without including inflight is like querying // current prices onchain, but uses logpoller's data to save on the RPC requests - latestTokenPriceUpdates, err := r.getLatestTokenPriceUpdates(ctx, time.Now(), checkInflight) + latestTokenPriceUpdates, err := r.getLatestTokenPriceUpdates(ctx, time.Now()) if err != nil { return true } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go index ed87e5b883..2e37738f59 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go @@ -12,7 +12,6 @@ import ( "time" "github.com/Masterminds/semver/v3" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" @@ -159,7 +158,6 @@ func TestCommitReportingPlugin_Observation(t *testing.T) { p := &CommitReportingPlugin{} p.lggr = logger.TestLogger(t) - p.inflightReports = newInflightCommitReportsContainer(time.Hour) p.commitStoreReader = commitStoreReader p.onRampReader = onRampReader p.offRampReader = offRampReader @@ -338,7 +336,6 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p := &CommitReportingPlugin{} p.lggr = logger.TestLogger(t) - p.inflightReports = newInflightCommitReportsContainer(time.Minute) p.destPriceRegistryReader = destPriceRegistryReader p.onRampReader = onRampReader p.sourceChainSelector = sourceChainSelector @@ -381,7 +378,6 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { newPlugin := func() *CommitReportingPlugin { p := &CommitReportingPlugin{} p.lggr = logger.TestLogger(t) - p.inflightReports = newInflightCommitReportsContainer(time.Minute) p.metricsCollector = ccip.NoopMetricsCollector return p } @@ -423,8 +419,6 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { t.Run("stale report should not be accepted", func(t *testing.T) { onChainSeqNum := uint64(100) - //_, _ := testhelpers.NewFakeCommitStore(t, onChainSeqNum) - commitStoreReader := ccipdatamocks.NewCommitStoreReader(t) p := newPlugin() @@ -452,7 +446,7 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { assert.False(t, shouldAccept) }) - t.Run("non-stale report should be accepted and added inflight", func(t *testing.T) { + t.Run("non-stale report should be accepted", func(t *testing.T) { onChainSeqNum := uint64(100) p := newPlugin() @@ -498,11 +492,6 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { shouldAccept, err := p.ShouldAcceptFinalizedReport(ctx, types.ReportTimestamp{}, encodedReport) assert.NoError(t, err) assert.True(t, shouldAccept) - - // make sure that the report was added inflight - tokenPriceUpdates := p.inflightReports.latestInflightTokenPriceUpdates() - priceUpdate := tokenPriceUpdates[report.TokenPrices[0].Token] - assert.Equal(t, report.TokenPrices[0].Value.Uint64(), priceUpdate.value.Uint64()) }) } @@ -527,7 +516,6 @@ func TestCommitReportingPlugin_ShouldTransmitAcceptedReport(t *testing.T) { onChainSeqNum := uint64(100) commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(onChainSeqNum, nil) p.commitStoreReader = commitStoreReader - p.inflightReports = newInflightCommitReportsContainer(time.Minute) p.lggr = logger.TestLogger(t) chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t) @@ -1207,74 +1195,15 @@ func TestCommitReportingPlugin_generatePriceUpdates(t *testing.T) { } } -func TestCommitReportingPlugin_nextMinSeqNum(t *testing.T) { - lggr := logger.TestLogger(t) - root1 := utils.Keccak256Fixed(hexutil.MustDecode("0xaa")) - - var tt = []struct { - onChainMin uint64 - inflight []cciptypes.CommitStoreReport - expectedOnChainMin uint64 - expectedInflightMin uint64 - }{ - { - onChainMin: uint64(1), - inflight: nil, - expectedInflightMin: uint64(1), - expectedOnChainMin: uint64(1), - }, - { - onChainMin: uint64(1), - inflight: []cciptypes.CommitStoreReport{ - {Interval: cciptypes.CommitStoreInterval{Min: uint64(1), Max: uint64(2)}, MerkleRoot: root1}}, - expectedInflightMin: uint64(3), - expectedOnChainMin: uint64(1), - }, - { - onChainMin: uint64(1), - inflight: []cciptypes.CommitStoreReport{ - {Interval: cciptypes.CommitStoreInterval{Min: uint64(3), Max: uint64(4)}, MerkleRoot: root1}}, - expectedInflightMin: uint64(5), - expectedOnChainMin: uint64(1), - }, - { - onChainMin: uint64(1), - inflight: []cciptypes.CommitStoreReport{ - {Interval: cciptypes.CommitStoreInterval{Min: uint64(1), Max: uint64(MaxInflightSeqNumGap + 2)}, MerkleRoot: root1}}, - expectedInflightMin: uint64(1), - expectedOnChainMin: uint64(1), - }, - } - for _, tc := range tt { - commitStoreReader := ccipdatamocks.NewCommitStoreReader(t) - commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(tc.onChainMin, nil).Maybe() - cp := CommitReportingPlugin{commitStoreReader: commitStoreReader, inflightReports: newInflightCommitReportsContainer(time.Hour)} - epochAndRound := uint64(1) - for _, rep := range tc.inflight { - rc := rep - rc.GasPrices = []cciptypes.GasPrice{{}} - require.NoError(t, cp.inflightReports.add(lggr, rc, epochAndRound)) - epochAndRound++ - } - t.Log("inflight", cp.inflightReports.maxInflightSeqNr()) - inflightMin, onchainMin, err := cp.nextMinSeqNum(context.Background(), lggr) - require.NoError(t, err) - assert.Equal(t, tc.expectedInflightMin, inflightMin) - assert.Equal(t, tc.expectedOnChainMin, onchainMin) - cp.inflightReports.reset(lggr) - } -} - func TestCommitReportingPlugin_isStaleReport(t *testing.T) { ctx := context.Background() lggr := logger.TestLogger(t) merkleRoot1 := utils.Keccak256Fixed([]byte("some merkle root 1")) - merkleRoot2 := utils.Keccak256Fixed([]byte("some merkle root 2")) t.Run("empty report", func(t *testing.T) { commitStoreReader := ccipdatamocks.NewCommitStoreReader(t) r := &CommitReportingPlugin{commitStoreReader: commitStoreReader} - isStale := r.isStaleReport(ctx, lggr, cciptypes.CommitStoreReport{}, false, types.ReportTimestamp{}) + isStale := r.isStaleReport(ctx, lggr, cciptypes.CommitStoreReport{}, types.ReportTimestamp{}) assert.True(t, isStale) }) @@ -1285,29 +1214,15 @@ func TestCommitReportingPlugin_isStaleReport(t *testing.T) { r := &CommitReportingPlugin{ commitStoreReader: commitStoreReader, - inflightReports: &inflightCommitReportsContainer{ - inFlight: map[[32]byte]InflightCommitReport{ - merkleRoot2: { - report: cciptypes.CommitStoreReport{ - Interval: cciptypes.CommitStoreInterval{Min: expNextSeqNum + 1, Max: expNextSeqNum + 10}, - }, - }, - }, - }, } assert.False(t, r.isStaleReport(ctx, lggr, cciptypes.CommitStoreReport{ MerkleRoot: merkleRoot1, Interval: cciptypes.CommitStoreInterval{Min: expNextSeqNum + 1, Max: expNextSeqNum + 10}, - }, false, types.ReportTimestamp{})) - - assert.True(t, r.isStaleReport(ctx, lggr, cciptypes.CommitStoreReport{ - MerkleRoot: merkleRoot1, - Interval: cciptypes.CommitStoreInterval{Min: expNextSeqNum + 1, Max: expNextSeqNum + 10}, - }, true, types.ReportTimestamp{})) + }, types.ReportTimestamp{})) assert.True(t, r.isStaleReport(ctx, lggr, cciptypes.CommitStoreReport{ - MerkleRoot: merkleRoot1}, false, types.ReportTimestamp{})) + MerkleRoot: merkleRoot1}, types.ReportTimestamp{})) }) } @@ -1315,7 +1230,6 @@ func TestCommitReportingPlugin_calculateMinMaxSequenceNumbers(t *testing.T) { testCases := []struct { name string commitStoreSeqNum uint64 - inflightSeqNum uint64 msgSeqNums []uint64 expQueryMin uint64 // starting seq num that is used in the query to get messages @@ -1324,17 +1238,16 @@ func TestCommitReportingPlugin_calculateMinMaxSequenceNumbers(t *testing.T) { expErr bool }{ { - name: "happy flow inflight", + name: "happy flow", commitStoreSeqNum: 9, - inflightSeqNum: 10, msgSeqNums: []uint64{11, 12, 13, 14}, - expQueryMin: 11, // inflight+1 + expQueryMin: 9, expMin: 11, expMax: 14, expErr: false, }, { - name: "happy flow no inflight", + name: "happy flow 2", commitStoreSeqNum: 9, msgSeqNums: []uint64{11, 12, 13, 14}, expQueryMin: 9, // from commit store @@ -1345,7 +1258,6 @@ func TestCommitReportingPlugin_calculateMinMaxSequenceNumbers(t *testing.T) { { name: "gap in msg seq nums", commitStoreSeqNum: 10, - inflightSeqNum: 9, expQueryMin: 10, msgSeqNums: []uint64{11, 12, 14}, expErr: true, @@ -1378,18 +1290,6 @@ func TestCommitReportingPlugin_calculateMinMaxSequenceNumbers(t *testing.T) { commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(tc.commitStoreSeqNum, nil) p.commitStoreReader = commitStoreReader - p.inflightReports = newInflightCommitReportsContainer(time.Minute) - if tc.inflightSeqNum > 0 { - p.inflightReports.inFlight[[32]byte{}] = InflightCommitReport{ - report: cciptypes.CommitStoreReport{ - Interval: cciptypes.CommitStoreInterval{ - Min: tc.inflightSeqNum, - Max: tc.inflightSeqNum, - }, - }, - } - } - onRampReader := ccipdatamocks.NewOnRampReader(t) var sendReqs []cciptypes.EVM2EVMMessageWithTxMeta for _, seqNum := range tc.msgSeqNums { @@ -1419,35 +1319,21 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { chainSelector := uint64(1234) testCases := []struct { - name string - checkInflight bool - inflightGasPriceUpdate *update - destGasPriceUpdates []update - expUpdate update - expErr bool + name string + destGasPriceUpdates []update + expUpdate update + expErr bool }{ { - name: "only inflight gas price", - checkInflight: true, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, - expUpdate: update{timestamp: now, value: big.NewInt(1000)}, - expErr: false, - }, - { - name: "inflight price is nil", - checkInflight: true, - inflightGasPriceUpdate: nil, + name: "happy path", destGasPriceUpdates: []update{ - {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, - {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + {timestamp: now, value: big.NewInt(1000)}, }, - expUpdate: update{timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, + expUpdate: update{timestamp: now, value: big.NewInt(1000)}, expErr: false, }, { - name: "inflight updates are skipped", - checkInflight: false, - inflightGasPriceUpdate: &update{timestamp: now, value: big.NewInt(1000)}, + name: "happy path two updates", destGasPriceUpdates: []update{ {timestamp: now.Add(time.Minute), value: big.NewInt(2000)}, {timestamp: now.Add(2 * time.Minute), value: big.NewInt(3000)}, @@ -1463,24 +1349,10 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := &CommitReportingPlugin{} p.sourceChainSelector = chainSelector - p.inflightReports = newInflightCommitReportsContainer(time.Minute) p.lggr = lggr destPriceRegistry := ccipdatamocks.NewPriceRegistryReader(t) p.destPriceRegistryReader = destPriceRegistry - if tc.inflightGasPriceUpdate != nil { - p.inflightReports.inFlightPriceUpdates = append( - p.inflightReports.inFlightPriceUpdates, - InflightPriceUpdate{ - createdAt: tc.inflightGasPriceUpdate.timestamp, - gasPrices: []cciptypes.GasPrice{{ - DestChainSelector: chainSelector, - Value: tc.inflightGasPriceUpdate.value, - }}, - }, - ) - } - if len(tc.destGasPriceUpdates) > 0 { var events []cciptypes.GasPriceUpdateWithTxMeta for _, u := range tc.destGasPriceUpdates { @@ -1496,7 +1368,7 @@ func TestCommitReportingPlugin_getLatestGasPriceUpdate(t *testing.T) { p.destPriceRegistryReader = destReader } - priceUpdate, err := p.getLatestGasPriceUpdate(ctx, time.Now(), tc.checkInflight) + priceUpdate, err := p.getLatestGasPriceUpdate(ctx, time.Now()) if tc.expErr { assert.Error(t, err) return @@ -1517,13 +1389,11 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { testCases := []struct { name string priceRegistryUpdates []cciptypes.TokenPriceUpdate - checkInflight bool - inflightUpdates map[cciptypes.Address]update expUpdates map[cciptypes.Address]update expErr bool }{ { - name: "ignore inflight updates", + name: "happy path", priceRegistryUpdates: []cciptypes.TokenPriceUpdate{ { TokenPrice: cciptypes.TokenPrice{ @@ -1540,42 +1410,12 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { TimestampUnixSec: big.NewInt(now.Add(2 * time.Minute).Unix()), }, }, - checkInflight: false, expUpdates: map[cciptypes.Address]update{ tk1: {timestamp: now.Add(1 * time.Minute), value: big.NewInt(1000)}, tk2: {timestamp: now.Add(2 * time.Minute), value: big.NewInt(2000)}, }, expErr: false, }, - { - name: "consider inflight updates", - priceRegistryUpdates: []cciptypes.TokenPriceUpdate{ - { - TokenPrice: cciptypes.TokenPrice{ - Token: tk1, - Value: big.NewInt(1000), - }, - TimestampUnixSec: big.NewInt(now.Add(1 * time.Minute).Unix()), - }, - { - TokenPrice: cciptypes.TokenPrice{ - Token: tk2, - Value: big.NewInt(2000), - }, - TimestampUnixSec: big.NewInt(now.Add(2 * time.Minute).Unix()), - }, - }, - checkInflight: true, - inflightUpdates: map[cciptypes.Address]update{ - tk1: {timestamp: now, value: big.NewInt(500)}, // inflight but older - tk2: {timestamp: now.Add(4 * time.Minute), value: big.NewInt(4000)}, - }, - expUpdates: map[cciptypes.Address]update{ - tk1: {timestamp: now.Add(1 * time.Minute), value: big.NewInt(1000)}, - tk2: {timestamp: now.Add(4 * time.Minute), value: big.NewInt(4000)}, - }, - expErr: false, - }, } ctx := testutils.Context(t) @@ -1583,33 +1423,19 @@ func TestCommitReportingPlugin_getLatestTokenPriceUpdates(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := &CommitReportingPlugin{} - //_, priceRegAddr := testhelpers.NewFakePriceRegistry(t) priceReg := ccipdatamocks.NewPriceRegistryReader(t) p.destPriceRegistryReader = priceReg - //destReader := ccipdata.NewMockReader(t) var events []cciptypes.TokenPriceUpdateWithTxMeta for _, up := range tc.priceRegistryUpdates { events = append(events, cciptypes.TokenPriceUpdateWithTxMeta{ TokenPriceUpdate: up, }) } - //destReader.On("GetTokenPriceUpdatesCreatedAfter", ctx, priceRegAddr, mock.Anything, 0).Return(events, nil) - priceReg.On("GetTokenPriceUpdatesCreatedAfter", ctx, mock.Anything, 0).Return(events, nil) - p.inflightReports = newInflightCommitReportsContainer(time.Minute) - if len(tc.inflightUpdates) > 0 { - for tk, upd := range tc.inflightUpdates { - p.inflightReports.inFlightPriceUpdates = append(p.inflightReports.inFlightPriceUpdates, InflightPriceUpdate{ - createdAt: upd.timestamp, - tokenPrices: []cciptypes.TokenPrice{ - {Token: tk, Value: upd.value}, - }, - }) - } - } + priceReg.On("GetTokenPriceUpdatesCreatedAfter", ctx, mock.Anything, 0).Return(events, nil) - updates, err := p.getLatestTokenPriceUpdates(ctx, now, tc.checkInflight) + updates, err := p.getLatestTokenPriceUpdates(ctx, now) if tc.expErr { assert.Error(t, err) return diff --git a/core/services/ocr2/plugins/ccip/ccipexec/helpers.go b/core/services/ocr2/plugins/ccip/ccipexec/helpers.go new file mode 100644 index 0000000000..baaecc08d0 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/ccipexec/helpers.go @@ -0,0 +1,54 @@ +package ccipexec + +import ( + "github.com/pkg/errors" + + mapset "github.com/deckarep/golang-set/v2" + + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" +) + +// helper struct to hold the commitReport and the related send requests +type commitReportWithSendRequests struct { + commitReport cciptypes.CommitStoreReport + sendRequestsWithMeta []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta +} + +func (r *commitReportWithSendRequests) validate() error { + // make sure that number of messages is the expected + if exp := int(r.commitReport.Interval.Max - r.commitReport.Interval.Min + 1); len(r.sendRequestsWithMeta) != exp { + return errors.Errorf( + "unexpected missing sendRequestsWithMeta in committed root %x have %d want %d", r.commitReport.MerkleRoot, len(r.sendRequestsWithMeta), exp) + } + + return nil +} + +// uniqueSenders returns slice of unique senders based on the send requests. Order is preserved based on the order of the send requests (by sequence number). +func (r *commitReportWithSendRequests) uniqueSenders() []cciptypes.Address { + orderedUniqueSenders := make([]cciptypes.Address, 0, len(r.sendRequestsWithMeta)) + visitedSenders := mapset.NewSet[cciptypes.Address]() + + for _, req := range r.sendRequestsWithMeta { + if !visitedSenders.Contains(req.Sender) { + orderedUniqueSenders = append(orderedUniqueSenders, req.Sender) + visitedSenders.Add(req.Sender) + } + } + return orderedUniqueSenders +} + +func (r *commitReportWithSendRequests) allRequestsAreExecutedAndFinalized() bool { + for _, req := range r.sendRequestsWithMeta { + if !req.Executed || !req.Finalized { + return false + } + } + return true +} + +// checks if the send request fits the commit report interval +func (r *commitReportWithSendRequests) sendReqFits(sendReq cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta) bool { + return sendReq.SequenceNumber >= r.commitReport.Interval.Min && + sendReq.SequenceNumber <= r.commitReport.Interval.Max +} diff --git a/core/services/ocr2/plugins/ccip/ccipexec/helpers_test.go b/core/services/ocr2/plugins/ccip/ccipexec/helpers_test.go new file mode 100644 index 0000000000..3ba2531073 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/ccipexec/helpers_test.go @@ -0,0 +1,96 @@ +package ccipexec + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" +) + +func Test_CommitReportWithSendRequests_uniqueSenders(t *testing.T) { + messageFn := func(address cciptypes.Address) cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta { + return cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{EVM2EVMMessage: cciptypes.EVM2EVMMessage{Sender: address}} + } + + tests := []struct { + name string + sendRequests []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta + expUniqueSenders int + expSendersOrder []cciptypes.Address + }{ + { + name: "all unique senders", + sendRequests: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + messageFn(cciptypes.Address(utils.RandomAddress().String())), + messageFn(cciptypes.Address(utils.RandomAddress().String())), + messageFn(cciptypes.Address(utils.RandomAddress().String())), + }, + expUniqueSenders: 3, + }, + { + name: "some senders are the same", + sendRequests: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + messageFn("0x1"), + messageFn("0x2"), + messageFn("0x1"), + messageFn("0x2"), + messageFn("0x3"), + }, + expUniqueSenders: 3, + expSendersOrder: []cciptypes.Address{ + cciptypes.Address("0x1"), + cciptypes.Address("0x2"), + cciptypes.Address("0x3"), + }, + }, + { + name: "all senders are the same", + sendRequests: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + messageFn("0x1"), + messageFn("0x1"), + messageFn("0x1"), + }, + expUniqueSenders: 1, + expSendersOrder: []cciptypes.Address{ + cciptypes.Address("0x1"), + }, + }, + { + name: "order is preserved", + sendRequests: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{ + messageFn("0x3"), + messageFn("0x1"), + messageFn("0x3"), + messageFn("0x2"), + messageFn("0x2"), + messageFn("0x1"), + }, + expUniqueSenders: 3, + expSendersOrder: []cciptypes.Address{ + cciptypes.Address("0x3"), + cciptypes.Address("0x1"), + cciptypes.Address("0x2"), + }, + }, + { + name: "no senders", + sendRequests: []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta{}, + expUniqueSenders: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rep := commitReportWithSendRequests{sendRequestsWithMeta: tt.sendRequests} + uniqueSenders := rep.uniqueSenders() + + assert.Len(t, uniqueSenders, tt.expUniqueSenders) + if tt.expSendersOrder != nil { + assert.Equal(t, tt.expSendersOrder, uniqueSenders) + } + }) + } +} diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go index 0cb3ef44ab..998fe638a5 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go @@ -9,7 +9,6 @@ import ( "sync" "time" - mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -343,11 +342,22 @@ func (r *ExecutionReportingPlugin) buildBatch( sourceToDestToken map[cciptypes.Address]cciptypes.Address, destTokenPoolRateLimits map[cciptypes.Address]*big.Int, ) (executableMessages []ccip.ObservedMessage) { - inflightSeqNrs, inflightAggregateValue, maxInflightSenderNonces, inflightTokenAmounts, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken) + inflightAggregateValue, inflightTokenAmounts, err := inflightAggregates(inflight, destTokenPricesUSD, sourceToDestToken) if err != nil { lggr.Errorw("Unexpected error computing inflight values", "err", err) return []ccip.ObservedMessage{} } + // We assume that next observation will start after previous epoch transmission so nonces should be already updated onchain. + // Worst case scenario we will try to process the same message again, and it will be skipped but protocol would progress anyway. + // We don't use inflightCache here to avoid cases in which inflight cache keeps progressing but due to transmission failures + // previous reports are not included onchain. That can lead to issues with IncorrectNonce skips, + // because we enforce sequential processing per sender (per sender's nonce ordering is enforced by Offramp contract) + sendersNonce, err := r.offRampReader.GetSendersNonce(ctx, report.uniqueSenders()) + if err != nil { + lggr.Errorw("fetching senders nonce", "err", err) + return []ccip.ObservedMessage{} + } + availableGas := uint64(r.offchainConfig.BatchGasLimit) expectedNonces := make(map[cciptypes.Address]uint64) availableDataLen := MaxDataLenPerBatch @@ -358,26 +368,16 @@ func (r *ExecutionReportingPlugin) buildBatch( msgLggr.Infow("Skipping message already executed", "seqNr", msg.SequenceNumber) continue } - if inflightSeqNrs.Contains(msg.SequenceNumber) { - msgLggr.Infow("Skipping message already inflight", "seqNr", msg.SequenceNumber) - continue - } + if _, ok := expectedNonces[msg.Sender]; !ok { - // First message in batch, need to populate expected nonce - if maxInflight, ok := maxInflightSenderNonces[msg.Sender]; ok { - // Sender already has inflight nonce, populate from there - expectedNonces[msg.Sender] = maxInflight + 1 - } else { - // Nothing inflight take from chain. - // Chain holds existing nonce. - nonce, err := r.offRampReader.GetSenderNonce(ctx, msg.Sender) - if err != nil { - lggr.Errorw("unable to get sender nonce", "err", err, "seqNr", msg.SequenceNumber) - continue - } - expectedNonces[msg.Sender] = nonce + 1 + nonce, ok1 := sendersNonce[msg.Sender] + if !ok1 { + lggr.Errorw("Skipping message nonce not found", "sender", msg.Sender) + continue } + expectedNonces[msg.Sender] = nonce + 1 } + // Check expected nonce is valid if msg.Nonce != expectedNonces[msg.Sender] { msgLggr.Warnw("Skipping message invalid nonce", "have", msg.Nonce, "want", expectedNonces[msg.Sender]) @@ -576,37 +576,6 @@ func calculateMessageMaxGas(gasLimit *big.Int, numRequests, dataLen, numTokens i return messageMaxGas, nil } -// helper struct to hold the commitReport and the related send requests -type commitReportWithSendRequests struct { - commitReport cciptypes.CommitStoreReport - sendRequestsWithMeta []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta -} - -func (r *commitReportWithSendRequests) validate() error { - // make sure that number of messages is the expected - if exp := int(r.commitReport.Interval.Max - r.commitReport.Interval.Min + 1); len(r.sendRequestsWithMeta) != exp { - return errors.Errorf( - "unexpected missing sendRequestsWithMeta in committed root %x have %d want %d", r.commitReport.MerkleRoot, len(r.sendRequestsWithMeta), exp) - } - - return nil -} - -func (r *commitReportWithSendRequests) allRequestsAreExecutedAndFinalized() bool { - for _, req := range r.sendRequestsWithMeta { - if !req.Executed || !req.Finalized { - return false - } - } - return true -} - -// checks if the send request fits the commit report interval -func (r *commitReportWithSendRequests) sendReqFits(sendReq cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta) bool { - return sendReq.SequenceNumber >= r.commitReport.Interval.Min && - sendReq.SequenceNumber <= r.commitReport.Interval.Max -} - // getReportsWithSendRequests returns the target reports with populated send requests. func (r *ExecutionReportingPlugin) getReportsWithSendRequests( ctx context.Context, @@ -941,24 +910,17 @@ func inflightAggregates( inflight []InflightInternalExecutionReport, destTokenPrices map[cciptypes.Address]*big.Int, sourceToDest map[cciptypes.Address]cciptypes.Address, -) (mapset.Set[uint64], *big.Int, map[cciptypes.Address]uint64, map[cciptypes.Address]*big.Int, error) { - inflightSeqNrs := mapset.NewSet[uint64]() +) (*big.Int, map[cciptypes.Address]*big.Int, error) { inflightAggregateValue := big.NewInt(0) - maxInflightSenderNonces := make(map[cciptypes.Address]uint64) inflightTokenAmounts := make(map[cciptypes.Address]*big.Int) for _, rep := range inflight { for _, message := range rep.messages { - inflightSeqNrs.Add(message.SequenceNumber) msgValue, err := aggregateTokenValue(destTokenPrices, sourceToDest, message.TokenAmounts) if err != nil { - return nil, nil, nil, nil, err + return nil, nil, err } inflightAggregateValue.Add(inflightAggregateValue, msgValue) - maxInflightSenderNonce, ok := maxInflightSenderNonces[message.Sender] - if !ok || message.Nonce > maxInflightSenderNonce { - maxInflightSenderNonces[message.Sender] = message.Nonce - } for _, tk := range message.TokenAmounts { if rl, exists := inflightTokenAmounts[tk.Token]; exists { @@ -969,7 +931,7 @@ func inflightAggregates( } } } - return inflightSeqNrs, inflightAggregateValue, maxInflightSenderNonces, inflightTokenAmounts, nil + return inflightAggregateValue, inflightTokenAmounts, nil } // getTokensPrices returns token prices of the given price registry, diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go index 85b827f642..496771b4cc 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go @@ -172,7 +172,10 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) { Return(executionEvents, nil).Maybe() mockOffRampReader.On("CurrentRateLimiterState", mock.Anything).Return(tc.rateLimiterState, nil).Maybe() mockOffRampReader.On("Address").Return(cciptypes.Address(offRamp.Address().String())).Maybe() - mockOffRampReader.On("GetSenderNonce", mock.Anything, mock.Anything).Return(offRamp.GetSenderNonce(nil, utils.RandomAddress())).Maybe() + senderNonces := map[cciptypes.Address]uint64{ + cciptypes.Address(utils.RandomAddress().String()): tc.senderNonce, + } + mockOffRampReader.On("GetSendersNonce", mock.Anything, mock.Anything).Return(senderNonces, nil).Maybe() mockOffRampReader.On("GetTokenPoolsRateLimits", ctx, []ccipdata.TokenPoolReader{}). Return([]cciptypes.TokenBucketRateLimit{}, nil).Maybe() @@ -682,7 +685,7 @@ func TestExecutionReportingPlugin_buildBatch(t *testing.T) { // Mock calls to reader. mockOffRampReader := ccipdatamocks.NewOffRampReader(t) - mockOffRampReader.On("GetSenderNonce", mock.Anything, sender1).Return(uint64(0), nil).Maybe() + mockOffRampReader.On("GetSendersNonce", mock.Anything, mock.Anything).Return(tc.offRampNoncesBySender, nil).Maybe() plugin := ExecutionReportingPlugin{ tokenDataWorker: tokendata.NewBackgroundWorker(ctx, map[cciptypes.Address]tokendata.Reader{}, 10, 5*time.Second, time.Hour), @@ -1513,7 +1516,7 @@ func Test_inflightAggregates(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - inflightSeqNrs, inflightAggrVal, maxInflightSenderNonces, inflightTokenAmounts, err := inflightAggregates( + inflightAggrVal, inflightTokenAmounts, err := inflightAggregates( tc.inflight, tc.destTokenPrices, tc.sourceToDest) if tc.expErr { @@ -1521,9 +1524,7 @@ func Test_inflightAggregates(t *testing.T) { return } assert.NoError(t, err) - assert.True(t, tc.expInflightSeqNrs.Equal(inflightSeqNrs)) assert.True(t, reflect.DeepEqual(tc.expInflightAggrVal, inflightAggrVal)) - assert.True(t, reflect.DeepEqual(tc.expMaxInflightSenderNonces, maxInflightSenderNonces)) assert.True(t, reflect.DeepEqual(tc.expInflightTokenAmounts, inflightTokenAmounts)) }) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/offramp_reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/offramp_reader_mock.go index cf6e6bfbdc..d98181ffea 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/offramp_reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/offramp_reader_mock.go @@ -260,6 +260,36 @@ func (_m *OffRampReader) GetSenderNonce(ctx context.Context, sender cciptypes.Ad return r0, r1 } +// GetSendersNonce provides a mock function with given fields: ctx, senders +func (_m *OffRampReader) GetSendersNonce(ctx context.Context, senders []cciptypes.Address) (map[cciptypes.Address]uint64, error) { + ret := _m.Called(ctx, senders) + + if len(ret) == 0 { + panic("no return value specified for GetSendersNonce") + } + + var r0 map[cciptypes.Address]uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []cciptypes.Address) (map[cciptypes.Address]uint64, error)); ok { + return rf(ctx, senders) + } + if rf, ok := ret.Get(0).(func(context.Context, []cciptypes.Address) map[cciptypes.Address]uint64); ok { + r0 = rf(ctx, senders) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[cciptypes.Address]uint64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []cciptypes.Address) error); ok { + r1 = rf(ctx, senders) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetSourceToDestTokensMapping provides a mock function with given fields: ctx func (_m *OffRampReader) GetSourceToDestTokensMapping(ctx context.Context) (map[cciptypes.Address]cciptypes.Address, error) { ret := _m.Called(ctx) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader.go index d44143ea02..26024222b2 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/offramp_reader.go @@ -1,6 +1,10 @@ package ccipdata -import "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" +import ( + "context" + + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" +) const ( ManuallyExecute = "manuallyExecute" @@ -9,4 +13,6 @@ const ( //go:generate mockery --quiet --name OffRampReader --filename offramp_reader_mock.go --case=underscore type OffRampReader interface { cciptypes.OffRampReader + //TODO Move to chainlink-common + GetSendersNonce(ctx context.Context, senders []cciptypes.Address) (map[cciptypes.Address]uint64, error) } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go index c1e45e909d..25dc71e210 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp.go @@ -197,6 +197,52 @@ func (o *OffRamp) GetSenderNonce(ctx context.Context, sender cciptypes.Address) return o.offRampV100.GetSenderNonce(&bind.CallOpts{Context: ctx}, evmAddr) } +func (o *OffRamp) GetSendersNonce(ctx context.Context, senders []cciptypes.Address) (map[cciptypes.Address]uint64, error) { + if len(senders) == 0 { + return make(map[cciptypes.Address]uint64), nil + } + + evmSenders, err := ccipcalc.GenericAddrsToEvm(senders...) + if err != nil { + return nil, errors.Wrap(err, "failed to convert generic addresses to evm addresses") + } + + evmCalls := make([]rpclib.EvmCall, 0, len(evmSenders)) + for _, evmAddr := range evmSenders { + evmCalls = append(evmCalls, rpclib.NewEvmCall( + abiOffRamp, + "getSenderNonce", + o.addr, + evmAddr, + )) + } + + results, err := o.evmBatchCaller.BatchCall(ctx, 0, evmCalls) + if err != nil { + o.Logger.Errorw("error while batch fetching sender nonces", "err", err, "senders", evmSenders) + return nil, err + } + + nonces, err := rpclib.ParseOutputs[uint64](results, func(d rpclib.DataAndErr) (uint64, error) { + return rpclib.ParseOutput[uint64](d, 0) + }) + if err != nil { + o.Logger.Errorw("error while parsing sender nonces", "err", err, "senders", evmSenders) + return nil, err + } + + if len(senders) != len(nonces) { + o.Logger.Errorw("unexpected number of nonces returned", "senders", evmSenders, "nonces", nonces) + return nil, errors.New("unexpected number of nonces returned") + } + + senderNonce := make(map[cciptypes.Address]uint64, len(senders)) + for i, sender := range senders { + senderNonce[sender] = nonces[i] + } + return senderNonce, nil +} + func (o *OffRamp) CurrentRateLimiterState(ctx context.Context) (cciptypes.TokenBucketRateLimit, error) { state, err := o.offRampV100.CurrentRateLimiterState(&bind.CallOpts{Context: ctx}) if err != nil { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go index 925a601941..2003de9cbe 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/offramp_test.go @@ -5,10 +5,19 @@ import ( "testing" "time" + "github.com/pkg/errors" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/config" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/rpclib/rpclibmocks" ) func TestExecOffchainConfig100_Encoding(t *testing.T) { @@ -105,3 +114,110 @@ func TestExecOffchainConfig100_AllFieldsRequired(t *testing.T) { require.ErrorContains(t, err, keyToDelete) } } + +func Test_GetSendersNonce(t *testing.T) { + sender1 := cciptypes.Address(utils.RandomAddress().String()) + sender2 := cciptypes.Address(utils.RandomAddress().String()) + + tests := []struct { + name string + addresses []cciptypes.Address + batchCaller *rpclibmocks.EvmBatchCaller + expectedResult map[cciptypes.Address]uint64 + expectedError bool + }{ + { + name: "return empty map when input is empty", + addresses: []cciptypes.Address{}, + batchCaller: rpclibmocks.NewEvmBatchCaller(t), + expectedResult: map[cciptypes.Address]uint64{}, + }, + { + name: "return error when batch call fails", + addresses: []cciptypes.Address{sender1}, + batchCaller: func() *rpclibmocks.EvmBatchCaller { + mockBatchCaller := rpclibmocks.NewEvmBatchCaller(t) + mockBatchCaller.On("BatchCall", mock.Anything, mock.Anything, mock.Anything). + Return(nil, errors.New("batch call error")) + return mockBatchCaller + }(), + expectedError: true, + }, + { + name: "return error when nonces dont match senders", + addresses: []cciptypes.Address{sender1, sender2}, + batchCaller: func() *rpclibmocks.EvmBatchCaller { + mockBatchCaller := rpclibmocks.NewEvmBatchCaller(t) + results := []rpclib.DataAndErr{ + { + Outputs: []any{uint64(1)}, + Err: nil, + }, + } + mockBatchCaller.On("BatchCall", mock.Anything, mock.Anything, mock.Anything). + Return(results, nil) + return mockBatchCaller + }(), + expectedError: true, + }, + { + name: "return error when single request from batch fails", + addresses: []cciptypes.Address{sender1, sender2}, + batchCaller: func() *rpclibmocks.EvmBatchCaller { + mockBatchCaller := rpclibmocks.NewEvmBatchCaller(t) + results := []rpclib.DataAndErr{ + { + Outputs: []any{uint64(1)}, + Err: nil, + }, + { + Outputs: []any{}, + Err: errors.New("request failed"), + }, + } + mockBatchCaller.On("BatchCall", mock.Anything, mock.Anything, mock.Anything). + Return(results, nil) + return mockBatchCaller + }(), + expectedError: true, + }, + { + name: "return map of nonce per sender", + addresses: []cciptypes.Address{sender1, sender2}, + batchCaller: func() *rpclibmocks.EvmBatchCaller { + mockBatchCaller := rpclibmocks.NewEvmBatchCaller(t) + results := []rpclib.DataAndErr{ + { + Outputs: []any{uint64(1)}, + Err: nil, + }, + { + Outputs: []any{uint64(2)}, + Err: nil, + }, + } + mockBatchCaller.On("BatchCall", mock.Anything, mock.Anything, mock.Anything). + Return(results, nil) + return mockBatchCaller + }(), + expectedResult: map[cciptypes.Address]uint64{ + sender1: uint64(1), + sender2: uint64(2), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + offramp := OffRamp{evmBatchCaller: test.batchCaller, Logger: logger.TestLogger(t)} + nonce, err := offramp.GetSendersNonce(testutils.Context(t), test.addresses) + + if test.expectedError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, test.expectedResult, nonce) + } + }) + } +} diff --git a/core/services/ocr2/plugins/ccip/internal/observability/offramp.go b/core/services/ocr2/plugins/ccip/internal/observability/offramp.go index 15bfe20dd6..782883ad7d 100644 --- a/core/services/ocr2/plugins/ccip/internal/observability/offramp.go +++ b/core/services/ocr2/plugins/ccip/internal/observability/offramp.go @@ -66,3 +66,9 @@ func (o *ObservedOffRampReader) GetTokens(ctx context.Context) (cciptypes.OffRam return o.OffRampReader.GetTokens(ctx) }) } + +func (o *ObservedOffRampReader) GetSendersNonce(ctx context.Context, senders []cciptypes.Address) (map[cciptypes.Address]uint64, error) { + return withObservedInteraction(o.metric, "GetSendersNonce", func() (map[cciptypes.Address]uint64, error) { + return o.OffRampReader.GetSendersNonce(ctx, senders) + }) +}