Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to address UpdateVariableFunctionality and Inprogress throwing error issue #100

Merged
merged 5 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions Conductor/Api/WorkflowResourceApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public interface IWorkflowResourceApi : IApiAccessor
/// <returns>WorkflowRun</returns>
WorkflowRun ExecuteWorkflow(StartWorkflowRequest body, string requestId, string name, int? version, string waitUntilTaskRef = null);

/// <summary>
/// Update the value of the workflow variables for the given workflow id
/// </summary>
/// <param name="workflow"></param>
/// <returns>ApiResponse of Object(void)</returns>
Object UpdateWorkflowVariables(Workflow workflow);
/// <summary>
/// Execute a workflow synchronously
/// </summary>
Expand Down Expand Up @@ -1029,6 +1035,73 @@ public ApiResponse<WorkflowRun> ExecuteWorkflowWithHttpInfo(StartWorkflowRequest
(WorkflowRun)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(WorkflowRun)));
}

public Object UpdateWorkflowVariables(Workflow workflow)
{
ApiResponse<Object> localVarResponse = UpdateWorkflowVariablesWithHttpInfo(workflow);
return localVarResponse.Data;
}

public ApiResponse<Object> UpdateWorkflowVariablesWithHttpInfo(Workflow workflow)
{
// verify the required parameter 'body' is set
if (workflow == null)
throw new ApiException(400, "Missing required parameter 'body' when calling WorkflowResourceApi->Update");

if (string.IsNullOrEmpty(workflow.WorkflowId))
throw new ApiException(400, "Missing required parameter 'WorkflowId' when calling WorkflowResourceApi->Update");

if (workflow.Variables == null)
throw new ApiException(400, "Missing required parameter 'Variables' when calling WorkflowResourceApi->Update");

var localVarPath = $"/workflow/{workflow.WorkflowId}/variables";
var localVarPathParams = new Dictionary<String, String>();
var localVarQueryParams = new List<KeyValuePair<String, String>>();
var localVarHeaderParams = new Dictionary<String, String>(this.Configuration.DefaultHeader);
var localVarFormParams = new Dictionary<String, String>();
var localVarFileParams = new Dictionary<String, FileParameter>();
Object localVarPostBody = null;

// to determine the Content-Type header
String[] localVarHttpContentTypes = new String[] {
"application/json"
};
String localVarHttpContentType = this.Configuration.ApiClient.SelectHeaderContentType(localVarHttpContentTypes);

// to determine the Accept header
String[] localVarHttpHeaderAccepts = new String[] {
"*/*"
};
String localVarHttpHeaderAccept = this.Configuration.ApiClient.SelectHeaderAccept(localVarHttpHeaderAccepts);
if (localVarHttpHeaderAccept != null)
localVarHeaderParams.Add("Accept", localVarHttpHeaderAccept);

if (workflow != null && workflow.GetType() != typeof(byte[]))
{
localVarPostBody = this.Configuration.ApiClient.Serialize(workflow.Variables);
}

// authentication (api_key) required
if (!String.IsNullOrEmpty(this.Configuration.AccessToken))
{
localVarHeaderParams["X-Authorization"] = this.Configuration.AccessToken;
}

IRestResponse localVarResponse = (IRestResponse)this.Configuration.ApiClient.CallApi(localVarPath,
Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams,
localVarPathParams, localVarHttpContentType);
int localVarStatusCode = (int)localVarResponse.StatusCode;

if (ExceptionFactory != null)
{
Exception exception = ExceptionFactory("Update", localVarResponse);
if (exception != null) throw exception;
}

return new ApiResponse<Object>(localVarStatusCode,
localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)),
(Object)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Object)));
}

/// <summary>
/// Gets the workflow by workflow id
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion Conductor/Client/Extensions/ConductorTaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public static TaskResult InProgress(this Task task, string log = null, long? cal
{
new TaskExecLog { TaskId = task.TaskId, Log = log, CreatedTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }
},
callbackAfterSeconds: callbackAfterSeconds.Value
callbackAfterSeconds: callbackAfterSeconds
);
}

Expand Down
7 changes: 7 additions & 0 deletions Conductor/Definition/ConductorWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public ConductorWorkflow WithOutputParameter(string key, object value)
return this;
}

public ConductorWorkflow WithVariable(string key, object value)
{
if (Variables == null) // if workflow does not have any variables, initialize with empty collection
Variables = new Dictionary<string, object>();
Variables.Add(key, value);
return this;
}
public ConductorWorkflow WithOwner(string ownerEmail)
{
OwnerEmail = ownerEmail;
Expand Down
97 changes: 97 additions & 0 deletions Tests/Api/WorkflowResourceApiTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using conductor.csharp.Client.Extensions;
using Conductor.Api;
using Conductor.Client.Extensions;
using Conductor.Client.Models;
using Conductor.Definition;
using Conductor.Definition.TaskType;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Tests.Worker;
using Xunit;

namespace conductor_csharp.test.Api
{
public class WorkflowResourceApiTest
{
private const string WORKFLOW_NAME = "TestToCreateVariables";
private const string TASK_NAME = "TestToCreateVariables_Task";
private const string WORKFLOW_VARIABLE_1 = "TestVariable1";
private const string WORKFLOW_VARIABLE_2 = "TestVariable2";
private const string WORKFLOW_DESC = "Test Workflow With Variables";
private const int WORKFLOW_VERSION = 1;

private readonly WorkflowResourceApi _workflowClient;
private readonly ILogger _logger;

public WorkflowResourceApiTest()
{
_workflowClient = ApiExtensions.GetClient<WorkflowResourceApi>();
_logger = ApplicationLogging.CreateLogger<WorkerTests>();
}

[Fact]
public async void UpdateWorkflowVariables()
{
// Prepare workflow
var _workflow = GetConductorWorkflow();
ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(_workflow, true);
var workflowId = ApiExtensions.GetWorkflowExecutor().StartWorkflow(_workflow);
await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20));
await ValidateWorkflowCompletion(workflowId);

// Create variables collection with values to be updated
var updateDict = new Dictionary<string, object> {
{WORKFLOW_VARIABLE_1,"Value1" },
{WORKFLOW_VARIABLE_2,"Value2" },
};
var updateVariableData = new Workflow() { WorkflowId = workflowId, Variables = updateDict };
// Update the work flow variables
_workflowClient.UpdateWorkflowVariables(updateVariableData);
// Fetch latest workflow data to validate the change in variables
var _updatedWorkFlow = _workflowClient.GetWorkflowStatusSummary(workflowId, includeVariables: true);
// Verify workflow variables data is equal with input passed
Assert.Equal(_updatedWorkFlow.Variables, updateDict);
}

private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout)
{
var host = WorkflowTaskHost.CreateWorkerHost(LogLevel.Information, new ClassWorker());
await host.StartAsync();
Thread.Sleep(workflowCompletionTimeout);
await host.StopAsync();
}

private ConductorWorkflow GetConductorWorkflow()
{
return new ConductorWorkflow()
.WithName(WORKFLOW_NAME)
.WithVersion(WORKFLOW_VERSION)
.WithDescription(WORKFLOW_DESC)
.WithTask(new SimpleTask(TASK_NAME, TASK_NAME))
.WithVariable(WORKFLOW_VARIABLE_1, $"{WORKFLOW_VARIABLE_1}_Value")
.WithVariable(WORKFLOW_VARIABLE_2, $"{WORKFLOW_VARIABLE_2}_Value");
}

private async System.Threading.Tasks.Task ValidateWorkflowCompletion(params string[] workflowIdList)
{
var workflowStatusList = await WorkflowExtensions.GetWorkflowStatusList(
_workflowClient,
maxAllowedInParallel: 10,
workflowIdList
);
var incompleteWorkflowCounter = 0;
workflowStatusList.ToList().ForEach(wf =>
{
if (wf.Status.Value != WorkflowStatus.StatusEnum.COMPLETED)
{
incompleteWorkflowCounter += 1;
_logger.LogInformation($"Workflow not completed, workflowId: {wf.WorkflowId}");
}
});
Assert.Equal(0, incompleteWorkflowCounter);
}
}
}
4 changes: 2 additions & 2 deletions csharp-examples/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public async void StartTasks()
}
}

while (true)
Thread.Sleep(TimeSpan.FromDays(1)); // after 1 year will stop the service
while (true)
Thread.Sleep(TimeSpan.FromDays(1));// after 1 year will stop the service

}
catch (Exception e)
Expand Down
77 changes: 77 additions & 0 deletions csharp-examples/WorkFlowExamples.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using Conductor.Client;
using Conductor.Definition.TaskType;
using Conductor.Definition;
using Conductor.Executor;
using Conductor.Api;
using Conductor.Client.Authentication;

namespace csharp_examples
{
public class WorkFlowExamples
{

private const string KEY_ID = "<REPLACE_WITH_KEY_ID>";
private const string KEY_SECRET = "<REPLACE_WITH_KEY_SECRET>";
private const string OWNER_EMAIL = "<REPLACE_WITH_OWNER_EMAIL>";

private const string WORKFLOW_ID = "<REPLACE_WITH_WORKFLOW_ID>";
private const string WORKFLOW_NAME = "<REPLACE_WITH_WORKFLOW_NAME>";
private const string WORKFLOW_DESCRIPTION = "<REPLACE_WITH_WORKFLOW_DESCRIPTION>";
private const string TASK_NAME = "<REPLACE_WITH_TASK_NAME >";
private const string TASK_REFERENCE = "<REPLACE_WITH_TASK_REFERENCE_NAME>";

private const string VARIABLE_OLD_VALUE = "SOME_OLD_VALUE";
private const string VARIABLE_NAME_1 = "<REPLACE_WITH_VARIABLE_NAME_1>";
private const string VARIABLE_NEW_VALUE_1 = "<REPLACE_WITH_OWNER_VALUE_1>";
private const string VARIABLE_NAME_2 = "<REPLACE_WITH_VARIABLE_NAME_2>";
private const string VARIABLE_NEW_VALUE_2 = "<REPLACE_WITH_OWNER_VALUE_2>";


public void RegisterWorkFlow()
{
Configuration configuration = new Configuration()
{
AuthenticationSettings = new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET)
};

WorkflowExecutor executor = new WorkflowExecutor(configuration);
executor.RegisterWorkflow(GetConductorWorkflow(), true);
}

private ConductorWorkflow GetConductorWorkflow()
{
var conductorWorkFlow = new ConductorWorkflow()
.WithName(WORKFLOW_NAME).WithDescription(WORKFLOW_DESCRIPTION)
.WithTask(new SimpleTask(TASK_NAME, TASK_REFERENCE))
.WithOwner(OWNER_EMAIL);

var workflowVariableTobeAdded = new Dictionary<string, object>
{
{ VARIABLE_NAME_1, VARIABLE_OLD_VALUE},
{ VARIABLE_NAME_2, VARIABLE_OLD_VALUE }
};

conductorWorkFlow.Variables = workflowVariableTobeAdded;
return conductorWorkFlow;
}

public void UpdateWorkflowVariablesWithWorkFlowId()
{
var orkesApiClient = new OrkesApiClient(new Configuration(),
new OrkesAuthenticationSettings(KEY_ID, KEY_SECRET));
var workflowClient = orkesApiClient.GetClient<WorkflowResourceApi>();
var workFlowVariables = new Dictionary<string, object>
{
{ VARIABLE_NAME_1, VARIABLE_NEW_VALUE_1 },
{ VARIABLE_NAME_2, VARIABLE_NEW_VALUE_2 }
};

workflowClient.UpdateWorkflowVariables(new Conductor.Client.Models.Workflow()
{
WorkflowId = WORKFLOW_ID,
Variables = workFlowVariables
});
}

}
}
Loading