Skip to content

Commit

Permalink
feat(dotnet-sdk): create worker context method to access from task ex…
Browse files Browse the repository at this point in the history
…ecution (#1173)
  • Loading branch information
KarlaCarvajal authored Dec 5, 2024
1 parent db18236 commit b0adf3c
Show file tree
Hide file tree
Showing 16 changed files with 331 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
Expand Down
21 changes: 21 additions & 0 deletions sdk-dotnet/Examples/WorkerContextExample/MyWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using LittleHorse.Sdk.Worker;

namespace WorkerContextExample;

public class MyWorker
{
[LHTaskMethod("task")]
public void ProcessTask(long requestTime, LHWorkerContext context)
{
context.Log("ProcessPayment");
Console.WriteLine($"Processing request time: {requestTime}");
Console.WriteLine($"The Workflow Run Id is: {context.GetWfRunId()}");
Console.WriteLine($"The Node Run Id is: {context.GetNodeRunId()}");
Console.WriteLine($"The Task Run Id is: {context.GetTaskRunId()}");
Console.WriteLine($"The Idempotency Key is: {context.GetIdempotencyKey()}");
Console.WriteLine($"The Attempt Number is: {context.GetAttemptNumber()}");
Console.WriteLine($"The Scheduled Time is: {context.GetScheduledTime()}");
Console.WriteLine($"The User Group is: {context.GetUserGroup()}");
Console.WriteLine($"The User Id is: {context.GetUserId()}");
}
}
65 changes: 65 additions & 0 deletions sdk-dotnet/Examples/WorkerContextExample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using LittleHorse.Sdk;
using LittleHorse.Sdk.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace WorkerContextExample;

public abstract class Program
{
private static ServiceProvider? _serviceProvider;
private static void SetupApplication()
{
_serviceProvider = new ServiceCollection()
.AddLogging(config =>
{
config.AddConsole();
config.SetMinimumLevel(LogLevel.Debug);
})
.BuildServiceProvider();
}

private static LHConfig GetLHConfig(string[] args, ILoggerFactory loggerFactory)
{
var config = new LHConfig(loggerFactory);

string filePath = Path.Combine(Directory.GetCurrentDirectory(), ".config/littlehorse.config");
if (File.Exists(filePath))
config = new LHConfig(filePath, loggerFactory);

return config;
}

private static List<LHTaskWorker<MyWorker>> GetTaskWorkers(LHConfig config)
{
MyWorker executableExceptionHandling = new MyWorker();
var workers = new List<LHTaskWorker<MyWorker>>
{
new(executableExceptionHandling, "task", config)
};

return workers;
}

static void Main(string[] args)
{
SetupApplication();
if (_serviceProvider != null)
{
var loggerFactory = _serviceProvider.GetRequiredService<ILoggerFactory>();
var config = GetLHConfig(args, loggerFactory);
var workers = GetTaskWorkers(config);
foreach (var worker in workers)
{
worker.RegisterTaskDef();
}

Thread.Sleep(300);

foreach (var worker in workers)
{
worker.Start();
}
}
}
}
45 changes: 45 additions & 0 deletions sdk-dotnet/Examples/WorkerContextExample/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
## Running WorkerContextExample

This example shows how to get access to the context when executing a task.
Go to the class `MyWorker`.

Let's run the example in `WorkerContextExample`

```
dotnet build
dotnet run
```

In another terminal, use `lhctl` to run the workflow:

```
lhctl run example-worker-context
```

In addition, you can check the result with:

```
# This call shows the result
lhctl get wfRun <wf_run_id>
# This will show you all nodes in tha run
lhctl list nodeRun <wf_run_id>
# This shows the task run information
lhctl get taskRun <wf_run_id> <task_run_global_id>
```

## Considerations

If you need get access to the context you have to add a `LHWorkerContext`
parameter to the signature of your task:

```
[LHTaskMethod("task")]
public void ProcessTask(long requestTime, LHWorkerContext context)
{
...
}
```

The `WorkerContext` should be the last parameter.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\LittleHorse.Sdk\LittleHorse.Sdk.csproj" />
</ItemGroup>

</Project>
41 changes: 41 additions & 0 deletions sdk-dotnet/LittleHorse.Sdk/Helper/LHHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Text;
using LittleHorse.Common.Proto;

namespace LittleHorse.Sdk.Helper
{
public static class LHHelper
{
public static WfRunId GetWFRunId(TaskRunSource taskRunSource)
{
switch (taskRunSource.TaskRunSourceCase)
{
case TaskRunSource.TaskRunSourceOneofCase.TaskNode:
return taskRunSource.TaskNode.NodeRunId.WfRunId;
case TaskRunSource.TaskRunSourceOneofCase.UserTaskTrigger:
return taskRunSource.UserTaskTrigger.NodeRunId.WfRunId;
default:
return null;
}
}

public static string TaskRunIdToString(TaskRunId taskRunId)
{
return $"{taskRunId.WfRunId}/{taskRunId.TaskGuid}";
}

private static string ParseWfRunIdToString(WfRunId wfRunId) {
var output = new StringBuilder();
if (wfRunId.ParentWfRunId != null) {
output.Append(ParseWfRunIdToString(wfRunId.ParentWfRunId));
output.Append("_");
}
output.Append(wfRunId.Id);

return output.ToString();
}

public static String ParseTaskRunIdToString(TaskRunId taskRunId) {
return ParseWfRunIdToString(taskRunId.WfRunId) + "/" + taskRunId.TaskGuid;
}
}
}
4 changes: 2 additions & 2 deletions sdk-dotnet/LittleHorse.Sdk/Helper/LHMappingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ private static Double GetFloatingValue(object obj)
};
}

public static bool IsInt64Type(Type type)
internal static bool IsInt64Type(Type type)
{
return type.IsAssignableFrom(typeof(Int64))
|| type.IsAssignableFrom(typeof(UInt64))
|| type.IsAssignableFrom(typeof(long))
|| type.IsAssignableFrom(typeof(ulong));
}

public static LHErrorType GetFailureCodeFor(TaskStatus status)
internal static LHErrorType GetFailureCodeFor(TaskStatus status)
{
switch (status) {
case TaskStatus.TaskFailed:
Expand Down
25 changes: 0 additions & 25 deletions sdk-dotnet/LittleHorse.Sdk/Helper/LHWorkerHelper.cs

This file was deleted.

7 changes: 5 additions & 2 deletions sdk-dotnet/LittleHorse.Sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ brew install dotnet
### Build

```
cd sdk-dotnet/LittleHorse.Sdk
cd sdk-dotnet
dotnet build ./LittleHorse.Sdk
```
### Build and Run tests
Expand All @@ -41,10 +41,13 @@ dotnet build ./LittleHorse.Sdk
dotnet test ./LittleHorse.Sdk.Tests
```

### Run Example
### Run Examples

```
dotnet run --project ./Examples/BasicExample
dotnet run --project ./Examples/ExceptionsHandlerExample
dotnet run --project ./Examples/MaskedFieldsExample
dotnet run --project ./Examples/WorkerContextExample
```

### Self-signed TLS certificate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ private async Task RequestMoreWorkAsync()
if (taskToDo.Result != null)
{
var scheduledTask = taskToDo.Result;
var wFRunId = LHWorkerHelper.GetWFRunId(scheduledTask.Source);
_logger?.LogDebug($"Received task schedule request for wfRun {wFRunId}");
var wFRunId = LHHelper.GetWFRunId(scheduledTask.Source);
_logger?.LogDebug($"Received task schedule request for wfRun {wFRunId.Id}");

_connectionManager.SubmitTaskForExecution(scheduledTask, _client);

_logger?.LogDebug($"Scheduled task on threadpool for wfRun {wFRunId}");
_logger?.LogDebug($"Scheduled task on threadpool for wfRun {wFRunId.Id}");
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,20 @@ private void DoTask(ScheduledTask scheduledTask, LittleHorseClient client)
ReportTaskRun result = ExecuteTask(scheduledTask, LHMappingHelper.MapDateTimeFromProtoTimeStamp(scheduledTask.CreatedAt));
_semaphore.Release();

var wfRunId = LHWorkerHelper.GetWFRunId(scheduledTask.Source);
var wfRunId = LHHelper.GetWFRunId(scheduledTask.Source);

try
{
var retriesLeft = MAX_REPORT_RETRIES;

_logger?.LogDebug($"Going to report task for wfRun {wfRunId}");
_logger?.LogDebug($"Going to report task for wfRun {wfRunId.Id}");
Policy.Handle<Exception>().WaitAndRetry(MAX_REPORT_RETRIES,
retryAttempt => TimeSpan.FromSeconds(5),
onRetry: (exception, timeSpan, retryCount, context) =>
{
--retriesLeft;
_logger?.LogDebug($"Failed to report task for wfRun {wfRunId}: {exception.Message}. Retries left: {retriesLeft}");
_logger?.LogDebug($"Retrying reportTask rpc on taskRun {LHWorkerHelper.TaskRunIdToString(result.TaskRunId)}");
_logger?.LogDebug($"Retrying reportTask rpc on taskRun {LHHelper.TaskRunIdToString(result.TaskRunId)}");
}).Execute(() => RunReportTask(result));
}
catch (Exception ex)
Expand Down
Loading

0 comments on commit b0adf3c

Please sign in to comment.