Skip to content

Commit

Permalink
Merge pull request #328 from conductor-oss/task_poll_update_v2
Browse files Browse the repository at this point in the history
Task poll update v2
  • Loading branch information
v1r3n authored Dec 10, 2024
2 parents fdf354f + 7ddedbb commit 1259a4a
Show file tree
Hide file tree
Showing 17 changed files with 207 additions and 60 deletions.
1 change: 1 addition & 0 deletions conductor-clients/java/conductor-java-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ subprojects {
implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"
implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${versions.jackson}"
implementation "com.fasterxml.jackson.module:jackson-module-afterburner:${versions.jackson}"

implementation "org.slf4j:slf4j-api:${versions.slf4j}"
implementation "org.apache.commons:commons-lang3:${versions.commonsLang}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -64,6 +66,8 @@ class TaskRunner {
private volatile boolean pollingAndExecuting = true;
private final List<PollFilter> pollFilters;
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;
private final LinkedBlockingQueue<Task> tasksTobeExecuted;
private final boolean enableUpdateV2;

TaskRunner(Worker worker,
TaskClient taskClient,
Expand All @@ -83,7 +87,9 @@ class TaskRunner {
this.permits = new Semaphore(threadCount);
this.pollFilters = pollFilters;
this.eventDispatcher = eventDispatcher;

this.tasksTobeExecuted = new LinkedBlockingQueue<>();
this.enableUpdateV2 = Boolean.valueOf(System.getProperty("taskUpdateV2", "false"));
LOGGER.info("taskUpdateV2 is set to {}", this.enableUpdateV2);
//1. Is there a worker level override?
this.domain = PropertyFactory.getString(taskType, Worker.PROP_DOMAIN, null);
if (this.domain == null) {
Expand Down Expand Up @@ -191,7 +197,7 @@ private List<Task> pollTasksForWorker() {
Stopwatch stopwatch = Stopwatch.createStarted(); //TODO move this to the top?
try {
LOGGER.trace("Polling task of type: {} in domain: '{}' with size {}", taskType, domain, pollCount);
tasks = pollTask(domain, pollCount);
tasks = pollTask(pollCount);
permits.release(pollCount - tasks.size()); //release extra permits
stopwatch.stop();
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -222,10 +228,16 @@ private List<Task> pollTasksForWorker() {
return tasks;
}

private List<Task> pollTask(String domain, int count) {
private List<Task> pollTask(int count) {
if (count < 1) {
return Collections.emptyList();
}
LOGGER.trace("in memory queue size for tasks: {}", tasksTobeExecuted.size());
List<Task> polled = new ArrayList<>(count);
tasksTobeExecuted.drainTo(polled, count);
if(!polled.isEmpty()) {
return polled;
}
String workerId = worker.getIdentity();
LOGGER.debug("poll {} in the domain {} with batch size {}", taskType, domain, count);
return taskClient.batchPollTasksInDomain(
Expand Down Expand Up @@ -328,15 +340,22 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
result.setExternalOutputPayloadStoragePath(optionalExternalStorageLocation.get());
result.setOutputData(null);
}

retryOperation(
if(enableUpdateV2) {
Task nextTask = retryOperation(taskClient::updateTaskV2, count, result, "updateTaskV2");
if (nextTask != null) {
tasksTobeExecuted.add(nextTask);
}
} else {
retryOperation(
(TaskResult taskResult) -> {
taskClient.updateTask(taskResult);
return null;
},
count,
result,
"updateTask");
}

} catch (Exception e) {
worker.onErrorUpdate(task);
LOGGER.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,26 @@ public void updateTask(TaskResult taskResult) {
client.execute(request);
}

/**
* Updates the result of a task execution. If the size of the task output payload is bigger than
* {@link ExternalPayloadStorage}, if enabled, else the task is marked as
* FAILED_WITH_TERMINAL_ERROR.
*
* @param taskResult the {@link TaskResult} of the executed task to be updated.
*/
public Task updateTaskV2(TaskResult taskResult) {
Validate.notNull(taskResult, "Task result cannot be null");
ConductorClientRequest request = ConductorClientRequest.builder()
.method(Method.POST)
.path("/tasks/update-v2")
.body(taskResult)
.build();

ConductorClientResponse<Task> response = client.execute(request, new TypeReference<>() {
});
return response.getData();
}

public Optional<String> evaluateAndUploadLargePayload(Map<String, Object> taskOutputData, String taskType) {
if (!conductorClientConfiguration.isEnforceThresholds()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import com.fasterxml.jackson.module.kotlin.KotlinModule;

public class ObjectMapperProvider {
Expand All @@ -37,6 +38,7 @@ private static ObjectMapper _getObjectMapper() {
JsonInclude.Include.NON_NULL, JsonInclude.Include.ALWAYS));
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.registerModule(new JavaTimeModule());
objectMapper.registerModule(new AfterburnerModule());
objectMapper.registerModule(new KotlinModule.Builder().build());
return objectMapper;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=4.0.1
version=4.0.2-beta
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ public List<Task> batchPollTasksInDomain(String taskType, String domain, String
return taskClient.batchPollTasksInDomain(taskType, domain, workerId, count, timeoutInMillisecond);
}

public Task updateTaskV2(TaskResult taskResult) {
return taskClient.updateTaskV2(taskResult);
}

public void updateTask(TaskResult taskResult) {
taskClient.updateTask(taskResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,6 @@

public class AnnotatedWorkerTests {

static class Car {
String brand;

String getBrand() {
return brand;
}

void setBrand(String brand) {
this.brand = brand;
}
}

static class Bike {
String brand;

String getBrand() {
return brand;
}

void setBrand(String brand) {
this.brand = brand;
}
}

static class CarWorker {
@WorkerTask("test_1")
public @OutputParam("result") List<Car> doWork(@InputParam("input") List<Car> input) {
return input;
}
}

@Test
@DisplayName("it should handle null values when InputParam is a List")
void nullListAsInputParam() throws NoSuchMethodException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.executor.task;

public class Bike {
String brand;

String getBrand() {
return brand;
}

void setBrand(String brand) {
this.brand = brand;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.executor.task;

public class Car {
String brand;

String getBrand() {
return brand;
}

void setBrand(String brand) {
this.brand = brand;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.sdk.workflow.executor.task;

import java.util.List;

import com.netflix.conductor.sdk.workflow.task.InputParam;
import com.netflix.conductor.sdk.workflow.task.OutputParam;
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

public class CarWorker {
@WorkerTask("test_1")
public @OutputParam("result") List<Car> doWork(@InputParam("input") List<Car> input) {
return input;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void restart(String workflowId, boolean useLatestDefinitions)
* @throws IllegalArgumentException if the {@link TaskResult} is null.
* @throws NotFoundException if the Task is not found.
*/
void updateTask(TaskResult taskResult);
TaskModel updateTask(TaskResult taskResult);

/**
* @param taskId id of the task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,16 +712,16 @@ public WorkflowModel terminateWorkflow(

/**
* @param taskResult the task result to be updated.
* @throws IllegalArgumentException if the {@link TaskResult} is null.
* @throws IllegalArgumentException if the {@link TaskResult} is null. @Returns Updated task
* @throws NotFoundException if the Task is not found.
*/
@Override
public void updateTask(TaskResult taskResult) {
public TaskModel updateTask(TaskResult taskResult) {
if (taskResult == null) {
throw new IllegalArgumentException("Task object is null");
} else if (taskResult.isExtendLease()) {
extendLease(taskResult);
return;
return null;
}

String workflowId = taskResult.getWorkflowInstanceId();
Expand Down Expand Up @@ -750,7 +750,7 @@ public void updateTask(TaskResult taskResult) {
taskQueueName);
Monitors.recordUpdateConflict(
task.getTaskType(), workflowInstance.getWorkflowName(), task.getStatus());
return;
return task;
}

if (workflowInstance.getStatus().isTerminal()) {
Expand All @@ -765,7 +765,7 @@ public void updateTask(TaskResult taskResult) {
task.getTaskType(),
workflowInstance.getWorkflowName(),
workflowInstance.getStatus());
return;
return task;
}

// for system tasks, setting to SCHEDULED would mean restarting the task which is
Expand Down Expand Up @@ -885,6 +885,7 @@ public void updateTask(TaskResult taskResult) {
if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) {
decide(workflowId);
}
return task;
}

private void notifyTaskStatusListener(TaskModel task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ public void terminateWorkflow(String workflowId, String reason) {
workflowExecutor.terminateWorkflow(workflowId, reason);
}

public void updateTask(TaskResult taskResult) {
workflowExecutor.updateTask(taskResult);
public TaskModel updateTask(TaskResult taskResult) {
return workflowExecutor.updateTask(taskResult);
}

public List<Task> getTasks(String taskType, String startKey, int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.common.run.ExternalStorageLocation;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.model.TaskModel;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
Expand Down Expand Up @@ -91,9 +92,9 @@ Task getPendingTaskForWorkflow(
* Updates a task.
*
* @param taskResult Instance of {@link TaskResult}
* @return task Id of the updated task.
* @return the updated task.
*/
String updateTask(
TaskModel updateTask(
@NotNull(message = "TaskResult cannot be null or empty.") @Valid TaskResult taskResult);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;

@Audit
@Trace
Expand Down Expand Up @@ -124,19 +125,14 @@ public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceNam
* Updates a task.
*
* @param taskResult Instance of {@link TaskResult}
* @return task Id of the updated task.
* @return the updated task.
*/
public String updateTask(TaskResult taskResult) {
public TaskModel updateTask(TaskResult taskResult) {
LOGGER.debug(
"Update Task: {} with callback time: {}",
taskResult,
taskResult.getCallbackAfterSeconds());
executionService.updateTask(taskResult);
LOGGER.debug(
"Task: {} updated successfully with callback time: {}",
taskResult,
taskResult.getCallbackAfterSeconds());
return taskResult.getTaskId();
return executionService.updateTask(taskResult);
}

@Override
Expand All @@ -157,7 +153,11 @@ public String updateTask(
if (StringUtils.isNotBlank(workerId)) {
taskResult.setWorkerId(workerId);
}
return updateTask(taskResult);
TaskModel updatedTask = updateTask(taskResult);
if (updatedTask != null) {
return updatedTask.getTaskId();
}
return null;
}

/**
Expand Down
Loading

0 comments on commit 1259a4a

Please sign in to comment.