From f109e6b99a093053fd61016bced4d47b3cebdd1e Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Mon, 13 Nov 2023 16:17:13 -0500 Subject: [PATCH] wip --- ] | 118 ------------------ core/services/job/spawner.go | 10 +- core/services/llo/channel_definition_cache.go | 2 + core/services/llo/delegate.go | 95 ++++++++++++++ core/services/ocr2/delegate.go | 4 +- core/services/relay/evm/evm.go | 3 +- core/services/relay/evm/llo_provider.go | 8 +- 7 files changed, 111 insertions(+), 129 deletions(-) delete mode 100644 ] create mode 100644 core/services/llo/delegate.go diff --git a/] b/] deleted file mode 100644 index 43374c0e6e8..00000000000 --- a/] +++ /dev/null @@ -1,118 +0,0 @@ -package llo - -import ( - "context" - "fmt" - "math/big" - - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" -) - -type Runner interface { - ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) -} - -// TODO: Generalize to beyond simply an int -type DataPoint *big.Int - -type Stream interface { - Observe(ctx context.Context) (DataPoint, error) -} - -type stream struct { - id string - lggr logger.Logger - runResults chan<- *pipeline.Run - spec pipeline.Spec - runner Runner -} - -func NewStream(lggr logger.Logger, id string, runResults chan<- *pipeline.Run, pipelineSpec pipeline.Spec, pipelineRunner Runner) Stream { - return newStream(lggr, id, runResults, pipelineSpec, pipelineRunner) -} - -func newStream(lggr logger.Logger, id string, runResults chan<- *pipeline.Run, pipelineSpec pipeline.Spec, pipelineRunner Runner) *stream { - return &stream{id, lggr, runResults, pipelineSpec, pipelineRunner} -} - -func (s *stream) Observe(ctx context.Context) (DataPoint, error) { - var run *pipeline.Run - run, trrs, err := s.executeRun(ctx) - if err != nil { - return nil, fmt.Errorf("Observe failed while executing run: %w", err) - } - select { - case s.runResults <- run: - default: - s.lggr.Warnf("unable to enqueue run save for job ID %d, buffer full", s.spec.JobID) - } - - // NOTE: trrs comes back as _all_ tasks, but we only want the terminal ones - // They are guaranteed to be sorted by index asc so should be in the correct order - var finaltrrs []pipeline.TaskRunResult - for _, trr := range trrs { - if trr.IsTerminal() { - finaltrrs = append(finaltrrs, trr) - } - } - - // FIXME: How to handle arbitrary-shaped inputs? - // For now just assume everything is one *big.Int - var parsed parseOutput - parsed, pipelineExecutionErr = ds.parse(finaltrrs) - if pipelineExecutionErr != nil { - pipelineExecutionErr = fmt.Errorf("Observe failed while parsing run results: %w", pipelineExecutionErr) - return - } - obs.BenchmarkPrice = parsed.benchmarkPrice - obs.Bid = parsed.bid - obs.Ask = parsed.ask - -} - -// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). -// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. -func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { - vars := pipeline.NewVarsFrom(map[string]interface{}{ - "jb": map[string]interface{}{ - "databaseID": ds.jb.ID, - "externalJobID": ds.jb.ExternalJobID, - "name": ds.jb.Name.ValueOrZero(), - }, - }) - - run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr) - if err != nil { - return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID) - } - - return run, trrs, err -} - -// returns error on parse errors: if something is the wrong type -func (ds *datasource) parse(trrs pipeline.TaskRunResults) (*big.Int, error) { - var finaltrrs []pipeline.TaskRunResult - for _, trr := range trrs { - // only return terminal trrs from executeRun - if trr.IsTerminal() { - finaltrrs = append(finaltrrs, trr) - } - } - - // pipeline.TaskRunResults comes ordered asc by index, this is guaranteed - // by the pipeline executor - if len(finaltrrs) != 1 { - return o, fmt.Errorf("invalid number of results, expected: 1, got: %d", len(finaltrrs)) - } - res := finaltrrs[0].Result - if res.Error != nil { - o.benchmarkPrice.Err = res.Error - } else if val, err := toBigInt(res.Value); err != nil { - return fmt.Errorf("failed to parse BenchmarkPrice: %w", err) - } else { - o.benchmarkPrice.Val = val - } - - return o, merr -} diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go index 03ee8cee13a..1793b3cc22a 100644 --- a/core/services/job/spawner.go +++ b/core/services/job/spawner.go @@ -61,20 +61,20 @@ type ( Delegate interface { JobType() Type // BeforeJobCreated is only called once on first time job create. - BeforeJobCreated(spec Job) + BeforeJobCreated(Job) // ServicesForSpec returns services to be started and stopped for this // job. In case a given job type relies upon well-defined startup/shutdown // ordering for services, they are started in the order they are given // and stopped in reverse order. - ServicesForSpec(spec Job) ([]ServiceCtx, error) - AfterJobCreated(spec Job) - BeforeJobDeleted(spec Job) + ServicesForSpec(Job) ([]ServiceCtx, error) + AfterJobCreated(Job) + BeforeJobDeleted(Job) // OnDeleteJob will be called from within DELETE db transaction. Any db // commands issued within OnDeleteJob() should be performed first, before any // non-db side effects. This is required in order to guarantee mutual atomicity between // all tasks intended to happen during job deletion. For the same reason, the job will // not show up in the db within OnDeleteJob(), even though it is still actively running. - OnDeleteJob(spec Job, q pg.Queryer) error + OnDeleteJob(jb Job, q pg.Queryer) error } activeJob struct { diff --git a/core/services/llo/channel_definition_cache.go b/core/services/llo/channel_definition_cache.go index f5941e5c2a2..544b9296343 100644 --- a/core/services/llo/channel_definition_cache.go +++ b/core/services/llo/channel_definition_cache.go @@ -27,6 +27,8 @@ func NewChannelDefinitionCache() ChannelDefinitionCache { return &channelDefinitionCache{} } +// TODO: Needs a way to subscribe/unsubscribe to contracts + func (c *channelDefinitionCache) Start(ctx context.Context) error { // TODO: Initial load, then poll // TODO: needs to be populated asynchronously from onchain ConfigurationStore diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go new file mode 100644 index 00000000000..61aba911ccc --- /dev/null +++ b/core/services/llo/delegate.go @@ -0,0 +1,95 @@ +package llo + +import ( + "context" + "fmt" + "log" + + relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/libocr/offchainreporting2/types" + ocr2plus "github.com/smartcontractkit/libocr/offchainreporting2plus" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/libocr/commontypes" +) + +var _ job.Delegate = &Delegate{} + +// Delegate is a container struct for an Oracle plugin. This struct provides +// the ability to start and stop underlying services associated with the +// plugin instance. +type Delegate struct { + llo ocr2plus.Oracle + logger *log.Logger +} + +type DelegateConfig struct { + BinaryNetworkEndpointFactory types.BinaryNetworkEndpointFactory + V2Bootstrappers []commontypes.BootstrapperLocator + ContractConfigTracker types.ContractConfigTracker + ContractTransmitter ocr3types.ContractTransmitter[relayllo.ReportInfo] + KeepersDatabase ocr3types.Database + Logger commontypes.Logger + MonitoringEndpoint commontypes.MonitoringEndpoint + OffchainConfigDigester types.OffchainConfigDigester + OffchainKeyring types.OffchainKeyring + OnchainKeyring ocr3types.OnchainKeyring[relayllo.ReportInfo] + LocalConfig types.LocalConfig +} + +func NewDelegate(c DelegateConfig) (*Delegate, error) { + return &Delegate{} +} + +func (d *Delegate) Start(_ context.Context) error { + return d.llo.Start() +} + +func (d *Delegate) JobType() job.Type { + // FIXME: Is this correct? + return job.OffchainReporting2 +} + +// BeforeJobCreated is only called once on first time job create. +func (d *Delegate) BeforeJobCreated(jb job.Job) {} + +// ServicesForSpec returns services to be started and stopped for this +// job. In case a given job type relies upon well-defined startup/shutdown +// ordering for services, they are started in the order they are given +// and stopped in reverse order. +func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { + // create the oracle from config values + llo, err := ocr2plus.NewOracle(ocr2plus.OCR3OracleArgs[relayllo.ReportInfo]{ + BinaryNetworkEndpointFactory: c.BinaryNetworkEndpointFactory, + V2Bootstrappers: c.V2Bootstrappers, + ContractConfigTracker: c.ContractConfigTracker, + ContractTransmitter: c.ContractTransmitter, + Database: c.KeepersDatabase, + LocalConfig: c.LocalConfig, + Logger: c.Logger, + MonitoringEndpoint: c.MonitoringEndpoint, + OffchainConfigDigester: c.OffchainConfigDigester, + OffchainKeyring: c.OffchainKeyring, + OnchainKeyring: c.OnchainKeyring, + ReportingPluginFactory: relayllo.NewLLOPluginFactory(), + }) + + if err != nil { + return nil, fmt.Errorf("%w: failed to create new OCR oracle", err) + } + + return []job.ServiceCtx{llo} +} +func (d *Delegate) AfterJobCreated(jb job.Job) {} +func (d *Delegate) BeforeJobDeleted(jb job.Job) {} + +// OnDeleteJob will be called from within DELETE db transaction. Any db +// commands issued within OnDeleteJob() should be performed first, before any +// non-db side effects. This is required in order to guarantee mutual atomicity between +// all tasks intended to happen during job deletion. For the same reason, the job will +// not show up in the db within OnDeleteJob(), even though it is still actively running. +func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error { + return nil +} diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index a93032bba9c..b7364ac0d05 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -796,13 +796,13 @@ func (d *Delegate) newServicesLLO( chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100) - mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) + lloServices, err2 := llo.NewServices(jb, lloProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID)) if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) { enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury")) mercuryServices = append(mercuryServices, enhancedTelemService) } else { - lggr.Infow("Enhanced telemetry is disabled for mercury job", "job", jb.Name) + lggr.Infow("Enhanced telemetry is disabled for llo job", "job", jb.Name) } return mercuryServices, err2 diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 882f00222bb..ff4eecccbda 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -235,8 +235,9 @@ func (r *Relayer) NewLLOProvider(rargs relaytypes.RelayArgs, pargs relaytypes.Pl // FIXME // transmitter := llo.NewTransmitter(r.lggr, configWatcher.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, r.db, r.pgCfg) transmitter := llo.NewTransmitter(r.lggr, client, privKey.PublicKey) + channelDefinitionCache := llo.NewChannelDefinitionCache() - return NewLLOProvider(configWatcher, transmitter, r.lggr), nil + return NewLLOProvider(configWatcher, transmitter, r.lggr, channelDefinitionCache), nil } func (r *Relayer) NewFunctionsProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.FunctionsProvider, error) { diff --git a/core/services/relay/evm/llo_provider.go b/core/services/relay/evm/llo_provider.go index 643ab5130dc..3601f56cc67 100644 --- a/core/services/relay/evm/llo_provider.go +++ b/core/services/relay/evm/llo_provider.go @@ -16,9 +16,10 @@ import ( var _ relaytypes.LLOProvider = (*lloProvider)(nil) type lloProvider struct { - configWatcher *configWatcher - transmitter llo.Transmitter - logger logger.Logger + configWatcher *configWatcher + transmitter llo.Transmitter + logger logger.Logger + channelDefinitionCache llo.ChannelDefinitionCache ms services.MultiStart } @@ -33,6 +34,7 @@ func NewLLOProvider( configWatcher, transmitter, lggr, + channelDefinitionCache, services.MultiStart{}, } }