Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor loop registry #11807

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,12 @@ func (r *RelayerFactory) NewSolana(ks keystore.Solana, chainCfgs solana.TOMLConf
return nil, fmt.Errorf("failed to marshal Solana configs: %w", err)
}

solCmdFn, err := plugins.NewCmdFactory(r.Register, plugins.CmdConfig{
ID: relayID.Name(),
Cmd: cmdName,
})
rl, err := r.Register(relayID.Name(), cmdName)
if err != nil {
return nil, fmt.Errorf("failed to create Solana LOOP command: %w", err)
}

solanaRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, solCmdFn, string(cfgTOML), signer)
solanaRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, rl.Cmd, string(cfgTOML), signer)

} else {
// fallback to embedded chain
Expand Down Expand Up @@ -196,16 +193,13 @@ func (r *RelayerFactory) NewStarkNet(ks keystore.StarkNet, chainCfgs config.TOML
return nil, fmt.Errorf("failed to marshal StarkNet configs: %w", err)
}

starknetCmdFn, err := plugins.NewCmdFactory(r.Register, plugins.CmdConfig{
ID: relayID.Name(),
Cmd: cmdName,
})
rl, err := r.Register(relayID.Name(), cmdName)
if err != nil {
return nil, fmt.Errorf("failed to create StarkNet LOOP command: %w", err)
}
// the starknet relayer service has a delicate keystore dependency. the value that is passed to NewRelayerService must
// be compatible with instantiating a starknet transaction manager KeystoreAdapter within the LOOPp executable.
starknetRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, starknetCmdFn, string(cfgTOML), loopKs)
starknetRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, rl.Cmd, string(cfgTOML), loopKs)
} else {
// fallback to embedded chain
opts := starkchain.ChainOpts{
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/spawner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) {
orm := NewTestORM(t, db, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database())
mailMon := servicetest.Run(t, mailboxtest.NewMonitor(t))

processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil })
processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name, cmd string) (*plugins.RegisteredLoop, error) { return nil, nil })
ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), config.Database(), processConfig)

d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig,
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func (d *Delegate) newServicesGenericPlugin(
}

pluginLggr := lggr.Named(p.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command)
loop, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command)
if err != nil {
return nil, fmt.Errorf("failed to register loop: %w", err)
}
Expand Down Expand Up @@ -632,7 +632,7 @@ func (d *Delegate) newServicesGenericPlugin(
pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner)
ta := generic.NewTelemetryAdapter(d.monitoringEndpointGen)

plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog)
plugin := reportingplugins.NewLOOPPService(pluginLggr, d.cfg.GRPCOpts(), loop.Cmd, pluginConfig, providerClientConn, pr, ta, errorLog)
oracleArgs.ReportingPluginFactory = plugin
srvs = append(srvs, plugin)

Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ func NewMedianServices(ctx context.Context,
if medianLoopEnabled {
// use unique logger names so we can use it to register a loop
medianLggr := lggr.Named("Median").Named(spec.ContractID).Named(spec.GetID())
cmdFn, telem, err2 := cfg.RegisterLOOP(medianLggr.Name(), medianPluginCmd)
loopp, err2 := cfg.RegisterLOOP(medianLggr.Name(), medianPluginCmd)
if err2 != nil {
err = fmt.Errorf("failed to register loop: %w", err2)
abort()
return
}
median := loop.NewMedianService(lggr, telem, cmdFn, medianProvider, dataSource, juelsPerFeeCoinSource, errorLog)
median := loop.NewMedianService(lggr, cfg.GRPCOpts(), loopp.Cmd, medianProvider, dataSource, juelsPerFeeCoinSource, errorLog)
argsNoPlugin.ReportingPluginFactory = median
srvs = append(srvs, median)
} else {
Expand Down
6 changes: 3 additions & 3 deletions core/web/loop_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func TestLoopRegistry(t *testing.T) {

// note we expect this to be an ordered result
expectedLabels := []model.LabelSet{
model.LabelSet{"__metrics_path__": model.LabelValue(expectedCoreEndPoint)},
model.LabelSet{"__metrics_path__": model.LabelValue(expectedLooppEndPoint)},
{"__metrics_path__": model.LabelValue(expectedCoreEndPoint)},
{"__metrics_path__": model.LabelValue(expectedLooppEndPoint)},
}

require.NoError(t, app.KeyStore.OCR().Add(cltest.DefaultOCRKey))
require.NoError(t, app.Start(testutils.Context(t)))

// register a mock loop
loop, err := app.GetLoopRegistry().Register("mockLoopImpl")
loop, err := app.GetLoopRegistry().Register("mockLoopImpl", "NOOP-COMMAND")
require.NoError(t, err)
require.NotNil(t, loop)
require.Len(t, app.GetLoopRegistry().List(), 1)
Expand Down
28 changes: 12 additions & 16 deletions plugins/config.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,35 @@
package plugins

import (
"os/exec"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
)

// RegistrarConfig generates contains static configuration inher
type RegistrarConfig interface {
RegisterLOOP(loopId string, cmdName string) (func() *exec.Cmd, loop.GRPCOpts, error)
RegisterLOOP(loopId string, cmdName string) (*RegisteredLoop, error)
GRPCOpts() loop.GRPCOpts
}

type registarConfig struct {
type registrarConfig struct {
grpcOpts loop.GRPCOpts
loopRegistrationFn func(loopId string) (*RegisteredLoop, error)
loopRegistrationFn func(id, cmd string) (*RegisteredLoop, error)
}

// NewRegistrarConfig creates a RegistarConfig
// loopRegistrationFn must act as a global registry function of LOOPs and must be idempotent.
// The [func() *exec.Cmd] for a LOOP should be generated by calling [RegistrarConfig.RegisterLOOP]
func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(loopId string) (*RegisteredLoop, error)) RegistrarConfig {
return &registarConfig{
func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(id, cmd string) (*RegisteredLoop, error)) RegistrarConfig {
return &registrarConfig{
grpcOpts: grpcOpts,
loopRegistrationFn: loopRegistrationFn,
}
}

// RegisterLOOP calls the configured loopRegistrationFn. The loopRegistrationFn must act as a global registry for LOOPs and must be idempotent.
func (pc *registarConfig) RegisterLOOP(loopID string, cmdName string) (func() *exec.Cmd, loop.GRPCOpts, error) {
cmdFn, err := NewCmdFactory(pc.loopRegistrationFn, CmdConfig{
ID: loopID,
Cmd: cmdName,
})
if err != nil {
return nil, loop.GRPCOpts{}, err
}
return cmdFn, pc.grpcOpts, nil
func (pc *registrarConfig) RegisterLOOP(id, cmd string) (*RegisteredLoop, error) {
return pc.loopRegistrationFn(id, cmd)
}

func (pc *registrarConfig) GRPCOpts() loop.GRPCOpts {
return pc.grpcOpts
}
12 changes: 10 additions & 2 deletions plugins/loop_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugins

import (
"errors"
"os/exec"
"sort"
"sync"

Expand All @@ -20,6 +21,13 @@ var ErrExists = errors.New("plugin already registered")
type RegisteredLoop struct {
Name string
EnvCfg loop.EnvConfig
cmd string
}

func (r *RegisteredLoop) Cmd() *exec.Cmd {
cmd := exec.Command(r.cmd) //#nosec G204 -- we control the value of the cmd so the lint/sec error is a false positive
cmd.Env = append(cmd.Env, r.EnvCfg.AsCmdEnv()...)
return cmd
}

// LoopRegistry is responsible for assigning ports to plugins that are to be used for the
Expand All @@ -42,7 +50,7 @@ func NewLoopRegistry(lggr logger.Logger, tracingConfig config.Tracing) *LoopRegi

// Register creates a port of the plugin. It is not idempotent. Duplicate calls to Register will return [ErrExists]
// Safe for concurrent use.
func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) {
func (m *LoopRegistry) Register(id string, cmd string) (*RegisteredLoop, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -60,7 +68,7 @@ func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) {
envCfg.TracingAttributes = m.cfgTracing.Attributes()
}

m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg}
m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg, cmd: cmd}
m.lggr.Debugf("Registered loopp %q with config %v, port %d", id, envCfg, envCfg.PrometheusPort)
return m.registry[id], nil
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/loop_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
func TestPluginPortManager(t *testing.T) {
// register one
m := NewLoopRegistry(logger.TestLogger(t), nil)
pFoo, err := m.Register("foo")
pFoo, err := m.Register("foo", "./COMMAND")
require.NoError(t, err)
require.Equal(t, "foo", pFoo.Name)
require.Greater(t, pFoo.EnvCfg.PrometheusPort, 0)
// test duplicate
pNil, err := m.Register("foo")
pNil, err := m.Register("foo", "./COMMAND")
require.ErrorIs(t, err, ErrExists)
require.Nil(t, pNil)
// ensure increasing port assignment
pBar, err := m.Register("bar")
pBar, err := m.Register("bar", "./COMMAND")
require.NoError(t, err)
require.Equal(t, "bar", pBar.Name)
require.Equal(t, pFoo.EnvCfg.PrometheusPort+1, pBar.EnvCfg.PrometheusPort)
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestLoopRegistry_Register(t *testing.T) {
}

// Test case 1: Register new loop
registeredLoop, err := loopRegistry.Register("testID")
registeredLoop, err := loopRegistry.Register("testID", "./COMMAND")
require.Nil(t, err)
require.Equal(t, "testID", registeredLoop.Name)
require.True(t, registeredLoop.EnvCfg.TracingEnabled)
Expand Down
25 changes: 0 additions & 25 deletions plugins/utils.go

This file was deleted.

Loading