From d61ba517f3ccf5edeb3218410ef5483a3186fa0b Mon Sep 17 00:00:00 2001 From: Jared O'Connell Date: Fri, 26 Jul 2024 11:46:09 -0400 Subject: [PATCH] Address linter errors --- internal/step/foreach/provider.go | 201 ++++++++++++++++-------------- 1 file changed, 106 insertions(+), 95 deletions(-) diff --git a/internal/step/foreach/provider.go b/internal/step/foreach/provider.go index ace5e6e5..0745ac61 100644 --- a/internal/step/foreach/provider.go +++ b/internal/step/foreach/provider.go @@ -464,111 +464,122 @@ func (r *runningStep) run() { waitingForInput, &r.wg, ) + r.runOnInput() +} + +func (r *runningStep) runOnInput() { select { case loopData, ok := <-r.inputData: if !ok { + r.logger.Debugf("aborted waiting for result in foreach") return } + r.processInput(loopData) + case <-r.ctx.Done(): + r.logger.Debugf("context done") + return + } +} - itemOutputs := make([]any, len(loopData)) - itemErrors := make(map[int]string, len(loopData)) - - r.logger.Debugf("Executing subworkflow for step %s...", r.runID) - wg := &sync.WaitGroup{} - wg.Add(len(loopData)) - errors := false - sem := make(chan struct{}, r.parallelism) - for i, input := range loopData { - i := i - input := input - go func() { - defer func() { - select { - case <-sem: - case <-r.ctx.Done(): // Must not deadlock if closed early. - } - wg.Done() - }() - r.logger.Debugf("Queuing item %d...", i) - select { - case sem <- struct{}{}: - case <-r.ctx.Done(): - r.logger.Debugf("Aborting item %d execution.", i) - return - } +func (r *runningStep) processInput(inputData []any) { + r.logger.Debugf("Executing subworkflow for step %s...", r.runID) + outputs, errors := r.executeSubWorkflows(inputData) - r.logger.Debugf("Executing item %d...", i) - // Ignore the output ID here because it can only be "success" - _, outputData, err := r.workflow.Execute(r.ctx, input) - r.lock.Lock() - if err != nil { - errors = true - itemErrors[i] = err.Error() - } else { - itemOutputs[i] = outputData - } - r.lock.Unlock() - r.logger.Debugf("Item %d complete.", i) - }() + r.logger.Debugf("Subworkflow %s complete.", r.runID) + r.lock.Lock() + previousStage := string(r.currentStage) + r.currentState = step.RunningStepStateRunning + var outputID string + var outputData any + var unresolvableStage StageID + var unresolvableError error + if len(errors) > 0 { + r.currentStage = StageIDFailed + unresolvableStage = StageIDOutputs + unresolvableError = fmt.Errorf("foreach subworkflow failed with errors (%v)", errors) + outputID = "error" + dataMap := make(map[int]any, len(inputData)) + for i, entry := range outputs { + if entry != nil { + dataMap[i] = entry + } } - wg.Wait() - r.logger.Debugf("Subworkflow %s complete.", r.runID) - r.lock.Lock() - previousStage := string(r.currentStage) - r.currentState = step.RunningStepStateRunning - var outputID string - var outputData any - var unresolvableStage StageID - var unresolvableError error - if errors { - r.currentStage = StageIDFailed - unresolvableStage = StageIDOutputs - unresolvableError = fmt.Errorf("foreach subworkflow failed with errors (%v)", itemErrors) - outputID = "error" - dataMap := make(map[int]any, len(loopData)) - for i, entry := range itemOutputs { - if entry != nil { - dataMap[i] = entry + outputData = map[string]any{ + "data": dataMap, + "messages": errors, + } + } else { + r.currentStage = StageIDOutputs + unresolvableStage = StageIDFailed + unresolvableError = fmt.Errorf("foreach succeeded, so error case is unresolvable") + outputID = "success" + outputData = map[string]any{ + "data": outputs, + } + } + currentStage := r.currentStage + r.lock.Unlock() + r.stageChangeHandler.OnStageChange( + r, + &previousStage, + nil, + nil, + string(currentStage), + false, + &r.wg, + ) + r.stageChangeHandler.OnStepStageFailure( + r, + string(unresolvableStage), + &r.wg, + unresolvableError, + ) + r.lock.Lock() + r.currentState = step.RunningStepStateFinished + previousStage = string(r.currentStage) + r.lock.Unlock() + r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg) +} + +// returns true if there is an error +func (r *runningStep) executeSubWorkflows(inputData []any) ([]any, map[int]string) { + itemOutputs := make([]any, len(inputData)) + itemErrors := make(map[int]string, len(inputData)) + wg := &sync.WaitGroup{} + wg.Add(len(inputData)) + sem := make(chan struct{}, r.parallelism) + for i, input := range inputData { + i := i + input := input + go func() { + defer func() { + select { + case <-sem: + case <-r.ctx.Done(): // Must not deadlock if closed early. } + wg.Done() + }() + r.logger.Debugf("Queuing item %d...", i) + select { + case sem <- struct{}{}: + case <-r.ctx.Done(): + r.logger.Debugf("Aborting item %d execution.", i) + return } - outputData = map[string]any{ - "data": dataMap, - "messages": itemErrors, - } - } else { - r.currentStage = StageIDOutputs - unresolvableStage = StageIDFailed - unresolvableError = fmt.Errorf("foreach succeeded, so error case is unresolvable") - outputID = "success" - outputData = map[string]any{ - "data": itemOutputs, + + r.logger.Debugf("Executing item %d...", i) + // Ignore the output ID here because it can only be "success" + _, outputData, err := r.workflow.Execute(r.ctx, input) + r.lock.Lock() + if err != nil { + itemErrors[i] = err.Error() + } else { + itemOutputs[i] = outputData } - } - currentStage := r.currentStage - r.lock.Unlock() - r.stageChangeHandler.OnStageChange( - r, - &previousStage, - nil, - nil, - string(currentStage), - false, - &r.wg, - ) - r.stageChangeHandler.OnStepStageFailure( - r, - string(unresolvableStage), - &r.wg, - unresolvableError, - ) - r.lock.Lock() - r.currentState = step.RunningStepStateFinished - previousStage = string(r.currentStage) - r.lock.Unlock() - r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg) - case <-r.ctx.Done(): - r.logger.Debugf("context done") - return + r.lock.Unlock() + r.logger.Debugf("Item %d complete.", i) + }() } - + wg.Wait() + return itemOutputs, itemErrors }