Skip to content

Commit

Permalink
[KS-135] Add WorkflowSpec (#12770)
Browse files Browse the repository at this point in the history
* [KS-135] Add WorkflowSpec

* [KS-135] Add WorkflowSpec

* [KS-135] Add WorkflowSpec
  • Loading branch information
cedric-cordenier authored Apr 15, 2024
1 parent 6ef3a42 commit b5e5340
Show file tree
Hide file tree
Showing 15 changed files with 535 additions and 59 deletions.
4 changes: 4 additions & 0 deletions core/cmd/jobs_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (p JobPresenter) FriendlyCreatedAt() string {
if p.GatewaySpec != nil {
return p.GatewaySpec.CreatedAt.Format(time.RFC3339)
}
case presenters.WorkflowJobSpec:
if p.WorkflowSpec != nil {
return p.WorkflowSpec.CreatedAt.Format(time.RFC3339)
}
default:
return "unknown"
}
Expand Down
28 changes: 28 additions & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type Job struct {
LiquidityBalancerSpecID *int32
PipelineSpecID int32 // This is deprecated in favor of the `job_pipeline_specs` table relationship
PipelineSpec *pipeline.Spec
WorkflowSpecID *int32
WorkflowSpec *WorkflowSpec
JobSpecErrors []SpecError
Type Type `toml:"type"`
SchemaVersion uint32 `toml:"schemaVersion"`
Expand Down Expand Up @@ -821,3 +823,29 @@ type LiquidityBalancerSpec struct {

LiquidityBalancerConfig string `toml:"liquidityBalancerConfig" db:"liquidity_balancer_config"`
}

type WorkflowSpec struct {
ID int32 `toml:"-"`
WorkflowID string `toml:"workflowId"`
Workflow string `toml:"workflow"`
WorkflowOwner string `toml:"workflowOwner"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
}

const (
workflowIDLen = 64
workflowOwnerLen = 40
)

func (w *WorkflowSpec) Validate() error {
if len(w.WorkflowID) != workflowIDLen {
return fmt.Errorf("incorrect length for id %s: expected %d, got %d", w.WorkflowID, workflowIDLen, len(w.WorkflowID))
}

if len(w.WorkflowOwner) != workflowOwnerLen {
return fmt.Errorf("incorrect length for owner %s: expected %d, got %d", w.WorkflowOwner, workflowOwnerLen, len(w.WorkflowOwner))
}

return nil
}
24 changes: 18 additions & 6 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,14 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
// 'workflow' type has no associated spec, nothing to do here
var specID int32
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, created_at, updated_at)
VALUES (:workflow, :workflow_id, :workflow_owner, NOW(), NOW())
RETURNING id;`
if err := pg.PrepareQueryRowx(tx, sql, &specID, jb.WorkflowSpec); err != nil {
return errors.Wrap(err, "failed to create WorkflowSpec for jobSpec")
}
jb.WorkflowSpecID = &specID
default:
o.lggr.Panicf("Unsupported jb.Type: %v", jb.Type)
}
Expand Down Expand Up @@ -542,18 +549,18 @@ func (o *orm) InsertJob(job *Job, qopts ...pg.QOpt) error {
if job.ID == 0 {
query = `INSERT INTO jobs (name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id,
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
VALUES (:name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
:keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id,
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
RETURNING *;`
} else {
query = `INSERT INTO jobs (id, name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id,
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
VALUES (:id, :name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
:keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id,
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
RETURNING *;`
}
err := q.GetNamed(query, job, job)
Expand Down Expand Up @@ -591,7 +598,8 @@ func (o *orm) DeleteJob(id int32, qopts ...pg.QOpt) error {
blockhash_store_spec_id,
bootstrap_spec_id,
block_header_feeder_spec_id,
gateway_spec_id
gateway_spec_id,
workflow_spec_id
),
deleted_oracle_specs AS (
DELETE FROM ocr_oracle_specs WHERE id IN (SELECT ocr_oracle_spec_id FROM deleted_jobs)
Expand Down Expand Up @@ -629,6 +637,9 @@ func (o *orm) DeleteJob(id int32, qopts ...pg.QOpt) error {
deleted_gateway_specs AS (
DELETE FROM gateway_specs WHERE id IN (SELECT gateway_spec_id FROM deleted_jobs)
),
deleted_workflow_specs AS (
DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)
),
deleted_job_pipeline_specs AS (
DELETE FROM job_pipeline_specs WHERE job_id IN (SELECT id FROM deleted_jobs) RETURNING pipeline_spec_id
)
Expand Down Expand Up @@ -1285,6 +1296,7 @@ func LoadAllJobTypes(tx pg.Queryer, job *Job) error {
loadJobType(tx, job, "LegacyGasStationSidecarSpec", "legacy_gas_station_sidecar_specs", job.LegacyGasStationSidecarSpecID),
loadJobType(tx, job, "BootstrapSpec", "bootstrap_specs", job.BootstrapSpecID),
loadJobType(tx, job, "GatewaySpec", "gateway_specs", job.GatewaySpecID),
loadJobType(tx, job, "WorkflowSpec", "workflow_specs", job.WorkflowSpecID),
)
}

Expand Down
63 changes: 13 additions & 50 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

const hardcodedWorkflow = `
triggers:
- type: "mercury-trigger"
config:
feedIds:
- "0x1111111111111111111100000000000000000000000000000000000000000000"
- "0x2222222222222222222200000000000000000000000000000000000000000000"
- "0x3333333333333333333300000000000000000000000000000000000000000000"
consensus:
- type: "offchain_reporting"
ref: "evm_median"
inputs:
observations:
- "$(trigger.outputs)"
config:
aggregation_method: "data_feeds_2_0"
aggregation_config:
"0x1111111111111111111100000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x2222222222222222222200000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x3333333333333333333300000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
encoder: "EVM"
encoder_config:
abi: "mercury_reports bytes[]"
targets:
- type: "write_polygon-testnet-mumbai"
inputs:
report: "$(evm_median.outputs.report)"
config:
address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef"
params: ["$(report)"]
abi: "receive(report bytes)"
- type: "write_ethereum-testnet-sepolia"
inputs:
report: "$(evm_median.outputs.report)"
config:
address: "0x54e220867af6683aE6DcBF535B4f952cB5116510"
params: ["$(report)"]
abi: "receive(report bytes)"
`

type Delegate struct {
registry types.CapabilitiesRegistry
logger logger.Logger
Expand Down Expand Up @@ -104,9 +56,9 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser

cfg := Config{
Lggr: d.logger,
Spec: hardcodedWorkflow,
Spec: spec.WorkflowSpec.Workflow,
WorkflowID: spec.WorkflowSpec.WorkflowID,
Registry: d.registry,
WorkflowID: mockedWorkflowID,
}
engine, err := NewEngine(cfg)
if err != nil {
Expand Down Expand Up @@ -178,6 +130,17 @@ func ValidatedWorkflowSpec(tomlString string) (job.Job, error) {
return jb, fmt.Errorf("toml unmarshal error on spec: %w", err)
}

var spec job.WorkflowSpec
err = tree.Unmarshal(&spec)
if err != nil {
return jb, fmt.Errorf("toml unmarshal error on job: %w", err)
}

if err := spec.Validate(); err != nil {
return jb, err
}

jb.WorkflowSpec = &spec
if jb.Type != job.Workflow {
return jb, fmt.Errorf("unsupported type %s", jb.Type)
}
Expand Down
2 changes: 2 additions & 0 deletions core/services/workflows/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func TestDelegate_JobSpecValidator(t *testing.T) {
`
type = "workflow"
schemaVersion = 1
workflowId = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
workflowOwner = "00000000000000000000000000000000000000aa"
`,
true,
},
Expand Down
17 changes: 16 additions & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,15 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) er
Inputs: triggerInputs,
Config: t.config,
}
return t.trigger.UnregisterTrigger(ctx, deregRequest)

// if t.trigger == nil, then we haven't initialized the workflow
// yet, and can safely consider the trigger deregistered with
// no further action.
if t.trigger != nil {
return t.trigger.UnregisterTrigger(ctx, deregRequest)
}

return nil
}

func (e *Engine) Close() error {
Expand Down Expand Up @@ -545,6 +553,13 @@ func (e *Engine) Close() error {
Config: s.config,
}

// if capability is nil, then we haven't initialized
// the workflow yet and can safely consider it deregistered
// with no further action.
if s.capability == nil {
return nil
}

innerErr := s.capability.UnregisterFromWorkflow(ctx, reg)
if innerErr != nil {
return fmt.Errorf("failed to unregister from workflow: %+v", reg)
Expand Down
48 changes: 48 additions & 0 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,54 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const hardcodedWorkflow = `
triggers:
- type: "mercury-trigger"
config:
feedIds:
- "0x1111111111111111111100000000000000000000000000000000000000000000"
- "0x2222222222222222222200000000000000000000000000000000000000000000"
- "0x3333333333333333333300000000000000000000000000000000000000000000"
consensus:
- type: "offchain_reporting"
ref: "evm_median"
inputs:
observations:
- "$(trigger.outputs)"
config:
aggregation_method: "data_feeds_2_0"
aggregation_config:
"0x1111111111111111111100000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x2222222222222222222200000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
"0x3333333333333333333300000000000000000000000000000000000000000000":
deviation: "0.001"
heartbeat: 3600
encoder: "EVM"
encoder_config:
abi: "mercury_reports bytes[]"
targets:
- type: "write_polygon-testnet-mumbai"
inputs:
report: "$(evm_median.outputs.report)"
config:
address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef"
params: ["$(report)"]
abi: "receive(report bytes)"
- type: "write_ethereum-testnet-sepolia"
inputs:
report: "$(evm_median.outputs.report)"
config:
address: "0x54e220867af6683aE6DcBF535B4f952cB5116510"
params: ["$(report)"]
abi: "receive(report bytes)"
`

type mockCapability struct {
capabilities.CapabilityInfo
capabilities.CallbackExecutable
Expand Down
Loading

0 comments on commit b5e5340

Please sign in to comment.