Skip to content

Commit

Permalink
Refactor loop registry
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Jan 15, 2024
1 parent c32efca commit a83c0b4
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 65 deletions.
14 changes: 4 additions & 10 deletions core/services/chainlink/relayer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,12 @@ func (r *RelayerFactory) NewSolana(ks keystore.Solana, chainCfgs solana.TOMLConf
return nil, fmt.Errorf("failed to marshal Solana configs: %w", err)
}

solCmdFn, err := plugins.NewCmdFactory(r.Register, plugins.CmdConfig{
ID: relayID.Name(),
Cmd: cmdName,
})
rl, err := r.Register(relayID.Name(), cmdName)
if err != nil {
return nil, fmt.Errorf("failed to create Solana LOOP command: %w", err)
}

solanaRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, solCmdFn, string(cfgTOML), signer)
solanaRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, rl.Cmd, string(cfgTOML), signer)

} else {
// fallback to embedded chain
Expand Down Expand Up @@ -196,16 +193,13 @@ func (r *RelayerFactory) NewStarkNet(ks keystore.StarkNet, chainCfgs config.TOML
return nil, fmt.Errorf("failed to marshal StarkNet configs: %w", err)
}

starknetCmdFn, err := plugins.NewCmdFactory(r.Register, plugins.CmdConfig{
ID: relayID.Name(),
Cmd: cmdName,
})
rl, err := r.Register(relayID.Name(), cmdName)
if err != nil {
return nil, fmt.Errorf("failed to create StarkNet LOOP command: %w", err)
}
// the starknet relayer service has a delicate keystore dependency. the value that is passed to NewRelayerService must
// be compatible with instantiating a starknet transaction manager KeystoreAdapter within the LOOPp executable.
starknetRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, starknetCmdFn, string(cfgTOML), loopKs)
starknetRelayers[relayID] = loop.NewRelayerService(lggr, r.GRPCOpts, rl.Cmd, string(cfgTOML), loopKs)
} else {
// fallback to embedded chain
opts := starkchain.ChainOpts{
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/spawner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) {
orm := NewTestORM(t, db, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database())
mailMon := servicetest.Run(t, mailboxtest.NewMonitor(t))

processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil })
processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name, cmd string) (*plugins.RegisteredLoop, error) { return nil, nil })
ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), config.Database(), processConfig)

d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig,
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (d *Delegate) newServicesGenericPlugin(
}

pluginLggr := lggr.Named(p.PluginName).Named(spec.ContractID).Named(spec.GetID())
cmdFn, grpcOpts, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command)
loop, err := d.cfg.RegisterLOOP(fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()), command)
if err != nil {
return nil, fmt.Errorf("failed to register loop: %w", err)
}
Expand Down Expand Up @@ -633,7 +633,7 @@ func (d *Delegate) newServicesGenericPlugin(
pr := generic.NewPipelineRunnerAdapter(pluginLggr, jb, d.pipelineRunner)
ta := generic.NewTelemetryAdapter(d.monitoringEndpointGen)

plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog)
plugin := reportingplugins.NewLOOPPService(pluginLggr, d.cfg.GRPCOpts(), loop.Cmd, pluginConfig, providerClientConn, pr, ta, errorLog)
oracleArgs.ReportingPluginFactory = plugin
srvs = append(srvs, plugin)

Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ func NewMedianServices(ctx context.Context,
if medianLoopEnabled {
// use unique logger names so we can use it to register a loop
medianLggr := lggr.Named("Median").Named(spec.ContractID).Named(spec.GetID())
cmdFn, telem, err2 := cfg.RegisterLOOP(medianLggr.Name(), medianPluginCmd)
loopp, err2 := cfg.RegisterLOOP(medianLggr.Name(), medianPluginCmd)
if err2 != nil {
err = fmt.Errorf("failed to register loop: %w", err2)
abort()
return
}
median := loop.NewMedianService(lggr, telem, cmdFn, medianProvider, dataSource, juelsPerFeeCoinSource, errorLog)
median := loop.NewMedianService(lggr, cfg.GRPCOpts(), loopp.Cmd, medianProvider, dataSource, juelsPerFeeCoinSource, errorLog)
argsNoPlugin.ReportingPluginFactory = median
srvs = append(srvs, median)
} else {
Expand Down
6 changes: 3 additions & 3 deletions core/web/loop_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func TestLoopRegistry(t *testing.T) {

// note we expect this to be an ordered result
expectedLabels := []model.LabelSet{
model.LabelSet{"__metrics_path__": model.LabelValue(expectedCoreEndPoint)},
model.LabelSet{"__metrics_path__": model.LabelValue(expectedLooppEndPoint)},
{"__metrics_path__": model.LabelValue(expectedCoreEndPoint)},
{"__metrics_path__": model.LabelValue(expectedLooppEndPoint)},
}

require.NoError(t, app.KeyStore.OCR().Add(cltest.DefaultOCRKey))
require.NoError(t, app.Start(testutils.Context(t)))

// register a mock loop
loop, err := app.GetLoopRegistry().Register("mockLoopImpl")
loop, err := app.GetLoopRegistry().Register("mockLoopImpl", "NOOP-COMMAND")
require.NoError(t, err)
require.NotNil(t, loop)
require.Len(t, app.GetLoopRegistry().List(), 1)
Expand Down
28 changes: 12 additions & 16 deletions plugins/config.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,35 @@
package plugins

import (
"os/exec"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
)

// RegistrarConfig generates contains static configuration inher
type RegistrarConfig interface {
RegisterLOOP(loopId string, cmdName string) (func() *exec.Cmd, loop.GRPCOpts, error)
RegisterLOOP(loopId string, cmdName string) (*RegisteredLoop, error)
GRPCOpts() loop.GRPCOpts
}

type registarConfig struct {
type registrarConfig struct {
grpcOpts loop.GRPCOpts
loopRegistrationFn func(loopId string) (*RegisteredLoop, error)
loopRegistrationFn func(id, cmd string) (*RegisteredLoop, error)
}

// NewRegistrarConfig creates a RegistarConfig
// loopRegistrationFn must act as a global registry function of LOOPs and must be idempotent.
// The [func() *exec.Cmd] for a LOOP should be generated by calling [RegistrarConfig.RegisterLOOP]
func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(loopId string) (*RegisteredLoop, error)) RegistrarConfig {
return &registarConfig{
func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(id, cmd string) (*RegisteredLoop, error)) RegistrarConfig {
return &registrarConfig{
grpcOpts: grpcOpts,
loopRegistrationFn: loopRegistrationFn,
}
}

// RegisterLOOP calls the configured loopRegistrationFn. The loopRegistrationFn must act as a global registry for LOOPs and must be idempotent.
func (pc *registarConfig) RegisterLOOP(loopID string, cmdName string) (func() *exec.Cmd, loop.GRPCOpts, error) {
cmdFn, err := NewCmdFactory(pc.loopRegistrationFn, CmdConfig{
ID: loopID,
Cmd: cmdName,
})
if err != nil {
return nil, loop.GRPCOpts{}, err
}
return cmdFn, pc.grpcOpts, nil
func (pc *registrarConfig) RegisterLOOP(id, cmd string) (*RegisteredLoop, error) {
return pc.loopRegistrationFn(id, cmd)
}

func (pc *registrarConfig) GRPCOpts() loop.GRPCOpts {
return pc.grpcOpts
}
12 changes: 10 additions & 2 deletions plugins/loop_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugins

import (
"errors"
"os/exec"
"sort"
"sync"

Expand All @@ -20,6 +21,13 @@ var ErrExists = errors.New("plugin already registered")
type RegisteredLoop struct {
Name string
EnvCfg loop.EnvConfig
cmd string
}

func (r *RegisteredLoop) Cmd() *exec.Cmd {
cmd := exec.Command(r.cmd) //#nosec G204 -- we control the value of the cmd so the lint/sec error is a false positive
cmd.Env = append(cmd.Env, r.EnvCfg.AsCmdEnv()...)
return cmd
}

// LoopRegistry is responsible for assigning ports to plugins that are to be used for the
Expand All @@ -42,7 +50,7 @@ func NewLoopRegistry(lggr logger.Logger, tracingConfig config.Tracing) *LoopRegi

// Register creates a port of the plugin. It is not idempotent. Duplicate calls to Register will return [ErrExists]
// Safe for concurrent use.
func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) {
func (m *LoopRegistry) Register(id string, cmd string) (*RegisteredLoop, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -60,7 +68,7 @@ func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) {
envCfg.TracingAttributes = m.cfgTracing.Attributes()
}

m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg}
m.registry[id] = &RegisteredLoop{Name: id, EnvCfg: envCfg, cmd: cmd}
m.lggr.Debugf("Registered loopp %q with config %v, port %d", id, envCfg, envCfg.PrometheusPort)
return m.registry[id], nil
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/loop_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
func TestPluginPortManager(t *testing.T) {
// register one
m := NewLoopRegistry(logger.TestLogger(t), nil)
pFoo, err := m.Register("foo")
pFoo, err := m.Register("foo", "./COMMAND")
require.NoError(t, err)
require.Equal(t, "foo", pFoo.Name)
require.Greater(t, pFoo.EnvCfg.PrometheusPort, 0)
// test duplicate
pNil, err := m.Register("foo")
pNil, err := m.Register("foo", "./COMMAND")
require.ErrorIs(t, err, ErrExists)
require.Nil(t, pNil)
// ensure increasing port assignment
pBar, err := m.Register("bar")
pBar, err := m.Register("bar", "./COMMAND")
require.NoError(t, err)
require.Equal(t, "bar", pBar.Name)
require.Equal(t, pFoo.EnvCfg.PrometheusPort+1, pBar.EnvCfg.PrometheusPort)
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestLoopRegistry_Register(t *testing.T) {
}

// Test case 1: Register new loop
registeredLoop, err := loopRegistry.Register("testID")
registeredLoop, err := loopRegistry.Register("testID", "./COMMAND")
require.Nil(t, err)
require.Equal(t, "testID", registeredLoop.Name)
require.True(t, registeredLoop.EnvCfg.TracingEnabled)
Expand Down
25 changes: 0 additions & 25 deletions plugins/utils.go

This file was deleted.

0 comments on commit a83c0b4

Please sign in to comment.