Skip to content

Commit

Permalink
feat(sdk-go): Handle LHTaskExceptions, Errors, and Panics (#1210)
Browse files Browse the repository at this point in the history
  • Loading branch information
Snarr authored Dec 18, 2024
1 parent 00ef241 commit 641743d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/docs/05-developer-guide/05-task-worker-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ The Go SDK currently (as of `0.11.0`) does not yet support throwing `LHTaskExcep
<TabItem value="python" label="Python">

```python
from littlehorse.exceptions import LHTaskExceptio
from littlehorse.exceptions import LHTaskException

async def ship_item(item_sku: str) -> str:
if is_out_of_stock():
Expand Down
9 changes: 9 additions & 0 deletions sdk-go/littlehorse/lh_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package littlehorse

import "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"

type LHTaskException struct {
Name string
Message string
Content *lhproto.VariableValue
}
62 changes: 58 additions & 4 deletions sdk-go/littlehorse/task_worker_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
"log"
"reflect"
"runtime/debug"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -311,13 +312,42 @@ func (m *serverConnectionManager) submitTaskForExecution(task *lhproto.Scheduled
}

func (m *serverConnectionManager) doTask(taskToExec *taskExecutionInfo) {
defer m.recoverFromPanic(taskToExec)
taskResult := m.doTaskHelper(taskToExec.task)
_, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult)
if err != nil {
m.retryReportTask(context.Background(), taskResult, TOTAL_RETRIES)
}
}

func (m *serverConnectionManager) recoverFromPanic(taskToExec *taskExecutionInfo) {
if v := recover(); v != nil {
varVal, _ := InterfaceToVarVal(v)
taskResult := &lhproto.ReportTaskRun{
TaskRunId: taskToExec.task.TaskRunId,
Time: timestamppb.Now(),
Status: lhproto.TaskStatus(lhproto.LHStatus_ERROR),
LogOutput: &lhproto.VariableValue{
Value: &lhproto.VariableValue_Str{
Str: string(debug.Stack()),
},
},
AttemptNumber: taskToExec.task.AttemptNumber,
Result: &lhproto.ReportTaskRun_Error{
Error: &lhproto.LHTaskError{
Type: lhproto.LHErrorType_TASK_FAILURE,
Message: "Task Worker Panic: " + varVal.GetStr(),
},
},
}
_, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult)
if err != nil {
log.Default().Print(err)
m.retryReportTask(context.Background(), taskResult, TOTAL_RETRIES)
}
}
}

func (m *serverConnectionManager) retryReportTask(ctx context.Context, taskResult *lhproto.ReportTaskRun, retries int) {
log.Println("Retrying reportTask rpc on wfRun {}", taskResult.TaskRunId.WfRunId)

Expand Down Expand Up @@ -399,16 +429,40 @@ func (m *serverConnectionManager) doTaskHelper(task *lhproto.ScheduledTask) *lhp
errorReflect := invocationResults[len(invocationResults)-1]

if errorReflect.Interface() != nil {
errorVarVal, err := InterfaceToVarVal(errorReflect.Interface())
if err != nil {
log.Println("WARN: was unable to serialize error")
// Check if the error is an LHTaskException
if lhtErr, ok := errorReflect.Interface().(*LHTaskException); ok {
taskResult.Result = &lhproto.ReportTaskRun_Exception{
Exception: &lhproto.LHTaskException{
Name: lhtErr.Name,
Message: lhtErr.Message,
Content: lhtErr.Content,
},
}
} else {
taskResult.LogOutput = errorVarVal
// Otherwise, try to interpret the error
if err, ok := errorReflect.Interface().(error); ok {
taskResult.Result = &lhproto.ReportTaskRun_Error{
Error: &lhproto.LHTaskError{
Type: lhproto.LHErrorType_TASK_FAILURE,
Message: err.Error(),
},
}
} else {
// If the error returned by the taskMethod does not match the error interface
taskResult.Result = &lhproto.ReportTaskRun_Error{
Error: &lhproto.LHTaskError{
Type: lhproto.LHErrorType_TASK_FAILURE,
Message: "Task Method error serialization failed.",
},
}
}
}

taskResult.Status = lhproto.TaskStatus_TASK_FAILED
}
}

taskResult.AttemptNumber = task.AttemptNumber
taskResult.Time = timestamppb.Now()

return taskResult
Expand Down

0 comments on commit 641743d

Please sign in to comment.