Skip to content

Commit

Permalink
Merge pull request #123 from conductor-sdk/fix/broken-workers-and-oth…
Browse files Browse the repository at this point in the history
…er-fixes

FIX: worker issues with Orkes Conductor API & other issues
  • Loading branch information
jmigueprieto authored Dec 6, 2024
2 parents bb3dddf + 8751793 commit eba0c7f
Show file tree
Hide file tree
Showing 36 changed files with 72 additions and 57 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
name: CI Build

on: [push, pull_request,workflow_dispatch]
on:
push:
branches:
- main
pull_request:
branches:
- main
workflow_dispatch:

jobs:
lint:
Expand Down
8 changes: 6 additions & 2 deletions Conductor/Api/EnvironmentResourceApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,13 @@ public ApiResponse<Dictionary<string, string>> GetAllWithHttpInfo()
if (exception != null) throw exception;
}

var list = (List<Dictionary<string, string>>)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(List<Dictionary<string, string>>));
var dictionary = list
.Where(item => item.ContainsKey("name") && item.ContainsKey("value"))
.ToDictionary(item => item["name"], item => item["value"]);
return new ApiResponse<Dictionary<string, string>>(localVarStatusCode,
localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)),
(Dictionary<string, string>)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Dictionary<string, string>)));
localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)),
dictionary);
}

/// <summary>
Expand Down
10 changes: 9 additions & 1 deletion Conductor/Client/Authentication/TokenHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ private string GetTokenFromServer(OrkesAuthenticationSettings authenticationSett
{
try
{
return tokenClient.GenerateToken(tokenRequest)._token;
var token = tokenClient.GenerateToken(tokenRequest);
return token._token;
}
catch (ApiException e)
{
if (e.ErrorCode == 405 || e.ErrorCode == 404)
{
throw new Exception($"Error while getting authentication token. Is the config BasePath correct? {tokenClient.Configuration.BasePath}");
}
}
catch (Exception e)
{
Expand Down
28 changes: 15 additions & 13 deletions Conductor/Client/Extensions/ApiExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
using Conductor.Executor;
using Conductor.Client.Authentication;
using System;
using System.Diagnostics;

namespace Conductor.Client.Extensions
{
Expand All @@ -25,20 +24,25 @@ public class ApiExtensions
private const string ENV_SECRET = "SECRET";
private const int REST_CLIENT_REQUEST_TIME_OUT = 30 * 1000;

public static Configuration Configuration { get; set; }

static ApiExtensions()
{
Configuration = new Configuration(REST_CLIENT_REQUEST_TIME_OUT)
private static readonly Lazy<Configuration> _lazyConfiguration =
new Lazy<Configuration>(() => new Configuration(REST_CLIENT_REQUEST_TIME_OUT)
{
BasePath = GetEnvironmentVariable(ENV_ROOT_URI),
AuthenticationSettings = new OrkesAuthenticationSettings(
GetEnvironmentVariable(ENV_KEY_ID),
GetEnvironmentVariable(ENV_SECRET)
)
};
GetEnvironmentVariable(ENV_KEY_ID),
GetEnvironmentVariable(ENV_SECRET)
)
});

private static Configuration _customConfiguration;

public static Configuration Configuration
{
get => _customConfiguration ?? _lazyConfiguration.Value;
set => _customConfiguration = value;
}


public static WorkflowExecutor GetWorkflowExecutor()
{
return new WorkflowExecutor(
Expand Down Expand Up @@ -66,9 +70,7 @@ public static string GetWorkflowExecutionURL(string workflowId)

private static string GetEnvironmentVariable(string variable)
{
string value = Environment.GetEnvironmentVariable(variable);
Debug.Assert(value != null);
return value;
return Environment.GetEnvironmentVariable(variable) ?? throw new InvalidOperationException($"Environment variable '{variable}' is not set.");
}
}
}
21 changes: 2 additions & 19 deletions Conductor/Client/Models/WorkflowTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
Expand Down Expand Up @@ -225,24 +224,8 @@ public enum WorkflowTaskTypeEnum
/// <param name="workflowTaskType">workflowTaskType.</param>
public WorkflowTask(bool? asyncComplete = default(bool?), string caseExpression = default(string), string caseValueParam = default(string), Dictionary<string, List<WorkflowTask>> decisionCases = default(Dictionary<string, List<WorkflowTask>>), List<WorkflowTask> defaultCase = default(List<WorkflowTask>), List<string> defaultExclusiveJoinTask = default(List<string>), string description = default(string), string dynamicForkJoinTasksParam = default(string), string dynamicForkTasksInputParamName = default(string), string dynamicForkTasksParam = default(string), string dynamicTaskNameParam = default(string), string evaluatorType = default(string), string expression = default(string), List<List<WorkflowTask>> forkTasks = default(List<List<WorkflowTask>>), Dictionary<string, Object> inputParameters = default(Dictionary<string, Object>), List<string> joinOn = default(List<string>), string loopCondition = default(string), List<WorkflowTask> loopOver = default(List<WorkflowTask>), string name = default(string), bool? optional = default(bool?), bool? rateLimited = default(bool?), int? retryCount = default(int?), string scriptExpression = default(string), string sink = default(string), int? startDelay = default(int?), SubWorkflowParams subWorkflowParam = default(SubWorkflowParams), TaskDef taskDefinition = default(TaskDef), string taskReferenceName = default(string), string type = default(string), WorkflowTaskTypeEnum? workflowTaskType = default(WorkflowTaskTypeEnum?), Dictionary<string, StateChangeConfig> onStateChange = default(Dictionary<string, StateChangeConfig>))
{
// to ensure "name" is required (not null)
if (name == null)
{
throw new InvalidDataException("name is a required property for WorkflowTask and cannot be null");
}
else
{
this.Name = name;
}
// to ensure "taskReferenceName" is required (not null)
if (taskReferenceName == null)
{
throw new InvalidDataException("taskReferenceName is a required property for WorkflowTask and cannot be null");
}
else
{
this.TaskReferenceName = taskReferenceName;
}
this.TaskReferenceName = taskReferenceName;
this.Name = name;
this.AsyncComplete = asyncComplete;
this.CaseExpression = caseExpression;
this.CaseValueParam = caseValueParam;
Expand Down
2 changes: 1 addition & 1 deletion Conductor/conductor-csharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="System.ComponentModel.Annotations" Version="5.0.0" />
<PackageReference Include="System.Net.Http.Json" Version="6.0.0" />
<PackageReference Include="RestSharp.Serializers.NewtonsoftJson" Version="110.2.0" />
<PackageReference Include="RestSharp.Serializers.NewtonsoftJson" Version="110.2.0" />
<None Include="/package/Conductor/README.md" Pack="true" PackagePath="/" />
</ItemGroup>
</Project>
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ ARG CONDUCTOR_SERVER_URL
ENV KEY=${KEY}
ENV SECRET=${SECRET}
ENV CONDUCTOR_SERVER_URL=${CONDUCTOR_SERVER_URL}
COPY /csharp-examples /package/csharp-examples
COPY /Tests /package/Tests
WORKDIR /package/Tests
RUN dotnet test -l "console;verbosity=normal"
RUN dotnet test -p:DefineConstants=EXCLUDE_EXAMPLE_WORKERS -l "console;verbosity=normal"

FROM build as pack_release
ARG SDK_VERSION
Expand Down
2 changes: 1 addition & 1 deletion Tests/Worker/WorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async System.Threading.Tasks.Task TestWorkflowAsyncExecution()
var workflow = GetConductorWorkflow();
ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(workflow, true);
var workflowIdList = await StartWorkflows(workflow, quantity: 15);
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20));
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(30));
await ValidateWorkflowCompletion(workflowIdList.ToArray());
}

Expand Down
10 changes: 5 additions & 5 deletions Tests/Worker/Workers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ static FunctionalWorkers()
_random = new Random();
}

// Polls for 5 task every 200ms
[WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")]
// Polls for 5 task every 100ms
[WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 100, "simpleWorker")]
public static TaskResult SimpleWorker(Conductor.Client.Models.Task task)
{
return task.Completed();
}

// Polls for 12 tasks every 420ms
[WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")]
// Polls for 5 tasks every 420ms
[WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 420, "lazyWorker")]
public TaskResult LazyWorker(Conductor.Client.Models.Task task)
{
var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048));
var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(100, 900));
System.Threading.Tasks.Task.Delay(timeSpan).GetAwaiter().GetResult();
return task.Completed();
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/conductor-csharp.test.csproj
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp6.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../Conductor/conductor-csharp.csproj" />
<ProjectReference Include="../csharp-examples/csharp-examples.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="TestData\integration_data.json">
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public void AuditLog(object workflowInput, string status, string name)
}

[WorkerTask(taskType: "simple_task_1", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public static string SimpleTask1(Task task)
public static string SimpleTask1(Conductor.Client.Models.Task task)
{
return "OK";
}

[WorkerTask(taskType: "simple_task_2", batchSize: 5, pollIntervalMs: 200, workerId: "workerId")]
public static TaskResult SimpleTask2(Task task)
public static TaskResult SimpleTask2(Conductor.Client.Models.Task task)
{
return new TaskResult { Status = TaskResult.StatusEnum.FAILEDWITHTERMINALERROR };
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
12 changes: 10 additions & 2 deletions csharp-examples/TestWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

namespace csharp.examples;

public class TestWorker(string taskType) : IWorkflowTask
public class TestWorker : IWorkflowTask
{

private readonly Random rnd = new();

public string TaskType { get; } = taskType;
private readonly string taskType;

public TestWorker(string taskType)
{
this.taskType = taskType;
}

public string TaskType => taskType;
public WorkflowTaskExecutorConfiguration WorkerSettings { get; } = new WorkflowTaskExecutorConfiguration()
{
BatchSize = 20
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
*/
using Conductor.Client.Extensions;
using Conductor.Client.Interfaces;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Conductor.Examples.Utils
{
Expand All @@ -32,7 +29,7 @@ public static async Task<bool> StartBackGroundTask(ManualResetEvent waitHandle,
{
var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Information);
await host.StartAsync();
Thread.Sleep(20000);
Thread.Sleep(40000);
waitHandle.Set();
await host.StopAsync();
return true;
Expand Down
11 changes: 8 additions & 3 deletions csharp-examples/csharp-examples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>csharp_examples</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Windows</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.19.5" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets"
Version="1.19.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Conductor\conductor-csharp.csproj" />
<ProjectReference Include="../Conductor/conductor-csharp.csproj" />
</ItemGroup>
<ItemGroup Condition="'$(DefineConstants)' == 'EXCLUDE_EXAMPLE_WORKERS'">
<Compile Remove="Examples/**/*.cs" />
<Compile Remove="WorkflowExamples.cs" />
</ItemGroup>

</Project>

0 comments on commit eba0c7f

Please sign in to comment.