From 6cb5f5208ef7af2dade64d3d7fe33c1a59d09115 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Tue, 5 Mar 2024 18:51:42 +0530 Subject: [PATCH] Fix tasks running on a non-coordinator node when the former coordinator rejoins the cluster When the cluster coordinator becomes unresponsive (not shut down), its task scheduler keeps running even after the node is added to the cluster again as an ordinary member. This change identifies a rejoining member, deletes the tasks already assigned to it, and then re-runs its task scheduler. Fixes: https://github.com/wso2/micro-integrator/issues/3155 --- .../coordination/task/TaskEventListener.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java index 1a75a00296..2d2a173ef1 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java @@ -74,6 +74,18 @@ public void memberAdded(NodeDetail nodeDetail) { LOG.error("Exception occurred while resolving un assigned tasks upon member addition " + nodeDetail .getNodeId(), e); } + } else if (clusterCoordinator.getThisNodeId().equals(nodeDetail.getNodeId()) + && isMemberRejoinedAfterUnresponsiveness()) { + // This node became unresponsive and rejoined the cluster hence removing all tasks assigned to this node + // then start the scheduler again after cleaning the locally running tasks. 
+ becameUnresponsive(nodeDetail.getNodeId()); + try { + //Remove from database + taskStore.deleteTasks(nodeDetail.getNodeId()); + } catch (TaskCoordinationException e) { + LOG.error("Error while removing the tasks of this node.", e); + } + reJoined(nodeDetail.getNodeId()); } } @@ -119,6 +131,16 @@ public void becameUnresponsive(String nodeId) { }); } + /** + * Check whether the member has rejoined after being unresponsive. + * + * @return true if the member has rejoined after being unresponsive, false otherwise + */ + public boolean isMemberRejoinedAfterUnresponsiveness() { + return taskManager.getLocallyRunningCoordinatedTasks().size() > 0; + } + + @Override public void reJoined(String nodeId) {