diff --git a/internal/step/foreach/provider.go b/internal/step/foreach/provider.go index f2ded8b4..3ddf9bf9 100644 --- a/internal/step/foreach/provider.go +++ b/internal/step/foreach/provider.go @@ -3,6 +3,7 @@ package foreach import ( "context" "fmt" + "go.arcalot.io/dgraph" "reflect" "sync" @@ -46,9 +47,9 @@ var executeLifecycleStage = step.LifecycleStage{ "items": {}, "wait_for": {}, }, - NextStages: []string{ - string(StageIDOutputs), - string(StageIDFailed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDOutputs): dgraph.AndDependency, + string(StageIDFailed): dgraph.CompletionAndDependency, }, Fatal: false, } @@ -58,7 +59,7 @@ var outputLifecycleStage = step.LifecycleStage{ RunningName: "output", FinishedName: "output", InputFields: map[string]struct{}{}, - NextStages: []string{}, + NextStages: map[string]dgraph.DependencyType{}, Fatal: false, } var errorLifecycleStage = step.LifecycleStage{ @@ -67,7 +68,7 @@ var errorLifecycleStage = step.LifecycleStage{ RunningName: "processing error", FinishedName: "error", InputFields: map[string]struct{}{}, - NextStages: []string{}, + NextStages: map[string]dgraph.DependencyType{}, Fatal: true, } @@ -359,6 +360,7 @@ type runningStep struct { inputAvailable bool inputData chan []any ctx context.Context + closed bool wg sync.WaitGroup cancel context.CancelFunc stageChangeHandler step.StageChangeHandler @@ -368,6 +370,11 @@ type runningStep struct { func (r *runningStep) ProvideStageInput(stage string, input map[string]any) error { r.lock.Lock() + if r.closed { + r.logger.Debugf("exiting foreach ProvideStageInput due to step being closed") + r.lock.Unlock() + return nil + } switch stage { case string(StageIDExecute): items := input["items"] @@ -390,8 +397,8 @@ func (r *runningStep) ProvideStageInput(stage string, input map[string]any) erro r.currentState = step.RunningStepStateRunning } r.inputAvailable = true + r.inputData <- input // Send before unlock to ensure that it never gets closed before sending. r.lock.Unlock() - r.inputData <- input return nil case string(StageIDOutputs): r.lock.Unlock() @@ -412,14 +419,19 @@ func (r *runningStep) CurrentStage() string { } func (r *runningStep) State() step.RunningStepState { - r.lock.Lock() + r.lock.Lock() // TODO: Determine why this gets stuck. defer r.lock.Unlock() return r.currentState } func (r *runningStep) Close() error { + r.lock.Lock() + r.closed = true + r.lock.Unlock() r.cancel() r.wg.Wait() + r.logger.Debugf("Closing inputData channel in foreach step provider") + close(r.inputData) return nil } @@ -431,7 +443,7 @@ func (r *runningStep) ForceClose() error { func (r *runningStep) run() { r.wg.Add(1) defer func() { - close(r.inputData) + r.logger.Debugf("foreach run function done") r.wg.Done() }() waitingForInput := false @@ -471,7 +483,10 @@ func (r *runningStep) run() { input := input go func() { defer func() { - <-sem + select { + case <-sem: + case <-r.ctx.Done(): // Must not deadlock if closed early. + } wg.Done() }() r.logger.Debugf("Queuing item %d...", i) @@ -540,6 +555,7 @@ func (r *runningStep) run() { r.lock.Unlock() r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg) case <-r.ctx.Done(): + r.logger.Debugf("context done") return } diff --git a/internal/step/lifecycle.go b/internal/step/lifecycle.go index 83495250..700fe2fd 100644 --- a/internal/step/lifecycle.go +++ b/internal/step/lifecycle.go @@ -30,10 +30,10 @@ func (l Lifecycle[StageType]) DAG() (dgraph.DirectedGraph[StageType], error) { } } for _, stage := range l.Stages { - node := lang.Must2(dag.GetNodeByID(stage.Identifier())) - for _, nextStage := range stage.NextStageIDs() { - if err := node.Connect(nextStage); err != nil { - return nil, fmt.Errorf("failed to connect lifecycle stage %s to %s (%w)", node.ID(), nextStage, err) + for nextStage, dependencyType := range stage.NextStageIDs() { + nextStageNode := lang.Must2(dag.GetNodeByID(nextStage)) + if err := nextStageNode.ConnectDependency(stage.Identifier(), dependencyType); err != nil { + return nil, fmt.Errorf("failed to connect lifecycle stage %s to %s (%w)", stage.Identifier(), nextStage, err) } } } @@ -52,7 +52,7 @@ type lifecycleStage interface { // Identifier returns the ID of the stage. Identifier() string // NextStageIDs returns the next stage identifiers. - NextStageIDs() []string + NextStageIDs() map[string]dgraph.DependencyType } // LifecycleStage is the description of a single stage within a step lifecycle. @@ -72,7 +72,7 @@ type LifecycleStage struct { // will pause if there is no input available. // It will automatically create a DAG node between the current and the described next stages to ensure // that it is running in order. - NextStages []string + NextStages map[string]dgraph.DependencyType // Fatal indicates that this stage should be treated as fatal unless handled by the workflow. Fatal bool } @@ -83,7 +83,7 @@ func (l LifecycleStage) Identifier() string { } // NextStageIDs is a helper function that returns the next possible stages. -func (l LifecycleStage) NextStageIDs() []string { +func (l LifecycleStage) NextStageIDs() map[string]dgraph.DependencyType { return l.NextStages } diff --git a/internal/step/plugin/provider.go b/internal/step/plugin/provider.go index cfddb7ac..92d84ec3 100644 --- a/internal/step/plugin/provider.go +++ b/internal/step/plugin/provider.go @@ -3,6 +3,7 @@ package plugin import ( "context" "fmt" + "go.arcalot.io/dgraph" "go.flow.arcalot.io/pluginsdk/plugin" "reflect" "strings" @@ -168,8 +169,9 @@ var deployingLifecycleStage = step.LifecycleStage{ RunningName: "deploying", FinishedName: "deployed", InputFields: map[string]struct{}{string(StageIDDeploy): {}}, - NextStages: []string{ - string(StageIDStarting), string(StageIDDeployFailed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDStarting): dgraph.AndDependency, + string(StageIDDeployFailed): dgraph.CompletionAndDependency, }, Fatal: false, } @@ -189,8 +191,10 @@ var enablingLifecycleStage = step.LifecycleStage{ InputFields: map[string]struct{}{ "enabled": {}, }, - NextStages: []string{ - string(StageIDStarting), string(StageIDDisabled), string(StageIDCrashed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDStarting): dgraph.AndDependency, + string(StageIDDisabled): dgraph.AndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, }, } @@ -203,8 +207,9 @@ var startingLifecycleStage = step.LifecycleStage{ "input": {}, "wait_for": {}, }, - NextStages: []string{ - string(StageIDRunning), string(StageIDCrashed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDRunning): dgraph.AndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, }, } @@ -214,8 +219,9 @@ var runningLifecycleStage = step.LifecycleStage{ RunningName: "running", FinishedName: "completed", InputFields: map[string]struct{}{}, - NextStages: []string{ - string(StageIDOutput), string(StageIDCrashed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDOutput): dgraph.AndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, }, } @@ -227,8 +233,10 @@ var cancelledLifecycleStage = step.LifecycleStage{ InputFields: map[string]struct{}{ "stop_if": {}, }, - NextStages: []string{ - string(StageIDOutput), string(StageIDCrashed), string(StageIDDeployFailed), + NextStages: map[string]dgraph.DependencyType{ + string(StageIDOutput): dgraph.AndDependency, + string(StageIDCrashed): dgraph.CompletionAndDependency, + string(StageIDDeployFailed): dgraph.CompletionAndDependency, }, } @@ -902,6 +910,7 @@ func (r *runningStep) closeComponents(closeATP bool) error { r.cancel() r.lock.Lock() if r.closed { + r.lock.Unlock() return nil // Already closed } var atpErr error @@ -941,7 +950,7 @@ func (r *runningStep) run() { r.logger.Warningf("failed to remove deployed container for step %s/%s", r.runID, r.pluginStepID) } r.lock.Unlock() - r.transitionToCancelled() + r.cancelledEarly() return default: r.container = container @@ -960,9 +969,6 @@ func (r *runningStep) run() { return } - // It's enabled, so the disabled stage will not occur. - r.stageChangeHandler.OnStepStageFailure(r, string(StageIDDisabled), &r.wg, err) - if err := r.startStage(container); err != nil { r.startFailed(err) return @@ -1050,12 +1056,20 @@ func (r *runningStep) enableStage() (bool, error) { &r.wg, ) + var enabled bool select { - case enabled := <-r.enabledInput: - return enabled, nil + case enabled = <-r.enabledInput: case <-r.ctx.Done(): return false, fmt.Errorf("step closed while determining enablement status") } + + if enabled { + r.lock.Lock() + // It's enabled, so the disabled stage will not occur. + r.stageChangeHandler.OnStepStageFailure(r, string(StageIDDisabled), &r.wg, fmt.Errorf("step enabled; cannot be disabled anymore")) + r.lock.Unlock() + } + return enabled, nil } func (r *runningStep) startStage(container deployer.Plugin) error { @@ -1150,9 +1164,6 @@ func (r *runningStep) runStage() error { var result atp.ExecutionResult select { case result = <-r.executionChannel: - if result.Error != nil { - return result.Error - } case <-r.ctx.Done(): // In this case, it is being instructed to stop. A signal should have been sent. // Shutdown (with sigterm) the container, then wait for the output (valid or error). @@ -1170,6 +1181,10 @@ func (r *runningStep) runStage() error { } + if result.Error != nil { + return result.Error + } + // Execution complete, move to state running stage outputs, then to state finished stage. r.transitionStage(StageIDOutput, step.RunningStepStateRunning) r.completeStep(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData) @@ -1178,6 +1193,8 @@ func (r *runningStep) runStage() error { } func (r *runningStep) markStageFailures(firstStage StageID, err error) { + r.lock.Lock() + defer r.lock.Unlock() switch firstStage { case StageIDEnabling: r.stageChangeHandler.OnStepStageFailure(r, string(StageIDEnabling), &r.wg, err) @@ -1214,15 +1231,16 @@ func (r *runningStep) deployFailed(err error) { r.markStageFailures(StageIDEnabling, err) } -func (r *runningStep) transitionToCancelled() { +func (r *runningStep) cancelledEarly() { r.logger.Infof("Step %s/%s cancelled", r.runID, r.pluginStepID) // Follow the convention of transitioning to running then finished. r.transitionStage(StageIDCancelled, step.RunningStepStateRunning) - // Cancelled currently has no output. - r.transitionStage(StageIDCancelled, step.RunningStepStateFinished) - // This is called after deployment. So everything after deployment cannot occur. err := fmt.Errorf("step %s/%s cancelled", r.runID, r.pluginStepID) + // Cancelled currently has no output. + // Set it as unresolvable since it's cancelled, and to prevent conflicts with its inputs. + r.transitionFromFailedStage(StageIDCancelled, step.RunningStepStateFinished, err) + // Note: This function is only called if it's cancelled during the deployment phase. // If that changes, the stage IDs marked as failed need to be changed. r.markStageFailures(StageIDEnabling, err) @@ -1248,12 +1266,11 @@ func (r *runningStep) transitionToDisabled() { err := fmt.Errorf("step %s/%s disabled", r.runID, r.pluginStepID) r.markStageFailures(StageIDStarting, err) - } func (r *runningStep) startFailed(err error) { r.logger.Debugf("Start failed stage for step %s/%s", r.runID, r.pluginStepID) - r.transitionStage(StageIDCrashed, step.RunningStepStateRunning) + r.transitionFromFailedStage(StageIDCrashed, step.RunningStepStateRunning, err) r.logger.Warningf("Plugin step %s/%s start failed. %v", r.runID, r.pluginStepID, err) // Now it's done. @@ -1288,8 +1305,29 @@ func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepSt r.transitionStageWithOutput(newStage, state, nil, nil) } +func (r *runningStep) transitionFromFailedStage(newStage StageID, state step.RunningStepState, err error) { + r.lock.Lock() + defer r.lock.Unlock() + previousStage := string(r.currentStage) + r.currentStage = newStage + // Don't forget to update this, or else it will behave very oddly. + // First running, then finished. You can't skip states. + r.state = state + r.stageChangeHandler.OnStepStageFailure( + r, + previousStage, + &r.wg, + err, + ) +} + // TransitionStage transitions the stage to the specified stage, and the state to the specified state. -func (r *runningStep) transitionStageWithOutput(newStage StageID, state step.RunningStepState, outputID *string, previousStageOutput *any) { +func (r *runningStep) transitionStageWithOutput( + newStage StageID, + state step.RunningStepState, + outputID *string, + previousStageOutput *any, +) { // A current lack of observability into the atp client prevents // non-fragile testing of this function. r.lock.Lock() diff --git a/internal/step/plugin/provider_test.go b/internal/step/plugin/provider_test.go index e946f000..6d458229 100644 --- a/internal/step/plugin/provider_test.go +++ b/internal/step/plugin/provider_test.go @@ -518,3 +518,5 @@ func TestProvider_StartFail(t *testing.T) { assert.NoError(t, running.Close()) }) } + +// TODO: Add more tests here for the current functions and code paths. diff --git a/workflow/executor.go b/workflow/executor.go index 9c1c8816..132b0d23 100644 --- a/workflow/executor.go +++ b/workflow/executor.go @@ -413,8 +413,12 @@ func (e *executor) connectStepDependencies( if err != nil { return fmt.Errorf("bug: node for current stage not found (%w)", err) } - for _, nextStage := range stage.NextStages { - if err := currentStageNode.Connect(GetStageNodeID(stepID, nextStage)); err != nil { + for nextStage, dependencyType := range stage.NextStages { + nextStageNode, err := dag.GetNodeByID(GetStageNodeID(stepID, nextStage)) + if err != nil { + return fmt.Errorf("bug: node for next stage not found (%w)", err) + } + if err := nextStageNode.ConnectDependency(currentStageNode.ID(), dependencyType); err != nil { return fmt.Errorf("bug: cannot connect nodes (%w)", err) } } diff --git a/workflow/workflow.go b/workflow/workflow.go index d3feae76..a25ed9e6 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -284,7 +284,13 @@ errGatherLoop: } } -func (l *loopState) onStageComplete(stepID string, previousStage *string, previousStageOutputID *string, previousStageOutput *any, wg *sync.WaitGroup) { +func (l *loopState) onStageComplete( + stepID string, + previousStage *string, + previousStageOutputID *string, + previousStageOutput *any, + wg *sync.WaitGroup, +) { l.lock.Lock() defer func() { if previousStage != nil { @@ -303,10 +309,10 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo l.cancel() return } - l.logger.Debugf("Resolving node %q in the DAG", stageNode.ID()) + l.logger.Debugf("Resolving node %q in the DAG on stage complete", stageNode.ID()) if err := stageNode.ResolveNode(dgraph.Resolved); err != nil { - l.logger.Errorf("Failed to resolve stage node ID %s (%w)", stageNode.ID(), err) - l.recentErrors <- fmt.Errorf("failed to resolve stage node ID %s (%w)", stageNode.ID(), err) + errMessage := fmt.Errorf("failed to resolve stage node ID %s (%s)", stageNode.ID(), err.Error()) + l.recentErrors <- errMessage l.cancel() return } @@ -320,7 +326,7 @@ func (l *loopState) onStageComplete(stepID string, previousStage *string, previo } // Resolves the node in the DAG. This allows us to know which nodes are // ready for processing due to all dependencies being resolved. - l.logger.Debugf("Resolving node %q in the DAG", outputNode.ID()) + l.logger.Debugf("Resolving output node %q in the DAG", outputNode.ID()) if err := outputNode.ResolveNode(dgraph.Resolved); err != nil { l.logger.Errorf("Failed to resolve output node ID %s (%w)", outputNode.ID(), err) l.recentErrors <- fmt.Errorf("failed to resolve output node ID %s (%w)", outputNode.ID(), err) @@ -371,7 +377,7 @@ func (l *loopState) markOutputsUnresolvable(stepID string, stageID string, skipp l.logger.Debugf("Will mark node %s in the DAG as unresolvable", stepID+"."+stage.ID+"."+stageOutputID) err = unresolvableOutputNode.ResolveNode(dgraph.Unresolvable) if err != nil { - l.logger.Errorf("Error while marking node %s in DAG as unresolvable (%s)", unresolvableOutputNode.ID(), err.Error()) + panic(fmt.Errorf("error while marking node %s in DAG as unresolvable (%s)", unresolvableOutputNode.ID(), err.Error())) } } } @@ -390,7 +396,13 @@ func (l *loopState) markStageNodeUnresolvable(stepID string, stageID string) { l.logger.Debugf("Will mark node %s in the DAG as unresolvable", stepID+"."+stageID) err = unresolvableOutputNode.ResolveNode(dgraph.Unresolvable) if err != nil { - l.logger.Errorf("Error while marking node %s in DAG as unresolvable (%s)", unresolvableOutputNode.ID(), err.Error()) + panic( + fmt.Errorf( + "error while marking node %s in DAG as unresolvable (%s)", + unresolvableOutputNode.ID(), + err.Error(), + ), + ) } } @@ -404,7 +416,7 @@ func (l *loopState) notifySteps() { //nolint:gocognit // Can include runnable nodes, nodes that cannot be resolved, and nodes that are not for running, like inputs. for nodeID, resolutionStatus := range readyNodes { failed := resolutionStatus == dgraph.Unresolvable - l.logger.Debugf("Processing step node %s", nodeID) + l.logger.Debugf("Processing step node %s with resolution status %q", nodeID, resolutionStatus) node, err := l.dag.GetNodeByID(nodeID) if err != nil { panic(fmt.Errorf("failed to get node %s (%w)", nodeID, err)) @@ -438,6 +450,8 @@ func (l *loopState) notifySteps() { //nolint:gocognit // untypedInputData stores the resolved data untypedInputData, err := l.resolveExpressions(inputData, l.data) if err != nil { + // An error here often indicates a locking issue in a step provider. This would be caused + // by the lock not being held when the output was marked resolved. panic(fmt.Errorf("cannot resolve expressions for %s (%w)", nodeID, err)) } diff --git a/workflow/workflow_test.go b/workflow/workflow_test.go index 59f140bd..67aee20b 100644 --- a/workflow/workflow_test.go +++ b/workflow/workflow_test.go @@ -54,7 +54,7 @@ func TestOutputFailed(t *testing.T) { var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } } @@ -756,7 +756,7 @@ func TestMissingInputsFailedDeployment(t *testing.T) { assert.Equals(t, outputID, "") var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } } @@ -1218,7 +1218,7 @@ func TestInputCancelledStepWorkflow(t *testing.T) { assert.Equals(t, outputID, "") var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } } @@ -1269,7 +1269,7 @@ func TestInputDisabledStepWorkflow(t *testing.T) { assert.Error(t, err) var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } } @@ -1426,7 +1426,7 @@ func TestMultiDependencyWorkflowFailureWithDisabling(t *testing.T) { t.Logf("MultiDependency workflow failed purposefully in %d ms", duration.Milliseconds()) var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } assert.LessThan(t, duration.Seconds(), 4) // It will take 5 seconds if it fails to fail early. } @@ -1481,7 +1481,7 @@ func TestMultiDependencyWorkflowFailureWithCancellation(t *testing.T) { t.Logf("MultiDependency workflow failed purposefully in %d ms", duration.Milliseconds()) var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } assert.LessThan(t, duration.Seconds(), 4) // It will take 5 seconds if it fails to fail early. } @@ -1533,7 +1533,7 @@ func TestMultiDependencyWorkflowFailureWithErrorOut(t *testing.T) { t.Logf("MultiDependency workflow failed purposefully in %d ms", duration.Milliseconds()) var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } // If it takes 5 seconds, then there was a deadlock in the client. // If it takes 6 seconds, then it waited for the second step. @@ -1571,7 +1571,7 @@ steps: wait_time_ms: 7000 outputs: workflow-success: - simple_wait_output: !expr $.steps.failed_step.outputs + failed_step_output: !expr $.steps.failed_step.outputs simple_wait_output: !expr $.steps.long_wait.outputs ` @@ -1593,13 +1593,85 @@ func TestMultiDependencyWorkflowFailureWithDeployFailure(t *testing.T) { t.Logf("MultiDependency workflow failed purposefully in %d ms", duration.Milliseconds()) var typedError *workflow.ErrNoMorePossibleOutputs if !errors.As(err, &typedError) { - t.Fatalf("incorrect error type returned: %T", err) + t.Fatalf("incorrect error type returned: %T (%s)", err, err) } // If it takes 5 seconds, then there was a deadlock in the client. // If it takes 6 seconds, then it waited for the second step. assert.LessThan(t, duration.Milliseconds(), 5500) } +var multiDependencyFailureWorkflowWithDoubleFailure = ` +version: v0.2.0 +input: + root: WorkflowInput + objects: + WorkflowInput: + id: WorkflowInput + properties: {} +steps: + failed_step_A: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 0 + deploy: # This fails first. + deployer_name: "test-impl" + deploy_time: 0 + deploy_succeed: !expr 'false' + quick_step: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 1 # Second, this succeeds, which cancels the second failing step. + failed_step_B: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + wait_for: !expr $.steps.failed_step_A.outputs # Makes it unresolvable + stop_if: !expr $.steps.quick_step.outputs # Hopefully triggers the second resolution. + input: + wait_time_ms: 0 + deploy: + deployer_name: "test-impl" + deploy_succeed: !expr 'true' + long_wait: + plugin: + src: "n/a" + deployment_type: "builtin" + step: wait + input: + wait_time_ms: 100 +outputs: + workflow-success: + failed_step_output: !expr $.steps.failed_step_A.outputs + wait-only: # For the error to be exposed, we need an alternative output that persists beyond the error. + wait_output: !expr $.steps.long_wait.outputs +` + +func TestMultiDependencyWorkflowFailureDoubleFailure(t *testing.T) { + // Creates a scenario where step B's starting (due to wait-for) depends on step A's outputs, + // making A's outputs become unresolvable, while at the same time the step that needs that info (B) crashes. + // Transitioning B to crashed resolves starting, so that is in conflict with the unresolvable + // state propagated from step A. + preparedWorkflow := assert.NoErrorR[workflow.ExecutableWorkflow](t)( + getTestImplPreparedWorkflow(t, multiDependencyFailureWorkflowWithDoubleFailure), + ) + + startTime := time.Now() // Right before execute to not include pre-processing time. + _, _, err := preparedWorkflow.Execute(context.Background(), map[string]any{}) + duration := time.Since(startTime) + assert.NoError(t, err) + t.Logf("MultiDependency finished in %d ms", duration.Milliseconds()) +} + +// TODO: Create a situation where step A's outputs are resolvable, but the context is cancelled, +// resulting in a possible conflict with the cancelled step stage DAG node. + func createTestExecutableWorkflow(t *testing.T, workflowStr string, workflowCtx map[string][]byte) (workflow.ExecutableWorkflow, error) { logConfig := log.Config{ Level: log.LevelDebug,