diff --git a/core/services/chainlink/relayer_factory.go b/core/services/chainlink/relayer_factory.go index 49582e2d52a..8c8aca0aff2 100644 --- a/core/services/chainlink/relayer_factory.go +++ b/core/services/chainlink/relayer_factory.go @@ -127,15 +127,12 @@ func (r *RelayerFactory) NewSolana(ks keystore.Solana, chainCfgs solana.TOMLConf return nil, fmt.Errorf("failed to marshal Solana configs: %w", err) } - solCmdFn, err := plugins.NewCmdFactory(r.Register, plugins.CmdConfig{ - ID: relayID.Name(), - Cmd: cmdName, - }) + rl, err := r.Register(relayID.Name(), cmdName) if err != nil { return nil, fmt.Errorf("failed to create Solana LOOP command: %w", err) } - solanaRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, solCmdFn, string(cfgTOML), signer) + solanaRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, rl.Cmd, string(cfgTOML), signer) } else { // fallback to embedded chain @@ -196,16 +193,13 @@ func (r *RelayerFactory) NewStarkNet(ks keystore.StarkNet, chainCfgs config.TOML return nil, fmt.Errorf("failed to marshal StarkNet configs: %w", err) } - starknetCmdFn, err := plugins.NewCmdFactory(r.Register, plugins.CmdConfig{ - ID: relayID.Name(), - Cmd: cmdName, - }) + rl, err := r.Register(relayID.Name(), cmdName) if err != nil { return nil, fmt.Errorf("failed to create StarkNet LOOP command: %w", err) } // the starknet relayer service has a delicate keystore dependency. the value that is passed to NewRelayerService must // be compatible with instantiating a starknet transaction manager KeystoreAdapter within the LOOPp executable. - starknetRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, starknetCmdFn, string(cfgTOML), loopKs) + starknetRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, rl.Cmd, string(cfgTOML), loopKs) } else { // fallback to embedded chain opts := starkchain.ChainOpts{ diff --git a/core/services/job/spawner_test.go b/core/services/job/spawner_test.go index 3e8ccbab848..df1f94fc560 100644 --- a/core/services/job/spawner_test.go +++ b/core/services/job/spawner_test.go @@ -300,7 +300,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { orm := NewTestORM(t, db, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database()) mailMon := servicetest.Run(t, mailboxtest.NewMonitor(t)) - processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil }) + processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name, cmd string) (*plugins.RegisteredLoop, error) { return nil, nil }) ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), config.Database(), processConfig) d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig, diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 2ea99d65cfe..551f3bf9583 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -588,7 +588,7 @@ func (d *Delegate) newServicesGenericPlugin( } pluginLggr := lggr.Named(p.PluginName).Named(spec.ContractID).Named(spec.GetID()) - cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command) + loop, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command) if err != nil { return nil, fmt.Errorf("failed to register loop: %w", err) } @@ -632,7 +632,7 @@ func (d *Delegate) newServicesGenericPlugin( pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner) ta := generic.NewTelemetryAdapter(d.monitoringEndpointGen) - plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog) + plugin := reportingplugins.NewLOOPPService(pluginLggr, d.cfg.GRPCOpts(), loop.Cmd, pluginConfig, providerClientConn, pr, ta, errorLog) oracleArgs.ReportingPluginFactory = plugin srvs = append(srvs, plugin) diff --git a/core/services/ocr2/plugins/median/services.go b/core/services/ocr2/plugins/median/services.go index 4adfc306d64..5b55cab9d18 100644 --- a/core/services/ocr2/plugins/median/services.go +++ b/core/services/ocr2/plugins/median/services.go @@ -162,13 +162,13 @@ func NewMedianServices(ctx context.Context, if medianLoopEnabled { // use unique logger names so we can use it to register a loop medianLggr := lggr.Named("Median").Named(spec.ContractID).Named(spec.GetID()) - cmdFn, telem, err2 := cfg.RegisterLOOP(medianLggr.Name(), medianPluginCmd) + loopp, err2 := cfg.RegisterLOOP(medianLggr.Name(), medianPluginCmd) if err2 != nil { err = fmt.Errorf("failed to register loop: %w", err2) abort() return } - median := loop.NewMedianService(lggr, telem, cmdFn, medianProvider, dataSource, juelsPerFeeCoinSource, errorLog) + median := loop.NewMedianService(lggr, cfg.GRPCOpts(), loopp.Cmd, medianProvider, dataSource, juelsPerFeeCoinSource, errorLog) argsNoPlugin.ReportingPluginFactory = median srvs = append(srvs, median) } else { diff --git a/core/web/loop_registry_test.go b/core/web/loop_registry_test.go index 93eda32d0e8..b28f625bd50 100644 --- a/core/web/loop_registry_test.go +++ b/core/web/loop_registry_test.go @@ -76,15 +76,15 @@ func TestLoopRegistry(t *testing.T) { // note we expect this to be an ordered result expectedLabels := []model.LabelSet{ - model.LabelSet{"__metrics_path__": model.LabelValue(expectedCoreEndPoint)}, - model.LabelSet{"__metrics_path__": model.LabelValue(expectedLooppEndPoint)}, + {"__metrics_path__": model.LabelValue(expectedCoreEndPoint)}, + {"__metrics_path__": model.LabelValue(expectedLooppEndPoint)}, } require.NoError(t, app.KeyStore.OCR().Add(cltest.DefaultOCRKey)) require.NoError(t, app.Start(testutils.Context(t))) // register a mock loop - loop, err := app.GetLoopRegistry().Register("mockLoopImpl") + loop, err := app.GetLoopRegistry().Register("mockLoopImpl", "./COMMAND") require.NoError(t, err) require.NotNil(t, loop) require.Len(t, app.GetLoopRegistry().List(), 1) diff --git a/plugins/config.go b/plugins/config.go index 01574d82099..9e356ccc5b7 100644 --- a/plugins/config.go +++ b/plugins/config.go @@ -1,39 +1,35 @@ package plugins import ( - "os/exec" - "github.com/smartcontractkit/chainlink-common/pkg/loop" ) // RegistrarConfig generates contains static configuration inher type RegistrarConfig interface { - RegisterLOOP(loopId string, cmdName string) (func() *exec.Cmd, loop.GRPCOpts, error) + RegisterLOOP(loopId string, cmdName string) (*RegisteredLoop, error) + GRPCOpts() loop.GRPCOpts } -type registarConfig struct { +type registrarConfig struct { grpcOpts loop.GRPCOpts - loopRegistrationFn func(loopId string) (*RegisteredLoop, error) + loopRegistrationFn func(id, cmd string) (*RegisteredLoop, error) } // NewRegistrarConfig creates a RegistarConfig // loopRegistrationFn must act as a global registry function of LOOPs and must be idempotent. // The [func() *exec.Cmd] for a LOOP should be generated by calling [RegistrarConfig.RegisterLOOP] -func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(loopId string) (*RegisteredLoop, error)) RegistrarConfig { - return ®istarConfig{ +func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(id, cmd string) (*RegisteredLoop, error)) RegistrarConfig { + return ®istrarConfig{ grpcOpts: grpcOpts, loopRegistrationFn: loopRegistrationFn, } } // RegisterLOOP calls the configured loopRegistrationFn. The loopRegistrationFn must act as a global registry for LOOPs and must be idempotent. -func (pc *registarConfig) RegisterLOOP(loopID string, cmdName string) (func() *exec.Cmd, loop.GRPCOpts, error) { - cmdFn, err := NewCmdFactory(pc.loopRegistrationFn, CmdConfig{ - ID: loopID, - Cmd: cmdName, - }) - if err != nil { - return nil, loop.GRPCOpts{}, err - } - return cmdFn, pc.grpcOpts, nil +func (pc *registrarConfig) RegisterLOOP(id, cmd string) (*RegisteredLoop, error) { + return pc.loopRegistrationFn(id, cmd) +} + +func (pc *registrarConfig) GRPCOpts() loop.GRPCOpts { + return pc.grpcOpts } diff --git a/plugins/config_test.go b/plugins/config_test.go new file mode 100644 index 00000000000..22d70eff682 --- /dev/null +++ b/plugins/config_test.go @@ -0,0 +1,33 @@ +package plugins + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func TestRegistrarConfig(t *testing.T) { + mockCfgTracing := &MockCfgTracing{} + registry := make(map[string]*RegisteredLoop) + + // Create a LoopRegistry instance with mockCfgTracing + loopRegistry := &LoopRegistry{ + lggr: logger.TestLogger(t), + registry: registry, + cfgTracing: mockCfgTracing, + } + + opts := loop.GRPCOpts{} + rConf := NewRegistrarConfig(opts, loopRegistry.Register) + + assert.Equal(t, opts, rConf.GRPCOpts()) + + id := "command-id" + rl, err := rConf.RegisterLOOP(id, "./COMMAND") + require.NoError(t, err) + assert.Equal(t, registry[id], rl) +} diff --git a/plugins/loop_registry.go b/plugins/loop_registry.go index 7a5274803d6..e5a6b758e8e 100644 --- a/plugins/loop_registry.go +++ b/plugins/loop_registry.go @@ -2,6 +2,7 @@ package plugins import ( "errors" + "os/exec" "sort" "sync" @@ -20,6 +21,13 @@ var ErrExists = errors.New("plugin already registered") type RegisteredLoop struct { Name string EnvCfg loop.EnvConfig + cmd string +} + +func (r *RegisteredLoop) Cmd() *exec.Cmd { + cmd := exec.Command(r.cmd) //#nosec G204 -- we control the value of the cmd so the lint/sec error is a false positive + cmd.Env = append(cmd.Env, r.EnvCfg.AsCmdEnv()...) + return cmd } // LoopRegistry is responsible for assigning ports to plugins that are to be used for the @@ -42,7 +50,7 @@ func NewLoopRegistry(lggr logger.Logger, tracingConfig config.Tracing) *LoopRegi // Register creates a port of the plugin. It is not idempotent. Duplicate calls to Register will return [ErrExists] // Safe for concurrent use. -func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) { +func (m *LoopRegistry) Register(id string, cmd string) (*RegisteredLoop, error) { m.mu.Lock() defer m.mu.Unlock() @@ -60,7 +68,7 @@ func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) { envCfg.TracingAttributes = m.cfgTracing.Attributes() } - m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg} + m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg, cmd: cmd} m.lggr.Debugf("Registered loopp %q with config %v, port %d", id, envCfg, envCfg.PrometheusPort) return m.registry[id], nil } diff --git a/plugins/loop_registry_test.go b/plugins/loop_registry_test.go index b307469e09b..ba3c9cb0312 100644 --- a/plugins/loop_registry_test.go +++ b/plugins/loop_registry_test.go @@ -3,6 +3,7 @@ package plugins import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -11,16 +12,16 @@ import ( func TestPluginPortManager(t *testing.T) { // register one m := NewLoopRegistry(logger.TestLogger(t), nil) - pFoo, err := m.Register("foo") + pFoo, err := m.Register("foo", "./COMMAND") require.NoError(t, err) require.Equal(t, "foo", pFoo.Name) require.Greater(t, pFoo.EnvCfg.PrometheusPort, 0) // test duplicate - pNil, err := m.Register("foo") + pNil, err := m.Register("foo", "./COMMAND") require.ErrorIs(t, err, ErrExists) require.Nil(t, pNil) // ensure increasing port assignment - pBar, err := m.Register("bar") + pBar, err := m.Register("bar", "./COMMAND") require.NoError(t, err) require.Equal(t, "bar", pBar.Name) require.Equal(t, pFoo.EnvCfg.PrometheusPort+1, pBar.EnvCfg.PrometheusPort) @@ -51,7 +52,7 @@ func TestLoopRegistry_Register(t *testing.T) { } // Test case 1: Register new loop - registeredLoop, err := loopRegistry.Register("testID") + registeredLoop, err := loopRegistry.Register("testID", "./COMMAND") require.Nil(t, err) require.Equal(t, "testID", registeredLoop.Name) require.True(t, registeredLoop.EnvCfg.TracingEnabled) @@ -59,4 +60,8 @@ func TestLoopRegistry_Register(t *testing.T) { require.Equal(t, map[string]string{"attribute": "value"}, registeredLoop.EnvCfg.TracingAttributes) require.Equal(t, 0.1, registeredLoop.EnvCfg.TracingSamplingRatio) require.Equal(t, "/path/to/cert.pem", registeredLoop.EnvCfg.TracingTLSCertPath) + + gc := registeredLoop.Cmd() + assert.Equal(t, "./COMMAND", gc.Path) + assert.ElementsMatch(t, registeredLoop.EnvCfg.AsCmdEnv(), gc.Env) } diff --git a/plugins/utils.go b/plugins/utils.go deleted file mode 100644 index 5e5e4142e86..00000000000 --- a/plugins/utils.go +++ /dev/null @@ -1,25 +0,0 @@ -package plugins - -import ( - "fmt" - "os/exec" -) - -// CmdConfig is configuration used to register the LOOP and generate an exec -type CmdConfig struct { - ID string // unique string used by the node to track the LOOP. typically supplied by the loop logger name - Cmd string // string value of executable to exec -} - -// NewCmdFactory is helper to ensure synchronization between the loop registry and os cmd to exec the LOOP -func NewCmdFactory(register func(id string) (*RegisteredLoop, error), lcfg CmdConfig) (func() *exec.Cmd, error) { - registeredLoop, err := register(lcfg.ID) - if err != nil { - return nil, fmt.Errorf("failed to register %s LOOP plugin: %w", lcfg.ID, err) - } - return func() *exec.Cmd { - cmd := exec.Command(lcfg.Cmd) //#nosec G204 -- we control the value of the cmd so the lint/sec error is a false positive - cmd.Env = append(cmd.Env, registeredLoop.EnvCfg.AsCmdEnv()...) - return cmd - }, nil -}