KS-208: move workflow yaml and graph validation #519

Merged
merged 5 commits into from
May 17, 2024
Changes from 4 commits
8 changes: 5 additions & 3 deletions go.mod
krehermann marked this conversation as resolved.
@@ -4,13 +4,16 @@ go 1.21

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/dominikbraun/graph v0.23.0
github.com/fxamacker/cbor/v2 v2.5.0
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.1
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0
github.com/hashicorp/consul/sdk v0.16.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.6.0
github.com/invopop/jsonschema v0.12.0
github.com/jmoiron/sqlx v1.3.5
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
@@ -35,6 +38,7 @@ require (
golang.org/x/mod v0.14.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
sigs.k8s.io/yaml v1.4.0
)

require (
@@ -49,12 +53,10 @@ require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect; indirec
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/invopop/jsonschema v0.12.0
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
5 changes: 5 additions & 0 deletions go.sum
@@ -83,6 +83,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo=
github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -156,6 +158,7 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -648,3 +651,5 @@ honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
248 changes: 248 additions & 0 deletions pkg/workflows/dependency_graph.go
@@ -0,0 +1,248 @@
package workflows
Contributor

These are parts of models.go + state.go, right? Let me know if you modified anything, otherwise I'm assuming it's just copy-paste.

Contributor Author

yes, just moved things. it's easy to see by reading the corresponding change in core smartcontractkit/chainlink#13235


import (
"errors"
"fmt"
"regexp"
"strings"

"github.com/dominikbraun/graph"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
)

const (
KeywordTrigger = "trigger"
)

// StepDefinition is the parsed representation of a step in a workflow.
//
// Within the workflow spec, steps are called "Capability Properties".
type StepDefinition struct {
ID string `json:"id" jsonschema:"required"`
Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"`
Inputs map[string]any `json:"inputs,omitempty"`
Config map[string]any `json:"config" jsonschema:"required"`

CapabilityType capabilities.CapabilityType `json:"-"`
}

// WorkflowSpec is the parsed representation of a workflow.
type WorkflowSpec struct {
Triggers []StepDefinition `json:"triggers" jsonschema:"required"`
Actions []StepDefinition `json:"actions,omitempty"`
Consensus []StepDefinition `json:"consensus" jsonschema:"required"`
Targets []StepDefinition `json:"targets" jsonschema:"required"`
}

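// Steps returns the non-trigger steps of the workflow: actions, then
// consensus, then targets. Triggers are not included.
func (w *WorkflowSpec) Steps() []StepDefinition {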
s := []StepDefinition{}
s = append(s, w.Actions...)
s = append(s, w.Consensus...)
s = append(s, w.Targets...)
return s
}

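// Vertex is a node in the dependency graph: a step together with the refs
// of the steps that its inputs depend on.
type Vertex struct {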
StepDefinition
Dependencies []string
}

// DependencyGraph is an intermediate representation of a workflow wherein all the graph
// vertices are represented and validated. It is a static representation of the workflow dependencies.
type DependencyGraph struct {
ID string
graph.Graph[string, *Vertex]

Triggers []*StepDefinition

Spec *WorkflowSpec
}

// VID is an identifier for a Vertex that uniquely identifies it in a graph.
// It plays the role of the `hash` in the graph package's AddVertex method.
// We refrain from naming it `hash` to avoid confusion with a hash function.
func (v *Vertex) VID() string {
return v.Ref
}

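// ParseDependencyGraph parses a YAML workflow spec and builds a directed
// dependency graph of its steps, with cycle prevention enabled. It returns
// an error if the spec cannot be parsed, if a non-trigger step references
// no other step, or if a vertex or edge cannot be added to the graph.
func ParseDependencyGraph(yamlWorkflow string) (*DependencyGraph, error) {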
spec, err := ParseWorkflowSpecYaml(yamlWorkflow)
if err != nil {
return nil, err
}

// Construct and validate the graph. We instantiate an
// empty graph with just one starting entry: `trigger`.
// This provides the starting point for our graph and
// points to all dependent steps.
// Note: all triggers are represented by a single step called
// `trigger`. This is because for workflows with multiple triggers
// only one trigger will have started the workflow.
stepHash := func(s *Vertex) string {
return s.VID()
}
g := graph.New(
stepHash,
graph.PreventCycles(),
graph.Directed(),
)
err = g.AddVertex(&Vertex{
StepDefinition: StepDefinition{Ref: KeywordTrigger},
})
if err != nil {
return nil, err
}

// Next, let's populate the other entries in the graph.
for _, s := range spec.Steps() {
// TODO: The workflow format spec doesn't always require a `Ref`
// to be provided (triggers and targets don't have a `Ref` for example).
// To handle this, we default the `Ref` to the step's `ID`, but ideally we
// should find a better long-term way to handle this.
if s.Ref == "" {
s.Ref = s.ID
}

innerErr := g.AddVertex(&Vertex{StepDefinition: s})
if innerErr != nil {
return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr)
}
}

stepRefs, err := g.AdjacencyMap()
if err != nil {
return nil, err
}

// Next, let's iterate over the steps and populate
// any edges.
for stepRef := range stepRefs {
step, innerErr := g.Vertex(stepRef)
if innerErr != nil {
return nil, innerErr
}

refs, innerErr := findRefs(step.Inputs)
if innerErr != nil {
return nil, innerErr
}
step.Dependencies = refs

if stepRef != KeywordTrigger && len(refs) == 0 {
return nil, errors.New("all non-trigger steps must have a dependent ref")
}

for _, r := range refs {
innerErr = g.AddEdge(r, step.Ref)
if innerErr != nil {
return nil, innerErr
}
}
}

triggerSteps := []*StepDefinition{}
for _, t := range spec.Triggers {
tt := t
triggerSteps = append(triggerSteps, &tt)
}
wf := &DependencyGraph{
Spec: &spec,
Graph: g,
Triggers: triggerSteps,
}
return wf, nil
}

var (
InterpolationTokenRe = regexp.MustCompile(`^\$\((\S+)\)$`)
)

// findRefs takes an `inputs` map and returns a list of all the step references
// contained within it.
func findRefs(inputs map[string]any) ([]string, error) {
refs := []string{}
_, err := DeepMap(
inputs,
// This function is called for each string in the map.
// InterpolationTokenRe is anchored, so a string matches at most once:
// - if there is no match, return no reference
// - if there is a match, split the captured path on "." and record only
//   the first part, which is the step ref (e.g. `$(a.outputs.x)` yields `a`)
func(el string) (any, error) {
matches := InterpolationTokenRe.FindStringSubmatch(el)
if len(matches) < 2 {
return el, nil
}

m := matches[1]
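parts := strings.Split(m, ".")
// strings.Split always returns at least one element here, so the
// meaningful check is whether the step ref itself is empty.
if parts[0] == "" {
return nil, fmt.Errorf("invalid ref %s", m)
}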

refs = append(refs, parts[0])
return el, nil
},
)
return refs, err
}

// DeepMap recursively applies a transformation function
// over each string within:
//
// - a map[string]any
// - a []any
// - a string
func DeepMap(input any, transform func(el string) (any, error)) (any, error) {
// in the case of a string, simply apply the transformation
// in the case of a map, recurse and apply the transformation to each value
// in the case of a list, recurse and apply the transformation to each element
switch tv := input.(type) {
case string:
nv, err := transform(tv)
if err != nil {
return nil, err
}

return nv, nil
case mapping:
// coerce mapping to map[string]any
mp := map[string]any(tv)

nm := map[string]any{}
for k, v := range mp {
nv, err := DeepMap(v, transform)
if err != nil {
return nil, err
}

nm[k] = nv
}
return nm, nil
case map[string]any:
nm := map[string]any{}
for k, v := range tv {
nv, err := DeepMap(v, transform)
if err != nil {
return nil, err
}

nm[k] = nv
}
return nm, nil
case []any:
a := []any{}
for _, el := range tv {
ne, err := DeepMap(el, transform)
if err != nil {
return nil, err
}

a = append(a, ne)
}
return a, nil
}

return nil, fmt.Errorf("cannot traverse item %+v of type %T", input, input)
}
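
For readers who want to try the reference extraction in isolation, here is a minimal, stdlib-only sketch of what findRefs does with the interpolation token. It is not the code from this PR: `tokenRe`, `collectRefs` and `stepRefs` are hypothetical names, and the sample inputs are made up. Unlike the DeepMap-based version above, this sketch assumes it is acceptable to skip non-string leaves instead of returning an error, and it deduplicates and sorts the refs so the output is stable.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
	"strings"
)

// tokenRe uses the same pattern as InterpolationTokenRe in the diff:
// the whole string must be a single $(...) token.
var tokenRe = regexp.MustCompile(`^\$\((\S+)\)$`)

// collectRefs (hypothetical helper) walks strings, maps and slices and
// records the first dot-separated part of every interpolation token.
// Assumption: values of any other type are silently skipped.
func collectRefs(input any, seen map[string]struct{}) {
	switch v := input.(type) {
	case string:
		m := tokenRe.FindStringSubmatch(v)
		if len(m) < 2 {
			return // not an interpolation token
		}
		seen[strings.Split(m[1], ".")[0]] = struct{}{}
	case map[string]any:
		for _, el := range v {
			collectRefs(el, seen)
		}
	case []any:
		for _, el := range v {
			collectRefs(el, seen)
		}
	}
}

// stepRefs (hypothetical helper) returns the distinct step refs found in
// inputs, sorted alphabetically.
func stepRefs(inputs map[string]any) []string {
	seen := map[string]struct{}{}
	collectRefs(inputs, seen)
	refs := make([]string, 0, len(seen))
	for r := range seen {
		refs = append(refs, r)
	}
	sort.Strings(refs)
	return refs
}

func main() {
	inputs := map[string]any{
		"observations": []any{"$(trigger.outputs)", "$(read_chain.outputs.value)"},
		// Not a full-string token, so the anchored regex ignores it.
		"label":  "price: $(trigger.outputs)",
		"nested": map[string]any{"report": "$(consensus.outputs)"},
	}
	fmt.Println(stepRefs(inputs)) // prints [consensus read_chain trigger]
}
```

Because the pattern is anchored with `^` and `$`, a token embedded in a longer string (the `label` value) contributes no dependency, which matches the behavior of the regex in the diff.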