Skip to content

Commit

Permalink
Feature: Use TaskStatusListener.onTaskInProgress() (#58)
Browse files Browse the repository at this point in the history
* apply task-task-status-listener

* fix test error

* prevent NPE
  • Loading branch information
ghkdwlgns612 authored Mar 5, 2024
1 parent 26af4cb commit 700619a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.dao.QueueDAO;
Expand All @@ -52,6 +53,7 @@ public class ExecutionService {
private final QueueDAO queueDAO;
private final ExternalPayloadStorage externalPayloadStorage;
private final SystemTaskRegistry systemTaskRegistry;
private final TaskStatusListener taskStatusListener;

private final long queueTaskMessagePostponeSecs;

Expand All @@ -65,7 +67,8 @@ public ExecutionService(
QueueDAO queueDAO,
ConductorProperties properties,
ExternalPayloadStorage externalPayloadStorage,
SystemTaskRegistry systemTaskRegistry) {
SystemTaskRegistry systemTaskRegistry,
TaskStatusListener taskStatusListener) {
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.queueDAO = queueDAO;
Expand All @@ -74,6 +77,7 @@ public ExecutionService(
this.queueTaskMessagePostponeSecs =
properties.getTaskExecutionPostponeDuration().getSeconds();
this.systemTaskRegistry = systemTaskRegistry;
this.taskStatusListener = taskStatusListener;
}

public Task poll(String taskType, String workerId) {
Expand Down Expand Up @@ -181,6 +185,11 @@ public List<Task> poll(
queueDAO.postpone(queueName, taskId, 0, queueTaskMessagePostponeSecs);
}
}
taskIds.stream()
.map(executionDAOFacade::getTaskModel)
.filter(Objects::nonNull)
.filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus()))
.forEach(taskStatusListener::onTaskInProgress);
executionDAOFacade.updateTaskLastPoll(taskType, domain, workerId);
Monitors.recordTaskPoll(queueName);
tasks.forEach(this::ackTaskReceived);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.dao.QueueDAO;

import static junit.framework.TestCase.assertEquals;
Expand All @@ -48,6 +49,7 @@ public class ExecutionServiceTest {
@Mock private ConductorProperties conductorProperties;
@Mock private ExternalPayloadStorage externalPayloadStorage;
@Mock private SystemTaskRegistry systemTaskRegistry;
@Mock private TaskStatusListener taskStatusListener;

private ExecutionService executionService;

Expand All @@ -68,7 +70,8 @@ public void setup() {
queueDAO,
conductorProperties,
externalPayloadStorage,
systemTaskRegistry);
systemTaskRegistry,
taskStatusListener);
WorkflowDef workflowDef = new WorkflowDef();
workflow1 = new Workflow();
workflow1.setWorkflowId("wf1");
Expand Down

0 comments on commit 700619a

Please sign in to comment.