From d632c0148f888511d10966fcb883d2f98009638f Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Mon, 11 Mar 2024 18:13:39 +0100 Subject: [PATCH] Using Service interface for wrapping background worker --- .../plugins/ccip/ccipcommit/initializers.go | 56 +++++++++++-------- .../plugins/ccip/ccipexec/initializers.go | 41 ++++++++------ .../ccip/internal/cache/chain_health.go | 5 +- .../ccip/internal/cache/chain_health_test.go | 2 +- .../internal/cache/mocks/chain_health_mock.go | 36 ++++++++++++ 5 files changed, 97 insertions(+), 43 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index 6eb7e5af6a..c1ed5c83c0 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -43,7 +43,7 @@ import ( ) func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, pr pipeline.Runner, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { - pluginConfig, backfillArgs, err := jobSpecToCommitPluginConfig(lggr, jb, pr, chainSet, qopts...) + pluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToCommitPluginConfig(lggr, jb, pr, chainSet, qopts...) if err != nil { return nil, err } @@ -60,16 +60,22 @@ func NewCommitServices(ctx context.Context, lggr logger.Logger, jb job.Job, chai } // If this is a brand-new job, then we make use of the start blocks. If not then we're rebooting and log poller will pick up where we left off. if new { - return []job.ServiceCtx{oraclelib.NewBackfilledOracle( - pluginConfig.lggr, - backfillArgs.SourceLP, - backfillArgs.DestLP, - backfillArgs.SourceStartBlock, - backfillArgs.DestStartBlock, - job.NewServiceAdapter(oracle)), + return []job.ServiceCtx{ + oraclelib.NewBackfilledOracle( + pluginConfig.lggr, + backfillArgs.SourceLP, + backfillArgs.DestLP, + backfillArgs.SourceStartBlock, + backfillArgs.DestStartBlock, + job.NewServiceAdapter(oracle), + ), + chainHealthcheck, }, nil } - return []job.ServiceCtx{job.NewServiceAdapter(oracle)}, nil + return []job.ServiceCtx{ + job.NewServiceAdapter(oracle), + chainHealthcheck, + }, nil } func CommitReportToEthTxMeta(typ ccipconfig.ContractType, ver semver.Version) (func(report []byte) (*txmgr.TxMeta, error), error) { @@ -108,10 +114,10 @@ func UnregisterCommitPluginLpFilters(ctx context.Context, lggr logger.Logger, jb return multiErr } -func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, error) { +func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Runner, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*CommitPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) { params, err := extractJobSpecParams(jb, chainSet) if err != nil { - return nil, nil, err + return nil, nil, nil, err } lggr.Infow("Initializing commit plugin", @@ -124,11 +130,11 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run versionFinder := factory.NewEvmVersionFinder() commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, params.commitStoreAddress, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), qopts...) if err != nil { - return nil, nil, errors.Wrap(err, "could not create commitStore reader") + return nil, nil, nil, errors.Wrap(err, "could not create commitStore reader") } sourceChainName, destChainName, err := ccipconfig.ResolveChainNames(params.sourceChain.ID().Int64(), params.destChain.ID().Int64()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } commitLggr := lggr.Named("CCIPCommit").With("sourceChain", sourceChainName, "destChain", destChainName) @@ -137,12 +143,12 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run if withPipeline { priceGetter, err = pricegetter.NewPipelineGetter(params.pluginConfig.TokenPricesUSDPipeline, pr, jb.ID, jb.ExternalJobID, jb.Name.ValueOrZero(), lggr) if err != nil { - return nil, nil, fmt.Errorf("creating pipeline price getter: %w", err) + return nil, nil, nil, fmt.Errorf("creating pipeline price getter: %w", err) } } else { // Use dynamic price getter. if params.pluginConfig.PriceGetterConfig == nil { - return nil, nil, fmt.Errorf("priceGetterConfig is nil") + return nil, nil, nil, fmt.Errorf("priceGetterConfig is nil") } // Build price getter clients for all chains specified in the aggregator configurations. @@ -153,7 +159,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run // Retrieve the chain. chain, _, err2 := ccipconfig.GetChainByChainID(chainSet, chainID) if err2 != nil { - return nil, nil, fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2) + return nil, nil, nil, fmt.Errorf("retrieving chain for chainID %d: %w", chainID, err2) } caller := rpclib.NewDynamicLimitedBatchCaller( lggr, @@ -166,7 +172,7 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run priceGetter, err = pricegetter.NewDynamicPriceGetter(*params.pluginConfig.PriceGetterConfig, priceGetterClients) if err != nil { - return nil, nil, fmt.Errorf("creating dynamic price getter: %w", err) + return nil, nil, nil, fmt.Errorf("creating dynamic price getter: %w", err) } } @@ -174,27 +180,27 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run onrampAddress := cciptypes.Address(params.commitStoreStaticCfg.OnRamp.String()) onRampReader, err := factory.NewOnRampReader(commitLggr, versionFinder, params.commitStoreStaticCfg.SourceChainSelector, params.commitStoreStaticCfg.ChainSelector, onrampAddress, params.sourceChain.LogPoller(), params.sourceChain.Client(), qopts...) if err != nil { - return nil, nil, errors.Wrap(err, "failed onramp reader") + return nil, nil, nil, errors.Wrap(err, "failed onramp reader") } offRampReader, err := factory.NewOffRampReader(commitLggr, versionFinder, params.pluginConfig.OffRamp, params.destChain.Client(), params.destChain.LogPoller(), params.destChain.GasEstimator(), true, qopts...) if err != nil { - return nil, nil, errors.Wrap(err, "failed offramp reader") + return nil, nil, nil, errors.Wrap(err, "failed offramp reader") } onRampRouterAddr, err := onRampReader.RouterAddress() if err != nil { - return nil, nil, err + return nil, nil, nil, err } routerAddr, err := ccipcalc.GenericAddrToEvm(onRampRouterAddr) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sourceRouter, err := router.NewRouter(routerAddr, params.sourceChain.Client()) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sourceNative, err := sourceRouter.GetWrappedNative(nil) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Prom wrappers @@ -245,7 +251,9 @@ func jobSpecToCommitPluginConfig(lggr logger.Logger, jb job.Job, pr pipeline.Run DestLP: params.destChain.LogPoller(), SourceStartBlock: params.pluginConfig.SourceStartBlock, DestStartBlock: params.pluginConfig.DestStartBlock, - }, nil + }, + chainHealthcheck, + nil } type jobSpecParams struct { diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index f396d288db..86d49648c3 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -47,7 +47,7 @@ import ( const numTokenDataWorkers = 5 func NewExecutionServices(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, new bool, argsNoPlugin libocr2.OCR2OracleArgs, logError func(string), qopts ...pg.QOpt) ([]job.ServiceCtx, error) { - execPluginConfig, backfillArgs, err := jobSpecToExecPluginConfig(ctx, lggr, jb, chainSet, qopts...) + execPluginConfig, backfillArgs, chainHealthcheck, err := jobSpecToExecPluginConfig(ctx, lggr, jb, chainSet, qopts...) if err != nil { return nil, err } @@ -71,10 +71,15 @@ func NewExecutionServices(ctx context.Context, lggr logger.Logger, jb job.Job, c backfillArgs.DestLP, backfillArgs.SourceStartBlock, backfillArgs.DestStartBlock, - job.NewServiceAdapter(oracle)), + job.NewServiceAdapter(oracle), + ), + chainHealthcheck, }, nil } - return []job.ServiceCtx{job.NewServiceAdapter(oracle)}, nil + return []job.ServiceCtx{ + job.NewServiceAdapter(oracle), + chainHealthcheck, + }, nil } // UnregisterExecPluginLpFilters unregisters all the registered filters for both source and dest chains. @@ -153,10 +158,10 @@ func initTokenDataProviders(lggr logger.Logger, jobID string, pluginConfig ccipc return tokenDataProviders, nil } -func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *ccipcommon.BackfillArgs, error) { +func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.Job, chainSet legacyevm.LegacyChainContainer, qopts ...pg.QOpt) (*ExecutionPluginStaticConfig, *ccipcommon.BackfillArgs, *cache.ObservedChainHealthcheck, error) { params, err := extractJobSpecParams(lggr, jb, chainSet, true, qopts...) if err != nil { - return nil, nil, err + return nil, nil, nil, err } lggr.Infow("Initializing exec plugin", @@ -172,39 +177,39 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J sourceChainName, destChainName, err := ccipconfig.ResolveChainNames(sourceChainID, destChainID) if err != nil { - return nil, nil, err + return nil, nil, nil, err } execLggr := lggr.Named("CCIPExecution").With("sourceChain", sourceChainName, "destChain", destChainName) onRampReader, err := factory.NewOnRampReader(execLggr, versionFinder, params.offRampConfig.SourceChainSelector, params.offRampConfig.ChainSelector, params.offRampConfig.OnRamp, params.sourceChain.LogPoller(), params.sourceChain.Client(), qopts...) if err != nil { - return nil, nil, errors.Wrap(err, "create onramp reader") + return nil, nil, nil, errors.Wrap(err, "create onramp reader") } dynamicOnRampConfig, err := onRampReader.GetDynamicConfig() if err != nil { - return nil, nil, errors.Wrap(err, "get onramp dynamic config") + return nil, nil, nil, errors.Wrap(err, "get onramp dynamic config") } routerAddr, err := ccipcalc.GenericAddrToEvm(dynamicOnRampConfig.Router) if err != nil { - return nil, nil, err + return nil, nil, nil, err } sourceRouter, err := router.NewRouter(routerAddr, params.sourceChain.Client()) if err != nil { - return nil, nil, errors.Wrap(err, "failed loading source router") + return nil, nil, nil, errors.Wrap(err, "failed loading source router") } sourceWrappedNative, err := sourceRouter.GetWrappedNative(&bind.CallOpts{}) if err != nil { - return nil, nil, errors.Wrap(err, "could not get source native token") + return nil, nil, nil, errors.Wrap(err, "could not get source native token") } commitStoreReader, err := factory.NewCommitStoreReader(lggr, versionFinder, params.offRampConfig.CommitStore, params.destChain.Client(), params.destChain.LogPoller(), params.sourceChain.GasEstimator(), qopts...) if err != nil { - return nil, nil, errors.Wrap(err, "could not load commitStoreReader reader") + return nil, nil, nil, errors.Wrap(err, "could not load commitStoreReader reader") } tokenDataProviders, err := initTokenDataProviders(lggr, jobIDToString(jb.ID), params.pluginConfig, params.sourceChain.LogPoller(), qopts...) if err != nil { - return nil, nil, errors.Wrap(err, "could not get token data providers") + return nil, nil, nil, errors.Wrap(err, "could not get token data providers") } // Prom wrappers @@ -215,11 +220,11 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J destChainSelector, err := chainselectors.SelectorFromChainId(uint64(destChainID)) if err != nil { - return nil, nil, fmt.Errorf("get chain %d selector: %w", destChainID, err) + return nil, nil, nil, fmt.Errorf("get chain %d selector: %w", destChainID, err) } sourceChainSelector, err := chainselectors.SelectorFromChainId(uint64(sourceChainID)) if err != nil { - return nil, nil, fmt.Errorf("get chain %d selector: %w", sourceChainID, err) + return nil, nil, nil, fmt.Errorf("get chain %d selector: %w", sourceChainID, err) } execLggr.Infow("Initialized exec plugin", @@ -233,7 +238,7 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J tokenPoolBatchedReader, err := batchreader.NewEVMTokenPoolBatchedReader(execLggr, sourceChainSelector, offRampReader.Address(), batchCaller) if err != nil { - return nil, nil, fmt.Errorf("new token pool batched reader: %w", err) + return nil, nil, nil, fmt.Errorf("new token pool batched reader: %w", err) } chainHealthcheck := cache.NewObservedChainHealthCheck( @@ -278,7 +283,9 @@ func jobSpecToExecPluginConfig(ctx context.Context, lggr logger.Logger, jb job.J DestLP: params.destChain.LogPoller(), SourceStartBlock: params.pluginConfig.SourceStartBlock, DestStartBlock: params.pluginConfig.DestStartBlock, - }, nil + }, + chainHealthcheck, + nil } type jobSpecParams struct { diff --git a/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go b/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go index 0a3830af0f..25d01f83bc 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" ) @@ -32,6 +33,7 @@ import ( // //go:generate mockery --quiet --name ChainHealthcheck --filename chain_health_mock.go --case=underscore type ChainHealthcheck interface { + job.ServiceCtx // IsHealthy checks if the chain is healthy and returns true if it is, false otherwise. IsHealthy(ctx context.Context) (bool, error) } @@ -143,7 +145,7 @@ func (c *chainHealthcheck) IsHealthy(ctx context.Context) (bool, error) { return true, nil } -func (c *chainHealthcheck) Start() error { +func (c *chainHealthcheck) Start(context.Context) error { return c.StateMachine.StartOnce("ChainHealthcheck", func() error { c.lggr.Info("Starting ChainHealthcheck") c.wg.Add(1) @@ -222,6 +224,7 @@ func (c *chainHealthcheck) checkIfRMNsAreHealthy(ctx context.Context) (bool, err } // If the value is not found in the cache, fetch the RMN curse state in a sync manner for the first time + c.lggr.Warnw("Refreshing RMN state from the plugin routine, this should happen only once per lane during boot") return c.refresh(ctx) } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go b/core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go index 57d0e74a22..8c0bf0a2c4 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/chain_health_test.go @@ -207,7 +207,7 @@ func Test_RefreshingInBackground(t *testing.T) { 10*time.Microsecond, ) defer chainState.backgroundCancel() - require.NoError(t, chainState.Start()) + require.NoError(t, chainState.Start(tests.Context(t))) // All healthy assertHealthy(t, chainState, true) diff --git a/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go b/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go index 9a1b806c37..69daa7c798 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go @@ -13,6 +13,24 @@ type ChainHealthcheck struct { mock.Mock } +// Close provides a mock function with given fields: +func (_m *ChainHealthcheck) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + // IsHealthy provides a mock function with given fields: ctx func (_m *ChainHealthcheck) IsHealthy(ctx context.Context) (bool, error) { ret := _m.Called(ctx) @@ -41,6 +59,24 @@ func (_m *ChainHealthcheck) IsHealthy(ctx context.Context) (bool, error) { return r0, r1 } +// Start provides a mock function with given fields: _a0 +func (_m *ChainHealthcheck) Start(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewChainHealthcheck creates a new instance of ChainHealthcheck. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewChainHealthcheck(t interface {