Skip to content

Commit

Permalink
Merge branch 'master' into dashboard-scheduledwfruns
Browse files Browse the repository at this point in the history
  • Loading branch information
HazimAr authored Dec 18, 2024
2 parents 943e2f9 + 8a82d0a commit cefb2e2
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 8 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,31 @@ jobs:
NODE_AUTH_TOKEN: ${{ secrets.NPM_PUBLISH_TOKEN }}
run: npm publish --access public

sdk-dotnet:
runs-on: ubuntu-latest
needs:
- publish-docker
- prepare
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Dotnet
uses: actions/setup-dotnet@v4
with:
dotnet-version: '6'
- name: Bump version
env:
TAG: ${{ needs.prepare.outputs.tag }}
run: sed -i "s/<PackageVersion>.*<\/PackageVersion>/<PackageVersion>${TAG}<\/PackageVersion>/g" sdk-dotnet/LittleHorse.Sdk/LittleHorse.Sdk.csproj
- name: Build Project
run: dotnet build sdk-dotnet/LittleHorse.Sdk/LittleHorse.Sdk.csproj
- name: Create Package
run: dotnet pack --configuration Release sdk-dotnet/LittleHorse.Sdk/LittleHorse.Sdk.csproj
- name: Publish Package to nuget.org
run: dotnet nuget push sdk-dotnet/LittleHorse.Sdk/bin/Release/*.nupkg -k $NUGET_AUTH_TOKEN -s https://api.nuget.org/v3/index.json
env:
NUGET_AUTH_TOKEN: ${{ secrets.NUGET_TOKEN }}

lhctl:
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -174,6 +199,7 @@ jobs:
- sdk-java
- sdk-python
- sdk-js
- sdk-dotnet
runs-on: ubuntu-latest
steps:
- name: Checkout
Expand Down
15 changes: 15 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,18 @@ jobs:
run: |
npm ci
npm run test
tests-sdk-dotnet:
if: ${{ !contains(github.event.head_commit.message, '[skip main]') }}
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Dotnet
uses: actions/setup-dotnet@v4
with:
dotnet-version: '6'
- name: Build Project
run: dotnet build sdk-dotnet/LittleHorse.Sdk/LittleHorse.Sdk.csproj
- name: Test Dotnet
run: dotnet test sdk-dotnet/LittleHorse.Sdk.Tests
2 changes: 1 addition & 1 deletion docs/docs/05-developer-guide/05-task-worker-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ The Go SDK currently (as of `0.11.0`) does not yet support throwing `LHTaskExcep
<TabItem value="python" label="Python">

```python
from littlehorse.exceptions import LHTaskExceptio
from littlehorse.exceptions import LHTaskException

async def ship_item(item_sku: str) -> str:
if is_out_of_stock():
Expand Down
4 changes: 1 addition & 3 deletions sdk-dotnet/LittleHorse.Sdk/LittleHorse.Sdk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<AssemblyVersion>0.5.8</AssemblyVersion>
<FileVersion>0.5.8</FileVersion>
<PackageVersion>0.5.8-alpha</PackageVersion>
<PackageVersion>0.0.0</PackageVersion>
<Company>LittleHorse Enterprises LLC</Company>
<Authors>LittleHorse Enterprises LLC</Authors>
<Product>LittleHorseSDK</Product>
Expand Down
9 changes: 9 additions & 0 deletions sdk-go/littlehorse/lh_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package littlehorse

import "github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"

type LHTaskException struct {
Name string
Message string
Content *lhproto.VariableValue
}
62 changes: 58 additions & 4 deletions sdk-go/littlehorse/task_worker_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
"log"
"reflect"
"runtime/debug"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -311,13 +312,42 @@ func (m *serverConnectionManager) submitTaskForExecution(task *lhproto.Scheduled
}

func (m *serverConnectionManager) doTask(taskToExec *taskExecutionInfo) {
defer m.recoverFromPanic(taskToExec)
taskResult := m.doTaskHelper(taskToExec.task)
_, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult)
if err != nil {
m.retryReportTask(context.Background(), taskResult, TOTAL_RETRIES)
}
}

func (m *serverConnectionManager) recoverFromPanic(taskToExec *taskExecutionInfo) {
if v := recover(); v != nil {
varVal, _ := InterfaceToVarVal(v)
taskResult := &lhproto.ReportTaskRun{
TaskRunId: taskToExec.task.TaskRunId,
Time: timestamppb.Now(),
Status: lhproto.TaskStatus(lhproto.LHStatus_ERROR),
LogOutput: &lhproto.VariableValue{
Value: &lhproto.VariableValue_Str{
Str: string(debug.Stack()),
},
},
AttemptNumber: taskToExec.task.AttemptNumber,
Result: &lhproto.ReportTaskRun_Error{
Error: &lhproto.LHTaskError{
Type: lhproto.LHErrorType_TASK_FAILURE,
Message: "Task Worker Panic: " + varVal.GetStr(),
},
},
}
_, err := (*taskToExec.specificStub).ReportTask(context.Background(), taskResult)
if err != nil {
log.Default().Print(err)
m.retryReportTask(context.Background(), taskResult, TOTAL_RETRIES)
}
}
}

func (m *serverConnectionManager) retryReportTask(ctx context.Context, taskResult *lhproto.ReportTaskRun, retries int) {
log.Println("Retrying reportTask rpc on wfRun {}", taskResult.TaskRunId.WfRunId)

Expand Down Expand Up @@ -399,16 +429,40 @@ func (m *serverConnectionManager) doTaskHelper(task *lhproto.ScheduledTask) *lhp
errorReflect := invocationResults[len(invocationResults)-1]

if errorReflect.Interface() != nil {
errorVarVal, err := InterfaceToVarVal(errorReflect.Interface())
if err != nil {
log.Println("WARN: was unable to serialize error")
// Check if the error is an LHTaskException
if lhtErr, ok := errorReflect.Interface().(*LHTaskException); ok {
taskResult.Result = &lhproto.ReportTaskRun_Exception{
Exception: &lhproto.LHTaskException{
Name: lhtErr.Name,
Message: lhtErr.Message,
Content: lhtErr.Content,
},
}
} else {
taskResult.LogOutput = errorVarVal
// Otherwise, try to interpret the error
if err, ok := errorReflect.Interface().(error); ok {
taskResult.Result = &lhproto.ReportTaskRun_Error{
Error: &lhproto.LHTaskError{
Type: lhproto.LHErrorType_TASK_FAILURE,
Message: err.Error(),
},
}
} else {
// If the error returned by the taskMethod does not match the error interface
taskResult.Result = &lhproto.ReportTaskRun_Error{
Error: &lhproto.LHTaskError{
Type: lhproto.LHErrorType_TASK_FAILURE,
Message: "Task Method error serialization failed.",
},
}
}
}

taskResult.Status = lhproto.TaskStatus_TASK_FAILED
}
}

taskResult.AttemptNumber = task.AttemptNumber
taskResult.Time = timestamppb.Now()

return taskResult
Expand Down

0 comments on commit cefb2e2

Please sign in to comment.