From 135476888a5c07404ceae90d1e3f194bd4938c2e Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Wed, 31 Jan 2024 13:37:32 -0800 Subject: [PATCH] add support for new apis --- Conductor/Api/IWorkflowResourceApi.cs | 67 ++++- Conductor/Api/TaskResourceApi.cs | 141 ++++++++- Conductor/Api/WorkflowResourceApi.cs | 142 +++++++-- .../Client/Models/StartWorkflowRequest.cs | 12 +- Conductor/Client/Models/TaskDef.cs | 20 +- Conductor/Client/Models/TaskResult.cs | 20 +- .../Client/Models/WorkflowStateUpdate.cs | 140 +++++++++ Tests/Api/WorkflowResourceApiTest.cs | 270 +++++++++++++++++- Tests/Worker/Workers.cs | 96 +++---- Tests/conductor-csharp.test.csproj | 4 +- 10 files changed, 785 insertions(+), 127 deletions(-) create mode 100644 Conductor/Client/Models/WorkflowStateUpdate.cs diff --git a/Conductor/Api/IWorkflowResourceApi.cs b/Conductor/Api/IWorkflowResourceApi.cs index 9fb1468e..48b9282f 100644 --- a/Conductor/Api/IWorkflowResourceApi.cs +++ b/Conductor/Api/IWorkflowResourceApi.cs @@ -73,9 +73,19 @@ public interface IWorkflowResourceApi : IApiAccessor /// /// Update the value of the workflow variables for the given workflow id /// - /// - /// ApiResponse of Object(void) - Object UpdateWorkflowVariables(Workflow workflow); + /// + /// /// + /// Workflow + Workflow UpdateWorkflowVariables(string workflowId, Dictionary variables); + + /// + /// Update the value of the workflow variables for the given workflow id and return api response + /// + /// + /// /// + /// Workflow + ApiResponse UpdateWorkflowVariablesWithHttpInfo(string workflowId, Dictionary variables); + /// /// Execute a workflow synchronously /// @@ -672,6 +682,57 @@ public interface IWorkflowResourceApi : IApiAccessor /// /// ApiResponse of Workflow ApiResponse TestWorkflowWithHttpInfo(WorkflowTestRequest request); + + /// + /// Update a workflow state by updating variables or in progress task Updates the workflow variables, tasks and triggers evaluation. + /// + /// Thrown when fails to make API call + /// + /// + /// (optional) + /// (optional, default to 10) + /// WorkflowRun + WorkflowRun UpdateWorkflow(string workflowId, WorkflowStateUpdate request, + List waitUntilTaskRefs = null, int? waitForSeconds = null); + + + /// + /// Update a workflow state by updating variables or in progress task Updates the workflow variables, tasks and triggers evaluation. + /// + /// Thrown when fails to make API call + /// + /// + /// (optional) + /// (optional, default to 10) + /// WorkflowRun + ApiResponse UpdateWorkflowWithHttpInfo(string workflowId, WorkflowStateUpdate request, + List waitUntilTaskRefs = null, int? waitForSeconds = null); + + /// + /// Gets the workflow by workflow id + /// + /// + /// + /// + /// Thrown when fails to make API call + /// + /// (optional, default to true) + /// Workflow + Workflow GetWorkflow(string workflowId, bool includeTasks); + + + /// + /// Gets the workflow by workflow id + /// + /// + /// + /// + /// Thrown when fails to make API call + /// + /// (optional, default to true) + /// Workflow + ApiResponse GetWorkflowWithHttpInfo(string workflowId, bool includeTasks); + #endregion Synchronous Operations } } \ No newline at end of file diff --git a/Conductor/Api/TaskResourceApi.cs b/Conductor/Api/TaskResourceApi.cs index f25b67e3..9c801518 100644 --- a/Conductor/Api/TaskResourceApi.cs +++ b/Conductor/Api/TaskResourceApi.cs @@ -396,6 +396,38 @@ public interface ITaskResourceApi : IApiAccessor /// /// ApiResponse of string ApiResponse UpdateTaskWithHttpInfo(Dictionary body, string workflowId, string taskRefName, string status, string workerid = null); + + + //aa + /// + /// Update a task By Ref Name, evaluates the workflow and returns the updated workflow + /// + /// + /// + /// + /// Thrown when fails to make API call + /// + /// + /// + /// + /// (optional) + /// Workflow + Workflow UpdateTaskSync(Dictionary output, string workflowId, string taskRefName, TaskResult.StatusEnum status, string workerid = null); + + /// + /// Update a task By Ref Name + /// + /// + /// + /// + /// Thrown when fails to make API call + /// + /// + /// + /// + /// ApiResponse of Workflow + ApiResponse UpdateTaskSyncWithHttpInfo(Dictionary output, string workflowId, string taskRefName, TaskResult.StatusEnum status, string workerid = null); + //aaa #endregion Synchronous Operations } @@ -1704,14 +1736,107 @@ public ApiResponse UpdateTaskWithHttpInfo(Dictionary bod (string)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(string))); } - // public ExternalStorageLocation GetExternalStorageLocation(string path, string operation, string payloadType) - // { - // throw new NotImplementedException(); - // } + /// + /// Update a task By Ref Name + /// + /// Thrown when fails to make API call + /// + /// + /// + /// + /// (optional) + /// string + public Workflow UpdateTaskSync(Dictionary output, string workflowId, string taskRefName, TaskResult.StatusEnum status, string workerid = null) + { + ApiResponse localVarResponse = UpdateTaskSyncWithHttpInfo(output, workflowId, taskRefName, status, workerid); + return localVarResponse.Data; + } + + /// + /// Update a task By Ref Name + /// + /// Thrown when fails to make API call + /// + /// + /// + /// + /// ApiResponse of string + public ApiResponse UpdateTaskSyncWithHttpInfo(Dictionary output, string workflowId, string taskRefName, TaskResult.StatusEnum status, string workerid = null) + { + // verify the required parameter 'body' is set + if (output == null) + throw new ApiException(400, "Missing required parameter 'body' when calling TaskResourceApi->UpdateTask"); + // verify the required parameter 'workflowId' is set + if (workflowId == null) + throw new ApiException(400, "Missing required parameter 'workflowId' when calling TaskResourceApi->UpdateTask"); + // verify the required parameter 'taskRefName' is set + if (taskRefName == null) + throw new ApiException(400, "Missing required parameter 'taskRefName' when calling TaskResourceApi->UpdateTask"); + // verify the required parameter 'status' is set + if (status == null) + throw new ApiException(400, "Missing required parameter 'status' when calling TaskResourceApi->UpdateTask"); + + var localVarPath = "/tasks/{workflowId}/{taskRefName}/{status}/sync"; + var localVarPathParams = new Dictionary(); + var localVarQueryParams = new List>(); + var localVarHeaderParams = new Dictionary(this.Configuration.DefaultHeader); + var localVarFormParams = new Dictionary(); + var localVarFileParams = new Dictionary(); + Object localVarPostBody = null; + + // to determine the Content-Type header + String[] localVarHttpContentTypes = new String[] { + "application/json" + }; + String localVarHttpContentType = this.Configuration.ApiClient.SelectHeaderContentType(localVarHttpContentTypes); + + // to determine the Accept header + String[] localVarHttpHeaderAccepts = new String[] { + "text/plain" + }; + String localVarHttpHeaderAccept = this.Configuration.ApiClient.SelectHeaderAccept(localVarHttpHeaderAccepts); + if (localVarHttpHeaderAccept != null) + localVarHeaderParams.Add("Accept", localVarHttpHeaderAccept); + + if (workerid == null) + { + workerid = Environment.MachineName; + } - // public ApiResponse GetExternalStorageLocationWithHttpInfo(string path, string operation, string payloadType) - // { - // throw new NotImplementedException(); - // } + if (workflowId != null) localVarPathParams.Add("workflowId", this.Configuration.ApiClient.ParameterToString(workflowId)); // path parameter + if (taskRefName != null) localVarPathParams.Add("taskRefName", this.Configuration.ApiClient.ParameterToString(taskRefName)); // path parameter + if (status != null) localVarPathParams.Add("status", this.Configuration.ApiClient.ParameterToString(status)); // path parameter + if (workerid != null) localVarQueryParams.AddRange(this.Configuration.ApiClient.ParameterToKeyValuePairs("", "workerid", workerid)); // query parameter + if (output != null && output.GetType() != typeof(byte[])) + { + localVarPostBody = this.Configuration.ApiClient.Serialize(output); // http body (model) parameter + } + else + { + localVarPostBody = output; // byte array + } + // authentication (api_key) required + if (!String.IsNullOrEmpty(this.Configuration.AccessToken)) + { + localVarHeaderParams["X-Authorization"] = this.Configuration.AccessToken; + } + + // make the HTTP request + IRestResponse localVarResponse = (IRestResponse)this.Configuration.ApiClient.CallApi(localVarPath, + Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams, + localVarPathParams, localVarHttpContentType); + + int localVarStatusCode = (int)localVarResponse.StatusCode; + + if (ExceptionFactory != null) + { + Exception exception = ExceptionFactory("UpdateTask", localVarResponse); + if (exception != null) throw exception; + } + + return new ApiResponse(localVarStatusCode, + localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)), + (Workflow)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Workflow))); + } } } diff --git a/Conductor/Api/WorkflowResourceApi.cs b/Conductor/Api/WorkflowResourceApi.cs index 2cc166d9..bf46ae56 100644 --- a/Conductor/Api/WorkflowResourceApi.cs +++ b/Conductor/Api/WorkflowResourceApi.cs @@ -321,25 +321,24 @@ public ApiResponse ExecuteWorkflowWithHttpInfo(StartWorkflowRequest (WorkflowRun)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(WorkflowRun))); } - public Object UpdateWorkflowVariables(Workflow workflow) + public Workflow UpdateWorkflowVariables(string workflowId, Dictionary variables) { - ApiResponse localVarResponse = UpdateWorkflowVariablesWithHttpInfo(workflow); + ApiResponse localVarResponse = UpdateWorkflowVariablesWithHttpInfo(workflowId, variables); return localVarResponse.Data; } - public ApiResponse UpdateWorkflowVariablesWithHttpInfo(Workflow workflow) + public ApiResponse UpdateWorkflowVariablesWithHttpInfo(string workflowId, Dictionary variables) { // verify the required parameter 'body' is set - if (workflow == null) - throw new ApiException(400, "Missing required parameter 'body' when calling WorkflowResourceApi->Update"); + if (workflowId == null) + throw new ApiException(400, "Missing required parameter 'workflowId' when calling WorkflowResourceApi->UpdateWorkflowVariables"); - if (string.IsNullOrEmpty(workflow.WorkflowId)) - throw new ApiException(400, "Missing required parameter 'WorkflowId' when calling WorkflowResourceApi->Update"); + - if (workflow.Variables == null) - throw new ApiException(400, "Missing required parameter 'Variables' when calling WorkflowResourceApi->Update"); + if (variables == null) + throw new ApiException(400, "Missing required parameter 'variables' when calling WorkflowResourceApi->UpdateWorkflowVariables"); - var localVarPath = $"/workflow/{workflow.WorkflowId}/variables"; + var localVarPath = $"/workflow/{workflowId}/variables"; var localVarPathParams = new Dictionary(); var localVarQueryParams = new List>(); var localVarHeaderParams = new Dictionary(this.Configuration.DefaultHeader); @@ -360,11 +359,8 @@ public ApiResponse UpdateWorkflowVariablesWithHttpInfo(Workflow workflow String localVarHttpHeaderAccept = this.Configuration.ApiClient.SelectHeaderAccept(localVarHttpHeaderAccepts); if (localVarHttpHeaderAccept != null) localVarHeaderParams.Add("Accept", localVarHttpHeaderAccept); - - if (workflow != null && workflow.GetType() != typeof(byte[])) - { - localVarPostBody = this.Configuration.ApiClient.Serialize(workflow.Variables); - } + + localVarPostBody = this.Configuration.ApiClient.Serialize(variables); // authentication (api_key) required if (!String.IsNullOrEmpty(this.Configuration.AccessToken)) @@ -383,9 +379,9 @@ public ApiResponse UpdateWorkflowVariablesWithHttpInfo(Workflow workflow if (exception != null) throw exception; } - return new ApiResponse(localVarStatusCode, + return new ApiResponse(localVarStatusCode, localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)), - (Object)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Object))); + (Workflow)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Workflow))); } /// @@ -2258,5 +2254,117 @@ public ApiResponse UploadCompletedWorkflowsWithHttpInfo() localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)), (Object)this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(Object))); } + + /// + /// Update a workflow state by updating variables or in progress task Updates the workflow variables, tasks and triggers evaluation. + /// + /// Thrown when fails to make API call + /// + /// + /// (optional) + /// (optional, default to 10) + /// WorkflowRun + public WorkflowRun UpdateWorkflow (string workflowId, WorkflowStateUpdate request, + List waitUntilTaskRefs = null, int? waitForSeconds = null) + { + ApiResponse localVarResponse = UpdateWorkflowWithHttpInfo(workflowId, request, waitUntilTaskRefs, waitForSeconds); + return localVarResponse.Data; + } + + + /// + /// Update a workflow state by updating variables or in progress task Updates the workflow variables, tasks and triggers evaluation. + /// + /// Thrown when fails to make API call + /// + /// + /// (optional) + /// (optional, default to 10) + /// ApiResponse of WorkflowRun + public ApiResponse< WorkflowRun > UpdateWorkflowWithHttpInfo (string workflowId, WorkflowStateUpdate request, + List waitUntilTaskRefs = null, int? waitForSeconds = null) + { + string requestId = Guid.NewGuid().ToString(); + string waitUntilTaskRef = waitUntilTaskRefs != null && waitUntilTaskRefs.Count > 0 ? + waitUntilTaskRefs.Aggregate((a, b) => a + ", " + b) : null; + + // verify the required parameter 'request' is set + if (request == null) + throw new ApiException(400, "Missing required parameter 'request' when calling WorkflowResourceApi->UpdateWorkflowAndTaskState"); + // verify the required parameter 'requestId' is set + if (requestId == null) + throw new ApiException(400, "Missing required parameter 'requestId' when calling WorkflowResourceApi->UpdateWorkflowAndTaskState"); + // verify the required parameter 'workflowId' is set + if (workflowId == null) + throw new ApiException(400, "Missing required parameter 'workflowId' when calling WorkflowResourceApi->UpdateWorkflowAndTaskState"); + + var localVarPath = "/workflow/{workflowId}/state"; + var localVarPathParams = new Dictionary(); + var localVarQueryParams = new List>(); + var localVarHeaderParams = new Dictionary(this.Configuration.DefaultHeader); + var localVarFormParams = new Dictionary(); + var localVarFileParams = new Dictionary(); + Object localVarPostBody = null; + + // to determine the Content-Type header + String[] localVarHttpContentTypes = new String[] { + "application/json" + }; + String localVarHttpContentType = this.Configuration.ApiClient.SelectHeaderContentType(localVarHttpContentTypes); + + // to determine the Accept header + String[] localVarHttpHeaderAccepts = new String[] { + "*/*" + }; + String localVarHttpHeaderAccept = this.Configuration.ApiClient.SelectHeaderAccept(localVarHttpHeaderAccepts); + if (localVarHttpHeaderAccept != null) + localVarHeaderParams.Add("Accept", localVarHttpHeaderAccept); + + if (workflowId != null) localVarPathParams.Add("workflowId", this.Configuration.ApiClient.ParameterToString(workflowId)); // path parameter + if (requestId != null) localVarQueryParams.AddRange(this.Configuration.ApiClient.ParameterToKeyValuePairs("", "requestId", requestId)); // query parameter + if (waitUntilTaskRef != null) localVarQueryParams.AddRange(this.Configuration.ApiClient.ParameterToKeyValuePairs("", "waitUntilTaskRef", waitUntilTaskRef)); // query parameter + if (waitForSeconds != null) localVarQueryParams.AddRange(this.Configuration.ApiClient.ParameterToKeyValuePairs("", "waitForSeconds", waitForSeconds)); // query parameter + if (request != null && request.GetType() != typeof(byte[])) + { + localVarPostBody = this.Configuration.ApiClient.Serialize(request); // http body (model) parameter + } + else + { + localVarPostBody = request; // byte array + } + + // authentication (api_key) required + if (!String.IsNullOrEmpty(this.Configuration.AccessToken)) + { + localVarHeaderParams["X-Authorization"] = this.Configuration.AccessToken; + } + + // make the HTTP request + IRestResponse localVarResponse = (IRestResponse) this.Configuration.ApiClient.CallApi(localVarPath, + Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams, + localVarPathParams, localVarHttpContentType); + + int localVarStatusCode = (int) localVarResponse.StatusCode; + + if (ExceptionFactory != null) + { + Exception exception = ExceptionFactory("UpdateWorkflowAndTaskState", localVarResponse); + if (exception != null) throw exception; + } + + return new ApiResponse(localVarStatusCode, + localVarResponse.Headers.ToDictionary(x => x.Name, x => string.Join(",", x.Value)), + (WorkflowRun) this.Configuration.ApiClient.Deserialize(localVarResponse, typeof(WorkflowRun))); + } + + public Workflow GetWorkflow(string workflowId, bool includeTasks) + { + return GetExecutionStatus(workflowId, includeTasks); + } + + public ApiResponse GetWorkflowWithHttpInfo(string workflowId, bool includeTasks) + { + return GetExecutionStatusWithHttpInfo(workflowId, includeTasks); + } } } diff --git a/Conductor/Client/Models/StartWorkflowRequest.cs b/Conductor/Client/Models/StartWorkflowRequest.cs index e83bb2de..0d67d0c4 100644 --- a/Conductor/Client/Models/StartWorkflowRequest.cs +++ b/Conductor/Client/Models/StartWorkflowRequest.cs @@ -27,6 +27,8 @@ public partial class StartWorkflowRequest : IEquatable, IV /// taskToDomain. /// version. /// workflowDef. + /// idempotencyKey. + /// idempotencyStrategy. public StartWorkflowRequest(string correlationId = default(string), string createdBy = default(string), string externalInputPayloadStoragePath = default(string), Dictionary input = default(Dictionary), @@ -34,7 +36,9 @@ public partial class StartWorkflowRequest : IEquatable, IV int? priority = default(int?), Dictionary taskToDomain = default(Dictionary), int? version = default(int?), - WorkflowDef workflowDef = default(WorkflowDef) + WorkflowDef workflowDef = default(WorkflowDef), + string idempotencyKey = default(string), + IdempotencyStrategy idempotencyStrategy = IdempotencyStrategy.FAIL ) { this.Name = name; @@ -46,12 +50,14 @@ public partial class StartWorkflowRequest : IEquatable, IV this.TaskToDomain = taskToDomain; this.Version = version; this.WorkflowDef = workflowDef; - // this.IdempotencyKey = idempotencyKey; - // this.IdempotencyStrategy = idempotencyStrategy; + this.IdempotencyKey = idempotencyKey; + this.IdempotencyStrategy = idempotencyStrategy; } + [DataMember(Name = "idempotencyStrategy", EmitDefaultValue = true)] public IdempotencyStrategy IdempotencyStrategy { get; set; } + [DataMember(Name = "idempotencyKey", EmitDefaultValue = false)] public string IdempotencyKey { get; set; } /// diff --git a/Conductor/Client/Models/TaskDef.cs b/Conductor/Client/Models/TaskDef.cs index 789c15ee..db48d7f2 100644 --- a/Conductor/Client/Models/TaskDef.cs +++ b/Conductor/Client/Models/TaskDef.cs @@ -99,24 +99,8 @@ public enum TimeoutPolicyEnum /// updatedBy. public TaskDef(int? backoffScaleFactor = default(int?), int? concurrentExecLimit = default(int?), long? createTime = default(long?), string createdBy = default(string), string description = default(string), string executionNameSpace = default(string), List inputKeys = default(List), Dictionary inputTemplate = default(Dictionary), string isolationGroupId = default(string), string name = default(string), List outputKeys = default(List), string ownerApp = default(string), string ownerEmail = default(string), int? pollTimeoutSeconds = default(int?), int? rateLimitFrequencyInSeconds = default(int?), int? rateLimitPerFrequency = default(int?), long? responseTimeoutSeconds = default(long?), int? retryCount = default(int?), int? retryDelaySeconds = default(int?), RetryLogicEnum? retryLogic = default(RetryLogicEnum?), TimeoutPolicyEnum? timeoutPolicy = default(TimeoutPolicyEnum?), long? timeoutSeconds = default(long?), long? updateTime = default(long?), string updatedBy = default(string)) { - // to ensure "name" is required (not null) - if (name == null) - { - throw new InvalidDataException("name is a required property for TaskDef and cannot be null"); - } - else - { - this.Name = name; - } - // to ensure "timeoutSeconds" is required (not null) - if (timeoutSeconds == null) - { - throw new InvalidDataException("timeoutSeconds is a required property for TaskDef and cannot be null"); - } - else - { - this.TimeoutSeconds = timeoutSeconds; - } + this.TimeoutSeconds = timeoutSeconds; + this.Name = name; this.BackoffScaleFactor = backoffScaleFactor; this.ConcurrentExecLimit = concurrentExecLimit; this.CreateTime = createTime; diff --git a/Conductor/Client/Models/TaskResult.cs b/Conductor/Client/Models/TaskResult.cs index 38acbcd9..84bc9f18 100644 --- a/Conductor/Client/Models/TaskResult.cs +++ b/Conductor/Client/Models/TaskResult.cs @@ -63,24 +63,8 @@ public enum StatusEnum /// workflowInstanceId (required). public TaskResult(long? callbackAfterSeconds = default(long?), string externalOutputPayloadStoragePath = default(string), List logs = default(List), Dictionary outputData = default(Dictionary), string reasonForIncompletion = default(string), StatusEnum? status = default(StatusEnum?), string subWorkflowId = default(string), string taskId = default(string), string workerId = default(string), string workflowInstanceId = default(string)) { - // to ensure "taskId" is required (not null) - if (taskId == null) - { - throw new InvalidDataException("taskId is a required property for TaskResult and cannot be null"); - } - else - { - this.TaskId = taskId; - } - // to ensure "workflowInstanceId" is required (not null) - if (workflowInstanceId == null) - { - throw new InvalidDataException("workflowInstanceId is a required property for TaskResult and cannot be null"); - } - else - { - this.WorkflowInstanceId = workflowInstanceId; - } + this.WorkflowInstanceId = workflowInstanceId; + this.TaskId = taskId; this.CallbackAfterSeconds = callbackAfterSeconds; this.ExternalOutputPayloadStoragePath = externalOutputPayloadStoragePath; this.Logs = logs; diff --git a/Conductor/Client/Models/WorkflowStateUpdate.cs b/Conductor/Client/Models/WorkflowStateUpdate.cs new file mode 100644 index 00000000..f1f35424 --- /dev/null +++ b/Conductor/Client/Models/WorkflowStateUpdate.cs @@ -0,0 +1,140 @@ +using System; +using System.Text; +using System.Collections.Generic; +using System.Runtime.Serialization; +using Newtonsoft.Json; +using System.ComponentModel.DataAnnotations; +using System.Linq; + +namespace Conductor.Client.Models +{ + /// + /// WorkflowStateUpdate + /// + [DataContract] + public partial class WorkflowStateUpdate : IEquatable, IValidatableObject + { + /// + /// Initializes a new instance of the class. + /// + /// taskReferenceName. + /// taskResult. + /// variables. + public WorkflowStateUpdate(string taskReferenceName = default(string), TaskResult taskResult = default(TaskResult), Dictionary variables = default(Dictionary)) + { + this.TaskReferenceName = taskReferenceName; + this.TaskResult = taskResult; + this.Variables = variables; + } + + /// + /// Gets or Sets TaskReferenceName + /// + [DataMember(Name="taskReferenceName", EmitDefaultValue=false)] + public string TaskReferenceName { get; set; } + + /// + /// Gets or Sets TaskResult + /// + [DataMember(Name="taskResult", EmitDefaultValue=false)] + public TaskResult TaskResult { get; set; } + + /// + /// Gets or Sets Variables + /// + [DataMember(Name="variables", EmitDefaultValue=false)] + public Dictionary Variables { get; set; } + + /// + /// Returns the string presentation of the object + /// + /// String presentation of the object + public override string ToString() + { + var sb = new StringBuilder(); + sb.Append("class WorkflowStateUpdate {\n"); + sb.Append(" TaskReferenceName: ").Append(TaskReferenceName).Append("\n"); + sb.Append(" TaskResult: ").Append(TaskResult).Append("\n"); + sb.Append(" Variables: ").Append(Variables).Append("\n"); + sb.Append("}\n"); + return sb.ToString(); + } + + /// + /// Returns the JSON string presentation of the object + /// + /// JSON string presentation of the object + public virtual string ToJson() + { + return JsonConvert.SerializeObject(this, Formatting.Indented); + } + + /// + /// Returns true if objects are equal + /// + /// Object to be compared + /// Boolean + public override bool Equals(object input) + { + return this.Equals(input as WorkflowStateUpdate); + } + + /// + /// Returns true if WorkflowStateUpdate instances are equal + /// + /// Instance of WorkflowStateUpdate to be compared + /// Boolean + public bool Equals(WorkflowStateUpdate input) + { + if (input == null) + return false; + + return + ( + this.TaskReferenceName == input.TaskReferenceName || + (this.TaskReferenceName != null && + this.TaskReferenceName.Equals(input.TaskReferenceName)) + ) && + ( + this.TaskResult == input.TaskResult || + (this.TaskResult != null && + this.TaskResult.Equals(input.TaskResult)) + ) && + ( + this.Variables == input.Variables || + this.Variables != null && + input.Variables != null && + this.Variables.SequenceEqual(input.Variables) + ); + } + + /// + /// Gets the hash code + /// + /// Hash code + public override int GetHashCode() + { + unchecked // Overflow is fine, just wrap + { + int hashCode = 41; + if (this.TaskReferenceName != null) + hashCode = hashCode * 59 + this.TaskReferenceName.GetHashCode(); + if (this.TaskResult != null) + hashCode = hashCode * 59 + this.TaskResult.GetHashCode(); + if (this.Variables != null) + hashCode = hashCode * 59 + this.Variables.GetHashCode(); + return hashCode; + } + } + + /// + /// To validate all properties of the instance + /// + /// Validation context + /// Validation Result + IEnumerable IValidatableObject.Validate(ValidationContext validationContext) + { + yield break; + } + } +} \ No newline at end of file diff --git a/Tests/Api/WorkflowResourceApiTest.cs b/Tests/Api/WorkflowResourceApiTest.cs index 343c3bc7..c53af19d 100644 --- a/Tests/Api/WorkflowResourceApiTest.cs +++ b/Tests/Api/WorkflowResourceApiTest.cs @@ -4,18 +4,23 @@ using Conductor.Client.Models; using Conductor.Definition; using Conductor.Definition.TaskType; -using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; +using Conductor.Client; +using Microsoft.Extensions.Logging; using Tests.Worker; using Xunit; +using Xunit.Abstractions; +using Task = Conductor.Client.Models.Task; namespace conductor_csharp.test.Api { public class WorkflowResourceApiTest { + private readonly ITestOutputHelper _testOutputHelper; private const string WORKFLOW_NAME = "TestToCreateVariables"; private const string TASK_NAME = "TestToCreateVariables_Task"; private const string WORKFLOW_VARIABLE_1 = "TestVariable1"; @@ -24,14 +29,151 @@ public class WorkflowResourceApiTest private const int WORKFLOW_VERSION = 1; private readonly WorkflowResourceApi _workflowClient; + private readonly ITaskResourceApi _taskClient; private readonly ILogger _logger; - public WorkflowResourceApiTest() + public WorkflowResourceApiTest(ITestOutputHelper testOutputHelper) { + _testOutputHelper = testOutputHelper; _workflowClient = ApiExtensions.GetClient(); + _taskClient = ApiExtensions.GetClient(); _logger = ApplicationLogging.CreateLogger(); } + [Fact] + public void TestWorkflowOperations() + { + // Start Workflow + var correlationId = Guid.NewGuid().ToString(); + var startWorkflowRequest = new StartWorkflowRequest + { + Name = "csharp_sync_task_variable_updates", + Version = 1, + Input = new Dictionary(), + CorrelationId = correlationId + }; + var workflowId = _workflowClient.StartWorkflow(startWorkflowRequest); + _testOutputHelper.WriteLine($"Started workflow with id {workflowId}"); + + // Update a variable inside the workflow + _workflowClient.UpdateWorkflowVariables(workflowId, new Dictionary { { "case", "case1" } }); + + // Get workflow execution status + Workflow workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Task lastTask = workflow.Tasks.Last(); + _testOutputHelper.WriteLine( + $"Workflow status is {workflow.Status} and currently running task is {lastTask.ReferenceTaskName}"); + + workflow = _taskClient.UpdateTaskSync(new Dictionary { { "a", "b" } }, workflowId, lastTask.ReferenceTaskName, TaskResult.StatusEnum.COMPLETED, "test_worker"); + + // Get updated workflow status + lastTask = workflow.Tasks.Last(); + Assert.Equal(lastTask.Status, Task.StatusEnum.INPROGRESS); + _testOutputHelper.WriteLine( + $"Workflow status is {workflow.Status} and currently running task is {lastTask.ReferenceTaskName}"); + + // Terminate the workflow + _workflowClient.Terminate(workflowId, reason: "testing termination"); + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Assert.Equal(Workflow.StatusEnum.TERMINATED, workflow.Status); + lastTask = workflow.Tasks.Last(); + _testOutputHelper.WriteLine( + $"Workflow status is {workflow.Status} and status of last task {lastTask.Status}"); + + // Retry the workflow + _workflowClient.Retry(workflowId); + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Assert.Equal(Workflow.StatusEnum.RUNNING, workflow.Status); + lastTask = workflow.Tasks.Last(); + _testOutputHelper.WriteLine( + $"Workflow status is {workflow.Status} and status of last task {lastTask.ReferenceTaskName} is {lastTask.Status}"); + + // Mark the WAIT task as completed by calling Task completion API + TaskResult taskResult = new TaskResult + { + WorkflowInstanceId = workflowId, + TaskId = lastTask.TaskId, + Status = TaskResult.StatusEnum.COMPLETED, + OutputData = new Dictionary { { "greetings", "hello from Orkes" } } + }; + workflow = _taskClient.UpdateTaskSync(new Dictionary { { "greetings", "hello from Orkes" } }, + workflowId, lastTask.ReferenceTaskName, TaskResult.StatusEnum.COMPLETED, ""); + + lastTask = workflow.Tasks.Last(); + Assert.Equal(Task.StatusEnum.SCHEDULED, lastTask.Status); + _testOutputHelper.WriteLine( + $"Workflow status is {workflow.Status} and status of last task {lastTask.ReferenceTaskName} is {lastTask.Status}"); + + // Terminate the workflow again + _workflowClient.Terminate(workflowId, reason: "terminating for testing"); + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Assert.Equal(Workflow.StatusEnum.TERMINATED, workflow.Status); + + + // Rerun workflow from a specific task + RerunWorkflowRequest rerunRequest = new RerunWorkflowRequest + { + ReRunFromTaskId = workflow.Tasks[3].TaskId + }; + _workflowClient.Rerun(rerunRequest, workflowId); + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Assert.Equal(Workflow.StatusEnum.RUNNING, workflow.Status); + + + // Restart the workflow + _workflowClient.Terminate(workflowId, reason: "terminating so we can do a restart"); + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Assert.Equal(Workflow.StatusEnum.TERMINATED, workflow.Status); + + _workflowClient.Restart(workflowId); + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Assert.Equal(Workflow.StatusEnum.RUNNING, workflow.Status); + + // Pause the workflow + _workflowClient.PauseWorkflow(workflowId); + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + Assert.Equal(Workflow.StatusEnum.PAUSED, workflow.Status); + _testOutputHelper.WriteLine($"Workflow status is {workflow.Status}"); + + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + // Wait task should have completed + Task waitTask = workflow.Tasks[0]; + Assert.Equal(Task.StatusEnum.INPROGRESS, waitTask.Status); + _testOutputHelper.WriteLine($"Workflow status is {workflow.Status} and wait task is {waitTask.Status}"); + + + // Because workflow is paused, no further task should have been scheduled, making WAIT the last task + // Expecting only 1 task + _testOutputHelper.WriteLine($"Number of tasks in workflow is {workflow.Tasks.Count}"); + Assert.Single(workflow.Tasks); + + // Resume the workflow + _workflowClient.ResumeWorkflow(workflowId); + lastTask = workflow.Tasks.Last(); + workflow = _taskClient.UpdateTaskSync(new Dictionary { { "a", "b" } }, workflowId, lastTask.ReferenceTaskName, TaskResult.StatusEnum.COMPLETED, "test_worker"); + + workflow = _workflowClient.GetWorkflow(workflowId, includeTasks: true); + + // There should be 3 tasks + _testOutputHelper.WriteLine( + $"Number of tasks in workflow is {workflow.Tasks.Count} and last task is {workflow.Tasks.Last().ReferenceTaskName}"); + Assert.Equal(3, workflow.Tasks.Count); + + // Search for workflows + var searchResults = _workflowClient.Search(start: 0, size: 100, freeText: "*", + query: $"correlationId = '{correlationId}'"); + _testOutputHelper.WriteLine( + $"Found {searchResults.Results.Count} execution with correlation_id '{correlationId}'"); + Assert.Single(searchResults.Results); + + correlationId = Guid.NewGuid().ToString(); + searchResults = _workflowClient.Search(start: 0, size: 100, freeText: "*", + query: $"status IN (RUNNING) AND correlationId = \"{correlationId}\""); + // Shouldn't find anything! + _testOutputHelper.WriteLine( + $"Found {searchResults.Results.Count} workflows with correlation id {correlationId}"); + } + [Fact] public async void UpdateWorkflowVariables() { @@ -39,26 +181,132 @@ public async void UpdateWorkflowVariables() var _workflow = GetConductorWorkflow(); ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(_workflow, true); var workflowId = ApiExtensions.GetWorkflowExecutor().StartWorkflow(_workflow); - await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20)); - await ValidateWorkflowCompletion(workflowId); // Create variables collection with values to be updated - var updateDict = new Dictionary { - {WORKFLOW_VARIABLE_1,"Value1" }, - {WORKFLOW_VARIABLE_2,"Value2" }, + var updateDict = new Dictionary + { + { WORKFLOW_VARIABLE_1, "Value1" }, + { WORKFLOW_VARIABLE_2, "Value2" }, }; - var updateVariableData = new Workflow() { WorkflowId = workflowId, Variables = updateDict }; + // Update the work flow variables - _workflowClient.UpdateWorkflowVariables(updateVariableData); + _workflowClient.UpdateWorkflowVariables(workflowId, updateDict); // Fetch latest workflow data to validate the change in variables var _updatedWorkFlow = _workflowClient.GetWorkflowStatusSummary(workflowId, includeVariables: true); // Verify workflow variables data is equal with input passed Assert.Equal(_updatedWorkFlow.Variables, updateDict); } + [Fact] + public void TestUpdateWorkflowState() + { + var startWorkflowRequest = new StartWorkflowRequest + { + Name = "csharp_sync_task_variable_updates", + Version = 1, + Input = new Dictionary() + }; + var workflowRun = _workflowClient.ExecuteWorkflow(startWorkflowRequest, + Guid.NewGuid().ToString(), + startWorkflowRequest.Name, startWorkflowRequest.Version, + "wait_task_ref"); + + var workflowId = workflowRun.WorkflowId; + _testOutputHelper.WriteLine(workflowId); + + TaskResult taskResult = new TaskResult + { + OutputData = new Dictionary { { "a", "b" } } + }; + + WorkflowStateUpdate request = new WorkflowStateUpdate + { + TaskReferenceName = "wait_task_ref", + TaskResult = taskResult, + Variables = new Dictionary { { "case", "case1" } } + }; + + workflowRun = _workflowClient.UpdateWorkflow(workflowId, request, + new List { "wait_task_ref_1", "wait_task_ref_2" }, 10); + + _testOutputHelper.WriteLine(workflowRun.ToString()); + _testOutputHelper.WriteLine(workflowRun.Status.ToString()); + _testOutputHelper.WriteLine(workflowRun.Tasks + .Select(task => $"{task.ReferenceTaskName}:{task.Status}") + .ToList().ToString()); + + request = new WorkflowStateUpdate + { + TaskReferenceName = "wait_task_ref_2", + TaskResult = taskResult + }; + + workflowRun = _workflowClient.UpdateWorkflow(workflowId, request, new List(), 2); + Assert.Equal(WorkflowStatus.StatusEnum.RUNNING.ToString(), workflowRun.Status.ToString()); + + request = new WorkflowStateUpdate + { + TaskReferenceName = "simple_task_ref", + TaskResult = taskResult + }; + workflowRun = _workflowClient.UpdateWorkflow(workflowId, request, new List(), 2); + + var allTaskStatus = workflowRun.Tasks + .Select(t => t.Status) + .ToHashSet(); + + Assert.Single(allTaskStatus); + Assert.Equal(Task.StatusEnum.COMPLETED.ToString(), allTaskStatus.First().ToString()); + + _testOutputHelper.WriteLine(workflowRun.Status.ToString()); + _testOutputHelper.WriteLine(workflowRun.Tasks + .Select(task => $"{task.ReferenceTaskName}:{task.Status}") + .ToList().ToString()); + } + + + [Fact] + public void TestStartWorkflowConflict() + { + var startWorkflowRequest = new StartWorkflowRequest + { + Name = "csharp_sync_task_variable_updates", + Version = 1 + }; + + string idempotencyKey = Guid.NewGuid().ToString(); + startWorkflowRequest.IdempotencyKey = idempotencyKey; + startWorkflowRequest.IdempotencyStrategy = IdempotencyStrategy.FAIL; + + string workflowId = _workflowClient.StartWorkflow(startWorkflowRequest); + _testOutputHelper.WriteLine(workflowId); + + startWorkflowRequest.IdempotencyStrategy = IdempotencyStrategy.RETURN_EXISTING; + string workflowId2 = _workflowClient.StartWorkflow(startWorkflowRequest); + Assert.Equal(workflowId, workflowId2); + + startWorkflowRequest.IdempotencyStrategy = IdempotencyStrategy.FAIL; + bool conflict = false; + + try + { + _workflowClient.StartWorkflow(startWorkflowRequest); + } + catch (ApiException ce) + { + if (ce.ErrorCode == 409) + { + conflict = true; + } + } + + Assert.True(conflict); + } + private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout) { - var host = WorkflowTaskHost.CreateWorkerHost(LogLevel.Information, new ClassWorker()); + var host = WorkflowTaskHost.CreateWorkerHost(LogLevel.Information, + new ClassWorker(taskType: "TestToCreateVariables_Task")); await host.StartAsync(); Thread.Sleep(workflowCompletionTimeout); await host.StopAsync(); @@ -94,4 +342,4 @@ private async System.Threading.Tasks.Task ValidateWorkflowCompletion(params stri Assert.Equal(0, incompleteWorkflowCounter); } } -} +} \ No newline at end of file diff --git a/Tests/Worker/Workers.cs b/Tests/Worker/Workers.cs index 0eb68054..b26ebc3f 100644 --- a/Tests/Worker/Workers.cs +++ b/Tests/Worker/Workers.cs @@ -1,63 +1,63 @@ -using Conductor.Client.Extensions; -using Conductor.Client.Interfaces; -using Conductor.Client.Models; -using Conductor.Client.Worker; using System; using System.Threading; using System.Threading.Tasks; +using Conductor.Client.Extensions; +using Conductor.Client.Interfaces; +using Conductor.Client.Worker; +using Conductor.Client.Models; +using Task = System.Threading.Tasks.Task; + +namespace Tests.Worker; -namespace Tests.Worker +[WorkerTask] +public class FunctionalWorkers { - [WorkerTask] - public class FunctionalWorkers + private static readonly Random _random; + + static FunctionalWorkers() + { + _random = new Random(); + } + + // Polls for 5 task every 200ms + [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")] + public static TaskResult SimpleWorker(Conductor.Client.Models.Task task) { - private static Random _random; - - static FunctionalWorkers() - { - _random = new Random(); - } - - // Polls for 5 task every 200ms - [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")] - public static TaskResult SimpleWorker(Conductor.Client.Models.Task task) - { - return task.Completed(); - } - - // Polls for 12 tasks every 420ms - [WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")] - public TaskResult LazyWorker(Conductor.Client.Models.Task task) - { - var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048)); - System.Threading.Tasks.Task.Delay(timeSpan).GetAwaiter().GetResult(); - return task.Completed(); - } + return task.Completed(); } - public class ClassWorker : IWorkflowTask + // Polls for 12 tasks every 420ms + [WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")] + public TaskResult LazyWorker(Conductor.Client.Models.Task task) { - public string TaskType { get; } + var timeSpan = TimeSpan.FromMilliseconds(_random.Next(128, 2048)); + Task.Delay(timeSpan).GetAwaiter().GetResult(); + return task.Completed(); + } +} - public WorkflowTaskExecutorConfiguration WorkerSettings { get; } +public class ClassWorker : IWorkflowTask +{ + public ClassWorker(string taskType = "random_task_type") + { + TaskType = taskType; + WorkerSettings = new WorkflowTaskExecutorConfiguration(); + } - public ClassWorker(string taskType = "random_task_type") - { - TaskType = taskType; - WorkerSettings = new WorkflowTaskExecutorConfiguration(); - } + public string TaskType { get; } - public async Task Execute(Conductor.Client.Models.Task task, CancellationToken token) - { - if (token != CancellationToken.None && token.IsCancellationRequested) - throw new Exception("Token request Cancelled"); + public WorkflowTaskExecutorConfiguration WorkerSettings { get; } - throw new NotImplementedException(); - } + public async Task Execute(Conductor.Client.Models.Task task, CancellationToken token) + { + if (token != CancellationToken.None && token.IsCancellationRequested) + throw new Exception("Token request Cancelled"); + + return new TaskResult(status: TaskResult.StatusEnum.COMPLETED, taskId:task.TaskId, workflowInstanceId:task.WorkflowInstanceId); + } - public TaskResult Execute(Conductor.Client.Models.Task task) - { - throw new NotImplementedException(); - } + public TaskResult Execute(Conductor.Client.Models.Task task) + { + throw new NotImplementedException(); } -} +} \ No newline at end of file diff --git a/Tests/conductor-csharp.test.csproj b/Tests/conductor-csharp.test.csproj index e1b4a1d1..50d3857b 100644 --- a/Tests/conductor-csharp.test.csproj +++ b/Tests/conductor-csharp.test.csproj @@ -3,11 +3,13 @@ netcoreapp6.0 + + - + \ No newline at end of file