Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Improve Task Cleanup and Immediate Termination in Workflow Stop (#573) #581

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/main/java/com/gw/tasks/GeoweaverProcessTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,37 @@ public void startMonitor(javax.websocket.Session socketsession) {

/** This function is called when the task is not loaded by a worker */
public void endPrematurely() {

// Mark the task as stopped
this.curstatus = ExecutionStatus.STOPPED;

// If the task is running in a separate thread, interrupt the thread
if (Thread.currentThread().isInterrupted()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very weird and risky way to stop current thread. Please explain why it won't kill geoweaver itself.

Copy link
Author

@saikiranAnnam saikiranAnnam Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interrupting the Current Thread Only Affects the Task's Thread:

  1. Thread.currentThread().interrupt() - interrupts the currently executing thread, which means it only affects the task that is running in that thread.
  2. Calling interrupt() does not kill or forcefully stop the thread; it merely sets the interrupt flag of the thread.
  3. Geoweaver manages its worker threads in a thread pool. Interrupting one worker thread (through Thread.currentThread().interrupt()) will not bring down the entire system.

It only affects the thread executing the task, not the remaining threads.

Thread.currentThread().interrupt(); // Interrupt the current thread (if it's blocking)
}

// Ensure the task is not performing any further operations
stopRunningOperations();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function does nothing.

Copy link
Author

@saikiranAnnam saikiranAnnam Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function doesn't do anything as of now, I just wrote this for an edge case(for the future case) if the processes come under these three cases:

  1. Long-running Loops
  2. I/O Operations: reading from a file or waiting for network data
  3. Timers or Delayed Tasks

But the inside logic can be written for the future case


// Update the task's status
updateEverything();
}

// This method ensures the task halts long-running operations
/*
* Check if task is in a long-running operation and forcefully stop it
* Example: If it's stuck in a loop or waiting on some resource, break the
* operation
*/
private void stopRunningOperations() {
if (this.curstatus == ExecutionStatus.STOPPED) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this function does nothing. Why is it needed?

// Example of terminating a running operation or loop
// You can break the loop or stop waiting for resources
// if (someLongRunningCondition) {
// // Break or terminate operation
// }
}
}

/** Stop the monitoring of the task */
public void stopMonitor() {

Expand Down
65 changes: 31 additions & 34 deletions src/main/java/com/gw/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,24 @@ public synchronized void notifyWaitinglist() {
logger.debug("notify waiting list to pay attention to the released worker");
if (waitinglist.size() > 0 && wm.getCurrentWorkerNumber() < worknumber) {
orderWaitingList();

// Process tasks iteratively
for (int i = 0; i < waitinglist.size(); i++) {
GeoweaverProcessTask newtask = (GeoweaverProcessTask) waitinglist.get(i);
if (newtask.getIsReady()) {
waitinglist.remove(newtask);
if (newtask.checkShouldPassOrNot()){
newtask.endPrematurely();
notifyWaitinglist();
}else{
executeATask(newtask);
waitinglist.remove(newtask); // Remove task from waiting list
if (newtask.checkShouldPassOrNot()) {
newtask.endPrematurely(); // End the task immediately if it should not proceed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you removed the notifyWaitinglist function, please explain why? how does the queue know that this waiting list is changed for worker to pick up new one?

} else {
executeATask(newtask); // Otherwise, execute the task
}
i--; // Adjusting index due to the removal
}
}
}
}


/**
* A task is done, being triggered to start doing another task.
*
Expand All @@ -206,46 +209,40 @@ public void arrive(Task t) {
* @param history_id
*/
public void stopTask(String history_id) {

try {

// Stop tasks in waiting list
synchronized (waitinglist) {
Iterator<Task> iterator = waitinglist.iterator();
List<Task> tasksToRemove = new ArrayList<>();

while (iterator.hasNext()) {

GeoweaverProcessTask thet = (GeoweaverProcessTask) iterator.next();

if (thet.getHistory_id().equals(history_id)) {

thet.endPrematurely();

waitinglist.remove(thet); // remove from waiting list
thet.endPrematurely(); // Immediately end the task
tasksToRemove.add(thet); // Mark for removal
}
}
waitinglist.removeAll(tasksToRemove); // Remove all tasks at once after iteration
Copy link
Member

@ZihengSun ZihengSun Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove all that at once, what is the difference from the previous one exactly?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous version of code - Removing During Iteration.
revised code - Removal after Iteration.

Removing During Iteration vs. After Iteration:

Previous Version: tasks are removed from the waitinglist while iterating over it. This can cause problems with the iteration itself, especially if you remove items during the iteration process.

Issue with Direct Removal During Iteration: When you remove a task from the list, the iterator moves forward to the next element, but it skips checking the element that comes immediately after the removed task. This happens because after removal, the next element shifts into the current index position, and the iterator’s next() method skips over it. This can cause missing tasks that should have been processed.

Current Version: The updated code collects tasks to be removed in a separate List tasksToRemove. Once the iteration is complete, the tasks to be removed are all removed in a single operation with waitinglist.removeAll(tasksToRemove).

Advantages: By collecting tasks to be removed first and then performing the removal after the iteration, you avoid modifying the list during iteration, preventing skipping tasks and ensuring that all matching tasks are processed correctly.

}
}
}

synchronized (runninglist) {
Iterator<Task> iterator = runninglist.iterator();

while (iterator.hasNext()) {

GeoweaverProcessTask thet = (GeoweaverProcessTask) iterator.next();

if (thet.getHistory_id().equals(history_id)) {

// for task ongoing, it will be ended using pt.stop at above level.

thet.endPrematurely();

runninglist.remove(thet); // remove from waiting list
// Stop tasks in running list
synchronized (runninglist) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why making it synchronized?

Copy link
Author

@saikiranAnnam saikiranAnnam Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The synchronized (runninglist) block is necessary to ensure that no other thread can modify the runninglist while it is being iterated (i.e when stop operation in progress), preventing concurrency issues and ensuring no data loss. This is crucial in multi-threaded environments where multiple threads might interact with shared data.

Iterator<Task> iterator = runninglist.iterator();
List<Task> tasksToRemove = new ArrayList<>();

while (iterator.hasNext()) {
GeoweaverProcessTask thet = (GeoweaverProcessTask) iterator.next();
if (thet.getHistory_id().equals(history_id)) {
thet.endPrematurely(); // Immediately end the task
tasksToRemove.add(thet); // Mark for removal
}
}
runninglist.removeAll(tasksToRemove); // Remove all tasks at once after iteration
}
}
}

} catch (Exception e) {

} catch (Exception e) {
e.printStackTrace();
}
}

}