Skip to content

Commit

Permalink
Add table support for capability "type" property (#12622)
Browse files Browse the repository at this point in the history
* Add table type support

* Factor out yaml logic

* Rename workflow -> models
  • Loading branch information
HenryNguyen5 authored Apr 5, 2024
1 parent 1bb36e0 commit a2bdcf5
Show file tree
Hide file tree
Showing 11 changed files with 1,016 additions and 687 deletions.
5 changes: 5 additions & 0 deletions .changeset/new-cheetahs-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add table support to "type" property for step definitions
205 changes: 205 additions & 0 deletions core/services/workflows/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package workflows

import (
"fmt"

"github.com/dominikbraun/graph"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

type stepRequest struct {
stepRef string
state executionState
}

// stepDefinition is the parsed representation of a step in a workflow.
//
// Within the workflow spec, they are called "Capability Properties".
type stepDefinition struct {
Type string `json:"type" jsonschema:"required"`
Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"`
Inputs map[string]any `json:"inputs,omitempty"`
Config map[string]any `json:"config" jsonschema:"required"`
}

// workflowSpec is the parsed representation of a workflow.
type workflowSpec struct {
Triggers []stepDefinition `json:"triggers" jsonschema:"required"`
Actions []stepDefinition `json:"actions,omitempty"`
Consensus []stepDefinition `json:"consensus" jsonschema:"required"`
Targets []stepDefinition `json:"targets" jsonschema:"required"`
}

func (w *workflowSpec) steps() []stepDefinition {
s := []stepDefinition{}
s = append(s, w.Actions...)
s = append(s, w.Consensus...)
s = append(s, w.Targets...)
return s
}

// workflow is a directed graph of nodes, where each node is a step.
//
// triggers are special steps that are stored separately, they're
// treated differently due to their nature of being the starting
// point of a workflow.
type workflow struct {
graph.Graph[string, *step]

triggers []*triggerCapability

spec *workflowSpec
}

func (w *workflow) walkDo(start string, do func(s *step) error) error {
var outerErr error
err := graph.BFS(w.Graph, start, func(ref string) bool {
n, err := w.Graph.Vertex(ref)
if err != nil {
outerErr = err
return true
}

err = do(n)
if err != nil {
outerErr = err
return true
}

return false
})
if err != nil {
return err
}

return outerErr
}

func (w *workflow) dependents(start string) ([]*step, error) {
steps := []*step{}
m, err := w.Graph.AdjacencyMap()
if err != nil {
return nil, err
}

adj, ok := m[start]
if !ok {
return nil, fmt.Errorf("could not find step with ref %s", start)
}

for adjacentRef := range adj {
n, err := w.Graph.Vertex(adjacentRef)
if err != nil {
return nil, err
}

steps = append(steps, n)
}

return steps, nil
}

// step wraps a stepDefinition with additional context for dependencies and execution
type step struct {
stepDefinition
dependencies []string
capability capabilities.CallbackExecutable
config *values.Map
}

type triggerCapability struct {
stepDefinition
trigger capabilities.TriggerCapability
config *values.Map
}

const (
keywordTrigger = "trigger"
)

func Parse(yamlWorkflow string) (*workflow, error) {
spec, err := ParseWorkflowSpecYaml(yamlWorkflow)
if err != nil {
return nil, err
}

// Construct and validate the graph. We instantiate an
// empty graph with just one starting entry: `trigger`.
// This provides the starting point for our graph and
// points to all dependent steps.
// Note: all triggers are represented by a single step called
// `trigger`. This is because for workflows with multiple triggers
// only one trigger will have started the workflow.
stepHash := func(s *step) string {
return s.Ref
}
g := graph.New(
stepHash,
graph.PreventCycles(),
graph.Directed(),
)
err = g.AddVertex(&step{
stepDefinition: stepDefinition{Ref: keywordTrigger},
})
if err != nil {
return nil, err
}

// Next, let's populate the other entries in the graph.
for _, s := range spec.steps() {
// TODO: The workflow format spec doesn't always require a `Ref`
// to be provided (triggers and targets don't have a `Ref` for example).
// To handle this, we default the `Ref` to the type, but ideally we
// should find a better long-term way to handle this.
if s.Ref == "" {
s.Ref = s.Type
}

innerErr := g.AddVertex(&step{stepDefinition: s})
if innerErr != nil {
return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr)
}
}

stepRefs, err := g.AdjacencyMap()
if err != nil {
return nil, err
}

// Next, let's iterate over the steps and populate
// any edges.
for stepRef := range stepRefs {
step, innerErr := g.Vertex(stepRef)
if innerErr != nil {
return nil, innerErr
}

refs, innerErr := findRefs(step.Inputs)
if innerErr != nil {
return nil, innerErr
}
step.dependencies = refs

for _, r := range refs {
innerErr = g.AddEdge(r, step.Ref)
if innerErr != nil {
return nil, innerErr
}
}
}

triggerSteps := []*triggerCapability{}
for _, t := range spec.Triggers {
triggerSteps = append(triggerSteps, &triggerCapability{
stepDefinition: t,
})
}
wf := &workflow{
spec: &spec,
Graph: g,
triggers: triggerSteps,
}
return wf, err
}
Loading

0 comments on commit a2bdcf5

Please sign in to comment.