From f683950dc0de1fede8e3b426b7a7e392e718b142 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 7 Mar 2024 21:18:18 +0100 Subject: [PATCH] CCIP-1704 More robust checks of RMN/Chain every phase (#583) More robust way of verifying if CCIP should halt processing, it's based on four items: 1. Source chain is healthy (this is verified by checking if source LogPoller saw finality violation) 2. Dest chain is healthy (this is verified by checking if destination LogPoller saw finality violation) 3. CommitStore is down (this is verified by checking if CommitStore is down and destination RMN is not cursed) 4. Source chain is cursed (this is verified by checking if source RMN is not cursed) Whenever any of the above checks fail, the chain is considered unhealthy and the CCIP should stop processing messages. Additionally, when the chain is unhealthy, this information is considered "sticky" and is cached for 30 minutes. This may lead to some false-positives, but in this case, we want to be extra cautious and avoid executing any reorged messages. Health checks are now verified in every OCR2 phase and during Observation and ShouldTransmit we enforce reading data from chain. Additionally, to reduce the number of calls to the RPC, we cache RMN curse state for 20 seconds. --- .../ocr2/plugins/ccip/ccipcommit/factory.go | 5 +- .../ocr2/plugins/ccip/ccipcommit/ocr2.go | 33 ++- .../ocr2/plugins/ccip/ccipcommit/ocr2_test.go | 28 +++ .../ocr2/plugins/ccip/ccipexec/factory.go | 5 +- .../ocr2/plugins/ccip/ccipexec/ocr2.go | 31 ++- .../ocr2/plugins/ccip/ccipexec/ocr2_test.go | 95 ++++++--- .../plugins/ccip/cciptypes/commitstore.go | 3 + .../ocr2/plugins/ccip/cciptypes/onramp.go | 2 + .../ccip/internal/cache/chain_health.go | 199 ++++++++++++++++++ .../ccip/internal/cache/chain_health_test.go | 182 ++++++++++++++++ .../internal/cache/mocks/chain_health_mock.go | 56 +++++ .../ccip/internal/ccipcommon/shortcuts.go | 41 ---- .../internal/ccipcommon/shortcuts_test.go | 61 ------ .../mocks/commit_store_reader_mock.go | 28 +++ .../ccipdata/mocks/onramp_reader_mock.go | 28 +++ .../internal/ccipdata/v1_0_0/commit_store.go | 8 + .../ccip/internal/ccipdata/v1_0_0/onramp.go | 7 + .../internal/ccipdata/v1_2_0/commit_store.go | 8 + .../ccip/internal/ccipdata/v1_2_0/onramp.go | 7 + .../ccip/internal/ccipdata/v1_5_0/onramp.go | 7 + core/services/ocr2/plugins/ccip/vars.go | 2 +- 21 files changed, 694 insertions(+), 142 deletions(-) create mode 100644 core/services/ocr2/plugins/ccip/internal/cache/chain_health.go create mode 100644 core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go create mode 100644 core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go index 9aee6a3a28..27c4a9a0fe 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/factory.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" ) @@ -84,6 +85,7 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin pluginOffChainConfig := rf.config.commitStore.OffchainConfig() + lggr := rf.config.lggr.Named("CommitReportingPlugin") return &CommitReportingPlugin{ sourceChainSelector: rf.config.sourceChainSelector, sourceNative: rf.config.sourceNative, @@ -91,13 +93,14 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin commitStoreReader: rf.config.commitStore, priceGetter: rf.config.priceGetter, F: config.F, - lggr: rf.config.lggr.Named("CommitReportingPlugin"), + lggr: lggr, inflightReports: newInflightCommitReportsContainer(rf.config.commitStore.OffchainConfig().InflightCacheExpiry), destPriceRegistryReader: rf.destPriceRegReader, offRampReader: rf.config.offRamp, gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), offchainConfig: pluginOffChainConfig, metricsCollector: rf.config.metricsCollector, + chainHealthcheck: cache.NewChainHealthcheck(lggr, rf.config.onRampReader, rf.config.commitStore), }, types.ReportingPluginInfo{ Name: "CCIPCommit", diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go index 8e3c31bb40..82f9db260c 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" @@ -85,11 +86,17 @@ type CommitReportingPlugin struct { priceGetter pricegetter.PriceGetter metricsCollector ccip.PluginMetricsCollector // State - inflightReports *inflightCommitReportsContainer + inflightReports *inflightCommitReportsContainer + chainHealthcheck cache.ChainHealthcheck } // Query is not used by the CCIP Commit plugin. -func (r *CommitReportingPlugin) Query(context.Context, types.ReportTimestamp) (types.Query, error) { +func (r *CommitReportingPlugin) Query(ctx context.Context, _ types.ReportTimestamp) (types.Query, error) { + if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil { + return nil, err + } else if !healthy { + return nil, ccip.ErrChainIsNotHealthy + } return types.Query{}, nil } @@ -99,8 +106,10 @@ func (r *CommitReportingPlugin) Query(context.Context, types.ReportTimestamp) (t // the observation will be considered invalid and rejected. func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query) (types.Observation, error) { lggr := r.lggr.Named("CommitObservation") - if err := ccipcommon.VerifyNotDown(ctx, r.lggr, r.commitStoreReader, r.onRampReader); err != nil { + if healthy, err := r.chainHealthcheck.IsHealthy(ctx, true); err != nil { return nil, err + } else if !healthy { + return nil, ccip.ErrChainIsNotHealthy } r.inflightReports.expire(lggr) @@ -355,6 +364,12 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query, observations []types.AttributedObservation) (bool, types.Report, error) { now := time.Now() lggr := r.lggr.Named("CommitReport") + if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil { + return false, nil, err + } else if !healthy { + return false, nil, ccip.ErrChainIsNotHealthy + } + parsableObservations := ccip.GetParsableObservations[ccip.CommitObservation](lggr, observations) feeTokens, bridgeableTokens, err := ccipcommon.GetDestinationTokens(ctx, r.offRampReader, r.destPriceRegistryReader) @@ -650,7 +665,6 @@ func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context, if err != nil { return false, err } - lggr := r.lggr.Named("CommitShouldAcceptFinalizedReport").With( "merkleRoot", parsedReport.MerkleRoot, "minSeqNum", parsedReport.Interval.Min, @@ -665,6 +679,12 @@ func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context, return false, nil } + if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, false); err1 != nil { + return false, err1 + } else if !healthy { + return false, ccip.ErrChainIsNotHealthy + } + if r.isStaleReport(ctx, lggr, parsedReport, true, reportTimestamp) { lggr.Infow("Rejecting stale report") return false, nil @@ -686,6 +706,11 @@ func (r *CommitReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context if err != nil { return false, err } + if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, true); err1 != nil { + return false, err1 + } else if !healthy { + return false, ccip.ErrChainIsNotHealthy + } // If report is not stale we transmit. // When the commitTransmitter enqueues the tx for tx manager, // we mark it as fulfilled, effectively removing it from the set of inflight messages. diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go index 255c15bacb..ed87e5b883 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go @@ -34,6 +34,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" + ccipcachemocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/factory" @@ -109,11 +111,13 @@ func TestCommitReportingPlugin_Observation(t *testing.T) { t.Run(tc.name, func(t *testing.T) { commitStoreReader := ccipdatamocks.NewCommitStoreReader(t) commitStoreReader.On("IsDown", ctx).Return(tc.commitStorePaused, nil) + commitStoreReader.On("IsDestChainHealthy", ctx).Return(true, nil) if !tc.commitStorePaused && !tc.sourceChainCursed { commitStoreReader.On("GetExpectedNextSequenceNumber", ctx).Return(tc.commitStoreSeqNum, nil) } onRampReader := ccipdatamocks.NewOnRampReader(t) + onRampReader.On("IsSourceChainHealthy", ctx).Return(true, nil) onRampReader.On("IsSourceCursed", ctx).Return(tc.sourceChainCursed, nil) if len(tc.sendReqs) > 0 { onRampReader.On("GetSendRequestsBetweenSeqNums", ctx, tc.commitStoreSeqNum, tc.commitStoreSeqNum+OnRampMessagesScanLimit, true). @@ -164,6 +168,7 @@ func TestCommitReportingPlugin_Observation(t *testing.T) { p.sourceNative = sourceNativeTokenAddr p.gasPriceEstimator = gasPriceEstimator p.metricsCollector = ccip.NoopMetricsCollector + p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, onRampReader, commitStoreReader) obs, err := p.Observation(ctx, tc.epochAndRound, types.Query{}) @@ -197,6 +202,9 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.destPriceRegistryReader = destPriceRegReader offRampReader.On("GetTokens", ctx).Return(cciptypes.OffRampTokens{}, nil).Maybe() destPriceRegReader.On("GetFeeTokens", ctx).Return(nil, nil).Maybe() + chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthcheck.On("IsHealthy", ctx, false).Return(true, nil).Maybe() + p.chainHealthcheck = chainHealthcheck o := ccip.CommitObservation{Interval: cciptypes.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)} obs, err := o.Marshal() @@ -325,6 +333,9 @@ func TestCommitReportingPlugin_Report(t *testing.T) { commitStoreReader, err := v1_2_0.NewCommitStore(logger.TestLogger(t), utils.RandomAddress(), nil, lp, nil) assert.NoError(t, err) + healthCheck := ccipcachemocks.NewChainHealthcheck(t) + healthCheck.On("IsHealthy", ctx, false).Return(true, nil) + p := &CommitReportingPlugin{} p.lggr = logger.TestLogger(t) p.inflightReports = newInflightCommitReportsContainer(time.Minute) @@ -337,6 +348,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) { p.commitStoreReader = commitStoreReader p.F = tc.f p.metricsCollector = ccip.NoopMetricsCollector + p.chainHealthcheck = healthCheck aos := make([]types.AttributedObservation, 0, len(tc.observations)) for _, o := range tc.observations { @@ -397,6 +409,10 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { p.commitStoreReader = commitStoreReader commitStoreReader.On("DecodeCommitReport", mock.Anything).Return(report, nil) + chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthCheck.On("IsHealthy", ctx).Return(true, nil).Maybe() + p.chainHealthcheck = chainHealthCheck + encodedReport, err := encodeCommitReport(report) assert.NoError(t, err) shouldAccept, err := p.ShouldAcceptFinalizedReport(ctx, types.ReportTimestamp{}, encodedReport) @@ -422,6 +438,10 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { commitStoreReader.On("DecodeCommitReport", mock.Anything).Return(report, nil) commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(onChainSeqNum, nil) + chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthCheck.On("IsHealthy", ctx, false).Return(true, nil) + p.chainHealthcheck = chainHealthCheck + // stale since report interval is behind on chain seq num report.Interval = cciptypes.CommitStoreInterval{Min: onChainSeqNum - 2, Max: onChainSeqNum + 10} encodedReport, err := encodeCommitReport(report) @@ -471,6 +491,10 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { encodedReport, err := encodeCommitReport(report) assert.NoError(t, err) + chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthCheck.On("IsHealthy", ctx, false).Return(true, nil) + p.chainHealthcheck = chainHealthCheck + shouldAccept, err := p.ShouldAcceptFinalizedReport(ctx, types.ReportTimestamp{}, encodedReport) assert.NoError(t, err) assert.True(t, shouldAccept) @@ -506,6 +530,10 @@ func TestCommitReportingPlugin_ShouldTransmitAcceptedReport(t *testing.T) { p.inflightReports = newInflightCommitReportsContainer(time.Minute) p.lggr = logger.TestLogger(t) + chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthCheck.On("IsHealthy", ctx, true).Return(true, nil).Maybe() + p.chainHealthcheck = chainHealthCheck + t.Run("should transmit when report is not stale", func(t *testing.T) { // not-stale since report interval is not behind on chain seq num report.Interval = cciptypes.CommitStoreInterval{Min: onChainSeqNum, Max: onChainSeqNum + 10} diff --git a/core/services/ocr2/plugins/ccip/ccipexec/factory.go b/core/services/ocr2/plugins/ccip/ccipexec/factory.go index 8fcc368241..54a9bdfdd0 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/factory.go @@ -70,10 +70,10 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor } offchainConfig := rf.config.offRampReader.OffchainConfig() - + lggr := rf.config.lggr.Named("ExecutionReportingPlugin") return &ExecutionReportingPlugin{ F: config.F, - lggr: rf.config.lggr.Named("ExecutionReportingPlugin"), + lggr: lggr, offchainConfig: offchainConfig, tokenDataWorker: rf.config.tokenDataWorker, gasPriceEstimator: rf.config.offRampReader.GasPriceEstimator(), @@ -90,6 +90,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor inflightReports: newInflightExecReportsContainer(offchainConfig.InflightCacheExpiry.Duration()), snoozedRoots: cache.NewSnoozedRoots(rf.config.offRampReader.OnchainConfig().PermissionLessExecutionThresholdSeconds, offchainConfig.RootSnoozeTime.Duration()), metricsCollector: rf.config.metricsCollector, + chainHealthcheck: cache.NewChainHealthcheck(lggr, rf.config.onRampReader, rf.config.commitStoreReader), }, types.ReportingPluginInfo{ Name: "CCIPExecution", // Setting this to false saves on calldata since OffRamp doesn't require agreement between NOPs diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go index e01a1dfa29..55ac30d7fc 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2.go @@ -87,18 +87,26 @@ type ExecutionReportingPlugin struct { tokenPoolBatchedReader batchreader.TokenPoolBatchedReader // State - inflightReports *inflightExecReportsContainer - snoozedRoots cache.SnoozedRoots + inflightReports *inflightExecReportsContainer + snoozedRoots cache.SnoozedRoots + chainHealthcheck cache.ChainHealthcheck } -func (r *ExecutionReportingPlugin) Query(context.Context, types.ReportTimestamp) (types.Query, error) { +func (r *ExecutionReportingPlugin) Query(ctx context.Context, _ types.ReportTimestamp) (types.Query, error) { + if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil { + return nil, err + } else if !healthy { + return nil, ccip.ErrChainIsNotHealthy + } return types.Query{}, nil } func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp types.ReportTimestamp, query types.Query) (types.Observation, error) { lggr := r.lggr.Named("ExecutionObservation") - if err := ccipcommon.VerifyNotDown(ctx, r.lggr, r.commitStoreReader, r.onRampReader); err != nil { + if healthy, err := r.chainHealthcheck.IsHealthy(ctx, true); err != nil { return nil, err + } else if !healthy { + return nil, ccip.ErrChainIsNotHealthy } // Ensure that the source price registry is synchronized with the onRamp. @@ -761,6 +769,11 @@ func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger. func (r *ExecutionReportingPlugin) Report(ctx context.Context, timestamp types.ReportTimestamp, query types.Query, observations []types.AttributedObservation) (bool, types.Report, error) { lggr := r.lggr.Named("ExecutionReport") + if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil { + return false, nil, err + } else if !healthy { + return false, nil, ccip.ErrChainIsNotHealthy + } parsableObservations := ccip.GetParsableObservations[ccip.ExecutionObservation](lggr, observations) // Need at least F+1 observations if len(parsableObservations) <= r.F { @@ -853,6 +866,11 @@ func (r *ExecutionReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Conte } lggr = lggr.With("messageIDs", ccipcommon.GetMessageIDsAsHexString(execReport.Messages)) + if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, false); err1 != nil { + return false, err1 + } else if !healthy { + return false, ccip.ErrChainIsNotHealthy + } // If the first message is executed already, this execution report is stale, and we do not accept it. stale, err := r.isStaleReport(ctx, execReport.Messages) if err != nil { @@ -879,6 +897,11 @@ func (r *ExecutionReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Cont } lggr = lggr.With("messageIDs", ccipcommon.GetMessageIDsAsHexString(execReport.Messages)) + if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, true); err1 != nil { + return false, err1 + } else if !healthy { + return false, ccip.ErrChainIsNotHealthy + } // If report is not stale we transmit. // When the executeTransmitter enqueues the tx for tx manager, // we mark it as execution_sent, removing it from the set of inflight messages. diff --git a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go index 86fcbe580c..85b827f642 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go @@ -29,6 +29,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache" + ccipcachemocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/batchreader" @@ -46,36 +47,60 @@ import ( func TestExecutionReportingPlugin_Observation(t *testing.T) { testCases := []struct { - name string - commitStorePaused bool - sourceChainCursed bool - inflightReports []InflightInternalExecutionReport - unexpiredReports []cciptypes.CommitStoreReportWithTxMeta - sendRequests []cciptypes.EVM2EVMMessageWithTxMeta - executedSeqNums []uint64 - tokenPoolsMapping map[common.Address]common.Address - blessedRoots map[[32]byte]bool - senderNonce uint64 - rateLimiterState cciptypes.TokenBucketRateLimit - expErr bool + name string + commitStorePaused bool + sourceChainCursed bool + inflightReports []InflightInternalExecutionReport + unexpiredReports []cciptypes.CommitStoreReportWithTxMeta + sendRequests []cciptypes.EVM2EVMMessageWithTxMeta + executedSeqNums []uint64 + tokenPoolsMapping map[common.Address]common.Address + blessedRoots map[[32]byte]bool + senderNonce uint64 + rateLimiterState cciptypes.TokenBucketRateLimit + expErr bool + sourceChainHealthy bool + destChainHealthy bool }{ { - name: "commit store is down", - commitStorePaused: true, - sourceChainCursed: false, - expErr: true, + name: "commit store is down", + commitStorePaused: true, + sourceChainCursed: false, + sourceChainHealthy: true, + destChainHealthy: true, + expErr: true, }, { - name: "source chain is cursed", - commitStorePaused: false, - sourceChainCursed: true, - expErr: true, + name: "source chain is cursed", + commitStorePaused: false, + sourceChainCursed: true, + sourceChainHealthy: true, + destChainHealthy: true, + expErr: true, }, { - name: "happy flow", - commitStorePaused: false, - sourceChainCursed: false, - inflightReports: []InflightInternalExecutionReport{}, + name: "source chain not healthy", + commitStorePaused: false, + sourceChainCursed: false, + sourceChainHealthy: false, + destChainHealthy: true, + expErr: true, + }, + { + name: "dest chain not healthy", + commitStorePaused: false, + sourceChainCursed: false, + sourceChainHealthy: true, + destChainHealthy: false, + expErr: true, + }, + { + name: "happy flow", + commitStorePaused: false, + sourceChainCursed: false, + sourceChainHealthy: true, + destChainHealthy: true, + inflightReports: []InflightInternalExecutionReport{}, unexpiredReports: []cciptypes.CommitStoreReportWithTxMeta{ { CommitStoreReport: cciptypes.CommitStoreReport{ @@ -118,7 +143,8 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) { p.metricsCollector = ccip.NoopMetricsCollector commitStoreReader := ccipdatamocks.NewCommitStoreReader(t) - commitStoreReader.On("IsDown", mock.Anything).Return(tc.commitStorePaused, nil) + commitStoreReader.On("IsDown", mock.Anything).Return(tc.commitStorePaused, nil).Maybe() + commitStoreReader.On("IsDestChainHealthy", mock.Anything).Return(tc.destChainHealthy, nil).Maybe() // Blessed roots return true for root, blessed := range tc.blessedRoots { commitStoreReader.On("IsBlessed", mock.Anything, root).Return(blessed, nil).Maybe() @@ -159,6 +185,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) { mockOnRampReader := ccipdatamocks.NewOnRampReader(t) mockOnRampReader.On("IsSourceCursed", ctx).Return(tc.sourceChainCursed, nil).Maybe() + mockOnRampReader.On("IsSourceChainHealthy", ctx).Return(tc.sourceChainHealthy, nil).Maybe() mockOnRampReader.On("GetSendRequestsBetweenSeqNums", ctx, mock.Anything, mock.Anything, false). Return(tc.sendRequests, nil).Maybe() sourcePriceRegistryAddress := cciptypes.Address(utils.RandomAddress().String()) @@ -186,6 +213,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) { p.sourcePriceRegistryProvider = mockOnRampPriceRegistryProvider p.snoozedRoots = cache.NewSnoozedRoots(time.Minute, time.Minute) + p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, mockOnRampReader, commitStoreReader) _, err = p.Observation(ctx, types.ReportTimestamp{}, types.Query{}) if tc.expErr { @@ -237,6 +265,9 @@ func TestExecutionReportingPlugin_Report(t *testing.T) { p.F = tc.f p.commitStoreReader = ccipdatamocks.NewCommitStoreReader(t) + chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthcheck.On("IsHealthy", ctx, false).Return(true, nil) + p.chainHealthcheck = chainHealthcheck observations := make([]types.AttributedObservation, len(tc.observations)) for i := range observations { @@ -281,10 +312,14 @@ func TestExecutionReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) { mockOffRampReader := ccipdatamocks.NewOffRampReader(t) mockOffRampReader.On("DecodeExecutionReport", encodedReport).Return(report, nil) + chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthcheck.On("IsHealthy", mock.Anything, false).Return(true, nil) + plugin := ExecutionReportingPlugin{ - offRampReader: mockOffRampReader, - lggr: logger.TestLogger(t), - inflightReports: newInflightExecReportsContainer(1 * time.Hour), + offRampReader: mockOffRampReader, + lggr: logger.TestLogger(t), + inflightReports: newInflightExecReportsContainer(1 * time.Hour), + chainHealthcheck: chainHealthcheck, } mockedExecState := mockOffRampReader.On("GetExecutionState", mock.Anything, uint64(12)).Return(uint8(cciptypes.ExecutionStateUntouched), nil).Once() @@ -327,11 +362,15 @@ func TestExecutionReportingPlugin_ShouldTransmitAcceptedReport(t *testing.T) { mockOffRampReader.On("DecodeExecutionReport", encodedReport).Return(report, nil) mockedExecState := mockOffRampReader.On("GetExecutionState", mock.Anything, uint64(12)).Return(uint8(cciptypes.ExecutionStateUntouched), nil).Once() + chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t) + chainHealthcheck.On("IsHealthy", mock.Anything, true).Return(true, nil) + plugin := ExecutionReportingPlugin{ commitStoreReader: mockCommitStoreReader, offRampReader: mockOffRampReader, lggr: logger.TestLogger(t), inflightReports: newInflightExecReportsContainer(1 * time.Hour), + chainHealthcheck: chainHealthcheck, } should, err := plugin.ShouldTransmitAcceptedReport(testutils.Context(t), ocrtypes.ReportTimestamp{}, encodedReport) diff --git a/core/services/ocr2/plugins/ccip/cciptypes/commitstore.go b/core/services/ocr2/plugins/ccip/cciptypes/commitstore.go index e14a697f87..872bab476d 100644 --- a/core/services/ocr2/plugins/ccip/cciptypes/commitstore.go +++ b/core/services/ocr2/plugins/ccip/cciptypes/commitstore.go @@ -18,6 +18,9 @@ type CommitStoreReader interface { // Returned Commit Reports have to be sorted by Interval.Min/Interval.Max in ascending order. GetAcceptedCommitReportsGteTimestamp(ctx context.Context, ts time.Time, confirmations int) ([]CommitStoreReportWithTxMeta, error) + // IsDestChainHealthy returns true if the destination chain is healthy. + IsDestChainHealthy(ctx context.Context) (bool, error) + IsDown(ctx context.Context) (bool, error) IsBlessed(ctx context.Context, root [32]byte) (bool, error) diff --git a/core/services/ocr2/plugins/ccip/cciptypes/onramp.go b/core/services/ocr2/plugins/ccip/cciptypes/onramp.go index d75cf7f810..4cd7a29ca0 100644 --- a/core/services/ocr2/plugins/ccip/cciptypes/onramp.go +++ b/core/services/ocr2/plugins/ccip/cciptypes/onramp.go @@ -54,6 +54,8 @@ type OnRampReader interface { // If some requests do not exist in the provided sequence numbers range they will not be part of the response. // It's the responsibility of the caller to validate whether all the requests exist or not. GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64, finalized bool) ([]EVM2EVMMessageWithTxMeta, error) + // IsSourceChainHealthy returns true if the source chain is healthy. + IsSourceChainHealthy(ctx context.Context) (bool, error) // IsSourceCursed returns true if the source chain is cursed. OnRamp communicates with the underlying RMN // to verify if source chain was cursed or not. IsSourceCursed(ctx context.Context) (bool, error) diff --git a/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go b/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go new file mode 100644 index 0000000000..49d60d0e13 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go @@ -0,0 +1,199 @@ +package cache + +import ( + "context" + "time" + + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" +) + +// ChainHealthcheck checks the health of the both source and destination chain. +// Based on the values returned, CCIP can make a decision to stop or continue processing messages. +// There are four things verified here: +// 1. Source chain is healthy (this is verified by checking if source LogPoller saw finality violation) +// 2. Dest chain is healthy (this is verified by checking if destination LogPoller saw finality violation) +// 3. CommitStore is down (this is verified by checking if CommitStore is down and destination RMN is not cursed) +// 4. Source chain is cursed (this is verified by checking if source RMN is not cursed) +// +// Whenever any of the above checks fail, the chain is considered unhealthy and the CCIP should stop +// processing messages. Additionally, when the chain is unhealthy, this information is considered "sticky" +// and is cached for a certain period of time based on defaultGlobalStatusDuration. +// This may lead to some false-positives, but in this case we want to be extra cautious and avoid executing any reorged messages. +// +// Additionally, to reduce the number of calls to the RPC, we cache RMN curse state for a certain period of +// time based on defaultRmnStatusDuration. +// +//go:generate mockery --quiet --name ChainHealthcheck --filename chain_health_mock.go --case=underscore +type ChainHealthcheck interface { + // IsHealthy checks if the chain is healthy and returns true if it is, false otherwise + // If forceRefresh is set to true, it will refresh the RMN curse state. Should be used in the Observation and ShouldTransmit phases of OCR2. + // Otherwise, it will use the cached value of the RMN curse state. + IsHealthy(ctx context.Context, forceRefresh bool) (bool, error) +} + +const ( + // RMN curse state is refreshed every 20 seconds or when ForceIsHealthy is called + defaultRmnStatusDuration = 20 * time.Second + defaultGlobalStatusDuration = 30 * time.Minute + + globalStatusKey = "globalStatus" + rmnStatusKey = "rmnCurseCheck" +) + +type chainHealthcheck struct { + cache *cache.Cache + globalStatusKey string + rmnStatusKey string + globalStatusExpiration time.Duration + rmnStatusExpiration time.Duration + + lggr logger.Logger + onRamp ccipdata.OnRampReader + commitStore ccipdata.CommitStoreReader +} + +func NewChainHealthcheck( + lggr logger.Logger, + onRamp ccipdata.OnRampReader, + commitStore ccipdata.CommitStoreReader, +) *chainHealthcheck { + return &chainHealthcheck{ + cache: cache.New(defaultRmnStatusDuration, 0), + globalStatusKey: globalStatusKey, + rmnStatusKey: rmnStatusKey, + globalStatusExpiration: defaultGlobalStatusDuration, + rmnStatusExpiration: defaultRmnStatusDuration, + + lggr: lggr, + onRamp: onRamp, + commitStore: commitStore, + } +} + +func newChainHealthcheckWithCustomEviction( + lggr logger.Logger, + onRamp ccipdata.OnRampReader, + commitStore ccipdata.CommitStoreReader, + globalStatusDuration time.Duration, + rmnStatusDuration time.Duration, +) *chainHealthcheck { + return &chainHealthcheck{ + cache: cache.New(rmnStatusDuration, 0), + rmnStatusKey: rmnStatusKey, + globalStatusKey: globalStatusKey, + globalStatusExpiration: globalStatusDuration, + rmnStatusExpiration: rmnStatusDuration, + + lggr: lggr, + onRamp: onRamp, + commitStore: commitStore, + } +} + +func (c *chainHealthcheck) IsHealthy(ctx context.Context, forceRefresh bool) (bool, error) { + // Verify if flag is raised to indicate that the chain is not healthy + // If set to false then immediately return false without checking the chain + if healthy, found := c.cache.Get(c.globalStatusKey); found && !healthy.(bool) { + return false, nil + } + + if healthy, err := c.checkIfReadersAreHealthy(ctx); err != nil { + return false, err + } else if !healthy { + c.cache.Set(c.globalStatusKey, false, c.globalStatusExpiration) + return healthy, nil + } + + if healthy, err := c.checkIfRMNsAreHealthy(ctx, forceRefresh); err != nil { + return false, err + } else if !healthy { + c.cache.Set(c.globalStatusKey, false, c.globalStatusExpiration) + return healthy, nil + } + return true, nil +} + +// checkIfReadersAreHealthy checks if the source and destination chains are healthy by calling underlying LogPoller +// These calls are cheap because they don't require any communication with the database or RPC, so we don't have +// to cache the result of these calls. +func (c *chainHealthcheck) checkIfReadersAreHealthy(ctx context.Context) (bool, error) { + sourceChainHealthy, err := c.onRamp.IsSourceChainHealthy(ctx) + if err != nil { + return false, errors.Wrap(err, "onRamp IsSourceChainHealthy errored") + } + + destChainHealthy, err := c.commitStore.IsDestChainHealthy(ctx) + if err != nil { + return false, errors.Wrap(err, "commitStore IsDestChainHealthy errored") + } + + if !sourceChainHealthy || !destChainHealthy { + c.lggr.Criticalw( + "Lane processing is stopped because source or destination chain is reported unhealthy", + "sourceChainHealthy", sourceChainHealthy, + "destChainHealthy", destChainHealthy, + ) + } + return sourceChainHealthy && destChainHealthy, nil +} + +func (c *chainHealthcheck) checkIfRMNsAreHealthy(ctx context.Context, forceFetch bool) (bool, error) { + if !forceFetch { + if healthy, found := c.cache.Get(c.rmnStatusKey); found { + return healthy.(bool), nil + } + } + + healthy, err := c.fetchRMNCurseState(ctx) + if err != nil { + return false, err + } + + c.cache.Set(c.rmnStatusKey, healthy, c.rmnStatusExpiration) + return healthy, nil +} + +func (c *chainHealthcheck) fetchRMNCurseState(ctx context.Context) (bool, error) { + var ( + eg = new(errgroup.Group) + isCommitStoreDown bool + isSourceCursed bool + ) + + eg.Go(func() error { + var err error + isCommitStoreDown, err = c.commitStore.IsDown(ctx) + if err != nil { + return errors.Wrap(err, "commitStore isDown check errored") + } + return nil + }) + + eg.Go(func() error { + var err error + isSourceCursed, err = c.onRamp.IsSourceCursed(ctx) + if err != nil { + return errors.Wrap(err, "onRamp isSourceCursed errored") + } + return nil + }) + + if err := eg.Wait(); err != nil { + return false, err + } + + if isCommitStoreDown || isSourceCursed { + c.lggr.Criticalw( + "Lane processing is stopped because source chain is cursed or CommitStore is down", + "isCommitStoreDown", isCommitStoreDown, + "isSourceCursed", isSourceCursed, + ) + return false, nil + } + return true, nil +} diff --git a/core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go b/core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go new file mode 100644 index 0000000000..24fc441e62 --- /dev/null +++ b/core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go @@ -0,0 +1,182 @@ +package cache + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" +) + +func Test_RMNStateCaching(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + mockCommitStore := mocks.NewCommitStoreReader(t) + mockOnRamp := mocks.NewOnRampReader(t) + + chainState := newChainHealthcheckWithCustomEviction( + lggr, + mockOnRamp, + mockCommitStore, + 10*time.Hour, + 10*time.Hour, + ) + + // Chain is not cursed and healthy + mockCommitStore.On("IsDown", ctx).Return(false, nil).Once() + mockCommitStore.On("IsDestChainHealthy", ctx).Return(true, nil).Maybe() + mockOnRamp.On("IsSourceCursed", ctx).Return(false, nil).Once() + mockOnRamp.On("IsSourceChainHealthy", ctx).Return(true, nil).Maybe() + healthy, err := chainState.IsHealthy(ctx, false) + assert.NoError(t, err) + assert.True(t, healthy) + + // Chain is cursed, but cache is stale + mockCommitStore.On("IsDown", ctx).Return(true, nil).Once() + mockOnRamp.On("IsSourceCursed", ctx).Return(true, nil).Once() + healthy, err = chainState.IsHealthy(ctx, false) + assert.NoError(t, err) + assert.True(t, healthy) + + // Enforce cache refresh + healthy, err = chainState.IsHealthy(ctx, true) + assert.Nil(t, err) + assert.False(t, healthy) + + // Chain is not cursed, but previous curse should be "sticky" even when force refreshing + mockCommitStore.On("IsDown", ctx).Return(false, nil).Maybe() + mockOnRamp.On("IsSourceCursed", ctx).Return(false, nil).Maybe() + // Enforce cache refresh + healthy, err = chainState.IsHealthy(ctx, true) + assert.Nil(t, err) + assert.False(t, healthy) +} + +func Test_ChainStateIsCached(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + mockCommitStore := mocks.NewCommitStoreReader(t) + mockOnRamp := mocks.NewOnRampReader(t) + + chainState := newChainHealthcheckWithCustomEviction( + lggr, + mockOnRamp, + mockCommitStore, + 10*time.Hour, + 10*time.Hour, + ) + + // Chain is not cursed and healthy + mockCommitStore.On("IsDown", ctx).Return(false, nil).Maybe() + mockCommitStore.On("IsDestChainHealthy", ctx).Return(true, nil).Once() + mockOnRamp.On("IsSourceCursed", ctx).Return(false, nil).Maybe() + mockOnRamp.On("IsSourceChainHealthy", ctx).Return(true, nil).Once() + healthy, err := chainState.IsHealthy(ctx, false) + assert.NoError(t, err) + assert.True(t, healthy) + + // Chain is not healthy + mockCommitStore.On("IsDestChainHealthy", ctx).Return(false, nil).Once() + mockOnRamp.On("IsSourceChainHealthy", ctx).Return(false, nil).Once() + healthy, err = chainState.IsHealthy(ctx, false) + assert.NoError(t, err) + assert.False(t, healthy) + + // Previous value is returned + mockCommitStore.On("IsDestChainHealthy", ctx).Return(true, nil).Maybe() + mockOnRamp.On("IsSourceChainHealthy", ctx).Return(true, nil).Maybe() + healthy, err = chainState.IsHealthy(ctx, false) + assert.NoError(t, err) + assert.False(t, healthy) +} + +func Test_ChainStateIsHealthy(t *testing.T) { + ctx := tests.Context(t) + + testCases := []struct { + name string + commitStoreDown bool + commitStoreErr error + onRampCursed bool + onRampErr error + sourceChainUnhealthy bool + sourceChainErr error + destChainUnhealthy bool + destChainErr error + + expectedState bool + expectedErr bool + }{ + { + name: "all components healthy", + expectedState: true, + }, + { + name: "CommitStore is down", + commitStoreDown: true, + expectedState: false, + }, + { + name: "CommitStore error", + commitStoreErr: errors.New("commit store error"), + expectedErr: true, + }, + { + name: "OnRamp is cursed", + onRampCursed: true, + expectedState: false, + }, + { + name: "OnRamp error", + onRampErr: errors.New("onramp error"), + expectedErr: true, + }, + { + name: "Source chain is unhealthy", + sourceChainUnhealthy: true, + expectedState: false, + }, + { + name: "Source chain error", + sourceChainErr: errors.New("source chain error"), + expectedErr: true, + }, + { + name: "Destination chain is unhealthy", + destChainUnhealthy: true, + expectedState: false, + }, + { + name: "Destination chain error", + destChainErr: errors.New("destination chain error"), + expectedErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockCommitStore := mocks.NewCommitStoreReader(t) + mockOnRamp := mocks.NewOnRampReader(t) + + mockCommitStore.On("IsDown", ctx).Return(tc.commitStoreDown, tc.commitStoreErr).Maybe() + mockCommitStore.On("IsDestChainHealthy", ctx).Return(!tc.destChainUnhealthy, tc.destChainErr).Maybe() + mockOnRamp.On("IsSourceCursed", ctx).Return(tc.onRampCursed, tc.onRampErr).Maybe() + mockOnRamp.On("IsSourceChainHealthy", ctx).Return(!tc.sourceChainUnhealthy, tc.sourceChainErr).Maybe() + + chainState := NewChainHealthcheck(logger.TestLogger(t), mockOnRamp, mockCommitStore) + healthy, err := chainState.IsHealthy(ctx, false) + + if tc.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedState, healthy) + } + }) + } +} diff --git a/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go b/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go new file mode 100644 index 0000000000..7c9edc38cc --- /dev/null +++ b/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go @@ -0,0 +1,56 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// ChainHealthcheck is an autogenerated mock type for the ChainHealthcheck type +type ChainHealthcheck struct { + mock.Mock +} + +// IsHealthy provides a mock function with given fields: ctx, forceRefresh +func (_m *ChainHealthcheck) IsHealthy(ctx context.Context, forceRefresh bool) (bool, error) { + ret := _m.Called(ctx, forceRefresh) + + if len(ret) == 0 { + panic("no return value specified for IsHealthy") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, bool) (bool, error)); ok { + return rf(ctx, forceRefresh) + } + if rf, ok := ret.Get(0).(func(context.Context, bool) bool); ok { + r0 = rf(ctx, forceRefresh) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, bool) error); ok { + r1 = rf(ctx, forceRefresh) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewChainHealthcheck creates a new instance of ChainHealthcheck. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewChainHealthcheck(t interface { + mock.TestingT + Cleanup(func()) +}) *ChainHealthcheck { + mock := &ChainHealthcheck{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go index 6d8993a499..9cdb93d49b 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts.go @@ -5,12 +5,9 @@ import ( "encoding/hex" "fmt" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" ) @@ -76,41 +73,3 @@ func FlattenUniqueSlice[T comparable](slices ...[]T) []T { } return flattened } - -// VerifyNotDown returns error if the commitStore is down (paused or destination cursed) or if the source chain is cursed -// Both RPCs are called in parallel to save some time. These calls cannot be batched because they target different chains. -func VerifyNotDown(ctx context.Context, lggr logger.Logger, commitStore ccipdata.CommitStoreReader, onRamp ccipdata.OnRampReader) error { - var ( - eg = new(errgroup.Group) - isDown bool - isCursed bool - ) - - eg.Go(func() error { - var err error - isDown, err = commitStore.IsDown(ctx) - if err != nil { - return errors.Wrap(err, "commitStore isDown check errored") - } - return nil - }) - - eg.Go(func() error { - var err error - isCursed, err = onRamp.IsSourceCursed(ctx) - if err != nil { - return errors.Wrap(err, "onRamp isSourceCursed errored") - } - return nil - }) - - if err := eg.Wait(); err != nil { - return err - } - - if isDown || isCursed { - lggr.Errorf("Source chain is cursed or CommitStore is down", "isDown", isDown, "isCursed", isCursed) - return ccip.ErrChainPausedOrCursed - } - return nil -} diff --git a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go index 870f83b7e0..4ac9fa708b 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipcommon/shortcuts_test.go @@ -1,7 +1,6 @@ package ccipcommon import ( - "errors" "math/rand" "strconv" "testing" @@ -9,11 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" - - "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks" ) func TestGetMessageIDsAsHexString(t *testing.T) { @@ -64,59 +59,3 @@ func TestFlattenUniqueSlice(t *testing.T) { }) } } - -func TestVerifyNotDown(t *testing.T) { - ctx := tests.Context(t) - - testCases := []struct { - name string - commitStoreDown bool - commitStoreErr error - onRampCursed bool - onRampErr error - expectedErr bool - }{ - { - name: "Neither down nor cursed", - expectedErr: false, - }, - { - name: "CommitStore is down", - commitStoreDown: true, - expectedErr: true, - }, - { - name: "OnRamp is cursed", - onRampCursed: true, - expectedErr: true, - }, - { - name: "CommitStore error", - commitStoreErr: errors.New("commit store error"), - expectedErr: true, - }, - { - name: "OnRamp error", - onRampErr: errors.New("onramp error"), - expectedErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - mockCommitStore := mocks.NewCommitStoreReader(t) - mockOnRamp := mocks.NewOnRampReader(t) - - mockCommitStore.On("IsDown", ctx).Return(tc.commitStoreDown, tc.commitStoreErr) - mockOnRamp.On("IsSourceCursed", ctx).Return(tc.onRampCursed, tc.onRampErr) - - err := VerifyNotDown(ctx, logger.TestLogger(t), mockCommitStore, mockOnRamp) - - if tc.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go index 5366cbbc30..4cba684fb6 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/commit_store_reader_mock.go @@ -295,6 +295,34 @@ func (_m *CommitStoreReader) IsBlessed(ctx context.Context, root [32]byte) (bool return r0, r1 } +// IsDestChainHealthy provides a mock function with given fields: ctx +func (_m *CommitStoreReader) IsDestChainHealthy(ctx context.Context) (bool, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for IsDestChainHealthy") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (bool, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // IsDown provides a mock function with given fields: ctx func (_m *CommitStoreReader) IsDown(ctx context.Context) (bool, error) { ret := _m.Called(ctx) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/onramp_reader_mock.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/onramp_reader_mock.go index bb0ee2530d..8888c2afa8 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/onramp_reader_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/mocks/onramp_reader_mock.go @@ -101,6 +101,34 @@ func (_m *OnRampReader) GetSendRequestsBetweenSeqNums(ctx context.Context, seqNu return r0, r1 } +// IsSourceChainHealthy provides a mock function with given fields: ctx +func (_m *OnRampReader) IsSourceChainHealthy(ctx context.Context) (bool, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for IsSourceChainHealthy") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (bool, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // IsSourceCursed provides a mock function with given fields: ctx func (_m *OnRampReader) IsSourceCursed(ctx context.Context) (bool, error) { ret := _m.Called(ctx) diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go index 6af2231299..bbff85f153 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/commit_store.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -341,6 +342,13 @@ func (c *CommitStore) GetLatestPriceEpochAndRound(ctx context.Context) (uint64, return c.commitStore.GetLatestPriceEpochAndRound(&bind.CallOpts{Context: ctx}) } +func (c *CommitStore) IsDestChainHealthy(context.Context) (bool, error) { + if err := c.lp.Healthy(); err != nil { + return false, nil + } + return true, nil +} + func (c *CommitStore) IsDown(ctx context.Context) (bool, error) { unPausedAndHealthy, err := c.commitStore.IsUnpausedAndARMHealthy(&bind.CallOpts{Context: ctx}) if err != nil { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/onramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/onramp.go index e4377c27ee..896e637f31 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/onramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_0_0/onramp.go @@ -161,6 +161,13 @@ func (o *OnRamp) RouterAddress() (cciptypes.Address, error) { return cciptypes.Address(config.Router.String()), nil } +func (o *OnRamp) IsSourceChainHealthy(context.Context) (bool, error) { + if err := o.lp.Healthy(); err != nil { + return false, nil + } + return true, nil +} + func (o *OnRamp) IsSourceCursed(ctx context.Context) (bool, error) { staticConfig, err := o.cachedStaticConfig(ctx) if err != nil { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go index edf24dab7e..bc506355e2 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/commit_store.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -367,6 +368,13 @@ func (c *CommitStore) GetLatestPriceEpochAndRound(ctx context.Context) (uint64, return c.commitStore.GetLatestPriceEpochAndRound(&bind.CallOpts{Context: ctx}) } +func (c *CommitStore) IsDestChainHealthy(context.Context) (bool, error) { + if err := c.lp.Healthy(); err != nil { + return false, nil + } + return true, nil +} + func (c *CommitStore) IsDown(ctx context.Context) (bool, error) { unPausedAndHealthy, err := c.commitStore.IsUnpausedAndARMHealthy(&bind.CallOpts{Context: ctx}) if err != nil { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/onramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/onramp.go index 95d0bff989..fbe82aa82b 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/onramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_2_0/onramp.go @@ -179,6 +179,13 @@ func (o *OnRamp) RouterAddress() (cciptypes.Address, error) { return cciptypes.Address(config.Router.String()), nil } +func (o *OnRamp) IsSourceChainHealthy(context.Context) (bool, error) { + if err := o.lp.Healthy(); err != nil { + return false, nil + } + return true, nil +} + func (o *OnRamp) IsSourceCursed(ctx context.Context) (bool, error) { staticConfig, err := o.cachedStaticConfig(ctx) if err != nil { diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/onramp.go b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/onramp.go index b8b131e78b..1ca0e03f2e 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/onramp.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdata/v1_5_0/onramp.go @@ -179,6 +179,13 @@ func (o *OnRamp) RouterAddress() (cciptypes.Address, error) { return ccipcalc.EvmAddrToGeneric(config.Router), nil } +func (o *OnRamp) IsSourceChainHealthy(context.Context) (bool, error) { + if err := o.lp.Healthy(); err != nil { + return false, nil + } + return true, nil +} + func (o *OnRamp) IsSourceCursed(ctx context.Context) (bool, error) { staticConfig, err := o.cachedStaticConfig(ctx) if err != nil { diff --git a/core/services/ocr2/plugins/ccip/vars.go b/core/services/ocr2/plugins/ccip/vars.go index 1eabd6dcbe..a44f5e41d6 100644 --- a/core/services/ocr2/plugins/ccip/vars.go +++ b/core/services/ocr2/plugins/ccip/vars.go @@ -11,4 +11,4 @@ const ( ExecPluginLabel = "exec" ) -var ErrChainPausedOrCursed = errors.New("commitStoreReader is down or source chain is cursed") +var ErrChainIsNotHealthy = errors.New("lane processing is stopped because of healthcheck failure, please see crit logs")