-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: Improve Task Cleanup and Immediate Termination in Workflow Stop (#573) #581
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -312,12 +312,37 @@ public void startMonitor(javax.websocket.Session socketsession) { | |
|
||
/** This function is called when the task is not loaded by a worker */ | ||
public void endPrematurely() { | ||
|
||
// Mark the task as stopped | ||
this.curstatus = ExecutionStatus.STOPPED; | ||
|
||
// If the task is running in a separate thread, interrupt the thread | ||
if (Thread.currentThread().isInterrupted()) { | ||
Thread.currentThread().interrupt(); // Interrupt the current thread (if it's blocking) | ||
} | ||
|
||
// Ensure the task is not performing any further operations | ||
stopRunningOperations(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function does nothing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The function doesn't do anything as of now, I just wrote this for an edge case(for the future case) if the processes come under these three cases:
But the inside logic can be written for the future case |
||
|
||
// Update the task's status | ||
updateEverything(); | ||
} | ||
|
||
// This method ensures the task halts long-running operations | ||
/* | ||
* Check if task is in a long-running operation and forcefully stop it | ||
* Example: If it's stuck in a loop or waiting on some resource, break the | ||
* operation | ||
*/ | ||
private void stopRunningOperations() { | ||
if (this.curstatus == ExecutionStatus.STOPPED) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems this function does nothing. Why is it needed? |
||
// Example of terminating a running operation or loop | ||
// You can break the loop or stop waiting for resources | ||
// if (someLongRunningCondition) { | ||
// // Break or terminate operation | ||
// } | ||
} | ||
} | ||
|
||
/** Stop the monitoring of the task */ | ||
public void stopMonitor() { | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,21 +168,24 @@ public synchronized void notifyWaitinglist() { | |
logger.debug("notify waiting list to pay attention to the released worker"); | ||
if (waitinglist.size() > 0 && wm.getCurrentWorkerNumber() < worknumber) { | ||
orderWaitingList(); | ||
|
||
// Process tasks iteratively | ||
for (int i = 0; i < waitinglist.size(); i++) { | ||
GeoweaverProcessTask newtask = (GeoweaverProcessTask) waitinglist.get(i); | ||
if (newtask.getIsReady()) { | ||
waitinglist.remove(newtask); | ||
if (newtask.checkShouldPassOrNot()){ | ||
newtask.endPrematurely(); | ||
notifyWaitinglist(); | ||
}else{ | ||
executeATask(newtask); | ||
waitinglist.remove(newtask); // Remove task from waiting list | ||
if (newtask.checkShouldPassOrNot()) { | ||
newtask.endPrematurely(); // End the task immediately if it should not proceed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you removed the notifyWaitinglist function, please explain why? how does the queue know that this waiting list is changed for worker to pick up new one? |
||
} else { | ||
executeATask(newtask); // Otherwise, execute the task | ||
} | ||
i--; // Adjusting index due to the removal | ||
} | ||
} | ||
} | ||
} | ||
|
||
|
||
/** | ||
* A task is done, being triggered to start doing another task. | ||
* | ||
|
@@ -206,46 +209,40 @@ public void arrive(Task t) { | |
* @param history_id | ||
*/ | ||
public void stopTask(String history_id) { | ||
|
||
try { | ||
|
||
// Stop tasks in waiting list | ||
synchronized (waitinglist) { | ||
Iterator<Task> iterator = waitinglist.iterator(); | ||
List<Task> tasksToRemove = new ArrayList<>(); | ||
|
||
while (iterator.hasNext()) { | ||
|
||
GeoweaverProcessTask thet = (GeoweaverProcessTask) iterator.next(); | ||
|
||
if (thet.getHistory_id().equals(history_id)) { | ||
|
||
thet.endPrematurely(); | ||
|
||
waitinglist.remove(thet); // remove from waiting list | ||
thet.endPrematurely(); // Immediately end the task | ||
tasksToRemove.add(thet); // Mark for removal | ||
} | ||
} | ||
waitinglist.removeAll(tasksToRemove); // Remove all tasks at once after iteration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we remove all that at once, what is the difference from the previous one exactly? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the previous version of code - Removing During Iteration. Removing During Iteration vs. After Iteration: Previous Version: tasks are removed from the waitinglist while iterating over it. This can cause problems with the iteration itself, especially if you remove items during the iteration process. Issue with Direct Removal During Iteration: When you remove a task from the list, the iterator moves forward to the next element, but it skips checking the element that comes immediately after the removed task. This happens because after removal, the next element shifts into the current index position, and the iterator’s next() method skips over it. This can cause missing tasks that should have been processed. Current Version: The updated code collects tasks to be removed in a separate List tasksToRemove. Once the iteration is complete, the tasks to be removed are all removed in a single operation with waitinglist.removeAll(tasksToRemove). Advantages: By collecting tasks to be removed first and then performing the removal after the iteration, you avoid modifying the list during iteration, preventing skipping tasks and ensuring that all matching tasks are processed correctly. |
||
} | ||
} | ||
} | ||
|
||
synchronized (runninglist) { | ||
Iterator<Task> iterator = runninglist.iterator(); | ||
|
||
while (iterator.hasNext()) { | ||
|
||
GeoweaverProcessTask thet = (GeoweaverProcessTask) iterator.next(); | ||
|
||
if (thet.getHistory_id().equals(history_id)) { | ||
|
||
// for task ongoing, it will be ended using pt.stop at above level. | ||
|
||
thet.endPrematurely(); | ||
|
||
runninglist.remove(thet); // remove from waiting list | ||
// Stop tasks in running list | ||
synchronized (runninglist) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why making it synchronized? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The synchronized (runninglist) block is necessary to ensure that no other thread can modify the runninglist while it is being iterated (i.e when stop operation in progress), preventing concurrency issues and ensuring no data loss. This is crucial in multi-threaded environments where multiple threads might interact with shared data. |
||
Iterator<Task> iterator = runninglist.iterator(); | ||
List<Task> tasksToRemove = new ArrayList<>(); | ||
|
||
while (iterator.hasNext()) { | ||
GeoweaverProcessTask thet = (GeoweaverProcessTask) iterator.next(); | ||
if (thet.getHistory_id().equals(history_id)) { | ||
thet.endPrematurely(); // Immediately end the task | ||
tasksToRemove.add(thet); // Mark for removal | ||
} | ||
} | ||
runninglist.removeAll(tasksToRemove); // Remove all tasks at once after iteration | ||
} | ||
} | ||
} | ||
|
||
} catch (Exception e) { | ||
|
||
} catch (Exception e) { | ||
e.printStackTrace(); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very weird and risky way to stop current thread. Please explain why it won't kill geoweaver itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interrupting the Current Thread Only Affects the Task's Thread:
It only affects the thread executing the task, not the remaining threads.