Skip to content

Commit

Permalink
Address most linter errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredoconnell committed Jul 26, 2024
1 parent b27ec98 commit 18680bd
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 28 deletions.
64 changes: 39 additions & 25 deletions internal/step/plugin/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (r *runnableStep) DisabledOutputSchema() *schema.StepOutputSchema {
)
}

func ClosureTimeoutSchema() *schema.PropertySchema {
func closureTimeoutSchema() *schema.PropertySchema {
return schema.NewPropertySchema(
schema.NewIntSchema(schema.PointerTo(int64(0)), nil, nil),
schema.NewDisplayValue(
Expand Down Expand Up @@ -610,7 +610,7 @@ func (r *runnableStep) Lifecycle(input map[string]any) (result step.Lifecycle[st
nil,
nil,
),
"closure_wait_timeout": ClosureTimeoutSchema(),
"closure_wait_timeout": closureTimeoutSchema(),
},
Outputs: map[string]*schema.StepOutputSchema{
"started": r.StartedSchema(),
Expand Down Expand Up @@ -930,7 +930,7 @@ func (r *runningStep) provideStartingInput(input map[string]any) error {
// Now get the other necessary data for running the step
var timeout int64
if input["closure_wait_timeout"] != nil {
unserializedTimeout, err := ClosureTimeoutSchema().Unserialize(input["closure_wait_timeout"])
unserializedTimeout, err := closureTimeoutSchema().Unserialize(input["closure_wait_timeout"])
if err != nil {
return err
}
Expand Down Expand Up @@ -1086,38 +1086,50 @@ func (r *runningStep) run() {
r.cancel() // Close before WaitGroup done
r.wg.Done() // Done. Close may now exit.
}()
container, contextDoneEarly, err := r.deployStage()
pluginConnection := r.startPlugin()
if pluginConnection == nil {
return
}
defer func() {
err := pluginConnection.Close()
if err != nil {
r.logger.Errorf("failed to close deployed container for step %s/%s", r.runID, r.pluginStepID)
}
}()
r.postDeployment(pluginConnection)
}

// Deploys the plugin, and handles failure cases.
func (r *runningStep) startPlugin() deployer.Plugin {
pluginConnection, contextDoneEarly, err := r.deployStage()
if contextDoneEarly {
if err != nil {
r.logger.Debugf("error due to step early closure: %s", err.Error())
}
r.closedEarly(StageIDEnabling, true)
return
return nil
} else if err != nil {
r.deployFailed(err)
return
return nil
}
r.lock.Lock()
select {
case <-r.ctx.Done():
if err := container.Close(); err != nil {
if err := pluginConnection.Close(); err != nil {
r.logger.Warningf("failed to close deployed container for step %s/%s", r.runID, r.pluginStepID)
}
r.lock.Unlock()
r.closedEarly(StageIDEnabling, false)
return
return nil
default:
r.container = container
r.container = pluginConnection
}
defer func() {
err := container.Close()
if err != nil {
r.logger.Errorf("failed to close deployed container for step %s/%s", r.runID, r.pluginStepID)
}
}()
r.lock.Unlock()
r.logger.Debugf("Successfully deployed container with ID '%s' for step %s/%s", container.ID(), r.runID, r.pluginStepID)
r.logger.Debugf("Successfully deployed container with ID '%s' for step %s/%s", pluginConnection.ID(), r.runID, r.pluginStepID)
return pluginConnection
}

func (r *runningStep) postDeployment(pluginConnection deployer.Plugin) {
r.logger.Debugf("Checking to see if step %s/%s is enabled", r.runID, r.pluginStepID)
enabled, contextDoneEarly := r.enableStage()
if contextDoneEarly {
Expand All @@ -1131,7 +1143,8 @@ func (r *runningStep) run() {
}

var forceCloseTimeoutMS int64
if contextDoneEarly, forceCloseTimeoutMS, err = r.startStage(container); contextDoneEarly {
var err error
if contextDoneEarly, forceCloseTimeoutMS, err = r.startStage(pluginConnection); contextDoneEarly {
r.closedEarly(StageIDRunning, true)
return
} else if err != nil {
Expand Down Expand Up @@ -1357,7 +1370,7 @@ func (r *runningStep) runStage(forCloseWaitMS int64) error {
} else {
r.logger.Warningf("Got step context done before step run complete. Force closing step %s/%s.", r.runID, r.pluginStepID)
err := r.forceCloseInternal()
return fmt.Errorf("step forced closed after timeout without result (%s)", err)
return fmt.Errorf("step forced closed after timeout without result (%w)", err)
}
// Wait for cancellation to occur.
select {
Expand All @@ -1379,7 +1392,7 @@ func (r *runningStep) runStage(forCloseWaitMS int64) error {
}

// Execution complete, move to state running stage outputs, then to state finished stage.
r.transitionStage(StageIDOutput, step.RunningStepStateRunning)
r.transitionRunningStage(StageIDOutput)
r.completeStep(r.currentStage, step.RunningStepStateFinished, &result.OutputID, &result.OutputData)

return nil
Expand Down Expand Up @@ -1413,7 +1426,7 @@ func (r *runningStep) markNotClosable(err error) {

func (r *runningStep) deployFailed(err error) {
r.logger.Debugf("Deploy failed stage for step %s/%s", r.runID, r.pluginStepID)
r.transitionStage(StageIDDeployFailed, step.RunningStepStateRunning)
r.transitionRunningStage(StageIDDeployFailed)
r.logger.Warningf("Plugin step %s/%s deploy failed. %v", r.runID, r.pluginStepID, err)

// Now it's done.
Expand Down Expand Up @@ -1457,7 +1470,7 @@ func (r *runningStep) closedEarly(stageToMarkUnresolvable StageID, priorStageFai
if priorStageFailed {
r.transitionFromFailedStage(StageIDClosed, step.RunningStepStateRunning, fmt.Errorf("step closed early"))
} else {
r.transitionStage(StageIDClosed, step.RunningStepStateRunning)
r.transitionRunningStage(StageIDClosed)
}
closedOutput := any(map[any]any{"cancelled": r.cancelled, "close_requested": r.closed.Load()})

Expand Down Expand Up @@ -1488,7 +1501,7 @@ func (r *runningStep) startFailed(err error) {
}

func (r *runningStep) runFailed(err error) {
r.transitionStage(StageIDCrashed, step.RunningStepStateRunning)
r.transitionRunningStage(StageIDCrashed)
r.logger.Warningf("Plugin step %s/%s run failed. %v", r.runID, r.pluginStepID, err)

// Now it's done.
Expand All @@ -1501,9 +1514,10 @@ func (r *runningStep) runFailed(err error) {
r.markNotClosable(err)
}

// TransitionStage transitions the stage to the specified stage, and the state to the specified state.
func (r *runningStep) transitionStage(newStage StageID, state step.RunningStepState) {
r.transitionStageWithOutput(newStage, state, nil, nil)
// TransitionStage transitions the running step to the specified stage, and the state running.
// For other situations, use transitionFromFailedStage, completeStep, or transitionStageWithOutput
func (r *runningStep) transitionRunningStage(newStage StageID) {
r.transitionStageWithOutput(newStage, step.RunningStepStateRunning, nil, nil)
}

func (r *runningStep) transitionFromFailedStage(newStage StageID, state step.RunningStepState, err error) {
Expand Down
9 changes: 7 additions & 2 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) (
case <-ctx.Done():
lastErrors := l.handleErrors()
if lastErrors == nil {
e.logger.Warningf("Workflow execution aborted. Waiting 6 more seconds for output. %s", lastErrors)
e.logger.Warningf(
"Workflow execution aborted. Waiting 6 more seconds for output (%w)",
lastErrors)
} else {
return "", nil, lastErrors
}
Expand All @@ -233,7 +235,10 @@ func (e *executableWorkflow) Execute(ctx context.Context, serializedInput any) (
case outputDataEntry, ok := <-l.outputDataChannel:
if !ok {
lastErrors := l.handleErrors()
return "", nil, fmt.Errorf("output data channel unexpectedly closed while waiting after execution aborted. %s", lastErrors)
return "", nil,
fmt.Errorf(
"output data channel unexpectedly closed while waiting after execution aborted (%w)",
lastErrors)
}
return e.handleOutput(l, outputDataEntry)
case err := <-l.recentErrors: // The context is done, so instead just check for errors.
Expand Down
2 changes: 1 addition & 1 deletion workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ steps:
deployment_type: "builtin"
step: wait
input:
wait_time_ms: 0
wait_time_ms: 1000
# You can verify that this test works by commenting out this line. It should fail.
stop_if: !expr $.steps.short_wait.outputs
# Delay needs to be delayed long enough to ensure that it's in a deploy state when it's cancelled by short_wait
Expand Down

0 comments on commit 18680bd

Please sign in to comment.