From 2c16857ee700d3aa6aa5fc1ebd8fb66cf4be1ca0 Mon Sep 17 00:00:00 2001 From: Martini <110882768+martini612@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:33:03 +0900 Subject: [PATCH 1/2] if status-listener error, task not executed --- .../conductor/core/execution/WorkflowExecutor.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index a25f06dd1..a296618e6 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -742,6 +742,17 @@ public void updateTask(TaskResult taskResult) { if (task.getStatus().isTerminal()) { // Task was already updated.... queueDAO.remove(taskQueueName, taskResult.getTaskId()); + if (task.getStatus().equals(CANCELED)) { + try { + notifyTaskStatusListener(task); + } catch (Exception e) { + String errorMsg = + String.format( + "Error while notifying TaskStatusListener: %s for workflow: %s", + task.getTaskId(), task.getWorkflowInstanceId()); + LOGGER.error(errorMsg, e); + } + } LOGGER.info( "Task: {} has already finished execution with status: {} within workflow: {}. Removed task from queue: {}", task.getTaskId(), From ca07ebe1da8ad59f32923e8144bb644b52803c62 Mon Sep 17 00:00:00 2001 From: Martini <110882768+martini612@users.noreply.github.com> Date: Thu, 2 May 2024 13:17:32 +0900 Subject: [PATCH 2/2] change the point of cancellation --- .../core/execution/WorkflowExecutor.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index a296618e6..c35547a69 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -742,17 +742,6 @@ public void updateTask(TaskResult taskResult) { if (task.getStatus().isTerminal()) { // Task was already updated.... queueDAO.remove(taskQueueName, taskResult.getTaskId()); - if (task.getStatus().equals(CANCELED)) { - try { - notifyTaskStatusListener(task); - } catch (Exception e) { - String errorMsg = - String.format( - "Error while notifying TaskStatusListener: %s for workflow: %s", - task.getTaskId(), task.getWorkflowInstanceId()); - LOGGER.error(errorMsg, e); - } - } LOGGER.info( "Task: {} has already finished execution with status: {} within workflow: {}. Removed task from queue: {}", task.getTaskId(), @@ -1214,6 +1203,15 @@ List cancelNonTerminalTasks(WorkflowModel workflow) { if (!task.getStatus().isTerminal()) { // Cancel the ones which are not completed yet.... task.setStatus(CANCELED); + try { + notifyTaskStatusListener(task); + } catch (Exception e) { + String errorMsg = + String.format( + "Error while notifying TaskStatusListener: %s for workflow: %s", + task.getTaskId(), task.getWorkflowInstanceId()); + LOGGER.error(errorMsg, e); + } if (systemTaskRegistry.isSystemTask(task.getTaskType())) { WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType());