From 356075e58ac0def1a0f4303f93f73253bd5469f8 Mon Sep 17 00:00:00 2001 From: krehermann Date: Thu, 16 May 2024 14:10:51 -0600 Subject: [PATCH] prefactoring --- core/services/workflows/dependency_graph.go | 247 ++++++++++++++++++ .../workflows/dependency_graph_test.go | 244 +++++++++++++++++ core/services/workflows/engine.go | 21 +- core/services/workflows/engine_test.go | 8 +- core/services/workflows/models.go | 152 +---------- core/services/workflows/models_test.go | 14 +- core/services/workflows/models_yaml_test.go | 10 - core/services/workflows/state.go | 96 +------ 8 files changed, 522 insertions(+), 270 deletions(-) create mode 100644 core/services/workflows/dependency_graph.go create mode 100644 core/services/workflows/dependency_graph_test.go diff --git a/core/services/workflows/dependency_graph.go b/core/services/workflows/dependency_graph.go new file mode 100644 index 00000000000..46843a761fe --- /dev/null +++ b/core/services/workflows/dependency_graph.go @@ -0,0 +1,247 @@ +package workflows + +import ( + "errors" + "fmt" + "regexp" + "strings" + + "github.com/dominikbraun/graph" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +const ( + KeywordTrigger = "trigger" +) + +// StepDefinition is the parsed representation of a step in a workflow. +// +// Within the workflow spec, they are called "Capability Properties". +type StepDefinition struct { + ID string `json:"id" jsonschema:"required"` + Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"` + Inputs map[string]any `json:"inputs,omitempty"` + Config map[string]any `json:"config" jsonschema:"required"` + + CapabilityType capabilities.CapabilityType `json:"-"` +} + +// WorkflowSpec is the parsed representation of a workflow. +type WorkflowSpec struct { + Triggers []StepDefinition `json:"triggers" jsonschema:"required"` + Actions []StepDefinition `json:"actions,omitempty"` + Consensus []StepDefinition `json:"consensus" jsonschema:"required"` + Targets []StepDefinition `json:"targets" jsonschema:"required"` +} + +func (w *WorkflowSpec) Steps() []StepDefinition { + s := []StepDefinition{} + s = append(s, w.Actions...) + s = append(s, w.Consensus...) + s = append(s, w.Targets...) + return s +} + +type Vertex struct { + StepDefinition + dependencies []string +} + +// DependencyGraph is an intermediate representation of a workflow wherein all the graph +// vertices are represented and validated. It is a static representation of the workflow dependencies. +type DependencyGraph struct { + ID string + graph.Graph[string, *Vertex] + + Triggers []*StepDefinition + + Spec *WorkflowSpec +} + +// VID is an identifier for a Vertex that can be used to uniquely identify it in a graph. +// it represents the notion `hash` in the graph package AddVertex method. +// we refrain from naming it `hash` to avoid confusion with the hash function. +func (v *Vertex) VID() string { + return v.Ref +} + +func ParseDependencyGraph(yamlWorkflow string) (*DependencyGraph, error) { + spec, err := ParseWorkflowSpecYaml(yamlWorkflow) + if err != nil { + return nil, err + } + + // Construct and validate the graph. We instantiate an + // empty graph with just one starting entry: `trigger`. + // This provides the starting point for our graph and + // points to all dependent steps. + // Note: all triggers are represented by a single step called + // `trigger`. This is because for workflows with multiple triggers + // only one trigger will have started the workflow. + stepHash := func(s *Vertex) string { + return s.VID() + } + g := graph.New( + stepHash, + graph.PreventCycles(), + graph.Directed(), + ) + err = g.AddVertex(&Vertex{ + StepDefinition: StepDefinition{Ref: KeywordTrigger}, + }) + if err != nil { + return nil, err + } + + // Next, let's populate the other entries in the graph. + for _, s := range spec.Steps() { + // TODO: The workflow format spec doesn't always require a `Ref` + // to be provided (triggers and targets don't have a `Ref` for example). + // To handle this, we default the `Ref` to the type, but ideally we + // should find a better long-term way to handle this. + if s.Ref == "" { + s.Ref = s.ID + } + + innerErr := g.AddVertex(&Vertex{StepDefinition: s}) + if innerErr != nil { + return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr) + } + } + + stepRefs, err := g.AdjacencyMap() + if err != nil { + return nil, err + } + + // Next, let's iterate over the steps and populate + // any edges. + for stepRef := range stepRefs { + step, innerErr := g.Vertex(stepRef) + if innerErr != nil { + return nil, innerErr + } + + refs, innerErr := findRefs(step.Inputs) + if innerErr != nil { + return nil, innerErr + } + step.dependencies = refs + + if stepRef != KeywordTrigger && len(refs) == 0 { + return nil, errors.New("all non-trigger steps must have a dependent ref") + } + + for _, r := range refs { + innerErr = g.AddEdge(r, step.Ref) + if innerErr != nil { + return nil, innerErr + } + } + } + + triggerSteps := []*StepDefinition{} + for _, t := range spec.Triggers { + tt := t + triggerSteps = append(triggerSteps, &tt) + } + wf := &DependencyGraph{ + Spec: &spec, + Graph: g, + Triggers: triggerSteps, + } + return wf, err +} + +var ( + interpolationTokenRe = regexp.MustCompile(`^\$\((\S+)\)$`) +) + +// findRefs takes an `inputs` map and returns a list of all the step references +// contained within it. +func findRefs(inputs map[string]any) ([]string, error) { + refs := []string{} + _, err := DeepMap( + inputs, + // This function is called for each string in the map + // for each string, we iterate over each match of the interpolation token + // - if there are no matches, return no reference + // - if there is one match, return the reference + // - if there are multiple matches (in the case of a multi-part state reference), return just the step ref + func(el string) (any, error) { + matches := interpolationTokenRe.FindStringSubmatch(el) + if len(matches) < 2 { + return el, nil + } + + m := matches[1] + parts := strings.Split(m, ".") + if len(parts) < 1 { + return nil, fmt.Errorf("invalid ref %s", m) + } + + refs = append(refs, parts[0]) + return el, nil + }, + ) + return refs, err +} + +// DeepMap recursively applies a transformation function +// over each string within: +// +// - a map[string]any +// - a []any +// - a string +func DeepMap(input any, transform func(el string) (any, error)) (any, error) { + // in the case of a string, simply apply the transformation + // in the case of a map, recurse and apply the transformation to each value + // in the case of a list, recurse and apply the transformation to each element + switch tv := input.(type) { + case string: + nv, err := transform(tv) + if err != nil { + return nil, err + } + + return nv, nil + case mapping: + // coerce mapping to map[string]any + mp := map[string]any(tv) + + nm := map[string]any{} + for k, v := range mp { + nv, err := DeepMap(v, transform) + if err != nil { + return nil, err + } + + nm[k] = nv + } + return nm, nil + case map[string]any: + nm := map[string]any{} + for k, v := range tv { + nv, err := DeepMap(v, transform) + if err != nil { + return nil, err + } + + nm[k] = nv + } + return nm, nil + case []any: + a := []any{} + for _, el := range tv { + ne, err := DeepMap(el, transform) + if err != nil { + return nil, err + } + + a = append(a, ne) + } + return a, nil + } + + return nil, fmt.Errorf("cannot traverse item %+v of type %T", input, input) +} diff --git a/core/services/workflows/dependency_graph_test.go b/core/services/workflows/dependency_graph_test.go new file mode 100644 index 00000000000..e6a0f9a810c --- /dev/null +++ b/core/services/workflows/dependency_graph_test.go @@ -0,0 +1,244 @@ +package workflows_test + +import ( + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseDependencyGraph(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + yaml string + graph map[string]map[string]struct{} + errMsg string + }{ + { + name: "basic example", + yaml: ` +triggers: + - id: "a-trigger" + +actions: + - id: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + +consensus: + - id: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + an-action_output: $(an-action.outputs) + +targets: + - id: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + graph: map[string]map[string]struct{}{ + workflows.KeywordTrigger: { + "an-action": struct{}{}, + "a-consensus": struct{}{}, + }, + "an-action": { + "a-consensus": struct{}{}, + }, + "a-consensus": { + "a-target": struct{}{}, + }, + "a-target": {}, + }, + }, + { + name: "circular relationship", + yaml: ` +triggers: + - id: "a-trigger" + +actions: + - id: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + output: $(a-second-action.outputs) + - id: "a-second-action" + ref: "a-second-action" + inputs: + output: $(an-action.outputs) + +consensus: + - id: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + an-action_output: $(an-action.outputs) + +targets: + - id: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "edge would create a cycle", + }, + { + name: "indirect circular relationship", + yaml: ` +triggers: + - id: "a-trigger" + +actions: + - id: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + action_output: $(a-third-action.outputs) + - id: "a-second-action" + ref: "a-second-action" + inputs: + output: $(an-action.outputs) + - id: "a-third-action" + ref: "a-third-action" + inputs: + output: $(a-second-action.outputs) + +consensus: + - id: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + an-action_output: $(an-action.outputs) + +targets: + - id: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "edge would create a cycle", + }, + { + name: "relationship doesn't exist", + yaml: ` +triggers: + - id: "a-trigger" + +actions: + - id: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + action_output: $(missing-action.outputs) + +consensus: + - id: "a-consensus" + ref: "a-consensus" + inputs: + an-action_output: $(an-action.outputs) + +targets: + - id: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "source vertex missing-action: vertex not found", + }, + { + name: "two trigger nodes", + yaml: ` +triggers: + - id: "a-trigger" + - id: "a-second-trigger" + +actions: + - id: "an-action" + ref: "an-action" + inputs: + trigger_output: $(trigger.outputs) + +consensus: + - id: "a-consensus" + ref: "a-consensus" + inputs: + an-action_output: $(an-action.outputs) + +targets: + - id: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + graph: map[string]map[string]struct{}{ + workflows.KeywordTrigger: { + "an-action": struct{}{}, + }, + "an-action": { + "a-consensus": struct{}{}, + }, + "a-consensus": { + "a-target": struct{}{}, + }, + "a-target": {}, + }, + }, + { + name: "non-trigger step with no dependent refs", + yaml: ` +triggers: + - id: "a-trigger" + - id: "a-second-trigger" +actions: + - id: "an-action" + ref: "an-action" + inputs: + hello: "world" +consensus: + - id: "a-consensus" + ref: "a-consensus" + inputs: + trigger_output: $(trigger.outputs) + action_output: $(an-action.outputs) +targets: + - id: "a-target" + ref: "a-target" + inputs: + consensus_output: $(a-consensus.outputs) +`, + errMsg: "all non-trigger steps must have a dependent ref", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(st *testing.T) { + //wf, err := workflows.Parse(tc.yaml) + wf, err := workflows.ParseDependencyGraph(tc.yaml) + if tc.errMsg != "" { + assert.ErrorContains(st, err, tc.errMsg) + } else { + require.NoError(st, err) + + adjacencies, err := wf.AdjacencyMap() + require.NoError(t, err) + + got := map[string]map[string]struct{}{} + for k, v := range adjacencies { + if _, ok := got[k]; !ok { + got[k] = map[string]struct{}{} + } + for adj := range v { + got[k][adj] = struct{}{} + } + } + + assert.Equal(st, tc.graph, got, adjacencies) + } + }) + } +} diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 2b497057ada..ea0cd3d1615 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -29,6 +29,11 @@ type donInfo struct { PeerID func() *p2ptypes.PeerID } +type stepRequest struct { + stepRef string + state store.WorkflowExecution +} + // Engine handles the lifecycle of a single workflow and its executions. type Engine struct { services.StateMachine @@ -103,11 +108,11 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { // - fetching the capability // - register the capability to this workflow // - initializing the step's executionStrategy - capabilityRegistrationErr := e.workflow.walkDo(keywordTrigger, func(s *step) error { + capabilityRegistrationErr := e.workflow.walkDo(KeywordTrigger, func(s *step) error { // The graph contains a dummy step for triggers, but // we handle triggers separately since there might be more than one // trigger registered to a workflow. - if s.Ref == keywordTrigger { + if s.Ref == KeywordTrigger { return nil } @@ -441,13 +446,13 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event v e.logger.Debugw("executing on a trigger event", "event", event, "executionID", executionID) ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ - keywordTrigger: { + KeywordTrigger: { Outputs: &store.StepOutput{ Value: event, }, Status: store.StatusCompleted, ExecutionID: executionID, - Ref: keywordTrigger, + Ref: KeywordTrigger, }, }, WorkflowID: e.workflow.id, @@ -463,7 +468,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event v // Find the tasks we need to fire when a trigger has fired and enqueue them. // This consists of a) nodes without a dependency and b) nodes which depend // on a trigger - triggerDependents, err := e.workflow.dependents(keywordTrigger) + triggerDependents, err := e.workflow.dependents(KeywordTrigger) if err != nil { return err } @@ -492,7 +497,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow // we've completed the workflow. if len(stepDependents) == 0 { workflowCompleted := true - err := e.workflow.walkDo(keywordTrigger, func(s *step) error { + err := e.workflow.walkDo(KeywordTrigger, func(s *step) error { step, ok := state.Steps[s.Ref] // The step is missing from the state, // which means it hasn't been processed yet. @@ -701,8 +706,8 @@ func (e *Engine) Close() error { close(e.stopCh) e.wg.Wait() - err := e.workflow.walkDo(keywordTrigger, func(s *step) error { - if s.Ref == keywordTrigger { + err := e.workflow.walkDo(KeywordTrigger, func(s *step) error { + if s.Ref == KeywordTrigger { return nil } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 1ad7a3c2ae2..8cbce419b60 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -547,13 +547,13 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) { dbstore := store.NewDBStore(pgtest.NewSqlxDB(t), clockwork.NewFakeClock()) ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ - keywordTrigger: { + KeywordTrigger: { Outputs: &store.StepOutput{ Value: resp, }, Status: store.StatusCompleted, ExecutionID: "", - Ref: keywordTrigger, + Ref: KeywordTrigger, }, }, WorkflowID: "", @@ -602,13 +602,13 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) { dbstore := store.NewDBStore(pgtest.NewSqlxDB(t), clock) ec := &store.WorkflowExecution{ Steps: map[string]*store.WorkflowExecutionStep{ - keywordTrigger: { + KeywordTrigger: { Outputs: &store.StepOutput{ Value: resp, }, Status: store.StatusCompleted, ExecutionID: "", - Ref: keywordTrigger, + Ref: KeywordTrigger, }, }, WorkflowID: "", diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go index dadadc8ba0e..b23b6a7b09d 100644 --- a/core/services/workflows/models.go +++ b/core/services/workflows/models.go @@ -1,49 +1,14 @@ package workflows import ( - "errors" "fmt" "github.com/dominikbraun/graph" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) -type stepRequest struct { - stepRef string - state store.WorkflowExecution -} - -// StepDefinition is the parsed representation of a step in a workflow. -// -// Within the workflow spec, they are called "Capability Properties". -type StepDefinition struct { - ID string `json:"id" jsonschema:"required"` - Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"` - Inputs map[string]any `json:"inputs,omitempty"` - Config map[string]any `json:"config" jsonschema:"required"` - - CapabilityType capabilities.CapabilityType `json:"-"` -} - -// WorkflowSpec is the parsed representation of a workflow. -type WorkflowSpec struct { - Triggers []StepDefinition `json:"triggers" jsonschema:"required"` - Actions []StepDefinition `json:"actions,omitempty"` - Consensus []StepDefinition `json:"consensus" jsonschema:"required"` - Targets []StepDefinition `json:"targets" jsonschema:"required"` -} - -func (w *WorkflowSpec) Steps() []StepDefinition { - s := []StepDefinition{} - s = append(s, w.Actions...) - s = append(s, w.Consensus...) - s = append(s, w.Targets...) - return s -} - // workflow is a directed graph of nodes, where each node is a step. // // triggers are special steps that are stored separately, they're @@ -114,135 +79,20 @@ type step struct { executionStrategy executionStrategy } -type Vertex struct { - StepDefinition - dependencies []string -} - -// DependencyGraph is an intermediate representation of a workflow wherein all the graph -// vertices are represented and validated. It is a static representation of the workflow dependencies. -type DependencyGraph struct { - ID string - graph.Graph[string, *Vertex] - - Triggers []*StepDefinition - - Spec *WorkflowSpec -} - -// VID is an identifier for a Vertex that can be used to uniquely identify it in a graph. -// it represents the notion `hash` in the graph package AddVertex method. -// we refrain from naming it `hash` to avoid confusion with the hash function. -func (v *Vertex) VID() string { - return v.Ref -} - type triggerCapability struct { StepDefinition trigger capabilities.TriggerCapability config *values.Map } -const ( - keywordTrigger = "trigger" -) - func Parse(yamlWorkflow string) (*workflow, error) { - wf2, err := ParseDepedencyGraph(yamlWorkflow) + wf2, err := ParseDependencyGraph(yamlWorkflow) if err != nil { return nil, err } return createWorkflow(wf2) } -func ParseDepedencyGraph(yamlWorkflow string) (*DependencyGraph, error) { - spec, err := ParseWorkflowSpecYaml(yamlWorkflow) - if err != nil { - return nil, err - } - - // Construct and validate the graph. We instantiate an - // empty graph with just one starting entry: `trigger`. - // This provides the starting point for our graph and - // points to all dependent steps. - // Note: all triggers are represented by a single step called - // `trigger`. This is because for workflows with multiple triggers - // only one trigger will have started the workflow. - stepHash := func(s *Vertex) string { - return s.VID() - } - g := graph.New( - stepHash, - graph.PreventCycles(), - graph.Directed(), - ) - err = g.AddVertex(&Vertex{ - StepDefinition: StepDefinition{Ref: keywordTrigger}, - }) - if err != nil { - return nil, err - } - - // Next, let's populate the other entries in the graph. - for _, s := range spec.Steps() { - // TODO: The workflow format spec doesn't always require a `Ref` - // to be provided (triggers and targets don't have a `Ref` for example). - // To handle this, we default the `Ref` to the type, but ideally we - // should find a better long-term way to handle this. - if s.Ref == "" { - s.Ref = s.ID - } - - innerErr := g.AddVertex(&Vertex{StepDefinition: s}) - if innerErr != nil { - return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr) - } - } - - stepRefs, err := g.AdjacencyMap() - if err != nil { - return nil, err - } - - // Next, let's iterate over the steps and populate - // any edges. - for stepRef := range stepRefs { - step, innerErr := g.Vertex(stepRef) - if innerErr != nil { - return nil, innerErr - } - - refs, innerErr := findRefs(step.Inputs) - if innerErr != nil { - return nil, innerErr - } - step.dependencies = refs - - if stepRef != keywordTrigger && len(refs) == 0 { - return nil, errors.New("all non-trigger steps must have a dependent ref") - } - - for _, r := range refs { - innerErr = g.AddEdge(r, step.Ref) - if innerErr != nil { - return nil, innerErr - } - } - } - - triggerSteps := []*StepDefinition{} - for _, t := range spec.Triggers { - tt := t - triggerSteps = append(triggerSteps, &tt) - } - wf := &DependencyGraph{ - Spec: &spec, - Graph: g, - Triggers: triggerSteps, - } - return wf, err -} - // createWorkflow converts a StaticWorkflow to an executable workflow // by adding metadata to the vertices that is owned by the workflow runtime. func createWorkflow(wf2 *DependencyGraph) (*workflow, error) { diff --git a/core/services/workflows/models_test.go b/core/services/workflows/models_test.go index 0964b13d277..7f838c5b224 100644 --- a/core/services/workflows/models_test.go +++ b/core/services/workflows/models_test.go @@ -41,7 +41,7 @@ targets: consensus_output: $(a-consensus.outputs) `, graph: map[string]map[string]struct{}{ - keywordTrigger: { + KeywordTrigger: { "an-action": struct{}{}, "a-consensus": struct{}{}, }, @@ -175,7 +175,7 @@ targets: consensus_output: $(a-consensus.outputs) `, graph: map[string]map[string]struct{}{ - keywordTrigger: { + KeywordTrigger: { "an-action": struct{}{}, }, "an-action": { @@ -240,3 +240,13 @@ targets: }) } } + +func TestParsesIntsCorrectly(t *testing.T) { + wf, err := Parse(hardcodedWorkflow) + require.NoError(t, err) + + n, err := wf.Vertex("evm_median") + require.NoError(t, err) + + assert.Equal(t, int64(3600), n.Config["aggregation_config"].(map[string]any)["0x1111111111111111111100000000000000000000000000000000000000000000"].(map[string]any)["heartbeat"]) +} diff --git a/core/services/workflows/models_yaml_test.go b/core/services/workflows/models_yaml_test.go index 5fa326dda5d..1afde42e801 100644 --- a/core/services/workflows/models_yaml_test.go +++ b/core/services/workflows/models_yaml_test.go @@ -232,16 +232,6 @@ func TestJsonSchema(t *testing.T) { }) } -func TestParsesIntsCorrectly(t *testing.T) { - wf, err := Parse(hardcodedWorkflow) - require.NoError(t, err) - - n, err := wf.Vertex("evm_median") - require.NoError(t, err) - - assert.Equal(t, int64(3600), n.Config["aggregation_config"].(map[string]any)["0x1111111111111111111100000000000000000000000000000000000000000000"].(map[string]any)["heartbeat"]) -} - func TestMappingCustomType(t *testing.T) { m := mapping(map[string]any{}) data := ` diff --git a/core/services/workflows/state.go b/core/services/workflows/state.go index 4026a59be0b..0dd0244cada 100644 --- a/core/services/workflows/state.go +++ b/core/services/workflows/state.go @@ -2,7 +2,6 @@ package workflows import ( "fmt" - "regexp" "strconv" "strings" @@ -118,16 +117,12 @@ func interpolateKey(key string, state store.WorkflowExecution) (any, error) { return val, nil } -var ( - interpolationTokenRe = regexp.MustCompile(`^\$\((\S+)\)$`) -) - // findAndInterpolateAllKeys takes an `input` any value, and recursively // identifies any values that should be replaced from `state`. // // A value `v` should be replaced if it is wrapped as follows: `$(v)`. func findAndInterpolateAllKeys(input any, state store.WorkflowExecution) (any, error) { - return deepMap( + return DeepMap( input, func(el string) (any, error) { matches := interpolationTokenRe.FindStringSubmatch(el) @@ -140,92 +135,3 @@ func findAndInterpolateAllKeys(input any, state store.WorkflowExecution) (any, e }, ) } - -// findRefs takes an `inputs` map and returns a list of all the step references -// contained within it. -func findRefs(inputs map[string]any) ([]string, error) { - refs := []string{} - _, err := deepMap( - inputs, - // This function is called for each string in the map - // for each string, we iterate over each match of the interpolation token - // - if there are no matches, return no reference - // - if there is one match, return the reference - // - if there are multiple matches (in the case of a multi-part state reference), return just the step ref - func(el string) (any, error) { - matches := interpolationTokenRe.FindStringSubmatch(el) - if len(matches) < 2 { - return el, nil - } - - m := matches[1] - parts := strings.Split(m, ".") - if len(parts) < 1 { - return nil, fmt.Errorf("invalid ref %s", m) - } - - refs = append(refs, parts[0]) - return el, nil - }, - ) - return refs, err -} - -// deepMap recursively applies a transformation function -// over each string within: -// -// - a map[string]any -// - a []any -// - a string -func deepMap(input any, transform func(el string) (any, error)) (any, error) { - // in the case of a string, simply apply the transformation - // in the case of a map, recurse and apply the transformation to each value - // in the case of a list, recurse and apply the transformation to each element - switch tv := input.(type) { - case string: - nv, err := transform(tv) - if err != nil { - return nil, err - } - - return nv, nil - case mapping: - // coerce mapping to map[string]any - mp := map[string]any(tv) - - nm := map[string]any{} - for k, v := range mp { - nv, err := deepMap(v, transform) - if err != nil { - return nil, err - } - - nm[k] = nv - } - return nm, nil - case map[string]any: - nm := map[string]any{} - for k, v := range tv { - nv, err := deepMap(v, transform) - if err != nil { - return nil, err - } - - nm[k] = nv - } - return nm, nil - case []any: - a := []any{} - for _, el := range tv { - ne, err := deepMap(el, transform) - if err != nil { - return nil, err - } - - a = append(a, ne) - } - return a, nil - } - - return nil, fmt.Errorf("cannot traverse item %+v of type %T", input, input) -}