Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Feb 8, 2024
1 parent 88ef4e0 commit d7bcf30
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 92 deletions.
247 changes: 159 additions & 88 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
relaymercuryv1 "github.com/smartcontractkit/chainlink-data-streams/mercury/v1"
relaymercuryv2 "github.com/smartcontractkit/chainlink-data-streams/mercury/v2"
relaymercuryv3 "github.com/smartcontractkit/chainlink-data-streams/mercury/v3"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
Expand Down Expand Up @@ -96,99 +97,45 @@ func NewServices(
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
srvs = append(srvs, saver)

loopEnabled, loopCmd := env.MercuryPlugin.Cmd.Get() != "", env.MercuryPlugin.Cmd.Get()

// this is the factory that will be used to create the mercury plugin
var factory ocr3types.MercuryPluginFactory
var (
factory ocr3types.MercuryPluginFactory
factoryServices []job.ServiceCtx
)
fCfg := factoryCfg{
orm: orm,
pipelineRunner: pipelineRunner,
jb: jb,
lggr: lggr,
saver: saver,
chEnhancedTelem: chEnhancedTelem,
ocr2Provider: ocr2Provider,
reportingPluginConfig: pluginConfig,
cfg: cfg,
feedID: feedID,
}
switch feedID.Version() {
case 1:
ds := mercuryv1.NewDataSource(
orm,
pipelineRunner,
jb,
*jb.PipelineSpec,
lggr,
saver,
chEnhancedTelem,
ocr2Provider.MercuryChainReader(),
ocr2Provider.MercuryServerFetcher(),
pluginConfig.InitialBlockNumber.Ptr(),
feedID,
)

if loopEnabled {
cmdFn, opts, mercuryLggr, err2 := initLoop(loopCmd, cfg, feedID, lggr)
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to init loop for feed %s: %w", feedID, err2)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv1.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV1())
factory, factoryServices, err = newv1factory(fCfg)
if err != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err)
}
srvs = append(srvs, factoryServices...)
case 2:
ds := mercuryv2.NewDataSource(
orm,
pipelineRunner,
jb,
*jb.PipelineSpec,
feedID,
lggr,
saver,
chEnhancedTelem,
ocr2Provider.MercuryServerFetcher(),
*pluginConfig.LinkFeedID,
*pluginConfig.NativeFeedID,
)

if loopEnabled {
cmdFn, opts, mercuryLggr, err2 := initLoop(loopCmd, cfg, feedID, lggr)
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to init loop for feed %s: %w", feedID, err2)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv2.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV2())
factory, factoryServices, err = newv2factory(fCfg)
if err != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err)
}
srvs = append(srvs, factoryServices...)
case 3:
ds := mercuryv3.NewDataSource(
orm,
pipelineRunner,
jb,
*jb.PipelineSpec,
feedID,
lggr,
saver,
chEnhancedTelem,
ocr2Provider.MercuryServerFetcher(),
*pluginConfig.LinkFeedID,
*pluginConfig.NativeFeedID,
)

if loopEnabled {
cmdFn, opts, mercuryLggr, err2 := initLoop(loopCmd, cfg, feedID, lggr)
if err2 != nil {
abort()
return nil, fmt.Errorf("failed to init loop for feed %s: %w", feedID, err2)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv3.NewFactory(ds, lggr, ocr2Provider.OnchainConfigCodec(), ocr2Provider.ReportCodecV3())
factory, factoryServices, err = newv3factory(fCfg)
if err != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err)
}

srvs = append(srvs, factoryServices...)
default:
return nil, errors.Errorf("unknown Mercury report schema version: %d", feedID.Version())
}
Expand All @@ -202,8 +149,132 @@ func NewServices(
return srvs, nil
}

type factoryCfg struct {
orm types.DataSourceORM
pipelineRunner pipeline.Runner
jb job.Job
lggr logger.Logger
saver *ocrcommon.RunResultSaver
chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData
ocr2Provider commontypes.MercuryProvider
reportingPluginConfig config.PluginConfig
cfg Config
feedID utils.FeedID
}

func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) {
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

ds := mercuryv3.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
factoryCfg.jb,
*factoryCfg.jb.PipelineSpec,
factoryCfg.feedID,
factoryCfg.lggr,
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv3.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV3())
}
return factory, srvs, nil
}

func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) {
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

ds := mercuryv2.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
factoryCfg.jb,
*factoryCfg.jb.PipelineSpec,
factoryCfg.feedID,
factoryCfg.lggr,
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryServerFetcher(),
*factoryCfg.reportingPluginConfig.LinkFeedID,
*factoryCfg.reportingPluginConfig.NativeFeedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv2.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV2())
}
return factory, srvs, nil
}

func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.ServiceCtx, error) {
var factory ocr3types.MercuryPluginFactory
srvs := make([]job.ServiceCtx, 0)

ds := mercuryv1.NewDataSource(
factoryCfg.orm,
factoryCfg.pipelineRunner,
factoryCfg.jb,
*factoryCfg.jb.PipelineSpec,
factoryCfg.lggr,
factoryCfg.saver,
factoryCfg.chEnhancedTelem,
factoryCfg.ocr2Provider.MercuryChainReader(),
factoryCfg.ocr2Provider.MercuryServerFetcher(),
factoryCfg.reportingPluginConfig.InitialBlockNumber.Ptr(),
factoryCfg.feedID,
)

loopCmd := env.MercuryPlugin.Cmd.Get()
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
factory = relaymercuryv1.NewFactory(ds, factoryCfg.lggr, factoryCfg.ocr2Provider.OnchainConfigCodec(), factoryCfg.ocr2Provider.ReportCodecV1())
}
return factory, srvs, nil
}

func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) {
lggr.Debug("Initializing loop for Mercury with command", cmd)
lggr.Debugw("Initializing Mercury loop", "command", cmd)
mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String())
envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions core/services/ocr2/plugins/mercury/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"
v1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1"
v2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2"
v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3"
"github.com/smartcontractkit/chainlink/v2/core/config/env"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"

mercuryocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/mercury"

Expand Down Expand Up @@ -198,6 +199,7 @@ func TestNewServices(t *testing.T) {
// we are only varying the version via feedID (and the plugin config)
// this wrapper supplies dummy values for the rest of the arguments
func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID utils.FeedID) ([]job.ServiceCtx, error) {
t.Helper()
jb := testJob
jb.OCR2OracleSpec.PluginConfig = pluginConfig
return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID)
Expand Down
2 changes: 1 addition & 1 deletion plugins/cmd/chainlink-mercury/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
This directory house the Mercury LOOPP
This directory houses the Mercury LOOPP

# Running Integration Tests Locally

Expand Down

0 comments on commit d7bcf30

Please sign in to comment.