Skip to content

Commit

Permalink
task services
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Dec 9, 2024
1 parent 293f54c commit 0779107
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public Task updateTaskV2(TaskResult taskResult) {
Validate.notNull(taskResult, "Task result cannot be null");
ConductorClientRequest request = ConductorClientRequest.builder()
.method(Method.POST)
.path("/tasks/v2")
.path("/tasks/update-v2")
.body(taskResult)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void restart(String workflowId, boolean useLatestDefinitions)
* @throws IllegalArgumentException if the {@link TaskResult} is null.
* @throws NotFoundException if the Task is not found.
*/
void updateTask(TaskResult taskResult);
TaskModel updateTask(TaskResult taskResult);

/**
* @param taskId id of the task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,16 +712,16 @@ public WorkflowModel terminateWorkflow(

/**
* @param taskResult the task result to be updated.
* @throws IllegalArgumentException if the {@link TaskResult} is null.
* @throws IllegalArgumentException if the {@link TaskResult} is null. @Returns Updated task
* @throws NotFoundException if the Task is not found.
*/
@Override
public void updateTask(TaskResult taskResult) {
public TaskModel updateTask(TaskResult taskResult) {
if (taskResult == null) {
throw new IllegalArgumentException("Task object is null");
} else if (taskResult.isExtendLease()) {
extendLease(taskResult);
return;
return null;
}

String workflowId = taskResult.getWorkflowInstanceId();
Expand Down Expand Up @@ -750,7 +750,7 @@ public void updateTask(TaskResult taskResult) {
taskQueueName);
Monitors.recordUpdateConflict(
task.getTaskType(), workflowInstance.getWorkflowName(), task.getStatus());
return;
return task;
}

if (workflowInstance.getStatus().isTerminal()) {
Expand All @@ -765,7 +765,7 @@ public void updateTask(TaskResult taskResult) {
task.getTaskType(),
workflowInstance.getWorkflowName(),
workflowInstance.getStatus());
return;
return task;
}

// for system tasks, setting to SCHEDULED would mean restarting the task which is
Expand Down Expand Up @@ -885,6 +885,7 @@ public void updateTask(TaskResult taskResult) {
if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
decide(workflowId);
}
return task;
}

private void notifyTaskStatusListener(TaskModel task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ public void terminateWorkflow(String workflowId, String reason) {
workflowExecutor.terminateWorkflow(workflowId, reason);
}

public void updateTask(TaskResult taskResult) {
workflowExecutor.updateTask(taskResult);
public TaskModel updateTask(TaskResult taskResult) {
return workflowExecutor.updateTask(taskResult);
}

public List<Task> getTasks(String taskType, String startKey, int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.model.TaskModel;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
Expand Down Expand Up @@ -91,9 +92,9 @@ Task getPendingTaskForWorkflow(
* Updates a task.
*
* @param taskResult Instance of {@link TaskResult}
* @return task Id of the updated task.
* @return the updated task.
*/
String updateTask(
TaskModel updateTask(
@NotNull(message = "TaskResult cannot be null or empty.") @Valid TaskResult taskResult);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;

@Audit
@Trace
Expand Down Expand Up @@ -124,19 +125,14 @@ public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceNam
* Updates a task.
*
* @param taskResult Instance of {@link TaskResult}
* @return task Id of the updated task.
* @return the updated task.
*/
public String updateTask(TaskResult taskResult) {
public TaskModel updateTask(TaskResult taskResult) {
LOGGER.debug(
"Update Task: {} with callback time: {}",
taskResult,
taskResult.getCallbackAfterSeconds());
executionService.updateTask(taskResult);
LOGGER.debug(
"Task: {} updated successfully with callback time: {}",
taskResult,
taskResult.getCallbackAfterSeconds());
return taskResult.getTaskId();
return executionService.updateTask(taskResult);
}

@Override
Expand All @@ -157,7 +153,11 @@ public String updateTask(
if (StringUtils.isNotBlank(workerId)) {
taskResult.setWorkerId(workerId);
}
return updateTask(taskResult);
TaskModel updatedTask = updateTask(taskResult);
if (updatedTask != null) {
return updatedTask.getTaskId();
}
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,30 @@
import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.service.TaskService;
import com.netflix.conductor.service.WorkflowService;

import io.swagger.v3.oas.annotations.Operation;
import jakarta.validation.Valid;

import static com.netflix.conductor.rest.config.RequestMappingConstants.TASKS;

import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
import static org.springframework.http.MediaType.TEXT_PLAIN_VALUE;

@RestController
@RequestMapping(value = TASKS)
public class TaskResource {

private final TaskService taskService;
private final WorkflowService workflowService;

public TaskResource(TaskService taskService) {
public TaskResource(TaskService taskService, WorkflowService workflowService) {
this.taskService = taskService;
this.workflowService = workflowService;
}

@GetMapping("/poll/{tasktype}")
Expand Down Expand Up @@ -80,7 +88,20 @@ public ResponseEntity<List<Task>> batchPoll(
@PostMapping(produces = TEXT_PLAIN_VALUE)
@Operation(summary = "Update a task")
public String updateTask(@RequestBody TaskResult taskResult) {
return taskService.updateTask(taskResult);
taskService.updateTask(taskResult);
return taskResult.getTaskId();
}

@PostMapping("/update-v2")
@Operation(summary = "Update a task and return the next available task to be processed")
public ResponseEntity<Task> updateTaskV2(@RequestBody @Valid TaskResult taskResult) {
TaskModel updatedTask = taskService.updateTask(taskResult);
if (updatedTask == null) {
return ResponseEntity.noContent().build();
}
String taskType = updatedTask.getTaskType();
String domain = updatedTask.getDomain();
return poll(taskType, taskResult.getWorkerId(), domain);
}

@PostMapping(value = "/{workflowId}/{taskRefName}/{status}", produces = TEXT_PLAIN_VALUE)
Expand All @@ -95,6 +116,33 @@ public String updateTask(
return taskService.updateTask(workflowId, taskRefName, status, workerId, output);
}

@PostMapping(
value = "/{workflowId}/{taskRefName}/{status}/sync",
produces = APPLICATION_JSON_VALUE)
@Operation(summary = "Update a task By Ref Name synchronously and return the updated workflow")
public Workflow updateTaskSync(
@PathVariable("workflowId") String workflowId,
@PathVariable("taskRefName") String taskRefName,
@PathVariable("status") TaskResult.Status status,
@RequestParam(value = "workerid", required = false) String workerId,
@RequestBody Map<String, Object> output) {

Task pending = taskService.getPendingTaskForWorkflow(workflowId, taskRefName);
if (pending == null) {
throw new NotFoundException(
String.format(
"Found no running task %s of workflow %s to update",
taskRefName, workflowId));
}

TaskResult taskResult = new TaskResult(pending);
taskResult.setStatus(status);
taskResult.getOutputData().putAll(output);
taskResult.setWorkerId(workerId);
taskService.updateTask(taskResult);
return workflowService.getExecutionStatus(pending.getWorkflowInstanceId(), true);
}

@PostMapping("/{taskId}/log")
@Operation(summary = "Log Task Execution Details")
public void log(@PathVariable("taskId") String taskId, @RequestBody String log) {
Expand Down

0 comments on commit 0779107

Please sign in to comment.