Skip to content

Commit

Permalink
Address linter errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredoconnell committed Jul 26, 2024
1 parent 18680bd commit d61ba51
Showing 1 changed file with 106 additions and 95 deletions.
201 changes: 106 additions & 95 deletions internal/step/foreach/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,111 +464,122 @@ func (r *runningStep) run() {
waitingForInput,
&r.wg,
)
r.runOnInput()
}

func (r *runningStep) runOnInput() {
select {
case loopData, ok := <-r.inputData:
if !ok {
r.logger.Debugf("aborted waiting for result in foreach")
return
}
r.processInput(loopData)
case <-r.ctx.Done():
r.logger.Debugf("context done")
return
}
}

itemOutputs := make([]any, len(loopData))
itemErrors := make(map[int]string, len(loopData))

r.logger.Debugf("Executing subworkflow for step %s...", r.runID)
wg := &sync.WaitGroup{}
wg.Add(len(loopData))
errors := false
sem := make(chan struct{}, r.parallelism)
for i, input := range loopData {
i := i
input := input
go func() {
defer func() {
select {
case <-sem:
case <-r.ctx.Done(): // Must not deadlock if closed early.
}
wg.Done()
}()
r.logger.Debugf("Queuing item %d...", i)
select {
case sem <- struct{}{}:
case <-r.ctx.Done():
r.logger.Debugf("Aborting item %d execution.", i)
return
}
func (r *runningStep) processInput(inputData []any) {
r.logger.Debugf("Executing subworkflow for step %s...", r.runID)
outputs, errors := r.executeSubWorkflows(inputData)

r.logger.Debugf("Executing item %d...", i)
// Ignore the output ID here because it can only be "success"
_, outputData, err := r.workflow.Execute(r.ctx, input)
r.lock.Lock()
if err != nil {
errors = true
itemErrors[i] = err.Error()
} else {
itemOutputs[i] = outputData
}
r.lock.Unlock()
r.logger.Debugf("Item %d complete.", i)
}()
r.logger.Debugf("Subworkflow %s complete.", r.runID)
r.lock.Lock()
previousStage := string(r.currentStage)
r.currentState = step.RunningStepStateRunning
var outputID string
var outputData any
var unresolvableStage StageID
var unresolvableError error
if len(errors) > 0 {
r.currentStage = StageIDFailed
unresolvableStage = StageIDOutputs
unresolvableError = fmt.Errorf("foreach subworkflow failed with errors (%v)", errors)
outputID = "error"
dataMap := make(map[int]any, len(inputData))
for i, entry := range outputs {
if entry != nil {
dataMap[i] = entry
}
}
wg.Wait()
r.logger.Debugf("Subworkflow %s complete.", r.runID)
r.lock.Lock()
previousStage := string(r.currentStage)
r.currentState = step.RunningStepStateRunning
var outputID string
var outputData any
var unresolvableStage StageID
var unresolvableError error
if errors {
r.currentStage = StageIDFailed
unresolvableStage = StageIDOutputs
unresolvableError = fmt.Errorf("foreach subworkflow failed with errors (%v)", itemErrors)
outputID = "error"
dataMap := make(map[int]any, len(loopData))
for i, entry := range itemOutputs {
if entry != nil {
dataMap[i] = entry
outputData = map[string]any{
"data": dataMap,
"messages": errors,
}
} else {
r.currentStage = StageIDOutputs
unresolvableStage = StageIDFailed
unresolvableError = fmt.Errorf("foreach succeeded, so error case is unresolvable")
outputID = "success"
outputData = map[string]any{
"data": outputs,
}
}
currentStage := r.currentStage
r.lock.Unlock()
r.stageChangeHandler.OnStageChange(
r,
&previousStage,
nil,
nil,
string(currentStage),
false,
&r.wg,
)
r.stageChangeHandler.OnStepStageFailure(
r,
string(unresolvableStage),
&r.wg,
unresolvableError,
)
r.lock.Lock()
r.currentState = step.RunningStepStateFinished
previousStage = string(r.currentStage)
r.lock.Unlock()
r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg)
}

// returns true if there is an error

Check failure on line 544 in internal/step/foreach/provider.go

View workflow job for this annotation

GitHub Actions / lint and test / golangci-lint

Comment should end in a period (godot)
func (r *runningStep) executeSubWorkflows(inputData []any) ([]any, map[int]string) {
itemOutputs := make([]any, len(inputData))
itemErrors := make(map[int]string, len(inputData))
wg := &sync.WaitGroup{}
wg.Add(len(inputData))
sem := make(chan struct{}, r.parallelism)
for i, input := range inputData {
i := i
input := input
go func() {
defer func() {
select {
case <-sem:
case <-r.ctx.Done(): // Must not deadlock if closed early.
}
wg.Done()
}()
r.logger.Debugf("Queuing item %d...", i)
select {
case sem <- struct{}{}:
case <-r.ctx.Done():
r.logger.Debugf("Aborting item %d execution.", i)
return
}
outputData = map[string]any{
"data": dataMap,
"messages": itemErrors,
}
} else {
r.currentStage = StageIDOutputs
unresolvableStage = StageIDFailed
unresolvableError = fmt.Errorf("foreach succeeded, so error case is unresolvable")
outputID = "success"
outputData = map[string]any{
"data": itemOutputs,

r.logger.Debugf("Executing item %d...", i)
// Ignore the output ID here because it can only be "success"
_, outputData, err := r.workflow.Execute(r.ctx, input)
r.lock.Lock()
if err != nil {
itemErrors[i] = err.Error()
} else {
itemOutputs[i] = outputData
}
}
currentStage := r.currentStage
r.lock.Unlock()
r.stageChangeHandler.OnStageChange(
r,
&previousStage,
nil,
nil,
string(currentStage),
false,
&r.wg,
)
r.stageChangeHandler.OnStepStageFailure(
r,
string(unresolvableStage),
&r.wg,
unresolvableError,
)
r.lock.Lock()
r.currentState = step.RunningStepStateFinished
previousStage = string(r.currentStage)
r.lock.Unlock()
r.stageChangeHandler.OnStepComplete(r, previousStage, &outputID, &outputData, &r.wg)
case <-r.ctx.Done():
r.logger.Debugf("context done")
return
r.lock.Unlock()
r.logger.Debugf("Item %d complete.", i)
}()
}

wg.Wait()
return itemOutputs, itemErrors
}

0 comments on commit d61ba51

Please sign in to comment.