[chore] Refactor ResultsRunSaver to not expose runResults (#11398)
cedric-cordenier authored Dec 1, 2023
1 parent 9bd3ecb commit 5969e1f
Showing 14 changed files with 143 additions and 158 deletions.
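
The shape of the change, as seen at the call sites in this diff: instead of allocating a *pipeline.Run channel sized by JobPipeline().ResultWriteQueueDepth() and passing it to both the data source and ocrcommon.NewResultRunSaver(runResults, pipelineRunner, make(chan struct{}), lggr, maxRuns), callers now construct the saver up front with NewResultRunSaver(pipelineRunner, lggr, maxRuns, writeQueueDepth) and hand the saver itself to the data source. Below is a minimal, self-contained Go analogy of that encapsulation; it is not the real chainlink API, and the type and Save method are illustrative only.

package main

import "fmt"

type run struct{ id int }

// resultRunSaver owns its write queue; depth plays the role of
// JobPipeline().ResultWriteQueueDepth() in the real constructor.
type resultRunSaver struct {
	queue chan *run
}

func newResultRunSaver(depth uint64) *resultRunSaver {
	return &resultRunSaver{queue: make(chan *run, depth)}
}

// Save enqueues a run without exposing the underlying channel to callers.
// (The method name here is hypothetical, chosen only to illustrate the idea.)
func (s *resultRunSaver) Save(r *run) {
	s.queue <- r
}

func main() {
	saver := newResultRunSaver(4)
	saver.Save(&run{id: 1})
	fmt.Println("queued runs:", len(saver.queue))
}
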
17 changes: 8 additions & 9 deletions core/services/ocr/delegate.go
@@ -274,7 +274,12 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e
effectiveTransmitterAddress,
)

- runResults := make(chan *pipeline.Run, chain.Config().JobPipeline().ResultWriteQueueDepth())
+ saver := ocrcommon.NewResultRunSaver(
+ d.pipelineRunner,
+ lggr,
+ cfg.JobPipeline().MaxSuccessfulRuns(),
+ cfg.JobPipeline().ResultWriteQueueDepth(),
+ )

var configOverrider ocrtypes.ConfigOverrider
configOverriderService, err := d.maybeCreateConfigOverrider(lggr, chain, concreteSpec.ContractAddress)
@@ -311,7 +316,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e
jb,
*jb.PipelineSpec,
lggr,
- runResults,
+ saver,
enhancedTelemChan,
),
LocalConfig: lc,
@@ -334,13 +339,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e
// RunResultSaver needs to be started first so its available
// to read db writes. It is stopped last after the Oracle is shut down
// so no further runs are enqueued and we can drain the queue.
- services = append([]job.ServiceCtx{ocrcommon.NewResultRunSaver(
- runResults,
- d.pipelineRunner,
- make(chan struct{}),
- lggr,
- cfg.JobPipeline().MaxSuccessfulRuns(),
- )}, services...)
+ services = append([]job.ServiceCtx{saver}, services...)
}

return services, nil
85 changes: 17 additions & 68 deletions core/services/ocr2/delegate.go
@@ -426,24 +426,22 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {

spec.CaptureEATelemetry = d.cfg.OCR2().CaptureEATelemetry()

- runResults := make(chan *pipeline.Run, d.cfg.JobPipeline().ResultWriteQueueDepth())

ctx := lggrCtx.ContextWithValues(context.Background())
switch spec.PluginType {
case types.Mercury:
- return d.newServicesMercury(ctx, lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
+ return d.newServicesMercury(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case types.Median:
- return d.newServicesMedian(ctx, lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
+ return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case types.DKG:
return d.newServicesDKG(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case types.OCR2VRF:
- return d.newServicesOCR2VRF(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc)
+ return d.newServicesOCR2VRF(lggr, jb, bootstrapPeers, kb, ocrDB, lc)

case types.OCR2Keeper:
- return d.newServicesOCR2Keepers(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
+ return d.newServicesOCR2Keepers(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case types.Functions:
const (
@@ -453,7 +451,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
)
thresholdPluginDB := NewDB(d.db, spec.ID, thresholdPluginId, lggr, d.cfg.Database())
s4PluginDB := NewDB(d.db, spec.ID, s4PluginId, lggr, d.cfg.Database())
- return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger)
+ return d.newServicesOCR2Functions(lggr, jb, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger)

case types.GenericPlugin:
return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
@@ -652,7 +650,6 @@ func (d *Delegate) newServicesMercury(
ctx context.Context,
lggr logger.SugaredLogger,
jb job.Job,
- runResults chan *pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
@@ -720,7 +717,7 @@ func (d *Delegate) newServicesMercury(

chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100)

- mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))
+ mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
@@ -736,7 +733,6 @@ func (d *Delegate) newServicesMedian(
ctx context.Context,
lggr logger.SugaredLogger,
jb job.Job,
- runResults chan *pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
@@ -762,14 +758,18 @@ func (d *Delegate) newServicesMedian(
}
errorLog := &errorLog{jobID: jb.ID, recordError: d.jobORM.RecordError}
enhancedTelemChan := make(chan ocrcommon.EnhancedTelemetryData, 100)
- mConfig := median.NewMedianConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg)
+ mConfig := median.NewMedianConfig(
+ d.cfg.JobPipeline().MaxSuccessfulRuns(),
+ d.cfg.JobPipeline().ResultWriteQueueDepth(),
+ d.cfg,
+ )

relayer, err := d.RelayGetter.Get(rid)
if err != nil {
return nil, ErrRelayNotEnabled{Err: err, PluginName: "median", Relay: spec.Relay}
}

- medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
+ medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)

if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry"))
@@ -852,7 +852,6 @@ func (d *Delegate) newServicesDKG(
func (d *Delegate) newServicesOCR2VRF(
lggr logger.SugaredLogger,
jb job.Job,
- runResults chan *pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
@@ -1019,28 +1018,16 @@ func (d *Delegate) newServicesOCR2VRF(
return nil, errors.Wrap(err2, "new ocr2vrf")
}

- // RunResultSaver needs to be started first, so it's available
- // to read odb writes. It is stopped last after the OraclePlugin is shut down
- // so no further runs are enqueued, and we can drain the queue.
- runResultSaver := ocrcommon.NewResultRunSaver(
- runResults,
- d.pipelineRunner,
- make(chan struct{}),
- lggr,
- d.cfg.JobPipeline().MaxSuccessfulRuns(),
- )

// NOTE: we return from here with the services because the OCR2VRF oracles are defined
// and exported from the ocr2vrf library. It takes care of running the DKG and OCR2VRF
// oracles under the hood together.
oracleCtx := job.NewServiceAdapter(oracles)
- return []job.ServiceCtx{runResultSaver, vrfProvider, dkgProvider, oracleCtx}, nil
+ return []job.ServiceCtx{vrfProvider, dkgProvider, oracleCtx}, nil
}

func (d *Delegate) newServicesOCR2Keepers(
lggr logger.SugaredLogger,
jb job.Job,
- runResults chan *pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
@@ -1059,18 +1046,17 @@ func (d *Delegate) newServicesOCR2Keepers(

switch cfg.ContractVersion {
case "v2.1":
- return d.newServicesOCR2Keepers21(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec)
+ return d.newServicesOCR2Keepers21(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec)
case "v2.0":
- return d.newServicesOCR2Keepers20(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec)
+ return d.newServicesOCR2Keepers20(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec)
default:
- return d.newServicesOCR2Keepers20(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec)
+ return d.newServicesOCR2Keepers20(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, cfg, spec)
}
}

func (d *Delegate) newServicesOCR2Keepers21(
lggr logger.SugaredLogger,
jb job.Job,
- runResults chan *pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
@@ -1162,19 +1148,7 @@ func (d *Delegate) newServicesOCR2Keepers21(
return nil, errors.Wrap(err, "could not create new keepers ocr2 delegate")
}

- // RunResultSaver needs to be started first, so it's available
- // to read odb writes. It is stopped last after the OraclePlugin is shut down
- // so no further runs are enqueued, and we can drain the queue.
- runResultSaver := ocrcommon.NewResultRunSaver(
- runResults,
- d.pipelineRunner,
- make(chan struct{}),
- lggr,
- d.cfg.JobPipeline().MaxSuccessfulRuns(),
- )

automationServices := []job.ServiceCtx{
- runResultSaver,
keeperProvider,
services.Registry(),
services.BlockSubscriber(),
@@ -1206,7 +1180,6 @@ func (d *Delegate) newServicesOCR2Keepers21(
func (d *Delegate) newServicesOCR2Keepers20(
lggr logger.SugaredLogger,
jb job.Job,
- runResults chan *pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
@@ -1316,20 +1289,8 @@ func (d *Delegate) newServicesOCR2Keepers20(
return nil, errors.Wrap(err, "could not create new keepers ocr2 delegate")
}

- // RunResultSaver needs to be started first, so it's available
- // to read odb writes. It is stopped last after the OraclePlugin is shut down
- // so no further runs are enqueued, and we can drain the queue.
- runResultSaver := ocrcommon.NewResultRunSaver(
- runResults,
- d.pipelineRunner,
- make(chan struct{}),
- lggr,
- d.cfg.JobPipeline().MaxSuccessfulRuns(),
- )

return []job.ServiceCtx{
job.NewServiceAdapter(runr),
- runResultSaver,
keeperProvider,
rgstry,
logProvider,
Expand All @@ -1340,7 +1301,6 @@ func (d *Delegate) newServicesOCR2Keepers20(
func (d *Delegate) newServicesOCR2Functions(
lggr logger.SugaredLogger,
jb job.Job,
- runResults chan *pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
functionsOcrDB *db,
@@ -1480,18 +1440,7 @@ func (d *Delegate) newServicesOCR2Functions(
return nil, errors.Wrap(err, "error calling NewFunctionsServices")
}

- // RunResultSaver needs to be started first, so it's available
- // to read odb writes. It is stopped last after the OraclePlugin is shut down
- // so no further runs are enqueued, and we can drain the queue.
- runResultSaver := ocrcommon.NewResultRunSaver(
- runResults,
- d.pipelineRunner,
- make(chan struct{}),
- lggr,
- d.cfg.JobPipeline().MaxSuccessfulRuns(),
- )

- return append([]job.ServiceCtx{runResultSaver, functionsProvider, thresholdProvider, s4Provider}, functionsServices...), nil
+ return append([]job.ServiceCtx{functionsProvider, thresholdProvider, s4Provider}, functionsServices...), nil
}

// errorLog implements [loop.ErrorLog]
32 changes: 19 additions & 13 deletions core/services/ocr2/plugins/median/services.go
@@ -25,32 +25,38 @@ import (

type MedianConfig interface {
JobPipelineMaxSuccessfulRuns() uint64
+ JobPipelineResultWriteQueueDepth() uint64
plugins.RegistrarConfig
}

// concrete implementation of MedianConfig
type medianConfig struct {
- jobPipelineMaxSuccessfulRuns uint64
+ jobPipelineMaxSuccessfulRuns uint64
+ jobPipelineResultWriteQueueDepth uint64
plugins.RegistrarConfig
}

- func NewMedianConfig(jobPipelineMaxSuccessfulRuns uint64, pluginProcessCfg plugins.RegistrarConfig) MedianConfig {
+ func NewMedianConfig(jobPipelineMaxSuccessfulRuns uint64, jobPipelineResultWriteQueueDepth uint64, pluginProcessCfg plugins.RegistrarConfig) MedianConfig {
return &medianConfig{
- jobPipelineMaxSuccessfulRuns: jobPipelineMaxSuccessfulRuns,
- RegistrarConfig: pluginProcessCfg,
+ jobPipelineMaxSuccessfulRuns: jobPipelineMaxSuccessfulRuns,
+ jobPipelineResultWriteQueueDepth: jobPipelineResultWriteQueueDepth,
+ RegistrarConfig: pluginProcessCfg,
}
}

func (m *medianConfig) JobPipelineMaxSuccessfulRuns() uint64 {
return m.jobPipelineMaxSuccessfulRuns
}

+ func (m *medianConfig) JobPipelineResultWriteQueueDepth() uint64 {
+ return m.jobPipelineResultWriteQueueDepth
+ }

func NewMedianServices(ctx context.Context,
jb job.Job,
isNewlyCreatedJob bool,
relayer loop.Relayer,
pipelineRunner pipeline.Runner,
- runResults chan *pipeline.Run,
lggr logger.Logger,
argsNoPlugin libocr.OCR2OracleArgs,
cfg MedianConfig,
@@ -69,6 +75,13 @@ func NewMedianServices(ctx context.Context,
}
spec := jb.OCR2OracleSpec

+ runSaver := ocrcommon.NewResultRunSaver(
+ pipelineRunner,
+ lggr,
+ cfg.JobPipelineMaxSuccessfulRuns(),
+ cfg.JobPipelineResultWriteQueueDepth(),
+ )

provider, err := relayer.NewPluginProvider(ctx, types.RelayArgs{
ExternalJobID: jb.ExternalJobID,
JobID: jb.ID,
@@ -104,7 +117,7 @@ func NewMedianServices(ctx context.Context,
jb,
*jb.PipelineSpec,
lggr,
- runResults,
+ runSaver,
chEnhancedTelem,
), ocrcommon.NewInMemoryDataSource(pipelineRunner, jb, pipeline.Spec{
ID: jb.ID,
@@ -140,13 +153,6 @@ func NewMedianServices(ctx context.Context,
abort()
return
}
- runSaver := ocrcommon.NewResultRunSaver(
- runResults,
- pipelineRunner,
- make(chan struct{}),
- lggr,
- cfg.JobPipelineMaxSuccessfulRuns(),
- )
srvs = append(srvs, runSaver, job.NewServiceAdapter(oracle))
if !jb.OCR2OracleSpec.CaptureEATelemetry {
lggr.Infof("Enhanced EA telemetry is disabled for job %s", jb.Name.ValueOrZero())
11 changes: 6 additions & 5 deletions core/services/ocr2/plugins/mercury/plugin.go
@@ -26,13 +26,13 @@ import (

type Config interface {
MaxSuccessfulRuns() uint64
+ ResultWriteQueueDepth() uint64
}

func NewServices(
jb job.Job,
ocr2Provider commontypes.MercuryProvider,
pipelineRunner pipeline.Runner,
- runResults chan *pipeline.Run,
lggr logger.Logger,
argsNoPlugin libocr2.MercuryOracleArgs,
cfg Config,
@@ -55,6 +55,8 @@ func NewServices(
}
lggr = lggr.Named("MercuryPlugin").With("jobID", jb.ID, "jobName", jb.Name.ValueOrZero())

+ saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())

switch feedID.Version() {
case 1:
ds := mercuryv1.NewDataSource(
@@ -63,7 +65,7 @@ func NewServices(
jb,
*jb.PipelineSpec,
lggr,
- runResults,
+ saver,
chEnhancedTelem,
ocr2Provider.ChainReader(),
ocr2Provider.MercuryServerFetcher(),
@@ -84,7 +86,7 @@ func NewServices(
*jb.PipelineSpec,
feedID,
lggr,
- runResults,
+ saver,
chEnhancedTelem,
ocr2Provider.MercuryServerFetcher(),
*pluginConfig.LinkFeedID,
@@ -104,7 +106,7 @@ func NewServices(
*jb.PipelineSpec,
feedID,
lggr,
- runResults,
+ saver,
chEnhancedTelem,
ocr2Provider.MercuryServerFetcher(),
*pluginConfig.LinkFeedID,
@@ -124,6 +126,5 @@ func NewServices(
if err != nil {
return nil, errors.WithStack(err)
}
- saver := ocrcommon.NewResultRunSaver(runResults, pipelineRunner, make(chan struct{}), lggr, cfg.MaxSuccessfulRuns())
return []job.ServiceCtx{ocr2Provider, saver, job.NewServiceAdapter(oracle)}, nil
}