From 0779107adf1d263a351433617ed0c3312910b16a Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sun, 8 Dec 2024 21:32:07 -0800 Subject: [PATCH] task services --- .../conductor/client/http/TaskClient.java | 2 +- .../core/execution/WorkflowExecutor.java | 2 +- .../core/execution/WorkflowExecutorOps.java | 11 ++-- .../conductor/service/ExecutionService.java | 4 +- .../conductor/service/TaskService.java | 5 +- .../conductor/service/TaskServiceImpl.java | 18 +++---- .../rest/controllers/TaskResource.java | 52 ++++++++++++++++++- 7 files changed, 72 insertions(+), 22 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index cb328dbc4..e592439be 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -193,7 +193,7 @@ public Task updateTaskV2(TaskResult taskResult) { Validate.notNull(taskResult, "Task result cannot be null"); ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) - .path("/tasks/v2") + .path("/tasks/update-v2") .body(taskResult) .build(); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index b70c4c5ed..1567a99e2 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -68,7 +68,7 @@ void restart(String workflowId, boolean useLatestDefinitions) * @throws IllegalArgumentException if the {@link TaskResult} is null. * @throws NotFoundException if the Task is not found. */ - void updateTask(TaskResult taskResult); + TaskModel updateTask(TaskResult taskResult); /** * @param taskId id of the task diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java index 77d5ec1b3..6affdff81 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java @@ -712,16 +712,16 @@ public WorkflowModel terminateWorkflow( /** * @param taskResult the task result to be updated. - * @throws IllegalArgumentException if the {@link TaskResult} is null. + * @throws IllegalArgumentException if the {@link TaskResult} is null. @Returns Updated task * @throws NotFoundException if the Task is not found. */ @Override - public void updateTask(TaskResult taskResult) { + public TaskModel updateTask(TaskResult taskResult) { if (taskResult == null) { throw new IllegalArgumentException("Task object is null"); } else if (taskResult.isExtendLease()) { extendLease(taskResult); - return; + return null; } String workflowId = taskResult.getWorkflowInstanceId(); @@ -750,7 +750,7 @@ public void updateTask(TaskResult taskResult) { taskQueueName); Monitors.recordUpdateConflict( task.getTaskType(), workflowInstance.getWorkflowName(), task.getStatus()); - return; + return task; } if (workflowInstance.getStatus().isTerminal()) { @@ -765,7 +765,7 @@ public void updateTask(TaskResult taskResult) { task.getTaskType(), workflowInstance.getWorkflowName(), workflowInstance.getStatus()); - return; + return task; } // for system tasks, setting to SCHEDULED would mean restarting the task which is @@ -885,6 +885,7 @@ public void updateTask(TaskResult taskResult) { if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { decide(workflowId); } + return task; } private void notifyTaskStatusListener(TaskModel task) { diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 4fb3f78ea..b60a98ae4 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -261,8 +261,8 @@ public void terminateWorkflow(String workflowId, String reason) { workflowExecutor.terminateWorkflow(workflowId, reason); } - public void updateTask(TaskResult taskResult) { - workflowExecutor.updateTask(taskResult); + public TaskModel updateTask(TaskResult taskResult) { + return workflowExecutor.updateTask(taskResult); } public List getTasks(String taskType, String startKey, int count) { diff --git a/core/src/main/java/com/netflix/conductor/service/TaskService.java b/core/src/main/java/com/netflix/conductor/service/TaskService.java index c69c5135d..d568ed3e2 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskService.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskService.java @@ -24,6 +24,7 @@ import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.model.TaskModel; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; @@ -91,9 +92,9 @@ Task getPendingTaskForWorkflow( * Updates a task. * * @param taskResult Instance of {@link TaskResult} - * @return task Id of the updated task. + * @return the updated task. */ - String updateTask( + TaskModel updateTask( @NotNull(message = "TaskResult cannot be null or empty.") @Valid TaskResult taskResult); /** diff --git a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java index 0007bdf4e..b42d43fb5 100644 --- a/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/TaskServiceImpl.java @@ -36,6 +36,7 @@ import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; +import com.netflix.conductor.model.TaskModel; @Audit @Trace @@ -124,19 +125,14 @@ public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceNam * Updates a task. * * @param taskResult Instance of {@link TaskResult} - * @return task Id of the updated task. + * @return the updated task. */ - public String updateTask(TaskResult taskResult) { + public TaskModel updateTask(TaskResult taskResult) { LOGGER.debug( "Update Task: {} with callback time: {}", taskResult, taskResult.getCallbackAfterSeconds()); - executionService.updateTask(taskResult); - LOGGER.debug( - "Task: {} updated successfully with callback time: {}", - taskResult, - taskResult.getCallbackAfterSeconds()); - return taskResult.getTaskId(); + return executionService.updateTask(taskResult); } @Override @@ -157,7 +153,11 @@ public String updateTask( if (StringUtils.isNotBlank(workerId)) { taskResult.setWorkerId(workerId); } - return updateTask(taskResult); + TaskModel updatedTask = updateTask(taskResult); + if (updatedTask != null) { + return updatedTask.getTaskId(); + } + return null; } /** diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java index bad7968a9..b5421f48a 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java @@ -32,12 +32,18 @@ import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.exception.NotFoundException; +import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.service.TaskService; +import com.netflix.conductor.service.WorkflowService; import io.swagger.v3.oas.annotations.Operation; +import jakarta.validation.Valid; import static com.netflix.conductor.rest.config.RequestMappingConstants.TASKS; +import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; import static org.springframework.http.MediaType.TEXT_PLAIN_VALUE; @RestController @@ -45,9 +51,11 @@ public class TaskResource { private final TaskService taskService; + private final WorkflowService workflowService; - public TaskResource(TaskService taskService) { + public TaskResource(TaskService taskService, WorkflowService workflowService) { this.taskService = taskService; + this.workflowService = workflowService; } @GetMapping("/poll/{tasktype}") @@ -80,7 +88,20 @@ public ResponseEntity> batchPoll( @PostMapping(produces = TEXT_PLAIN_VALUE) @Operation(summary = "Update a task") public String updateTask(@RequestBody TaskResult taskResult) { - return taskService.updateTask(taskResult); + taskService.updateTask(taskResult); + return taskResult.getTaskId(); + } + + @PostMapping("/update-v2") + @Operation(summary = "Update a task and return the next available task to be processed") + public ResponseEntity updateTaskV2(@RequestBody @Valid TaskResult taskResult) { + TaskModel updatedTask = taskService.updateTask(taskResult); + if (updatedTask == null) { + return ResponseEntity.noContent().build(); + } + String taskType = updatedTask.getTaskType(); + String domain = updatedTask.getDomain(); + return poll(taskType, taskResult.getWorkerId(), domain); } @PostMapping(value = "/{workflowId}/{taskRefName}/{status}", produces = TEXT_PLAIN_VALUE) @@ -95,6 +116,33 @@ public String updateTask( return taskService.updateTask(workflowId, taskRefName, status, workerId, output); } + @PostMapping( + value = "/{workflowId}/{taskRefName}/{status}/sync", + produces = APPLICATION_JSON_VALUE) + @Operation(summary = "Update a task By Ref Name synchronously and return the updated workflow") + public Workflow updateTaskSync( + @PathVariable("workflowId") String workflowId, + @PathVariable("taskRefName") String taskRefName, + @PathVariable("status") TaskResult.Status status, + @RequestParam(value = "workerid", required = false) String workerId, + @RequestBody Map output) { + + Task pending = taskService.getPendingTaskForWorkflow(workflowId, taskRefName); + if (pending == null) { + throw new NotFoundException( + String.format( + "Found no running task %s of workflow %s to update", + taskRefName, workflowId)); + } + + TaskResult taskResult = new TaskResult(pending); + taskResult.setStatus(status); + taskResult.getOutputData().putAll(output); + taskResult.setWorkerId(workerId); + taskService.updateTask(taskResult); + return workflowService.getExecutionStatus(pending.getWorkflowInstanceId(), true); + } + @PostMapping("/{taskId}/log") @Operation(summary = "Log Task Execution Details") public void log(@PathVariable("taskId") String taskId, @RequestBody String log) {