Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dotnet-sdk): create worker context method to access from task execution #1173

Merged
merged 10 commits into from
Dec 5, 2024
Merged
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
Expand Down
21 changes: 21 additions & 0 deletions sdk-dotnet/Examples/WorkerContextExample/MyWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using LittleHorse.Sdk.Worker;

namespace WorkerContextExample;

public class MyWorker
{
[LHTaskMethod("task")]
public void ProcessTask(long requestTime, LHWorkerContext context)
{
context.Log("ProcessPayment");
Console.WriteLine($"Processing request time: {requestTime}");
Console.WriteLine($"The Workflow Run Id is: {context.GetWfRunId()}");
Console.WriteLine($"The Node Run Id is: {context.GetNodeRunId()}");
Console.WriteLine($"The Task Run Id is: {context.GetTaskRunId()}");
Console.WriteLine($"The Idempotency Key is: {context.GetIdempotencyKey()}");
Console.WriteLine($"The Attempt Number is: {context.GetAttemptNumber()}");
Console.WriteLine($"The Scheduled Time is: {context.GetScheduledTime()}");
Console.WriteLine($"The User Group is: {context.GetUserGroup()}");
Console.WriteLine($"The User Id is: {context.GetUserId()}");
}
}
65 changes: 65 additions & 0 deletions sdk-dotnet/Examples/WorkerContextExample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using LittleHorse.Sdk;
using LittleHorse.Sdk.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace WorkerContextExample;

public abstract class Program
{
private static ServiceProvider? _serviceProvider;
private static void SetupApplication()
{
_serviceProvider = new ServiceCollection()
.AddLogging(config =>
{
config.AddConsole();
config.SetMinimumLevel(LogLevel.Debug);
})
.BuildServiceProvider();
}

private static LHConfig GetLHConfig(string[] args, ILoggerFactory loggerFactory)
{
var config = new LHConfig(loggerFactory);

string filePath = Path.Combine(Directory.GetCurrentDirectory(), ".config/littlehorse.config");
if (File.Exists(filePath))
config = new LHConfig(filePath, loggerFactory);

return config;
}

private static List<LHTaskWorker<MyWorker>> GetTaskWorkers(LHConfig config)
{
MyWorker executableExceptionHandling = new MyWorker();
var workers = new List<LHTaskWorker<MyWorker>>
{
new(executableExceptionHandling, "task", config)
};

return workers;
}

static void Main(string[] args)
{
SetupApplication();
if (_serviceProvider != null)
{
var loggerFactory = _serviceProvider.GetRequiredService<ILoggerFactory>();
var config = GetLHConfig(args, loggerFactory);
var workers = GetTaskWorkers(config);
foreach (var worker in workers)
{
worker.RegisterTaskDef();
}

Thread.Sleep(300);

foreach (var worker in workers)
{
worker.Start();
}
}
}
}
45 changes: 45 additions & 0 deletions sdk-dotnet/Examples/WorkerContextExample/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
## Running WorkerContextExample

This example shows how to get access to the context when executing a task.
Go to the class `MyWorker`.

Let's run the example in `WorkerContextExample`

```
dotnet build
dotnet run
```

In another terminal, use `lhctl` to run the workflow:

```
lhctl run example-worker-context
```

In addition, you can check the result with:

```
# This call shows the result
lhctl get wfRun <wf_run_id>

# This will show you all nodes in tha run
lhctl list nodeRun <wf_run_id>

# This shows the task run information
lhctl get taskRun <wf_run_id> <task_run_global_id>
```

## Considerations

If you need get access to the context you have to add a `LHWorkerContext`
parameter to the signature of your task:

```
[LHTaskMethod("task")]
public void ProcessTask(long requestTime, LHWorkerContext context)
{
...
}
```

The `WorkerContext` should be the last parameter.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\LittleHorse.Sdk\LittleHorse.Sdk.csproj" />
</ItemGroup>

</Project>
41 changes: 41 additions & 0 deletions sdk-dotnet/LittleHorse.Sdk/Helper/LHHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Text;
using LittleHorse.Common.Proto;

namespace LittleHorse.Sdk.Helper
{
public static class LHHelper
{
public static WfRunId GetWFRunId(TaskRunSource taskRunSource)
{
switch (taskRunSource.TaskRunSourceCase)
{
case TaskRunSource.TaskRunSourceOneofCase.TaskNode:
return taskRunSource.TaskNode.NodeRunId.WfRunId;
case TaskRunSource.TaskRunSourceOneofCase.UserTaskTrigger:
return taskRunSource.UserTaskTrigger.NodeRunId.WfRunId;
default:
return null;
}
}

public static string TaskRunIdToString(TaskRunId taskRunId)
{
return $"{taskRunId.WfRunId}/{taskRunId.TaskGuid}";
}

private static string ParseWfRunIdToString(WfRunId wfRunId) {
var output = new StringBuilder();
if (wfRunId.ParentWfRunId != null) {
output.Append(ParseWfRunIdToString(wfRunId.ParentWfRunId));
output.Append("_");
}
output.Append(wfRunId.Id);

return output.ToString();
}

public static String ParseTaskRunIdToString(TaskRunId taskRunId) {
return ParseWfRunIdToString(taskRunId.WfRunId) + "/" + taskRunId.TaskGuid;
}
}
}
4 changes: 2 additions & 2 deletions sdk-dotnet/LittleHorse.Sdk/Helper/LHMappingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ private static Double GetFloatingValue(object obj)
};
}

public static bool IsInt64Type(Type type)
internal static bool IsInt64Type(Type type)
{
return type.IsAssignableFrom(typeof(Int64))
|| type.IsAssignableFrom(typeof(UInt64))
|| type.IsAssignableFrom(typeof(long))
|| type.IsAssignableFrom(typeof(ulong));
}

public static LHErrorType GetFailureCodeFor(TaskStatus status)
internal static LHErrorType GetFailureCodeFor(TaskStatus status)
{
switch (status) {
case TaskStatus.TaskFailed:
Expand Down
25 changes: 0 additions & 25 deletions sdk-dotnet/LittleHorse.Sdk/Helper/LHWorkerHelper.cs

This file was deleted.

7 changes: 5 additions & 2 deletions sdk-dotnet/LittleHorse.Sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ brew install dotnet
### Build

```
cd sdk-dotnet/LittleHorse.Sdk
cd sdk-dotnet
dotnet build ./LittleHorse.Sdk
```
### Build and Run tests
Expand All @@ -41,10 +41,13 @@ dotnet build ./LittleHorse.Sdk
dotnet test ./LittleHorse.Sdk.Tests
```

### Run Example
### Run Examples

```
dotnet run --project ./Examples/BasicExample
dotnet run --project ./Examples/ExceptionsHandlerExample
dotnet run --project ./Examples/MaskedFieldsExample
dotnet run --project ./Examples/WorkerContextExample
```

### Self-signed TLS certificate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ private async Task RequestMoreWorkAsync()
if (taskToDo.Result != null)
{
var scheduledTask = taskToDo.Result;
var wFRunId = LHWorkerHelper.GetWFRunId(scheduledTask.Source);
_logger?.LogDebug($"Received task schedule request for wfRun {wFRunId}");
var wFRunId = LHHelper.GetWFRunId(scheduledTask.Source);
_logger?.LogDebug($"Received task schedule request for wfRun {wFRunId.Id}");

_connectionManager.SubmitTaskForExecution(scheduledTask, _client);

_logger?.LogDebug($"Scheduled task on threadpool for wfRun {wFRunId}");
_logger?.LogDebug($"Scheduled task on threadpool for wfRun {wFRunId.Id}");
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,20 @@ private void DoTask(ScheduledTask scheduledTask, LittleHorseClient client)
ReportTaskRun result = ExecuteTask(scheduledTask, LHMappingHelper.MapDateTimeFromProtoTimeStamp(scheduledTask.CreatedAt));
_semaphore.Release();

var wfRunId = LHWorkerHelper.GetWFRunId(scheduledTask.Source);
var wfRunId = LHHelper.GetWFRunId(scheduledTask.Source);

try
{
var retriesLeft = MAX_REPORT_RETRIES;

_logger?.LogDebug($"Going to report task for wfRun {wfRunId}");
_logger?.LogDebug($"Going to report task for wfRun {wfRunId.Id}");
Policy.Handle<Exception>().WaitAndRetry(MAX_REPORT_RETRIES,
retryAttempt => TimeSpan.FromSeconds(5),
onRetry: (exception, timeSpan, retryCount, context) =>
{
--retriesLeft;
_logger?.LogDebug($"Failed to report task for wfRun {wfRunId}: {exception.Message}. Retries left: {retriesLeft}");
_logger?.LogDebug($"Retrying reportTask rpc on taskRun {LHWorkerHelper.TaskRunIdToString(result.TaskRunId)}");
_logger?.LogDebug($"Retrying reportTask rpc on taskRun {LHHelper.TaskRunIdToString(result.TaskRunId)}");
}).Execute(() => RunReportTask(result));
}
catch (Exception ex)
Expand Down
Loading
Loading