Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Use TaskStatusListener.onTaskInProgress() #58

Merged
merged 3 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.dao.QueueDAO;
Expand All @@ -52,6 +53,7 @@ public class ExecutionService {
private final QueueDAO queueDAO;
private final ExternalPayloadStorage externalPayloadStorage;
private final SystemTaskRegistry systemTaskRegistry;
private final TaskStatusListener taskStatusListener;

private final long queueTaskMessagePostponeSecs;

Expand All @@ -65,7 +67,8 @@ public ExecutionService(
QueueDAO queueDAO,
ConductorProperties properties,
ExternalPayloadStorage externalPayloadStorage,
SystemTaskRegistry systemTaskRegistry) {
SystemTaskRegistry systemTaskRegistry,
TaskStatusListener taskStatusListener) {
this.workflowExecutor = workflowExecutor;
this.executionDAOFacade = executionDAOFacade;
this.queueDAO = queueDAO;
Expand All @@ -74,6 +77,7 @@ public ExecutionService(
this.queueTaskMessagePostponeSecs =
properties.getTaskExecutionPostponeDuration().getSeconds();
this.systemTaskRegistry = systemTaskRegistry;
this.taskStatusListener = taskStatusListener;
}

public Task poll(String taskType, String workerId) {
Expand Down Expand Up @@ -181,6 +185,11 @@ public List<Task> poll(
queueDAO.postpone(queueName, taskId, 0, queueTaskMessagePostponeSecs);
}
}
taskIds.stream()
.map(executionDAOFacade::getTaskModel)
.filter(Objects::nonNull)
.filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus()))
.forEach(taskStatusListener::onTaskInProgress);
executionDAOFacade.updateTaskLastPoll(taskType, domain, workerId);
Monitors.recordTaskPoll(queueName);
tasks.forEach(this::ackTaskReceived);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.dao.QueueDAO;

import static junit.framework.TestCase.assertEquals;
Expand All @@ -48,6 +49,7 @@ public class ExecutionServiceTest {
@Mock private ConductorProperties conductorProperties;
@Mock private ExternalPayloadStorage externalPayloadStorage;
@Mock private SystemTaskRegistry systemTaskRegistry;
@Mock private TaskStatusListener taskStatusListener;

private ExecutionService executionService;

Expand All @@ -68,7 +70,8 @@ public void setup() {
queueDAO,
conductorProperties,
externalPayloadStorage,
systemTaskRegistry);
systemTaskRegistry,
taskStatusListener);
WorkflowDef workflowDef = new WorkflowDef();
workflow1 = new Workflow();
workflow1.setWorkflowId("wf1");
Expand Down
Loading