Skip to content

Commit

Permalink
Moving cache refresh to the bg worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Mar 8, 2024
1 parent f6b2afc commit 6434aee
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 118 deletions.
12 changes: 4 additions & 8 deletions core/services/ocr2/plugins/ccip/ccipcommit/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) {
pluginConfig, backfillArgs, err := jobSpecToCommitPluginConfig(lggr, jb, pr, chainSet, qopts...)
pluginConfig, backfillArgs, err := jobSpecToCommitPluginConfig(ctx, lggr, jb, pr, chainSet, qopts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb
return multiErr
}

func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, error) {
func jobSpecToCommitPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, error) {
params, err := extractJobSpecParams(jb, chainSet)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -205,16 +205,12 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run

chainHealthcheck := cache.NewObservedChainHealthCheck(
cache.NewChainHealthcheck(
// Adding more details to Logger to make healthcheck logs more informative
// It's safe because healthcheck logs only in case of unhealthy state
ctx,
lggr.With(
"onramp", onrampAddress,
"commitStore", params.commitStoreAddress,
"offramp", params.pluginConfig.OffRamp,
),
onRampReader,
commitStoreReader,
),
), onRampReader, commitStoreReader),
ccip.CommitPluginLabel,
params.sourceChain.ID().Int64(),
params.destChain.ID().Int64(),
Expand Down
8 changes: 4 additions & 4 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *CommitReportingPlugin) Query(context.Context, types.ReportTimestamp) (t
// the observation will be considered invalid and rejected.
func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query) (types.Observation, error) {
lggr := r.lggr.Named("CommitObservation")
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, true); err != nil {
if healthy, err := r.chainHealthcheck.IsHealthy(ctx); err != nil {
return nil, err
} else if !healthy {
return nil, ccip.ErrChainIsNotHealthy
Expand Down Expand Up @@ -360,7 +360,7 @@ func (r *CommitReportingPlugin) getLatestGasPriceUpdate(ctx context.Context, now
func (r *CommitReportingPlugin) Report(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query, observations []types.AttributedObservation) (bool, types.Report, error) {
now := time.Now()
lggr := r.lggr.Named("CommitReport")
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil {
if healthy, err := r.chainHealthcheck.IsHealthy(ctx); err != nil {
return false, nil, err
} else if !healthy {
return false, nil, ccip.ErrChainIsNotHealthy
Expand Down Expand Up @@ -675,7 +675,7 @@ func (r *CommitReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Context,
return false, nil
}

if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, false); err1 != nil {
if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
Expand All @@ -702,7 +702,7 @@ func (r *CommitReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Context
if err != nil {
return false, err
}
if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, true); err1 != nil {
if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestCommitReportingPlugin_Observation(t *testing.T) {
p.sourceNative = sourceNativeTokenAddr
p.gasPriceEstimator = gasPriceEstimator
p.metricsCollector = ccip.NoopMetricsCollector
p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, onRampReader, commitStoreReader)
p.chainHealthcheck = cache.NewChainHealthcheck(ctx, p.lggr, onRampReader, commitStoreReader)

obs, err := p.Observation(ctx, tc.epochAndRound, types.Query{})

Expand Down Expand Up @@ -203,7 +203,7 @@ func TestCommitReportingPlugin_Report(t *testing.T) {
offRampReader.On("GetTokens", ctx).Return(cciptypes.OffRampTokens{}, nil).Maybe()
destPriceRegReader.On("GetFeeTokens", ctx).Return(nil, nil).Maybe()
chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthcheck.On("IsHealthy", ctx, false).Return(true, nil).Maybe()
chainHealthcheck.On("IsHealthy", ctx).Return(true, nil).Maybe()
p.chainHealthcheck = chainHealthcheck

o := ccip.CommitObservation{Interval: cciptypes.CommitStoreInterval{Min: 1, Max: 1}, SourceGasPriceUSD: big.NewInt(0)}
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {
commitStoreReader.On("GetExpectedNextSequenceNumber", mock.Anything).Return(onChainSeqNum, nil)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx, false).Return(true, nil)
chainHealthCheck.On("IsHealthy", ctx).Return(true, nil)
p.chainHealthcheck = chainHealthCheck

// stale since report interval is behind on chain seq num
Expand Down Expand Up @@ -492,7 +492,7 @@ func TestCommitReportingPlugin_ShouldAcceptFinalizedReport(t *testing.T) {
assert.NoError(t, err)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx, false).Return(true, nil)
chainHealthCheck.On("IsHealthy", ctx).Return(true, nil)
p.chainHealthcheck = chainHealthCheck

shouldAccept, err := p.ShouldAcceptFinalizedReport(ctx, types.ReportTimestamp{}, encodedReport)
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestCommitReportingPlugin_ShouldTransmitAcceptedReport(t *testing.T) {
p.lggr = logger.TestLogger(t)

chainHealthCheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthCheck.On("IsHealthy", ctx, true).Return(true, nil).Maybe()
chainHealthCheck.On("IsHealthy", ctx).Return(true, nil).Maybe()
p.chainHealthcheck = chainHealthCheck

t.Run("should transmit when report is not stale", func(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J

chainHealthcheck := cache.NewObservedChainHealthCheck(
cache.NewChainHealthcheck(
ctx,
// Adding more details to Logger to make healthcheck logs more informative
// It's safe because healthcheck logs only in case of unhealthy state
lggr.With(
Expand Down
8 changes: 4 additions & 4 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *ExecutionReportingPlugin) Query(context.Context, types.ReportTimestamp)

func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp types.ReportTimestamp, query types.Query) (types.Observation, error) {
lggr := r.lggr.Named("ExecutionObservation")
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, true); err != nil {
if healthy, err := r.chainHealthcheck.IsHealthy(ctx); err != nil {
return nil, err
} else if !healthy {
return nil, ccip.ErrChainIsNotHealthy
Expand Down Expand Up @@ -765,7 +765,7 @@ func (r *ExecutionReportingPlugin) buildReport(ctx context.Context, lggr logger.

func (r *ExecutionReportingPlugin) Report(ctx context.Context, timestamp types.ReportTimestamp, query types.Query, observations []types.AttributedObservation) (bool, types.Report, error) {
lggr := r.lggr.Named("ExecutionReport")
if healthy, err := r.chainHealthcheck.IsHealthy(ctx, false); err != nil {
if healthy, err := r.chainHealthcheck.IsHealthy(ctx); err != nil {
return false, nil, err
} else if !healthy {
return false, nil, ccip.ErrChainIsNotHealthy
Expand Down Expand Up @@ -862,7 +862,7 @@ func (r *ExecutionReportingPlugin) ShouldAcceptFinalizedReport(ctx context.Conte
}
lggr = lggr.With("messageIDs", ccipcommon.GetMessageIDsAsHexString(execReport.Messages))

if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, false); err1 != nil {
if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
Expand Down Expand Up @@ -893,7 +893,7 @@ func (r *ExecutionReportingPlugin) ShouldTransmitAcceptedReport(ctx context.Cont
}
lggr = lggr.With("messageIDs", ccipcommon.GetMessageIDsAsHexString(execReport.Messages))

if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx, true); err1 != nil {
if healthy, err1 := r.chainHealthcheck.IsHealthy(ctx); err1 != nil {
return false, err1
} else if !healthy {
return false, ccip.ErrChainIsNotHealthy
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
p.sourcePriceRegistryProvider = mockOnRampPriceRegistryProvider

p.snoozedRoots = cache.NewSnoozedRoots(time.Minute, time.Minute)
p.chainHealthcheck = cache.NewChainHealthcheck(p.lggr, mockOnRampReader, commitStoreReader)
p.chainHealthcheck = cache.NewChainHealthcheck(ctx, p.lggr, mockOnRampReader, commitStoreReader)

_, err = p.Observation(ctx, types.ReportTimestamp{}, types.Query{})
if tc.expErr {
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestExecutionReportingPlugin_Report(t *testing.T) {

p.commitStoreReader = ccipdatamocks.NewCommitStoreReader(t)
chainHealthcheck := ccipcachemocks.NewChainHealthcheck(t)
chainHealthcheck.On("IsHealthy", ctx, false).Return(true, nil)
chainHealthcheck.On("IsHealthy", ctx).Return(true, nil)
p.chainHealthcheck = chainHealthcheck

observations := make([]types.AttributedObservation, len(tc.observations))
Expand Down
127 changes: 74 additions & 53 deletions core/services/ocr2/plugins/ccip/internal/cache/chain_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,91 +25,93 @@ import (
// and is cached for a certain period of time based on defaultGlobalStatusDuration.
// This may lead to some false-positives, but in this case we want to be extra cautious and avoid executing any reorged messages.
//
// Additionally, to reduce the number of calls to the RPC, we cache RMN curse state for a certain period of
// time based on defaultRmnStatusDuration.
// Additionally, to reduce the number of calls to the RPC, we refresh RMN state in the background based on defaultRMNStateRefreshInterval
//
//go:generate mockery --quiet --name ChainHealthcheck --filename chain_health_mock.go --case=underscore
type ChainHealthcheck interface {
// IsHealthy checks if the chain is healthy and returns true if it is, false otherwise
// If forceRefresh is set to true, it will refresh the RMN curse state. Should be used in the Observation and ShouldTransmit phases of OCR2.
// Otherwise, it will use the cached value of the RMN curse state.
IsHealthy(ctx context.Context, forceRefresh bool) (bool, error)
IsHealthy(ctx context.Context) (bool, error)
}

const (
// RMN curse state is refreshed every 20 seconds or when ForceIsHealthy is called
defaultRmnStatusDuration = 20 * time.Second
defaultGlobalStatusDuration = 30 * time.Minute
defaultRMNStateRefreshInterval = 10 * time.Second
defaultGlobalStatusDuration = 30 * time.Minute

globalStatusKey = "globalStatus"
rmnStatusKey = "rmnCurseCheck"
)

type chainHealthcheck struct {
cache *cache.Cache
globalStatusKey string
rmnStatusKey string
globalStatusExpiration time.Duration
rmnStatusExpiration time.Duration
cache *cache.Cache
globalStatusKey string
rmnStatusKey string
globalStatusExpiration time.Duration
rmnStatusRefreshInterval time.Duration

lggr logger.Logger
onRamp ccipdata.OnRampReader
commitStore ccipdata.CommitStoreReader
}

func NewChainHealthcheck(
lggr logger.Logger,
onRamp ccipdata.OnRampReader,
commitStore ccipdata.CommitStoreReader,
) *chainHealthcheck {
return &chainHealthcheck{
cache: cache.New(defaultRmnStatusDuration, 0),
globalStatusKey: globalStatusKey,
rmnStatusKey: rmnStatusKey,
globalStatusExpiration: defaultGlobalStatusDuration,
rmnStatusExpiration: defaultRmnStatusDuration,

lggr: lggr,
onRamp: onRamp,
commitStore: commitStore,
}
func NewChainHealthcheck(ctx context.Context, lggr logger.Logger, onRamp ccipdata.OnRampReader, commitStore ccipdata.CommitStoreReader) *chainHealthcheck {
return newChainHealthcheckWithCustomEviction(
ctx,
lggr,
onRamp,
commitStore,
defaultGlobalStatusDuration,
defaultRMNStateRefreshInterval,
)
}

func newChainHealthcheckWithCustomEviction(
lggr logger.Logger,
onRamp ccipdata.OnRampReader,
commitStore ccipdata.CommitStoreReader,
globalStatusDuration time.Duration,
rmnStatusDuration time.Duration,
) *chainHealthcheck {
return &chainHealthcheck{
cache: cache.New(rmnStatusDuration, 0),
rmnStatusKey: rmnStatusKey,
globalStatusKey: globalStatusKey,
globalStatusExpiration: globalStatusDuration,
rmnStatusExpiration: rmnStatusDuration,
func newChainHealthcheckWithCustomEviction(ctx context.Context, lggr logger.Logger, onRamp ccipdata.OnRampReader, commitStore ccipdata.CommitStoreReader, globalStatusDuration time.Duration, rmnStatusRefreshInterval time.Duration) *chainHealthcheck {
ch := &chainHealthcheck{
cache: cache.New(rmnStatusRefreshInterval, 0),
rmnStatusKey: rmnStatusKey,
globalStatusKey: globalStatusKey,
globalStatusExpiration: globalStatusDuration,
rmnStatusRefreshInterval: rmnStatusRefreshInterval,

lggr: lggr,
onRamp: onRamp,
commitStore: commitStore,
}
ch.spawnBackgroundRefresher(ctx)
return ch
}

type rmnResponse struct {
healthy bool
err error
}

func (c *chainHealthcheck) IsHealthy(ctx context.Context, forceRefresh bool) (bool, error) {
func (c *chainHealthcheck) IsHealthy(ctx context.Context) (bool, error) {
// Verify if flag is raised to indicate that the chain is not healthy
// If set to false then immediately return false without checking the chain
if healthy, found := c.cache.Get(c.globalStatusKey); found && !healthy.(bool) {
return false, nil
if cachedValue, found := c.cache.Get(c.globalStatusKey); found {
healthy, ok := cachedValue.(bool)
// If cached value is properly casted to bool and not healthy it means the sticky flag is raised
// and should be returned immediately
if !ok {
c.lggr.Criticalw("Failed to cast cached value to sticky healthcheck", "value", cachedValue)
} else if ok && !healthy {
return false, nil
}
}

// These checks are cheap and don't require any communication with the database or RPC
if healthy, err := c.checkIfReadersAreHealthy(ctx); err != nil {
return false, err
} else if !healthy {
c.cache.Set(c.globalStatusKey, false, c.globalStatusExpiration)
return healthy, nil
}

if healthy, err := c.checkIfRMNsAreHealthy(ctx, forceRefresh); err != nil {
// First call might initialize cache if it's not initialized yet. Otherwise, it will use the cached value
if healthy, err := c.checkIfRMNsAreHealthy(ctx); err != nil {
return false, err
} else if !healthy {
c.cache.Set(c.globalStatusKey, false, c.globalStatusExpiration)
Expand All @@ -118,6 +120,26 @@ func (c *chainHealthcheck) IsHealthy(ctx context.Context, forceRefresh bool) (bo
return true, nil
}

func (c *chainHealthcheck) spawnBackgroundRefresher(ctx context.Context) {
ticker := time.NewTicker(c.rmnStatusRefreshInterval)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Refresh the RMN state
c.refresh(ctx)
}
}
}()
}

func (c *chainHealthcheck) refresh(ctx context.Context) {
healthy, err := c.fetchRMNCurseState(ctx)
c.cache.Set(c.rmnStatusKey, rmnResponse{healthy, err}, -1)
}

// checkIfReadersAreHealthy checks if the source and destination chains are healthy by calling underlying LogPoller
// These calls are cheap because they don't require any communication with the database or RPC, so we don't have
// to cache the result of these calls.
Expand All @@ -142,20 +164,19 @@ func (c *chainHealthcheck) checkIfReadersAreHealthy(ctx context.Context) (bool,
return sourceChainHealthy && destChainHealthy, nil
}

func (c *chainHealthcheck) checkIfRMNsAreHealthy(ctx context.Context, forceFetch bool) (bool, error) {
if !forceFetch {
if healthy, found := c.cache.Get(c.rmnStatusKey); found {
return healthy.(bool), nil
func (c *chainHealthcheck) checkIfRMNsAreHealthy(ctx context.Context) (bool, error) {
if cachedValue, found := c.cache.Get(c.rmnStatusKey); found {
rmn, ok := cachedValue.(rmnResponse)
if ok {
return rmn.healthy, rmn.err
}
c.lggr.Criticalw("Failed to cast cached value to RMN response", "response", rmn)
}

// If the value is not found in the cache, fetch the RMN curse state in a sync manner for the first time
healthy, err := c.fetchRMNCurseState(ctx)
if err != nil {
return false, err
}

c.cache.Set(c.rmnStatusKey, healthy, c.rmnStatusExpiration)
return healthy, nil
c.cache.Set(c.rmnStatusKey, rmnResponse{healthy, err}, -1)
return healthy, err
}

func (c *chainHealthcheck) fetchRMNCurseState(ctx context.Context) (bool, error) {
Expand Down
Loading

0 comments on commit 6434aee

Please sign in to comment.