Skip to content

Commit

Permalink
Add Map state to embedded wfm.
Browse files Browse the repository at this point in the history
We already have parallel, so map is pretty similar!

Also, I added validation to parallel's branches; I'm not actually sure if parallel allows zero
branches, but it is conceivable I guess (the result of a parallel with no branches would be the
empty array).
  • Loading branch information
taylor-sutton committed Mar 7, 2021
1 parent 26b0567 commit ec08ada
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
35 changes: 34 additions & 1 deletion embedded/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,36 @@ func validateWorkflowDefinitionStates(wfd models.WorkflowDefinition, resources m
if state.Seconds <= 0 {
return errors.Errorf("invalid seconds parameter in wait %s.%s", wfd.Name, stateName)
}
case models.SLStateTypeSucceed, models.SLStateTypeFail, models.SLStateTypeParallel:
case models.SLStateTypeMap:
checkNextState = true
if state.Iterator == nil {
return errors.Errorf("required parameter Iterator for Map state not given in state %s.%s", wfd.Name, stateName)
}
if state.MaxConcurrency < 0 {
return errors.Errorf("invalid MaxConcurrency in Map state %s.%s, must, cannot be negative, got %d", wfd.Name, stateName, state.MaxConcurrency)
}
innerWFD := models.WorkflowDefinition{
Name: fmt.Sprintf("%s__Iterator", stateName),
StateMachine: state.Iterator,
}
if err := validateWorkflowDefinition(innerWFD, resources); err != nil {
return errors.Errorf("inside the Iterator for Map state %s.%s: %w",
wfd.Name, stateName, err,
)
}
case models.SLStateTypeParallel:
for i, branch := range state.Branches {
innerWFD := models.WorkflowDefinition{
Name: fmt.Sprintf("%s__Branch[%d]", stateName, i),
StateMachine: branch,
}
if err := validateWorkflowDefinition(innerWFD, resources); err != nil {
return errors.Errorf("inside the Branch[%d] for Parallel state %s.%s: %w",
i, wfd.Name, stateName,
)
}
}
case models.SLStateTypeSucceed, models.SLStateTypeFail:
// no op
default:
return errors.Errorf("invalid state type '%s' in %s.%s", state.Type, wfd.Name, stateName)
Expand Down Expand Up @@ -326,6 +355,10 @@ func (e *Embedded) setStateMachineResources(i *models.StartWorkflowRequest, stat
state.Branches[idx] = branch
}
stateMachine.States[stateName] = state

case models.SLStateTypeMap:
e.setStateMachineResources(i, state.Iterator)
stateMachine.States[stateName] = state
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions embedded/embedded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,48 @@ var validateWorkflowDefinitionStatesTests = []validateWorkflowDefinitionStatesTe
require.NoError(t, err)
},
},
{
description: "validate map state (invalid inner)",
input: models.WorkflowDefinition{
StateMachine: &models.SLStateMachine{
States: map[string]models.SLState{
"map": models.SLState{
Type: models.SLStateTypeMap,
Iterator: &models.SLStateMachine{},
End: true,
},
},
},
},
assertions: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
description: "validate map state (valid inner)",
input: models.WorkflowDefinition{
StateMachine: &models.SLStateMachine{
States: map[string]models.SLState{
"map": models.SLState{
Type: models.SLStateTypeMap,
Iterator: &models.SLStateMachine{
States: map[string]models.SLState{
"pass": models.SLState{
End: true,
Type: models.SLStateTypePass,
Result: "result",
},
},
},
End: true,
},
},
},
},
assertions: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
description: "invalid state type",
input: models.WorkflowDefinition{
Expand Down

0 comments on commit ec08ada

Please sign in to comment.