Skip to content

Commit

Permalink
Added Async for the task processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Katari.Manikanta authored and Katari.Manikanta committed Dec 21, 2023
1 parent fa315f3 commit 9b1ba80
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 41 deletions.
4 changes: 3 additions & 1 deletion Conductor/Client/Interfaces/IWorkflowTask.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using Conductor.Client.Models;
using Conductor.Client.Worker;
using System.Threading;
using System.Threading.Tasks;

namespace Conductor.Client.Interfaces
{
public interface IWorkflowTask
{
string TaskType { get; }
WorkflowTaskExecutorConfiguration WorkerSettings { get; }
TaskResult Execute(Task task);
Task<TaskResult> Execute(Models.Task task, CancellationToken token);
}
}
10 changes: 8 additions & 2 deletions Conductor/Client/Worker/GenericWorker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Conductor.Client.Interfaces;
using Conductor.Client.Models;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

namespace Conductor.Client.Worker
{
Expand All @@ -20,9 +22,13 @@ public GenericWorker(string taskType, WorkflowTaskExecutorConfiguration workerSe
_workerInstance = workerInstance;
}

public TaskResult Execute(Task task)
public async Task<TaskResult> Execute(Models.Task task, CancellationToken token)
{
var taskResult = _executeTaskMethod.Invoke(_workerInstance, new object[] { task });

if (token.IsCancellationRequested)
return new TaskResult() { Status = TaskResult.StatusEnum.FAILEDWITHTERMINALERROR, ReasonForIncompletion = "Token Requested Cancel" };

var taskResult = await System.Threading.Tasks.Task.Run(() => _executeTaskMethod.Invoke(_workerInstance, new object[] { task }));
return (TaskResult)taskResult;
}
}
Expand Down
92 changes: 58 additions & 34 deletions Conductor/Client/Worker/WorkflowTaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Conductor.Client.Worker
{
Expand Down Expand Up @@ -44,9 +45,9 @@ public WorkflowTaskExecutor(
_workflowTaskMonitor = workflowTaskMonitor;
}

public System.Threading.Tasks.Task Start()
public Task Start()
{
var thread = System.Threading.Tasks.Task.Run(() => Work4Ever());
var thread = Task.Run(() => Work4Ever());
_logger.LogInformation(
$"[{_workerSettings.WorkerId}] Started worker"
+ $", taskName: {_worker.TaskType}"
Expand Down Expand Up @@ -78,15 +79,26 @@ private void Work4Ever()
}
}

private void WorkOnce()
private async void WorkOnce()
{
var tasks = PollTasks();
if (tasks.Count == 0)
{
Sleep(_workerSettings.PollInterval);
return;
}
ProcessTasks(tasks);
var uniqueBatchId = Guid.NewGuid();
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Processing tasks batch"
+ $", Task batch unique Id: {uniqueBatchId}"
);

await Task.Run(() => ProcessTasks(tasks));

_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Completed tasks batch"
+ $", Task batch unique Id: {uniqueBatchId}"
);
}

private List<Models.Task> PollTasks()
Expand Down Expand Up @@ -116,55 +128,67 @@ private void WorkOnce()
return tasks;
}

private void ProcessTasks(List<Models.Task> tasks)
private async void ProcessTasks(List<Models.Task> tasks)
{
List<Task> threads = new List<Task>();
if (tasks == null || tasks.Count == 0)
{
return;
}
foreach (var task in tasks)
{
_workflowTaskMonitor.IncrementRunningWorker();
System.Threading.Tasks.Task.Run(() => ProcessTask(task));
threads.Add(Task.Run(() => ProcessTask(task)));
}
await Task.WhenAll(threads);
}

private void ProcessTask(Models.Task task)
private async void ProcessTask(Models.Task task)
{
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Processing task for worker"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
);
try
using (var cancelToken = new CancellationTokenSource())
{
var taskResult = _worker.Execute(task);

_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Done processing task for worker"
$"[{_workerSettings.WorkerId}] Processing task for worker"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
+ $", CancelToken: {cancelToken.Token}"
);
UpdateTask(taskResult);
}
catch (Exception e)
{
_logger.LogDebug(
$"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
);
var taskResult = task.Failed(e.Message);
UpdateTask(taskResult);
}
finally
{
_workflowTaskMonitor.RunningWorkerDone();

try
{
var taskResult = await _worker.Execute(task, cancelToken.Token);
_logger.LogTrace(
$"[{_workerSettings.WorkerId}] Done processing task for worker"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
+ $", CancelToken: {cancelToken.Token}"
);
UpdateTask(taskResult);
}
catch (Exception e)
{
_logger.LogDebug(
$"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}"
+ $", taskType: {_worker.TaskType}"
+ $", domain: {_workerSettings.Domain}"
+ $", taskId: {task.TaskId}"
+ $", workflowId: {task.WorkflowInstanceId}"
+ $", CancelToken: {cancelToken.Token}"
);
var taskResult = task.Failed(e.Message);
UpdateTask(taskResult);

cancelToken.Cancel();
}
finally
{
_workflowTaskMonitor.RunningWorkerDone();
}
}
}

Expand Down
15 changes: 11 additions & 4 deletions Tests/Worker/Workers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using Conductor.Client.Models;
using Conductor.Client.Worker;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Tests.Worker
{
Expand All @@ -18,14 +20,14 @@ static FunctionalWorkers()

// Polls for 5 task every 200ms
[WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")]
public static TaskResult SimpleWorker(Task task)
public static TaskResult SimpleWorker(Conductor.Client.Models.Task task)
{
return task.Completed();
}

// Polls for 12 tasks every 420ms
[WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")]
public TaskResult LazyWorker(Task task)
public TaskResult LazyWorker(Conductor.Client.Models.Task task)
{
var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048));
Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Milliseconds} ms");
Expand All @@ -46,9 +48,14 @@ public ClassWorker(string taskType = "random_task_type")
WorkerSettings = new WorkflowTaskExecutorConfiguration();
}

public TaskResult Execute(Task task)
public async Task<TaskResult> Execute(Conductor.Client.Models.Task task, CancellationToken token)
{
throw new System.Exception("random exception");
if (token.IsCancellationRequested)
{
throw new Exception("Token request Cancelled");
}

throw new Exception("random exception");
}
}
}

0 comments on commit 9b1ba80

Please sign in to comment.