From d7bcf302809c2e957501aa77b72eb773d0e940f4 Mon Sep 17 00:00:00 2001 From: krehermann Date: Thu, 8 Feb 2024 10:27:25 -0700 Subject: [PATCH] address comments --- core/services/ocr2/plugins/mercury/plugin.go | 247 +++++++++++------- .../ocr2/plugins/mercury/plugin_test.go | 8 +- plugins/cmd/chainlink-mercury/README.md | 2 +- 3 files changed, 165 insertions(+), 92 deletions(-) diff --git a/core/services/ocr2/plugins/mercury/plugin.go b/core/services/ocr2/plugins/mercury/plugin.go index cb07dfe3492..c5eba78b0d8 100644 --- a/core/services/ocr2/plugins/mercury/plugin.go +++ b/core/services/ocr2/plugins/mercury/plugin.go @@ -10,12 +10,13 @@ import ( libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - "github.com/smartcontractkit/chainlink-common/pkg/loop" - commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" relaymercuryv1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1" relaymercuryv2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2" relaymercuryv3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3" + "github.com/smartcontractkit/chainlink-common/pkg/loop" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services" @@ -96,99 +97,45 @@ func NewServices( saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth()) srvs = append(srvs, saver) - loopEnabled, loopCmd := env.MercuryPlugin.Cmd.Get() != "", env.MercuryPlugin.Cmd.Get() - // this is the factory that will be used to create the mercury plugin - var factory ocr3types.MercuryPluginFactory + var ( + factory ocr3types.MercuryPluginFactory + factoryServices []job.ServiceCtx + ) + fCfg := factoryCfg{ + orm: orm, + pipelineRunner: pipelineRunner, + jb: jb, + lggr: lggr, + saver: saver, + chEnhancedTelem: chEnhancedTelem, + ocr2Provider: ocr2Provider, + reportingPluginConfig: pluginConfig, + cfg: cfg, + feedID: feedID, + } switch feedID.Version() { case 1: - ds := mercuryv1.NewDataSource( - orm, - pipelineRunner, - jb, - *jb.PipelineSpec, - lggr, - saver, - chEnhancedTelem, - ocr2Provider.MercuryChainReader(), - ocr2Provider.MercuryServerFetcher(), - pluginConfig.InitialBlockNumber.Ptr(), - feedID, - ) - - if loopEnabled { - cmdFn, opts, mercuryLggr, err2 := initLoop(loopCmd, cfg, feedID, lggr) - if err2 != nil { - abort() - return nil, fmt.Errorf("failed to init loop for feed %s: %w", feedID, err2) - } - // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle - factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, ocr2Provider, ds) - srvs = append(srvs, factoryServer) - // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle - factory = factoryServer - } else { - factory = relaymercuryv1.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV1()) + factory, factoryServices, err = newv1factory(fCfg) + if err != nil { + abort() + return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err) } + srvs = append(srvs, factoryServices...) case 2: - ds := mercuryv2.NewDataSource( - orm, - pipelineRunner, - jb, - *jb.PipelineSpec, - feedID, - lggr, - saver, - chEnhancedTelem, - ocr2Provider.MercuryServerFetcher(), - *pluginConfig.LinkFeedID, - *pluginConfig.NativeFeedID, - ) - - if loopEnabled { - cmdFn, opts, mercuryLggr, err2 := initLoop(loopCmd, cfg, feedID, lggr) - if err2 != nil { - abort() - return nil, fmt.Errorf("failed to init loop for feed %s: %w", feedID, err2) - } - // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle - factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, ocr2Provider, ds) - srvs = append(srvs, factoryServer) - // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle - factory = factoryServer - } else { - factory = relaymercuryv2.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV2()) + factory, factoryServices, err = newv2factory(fCfg) + if err != nil { + abort() + return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err) } + srvs = append(srvs, factoryServices...) case 3: - ds := mercuryv3.NewDataSource( - orm, - pipelineRunner, - jb, - *jb.PipelineSpec, - feedID, - lggr, - saver, - chEnhancedTelem, - ocr2Provider.MercuryServerFetcher(), - *pluginConfig.LinkFeedID, - *pluginConfig.NativeFeedID, - ) - - if loopEnabled { - cmdFn, opts, mercuryLggr, err2 := initLoop(loopCmd, cfg, feedID, lggr) - if err2 != nil { - abort() - return nil, fmt.Errorf("failed to init loop for feed %s: %w", feedID, err2) - } - // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle - factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, ocr2Provider, ds) - srvs = append(srvs, factoryServer) - // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle - factory = factoryServer - } else { - factory = relaymercuryv3.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV3()) + factory, factoryServices, err = newv3factory(fCfg) + if err != nil { + abort() + return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err) } - + srvs = append(srvs, factoryServices...) default: return nil, errors.Errorf("unknown Mercury report schema version: %d", feedID.Version()) } @@ -202,8 +149,132 @@ func NewServices( return srvs, nil } +type factoryCfg struct { + orm types.DataSourceORM + pipelineRunner pipeline.Runner + jb job.Job + lggr logger.Logger + saver *ocrcommon.RunResultSaver + chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData + ocr2Provider commontypes.MercuryProvider + reportingPluginConfig config.PluginConfig + cfg Config + feedID utils.FeedID +} + +func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) { + var factory ocr3types.MercuryPluginFactory + srvs := make([]job.ServiceCtx, 0) + + ds := mercuryv3.NewDataSource( + factoryCfg.orm, + factoryCfg.pipelineRunner, + factoryCfg.jb, + *factoryCfg.jb.PipelineSpec, + factoryCfg.feedID, + factoryCfg.lggr, + factoryCfg.saver, + factoryCfg.chEnhancedTelem, + factoryCfg.ocr2Provider.MercuryServerFetcher(), + *factoryCfg.reportingPluginConfig.LinkFeedID, + *factoryCfg.reportingPluginConfig.NativeFeedID, + ) + + loopCmd := env.MercuryPlugin.Cmd.Get() + loopEnabled := loopCmd != "" + + if loopEnabled { + cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + if err != nil { + return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv3.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV3()) + } + return factory, srvs, nil +} + +func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) { + var factory ocr3types.MercuryPluginFactory + srvs := make([]job.ServiceCtx, 0) + + ds := mercuryv2.NewDataSource( + factoryCfg.orm, + factoryCfg.pipelineRunner, + factoryCfg.jb, + *factoryCfg.jb.PipelineSpec, + factoryCfg.feedID, + factoryCfg.lggr, + factoryCfg.saver, + factoryCfg.chEnhancedTelem, + factoryCfg.ocr2Provider.MercuryServerFetcher(), + *factoryCfg.reportingPluginConfig.LinkFeedID, + *factoryCfg.reportingPluginConfig.NativeFeedID, + ) + + loopCmd := env.MercuryPlugin.Cmd.Get() + loopEnabled := loopCmd != "" + + if loopEnabled { + cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + if err != nil { + return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv2.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV2()) + } + return factory, srvs, nil +} + +func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) { + var factory ocr3types.MercuryPluginFactory + srvs := make([]job.ServiceCtx, 0) + + ds := mercuryv1.NewDataSource( + factoryCfg.orm, + factoryCfg.pipelineRunner, + factoryCfg.jb, + *factoryCfg.jb.PipelineSpec, + factoryCfg.lggr, + factoryCfg.saver, + factoryCfg.chEnhancedTelem, + factoryCfg.ocr2Provider.MercuryChainReader(), + factoryCfg.ocr2Provider.MercuryServerFetcher(), + factoryCfg.reportingPluginConfig.InitialBlockNumber.Ptr(), + factoryCfg.feedID, + ) + + loopCmd := env.MercuryPlugin.Cmd.Get() + loopEnabled := loopCmd != "" + + if loopEnabled { + cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr) + if err != nil { + return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err) + } + // in loopp mode, the factory is grpc server, and we need to handle the server lifecycle + factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds) + srvs = append(srvs, factoryServer) + // adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle + factory = factoryServer + } else { + factory = relaymercuryv1.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV1()) + } + return factory, srvs, nil +} + func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) { - lggr.Debug("Initializing loop for Mercury with command", cmd) + lggr.Debugw("Initializing Mercury loop", "command", cmd) mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String()) envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get()) if err != nil { diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go index 45cdef51fa5..4e6d4d82a7e 100644 --- a/core/services/ocr2/plugins/mercury/plugin_test.go +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -10,15 +10,16 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/smartcontractkit/chainlink/v2/core/config/env" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink-common/pkg/loop" commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/mercury" v1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1" v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" - "github.com/smartcontractkit/chainlink/v2/core/config/env" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" mercuryocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury" @@ -198,6 +199,7 @@ func TestNewServices(t *testing.T) { // we are only varying the version via feedID (and the plugin config) // this wrapper supplies dummy values for the rest of the arguments func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID) ([]job.ServiceCtx, error) { + t.Helper() jb := testJob jb.OCR2OracleSpec.PluginConfig = pluginConfig return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID) diff --git a/plugins/cmd/chainlink-mercury/README.md b/plugins/cmd/chainlink-mercury/README.md index 8cf382fa6a8..89775cfe911 100644 --- a/plugins/cmd/chainlink-mercury/README.md +++ b/plugins/cmd/chainlink-mercury/README.md @@ -1,4 +1,4 @@ -This directory house the Mercury LOOPP +This directory houses the Mercury LOOPP # Running Integration Tests Locally