Skip to content

Commit

Permalink
KS-208: move workflow yaml and graph validation (#519)
Browse files Browse the repository at this point in the history
Refactor workflow yaml and graph validation into common
  • Loading branch information
krehermann authored May 17, 2024
1 parent 0a8299b commit f4446b8
Show file tree
Hide file tree
Showing 16 changed files with 1,426 additions and 5 deletions.
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ go 1.21

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/dominikbraun/graph v0.23.0
github.com/fxamacker/cbor/v2 v2.5.0
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.1
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0
github.com/hashicorp/consul/sdk v0.16.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.6.0
github.com/invopop/jsonschema v0.12.0
github.com/jmoiron/sqlx v1.3.5
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
Expand All @@ -35,6 +38,7 @@ require (
golang.org/x/mod v0.14.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
sigs.k8s.io/yaml v1.4.0
)

require (
Expand All @@ -49,12 +53,10 @@ require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/invopop/jsonschema v0.12.0
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo=
github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -156,6 +158,7 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -648,3 +651,5 @@ honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
248 changes: 248 additions & 0 deletions pkg/workflows/dependency_graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package workflows

import (
"errors"
"fmt"
"regexp"
"strings"

"github.com/dominikbraun/graph"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
)

const (
	// KeywordTrigger is the ref of the single synthetic vertex that stands in
	// for all of a workflow's triggers in the dependency graph. Steps refer to
	// it in their inputs with tokens such as `$(trigger...)`.
	KeywordTrigger = "trigger"
)

// StepDefinition is the parsed representation of a step in a workflow.
//
// Within the workflow spec, they are called "Capability Properties".
type StepDefinition struct {
	// ID is the step's identifier and is required by the schema. When Ref is
	// empty, ParseDependencyGraph uses ID as the step's ref instead.
	ID string `json:"id" jsonschema:"required"`
	// Ref is the name under which the step is stored in the dependency graph
	// and by which other steps reference it. The schema restricts it to
	// lowercase letters, digits and underscores.
	Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"`
	// Inputs holds the step's inputs. String values of the form `$(ref...)`
	// found anywhere inside it are treated as references to other steps.
	Inputs map[string]any `json:"inputs,omitempty"`
	// Config holds the step's configuration and is required by the schema.
	// It is not interpreted in this file.
	Config map[string]any `json:"config" jsonschema:"required"`

	// CapabilityType is excluded from JSON (de)serialization.
	// NOTE(review): presumably filled in after parsing — confirm against callers.
	CapabilityType capabilities.CapabilityType `json:"-"`
}

// WorkflowSpec is the parsed representation of a workflow.
//
// Triggers, Consensus and Targets are marked as required in the schema tags;
// Actions may be omitted.
type WorkflowSpec struct {
	// Triggers lists the workflow's triggers. They are not part of Steps().
	Triggers []StepDefinition `json:"triggers" jsonschema:"required"`
	// Actions lists the optional action steps.
	Actions []StepDefinition `json:"actions,omitempty"`
	// Consensus lists the consensus steps.
	Consensus []StepDefinition `json:"consensus" jsonschema:"required"`
	// Targets lists the target steps.
	Targets []StepDefinition `json:"targets" jsonschema:"required"`
}

// Steps returns every non-trigger step of the workflow as one freshly
// allocated slice, ordered as actions, then consensus steps, then targets.
// The result is never nil, even when the workflow has no such steps.
func (w *WorkflowSpec) Steps() []StepDefinition {
	total := len(w.Actions) + len(w.Consensus) + len(w.Targets)
	steps := make([]StepDefinition, 0, total)
	for _, group := range [][]StepDefinition{w.Actions, w.Consensus, w.Targets} {
		steps = append(steps, group...)
	}
	return steps
}

// Vertex is a node of the workflow dependency graph: a step definition
// together with the refs of the steps it depends on.
type Vertex struct {
	StepDefinition
	// Dependencies holds the step refs found in the step's Inputs. It is
	// populated by ParseDependencyGraph and may contain the same ref more
	// than once if the inputs reference a step several times.
	Dependencies []string
}

// DependencyGraph is an intermediate representation of a workflow wherein all the graph
// vertices are represented and validated. It is a static representation of the workflow dependencies.
type DependencyGraph struct {
	// ID is not set by ParseDependencyGraph.
	// NOTE(review): presumably assigned by the caller — confirm.
	ID string
	// Graph is the embedded graph of vertices, keyed by each vertex's VID.
	graph.Graph[string, *Vertex]

	// Triggers holds pointers to copies of the spec's trigger definitions.
	Triggers []*StepDefinition

	// Spec points to the parsed workflow spec the graph was built from.
	Spec *WorkflowSpec
}

// VID returns the key that uniquely identifies this Vertex within a graph,
// namely the Ref of its embedded StepDefinition.
//
// It plays the role of the `hash` argument of the graph package's AddVertex
// method; it is deliberately not called `hash` so that it is not mistaken for
// a hash function.
func (v *Vertex) VID() string {
	return v.StepDefinition.Ref
}

// ParseDependencyGraph parses a YAML workflow definition and builds the static
// dependency graph of its steps.
//
// The graph always contains one synthetic vertex whose ref is KeywordTrigger;
// it stands in for every trigger of the workflow. Each action, consensus and
// target step becomes one further vertex keyed by its Ref (or by its ID when
// Ref is empty). For every step ref found in a step's inputs, an edge is added
// from the referenced step to the referencing step.
//
// An error is returned when ParseWorkflowSpecYaml rejects the input, when a
// non-trigger step references no other step, when the step inputs cannot be
// traversed, or when the graph refuses a vertex or edge (the graph is directed
// and built with cycle prevention).
func ParseDependencyGraph(yamlWorkflow string) (*DependencyGraph, error) {
	spec, err := ParseWorkflowSpecYaml(yamlWorkflow)
	if err != nil {
		return nil, err
	}

	// Construct and validate the graph. We instantiate an
	// empty graph with just one starting entry: `trigger`.
	// This provides the starting point for our graph and
	// points to all dependent steps.
	// Note: all triggers are represented by a single step called
	// `trigger`. This is because for workflows with multiple triggers
	// only one trigger will have started the workflow.
	stepHash := func(s *Vertex) string {
		return s.VID()
	}
	g := graph.New(
		stepHash,
		graph.PreventCycles(),
		graph.Directed(),
	)
	err = g.AddVertex(&Vertex{
		StepDefinition: StepDefinition{Ref: KeywordTrigger},
	})
	if err != nil {
		return nil, err
	}

	// Next, let's populate the other entries in the graph.
	for _, s := range spec.Steps() {
		// TODO: The workflow format spec doesn't always require a `Ref`
		// to be provided (triggers and targets don't have a `Ref` for example).
		// To handle this, we default the `Ref` to the step's `ID`, but ideally
		// we should find a better long-term way to handle this.
		// Note that `s` is a copy, so the spec itself is left untouched.
		if s.Ref == "" {
			s.Ref = s.ID
		}

		innerErr := g.AddVertex(&Vertex{StepDefinition: s})
		if innerErr != nil {
			return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr)
		}
	}

	stepRefs, err := g.AdjacencyMap()
	if err != nil {
		return nil, err
	}

	// Next, let's iterate over the steps and populate
	// any edges.
	for stepRef := range stepRefs {
		step, innerErr := g.Vertex(stepRef)
		if innerErr != nil {
			return nil, innerErr
		}

		refs, innerErr := findRefs(step.Inputs)
		if innerErr != nil {
			return nil, innerErr
		}
		step.Dependencies = refs

		if stepRef != KeywordTrigger && len(refs) == 0 {
			return nil, errors.New("all non-trigger steps must have a dependent ref")
		}

		for _, r := range refs {
			innerErr = g.AddEdge(r, step.Ref)
			// A step may reference the same upstream step several times
			// (e.g. two different outputs of the trigger). The edge then
			// already exists, which is not an error for our purposes.
			if innerErr != nil && !errors.Is(innerErr, graph.ErrEdgeAlreadyExists) {
				return nil, innerErr
			}
		}
	}

	// Hand out pointers to copies so that callers cannot mutate the
	// trigger definitions stored in the spec through Triggers.
	triggerSteps := make([]*StepDefinition, 0, len(spec.Triggers))
	for _, t := range spec.Triggers {
		tt := t
		triggerSteps = append(triggerSteps, &tt)
	}

	wf := &DependencyGraph{
		Spec:     &spec,
		Graph:    g,
		Triggers: triggerSteps,
	}
	return wf, nil
}

var (
	// InterpolationTokenRe matches a string that consists, in its entirety,
	// of a single interpolation token `$(...)` whose content is one or more
	// non-whitespace characters. The content is captured as submatch 1.
	InterpolationTokenRe = regexp.MustCompile(`^\$\((\S+)\)$`)
)

// findRefs takes an `inputs` map and returns a list of all the step references
// contained within it.
//
// Every string found anywhere inside inputs is checked against
// InterpolationTokenRe. For a matching token such as `$(step.outputs.value)`,
// the part before the first "." (here `step`) is recorded as a reference. A
// token without a "." is recorded whole. References are returned in traversal
// order and are not deduplicated.
//
// An error is returned when inputs contains a value DeepMap cannot traverse,
// or when a token has an empty step ref (for example `$(.outputs)`).
func findRefs(inputs map[string]any) ([]string, error) {
	// Kept non-nil so that a step without references yields an empty list.
	refs := []string{}
	_, err := DeepMap(
		inputs,
		// Called for each string in the map. The string is always returned
		// unchanged: we only collect references, we do not rewrite inputs.
		func(el string) (any, error) {
			matches := InterpolationTokenRe.FindStringSubmatch(el)
			if len(matches) < 2 {
				// Not an interpolation token: nothing to record.
				return el, nil
			}

			// For a multi-part state reference, only the leading part
			// names the step.
			m := matches[1]
			ref, _, _ := strings.Cut(m, ".")
			if ref == "" {
				return nil, fmt.Errorf("invalid ref %s", m)
			}

			refs = append(refs, ref)
			return el, nil
		},
	)
	if err != nil {
		return nil, err
	}
	return refs, nil
}

// DeepMap walks input recursively and applies transform to every string it
// encounters, returning a new value of the same shape. It understands:
//
//   - a string: replaced by the result of transform
//   - a map[string]any (or a mapping): rebuilt as a map[string]any with each
//     value mapped
//   - a []any: rebuilt with each element mapped
//
// Any other value, at any depth, makes DeepMap fail with an error, as does
// the first error reported by transform. The input itself is not modified.
func DeepMap(input any, transform func(el string) (any, error)) (any, error) {
	// mapValues rebuilds a string-keyed map with every value passed through
	// DeepMap. It is shared by the two map-shaped cases below.
	mapValues := func(src map[string]any) (any, error) {
		out := make(map[string]any, len(src))
		for key, val := range src {
			mapped, err := DeepMap(val, transform)
			if err != nil {
				return nil, err
			}
			out[key] = mapped
		}
		return out, nil
	}

	switch typed := input.(type) {
	case string:
		result, err := transform(typed)
		if err != nil {
			return nil, err
		}
		return result, nil
	case mapping:
		// A mapping is handled exactly like the plain map it converts to.
		return mapValues(map[string]any(typed))
	case map[string]any:
		return mapValues(typed)
	case []any:
		out := make([]any, 0, len(typed))
		for _, item := range typed {
			mapped, err := DeepMap(item, transform)
			if err != nil {
				return nil, err
			}
			out = append(out, mapped)
		}
		return out, nil
	default:
		return nil, fmt.Errorf("cannot traverse item %+v of type %T", input, input)
	}
}
Loading

0 comments on commit f4446b8

Please sign in to comment.