From 215ee72404bbdc1538d1884c39300f1067e9f598 Mon Sep 17 00:00:00 2001 From: Viren Baraiya <virenx@gmail.com> Date: Sun, 8 Dec 2024 11:52:19 -0800 Subject: [PATCH 1/7] Update build.gradle --- conductor-clients/java/conductor-java-sdk/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/conductor-clients/java/conductor-java-sdk/build.gradle b/conductor-clients/java/conductor-java-sdk/build.gradle index de4e1a47a..f7d42fb42 100644 --- a/conductor-clients/java/conductor-java-sdk/build.gradle +++ b/conductor-clients/java/conductor-java-sdk/build.gradle @@ -34,6 +34,7 @@ subprojects { implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}" implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${versions.jackson}" + implementation "com.fasterxml.jackson.module:jackson-module-afterburner:${versions.jackson}" implementation "org.slf4j:slf4j-api:${versions.slf4j}" implementation "org.apache.commons:commons-lang3:${versions.commonsLang}" From eacb9b0058a59b4687536f859e073c0099240b39 Mon Sep 17 00:00:00 2001 From: Viren Baraiya <virenx@gmail.com> Date: Sun, 8 Dec 2024 11:55:23 -0800 Subject: [PATCH 2/7] improvements to poll and update --- .../client/automator/TaskRunner.java | 27 ++++++++++++------- .../conductor/client/http/TaskClient.java | 20 ++++++++++++++ .../common/config/ObjectMapperProvider.java | 2 ++ .../java/conductor-java-sdk/gradle.properties | 2 +- .../client/http/OrkesTaskClient.java | 4 +++ 5 files changed, 44 insertions(+), 11 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java index c481bd456..e9b272139 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java @@ -14,6 +14,7 @@ import java.io.PrintWriter; import java.io.StringWriter; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -21,6 +22,7 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -64,6 +66,7 @@ class TaskRunner { private volatile boolean pollingAndExecuting = true; private final List<PollFilter> pollFilters; private final EventDispatcher<TaskRunnerEvent> eventDispatcher; + private final LinkedBlockingQueue<Task> tasksTobeExecuted; TaskRunner(Worker worker, TaskClient taskClient, @@ -83,6 +86,7 @@ class TaskRunner { this.permits = new Semaphore(threadCount); this.pollFilters = pollFilters; this.eventDispatcher = eventDispatcher; + this.tasksTobeExecuted = new LinkedBlockingQueue<>(); //1. Is there a worker level override? this.domain = PropertyFactory.getString(taskType, Worker.PROP_DOMAIN, null); @@ -191,7 +195,7 @@ private List<Task> pollTasksForWorker() { Stopwatch stopwatch = Stopwatch.createStarted(); //TODO move this to the top? try { LOGGER.trace("Polling task of type: {} in domain: '{}' with size {}", taskType, domain, pollCount); - tasks = pollTask(domain, pollCount); + tasks = pollTask(pollCount); permits.release(pollCount - tasks.size()); //release extra permits stopwatch.stop(); long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); @@ -222,10 +226,16 @@ private List<Task> pollTasksForWorker() { return tasks; } - private List<Task> pollTask(String domain, int count) { + private List<Task> pollTask(int count) { if (count < 1) { return Collections.emptyList(); } + List<Task> polled = new ArrayList<>(count); + tasksTobeExecuted.drainTo(polled, count); + if(!polled.isEmpty()) { + System.out.println("Returning " + polled.size() + " from memory"); + return polled; + } String workerId = worker.getIdentity(); LOGGER.debug("poll {} in the domain {} with batch size {}", taskType, domain, count); return taskClient.batchPollTasksInDomain( @@ -329,14 +339,11 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo result.setOutputData(null); } - retryOperation( - (TaskResult taskResult) -> { - taskClient.updateTask(taskResult); - return null; - }, - count, - result, - "updateTask"); + Task nextTask = retryOperation(taskClient::updateTaskV2, count, result, "updateTaskV2"); + if(nextTask != null) { + tasksTobeExecuted.add(nextTask); + } + } catch (Exception e) { worker.onErrorUpdate(task); LOGGER.error( diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 4b0ec5851..d9f87138f 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -182,6 +182,26 @@ public void updateTask(TaskResult taskResult) { client.execute(request); } + /** + * Updates the result of a task execution. If the size of the task output payload is bigger than + * {@link ExternalPayloadStorage}, if enabled, else the task is marked as + * FAILED_WITH_TERMINAL_ERROR. + * + * @param taskResult the {@link TaskResult} of the executed task to be updated. + */ + public Task updateTaskV2(TaskResult taskResult) { + Validate.notNull(taskResult, "Task result cannot be null"); + ConductorClientRequest request = ConductorClientRequest.builder() + .method(Method.POST) + .path("/tasks") + .body(taskResult) + .build(); + + ConductorClientResponse<Task> response = client.execute(request, new TypeReference<>() { + }); + return response.getData(); + } + public Optional<String> evaluateAndUploadLargePayload(Map<String, Object> taskOutputData, String taskType) { if (!conductorClientConfiguration.isEnforceThresholds()) { return Optional.empty(); diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java index bc637fc15..52617150f 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import com.fasterxml.jackson.module.kotlin.KotlinModule; public class ObjectMapperProvider { @@ -37,6 +38,7 @@ private static ObjectMapper _getObjectMapper() { JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS)); objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); objectMapper.registerModule(new JavaTimeModule()); + objectMapper.registerModule(new AfterburnerModule()); objectMapper.registerModule(new KotlinModule.Builder().build()); return objectMapper; } diff --git a/conductor-clients/java/conductor-java-sdk/gradle.properties b/conductor-clients/java/conductor-java-sdk/gradle.properties index c984afef6..d1de052e7 100644 --- a/conductor-clients/java/conductor-java-sdk/gradle.properties +++ b/conductor-clients/java/conductor-java-sdk/gradle.properties @@ -1 +1 @@ -version=4.0.1 +version=4.0.2-beta diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java index 3bba49946..551be5d02 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java @@ -148,6 +148,10 @@ public List<Task> batchPollTasksInDomain(String taskType, String domain, String return taskClient.batchPollTasksInDomain(taskType, domain, workerId, count, timeoutInMillisecond); } + public Task updateTaskV2(TaskResult taskResult) { + return taskClient.updateTaskV2(taskResult); + } + public void updateTask(TaskResult taskResult) { taskClient.updateTask(taskResult); } From 293f54ce1b0df1e7c7ed6bfa15ffad3c2c9c2764 Mon Sep 17 00:00:00 2001 From: Viren Baraiya <virenx@gmail.com> Date: Sun, 8 Dec 2024 12:30:13 -0800 Subject: [PATCH 3/7] method update --- .../client/automator/TaskRunner.java | 24 ++++++++++++++----- .../conductor/client/http/TaskClient.java | 2 +- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java index e9b272139..8c4945e93 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java @@ -67,6 +67,7 @@ class TaskRunner { private final List<PollFilter> pollFilters; private final EventDispatcher<TaskRunnerEvent> eventDispatcher; private final LinkedBlockingQueue<Task> tasksTobeExecuted; + private final boolean enableUpdateV2; TaskRunner(Worker worker, TaskClient taskClient, @@ -87,7 +88,8 @@ class TaskRunner { this.pollFilters = pollFilters; this.eventDispatcher = eventDispatcher; this.tasksTobeExecuted = new LinkedBlockingQueue<>(); - + this.enableUpdateV2 = Boolean.valueOf(System.getProperty("taskUpdateV2", "false")); + LOGGER.info("taskUpdateV2 is set to {}", this.enableUpdateV2); //1. Is there a worker level override? this.domain = PropertyFactory.getString(taskType, Worker.PROP_DOMAIN, null); if (this.domain == null) { @@ -230,10 +232,10 @@ private List<Task> pollTask(int count) { if (count < 1) { return Collections.emptyList(); } + LOGGER.trace("in memory queue size for tasks: {}", tasksTobeExecuted.size()); List<Task> polled = new ArrayList<>(count); tasksTobeExecuted.drainTo(polled, count); if(!polled.isEmpty()) { - System.out.println("Returning " + polled.size() + " from memory"); return polled; } String workerId = worker.getIdentity(); @@ -338,10 +340,20 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo result.setExternalOutputPayloadStoragePath(optionalExternalStorageLocation.get()); result.setOutputData(null); } - - Task nextTask = retryOperation(taskClient::updateTaskV2, count, result, "updateTaskV2"); - if(nextTask != null) { - tasksTobeExecuted.add(nextTask); + if(enableUpdateV2) { + Task nextTask = retryOperation(taskClient::updateTaskV2, count, result, "updateTaskV2"); + if (nextTask != null) { + tasksTobeExecuted.add(nextTask); + } + } else { + retryOperation( + (TaskResult taskResult) -> { + taskClient.updateTask(taskResult); + return null; + }, + count, + result, + "updateTask"); } } catch (Exception e) { diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index d9f87138f..cb328dbc4 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -193,7 +193,7 @@ public Task updateTaskV2(TaskResult taskResult) { Validate.notNull(taskResult, "Task result cannot be null"); ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) - .path("/tasks") + .path("/tasks/v2") .body(taskResult) .build(); From 0779107adf1d263a351433617ed0c3312910b16a Mon Sep 17 00:00:00 2001 From: Viren Baraiya <virenx@gmail.com> Date: Sun, 8 Dec 2024 21:32:07 -0800 Subject: [PATCH 4/7] task services --- .../conductor/client/http/TaskClient.java | 2 +- .../core/execution/WorkflowExecutor.java | 2 +- .../core/execution/WorkflowExecutorOps.java | 11 ++-- .../conductor/service/ExecutionService.java | 4 +- .../conductor/service/TaskService.java | 5 +- .../conductor/service/TaskServiceImpl.java | 18 +++---- .../rest/controllers/TaskResource.java | 52 ++++++++++++++++++- 7 files changed, 72 insertions(+), 22 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index cb328dbc4..e592439be 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -193,7 +193,7 @@ public Task updateTaskV2(TaskResult taskResult) { Validate.notNull(taskResult, "Task result cannot be null"); ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) - .path("/tasks/v2") + .path("/tasks/update-v2") .body(taskResult) .build(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index b70c4c5ed..1567a99e2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -68,7 +68,7 @@ void restart(String workflowId, boolean useLatestDefinitions) * @throws IllegalArgumentException if the {@link TaskResult} is null. * @throws NotFoundException if the Task is not found. */ - void updateTask(TaskResult taskResult); + TaskModel updateTask(TaskResult taskResult); /** * @param taskId id of the task diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java index 77d5ec1b3..6affdff81 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java @@ -712,16 +712,16 @@ public WorkflowModel terminateWorkflow( /** * @param taskResult the task result to be updated. - * @throws IllegalArgumentException if the {@link TaskResult} is null. + * @throws IllegalArgumentException if the {@link TaskResult} is null. @Returns Updated task * @throws NotFoundException if the Task is not found. */ @Override - public void updateTask(TaskResult taskResult) { + public TaskModel updateTask(TaskResult taskResult) { if (taskResult == null) { throw new IllegalArgumentException("Task object is null"); } else if (taskResult.isExtendLease()) { extendLease(taskResult); - return; + return null; } String workflowId = taskResult.getWorkflowInstanceId(); @@ -750,7 +750,7 @@ public void updateTask(TaskResult taskResult) { taskQueueName); Monitors.recordUpdateConflict( task.getTaskType(), workflowInstance.getWorkflowName(), task.getStatus()); - return; + return task; } if (workflowInstance.getStatus().isTerminal()) { @@ -765,7 +765,7 @@ public void updateTask(TaskResult taskResult) { task.getTaskType(), workflowInstance.getWorkflowName(), workflowInstance.getStatus()); - return; + return task; } // for system tasks, setting to SCHEDULED would mean restarting the task which is @@ -885,6 +885,7 @@ public void updateTask(TaskResult taskResult) { if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { decide(workflowId); } + return task; } private void notifyTaskStatusListener(TaskModel task) { diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 4fb3f78ea..b60a98ae4 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -261,8 +261,8 @@ public void terminateWorkflow(String workflowId, String reason) { workflowExecutor.terminateWorkflow(workflowId, reason); } - public void updateTask(TaskResult taskResult) { - workflowExecutor.updateTask(taskResult); + public TaskModel updateTask(TaskResult taskResult) { + return workflowExecutor.updateTask(taskResult); } public List<Task> getTasks(String taskType, String startKey, int count) { diff --git a/core/src/main/java/com/netflix/conductor/service/TaskService.java b/core/src/main/java/com/netflix/conductor/service/TaskService.java index c69c5135d..d568ed3e2 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskService.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskService.java @@ -24,6 +24,7 @@ import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.model.TaskModel; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; @@ -91,9 +92,9 @@ Task getPendingTaskForWorkflow( * Updates a task. * * @param taskResult Instance of {@link TaskResult} - * @return task Id of the updated task. + * @return the updated task. */ - String updateTask( + TaskModel updateTask( @NotNull(message = "TaskResult cannot be null or empty.") @Valid TaskResult taskResult); /** diff --git a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java index 0007bdf4e..b42d43fb5 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java @@ -36,6 +36,7 @@ import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; +import com.netflix.conductor.model.TaskModel; @Audit @Trace @@ -124,19 +125,14 @@ public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceNam * Updates a task. * * @param taskResult Instance of {@link TaskResult} - * @return task Id of the updated task. + * @return the updated task. */ - public String updateTask(TaskResult taskResult) { + public TaskModel updateTask(TaskResult taskResult) { LOGGER.debug( "Update Task: {} with callback time: {}", taskResult, taskResult.getCallbackAfterSeconds()); - executionService.updateTask(taskResult); - LOGGER.debug( - "Task: {} updated successfully with callback time: {}", - taskResult, - taskResult.getCallbackAfterSeconds()); - return taskResult.getTaskId(); + return executionService.updateTask(taskResult); } @Override @@ -157,7 +153,11 @@ public String updateTask( if (StringUtils.isNotBlank(workerId)) { taskResult.setWorkerId(workerId); } - return updateTask(taskResult); + TaskModel updatedTask = updateTask(taskResult); + if (updatedTask != null) { + return updatedTask.getTaskId(); + } + return null; } /** diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java index bad7968a9..b5421f48a 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java @@ -32,12 +32,18 @@ import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.exception.NotFoundException; +import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.service.TaskService; +import com.netflix.conductor.service.WorkflowService; import io.swagger.v3.oas.annotations.Operation; +import jakarta.validation.Valid; import static com.netflix.conductor.rest.config.RequestMappingConstants.TASKS; +import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; import static org.springframework.http.MediaType.TEXT_PLAIN_VALUE; @RestController @@ -45,9 +51,11 @@ public class TaskResource { private final TaskService taskService; + private final WorkflowService workflowService; - public TaskResource(TaskService taskService) { + public TaskResource(TaskService taskService, WorkflowService workflowService) { this.taskService = taskService; + this.workflowService = workflowService; } @GetMapping("/poll/{tasktype}") @@ -80,7 +88,20 @@ public ResponseEntity<List<Task>> batchPoll( @PostMapping(produces = TEXT_PLAIN_VALUE) @Operation(summary = "Update a task") public String updateTask(@RequestBody TaskResult taskResult) { - return taskService.updateTask(taskResult); + taskService.updateTask(taskResult); + return taskResult.getTaskId(); + } + + @PostMapping("/update-v2") + @Operation(summary = "Update a task and return the next available task to be processed") + public ResponseEntity<Task> updateTaskV2(@RequestBody @Valid TaskResult taskResult) { + TaskModel updatedTask = taskService.updateTask(taskResult); + if (updatedTask == null) { + return ResponseEntity.noContent().build(); + } + String taskType = updatedTask.getTaskType(); + String domain = updatedTask.getDomain(); + return poll(taskType, taskResult.getWorkerId(), domain); } @PostMapping(value = "/{workflowId}/{taskRefName}/{status}", produces = TEXT_PLAIN_VALUE) @@ -95,6 +116,33 @@ public String updateTask( return taskService.updateTask(workflowId, taskRefName, status, workerId, output); } + @PostMapping( + value = "/{workflowId}/{taskRefName}/{status}/sync", + produces = APPLICATION_JSON_VALUE) + @Operation(summary = "Update a task By Ref Name synchronously and return the updated workflow") + public Workflow updateTaskSync( + @PathVariable("workflowId") String workflowId, + @PathVariable("taskRefName") String taskRefName, + @PathVariable("status") TaskResult.Status status, + @RequestParam(value = "workerid", required = false) String workerId, + @RequestBody Map<String, Object> output) { + + Task pending = taskService.getPendingTaskForWorkflow(workflowId, taskRefName); + if (pending == null) { + throw new NotFoundException( + String.format( + "Found no running task %s of workflow %s to update", + taskRefName, workflowId)); + } + + TaskResult taskResult = new TaskResult(pending); + taskResult.setStatus(status); + taskResult.getOutputData().putAll(output); + taskResult.setWorkerId(workerId); + taskService.updateTask(taskResult); + return workflowService.getExecutionStatus(pending.getWorkflowInstanceId(), true); + } + @PostMapping("/{taskId}/log") @Operation(summary = "Log Task Execution Details") public void log(@PathVariable("taskId") String taskId, @RequestBody String log) { From f660fccab19f9159bee85fd119955543a223c39b Mon Sep 17 00:00:00 2001 From: Viren Baraiya <virenx@gmail.com> Date: Sun, 8 Dec 2024 23:24:08 -0800 Subject: [PATCH 5/7] Update TaskResourceTest.java --- .../conductor/rest/controllers/TaskResourceTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java b/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java index 5e0e04e13..99eec55ba 100644 --- a/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java +++ b/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java @@ -18,6 +18,8 @@ import java.util.List; import java.util.Map; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.service.WorkflowService; import org.junit.Before; import org.junit.Test; import org.springframework.http.ResponseEntity; @@ -47,11 +49,13 @@ public class TaskResourceTest { private TaskService mockTaskService; private TaskResource taskResource; + private WorkflowService workflowService; @Before public void before() { this.mockTaskService = mock(TaskService.class); - this.taskResource = new TaskResource(this.mockTaskService); + this.workflowService = mock(WorkflowService.class); + this.taskResource = new TaskResource(this.mockTaskService, this.workflowService); } @Test @@ -86,7 +90,9 @@ public void testUpdateTask() { TaskResult taskResult = new TaskResult(); taskResult.setStatus(TaskResult.Status.COMPLETED); taskResult.setTaskId("123"); - when(mockTaskService.updateTask(any(TaskResult.class))).thenReturn("123"); + TaskModel taskModel = new TaskModel(); + taskModel.setTaskId("123"); + when(mockTaskService.updateTask(any(TaskResult.class))).thenReturn(taskModel); assertEquals("123", taskResource.updateTask(taskResult)); } From 79e5e1793ed5bae4535991d8135ea1ace7ec4571 Mon Sep 17 00:00:00 2001 From: Viren Baraiya <virenx@gmail.com> Date: Sun, 8 Dec 2024 23:38:47 -0800 Subject: [PATCH 6/7] Update TaskResourceTest.java --- .../netflix/conductor/rest/controllers/TaskResourceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java b/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java index 99eec55ba..c5c0b2e4e 100644 --- a/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java +++ b/rest/src/test/java/com/netflix/conductor/rest/controllers/TaskResourceTest.java @@ -18,8 +18,6 @@ import java.util.List; import java.util.Map; -import com.netflix.conductor.model.TaskModel; -import com.netflix.conductor.service.WorkflowService; import org.junit.Before; import org.junit.Test; import org.springframework.http.ResponseEntity; @@ -31,7 +29,9 @@ import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.service.TaskService; +import com.netflix.conductor.service.WorkflowService; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; From 7ddedbb5f9d903e9260af51e3a0afbc4b2cc3be0 Mon Sep 17 00:00:00 2001 From: Viren Baraiya <virenx@gmail.com> Date: Sun, 8 Dec 2024 23:55:45 -0800 Subject: [PATCH 7/7] refactor --- .../executor/task/AnnotatedWorkerTests.java | 31 ------------------- .../sdk/workflow/executor/task/Bike.java | 25 +++++++++++++++ .../sdk/workflow/executor/task/Car.java | 25 +++++++++++++++ .../sdk/workflow/executor/task/CarWorker.java | 26 ++++++++++++++++ 4 files changed, 76 insertions(+), 31 deletions(-) create mode 100644 conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Bike.java create mode 100644 conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Car.java create mode 100644 conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/CarWorker.java diff --git a/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerTests.java b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerTests.java index 6793ec179..4ef178088 100644 --- a/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerTests.java +++ b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerTests.java @@ -39,37 +39,6 @@ public class AnnotatedWorkerTests { - static class Car { - String brand; - - String getBrand() { - return brand; - } - - void setBrand(String brand) { - this.brand = brand; - } - } - - static class Bike { - String brand; - - String getBrand() { - return brand; - } - - void setBrand(String brand) { - this.brand = brand; - } - } - - static class CarWorker { - @WorkerTask("test_1") - public @OutputParam("result") List<Car> doWork(@InputParam("input") List<Car> input) { - return input; - } - } - @Test @DisplayName("it should handle null values when InputParam is a List") void nullListAsInputParam() throws NoSuchMethodException { diff --git a/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Bike.java b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Bike.java new file mode 100644 index 000000000..2bf157a7e --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Bike.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Conductor Authors. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.sdk.workflow.executor.task; + +public class Bike { + String brand; + + String getBrand() { + return brand; + } + + void setBrand(String brand) { + this.brand = brand; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Car.java b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Car.java new file mode 100644 index 000000000..5e944f03e --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/Car.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Conductor Authors. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.sdk.workflow.executor.task; + +public class Car { + String brand; + + String getBrand() { + return brand; + } + + void setBrand(String brand) { + this.brand = brand; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/CarWorker.java b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/CarWorker.java new file mode 100644 index 000000000..990736d15 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/sdk/src/test/java/com/netflix/conductor/sdk/workflow/executor/task/CarWorker.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Conductor Authors. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.sdk.workflow.executor.task; + +import java.util.List; + +import com.netflix.conductor.sdk.workflow.task.InputParam; +import com.netflix.conductor.sdk.workflow.task.OutputParam; +import com.netflix.conductor.sdk.workflow.task.WorkerTask; + +public class CarWorker { + @WorkerTask("test_1") + public @OutputParam("result") List<Car> doWork(@InputParam("input") List<Car> input) { + return input; + } +}