diff --git a/core/internal/features/ocr2/features_ocr2_test.go b/core/internal/features/ocr2/features_ocr2_test.go index ce0f3087187..216ca272b1b 100644 --- a/core/internal/features/ocr2/features_ocr2_test.go +++ b/core/internal/features/ocr2/features_ocr2_test.go @@ -437,7 +437,7 @@ typeABI = ''' ''' ` } - ocrJob, err := validate.ValidatedOracleSpecToml(apps[i].Config.OCR2(), apps[i].Config.Insecure(), fmt.Sprintf(` + ocrJob, err := validate.ValidatedOracleSpecToml(testutils.Context(t), apps[i].Config.OCR2(), apps[i].Config.Insecure(), fmt.Sprintf(` type = "offchainreporting2" relay = "evm" schemaVersion = 1 @@ -488,7 +488,7 @@ juelsPerFeeCoinSource = """ answer1 [type=median index=0]; """ juelsPerFeeCoinCacheDuration = "1m" -`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i)) +`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil) require.NoError(t, err) err = apps[i].AddJobV2(testutils.Context(t), &ocrJob) require.NoError(t, err) @@ -793,7 +793,7 @@ chainID = 1337 URL: models.WebURL(*u), })) - ocrJob, err := validate.ValidatedOracleSpecToml(apps[i].Config.OCR2(), apps[i].Config.Insecure(), fmt.Sprintf(` + ocrJob, err := validate.ValidatedOracleSpecToml(testutils.Context(t), apps[i].Config.OCR2(), apps[i].Config.Insecure(), fmt.Sprintf(` type = "offchainreporting2" relay = "evm" schemaVersion = 1 @@ -841,7 +841,7 @@ juelsPerFeeCoinSource = """ answer1 [type=median index=0]; """ juelsPerFeeCoinCacheDuration = "1m" -`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i)) +`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil) require.NoError(t, err) err = apps[i].AddJobV2(testutils.Context(t), &ocrJob) require.NoError(t, err) diff --git a/core/internal/mocks/application.go b/core/internal/mocks/application.go index e1005a4dcf5..c18cb7f8426 100644 --- a/core/internal/mocks/application.go +++ b/core/internal/mocks/application.go @@ -329,6 +329,26 @@ func (_m *Application) GetLogger() logger.SugaredLogger { return r0 } +// GetLoopRegistrarConfig provides a mock function with given fields: +func (_m *Application) GetLoopRegistrarConfig() plugins.RegistrarConfig { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetLoopRegistrarConfig") + } + + var r0 plugins.RegistrarConfig + if rf, ok := ret.Get(0).(func() plugins.RegistrarConfig); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(plugins.RegistrarConfig) + } + } + + return r0 +} + // GetLoopRegistry provides a mock function with given fields: func (_m *Application) GetLoopRegistry() *plugins.LoopRegistry { ret := _m.Called() diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 272debca3ac..c4e32d4f276 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -21,7 +21,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 3b61b46475d..98b5142ba0a 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 h1:fY2wMtlr/VQxPyVVQdi1jFvQHi0VbDnGGVXzLKOZTOY= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 50639ec3c59..e71ad3095b2 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -89,6 +89,7 @@ type Application interface { GetExternalInitiatorManager() webhook.ExternalInitiatorManager GetRelayers() RelayerChainInteroperators GetLoopRegistry() *plugins.LoopRegistry + GetLoopRegistrarConfig() plugins.RegistrarConfig // V2 Jobs (TOML specified) JobSpawner() job.Spawner @@ -150,6 +151,7 @@ type ChainlinkApplication struct { secretGenerator SecretGenerator profiler *pyroscope.Profiler loopRegistry *plugins.LoopRegistry + loopRegistrarConfig plugins.RegistrarConfig started bool startStopMu sync.Mutex @@ -425,10 +427,14 @@ func NewApplication(opts ApplicationOpts) (Application, error) { } else { globalLogger.Debug("Off-chain reporting disabled") } + + loopRegistrarConfig := plugins.NewRegistrarConfig(opts.GRPCOpts, opts.LoopRegistry.Register, opts.LoopRegistry.Unregister) + if cfg.OCR2().Enabled() { globalLogger.Debug("Off-chain reporting v2 enabled") - registrarConfig := plugins.NewRegistrarConfig(opts.GRPCOpts, opts.LoopRegistry.Register) - ocr2DelegateConfig := ocr2.NewDelegateConfig(cfg.OCR2(), cfg.Mercury(), cfg.Threshold(), cfg.Insecure(), cfg.JobPipeline(), cfg.Database(), registrarConfig) + + ocr2DelegateConfig := ocr2.NewDelegateConfig(cfg.OCR2(), cfg.Mercury(), cfg.Threshold(), cfg.Insecure(), cfg.JobPipeline(), cfg.Database(), loopRegistrarConfig) + delegates[job.OffchainReporting2] = ocr2.NewDelegate( sqlxDB, jobORM, @@ -496,6 +502,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { legacyEVMChains, globalLogger, opts.Version, + loopRegistrarConfig, ) } else { feedsService = &feeds.NullService{} @@ -534,6 +541,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { secretGenerator: opts.SecretGenerator, profiler: profiler, loopRegistry: loopRegistry, + loopRegistrarConfig: loopRegistrarConfig, sqlxDB: opts.SqlxDB, db: opts.DB, @@ -604,6 +612,9 @@ func (app *ChainlinkApplication) StopIfStarted() error { func (app *ChainlinkApplication) GetLoopRegistry() *plugins.LoopRegistry { return app.loopRegistry } +func (app *ChainlinkApplication) GetLoopRegistrarConfig() plugins.RegistrarConfig { + return app.loopRegistrarConfig +} // Stop allows the application to exit by halting schedules, closing // logs, and closing the DB connection. diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index abb85f39fe4..2f3b309b45c 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -18,6 +18,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/plugins" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" @@ -102,22 +103,23 @@ type Service interface { type service struct { services.StateMachine - orm ORM - jobORM job.ORM - q pg.Q - csaKeyStore keystore.CSA - p2pKeyStore keystore.P2P - ocr1KeyStore keystore.OCR - ocr2KeyStore keystore.OCR2 - jobSpawner job.Spawner - insecureCfg InsecureConfig - jobCfg JobConfig - ocrCfg OCRConfig - ocr2cfg OCR2Config - connMgr ConnectionsManager - legacyChains legacyevm.LegacyChainContainer - lggr logger.Logger - version string + orm ORM + jobORM job.ORM + q pg.Q + csaKeyStore keystore.CSA + p2pKeyStore keystore.P2P + ocr1KeyStore keystore.OCR + ocr2KeyStore keystore.OCR2 + jobSpawner job.Spawner + insecureCfg InsecureConfig + jobCfg JobConfig + ocrCfg OCRConfig + ocr2cfg OCR2Config + connMgr ConnectionsManager + legacyChains legacyevm.LegacyChainContainer + lggr logger.Logger + version string + loopRegistrarConfig plugins.RegistrarConfig } // NewService constructs a new feeds service @@ -135,25 +137,27 @@ func NewService( legacyChains legacyevm.LegacyChainContainer, lggr logger.Logger, version string, + rc plugins.RegistrarConfig, ) *service { lggr = lggr.Named("Feeds") svc := &service{ - orm: orm, - jobORM: jobORM, - q: pg.NewQ(db, lggr, dbCfg), - jobSpawner: jobSpawner, - p2pKeyStore: keyStore.P2P(), - csaKeyStore: keyStore.CSA(), - ocr1KeyStore: keyStore.OCR(), - ocr2KeyStore: keyStore.OCR2(), - insecureCfg: insecureCfg, - jobCfg: jobCfg, - ocrCfg: ocrCfg, - ocr2cfg: ocr2Cfg, - connMgr: newConnectionsManager(lggr), - legacyChains: legacyChains, - lggr: lggr, - version: version, + orm: orm, + jobORM: jobORM, + q: pg.NewQ(db, lggr, dbCfg), + jobSpawner: jobSpawner, + p2pKeyStore: keyStore.P2P(), + csaKeyStore: keyStore.CSA(), + ocr1KeyStore: keyStore.OCR(), + ocr2KeyStore: keyStore.OCR2(), + insecureCfg: insecureCfg, + jobCfg: jobCfg, + ocrCfg: ocrCfg, + ocr2cfg: ocr2Cfg, + connMgr: newConnectionsManager(lggr), + legacyChains: legacyChains, + lggr: lggr, + version: version, + loopRegistrarConfig: rc, } return svc @@ -534,7 +538,7 @@ type ProposeJobArgs struct { // belonging to another feeds manager, we do not update it. func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, error) { // Validate the args - if err := s.validateProposeJobArgs(*args); err != nil { + if err := s.validateProposeJobArgs(ctx, *args); err != nil { return 0, err } @@ -717,7 +721,7 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error { return errors.Wrap(err, "fms rpc client") } - j, err := s.generateJob(spec.Definition) + j, err := s.generateJob(ctx, spec.Definition) if err != nil { return errors.Wrap(err, "could not generate job from spec") } @@ -1121,7 +1125,7 @@ func (s *service) findExistingJobForOCRFlux(j *job.Job, qopts pg.QOpt) (int32, e } // generateJob validates and generates a job from a spec. -func (s *service) generateJob(spec string) (*job.Job, error) { +func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error) { jobType, err := job.ValidateSpec(spec) if err != nil { return nil, errors.Wrap(err, "failed to parse job spec TOML") @@ -1138,7 +1142,7 @@ func (s *service) generateJob(spec string) (*job.Job, error) { if !s.ocr2cfg.Enabled() { return nil, ErrOCR2Disabled } - js, err = ocr2.ValidatedOracleSpecToml(s.ocr2cfg, s.insecureCfg, spec) + js, err = ocr2.ValidatedOracleSpecToml(ctx, s.ocr2cfg, s.insecureCfg, spec, s.loopRegistrarConfig) case job.Bootstrap: if !s.ocr2cfg.Enabled() { return nil, ErrOCR2Disabled @@ -1297,9 +1301,9 @@ func (s *service) newOCR2ConfigMsg(cfg OCR2ConfigModel) (*pb.OCR2Config, error) return msg, nil } -func (s *service) validateProposeJobArgs(args ProposeJobArgs) error { +func (s *service) validateProposeJobArgs(ctx context.Context, args ProposeJobArgs) error { // Validate the job spec - j, err := s.generateJob(args.Spec) + j, err := s.generateJob(ctx, args.Spec) if err != nil { return errors.Wrap(err, "failed to generate a job based on spec") } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index afefc5b2df8..5283b03affe 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -186,7 +186,7 @@ func setupTestServiceCfg(t *testing.T, overrideCfg func(c *chainlink.Config, s * keyStore.On("P2P").Return(p2pKeystore) keyStore.On("OCR").Return(ocr1Keystore) keyStore.On("OCR2").Return(ocr2Keystore) - svc := feeds.NewService(orm, jobORM, db, spawner, keyStore, scopedConfig.Insecure(), scopedConfig.JobPipeline(), scopedConfig.OCR(), scopedConfig.OCR2(), scopedConfig.Database(), legacyChains, lggr, "1.0.0") + svc := feeds.NewService(orm, jobORM, db, spawner, keyStore, scopedConfig.Insecure(), scopedConfig.JobPipeline(), scopedConfig.OCR(), scopedConfig.OCR2(), scopedConfig.Database(), legacyChains, lggr, "1.0.0", nil) svc.SetConnectionsManager(connMgr) return &TestService{ diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index 93b29be79b7..d763386a00d 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -797,7 +797,7 @@ func TestORM_CreateJob_OCR2_DuplicatedContractAddress(t *testing.T) { _, address := cltest.MustInsertRandomKey(t, keyStore.Eth()) - jb, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) const juelsPerFeeCoinSource = ` @@ -813,7 +813,7 @@ func TestORM_CreateJob_OCR2_DuplicatedContractAddress(t *testing.T) { err = jobORM.CreateJob(&jb) require.NoError(t, err) - jb2, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb2, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) jb2.Name = null.StringFrom("Job with same chain id & contract address") @@ -823,7 +823,7 @@ func TestORM_CreateJob_OCR2_DuplicatedContractAddress(t *testing.T) { err = jobORM.CreateJob(&jb2) require.Error(t, err) - jb3, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb3, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) jb3.Name = null.StringFrom("Job with different chain id & same contract address") jb3.OCR2OracleSpec.TransmitterID = null.StringFrom(address.String()) @@ -856,7 +856,7 @@ func TestORM_CreateJob_OCR2_Sending_Keys_Transmitter_Keys_Validations(t *testing jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, keyStore, config.Database()) - jb, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) t.Run("sending keys or transmitterID must be defined", func(t *testing.T) { @@ -897,7 +897,7 @@ func TestORM_ValidateKeyStoreMatch(t *testing.T) { var jb job.Job { var err error - jb, err = ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb, err = ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) } @@ -1089,7 +1089,7 @@ func Test_FindJob(t *testing.T) { ) require.NoError(t, err) - jobOCR2, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jobOCR2, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) jobOCR2.OCR2OracleSpec.TransmitterID = null.StringFrom(address.String()) @@ -1104,16 +1104,20 @@ func Test_FindJob(t *testing.T) { ocr2WithFeedID1 := "0x0001000000000000000000000000000000000000000000000000000000000001" ocr2WithFeedID2 := "0x0001000000000000000000000000000000000000000000000000000000000002" jobOCR2WithFeedID1, err := ocr2validate.ValidatedOracleSpecToml( + testutils.Context(t), config.OCR2(), config.Insecure(), fmt.Sprintf(mercuryOracleTOML, cltest.DefaultCSAKey.PublicKeyString(), ocr2WithFeedID1), + nil, ) require.NoError(t, err) jobOCR2WithFeedID2, err := ocr2validate.ValidatedOracleSpecToml( + testutils.Context(t), config.OCR2(), config.Insecure(), fmt.Sprintf(mercuryOracleTOML, cltest.DefaultCSAKey.PublicKeyString(), ocr2WithFeedID2), + nil, ) jobOCR2WithFeedID2.ExternalJobID = uuid.New() jobOCR2WithFeedID2.Name = null.StringFrom("new name") diff --git a/core/services/job/runner_integration_test.go b/core/services/job/runner_integration_test.go index 9bfb991a4b6..3a1f69afa1b 100644 --- a/core/services/job/runner_integration_test.go +++ b/core/services/job/runner_integration_test.go @@ -227,7 +227,7 @@ func TestRunner(t *testing.T) { assert.Contains(t, err.Error(), "not all bridges exist") // Same for ocr2 - jb2, err := validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), fmt.Sprintf(` + jb2, err := validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), fmt.Sprintf(` type = "offchainreporting2" pluginType = "median" schemaVersion = 1 @@ -254,7 +254,7 @@ ds1_multiply [type=multiply times=1.23]; ds1 -> ds1_parse -> ds1_multiply -> answer1; answer1 [type=median index=0]; """ -`, placeHolderAddress.String(), b.Name.String())) +`, placeHolderAddress.String(), b.Name.String()), nil) require.NoError(t, err) // Should error creating it because of the juels per fee coin non-existent bridge err = jobORM.CreateJob(&jb2) @@ -262,7 +262,7 @@ answer1 [type=median index=0]; assert.Contains(t, err.Error(), "not all bridges exist") // Duplicate bridge names that exist is ok - jb3, err := validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), fmt.Sprintf(` + jb3, err := validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), fmt.Sprintf(` type = "offchainreporting2" pluginType = "median" schemaVersion = 1 @@ -293,7 +293,7 @@ ds2_multiply [type=multiply times=1.23]; ds2 -> ds2_parse -> ds2_multiply -> answer1; answer1 [type=median index=0]; """ -`, placeHolderAddress, b.Name.String(), b.Name.String(), b.Name.String())) +`, placeHolderAddress, b.Name.String(), b.Name.String(), b.Name.String()), nil) require.NoError(t, err) // Should not error with duplicate bridges err = jobORM.CreateJob(&jb3) diff --git a/core/services/job/spawner_test.go b/core/services/job/spawner_test.go index 71357a675c3..b6de9d790fa 100644 --- a/core/services/job/spawner_test.go +++ b/core/services/job/spawner_test.go @@ -302,7 +302,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { orm := NewTestORM(t, db, pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns()), bridges.NewORM(db, lggr, config.Database()), keyStore, config.Database()) mailMon := servicetest.Run(t, mailboxtest.NewMonitor(t)) - processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil }) + processConfig := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return nil, nil }, func(loopId string) {}) ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), config.Database(), processConfig) d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig, diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index b968177de75..a053b53992d 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -535,7 +535,6 @@ func (d *Delegate) newServicesGenericPlugin( keyValueStore types.KeyValueStore, ) (srvs []job.ServiceCtx, err error) { spec := jb.OCR2OracleSpec - // NOTE: we don't need to validate this config, since that happens as part of creating the job. // See: validate/validate.go's `validateSpec`. pCfg := validate.OCR2GenericPluginConfig{} diff --git a/core/services/ocr2/delegate_test.go b/core/services/ocr2/delegate_test.go index 3da0c9cbfd6..ea8693d48ce 100644 --- a/core/services/ocr2/delegate_test.go +++ b/core/services/ocr2/delegate_test.go @@ -136,7 +136,7 @@ func TestGetEVMEffectiveTransmitterID(t *testing.T) { } t.Run("when sending keys are not defined, the first one should be set to transmitterID", func(t *testing.T) { - jb, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) jb.OCR2OracleSpec.TransmitterID = null.StringFrom("some transmitterID string") jb.OCR2OracleSpec.RelayConfig["sendingKeys"] = nil @@ -150,7 +150,7 @@ func TestGetEVMEffectiveTransmitterID(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - jb, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) setTestCase(&jb, tc, txManager) chain, err := legacyChains.Get(customChainID.String()) @@ -173,7 +173,7 @@ func TestGetEVMEffectiveTransmitterID(t *testing.T) { } t.Run("when forwarders are enabled and chain retrieval fails, error should be handled", func(t *testing.T) { - jb, err := ocr2validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal()) + jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), testspecs.GetOCR2EVMSpecMinimal(), nil) require.NoError(t, err) jb.ForwardingAllowed = true jb.OCR2OracleSpec.TransmitterID = null.StringFrom("0x7e57000000000000000000000000000000000001") diff --git a/core/services/ocr2/plugins/functions/integration_tests/v1/internal/testutils.go b/core/services/ocr2/plugins/functions/integration_tests/v1/internal/testutils.go index 1216eec0a63..ab4b114906e 100644 --- a/core/services/ocr2/plugins/functions/integration_tests/v1/internal/testutils.go +++ b/core/services/ocr2/plugins/functions/integration_tests/v1/internal/testutils.go @@ -428,7 +428,7 @@ func AddOCR2Job(t *testing.T, app *cltest.TestApplication, contractAddress commo Name: "ea_bridge", URL: models.WebURL(*u), })) - job, err := validate.ValidatedOracleSpecToml(app.Config.OCR2(), app.Config.Insecure(), fmt.Sprintf(` + job, err := validate.ValidatedOracleSpecToml(testutils.Context(t), app.Config.OCR2(), app.Config.Insecure(), fmt.Sprintf(` type = "offchainreporting2" name = "functions-node" schemaVersion = 1 @@ -470,7 +470,7 @@ func AddOCR2Job(t *testing.T, app *cltest.TestApplication, contractAddress commo [pluginConfig.s4Constraints] maxPayloadSizeBytes = 10_1000 maxSlotsPerUser = 10 - `, contractAddress, keyBundleID, transmitter, DefaultDONId)) + `, contractAddress, keyBundleID, transmitter, DefaultDONId), nil) require.NoError(t, err) err = app.AddJobV2(testutils.Context(t), &job) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/llo/helpers_test.go b/core/services/ocr2/plugins/llo/helpers_test.go index ae9850134b9..8112cf1b0ba 100644 --- a/core/services/ocr2/plugins/llo/helpers_test.go +++ b/core/services/ocr2/plugins/llo/helpers_test.go @@ -141,7 +141,7 @@ func (node *Node) AddStreamJob(t *testing.T, spec string) { func (node *Node) AddLLOJob(t *testing.T, spec string) { c := node.App.GetConfig() - job, err := validate.ValidatedOracleSpecToml(c.OCR2(), c.Insecure(), spec) + job, err := validate.ValidatedOracleSpecToml(testutils.Context(t), c.OCR2(), c.Insecure(), spec, nil) require.NoError(t, err) err = node.App.AddJobV2(testutils.Context(t), &job) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/mercury/helpers_test.go b/core/services/ocr2/plugins/mercury/helpers_test.go index 1323f834398..43d709453b7 100644 --- a/core/services/ocr2/plugins/mercury/helpers_test.go +++ b/core/services/ocr2/plugins/mercury/helpers_test.go @@ -137,7 +137,7 @@ type Node struct { func (node *Node) AddJob(t *testing.T, spec string) { c := node.App.GetConfig() - job, err := validate.ValidatedOracleSpecToml(c.OCR2(), c.Insecure(), spec) + job, err := validate.ValidatedOracleSpecToml(testutils.Context(t), c.OCR2(), c.Insecure(), spec, nil) require.NoError(t, err) err = node.App.AddJobV2(testutils.Context(t), &job) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/mercury/plugin_test.go b/core/services/ocr2/plugins/mercury/plugin_test.go index 4e6d4d82a7e..3934105a390 100644 --- a/core/services/ocr2/plugins/mercury/plugin_test.go +++ b/core/services/ocr2/plugins/mercury/plugin_test.go @@ -267,6 +267,8 @@ var _ commontypes.MercuryProvider = (*testProvider)(nil) type testRegistrarConfig struct{} +func (c *testRegistrarConfig) UnregisterLOOP(ID string) {} + // RegisterLOOP implements plugins.RegistrarConfig. func (*testRegistrarConfig) RegisterLOOP(config plugins.CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) { return nil, loop.GRPCOpts{}, nil diff --git a/core/services/ocr2/plugins/ocr2keeper/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/integration_test.go index 236e89ae671..ea752256232 100644 --- a/core/services/ocr2/plugins/ocr2keeper/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/integration_test.go @@ -164,7 +164,7 @@ type Node struct { func (node *Node) AddJob(t *testing.T, spec string) { c := node.App.GetConfig() - jb, err := validate.ValidatedOracleSpecToml(c.OCR2(), c.Insecure(), spec) + jb, err := validate.ValidatedOracleSpecToml(testutils.Context(t), c.OCR2(), c.Insecure(), spec, nil) require.NoError(t, err) err = node.App.AddJobV2(testutils.Context(t), &jb) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/ocr2vrf/internal/ocr2vrf_integration_test.go b/core/services/ocr2/plugins/ocr2vrf/internal/ocr2vrf_integration_test.go index 8f743a370c2..769bffd584f 100644 --- a/core/services/ocr2/plugins/ocr2vrf/internal/ocr2vrf_integration_test.go +++ b/core/services/ocr2/plugins/ocr2vrf/internal/ocr2vrf_integration_test.go @@ -498,7 +498,7 @@ linkEthFeedAddress = "%s" uni.feedAddress.String(), ) t.Log("Creating OCR2VRF job with spec:", jobSpec) - ocrJob2, err2 := validate.ValidatedOracleSpecToml(apps[i].Config.OCR2(), apps[i].Config.Insecure(), jobSpec) + ocrJob2, err2 := validate.ValidatedOracleSpecToml(testutils.Context(t), apps[i].Config.OCR2(), apps[i].Config.Insecure(), jobSpec, nil) require.NoError(t, err2) err2 = apps[i].AddJobV2(ctx, &ocrJob2) require.NoError(t, err2) diff --git a/core/services/ocr2/validate/validate.go b/core/services/ocr2/validate/validate.go index 5846eaa032f..19c8043f25b 100644 --- a/core/services/ocr2/validate/validate.go +++ b/core/services/ocr2/validate/validate.go @@ -1,17 +1,22 @@ package validate import ( + "context" "encoding/hex" "encoding/json" "errors" "fmt" + "os/exec" "github.com/lib/pq" "github.com/pelletier/go-toml" pkgerrors "github.com/pkg/errors" libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/loop/reportingplugins" "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/config/env" "github.com/smartcontractkit/chainlink/v2/core/services/job" dkgconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/dkg/config" lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config" @@ -19,10 +24,11 @@ import ( ocr2vrfconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2vrf/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/relay" + "github.com/smartcontractkit/chainlink/v2/plugins" ) // ValidatedOracleSpecToml validates an oracle spec that came from TOML -func ValidatedOracleSpecToml(config OCR2Config, insConf InsecureConfig, tomlString string) (job.Job, error) { +func ValidatedOracleSpecToml(ctx context.Context, config OCR2Config, insConf InsecureConfig, tomlString string, rc plugins.RegistrarConfig) (job.Job, error) { var jb = job.Job{} var spec job.OCR2OracleSpec tree, err := toml.Load(tomlString) @@ -58,7 +64,7 @@ func ValidatedOracleSpecToml(config OCR2Config, insConf InsecureConfig, tomlStri } } - if err = validateSpec(tree, jb); err != nil { + if err = validateSpec(ctx, tree, jb, rc); err != nil { return jb, err } if err = validateTimingParameters(config, insConf, spec); err != nil { @@ -92,7 +98,7 @@ func validateTimingParameters(ocr2Conf OCR2Config, insConf InsecureConfig, spec return libocr2.SanityCheckLocalConfig(lc) } -func validateSpec(tree *toml.Tree, spec job.Job) error { +func validateSpec(ctx context.Context, tree *toml.Tree, spec job.Job, rc plugins.RegistrarConfig) error { expected, notExpected := ocrcommon.CloneSet(params), ocrcommon.CloneSet(notExpectedParams) if err := ocrcommon.ValidateExplicitlySetKeys(tree, expected, notExpected, "ocr2"); err != nil { return err @@ -117,7 +123,7 @@ func validateSpec(tree *toml.Tree, spec job.Job) error { case types.LLO: return validateOCR2LLOSpec(spec.OCR2OracleSpec.PluginConfig) case types.GenericPlugin: - return validateOCR2GenericPluginSpec(spec.OCR2OracleSpec.PluginConfig) + return validateGenericPluginSpec(ctx, spec.OCR2OracleSpec, rc) case "": return errors.New("no plugin specified") default: @@ -167,9 +173,9 @@ func (o *OCR2GenericPluginConfig) UnmarshalJSON(data []byte) error { return nil } -func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error { +func validateGenericPluginSpec(ctx context.Context, spec *job.OCR2OracleSpec, rc plugins.RegistrarConfig) error { p := OCR2GenericPluginConfig{} - err := json.Unmarshal(jsonConfig.Bytes(), &p) + err := json.Unmarshal(spec.PluginConfig.Bytes(), &p) if err != nil { return err } @@ -178,11 +184,60 @@ func validateOCR2GenericPluginSpec(jsonConfig job.JSONConfig) error { return errors.New("generic config invalid: must provide plugin name") } - if p.TelemetryType == "" { - return errors.New("generic config invalid: must provide telemetry type") + if p.OCRVersion != 2 && p.OCRVersion != 3 { + return errors.New("generic config invalid: only OCR version 2 and 3 are supported") } - return nil + plugEnv := env.NewPlugin(p.PluginName) + + command := p.Command + if command == "" { + command = plugEnv.Cmd.Get() + } + + if command == "" { + return errors.New("generic config invalid: no command found") + } + + _, err = exec.LookPath(command) + if err != nil { + return fmt.Errorf("failed to find binary %q", command) + } + + envVars, err := plugins.ParseEnvFile(plugEnv.Env.Get()) + if err != nil { + return fmt.Errorf("failed to parse env file: %w", err) + } + if len(p.EnvVars) > 0 { + for k, v := range p.EnvVars { + envVars = append(envVars, k+"="+v) + } + } + + loopID := fmt.Sprintf("%s-%s-%s", p.PluginName, spec.ContractID, spec.GetID()) + //Starting and stopping a LOOPP isn't efficient; ideally, we'd initiate the LOOPP once and then reference + //it later to conserve resources. This code will be revisited once BCF-3126 is implemented, and we have + //the ability to reference the LOOPP for future use. + cmdFn, grpcOpts, err := rc.RegisterLOOP(plugins.CmdConfig{ + ID: loopID, + Cmd: command, + Env: envVars, + }) + if err != nil { + return fmt.Errorf("failed to register loop: %w", err) + } + defer rc.UnregisterLOOP(loopID) + + pluginLggr, _ := logger.New() + plugin := reportingplugins.NewLOOPPServiceValidation(pluginLggr, grpcOpts, cmdFn) + + err = plugin.Start(ctx) + if err != nil { + return err + } + defer plugin.Close() + + return plugin.ValidateConfig(ctx, spec.PluginConfig) } func validateDKGSpec(jsonConfig job.JSONConfig) error { diff --git a/core/services/ocr2/validate/validate_test.go b/core/services/ocr2/validate/validate_test.go index 52dbe5f0042..305a727d030 100644 --- a/core/services/ocr2/validate/validate_test.go +++ b/core/services/ocr2/validate/validate_test.go @@ -601,15 +601,42 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" [relayConfig] chainID = 4 -[pluginConfig.coreConfig] +[pluginConfig] `, assertion: func(t *testing.T, os job.Job, err error) { require.Error(t, err) require.ErrorContains(t, err, "must provide plugin name") }, + }, { + name: "Generic plugin config validation - ocr version", + toml: ` +type = "offchainreporting2" +schemaVersion = 1 +name = "dkg" +externalJobID = "6d46d85f-d38c-4f4a-9f00-ac29a25b6330" +maxTaskDuration = "1s" +contractID = "0x3e54dCc49F16411A3aaa4cDbC41A25bCa9763Cee" +ocrKeyBundleID = "08d14c6eed757414d72055d28de6caf06535806c6a14e450f3a2f1c854420e17" +p2pv2Bootstrappers = [ + "12D3KooWSbPRwXY4gxFRJT7LWCnjgGbR4S839nfCRCDgQUiNenxa@127.0.0.1:8000" +] +relay = "evm" +pluginType = "plugin" +transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" + +[relayConfig] +chainID = 4 + +[pluginConfig] +PluginName="some random name" +`, + assertion: func(t *testing.T, os job.Job, err error) { + require.Error(t, err) + require.ErrorContains(t, err, "only OCR version 2 and 3 are supported") + }, }, { - name: "Generic plugin config validation - plugin name provided", + name: "Generic plugin config validation - no command", toml: ` type = "offchainreporting2" schemaVersion = 1 @@ -629,15 +656,16 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" chainID = 4 [pluginConfig] -pluginName = "median" +PluginName="some random name" +OCRVersion=2 `, assertion: func(t *testing.T, os job.Job, err error) { require.Error(t, err) - require.ErrorContains(t, err, "must provide telemetry type") + require.ErrorContains(t, err, "no command found") }, }, { - name: "Generic plugin config validation - all provided", + name: "Generic plugin config validation - no binary", toml: ` type = "offchainreporting2" schemaVersion = 1 @@ -657,11 +685,13 @@ transmitterID = "0x74103Cf8b436465870b26aa9Fa2F62AD62b22E35" chainID = 4 [pluginConfig] -pluginName = "median" -telemetryType = "median" +PluginName="some random name" +OCRVersion=2 +Command="some random command" `, assertion: func(t *testing.T, os job.Job, err error) { - require.NoError(t, err) + require.Error(t, err) + require.ErrorContains(t, err, "failed to find binary") }, }, } @@ -674,7 +704,7 @@ telemetryType = "median" tc.overrides(c, s) } }) - s, err := validate.ValidatedOracleSpecToml(c.OCR2(), c.Insecure(), tc.toml) + s, err := validate.ValidatedOracleSpecToml(testutils.Context(t), c.OCR2(), c.Insecure(), tc.toml, nil) tc.assertion(t, s, err) }) } diff --git a/core/web/jobs_controller.go b/core/web/jobs_controller.go index 6296c6a016f..5226d7dd7d6 100644 --- a/core/web/jobs_controller.go +++ b/core/web/jobs_controller.go @@ -105,7 +105,7 @@ func (jc *JobsController) Create(c *gin.Context) { return } - jb, status, err := jc.validateJobSpec(request.TOML) + jb, status, err := jc.validateJobSpec(c.Request.Context(), request.TOML) if err != nil { jsonAPIError(c, status, err) return @@ -174,7 +174,7 @@ func (jc *JobsController) Update(c *gin.Context) { return } - jb, status, err := jc.validateJobSpec(request.TOML) + jb, status, err := jc.validateJobSpec(c.Request.Context(), request.TOML) if err != nil { jsonAPIError(c, status, err) return @@ -214,12 +214,11 @@ func (jc *JobsController) Update(c *gin.Context) { jsonAPIResponse(c, presenters.NewJobResource(jb), jb.Type.String()) } -func (jc *JobsController) validateJobSpec(tomlString string) (jb job.Job, statusCode int, err error) { +func (jc *JobsController) validateJobSpec(ctx context.Context, tomlString string) (jb job.Job, statusCode int, err error) { jobType, err := job.ValidateSpec(tomlString) if err != nil { return jb, http.StatusUnprocessableEntity, errors.Wrap(err, "failed to parse TOML") } - config := jc.App.GetConfig() switch jobType { case job.OffchainReporting: @@ -228,7 +227,7 @@ func (jc *JobsController) validateJobSpec(tomlString string) (jb job.Job, status return jb, http.StatusNotImplemented, errors.New("The Offchain Reporting feature is disabled by configuration") } case job.OffchainReporting2: - jb, err = validate.ValidatedOracleSpecToml(config.OCR2(), config.Insecure(), tomlString) + jb, err = validate.ValidatedOracleSpecToml(ctx, config.OCR2(), config.Insecure(), tomlString, jc.App.GetLoopRegistrarConfig()) if !config.OCR2().Enabled() { return jb, http.StatusNotImplemented, errors.New("The Offchain Reporting 2 feature is disabled by configuration") } diff --git a/core/web/resolver/mutation.go b/core/web/resolver/mutation.go index 685fbe61ccb..7ab5b7a08e8 100644 --- a/core/web/resolver/mutation.go +++ b/core/web/resolver/mutation.go @@ -1024,7 +1024,7 @@ func (r *Resolver) CreateJob(ctx context.Context, args struct { return nil, errors.New("The Offchain Reporting feature is disabled by configuration") } case job.OffchainReporting2: - jb, err = validate.ValidatedOracleSpecToml(r.App.GetConfig().OCR2(), r.App.GetConfig().Insecure(), args.Input.TOML) + jb, err = validate.ValidatedOracleSpecToml(ctx, r.App.GetConfig().OCR2(), r.App.GetConfig().Insecure(), args.Input.TOML, r.App.GetLoopRegistrarConfig()) if !config.OCR2().Enabled() { return nil, errors.New("The Offchain Reporting 2 feature is disabled by configuration") } diff --git a/go.mod b/go.mod index d4a2a5f37a1..0a438d144c6 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 diff --git a/go.sum b/go.sum index 9c82988c9c5..3bba85e2c95 100644 --- a/go.sum +++ b/go.sum @@ -1182,8 +1182,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 h1:fY2wMtlr/VQxPyVVQdi1jFvQHi0VbDnGGVXzLKOZTOY= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 34e134723e6..2bc01df21f7 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -24,7 +24,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 github.com/smartcontractkit/chainlink-testing-framework v1.28.1 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index e8c9e1feb52..658a77c4f04 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1525,8 +1525,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 h1:fY2wMtlr/VQxPyVVQdi1jFvQHi0VbDnGGVXzLKOZTOY= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index e3ce67e5257..e08143040be 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.30.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.2 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 github.com/smartcontractkit/chainlink-testing-framework v1.28.1 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index c0ef8c0b749..a5c62d390e1 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1508,8 +1508,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI= github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25 h1:fY2wMtlr/VQxPyVVQdi1jFvQHi0VbDnGGVXzLKOZTOY= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240404141006-77085a02ce25/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/plugins/loop_registry.go b/plugins/loop_registry.go index a2fcd8ef379..b796ddf87ee 100644 --- a/plugins/loop_registry.go +++ b/plugins/loop_registry.go @@ -70,6 +70,23 @@ func (m *LoopRegistry) Register(id string) (*RegisteredLoop, error) { return m.registry[id], nil } +// Unregister remove a loop from the registry +// Safe for concurrent use. +func (m *LoopRegistry) Unregister(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + loop, exists := m.registry[id] + if !exists { + m.lggr.Debugf("Trying to unregistered a loop that is not registered %q", id) + return + } + + freeport.Return([]int{loop.EnvCfg.PrometheusPort}) + delete(m.registry, id) + m.lggr.Debugf("Unregistered loopp %q", id) +} + // Return slice sorted by plugin name. Safe for concurrent use. func (m *LoopRegistry) List() []*RegisteredLoop { var registeredLoops []*RegisteredLoop diff --git a/plugins/medianpoc/plugin.go b/plugins/medianpoc/plugin.go index ff0222be4bb..76fb4651260 100644 --- a/plugins/medianpoc/plugin.go +++ b/plugins/medianpoc/plugin.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" - ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -30,6 +29,12 @@ type Plugin struct { reportingplugins.MedianProviderServer } +func (p *Plugin) NewValidationService(ctx context.Context) (types.ValidationService, error) { + s := &reportingPluginValidationService{lggr: p.Logger} + p.SubService(s) + return s, nil +} + type pipelineSpec struct { Name string `json:"name"` Spec string `json:"spec"` @@ -130,3 +135,37 @@ func (r *reportingPluginFactoryService) Close() error { func (r *reportingPluginFactoryService) HealthReport() map[string]error { return map[string]error{r.Name(): r.Healthy()} } + +type reportingPluginValidationService struct { + services.StateMachine + lggr logger.Logger +} + +func (r *reportingPluginValidationService) ValidateConfig(ctx context.Context, config map[string]interface{}) error { + tt, ok := config["telemetryType"] + if !ok { + return fmt.Errorf("expected telemtry type") + } + telemetryType, ok := tt.(string) + if !ok { + return fmt.Errorf("expected telemtry type to be of type string but got %T", tt) + } + if telemetryType != "median" { + return fmt.Errorf("expected telemtry type to be median but got %q", telemetryType) + } + + return nil +} +func (r *reportingPluginValidationService) Name() string { return r.lggr.Name() } + +func (r *reportingPluginValidationService) Start(ctx context.Context) error { + return r.StartOnce("ValidationService", func() error { return nil }) +} + +func (r *reportingPluginValidationService) Close() error { + return r.StopOnce("ValidationService", func() error { return nil }) +} + +func (r *reportingPluginValidationService) HealthReport() map[string]error { + return map[string]error{r.Name(): r.Healthy()} +} diff --git a/plugins/registrar.go b/plugins/registrar.go index 90300b738b6..2a82f2a6204 100644 --- a/plugins/registrar.go +++ b/plugins/registrar.go @@ -9,20 +9,23 @@ import ( // RegistrarConfig generates contains static configuration inher type RegistrarConfig interface { RegisterLOOP(config CmdConfig) (func() *exec.Cmd, loop.GRPCOpts, error) + UnregisterLOOP(ID string) } type registarConfig struct { grpcOpts loop.GRPCOpts loopRegistrationFn func(loopId string) (*RegisteredLoop, error) + loopUnregisterFn func(loopId string) } // NewRegistrarConfig creates a RegistarConfig // loopRegistrationFn must act as a global registry function of LOOPs and must be idempotent. // The [func() *exec.Cmd] for a LOOP should be generated by calling [RegistrarConfig.RegisterLOOP] -func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(loopId string) (*RegisteredLoop, error)) RegistrarConfig { +func NewRegistrarConfig(grpcOpts loop.GRPCOpts, loopRegistrationFn func(loopId string) (*RegisteredLoop, error), loopUnregisterFn func(loopId string)) RegistrarConfig { return ®istarConfig{ grpcOpts: grpcOpts, loopRegistrationFn: loopRegistrationFn, + loopUnregisterFn: loopUnregisterFn, } } @@ -34,3 +37,7 @@ func (pc *registarConfig) RegisterLOOP(cfg CmdConfig) (func() *exec.Cmd, loop.GR } return cmdFn, pc.grpcOpts, nil } + +func (pc *registarConfig) UnregisterLOOP(ID string) { + pc.loopUnregisterFn(ID) +}