Skip to content

Commit

Permalink
Merge pull request kitodo#5776 from matthias-ronge/sychronize-task-ma…
Browse files Browse the repository at this point in the history
…nager

Synchronize accesses to 'taskList'
  • Loading branch information
solth authored Jan 11, 2024
2 parents d3bba5c + 74ab5eb commit 22f0864
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.kitodo.config.ConfigCore;
import org.kitodo.config.enums.ParameterCore;
import org.kitodo.production.helper.tasks.EmptyTask.Behaviour;
Expand All @@ -32,6 +34,7 @@
* {@link org.kitodo.production.forms.TaskManagerForm}.
*/
public class TaskManager {
private static final Logger logger = LogManager.getLogger(TaskManager.class);

/**
* The field singletonInstance holds the singleton instance of the
Expand Down Expand Up @@ -72,7 +75,10 @@ private TaskManager() {
* task to add
*/
public static void addTask(EmptyTask task) {
singleton().taskList.addLast(task);
LinkedList<EmptyTask> tasks = singleton().taskList;
synchronized (tasks) {
tasks.addLast(task);
}
}

/**
Expand All @@ -91,12 +97,15 @@ public static void addTask(EmptyTask task) {
*/
static void addTaskIfMissing(EmptyTask task) {
LinkedList<EmptyTask> tasks = singleton().taskList;
if (!tasks.contains(task)) {
int pos = lastIndexOf(TaskState.WORKING) + 1;
try {
tasks.add(pos, task);
} catch (IndexOutOfBoundsException e) {
tasks.addLast(task);
synchronized (tasks) {
if (!tasks.contains(task)) {
int pos = lastIndexOf(TaskState.WORKING) + 1;
try {
tasks.add(pos, task);
} catch (IndexOutOfBoundsException e) {
logger.catching(e);
tasks.addLast(task);
}
}
}
}
Expand All @@ -113,7 +122,10 @@ static void addTaskIfMissing(EmptyTask task) {
* @return a copy of the task list
*/
public static List<EmptyTask> getTaskList() {
return new ArrayList<>(singleton().taskList);
LinkedList<EmptyTask> tasks = singleton().taskList;
synchronized (tasks) {
return new ArrayList<>(tasks);
}
}

/**
Expand All @@ -127,10 +139,13 @@ public static List<EmptyTask> getTaskList() {
private static int lastIndexOf(TaskState state) {
int lastIndex = -1;
int pos = -1;
for (EmptyTask task : singleton().taskList) {
pos++;
if (task.getTaskState().equals(state)) {
lastIndex = pos;
LinkedList<EmptyTask> tasks = singleton().taskList;
synchronized (tasks) {
for (EmptyTask task : tasks) {
pos++;
if (task.getTaskState().equals(state)) {
lastIndex = pos;
}
}
}
return lastIndex;
Expand All @@ -145,8 +160,12 @@ public static void removeAllFinishedTasks() {
do {
redo = false;
try {
singleton().taskList.removeIf(emptyTask -> emptyTask.getState().equals(Thread.State.TERMINATED));
LinkedList<EmptyTask> tasks = singleton().taskList;
synchronized (tasks) {
tasks.removeIf(emptyTask -> emptyTask.getState().equals(Thread.State.TERMINATED));
}
} catch (ConcurrentModificationException listModifiedByAnotherThreadWhileIterating) {
logger.catching(listModifiedByAnotherThreadWhileIterating);
redo = true;
}
} while (redo);
Expand All @@ -160,10 +179,12 @@ public static void removeAllFinishedTasks() {
* task to move forwards
*/
public static void runEarlier(EmptyTask task) {
TaskManager theManager = singleton();
int index = theManager.taskList.indexOf(task);
if (index > 0) {
Collections.swap(theManager.taskList, index - 1, index);
LinkedList<EmptyTask> tasks = singleton().taskList;
synchronized (tasks) {
int index = tasks.indexOf(task);
if (index > 0) {
Collections.swap(tasks, index - 1, index);
}
}
}

Expand All @@ -175,10 +196,12 @@ public static void runEarlier(EmptyTask task) {
* task to move backwards
*/
public static void runLater(EmptyTask task) {
TaskManager theManager = singleton();
int index = theManager.taskList.indexOf(task);
if (index > -1 && index + 1 < theManager.taskList.size()) {
Collections.swap(theManager.taskList, index, index + 1);
LinkedList<EmptyTask> tasks = singleton().taskList;
synchronized (tasks) {
int index = tasks.indexOf(task);
if (index > -1 && index + 1 < tasks.size()) {
Collections.swap(tasks, index, index + 1);
}
}
}

Expand Down Expand Up @@ -215,16 +238,20 @@ public static void stopAndDeleteAllTasks() {
do {
redo = false;
try {
Iterator<EmptyTask> inspector = singleton().taskList.iterator();
while (inspector.hasNext()) {
EmptyTask task = inspector.next();
if (task.isAlive()) {
task.interrupt(Behaviour.DELETE_IMMEDIATELY);
} else {
inspector.remove();
LinkedList<EmptyTask> tasks = singleton().taskList;
synchronized (tasks) {
Iterator<EmptyTask> inspector = tasks.iterator();
while (inspector.hasNext()) {
EmptyTask task = inspector.next();
if (task.isAlive()) {
task.interrupt(Behaviour.DELETE_IMMEDIATELY);
} else {
inspector.remove();
}
}
}
} catch (ConcurrentModificationException listModifiedByAnotherThreadWhileIterating) {
logger.catching(listModifiedByAnotherThreadWhileIterating);
redo = true;
}
} while (redo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,47 +144,49 @@ public static boolean isAutoRunningThreads() {
*/
@Override
public void run() {
TaskManager taskManager = TaskManager.singleton();
if (taskManager.taskList.isEmpty()) {
return;
}

LinkedList<EmptyTask> launchableThreads = new LinkedList<>();
LinkedList<EmptyTask> finishedThreads = new LinkedList<>();
LinkedList<EmptyTask> failedThreads = new LinkedList<>();
int availableClearance = autoRunLimit;

int successfulMaxCount = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.TASK_MANAGER_KEEP_SUCCESSFUL);
int failedMaxCount = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.TASK_MANAGER_KEEP_FAILED);
Duration successfulMaxAge = ConfigCore.getDurationParameter(ParameterCore.TASK_MANAGER_KEEP_SUCCESSFUL_MINS,
ChronoUnit.MINUTES);
Duration failedMaxAge = ConfigCore.getDurationParameter(ParameterCore.TASK_MANAGER_KEEP_FAILED_MINS,
ChronoUnit.MINUTES);
LinkedList<EmptyTask> taskList = TaskManager.singleton().taskList;
synchronized (taskList) {
if (taskList.isEmpty()) {
return;
}

LinkedList<EmptyTask> launchableThreads = new LinkedList<>();
LinkedList<EmptyTask> finishedThreads = new LinkedList<>();
LinkedList<EmptyTask> failedThreads = new LinkedList<>();
int availableClearance = autoRunLimit;

int successfulMaxCount = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.TASK_MANAGER_KEEP_SUCCESSFUL);
int failedMaxCount = ConfigCore.getIntParameterOrDefaultValue(ParameterCore.TASK_MANAGER_KEEP_FAILED);
Duration successfulMaxAge = ConfigCore.getDurationParameter(ParameterCore.TASK_MANAGER_KEEP_SUCCESSFUL_MINS,
ChronoUnit.MINUTES);
Duration failedMaxAge = ConfigCore.getDurationParameter(ParameterCore.TASK_MANAGER_KEEP_FAILED_MINS,
ChronoUnit.MINUTES);

ListIterator<EmptyTask> position = taskManager.taskList.listIterator();
EmptyTask task;
try {
while (position.hasNext()) {
availableClearance = handleTaskModification(launchableThreads, finishedThreads, failedThreads,
availableClearance, successfulMaxAge, failedMaxAge, position);
ListIterator<EmptyTask> position = taskList.listIterator();
EmptyTask task;
try {
while (position.hasNext()) {
availableClearance = handleTaskModification(launchableThreads, finishedThreads, failedThreads,
availableClearance, successfulMaxAge, failedMaxAge, position);
}
} catch (ConcurrentModificationException e) {
return;
}
} catch (ConcurrentModificationException e) {
return;
}

while (finishedThreads.size() > successfulMaxCount && (task = finishedThreads.pollFirst()) != null) {
taskManager.taskList.remove(task);
}
while (finishedThreads.size() > successfulMaxCount && (task = finishedThreads.pollFirst()) != null) {
taskList.remove(task);
}

while (failedThreads.size() > failedMaxCount && (task = failedThreads.pollFirst()) != null) {
taskManager.taskList.remove(task);
}
while (failedThreads.size() > failedMaxCount && (task = failedThreads.pollFirst()) != null) {
taskList.remove(task);
}

while (launchableThreads.size() > availableClearance) {
launchableThreads.removeLast();
}
while ((task = launchableThreads.pollFirst()) != null) {
task.start();
while (launchableThreads.size() > availableClearance) {
launchableThreads.removeLast();
}
while ((task = launchableThreads.pollFirst()) != null) {
task.start();
}
}
}

Expand Down

0 comments on commit 22f0864

Please sign in to comment.