Skip to content

Commit

Permalink
Added additional test case and improvements to close code
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredoconnell committed Sep 27, 2024
1 parent 4e1e255 commit 1cdf151
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
8 changes: 5 additions & 3 deletions internal/step/foreach/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,11 @@ func (r *runningStep) State() step.RunningStepState {
}

func (r *runningStep) Close() error {
r.lock.Lock()
r.closed.Load()
r.lock.Unlock()
closedAlready := r.closed.Swap(true)
if closedAlready {
r.wg.Wait()
return nil
}
r.cancel()
r.wg.Wait()
r.logger.Debugf("Closing inputData channel in foreach step provider")
Expand Down
93 changes: 93 additions & 0 deletions workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3221,6 +3221,99 @@ func TestMultiDependencyDependOnContextDoneDeployment(t *testing.T) {
t.Logf("MultiDependency DependOnClosedStep finished in %d ms", duration.Milliseconds())
}

var multiDependencyDependOnContextDoneForEachDeployment = `
version: v0.2.0
input:
root: WorkflowInput
objects:
WorkflowInput:
id: WorkflowInput
properties: {}
steps:
wait_step:
plugin:
src: "n/a"
deployment_type: "builtin"
step: wait
closure_wait_timeout: 0
input:
wait_time_ms: 1000
not_enabled_step:
kind: foreach
items:
- {}
workflow: subworkflow.yaml
enabled: !expr $.steps.wait_step.outputs.success.message == "Plugin slept for 100 ms."
outputs:
finished:
cancelled_step_output: !expr $.steps.not_enabled_step.outputs
closed: # The workflow needs to keep running after the cancelled step exits.
closed_output: !expr $.steps.not_enabled_step.closed.result
`

func TestMultiDependencyDependOnContextDoneForEachDeployment(t *testing.T) {
// A scenario where you close the context but still expect an output by
// depending on the closed output for a foreach.
logConfig := log.Config{
Level: log.LevelDebug,
Destination: log.DestinationStdout,
}
logger := log.New(
logConfig,
)
cfg := &config.Config{
Log: logConfig,
}
factories := workflowFactory{
config: cfg,
}
deployerRegistry := deployerregistry.New(
deployer.Any(testimpl.NewFactory()),
)

pluginProvider := assert.NoErrorR[step.Provider](t)(
plugin.New(logger, deployerRegistry, map[string]interface{}{
"builtin": map[string]any{
"deployer_name": "test-impl",
"deploy_time": "0",
},
}),
)
stepRegistry, err := stepregistry.New(
pluginProvider,
lang.Must2(foreach.New(logger, factories.createYAMLParser, factories.createWorkflow)),
)
assert.NoError(t, err)

factories.stepRegistry = stepRegistry
executor := lang.Must2(workflow.NewExecutor(
logger,
cfg,
stepRegistry,
builtinfunctions.GetFunctions(),
))
wf := lang.Must2(workflow.NewYAMLConverter(stepRegistry).FromYAML(
[]byte(multiDependencyDependOnContextDoneForEachDeployment)),
)
preparedWorkflow := lang.Must2(executor.Prepare(wf, map[string][]byte{
"subworkflow.yaml": []byte(simpleSubWf),
}))

ctx, timeout := context.WithTimeout(context.Background(), time.Millisecond*10)
startTime := time.Now() // Right before execution to not include pre-processing time.
outputID, outputData, err := preparedWorkflow.Execute(ctx, map[string]any{})
duration := time.Since(startTime)
assert.NoError(t, err)
timeout()
assert.Equals(t, outputID, "closed")
assert.Equals(t, outputData.(map[any]any), map[any]any{
"closed_output": map[any]any{
"close_requested": true,
},
})
t.Logf("MultiDependency DependOnClosedStep finished in %d ms", duration.Milliseconds())
}

var multiDependencyForEachParent = `
version: v0.2.0
input:
Expand Down

0 comments on commit 1cdf151

Please sign in to comment.