Skip to content

Commit

Permalink
CCIP-1704 More robust checks of RMN/Chain every phase (#583)
Browse files Browse the repository at this point in the history
A more robust way of verifying whether CCIP should halt processing. It is
based on four checks:
1. Source chain is healthy (this is verified by checking that the source
LogPoller has not seen a finality violation)
2. Dest chain is healthy (this is verified by checking that the destination
LogPoller has not seen a finality violation)
3. CommitStore is not down (this is verified by checking that the CommitStore
is not down and the destination RMN is not cursed)
4. Source chain is not cursed (this is verified by checking that the source
RMN is not cursed)

Whenever any of the above checks fails, the chain is considered unhealthy
and CCIP should stop
processing messages. Additionally, when the chain is unhealthy, this
information is considered "sticky"
and is cached for 30 minutes. This may lead to some false positives, but
in this case, we want to be extra cautious and avoid executing any
reorged messages.

Health checks are now performed in every OCR2 phase, and during
Observation and ShouldTransmit we force the data to be read from the chain.
Additionally, to reduce the number of RPC calls, we cache the RMN
curse state for 20 seconds.
  • Loading branch information
mateusz-sekara authored Mar 7, 2024
1 parent b7ab1d1 commit f683950
Show file tree
Hide file tree
Showing 21 changed files with 694 additions and 142 deletions.
5 changes: 4 additions & 1 deletion core/services/ocr2/plugins/ccip/ccipcommit/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
)
Expand Down Expand Up @@ -84,20 +85,22 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin

pluginOffChainConfig := rf.config.commitStore.OffchainConfig()

lggr := rf.config.lggr.Named("CommitReportingPlugin")
return &CommitReportingPlugin{
sourceChainSelector: rf.config.sourceChainSelector,
sourceNative: rf.config.sourceNative,
onRampReader: rf.config.onRampReader,
commitStoreReader: rf.config.commitStore,
priceGetter: rf.config.priceGetter,
F: config.F,
lggr: rf.config.lggr.Named("CommitReportingPlugin"),
lggr: lggr,
inflightReports: newInflightCommitReportsContainer(rf.config.commitStore.OffchainConfig().InflightCacheExpiry),
destPriceRegistryReader: rf.destPriceRegReader,
offRampReader: rf.config.offRamp,
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
offchainConfig: pluginOffChainConfig,
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: cache.NewChainHealthcheck(lggr, rf.config.onRampReader, rf.config.commitStore),
},
types.ReportingPluginInfo{
Name: "CCIPCommit",
Expand Down
33 changes: 29 additions & 4 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
Expand Down Expand Up @@ -85,11 +86,17 @@ type CommitReportingPlugin struct {
priceGetter pricegetter.PriceGetter
metricsCollector ccip.PluginMetricsCollector
// State
inflightReports *inflightCommitReportsContainer
inflightReports *inflightCommitReportsContainer
chainHealthcheck cache.ChainHealthcheck
}

// Query is not used by the CCIP Commit plugin.
func (r *CommitReportingPlugin) Query(context.Context, types.ReportTimestamp) (types.Query, error) {
// Query carries no payload for the CCIP Commit plugin; it only runs the chain
// health check before answering.
//
// It returns the error from the health check if the check itself fails,
// ccip.ErrChainIsNotHealthy if the chain is reported as unhealthy, and an
// empty types.Query otherwise.
func (r *CommitReportingPlugin) Query(ctx context.Context, _ types.ReportTimestamp) (types.Query, error) {
	// NOTE(review): the `false` argument presumably allows a cached health
	// result to be used instead of forcing an on-chain read — confirm against
	// cache.ChainHealthcheck.
	ok, checkErr := r.chainHealthcheck.IsHealthy(ctx, false)
	switch {
	case checkErr != nil:
		return nil, checkErr
	case !ok:
		return nil, ccip.ErrChainIsNotHealthy
	default:
		return types.Query{}, nil
	}
}

Expand All @@ -99,8 +106,10 @@ func (r *CommitReportingPlugin) Query(context.Context, types.ReportTimestamp) (t
// the observation will be considered invalid and rejected.
func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query) (types.Observation, error) {
lggr := r.lggr.Named("CommitObservation")
if err := ccipcommon.VerifyNotDown(ctx, r.lggr, r.commitStoreReader, r.onRampReader); err != nil {
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, true); err != nil {
return nil, err
} else if !healthy {
return nil, ccip.ErrChainIsNotHealthy
}
r.inflightReports.expire(lggr)

Expand Down Expand Up @@ -355,6 +364,12 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query, observations []types.AttributedObservation) (bool, types.Report, error) {
now := time.Now()
lggr := r.lggr.Named("CommitReport")
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil {
return false, nil, err
} else if !healthy {
return false, nil, ccip.ErrChainIsNotHealthy
}

parsableObservations := ccip.GetParsableObservations[ccip.CommitObservation](lggr, observations)

feeTokens, bridgeableTokens, err := ccipcommon.GetDestinationTokens(ctx, r.offRampReader, r.destPriceRegistryReader)
Expand Down Expand Up @@ -650,7 +665,6 @@ func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context,
if err != nil {
return false, err
}

lggr := r.lggr.Named("CommitShouldAcceptFinalizedReport").With(
"merkleRoot", parsedReport.MerkleRoot,
"minSeqNum", parsedReport.Interval.Min,
Expand All @@ -665,6 +679,12 @@ func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context,
return false, nil
}

if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, false); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
}

if r.isStaleReport(ctx, lggr, parsedReport, true, reportTimestamp) {
lggr.Infow("Rejecting stale report")
return false, nil
Expand All @@ -686,6 +706,11 @@ func (r *CommitReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context
if err != nil {
return false, err
}
if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, true); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
}
// If report is not stale we transmit.
// When the commitTransmitter enqueues the tx for tx manager,
// we mark it as fulfilled, effectively removing it from the set of inflight messages.
Expand Down
28 changes: 28 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes"
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache"
ccipcachemocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/cache/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcalc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata/factory"
Expand Down Expand Up @@ -109,11 +111,13 @@ func TestCommitReportingPlugin_Observation(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
commitStoreReader := ccipdatamocks.NewCommitStoreReader(t)
commitStoreReader.On("IsDown", ctx).Return(tc.commitStorePaused, nil)
commitStoreReader.On("IsDestChainHealthy", ctx).Return(true, nil)
if !tc.commitStorePaused && !tc.sourceChainCursed {
commitStoreReader.On("GetExpectedNextSequenceNumber", ctx).Return(tc.commitStoreSeqNum, nil)
}

onRampReader := ccipdatamocks.NewOnRampReader(t)
onRampReader.On("IsSourceChainHealthy", ctx).Return(true, nil)
onRampReader.On("IsSourceCursed", ctx).Return(tc.sourceChainCursed, nil)
if len(tc.sendReqs) > 0 {
onRampReader.On("GetSendRequestsBetweenSeqNums", ctx, tc.commitStoreSeqNum, tc.commitStoreSeqNum+OnRampMessagesScanLimit, true).
Expand Down Expand Up @@ -164,6 +168,7 @@ func TestCommitReportingPlugin_Observation(t *testing.T) {
p.sourceNative = sourceNativeTokenAddr
p.gasPriceEstimator = gasPriceEstimator
p.metricsCollector = ccip.NoopMetricsCollector
p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, onRampReader, commitStoreReader)

obs, err := p.Observation(ctx, tc.epochAndRound, types.Query{})

Expand Down Expand Up @@ -197,6 +202,9 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.destPriceRegistryReader = destPriceRegReader
offRampReader.On("GetTokens", ctx).Return(cciptypes.OffRampTokens{}, nil).Maybe()
destPriceRegReader.On("GetFeeTokens", ctx).Return(nil, nil).Maybe()
chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthcheck.On("IsHealthy", ctx, false).Return(true, nil).Maybe()
p.chainHealthcheck = chainHealthcheck

o := ccip.CommitObservation{Interval: cciptypes.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)}
obs, err := o.Marshal()
Expand Down Expand Up @@ -325,6 +333,9 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
commitStoreReader, err := v1_2_0.NewCommitStore(logger.TestLogger(t), utils.RandomAddress(), nil, lp, nil)
assert.NoError(t, err)

healthCheck := ccipcachemocks.NewChainHealthcheck(t)
healthCheck.On("IsHealthy", ctx, false).Return(true, nil)

p := &CommitReportingPlugin{}
p.lggr = logger.TestLogger(t)
p.inflightReports = newInflightCommitReportsContainer(time.Minute)
Expand All @@ -337,6 +348,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
p.commitStoreReader = commitStoreReader
p.F = tc.f
p.metricsCollector = ccip.NoopMetricsCollector
p.chainHealthcheck = healthCheck

aos := make([]types.AttributedObservation, 0, len(tc.observations))
for _, o := range tc.observations {
Expand Down Expand Up @@ -397,6 +409,10 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {
p.commitStoreReader = commitStoreReader
commitStoreReader.On("DecodeCommitReport", mock.Anything).Return(report, nil)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx).Return(true, nil).Maybe()
p.chainHealthcheck = chainHealthCheck

encodedReport, err := encodeCommitReport(report)
assert.NoError(t, err)
shouldAccept, err := p.ShouldAcceptFinalizedReport(ctx, types.ReportTimestamp{}, encodedReport)
Expand All @@ -422,6 +438,10 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {
commitStoreReader.On("DecodeCommitReport", mock.Anything).Return(report, nil)
commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(onChainSeqNum, nil)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx, false).Return(true, nil)
p.chainHealthcheck = chainHealthCheck

// stale since report interval is behind on chain seq num
report.Interval = cciptypes.CommitStoreInterval{Min: onChainSeqNum - 2, Max: onChainSeqNum + 10}
encodedReport, err := encodeCommitReport(report)
Expand Down Expand Up @@ -471,6 +491,10 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {
encodedReport, err := encodeCommitReport(report)
assert.NoError(t, err)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx, false).Return(true, nil)
p.chainHealthcheck = chainHealthCheck

shouldAccept, err := p.ShouldAcceptFinalizedReport(ctx, types.ReportTimestamp{}, encodedReport)
assert.NoError(t, err)
assert.True(t, shouldAccept)
Expand Down Expand Up @@ -506,6 +530,10 @@ func TestCommitReportingPlugin_ShouldTransmitAcceptedReport(t *testing.T) {
p.inflightReports = newInflightCommitReportsContainer(time.Minute)
p.lggr = logger.TestLogger(t)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx, true).Return(true, nil).Maybe()
p.chainHealthcheck = chainHealthCheck

t.Run("should transmit when report is not stale", func(t *testing.T) {
// not-stale since report interval is not behind on chain seq num
report.Interval = cciptypes.CommitStoreInterval{Min: onChainSeqNum, Max: onChainSeqNum + 10}
Expand Down
5 changes: 3 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
}

offchainConfig := rf.config.offRampReader.OffchainConfig()

lggr := rf.config.lggr.Named("ExecutionReportingPlugin")
return &ExecutionReportingPlugin{
F: config.F,
lggr: rf.config.lggr.Named("ExecutionReportingPlugin"),
lggr: lggr,
offchainConfig: offchainConfig,
tokenDataWorker: rf.config.tokenDataWorker,
gasPriceEstimator: rf.config.offRampReader.GasPriceEstimator(),
Expand All @@ -90,6 +90,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
inflightReports: newInflightExecReportsContainer(offchainConfig.InflightCacheExpiry.Duration()),
snoozedRoots: cache.NewSnoozedRoots(rf.config.offRampReader.OnchainConfig().PermissionLessExecutionThresholdSeconds, offchainConfig.RootSnoozeTime.Duration()),
metricsCollector: rf.config.metricsCollector,
chainHealthcheck: cache.NewChainHealthcheck(lggr, rf.config.onRampReader, rf.config.commitStoreReader),
}, types.ReportingPluginInfo{
Name: "CCIPExecution",
// Setting this to false saves on calldata since OffRamp doesn't require agreement between NOPs
Expand Down
31 changes: 27 additions & 4 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,26 @@ type ExecutionReportingPlugin struct {
tokenPoolBatchedReader batchreader.TokenPoolBatchedReader

// State
inflightReports *inflightExecReportsContainer
snoozedRoots cache.SnoozedRoots
inflightReports *inflightExecReportsContainer
snoozedRoots cache.SnoozedRoots
chainHealthcheck cache.ChainHealthcheck
}

func (r *ExecutionReportingPlugin) Query(context.Context, types.ReportTimestamp) (types.Query, error) {
// Query carries no payload for the CCIP Execution plugin; it only runs the
// chain health check before answering.
//
// It returns the error from the health check if the check itself fails,
// ccip.ErrChainIsNotHealthy if the chain is reported as unhealthy, and an
// empty types.Query otherwise.
func (r *ExecutionReportingPlugin) Query(ctx context.Context, _ types.ReportTimestamp) (types.Query, error) {
	// NOTE(review): the `false` argument presumably allows a cached health
	// result to be used instead of forcing an on-chain read — confirm against
	// cache.ChainHealthcheck.
	ok, checkErr := r.chainHealthcheck.IsHealthy(ctx, false)
	switch {
	case checkErr != nil:
		return nil, checkErr
	case !ok:
		return nil, ccip.ErrChainIsNotHealthy
	default:
		return types.Query{}, nil
	}
}

func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp types.ReportTimestamp, query types.Query) (types.Observation, error) {
lggr := r.lggr.Named("ExecutionObservation")
if err := ccipcommon.VerifyNotDown(ctx, r.lggr, r.commitStoreReader, r.onRampReader); err != nil {
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, true); err != nil {
return nil, err
} else if !healthy {
return nil, ccip.ErrChainIsNotHealthy
}

// Ensure that the source price registry is synchronized with the onRamp.
Expand Down Expand Up @@ -761,6 +769,11 @@ func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger.

func (r *ExecutionReportingPlugin) Report(ctx context.Context, timestamp types.ReportTimestamp, query types.Query, observations []types.AttributedObservation) (bool, types.Report, error) {
lggr := r.lggr.Named("ExecutionReport")
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil {
return false, nil, err
} else if !healthy {
return false, nil, ccip.ErrChainIsNotHealthy
}
parsableObservations := ccip.GetParsableObservations[ccip.ExecutionObservation](lggr, observations)
// Need at least F+1 observations
if len(parsableObservations) <= r.F {
Expand Down Expand Up @@ -853,6 +866,11 @@ func (r *ExecutionReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Conte
}
lggr = lggr.With("messageIDs", ccipcommon.GetMessageIDsAsHexString(execReport.Messages))

if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, false); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
}
// If the first message is executed already, this execution report is stale, and we do not accept it.
stale, err := r.isStaleReport(ctx, execReport.Messages)
if err != nil {
Expand All @@ -879,6 +897,11 @@ func (r *ExecutionReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Cont
}
lggr = lggr.With("messageIDs", ccipcommon.GetMessageIDsAsHexString(execReport.Messages))

if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, true); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
}
// If report is not stale we transmit.
// When the executeTransmitter enqueues the tx for tx manager,
// we mark it as execution_sent, removing it from the set of inflight messages.
Expand Down
Loading

0 comments on commit f683950

Please sign in to comment.