Skip to content

Commit

Permalink
Merge branch 'main' into renovate/all
Browse files Browse the repository at this point in the history
  • Loading branch information
dustinblack authored Nov 16, 2023
2 parents d687bf5 + 4a7cb18 commit c0e96fb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
22 changes: 13 additions & 9 deletions atp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ func (c *client) Execute(
}
}
if err := c.sendCBOR(workStartMsg); err != nil {
c.logger.Errorf("Step %s failed to write start work message: %v", stepData.ID, err)
c.logger.Errorf("Step '%s' failed to write start work message: %v", stepData.ID, err)
return NewErrorExecutionResult(fmt.Errorf("failed to write work start message (%w)", err))
}
c.logger.Debugf("Step %s started, waiting for response...", stepData.ID)
c.logger.Debugf("Step '%s' started, waiting for response...", stepData.ID)

defer c.handleStepComplete(stepData.RunID, receivedSignals)
return c.getResult(stepData, cborReader)
Expand Down Expand Up @@ -346,7 +346,7 @@ func (c *client) executeReadLoop(cborReader *cbor.Decoder) {
var runtimeMessage DecodedRuntimeMessage
for {
if err := cborReader.Decode(&runtimeMessage); err != nil {
c.logger.Errorf("ATP client for steps %s failed to read or decode runtime message: %v", c.getRunningStepIDs(), err)
c.logger.Errorf("ATP client for steps '%s' failed to read or decode runtime message: %v", c.getRunningStepIDs(), err)
// This is fatal since the entire structure of the runtime message is invalid.
c.sendErrorToAll(fmt.Errorf("failed to read or decode runtime message (%w)", err))
return
Expand All @@ -355,7 +355,7 @@ func (c *client) executeReadLoop(cborReader *cbor.Decoder) {
case MessageTypeWorkDone:
var doneMessage WorkDoneMessage
if err := cbor.Unmarshal(runtimeMessage.RawMessageData, &doneMessage); err != nil {
c.logger.Errorf("Failed to decode work done message (%v) for run ID %s ", err, runtimeMessage.RunID)
c.logger.Errorf("Failed to decode work done message (%v) for run ID '%s' ", err, runtimeMessage.RunID)
c.sendExecutionResult(runtimeMessage.RunID, NewErrorExecutionResult(
fmt.Errorf("failed to decode work done message (%w)", err)))
}
Expand Down Expand Up @@ -383,17 +383,21 @@ func (c *client) executeReadLoop(cborReader *cbor.Decoder) {
c.logger.Errorf("Step with run ID '%s' failed to decode error message: %v",
runtimeMessage.RunID, err)
}
c.logger.Errorf("Step with run ID %s sent error message: %v", runtimeMessage.RunID, errMessage)
resultMsg := fmt.Errorf("step %s sent error message: %s", runtimeMessage.RunID,
c.logger.Errorf("Step with run ID '%s' sent error message: %v", runtimeMessage.RunID, errMessage)
resultMsg := fmt.Errorf("step '%s' sent error message: %s", runtimeMessage.RunID,
errMessage.ToString(runtimeMessage.RunID))
if errMessage.ServerFatal {
c.sendErrorToAll(resultMsg)
return // It's server fatal, so this is the last message from the server.
} else if errMessage.StepFatal {
c.sendExecutionResult(runtimeMessage.RunID, NewErrorExecutionResult(resultMsg))
if runtimeMessage.RunID == "" {
c.sendErrorToAll(fmt.Errorf("step fatal error missing run id (%w)", resultMsg))
} else {
c.sendExecutionResult(runtimeMessage.RunID, NewErrorExecutionResult(resultMsg))
}
}
default:
c.logger.Warningf("Step with run ID %s sent unknown message type: %s", runtimeMessage.RunID,
c.logger.Warningf("Step with run ID '%s' sent unknown message type: %s", runtimeMessage.RunID,
runtimeMessage.MessageID)
}
c.mutex.Lock()
Expand Down Expand Up @@ -495,7 +499,7 @@ func (c *client) processWorkDone(
debugLogs := strings.Split(doneMessage.DebugLogs, "\n")
for _, line := range debugLogs {
if strings.TrimSpace(line) != "" {
c.logger.Debugf("Step %s debug: %s", runID, line)
c.logger.Debugf("Step '%s' debug: %s", runID, line)
}
}

Expand Down
10 changes: 6 additions & 4 deletions atp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type ServerError struct {
}

func (e ServerError) String() string {
return fmt.Sprintf("RunID: %s, err: %s, step fatal: %t, server fatal: %t", e.RunID, e.Err, e.StepFatal, e.ServerFatal)
return fmt.Sprintf("RunID: '%s', err: %s, step fatal: %t, server fatal: %t", e.RunID, e.Err, e.StepFatal, e.ServerFatal)
}

func initializeATPServerSession(
Expand Down Expand Up @@ -178,7 +178,7 @@ func (s *atpServerSession) onRuntimeMessageReceived(message *DecodedRuntimeMessa
var workStartMsg WorkStartMessage
if err := cbor.Unmarshal(message.RawMessageData, &workStartMsg); err != nil {
s.workDone <- ServerError{
RunID: "",
RunID: runID,
Err: fmt.Errorf("failed to decode work start message: %w", err),
StepFatal: true,
ServerFatal: false,
Expand All @@ -191,7 +191,7 @@ func (s *atpServerSession) onRuntimeMessageReceived(message *DecodedRuntimeMessa
var signalMessage SignalMessage
if err := cbor.Unmarshal(message.RawMessageData, &signalMessage); err != nil {
s.workDone <- ServerError{
RunID: "",
RunID: runID,
Err: fmt.Errorf("failed to decode signal message: %w", err),
StepFatal: false,
ServerFatal: false,
Expand All @@ -206,6 +206,7 @@ func (s *atpServerSession) onRuntimeMessageReceived(message *DecodedRuntimeMessa
err := s.stdinCloser.Close()
if err != nil {
s.workDone <- ServerError{
// this error does not apply to a specific run id
RunID: "",
Err: fmt.Errorf("error while closing stdin on client done: %w", err),
StepFatal: true,
Expand All @@ -215,6 +216,7 @@ func (s *atpServerSession) onRuntimeMessageReceived(message *DecodedRuntimeMessa
return true // Client done, so terminate loop
default:
s.workDone <- ServerError{
// this error does not apply to a specific run id
RunID: "",
Err: fmt.Errorf("unknown message ID received: %d. This is a sign of incompatible server and client versions",
message.MessageID),
Expand Down Expand Up @@ -323,7 +325,7 @@ func (s *atpServerSession) runStep(runID string, req WorkStartMessage) {
outputID, outputData, err := s.pluginSchema.CallStep(s.ctx, runID, req.StepID, req.Config)
if err != nil {
s.workDone <- ServerError{
RunID: "",
RunID: runID,
Err: fmt.Errorf("error calling step (%w)", err),
StepFatal: true,
ServerFatal: false,
Expand Down

0 comments on commit c0e96fb

Please sign in to comment.