Skip to content

Commit

Permalink
Decouple static workflow graph validation from execution state (#13201)
Browse files Browse the repository at this point in the history
* Decouple static workflow graph validation from execution state

* linter; revert executable to step; fix preexisting shadowing in engine

* address feedback
  • Loading branch information
krehermann authored May 15, 2024
1 parent 0917394 commit aeb9f4d
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 57 deletions.
40 changes: 20 additions & 20 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,45 +122,45 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
return capabilityRegistrationErr
}

func (e *Engine) initializeCapability(ctx context.Context, s *step) error {
func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
// If the capability already exists, that means we've already registered it
if s.capability != nil {
if step.capability != nil {
return nil
}

cp, err := e.registry.Get(ctx, s.ID)
cp, err := e.registry.Get(ctx, step.ID)
if err != nil {
return fmt.Errorf("failed to get capability with ref %s: %s", s.ID, err)
return fmt.Errorf("failed to get capability with ref %s: %s", step.ID, err)
}

// We configure actions, consensus and targets here, and
// they all satisfy the `CallbackCapability` interface
cc, ok := cp.(capabilities.CallbackCapability)
if !ok {
return fmt.Errorf("could not coerce capability %s to CallbackCapability", s.ID)
return fmt.Errorf("could not coerce capability %s to CallbackCapability", step.ID)
}

if s.config == nil {
configMap, newMapErr := values.NewMap(s.Config)
if step.config == nil {
configMap, newMapErr := values.NewMap(step.Config)
if newMapErr != nil {
return fmt.Errorf("failed to convert config to values.Map: %s", newMapErr)
}
s.config = configMap
step.config = configMap
}

registrationRequest := capabilities.RegisterToWorkflowRequest{
Metadata: capabilities.RegistrationMetadata{
WorkflowID: e.workflow.id,
},
Config: s.config,
Config: step.config,
}

err = cc.RegisterToWorkflow(ctx, registrationRequest)
if err != nil {
return fmt.Errorf("failed to register to workflow (%+v): %w", registrationRequest, err)
}

s.capability = cc
step.capability = cc
return nil
}

Expand Down Expand Up @@ -260,8 +260,8 @@ func (e *Engine) resumeInProgressExecutions(ctx context.Context) error {
// and `scheduledExecution` for targets. If we don't have the necessary
// config to initialize a scheduledExecution for a target, we'll fallback to
// using `immediateExecution`.
func (e *Engine) initializeExecutionStrategy(step *step) error {
if step.executionStrategy != nil {
func (e *Engine) initializeExecutionStrategy(s *step) error {
if s.executionStrategy != nil {
return nil
}

Expand All @@ -272,16 +272,16 @@ func (e *Engine) initializeExecutionStrategy(step *step) error {
}

ie := immediateExecution{}
if step.CapabilityType != capabilities.CapabilityTypeTarget {
e.logger.Debugf("initializing step %+v with immediate execution strategy: not a target", step)
step.executionStrategy = ie
if s.CapabilityType != capabilities.CapabilityTypeTarget {
e.logger.Debugf("initializing step %+v with immediate execution strategy: not a target", s)
s.executionStrategy = ie
return nil
}

dinfo := e.donInfo
if dinfo.DON == nil {
e.logger.Debugf("initializing target step with immediate execution strategy: donInfo %+v", e.donInfo)
step.executionStrategy = ie
s.executionStrategy = ie
return nil
}

Expand All @@ -294,17 +294,17 @@ func (e *Engine) initializeExecutionStrategy(step *step) error {
}

if position == nil {
e.logger.Debugf("initializing step %+v with immediate execution strategy: position not found in donInfo %+v", step, e.donInfo)
step.executionStrategy = ie
e.logger.Debugf("initializing step %+v with immediate execution strategy: position not found in donInfo %+v", s, e.donInfo)
s.executionStrategy = ie
return nil
}

step.executionStrategy = scheduledExecution{
s.executionStrategy = scheduledExecution{
DON: e.donInfo.DON,
Position: *position,
PeerID: e.donInfo.PeerID(),
}
e.logger.Debugf("initializing step %+v with scheduled execution strategy", step)
e.logger.Debugf("initializing step %+v with scheduled execution strategy", s)
return nil
}

Expand Down
142 changes: 114 additions & 28 deletions core/services/workflows/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type stepRequest struct {
state store.WorkflowExecution
}

// stepDefinition is the parsed representation of a step in a workflow.
// StepDefinition is the parsed representation of a step in a workflow.
//
// Within the workflow spec, they are called "Capability Properties".
type stepDefinition struct {
type StepDefinition struct {
ID string `json:"id" jsonschema:"required"`
Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"`
Inputs map[string]any `json:"inputs,omitempty"`
Expand All @@ -28,16 +28,16 @@ type stepDefinition struct {
CapabilityType capabilities.CapabilityType `json:"-"`
}

// workflowSpec is the parsed representation of a workflow.
type workflowSpec struct {
Triggers []stepDefinition `json:"triggers" jsonschema:"required"`
Actions []stepDefinition `json:"actions,omitempty"`
Consensus []stepDefinition `json:"consensus" jsonschema:"required"`
Targets []stepDefinition `json:"targets" jsonschema:"required"`
// WorkflowSpec is the parsed representation of a workflow.
type WorkflowSpec struct {
Triggers []StepDefinition `json:"triggers" jsonschema:"required"`
Actions []StepDefinition `json:"actions,omitempty"`
Consensus []StepDefinition `json:"consensus" jsonschema:"required"`
Targets []StepDefinition `json:"targets" jsonschema:"required"`
}

func (w *workflowSpec) steps() []stepDefinition {
s := []stepDefinition{}
func (w *WorkflowSpec) Steps() []StepDefinition {
s := []StepDefinition{}
s = append(s, w.Actions...)
s = append(s, w.Consensus...)
s = append(s, w.Targets...)
Expand All @@ -55,7 +55,7 @@ type workflow struct {

triggers []*triggerCapability

spec *workflowSpec
spec *WorkflowSpec
}

func (w *workflow) walkDo(start string, do func(s *step) error) error {
Expand Down Expand Up @@ -106,17 +106,39 @@ func (w *workflow) dependents(start string) ([]*step, error) {
return steps, nil
}

// step wraps a stepDefinition with additional context for dependencies and execution
// step wraps a Vertex with additional context for execution that is mutated by the engine
type step struct {
stepDefinition
dependencies []string
Vertex
capability capabilities.CallbackCapability
config *values.Map
executionStrategy executionStrategy
}

// Vertex is a node of the workflow dependency graph. It embeds the
// StepDefinition of the step it represents.
type Vertex struct {
StepDefinition
// dependencies is not populated anywhere in the visible code; presumably it
// lists the refs of the steps this vertex depends on — TODO confirm.
dependencies []string
}

// DependencyGraph is an intermediate representation of a workflow wherein all the graph
// vertices are represented and validated. It is a static representation of the workflow dependencies.
//
// createWorkflow turns a DependencyGraph into an executable workflow by copying
// its vertices and edges and carrying over ID, Triggers and Spec.
type DependencyGraph struct {
// ID is copied as-is into the id of the workflow built by createWorkflow.
ID string
// Graph stores the vertices under a string key; the hash function used when
// building it returns Vertex.VID().
graph.Graph[string, *Vertex]

// Triggers points at copies of the trigger step definitions taken from the spec.
Triggers []*StepDefinition

// Spec is the parsed workflow spec the graph was built from.
Spec *WorkflowSpec
}

// VID returns the identifier that uniquely identifies the Vertex within a
// graph: the Ref of its embedded StepDefinition.
//
// It plays the role that the graph package calls `hash` in its AddVertex
// method. The name `hash` is deliberately avoided so that it is not mistaken
// for an actual hash function.
func (v *Vertex) VID() string {
	return v.StepDefinition.Ref
}

type triggerCapability struct {
stepDefinition
StepDefinition
trigger capabilities.TriggerCapability
config *values.Map
}
Expand All @@ -126,6 +148,14 @@ const (
)

// Parse builds an executable workflow from its YAML definition.
//
// The YAML is first turned into a DependencyGraph by ParseDepedencyGraph;
// that graph is then converted into a workflow by createWorkflow. An error
// from either stage is returned unchanged.
func Parse(yamlWorkflow string) (*workflow, error) {
	depGraph, parseErr := ParseDepedencyGraph(yamlWorkflow)
	if parseErr != nil {
		return nil, parseErr
	}

	return createWorkflow(depGraph)
}

func ParseDepedencyGraph(yamlWorkflow string) (*DependencyGraph, error) {
spec, err := ParseWorkflowSpecYaml(yamlWorkflow)
if err != nil {
return nil, err
Expand All @@ -138,23 +168,23 @@ func Parse(yamlWorkflow string) (*workflow, error) {
// Note: all triggers are represented by a single step called
// `trigger`. This is because for workflows with multiple triggers
// only one trigger will have started the workflow.
stepHash := func(s *step) string {
return s.Ref
stepHash := func(s *Vertex) string {
return s.VID()
}
g := graph.New(
stepHash,
graph.PreventCycles(),
graph.Directed(),
)
err = g.AddVertex(&step{
stepDefinition: stepDefinition{Ref: keywordTrigger},
err = g.AddVertex(&Vertex{
StepDefinition: StepDefinition{Ref: keywordTrigger},
})
if err != nil {
return nil, err
}

// Next, let's populate the other entries in the graph.
for _, s := range spec.steps() {
for _, s := range spec.Steps() {
// TODO: The workflow format spec doesn't always require a `Ref`
// to be provided (triggers and targets don't have a `Ref` for example).
// To handle this, we default the `Ref` to the type, but ideally we
Expand All @@ -163,7 +193,7 @@ func Parse(yamlWorkflow string) (*workflow, error) {
s.Ref = s.ID
}

innerErr := g.AddVertex(&step{stepDefinition: s})
innerErr := g.AddVertex(&Vertex{StepDefinition: s})
if innerErr != nil {
return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr)
}
Expand Down Expand Up @@ -200,16 +230,72 @@ func Parse(yamlWorkflow string) (*workflow, error) {
}
}

triggerSteps := []*triggerCapability{}
triggerSteps := []*StepDefinition{}
for _, t := range spec.Triggers {
triggerSteps = append(triggerSteps, &triggerCapability{
stepDefinition: t,
})
tt := t
triggerSteps = append(triggerSteps, &tt)
}
wf := &workflow{
spec: &spec,
wf := &DependencyGraph{
Spec: &spec,
Graph: g,
triggers: triggerSteps,
Triggers: triggerSteps,
}
return wf, err
}

// createWorkflow converts a DependencyGraph into an executable workflow.
//
// Every vertex of the dependency graph is wrapped in a step, the type that
// carries the additional metadata owned by the workflow runtime. The ID, spec
// and triggers are carried over, and the edges are recreated between the
// wrapped vertices. An error is returned if the adjacency map cannot be
// obtained, or if a vertex or edge cannot be read from the source graph or
// added to the new one.
func createWorkflow(wf2 *DependencyGraph) (*workflow, error) {
	// Wrap each trigger definition in its runtime counterpart.
	triggers := make([]*triggerCapability, 0, len(wf2.Triggers))
	for _, def := range wf2.Triggers {
		triggers = append(triggers, &triggerCapability{StepDefinition: *def})
	}

	// The executable graph must label its vertices with the same identifier as
	// the DependencyGraph does (Vertex.VID). Because both graphs share that
	// key, the edges of the intermediate representation can be copied over
	// using the keys alone.
	executable := graph.New(
		func(s *step) string { return s.Vertex.VID() },
		graph.PreventCycles(),
		graph.Directed(),
	)

	adjacency, err := wf2.Graph.AdjacencyMap()
	if err != nil {
		return nil, fmt.Errorf("failed to convert intermediate representation to adjacency map: %w", err)
	}

	// First pass: copy all the vertices, so that every edge endpoint exists
	// before any edge is added.
	for ref := range adjacency {
		vertex, vertexErr := wf2.Graph.Vertex(ref)
		if vertexErr != nil {
			return nil, fmt.Errorf("failed to retrieve vertex for %s: %w", ref, vertexErr)
		}
		if addErr := executable.AddVertex(&step{Vertex: *vertex}); addErr != nil {
			return nil, fmt.Errorf("failed to add vertex to executable workflow %s: %w", ref, addErr)
		}
	}

	// Second pass: recreate every edge between the copied vertices.
	for from, targets := range adjacency {
		for to := range targets {
			if edgeErr := executable.AddEdge(from, to); edgeErr != nil {
				return nil, fmt.Errorf("failed to add edge from '%s' to '%s': %w", from, to, edgeErr)
			}
		}
	}

	return &workflow{
		id:       wf2.ID,
		triggers: triggers,
		spec:     wf2.Spec,
		Graph:    executable,
	}, nil
}
18 changes: 9 additions & 9 deletions core/services/workflows/models_yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func GenerateJsonSchema() ([]byte, error) {
return json.MarshalIndent(schema, "", " ")
}

func ParseWorkflowSpecYaml(data string) (workflowSpec, error) {
func ParseWorkflowSpecYaml(data string) (WorkflowSpec, error) {
w := workflowSpecYaml{}
err := yaml.Unmarshal([]byte(data), &w)

Expand All @@ -46,36 +46,36 @@ type workflowSpecYaml struct {
//
// We support multiple ways of defining a workflow spec yaml,
// but internally we want to work with a single representation.
func (w workflowSpecYaml) toWorkflowSpec() workflowSpec {
triggers := make([]stepDefinition, 0, len(w.Triggers))
func (w workflowSpecYaml) toWorkflowSpec() WorkflowSpec {
triggers := make([]StepDefinition, 0, len(w.Triggers))
for _, t := range w.Triggers {
sd := t.toStepDefinition()
sd.CapabilityType = capabilities.CapabilityTypeTrigger
triggers = append(triggers, sd)
}

actions := make([]stepDefinition, 0, len(w.Actions))
actions := make([]StepDefinition, 0, len(w.Actions))
for _, a := range w.Actions {
sd := a.toStepDefinition()
sd.CapabilityType = capabilities.CapabilityTypeAction
actions = append(actions, sd)
}

consensus := make([]stepDefinition, 0, len(w.Consensus))
consensus := make([]StepDefinition, 0, len(w.Consensus))
for _, c := range w.Consensus {
sd := c.toStepDefinition()
sd.CapabilityType = capabilities.CapabilityTypeConsensus
consensus = append(consensus, sd)
}

targets := make([]stepDefinition, 0, len(w.Targets))
targets := make([]StepDefinition, 0, len(w.Targets))
for _, t := range w.Targets {
sd := t.toStepDefinition()
sd.CapabilityType = capabilities.CapabilityTypeTarget
targets = append(targets, sd)
}

return workflowSpec{
return WorkflowSpec{
Triggers: triggers,
Actions: actions,
Consensus: consensus,
Expand Down Expand Up @@ -247,8 +247,8 @@ type stepDefinitionYaml struct {
// toStepDefinition converts a stepDefinitionYaml to a StepDefinition.
//
// `StepDefinition` is the converged representation of a step in a workflow.
func (s stepDefinitionYaml) toStepDefinition() stepDefinition {
return stepDefinition{
func (s stepDefinitionYaml) toStepDefinition() StepDefinition {
return StepDefinition{
Ref: s.Ref,
ID: s.ID.String(),
Inputs: s.Inputs,
Expand Down

0 comments on commit aeb9f4d

Please sign in to comment.