diff --git a/docs/docs/05-developer-guide/05-task-worker-development.md b/docs/docs/05-developer-guide/05-task-worker-development.md index c1ee6ec8d..440fb30f7 100644 --- a/docs/docs/05-developer-guide/05-task-worker-development.md +++ b/docs/docs/05-developer-guide/05-task-worker-development.md @@ -178,7 +178,7 @@ The Go SDK currently (as of `0.11.0`) does not yet support throwing `LHTaskExcep ```python -from littlehorse.exceptions import LHTaskExceptio +from littlehorse.exceptions import LHTaskException async def ship_item(item_sku: str) -> str: if is_out_of_stock(): diff --git a/sdk-go/littlehorse/lh_errors.go b/sdk-go/littlehorse/lh_errors.go new file mode 100644 index 000000000..4dd9610a0 --- /dev/null +++ b/sdk-go/littlehorse/lh_errors.go @@ -0,0 +1,9 @@ +package littlehorse + +import "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" + +type LHTaskException struct { + Name string + Message string + Content *lhproto.VariableValue +} diff --git a/sdk-go/littlehorse/task_worker_internal.go b/sdk-go/littlehorse/task_worker_internal.go index a928b9879..c2591bf18 100644 --- a/sdk-go/littlehorse/task_worker_internal.go +++ b/sdk-go/littlehorse/task_worker_internal.go @@ -5,6 +5,7 @@ import ( "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto" "log" "reflect" + "runtime/debug" "strconv" "sync" "time" @@ -311,6 +312,7 @@ func (m *serverConnectionManager) submitTaskForExecution(task *lhproto.Scheduled } func (m *serverConnectionManager) doTask(taskToExec *taskExecutionInfo) { + defer m.recoverFromPanic(taskToExec) taskResult := m.doTaskHelper(taskToExec.task) _, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult) if err != nil { @@ -318,6 +320,34 @@ func (m *serverConnectionManager) doTask(taskToExec *taskExecutionInfo) { } } +func (m *serverConnectionManager) recoverFromPanic(taskToExec *taskExecutionInfo) { + if v := recover(); v != nil { + varVal, _ := InterfaceToVarVal(v) + taskResult := &lhproto.ReportTaskRun{ + TaskRunId: taskToExec.task.TaskRunId, + Time: timestamppb.Now(), + Status: lhproto.TaskStatus(lhproto.LHStatus_ERROR), + LogOutput: &lhproto.VariableValue{ + Value: &lhproto.VariableValue_Str{ + Str: string(debug.Stack()), + }, + }, + AttemptNumber: taskToExec.task.AttemptNumber, + Result: &lhproto.ReportTaskRun_Error{ + Error: &lhproto.LHTaskError{ + Type: lhproto.LHErrorType_TASK_FAILURE, + Message: "Task Worker Panic: " + varVal.GetStr(), + }, + }, + } + _, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult) + if err != nil { + log.Default().Print(err) + m.retryReportTask(context.Background(), taskResult, TOTAL_RETRIES) + } + } +} + func (m *serverConnectionManager) retryReportTask(ctx context.Context, taskResult *lhproto.ReportTaskRun, retries int) { log.Println("Retrying reportTask rpc on wfRun {}", taskResult.TaskRunId.WfRunId) @@ -399,16 +429,40 @@ func (m *serverConnectionManager) doTaskHelper(task *lhproto.ScheduledTask) *lhp errorReflect := invocationResults[len(invocationResults)-1] if errorReflect.Interface() != nil { - errorVarVal, err := InterfaceToVarVal(errorReflect.Interface()) - if err != nil { - log.Println("WARN: was unable to serialize error") + // Check if the error is an LHTaskException + if lhtErr, ok := errorReflect.Interface().(*LHTaskException); ok { + taskResult.Result = &lhproto.ReportTaskRun_Exception{ + Exception: &lhproto.LHTaskException{ + Name: lhtErr.Name, + Message: lhtErr.Message, + Content: lhtErr.Content, + }, + } } else { - taskResult.LogOutput = errorVarVal + // Otherwise, try to interpret the error + if err, ok := errorReflect.Interface().(error); ok { + taskResult.Result = &lhproto.ReportTaskRun_Error{ + Error: &lhproto.LHTaskError{ + Type: lhproto.LHErrorType_TASK_FAILURE, + Message: err.Error(), + }, + } + } else { + // If the error returned by the taskMethod does not match the error interface + taskResult.Result = &lhproto.ReportTaskRun_Error{ + Error: &lhproto.LHTaskError{ + Type: lhproto.LHErrorType_TASK_FAILURE, + Message: "Task Method error serialization failed.", + }, + } + } } + taskResult.Status = lhproto.TaskStatus_TASK_FAILED } } + taskResult.AttemptNumber = task.AttemptNumber taskResult.Time = timestamppb.Now() return taskResult