From 230a35956211060aafdf1f2a127f3b6d4fc51df6 Mon Sep 17 00:00:00 2001 From: patrickhuie19 Date: Tue, 9 Jul 2024 17:22:15 -0400 Subject: [PATCH] chainHealthcheck + token background worker now managed by reporting plugin factory --- .../ocr2/plugins/ccip/ccipexec/factory.go | 44 ++++++++++++--- .../plugins/ccip/ccipexec/initializers.go | 12 ++-- .../ccip/internal/cache/chain_health.go | 13 ++++- .../internal/cache/mocks/chain_health_mock.go | 56 +++++++++++++++++++ .../internal/cache/observed_chain_health.go | 15 +++++ .../ocr2/plugins/ccip/tokendata/bgworker.go | 13 ++++- core/services/relay/evm/exec_provider.go | 6 ++ plugins/cmd/chainlink-ccip-exec/main.go | 2 +- 8 files changed, 139 insertions(+), 22 deletions(-) diff --git a/core/services/ocr2/plugins/ccip/ccipexec/factory.go b/core/services/ocr2/plugins/ccip/ccipexec/factory.go index e24185472d..39fec901be 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/factory.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/factory.go @@ -5,6 +5,10 @@ import ( "fmt" "sync" + "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/observability" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata" @@ -27,6 +31,8 @@ type ExecutionReportingPluginFactory struct { destPriceRegReader ccipdata.PriceRegistryReader destPriceRegAddr cciptypes.Address readersMu *sync.Mutex + + services []services.Service } func (rf *ExecutionReportingPluginFactory) Name() string { @@ -34,14 +40,28 @@ func (rf *ExecutionReportingPluginFactory) Name() string { panic("implement me") } -func (rf *ExecutionReportingPluginFactory) Start(ctx context.Context) error { - //TODO implement me - panic("implement me") +// Start is used to run chainHealthcheck and tokenDataWorker, which were previously passed +// back to the delegate as top level job.ServiceCtx to be managed in core alongside the reporting +// plugin factory +func (rf *ExecutionReportingPluginFactory) Start(ctx context.Context) (err error) { + rf.readersMu.Lock() + defer rf.readersMu.Unlock() + for _, service := range rf.services { + serviceErr := service.Start(ctx) + err = multierr.Append(err, serviceErr) + } + return } -func (rf *ExecutionReportingPluginFactory) Close() error { - //TODO implement me - panic("implement me") +func (rf *ExecutionReportingPluginFactory) Close() (err error) { + rf.readersMu.Lock() + defer rf.readersMu.Unlock() + for _, service := range rf.services { + closeErr := service.Close() + err = multierr.Append(err, closeErr) + } + + return } func (rf *ExecutionReportingPluginFactory) Ready() error { @@ -161,6 +181,7 @@ func NewExecutionReportingPluginFactoryV2(ctx context.Context, lggr logger.Logge chainHealthcheck: chainHealthcheck, newReportingPluginRetryConfig: defaultNewReportingPluginRetryConfig, }, + services: []services.Service{chainHealthcheck, tokenBackgroundWorker}, readersMu: &sync.Mutex{}, // the fields below are initially empty and populated on demand @@ -180,7 +201,7 @@ func NewExecutionReportingPluginFactory(config ExecutionPluginStaticConfig) *Exe } } -func (rf *ExecutionReportingPluginFactory) UpdateDynamicReaders(ctx context.Context, newPriceRegAddr cciptypes.Address) error { +func (rf *ExecutionReportingPluginFactory) UpdateDynamicReaders(_ context.Context, newPriceRegAddr cciptypes.Address) error { rf.readersMu.Lock() defer rf.readersMu.Unlock() // TODO: Investigate use of Close() to cleanup. @@ -216,7 +237,7 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor initialRetryDelay := rf.config.newReportingPluginRetryConfig.InitialDelay maxDelay := rf.config.newReportingPluginRetryConfig.MaxDelay - pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.NewReportingPluginFn(config), initialRetryDelay, maxDelay) + pluginAndInfo, err := ccipcommon.RetryUntilSuccess(rf.newReportingPluginFn(config), initialRetryDelay, maxDelay) if err != nil { return nil, types.ReportingPluginInfo{}, err } @@ -226,10 +247,15 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor // NewReportingPluginFn implements the NewReportingPlugin logic. It is defined as a function so that it can easily be // retried via RetryUntilSuccess. NewReportingPlugin must return successfully in order for the Exec plugin to function, // hence why we can only keep retrying it until it succeeds. -func (rf *ExecutionReportingPluginFactory) NewReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) { +func (rf *ExecutionReportingPluginFactory) newReportingPluginFn(config types.ReportingPluginConfig) func() (reportingPluginAndInfo, error) { return func() (reportingPluginAndInfo, error) { ctx := context.Background() // todo: consider setting a timeout + // Start the chainHealthcheck and tokenDataWorker services + // Using Start, while a bit more obtuse, allows us to manage these services + // in the same process as the plugin factory in LOOP mode + err := rf.Start(ctx) + destPriceRegistry, destWrappedNative, err := rf.config.offRampReader.ChangeConfig(ctx, config.OnchainConfig, config.OffchainConfig) if err != nil { return reportingPluginAndInfo{}, err diff --git a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go index 3d6e8ceb22..801027b249 100644 --- a/core/services/ocr2/plugins/ccip/ccipexec/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipexec/initializers.go @@ -56,7 +56,7 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, cfg plugins.Regist loopCmd := env.CCIPExecPlugin.Cmd.Get() loopEnabled := loopCmd != "" - var wrappedPluginFactory *ExecutionReportingPluginFactory + var pluginFactory types.ReportingPluginFactory var err error if loopEnabled { // find loop command @@ -71,15 +71,15 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, cfg plugins.Regist }) // get reporting plugin factory from loop factoryServer := loop.NewExecutionService(lggr, grpcOpts, cmdFn, srcProvider, dstProvider, uint32(srcChainID), uint32(dstChainID), sourceTokenAddress) - // wrap into ExecutionReportingPluginFactory + pluginFactory = factoryServer } else { - wrappedPluginFactory, err = NewExecutionReportingPluginFactoryV2(ctx, lggr, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider) + pluginFactory, err = NewExecutionReportingPluginFactoryV2(ctx, lggr, sourceTokenAddress, srcChainID, dstChainID, srcProvider, dstProvider) if err != nil { return nil, err } } - argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(wrappedPluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, big.NewInt(0).SetInt64(dstChainID)) + argsNoPlugin.ReportingPluginFactory = promwrapper.NewPromFactory(pluginFactory, "CCIPExecution", jb.OCR2OracleSpec.Relay, big.NewInt(0).SetInt64(dstChainID)) argsNoPlugin.Logger = commonlogger.NewOCRWrapper(lggr, true, logError) oracle, err := libocr2.NewOracle(argsNoPlugin) if err != nil { @@ -94,14 +94,10 @@ func NewExecServices(ctx context.Context, lggr logger.Logger, cfg plugins.Regist dstProvider, job.NewServiceAdapter(oracle), ), - wrappedPluginFactory.config.chainHealthcheck, - wrappedPluginFactory.config.tokenDataWorker, }, nil } return []job.ServiceCtx{ job.NewServiceAdapter(oracle), - wrappedPluginFactory.config.chainHealthcheck, - wrappedPluginFactory.config.tokenDataWorker, }, nil } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go b/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go index a58277fbb7..b77571a94c 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/chain_health.go @@ -12,7 +12,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" ) @@ -33,7 +32,7 @@ import ( // //go:generate mockery --quiet --name ChainHealthcheck --filename chain_health_mock.go --case=underscore type ChainHealthcheck interface { - job.ServiceCtx + services.Service IsHealthy(ctx context.Context) (bool, error) } @@ -64,6 +63,16 @@ type chainHealthcheck struct { backgroundCancel context.CancelFunc } +func (c *chainHealthcheck) HealthReport() map[string]error { + //TODO implement me + panic("implement me") +} + +func (c *chainHealthcheck) Name() string { + //TODO implement me + panic("implement me") +} + func NewChainHealthcheck(lggr logger.Logger, onRamp ccipdata.OnRampReader, commitStore ccipdata.CommitStoreReader) *chainHealthcheck { ctx, cancel := context.WithCancel(context.Background()) diff --git a/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go b/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go index b46cf728c6..95e31000c5 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/mocks/chain_health_mock.go @@ -31,6 +31,26 @@ func (_m *ChainHealthcheck) Close() error { return r0 } +// HealthReport provides a mock function with given fields: +func (_m *ChainHealthcheck) HealthReport() map[string]error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for HealthReport") + } + + var r0 map[string]error + if rf, ok := ret.Get(0).(func() map[string]error); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]error) + } + } + + return r0 +} + // IsHealthy provides a mock function with given fields: ctx func (_m *ChainHealthcheck) IsHealthy(ctx context.Context) (bool, error) { ret := _m.Called(ctx) @@ -59,6 +79,42 @@ func (_m *ChainHealthcheck) IsHealthy(ctx context.Context) (bool, error) { return r0, r1 } +// Name provides a mock function with given fields: +func (_m *ChainHealthcheck) Name() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Name") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Ready provides a mock function with given fields: +func (_m *ChainHealthcheck) Ready() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Ready") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Start provides a mock function with given fields: _a0 func (_m *ChainHealthcheck) Start(_a0 context.Context) error { ret := _m.Called(_a0) diff --git a/core/services/ocr2/plugins/ccip/internal/cache/observed_chain_health.go b/core/services/ocr2/plugins/ccip/internal/cache/observed_chain_health.go index 941162448a..fe2a03206e 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/observed_chain_health.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/observed_chain_health.go @@ -29,6 +29,21 @@ type ObservedChainHealthcheck struct { laneHealthStatus *prometheus.GaugeVec } +func (o *ObservedChainHealthcheck) Ready() error { + //TODO implement me + panic("implement me") +} + +func (o *ObservedChainHealthcheck) HealthReport() map[string]error { + //TODO implement me + panic("implement me") +} + +func (o *ObservedChainHealthcheck) Name() string { + //TODO implement me + panic("implement me") +} + func NewObservedChainHealthCheck( chainHealthcheck ChainHealthcheck, plugin string, diff --git a/core/services/ocr2/plugins/ccip/tokendata/bgworker.go b/core/services/ocr2/plugins/ccip/tokendata/bgworker.go index 1a74ab2305..699888a088 100644 --- a/core/services/ocr2/plugins/ccip/tokendata/bgworker.go +++ b/core/services/ocr2/plugins/ccip/tokendata/bgworker.go @@ -12,7 +12,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink/v2/core/services/job" ) type msgResult struct { @@ -22,7 +21,7 @@ type msgResult struct { } type Worker interface { - job.ServiceCtx + services.Service // AddJobsFromMsgs will include the provided msgs for background processing. AddJobsFromMsgs(ctx context.Context, msgs []cciptypes.EVM2EVMOnRampCCIPSendRequestedWithMeta) @@ -46,6 +45,16 @@ type BackgroundWorker struct { backgroundCancel context.CancelFunc } +func (w *BackgroundWorker) HealthReport() map[string]error { + //TODO implement me + panic("implement me") +} + +func (w *BackgroundWorker) Name() string { + //TODO implement me + panic("implement me") +} + func NewBackgroundWorker( tokenDataReaders map[cciptypes.Address]Reader, numWorkers int, diff --git a/core/services/relay/evm/exec_provider.go b/core/services/relay/evm/exec_provider.go index 3313386036..12f7d1313e 100644 --- a/core/services/relay/evm/exec_provider.go +++ b/core/services/relay/evm/exec_provider.go @@ -280,6 +280,9 @@ func (d DstExecProvider) NewCommitStoreReader(ctx context.Context, addr cciptype return } +// NewOffRampReader constructs a reader for the offramp contract on the dest chain. +// The offramp address is known when the provider is constructed - by consuming it from the provider instead of at runtime +// we save ourselves wiring it through the execution reporting plugin factory grpc server + client func (d DstExecProvider) NewOffRampReader(ctx context.Context, _ cciptypes.Address) (offRampReader cciptypes.OffRampReader, err error) { offRampReader, err = ccip.NewOffRampReader(d.lggr, d.versionFinder, d.offRampAddress, d.client, d.lp, d.gasEstimator, &d.maxGasPrice, true) return @@ -299,6 +302,9 @@ func (d DstExecProvider) NewTokenDataReader(ctx context.Context, tokenAddress cc return nil, fmt.Errorf("invalid: NewTokenDataReader called on DstExecProvider. It should only be called on SrcExecProvider") } +// NewTokenPoolBatchedReader constructs a batched caller to read token prices from the destination pool. +// The offramp address is known when the provider is constructed - by consuming it from the provider instead of at runtime +// we save ourselves wiring it through the execution reporting plugin factory grpc server + client func (d DstExecProvider) NewTokenPoolBatchedReader(ctx context.Context, _ cciptypes.Address, sourceChainSelector uint64) (tokenPoolBatchedReader cciptypes.TokenPoolBatchedReader, err error) { batchCaller := ccip.NewDynamicLimitedBatchCaller( d.lggr, diff --git a/plugins/cmd/chainlink-ccip-exec/main.go b/plugins/cmd/chainlink-ccip-exec/main.go index 965eeded52..d184dbe131 100644 --- a/plugins/cmd/chainlink-ccip-exec/main.go +++ b/plugins/cmd/chainlink-ccip-exec/main.go @@ -23,7 +23,7 @@ func main() { defer close(stop) plugin.Serve(&plugin.ServeConfig{ - HandshakeConfig: loop.PluginMedianHandshakeConfig(), + HandshakeConfig: loop.PluginCCIPExecutionHandshakeConfig(), Plugins: map[string]plugin.Plugin{ loop.CCIPExecutionLOOPName: &loop.ExecutionLoop{ PluginServer: p,