Implement stream specs (#11685)
* Implement stream specs

* Fix linter

* Rename StreamRegistry => Registry

* rename migration

* StreamID => type alias
samsondav authored Jan 11, 2024
1 parent c3b156b commit dbc0f91
Showing 22 changed files with 967 additions and 85 deletions.
7 changes: 7 additions & 0 deletions core/services/chainlink/application.go
@@ -53,6 +53,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/promreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
@@ -290,6 +291,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient)
jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database())
txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database())
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
)

for _, chain := range legacyEVMChains.Slice() {
@@ -344,6 +346,11 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
db,
cfg.Database(),
globalLogger),
job.Stream: streams.NewDelegate(
globalLogger,
streamRegistry,
pipelineRunner,
cfg.JobPipeline()),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)
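For orientation, the wiring above is the whole integration surface in application.go: streams.NewRegistry receives the shared pipeline runner, and the new job.Stream delegate receives that registry plus the runner and the job pipeline config. The sketch below is not the actual streams package; it is a minimal guess at the Registry shape implied by those constructors and by the commit notes ("Rename StreamRegistry => Registry", "StreamID => type alias"). The method names, field names, and the underlying type of StreamID are assumptions.

package main

import (
	"fmt"
	"sync"
)

// StreamID is assumed to be a plain type alias, per the commit note
// "StreamID => type alias"; the underlying type here is a guess.
type StreamID = string

// Pipeline stands in for the node's parsed pipeline type.
type Pipeline struct{ Source string }

// Registry is a hedged sketch of a concurrency-safe map from stream ID to a
// pre-parsed pipeline, mirroring the streams.NewRegistry(lggr, runner) call
// above. Method and field names are illustrative.
type Registry struct {
	mu      sync.RWMutex
	streams map[StreamID]*Pipeline
}

func NewRegistry() *Registry {
	return &Registry{streams: make(map[StreamID]*Pipeline)}
}

// Register caches a stream's pipeline once so later runs can skip re-parsing
// (compare GetOrParsePipeline in core/services/pipeline/models.go below).
func (r *Registry) Register(id StreamID, dotDagSource string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.streams[id] = &Pipeline{Source: dotDagSource}
}

func (r *Registry) Get(id StreamID) (*Pipeline, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	p, ok := r.streams[id]
	return p, ok
}

func main() {
	reg := NewRegistry()
	reg.Register("eth-usd", `fetch [type=bridge name="eth-usd-bridge"]`)
	if p, ok := reg.Get("eth-usd"); ok {
		fmt.Println("registered stream pipeline:", p.Source)
	}
}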
66 changes: 35 additions & 31 deletions core/services/job/models.go
@@ -33,20 +33,21 @@ import (
)

const (
BlockHeaderFeeder Type = (Type)(pipeline.BlockHeaderFeederJobType)
BlockhashStore Type = (Type)(pipeline.BlockhashStoreJobType)
Bootstrap Type = (Type)(pipeline.BootstrapJobType)
Cron Type = (Type)(pipeline.CronJobType)
DirectRequest Type = (Type)(pipeline.DirectRequestJobType)
FluxMonitor Type = (Type)(pipeline.FluxMonitorJobType)
OffchainReporting Type = (Type)(pipeline.OffchainReportingJobType)
OffchainReporting2 Type = (Type)(pipeline.OffchainReporting2JobType)
Gateway Type = (Type)(pipeline.GatewayJobType)
Keeper Type = (Type)(pipeline.KeeperJobType)
VRF Type = (Type)(pipeline.VRFJobType)
BlockhashStore Type = (Type)(pipeline.BlockhashStoreJobType)
BlockHeaderFeeder Type = (Type)(pipeline.BlockHeaderFeederJobType)
LegacyGasStationServer Type = (Type)(pipeline.LegacyGasStationServerJobType)
LegacyGasStationSidecar Type = (Type)(pipeline.LegacyGasStationSidecarJobType)
OffchainReporting Type = (Type)(pipeline.OffchainReportingJobType)
OffchainReporting2 Type = (Type)(pipeline.OffchainReporting2JobType)
Stream Type = (Type)(pipeline.StreamJobType)
VRF Type = (Type)(pipeline.VRFJobType)
Webhook Type = (Type)(pipeline.WebhookJobType)
Bootstrap Type = (Type)(pipeline.BootstrapJobType)
Gateway Type = (Type)(pipeline.GatewayJobType)
)

//revive:disable:redefines-builtin-id
@@ -70,52 +71,55 @@ func (t Type) SchemaVersion() uint32 {

var (
requiresPipelineSpec = map[Type]bool{
BlockHeaderFeeder: false,
BlockhashStore: false,
Bootstrap: false,
Cron: true,
DirectRequest: true,
FluxMonitor: true,
OffchainReporting: false, // bootstrap jobs do not require it
OffchainReporting2: false, // bootstrap jobs do not require it
Gateway: false,
Keeper: false, // observationSource is injected in the upkeep executor
VRF: true,
Webhook: true,
BlockhashStore: false,
BlockHeaderFeeder: false,
LegacyGasStationServer: false,
LegacyGasStationSidecar: false,
Bootstrap: false,
Gateway: false,
OffchainReporting2: false, // bootstrap jobs do not require it
OffchainReporting: false, // bootstrap jobs do not require it
Stream: true,
VRF: true,
Webhook: true,
}
supportsAsync = map[Type]bool{
BlockHeaderFeeder: false,
BlockhashStore: false,
Bootstrap: false,
Cron: true,
DirectRequest: true,
FluxMonitor: false,
OffchainReporting: false,
OffchainReporting2: false,
Gateway: false,
Keeper: true,
VRF: true,
Webhook: true,
BlockhashStore: false,
BlockHeaderFeeder: false,
LegacyGasStationServer: false,
LegacyGasStationSidecar: false,
Bootstrap: false,
Gateway: false,
OffchainReporting2: false,
OffchainReporting: false,
Stream: true,
VRF: true,
Webhook: true,
}
schemaVersions = map[Type]uint32{
BlockHeaderFeeder: 1,
BlockhashStore: 1,
Bootstrap: 1,
Cron: 1,
DirectRequest: 1,
FluxMonitor: 1,
OffchainReporting: 1,
OffchainReporting2: 1,
Gateway: 1,
Keeper: 1,
VRF: 1,
Webhook: 1,
BlockhashStore: 1,
BlockHeaderFeeder: 1,
LegacyGasStationServer: 1,
LegacyGasStationSidecar: 1,
Bootstrap: 1,
Gateway: 1,
OffchainReporting2: 1,
OffchainReporting: 1,
Stream: 1,
VRF: 1,
Webhook: 1,
}
)

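The three tables above are the registration points a new job type must hit: Stream requires a pipeline spec, supports async tasks, and starts at schema version 1. A hedged test sketch for the same package makes the implied invariant explicit; the test name and failure messages are illustrative, not part of the commit.

package job

import "testing"

// TestJobTypeTablesAreConsistent checks that every Type registered in
// requiresPipelineSpec also appears in supportsAsync and schemaVersions.
func TestJobTypeTablesAreConsistent(t *testing.T) {
	for typ := range requiresPipelineSpec {
		if _, ok := supportsAsync[typ]; !ok {
			t.Errorf("type %q missing from supportsAsync", typ)
		}
		if _, ok := schemaVersions[typ]; !ok {
			t.Errorf("type %q missing from schemaVersions", typ)
		}
	}
}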
2 changes: 2 additions & 0 deletions core/services/job/orm.go
@@ -440,6 +440,8 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
return errors.Wrap(err, "failed to create GatewaySpec for jobSpec")
}
jb.GatewaySpecID = &specID
case Stream:
// 'stream' type has no associated spec, nothing to do here
default:
o.lggr.Panicf("Unsupported jb.Type: %v", jb.Type)
}
17 changes: 9 additions & 8 deletions core/services/job/validate.go
@@ -12,20 +12,21 @@ var (
ErrInvalidJobType = errors.New("invalid job type")
ErrInvalidSchemaVersion = errors.New("invalid schema version")
jobTypes = map[Type]struct{}{
BlockHeaderFeeder: {},
BlockhashStore: {},
Bootstrap: {},
Cron: {},
DirectRequest: {},
FluxMonitor: {},
OffchainReporting: {},
OffchainReporting2: {},
Keeper: {},
VRF: {},
Webhook: {},
BlockhashStore: {},
Bootstrap: {},
BlockHeaderFeeder: {},
Gateway: {},
Keeper: {},
LegacyGasStationServer: {},
LegacyGasStationSidecar: {},
OffchainReporting2: {},
OffchainReporting: {},
Stream: {},
VRF: {},
Webhook: {},
}
)

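Adding Stream to jobTypes is what allows a TOML spec with type = "stream" to pass validation at all. The helper below is a hedged sketch placed in package job, not the repository's actual validation code; it only illustrates how the jobTypes set and the per-type schema version gate a parsed job.

// validateType is illustrative only; the real checks live elsewhere in this file.
func validateType(jb Job) error {
	if _, ok := jobTypes[jb.Type]; !ok {
		return ErrInvalidJobType
	}
	if jb.SchemaVersion != jb.Type.SchemaVersion() {
		return ErrInvalidSchemaVersion
	}
	return nil
}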
9 changes: 7 additions & 2 deletions core/services/ocrcommon/run_saver.go
@@ -5,15 +5,20 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

type Runner interface {
InsertFinishedRun(run *pipeline.Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error
}

type RunResultSaver struct {
services.StateMachine

maxSuccessfulRuns uint64
runResults chan *pipeline.Run
pipelineRunner pipeline.Runner
pipelineRunner Runner
done chan struct{}
logger logger.Logger
}
@@ -24,7 +29,7 @@ func (r *RunResultSaver) HealthReport() map[string]error {

func (r *RunResultSaver) Name() string { return r.logger.Name() }

func NewResultRunSaver(pipelineRunner pipeline.Runner,
func NewResultRunSaver(pipelineRunner Runner,
logger logger.Logger, maxSuccessfulRuns uint64, resultsWriteDepth uint64,
) *RunResultSaver {
return &RunResultSaver{
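The new package-local Runner interface narrows the saver's dependency: RunResultSaver only ever calls InsertFinishedRun, so it no longer needs all of pipeline.Runner. A hedged sketch of what that enables in tests follows; the fakeRunner type is illustrative and not part of the commit.

package ocrcommon_test

import (
	"github.com/smartcontractkit/chainlink/v2/core/services/pg"
	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

// fakeRunner is an in-memory stand-in that satisfies ocrcommon.Runner, so a
// test can construct a RunResultSaver without a real pipeline runner.
type fakeRunner struct {
	saved []*pipeline.Run
}

func (f *fakeRunner) InsertFinishedRun(run *pipeline.Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error {
	f.saved = append(f.saved, run)
	return nil
}

// A test would then wire it up roughly as:
//   saver := ocrcommon.NewResultRunSaver(&fakeRunner{}, lggr, 10, 100)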
17 changes: 9 additions & 8 deletions core/services/pipeline/common.go
@@ -29,20 +29,21 @@ import (
)

const (
BlockHeaderFeederJobType string = "blockheaderfeeder"
BlockhashStoreJobType string = "blockhashstore"
BootstrapJobType string = "bootstrap"
CronJobType string = "cron"
DirectRequestJobType string = "directrequest"
FluxMonitorJobType string = "fluxmonitor"
OffchainReportingJobType string = "offchainreporting"
OffchainReporting2JobType string = "offchainreporting2"
KeeperJobType string = "keeper"
VRFJobType string = "vrf"
BlockhashStoreJobType string = "blockhashstore"
BlockHeaderFeederJobType string = "blockheaderfeeder"
WebhookJobType string = "webhook"
BootstrapJobType string = "bootstrap"
GatewayJobType string = "gateway"
KeeperJobType string = "keeper"
LegacyGasStationServerJobType string = "legacygasstationserver"
LegacyGasStationSidecarJobType string = "legacygasstationsidecar"
OffchainReporting2JobType string = "offchainreporting2"
OffchainReportingJobType string = "offchainreporting"
StreamJobType string = "stream"
VRFJobType string = "vrf"
WebhookJobType string = "webhook"
)

//go:generate mockery --quiet --name Config --output ./mocks/ --case=underscore
11 changes: 10 additions & 1 deletion core/services/pipeline/models.go
@@ -28,9 +28,18 @@ type Spec struct {
JobID int32 `json:"-"`
JobName string `json:"-"`
JobType string `json:"-"`

Pipeline *Pipeline `json:"-" db:"-"` // This may be nil, or may be populated manually as a cache. There is no locking on this, so be careful
}

func (s *Spec) GetOrParsePipeline() (*Pipeline, error) {
if s.Pipeline != nil {
return s.Pipeline, nil
}
return s.ParsePipeline()
}

func (s Spec) Pipeline() (*Pipeline, error) {
func (s *Spec) ParsePipeline() (*Pipeline, error) {
return Parse(s.DotDagSource)
}

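GetOrParsePipeline turns the new Spec.Pipeline field into an opt-in cache: populate it once and later calls return it directly; leave it nil and behaviour falls back to ParsePipeline. A hedged usage sketch follows; the DOT source and error handling are illustrative.

package main

import (
	"fmt"

	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

func main() {
	// Any valid pipeline DAG works here; this one is illustrative.
	spec := pipeline.Spec{DotDagSource: `fetch [type=http method=GET url="https://example.com/price"]`}

	p, err := spec.ParsePipeline() // parse once
	if err != nil {
		panic(err)
	}
	spec.Pipeline = p // cache on the Spec; note the field has no locking

	cached, _ := spec.GetOrParsePipeline() // returns the cached *Pipeline, no re-parse
	fmt.Println(cached == p)               // true
}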
62 changes: 34 additions & 28 deletions core/services/pipeline/runner.go
@@ -243,26 +243,32 @@ func (r *runner) ExecuteRun(
})
defer cancel()

run := NewRun(spec, vars)

pipeline, err := r.initializePipeline(run)
if err != nil {
return run, nil, err
var pipeline *Pipeline
if spec.Pipeline != nil {
// assume if set that it has been pre-initialized
pipeline = spec.Pipeline
} else {
var err error
pipeline, err = r.InitializePipeline(spec)
if err != nil {
return nil, nil, err
}
}

run := NewRun(spec, vars)
taskRunResults := r.run(ctx, pipeline, run, vars, l)

if run.Pending {
return run, nil, pkgerrors.Wrapf(err, "unexpected async run for spec ID %v, tried executing via ExecuteAndInsertFinishedRun", spec.ID)
return run, nil, fmt.Errorf("unexpected async run for spec ID %v, tried executing via ExecuteRun", spec.ID)
}

return run, taskRunResults, nil
}

func (r *runner) initializePipeline(run *Run) (*Pipeline, error) {
pipeline, err := Parse(run.PipelineSpec.DotDagSource)
func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) {
pipeline, err = spec.GetOrParsePipeline()
if err != nil {
return nil, err
return
}

// initialize certain task params
@@ -278,16 +284,16 @@ func (r *runner) initializePipeline(run *Run) (*Pipeline, error) {
task.(*BridgeTask).config = r.config
task.(*BridgeTask).bridgeConfig = r.bridgeConfig
task.(*BridgeTask).orm = r.btORM
task.(*BridgeTask).specId = run.PipelineSpec.ID
task.(*BridgeTask).specId = spec.ID
// URL is "safe" because it comes from the node's own database. We
// must use the unrestrictedHTTPClient because some node operators
// may run external adapters on their own hardware
task.(*BridgeTask).httpClient = r.unrestrictedHTTPClient
case TaskTypeETHCall:
task.(*ETHCallTask).legacyChains = r.legacyEVMChains
task.(*ETHCallTask).config = r.config
task.(*ETHCallTask).specGasLimit = run.PipelineSpec.GasLimit
task.(*ETHCallTask).jobType = run.PipelineSpec.JobType
task.(*ETHCallTask).specGasLimit = spec.GasLimit
task.(*ETHCallTask).jobType = spec.JobType
case TaskTypeVRF:
task.(*VRFTask).keyStore = r.vrfKeyStore
case TaskTypeVRFV2:
Expand All @@ -296,28 +302,18 @@ func (r *runner) initializePipeline(run *Run) (*Pipeline, error) {
task.(*VRFTaskV2Plus).keyStore = r.vrfKeyStore
case TaskTypeEstimateGasLimit:
task.(*EstimateGasLimitTask).legacyChains = r.legacyEVMChains
task.(*EstimateGasLimitTask).specGasLimit = run.PipelineSpec.GasLimit
task.(*EstimateGasLimitTask).jobType = run.PipelineSpec.JobType
task.(*EstimateGasLimitTask).specGasLimit = spec.GasLimit
task.(*EstimateGasLimitTask).jobType = spec.JobType
case TaskTypeETHTx:
task.(*ETHTxTask).keyStore = r.ethKeyStore
task.(*ETHTxTask).legacyChains = r.legacyEVMChains
task.(*ETHTxTask).specGasLimit = run.PipelineSpec.GasLimit
task.(*ETHTxTask).jobType = run.PipelineSpec.JobType
task.(*ETHTxTask).forwardingAllowed = run.PipelineSpec.ForwardingAllowed
task.(*ETHTxTask).specGasLimit = spec.GasLimit
task.(*ETHTxTask).jobType = spec.JobType
task.(*ETHTxTask).forwardingAllowed = spec.ForwardingAllowed
default:
}
}

// retain old UUID values
for _, taskRun := range run.PipelineTaskRuns {
task := pipeline.ByDotID(taskRun.DotID)
if task != nil && task.Base() != nil {
task.Base().uuid = taskRun.ID
} else {
return nil, pkgerrors.Errorf("failed to match a pipeline task for dot ID: %v", taskRun.DotID)
}
}

return pipeline, nil
}

@@ -542,11 +538,21 @@ func (r *runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, var
}

func (r *runner) Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx pg.Queryer) error) (incomplete bool, err error) {
pipeline, err := r.initializePipeline(run)
pipeline, err := r.InitializePipeline(run.PipelineSpec)
if err != nil {
return false, err
}

// retain old UUID values
for _, taskRun := range run.PipelineTaskRuns {
task := pipeline.ByDotID(taskRun.DotID)
if task != nil && task.Base() != nil {
task.Base().uuid = taskRun.ID
} else {
return false, pkgerrors.Errorf("failed to match a pipeline task for dot ID: %v", taskRun.DotID)
}
}

preinsert := pipeline.RequiresPreInsert()

q := r.orm.GetQ().WithOpts(pg.WithParentCtx(ctx))
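Taken together, the runner changes create a fast path: InitializePipeline is now exported so a caller can build the pipeline once, attach it to the Spec, and have ExecuteRun skip parsing and per-task setup, while the UUID-retention loop moves into Run, the only path that restores persisted task runs. The sketch below shows that usage under stated assumptions: the package name, the narrowed streamRunner interface, and the runCachedSpec helper are illustrative, not part of the commit; the InitializePipeline signature mirrors the diff above and the ExecuteRun signature is assumed from its use here.

package streamsketch

import (
	"context"

	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

// streamRunner is an assumed, narrowed view of the runner: only the two
// methods this fast path needs.
type streamRunner interface {
	InitializePipeline(spec pipeline.Spec) (*pipeline.Pipeline, error)
	ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (*pipeline.Run, pipeline.TaskRunResults, error)
}

// runCachedSpec initializes the pipeline on first use, caches it on the Spec,
// and then executes; later calls with the same *Spec skip parsing entirely.
func runCachedSpec(ctx context.Context, r streamRunner, spec *pipeline.Spec, lggr logger.Logger) (pipeline.TaskRunResults, error) {
	if spec.Pipeline == nil {
		p, err := r.InitializePipeline(*spec)
		if err != nil {
			return nil, err
		}
		spec.Pipeline = p // ExecuteRun uses this instead of re-initializing
	}
	_, trrs, err := r.ExecuteRun(ctx, *spec, pipeline.NewVarsFrom(nil), lggr)
	return trrs, err
}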
