diff --git a/datashare-app/src/main/java/org/icij/datashare/BatchDownloadApp.java b/datashare-app/src/main/java/org/icij/datashare/BatchDownloadApp.java index c0aa97ae4..59c1f7f51 100644 --- a/datashare-app/src/main/java/org/icij/datashare/BatchDownloadApp.java +++ b/datashare-app/src/main/java/org/icij/datashare/BatchDownloadApp.java @@ -4,6 +4,7 @@ import org.icij.datashare.asynctasks.TaskWorkerLoop; import org.icij.datashare.asynctasks.TaskSupplier; import org.icij.datashare.mode.CommonMode; +import org.icij.datashare.tasks.DatashareTaskFactory; import org.icij.datashare.text.indexing.Indexer; import org.redisson.api.RedissonClient; @@ -13,7 +14,7 @@ public class BatchDownloadApp { public static void start(Properties properties) throws Exception { CommonMode commonMode = CommonMode.create(properties); - TaskWorkerLoop taskWorkerLoop = new TaskWorkerLoop(commonMode.get(TaskFactory.class), commonMode.get(TaskSupplier.class)); + TaskWorkerLoop taskWorkerLoop = new TaskWorkerLoop(commonMode.get(DatashareTaskFactory.class), commonMode.get(TaskSupplier.class)); taskWorkerLoop.call(); commonMode.get(Indexer.class).close(); commonMode.get(RedissonClient.class).shutdown(); diff --git a/datashare-app/src/main/java/org/icij/datashare/CliApp.java b/datashare-app/src/main/java/org/icij/datashare/CliApp.java index 71b957df7..4de81703b 100644 --- a/datashare-app/src/main/java/org/icij/datashare/CliApp.java +++ b/datashare-app/src/main/java/org/icij/datashare/CliApp.java @@ -1,11 +1,11 @@ package org.icij.datashare; -import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.cli.CliExtensionService; import org.icij.datashare.cli.spi.CliExtension; import org.icij.datashare.mode.CommonMode; import org.icij.datashare.tasks.ArtifactTask; import org.icij.datashare.tasks.DatashareTaskFactory; +import org.icij.datashare.tasks.DatashareTaskManager; import org.icij.datashare.tasks.DeduplicateTask; import org.icij.datashare.tasks.EnqueueFromIndexTask; import org.icij.datashare.tasks.ExtractNlpTask; @@ -64,7 +64,7 @@ private static void process(DeliverableService deliverableService, Properties } private static void runTaskWorker(CommonMode mode, Properties properties) throws Exception { - TaskManager taskManager = mode.get(TaskManager.class); + DatashareTaskManager taskManager = mode.get(DatashareTaskManager.class); DatashareTaskFactory taskFactory = mode.get(DatashareTaskFactory.class); Indexer indexer = mode.get(Indexer.class); diff --git a/datashare-app/src/main/java/org/icij/datashare/WebApp.java b/datashare-app/src/main/java/org/icij/datashare/WebApp.java index d3d3c084a..c1c52b768 100644 --- a/datashare-app/src/main/java/org/icij/datashare/WebApp.java +++ b/datashare-app/src/main/java/org/icij/datashare/WebApp.java @@ -1,7 +1,6 @@ package org.icij.datashare; import net.codestory.http.WebServer; -import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.asynctasks.TaskSupplier; import org.icij.datashare.asynctasks.TaskWorkerLoop; import org.icij.datashare.batch.BatchSearch; @@ -22,9 +21,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.icij.datashare.tasks.DatashareTaskManager; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import static java.lang.Boolean.parseBoolean; @@ -62,7 +61,7 @@ static void start(Properties properties) throws Exception { waitForServerToBeUp(parseInt(mode.properties().getProperty(PropertiesProvider.TCP_LISTEN_PORT))); Desktop.getDesktop().browse(URI.create(new URI("http://localhost:") + mode.properties().getProperty(PropertiesProvider.TCP_LISTEN_PORT))); } - requeueDatabaseBatchSearches(mode.get(BatchSearchRepository.class), mode.get(TaskManager.class)); + requeueDatabaseBatchSearches(mode.get(BatchSearchRepository.class), mode.get(DatashareTaskManager.class)); } private static boolean shouldStartWorkers(Properties properties) { @@ -79,7 +78,7 @@ private static void waitForServerToBeUp(int tcpListenPort) throws InterruptedExc } } - private static void requeueDatabaseBatchSearches(BatchSearchRepository repository, TaskManager taskManager) throws IOException { + private static void requeueDatabaseBatchSearches(BatchSearchRepository repository, DatashareTaskManager taskManager) throws IOException { for (String batchSearchUuid: repository.getQueued()) { BatchSearch batchSearch = repository.get(batchSearchUuid); taskManager.startTask(batchSearchUuid, BatchSearchRunner.class, batchSearch.user, Map.of("batchRecord", new BatchSearchRecord(batchSearch))); diff --git a/datashare-app/src/main/java/org/icij/datashare/mode/CommonMode.java b/datashare-app/src/main/java/org/icij/datashare/mode/CommonMode.java index 407a70130..31a619c87 100644 --- a/datashare-app/src/main/java/org/icij/datashare/mode/CommonMode.java +++ b/datashare-app/src/main/java/org/icij/datashare/mode/CommonMode.java @@ -20,7 +20,6 @@ import org.icij.datashare.PipelineRegistry; import org.icij.datashare.PropertiesProvider; import org.icij.datashare.Repository; -import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.asynctasks.TaskModifier; import org.icij.datashare.asynctasks.TaskSupplier; import org.icij.datashare.batch.BatchSearchRepository; @@ -35,6 +34,7 @@ import org.icij.datashare.nlp.EmailPipeline; import org.icij.datashare.nlp.OptimaizeLanguageGuesser; import org.icij.datashare.tasks.DatashareTaskFactory; +import org.icij.datashare.tasks.DatashareTaskManager; import org.icij.datashare.tasks.TaskManagerAmqp; import org.icij.datashare.tasks.TaskManagerMemory; import org.icij.datashare.tasks.TaskManagerRedis; @@ -138,17 +138,17 @@ protected void configure() { QueueType batchQueueType = getQueueType(propertiesProvider, BATCH_QUEUE_TYPE_OPT, QueueType.MEMORY); switch ( batchQueueType ) { case REDIS: - bind(TaskManager.class).to(TaskManagerRedis.class); + bind(DatashareTaskManager.class).to(TaskManagerRedis.class); bind(TaskModifier.class).to(TaskSupplierRedis.class); bind(TaskSupplier.class).to(TaskSupplierRedis.class); break; case AMQP: - bind(TaskManager.class).to(TaskManagerAmqp.class); + bind(DatashareTaskManager.class).to(TaskManagerAmqp.class); bind(TaskSupplier.class).to(TaskSupplierAmqp.class); bind(TaskModifier.class).to(TaskSupplierAmqp.class); break; default: - bind(TaskManager.class).to(TaskManagerMemory.class); + bind(DatashareTaskManager.class).to(TaskManagerMemory.class); bind(TaskModifier.class).to(TaskManagerMemory.class); bind(TaskSupplier.class).to(TaskManagerMemory.class); } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java index ded79cf44..ecdd4928d 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/ArtifactTask.java @@ -7,7 +7,6 @@ import org.icij.datashare.asynctasks.Task; import org.icij.datashare.asynctasks.TaskGroup; import org.icij.datashare.extract.DocumentCollectionFactory; -import org.icij.datashare.function.Pair; import org.icij.datashare.text.Document; import org.icij.datashare.text.Project; import org.icij.datashare.text.indexing.Indexer; @@ -20,7 +19,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; -import static java.util.Optional.ofNullable; import static org.icij.datashare.cli.DatashareCliOptions.ARTIFACT_DIR_OPT; import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_DEFAULT_PROJECT; import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_POLLING_INTERVAL_SEC; @@ -37,7 +35,7 @@ public class ArtifactTask extends PipelineTask { @Inject public ArtifactTask(DocumentCollectionFactory factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task taskView, @Assisted final Function updateCallback) { - super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, String.class); + super(Stage.ARTIFACT, DatashareTask.getUser(taskView), factory, propertiesProvider, String.class); this.indexer = indexer; project = Project.project(propertiesProvider.get(DEFAULT_PROJECT_OPT).orElse(DEFAULT_DEFAULT_PROJECT)); pollingInterval = Integer.parseInt(propertiesProvider.get(POLLING_INTERVAL_OPT).orElse(DEFAULT_POLLING_INTERVAL_SEC)); diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java index 709506632..24a1015f2 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/BatchSearchRunner.java @@ -66,22 +66,23 @@ public class BatchSearchRunner implements CancellableTask, UserTask, Callable taskView; + private final User user; protected volatile boolean cancelAsked = false; protected volatile Thread callThread; protected volatile boolean requeueCancel; @Inject - public BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository, - @Assisted Task taskView, @Assisted Function updateCallback) { + public BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository, @Assisted Task taskView, @Assisted Function updateCallback) { this(indexer, propertiesProvider, repository, taskView, updateCallback, new CountDownLatch(1)); } BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository, - Task taskView, Function updateCallback, CountDownLatch latch) { + Task taskView, Function updateCallback, CountDownLatch latch) { this.indexer = indexer; this.propertiesProvider = propertiesProvider; this.repository = repository; this.taskView = (Task) taskView; + this.user = DatashareTask.getUser(this.taskView); this.updateCallback = updateCallback; this.callWaiterLatch = latch; } @@ -100,7 +101,7 @@ public Integer call() throws Exception { int scrollSize = min(scrollSizeFromParams, MAX_SCROLL_SIZE); callThread = Thread.currentThread(); callWaiterLatch.countDown(); // for tests - BatchSearch batchSearch = repository.get(taskView.getUser(), taskView.id); + BatchSearch batchSearch = repository.get(user, taskView.id); if (batchSearch == null) { logger.warn("batch search {} not found in database (check that database url is the same as datashare backend)", taskView.id); return 0; @@ -164,7 +165,7 @@ public Integer call() throws Exception { @Override public User getUser() { - return taskView.getUser(); + return user; } /** diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/DatashareTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/DatashareTask.java new file mode 100644 index 000000000..253b4220b --- /dev/null +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/DatashareTask.java @@ -0,0 +1,33 @@ +package org.icij.datashare.tasks; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import org.icij.datashare.asynctasks.Task; +import org.icij.datashare.user.User; + +public class DatashareTask { + public static final String USER_KEY = "user"; + + public static Task task(String name, User user, Map args) { + return new Task<>(name, addTo(args, user)); + } + + public static Task task(String id, String name, User user) { + return new Task<>(id, name, addTo(new HashMap<>(), user)); + } + + public static Task task(String id, String name, User user, Map args) { + return new Task<>(id, name, addTo(args, user)); + } + + public static User getUser(Task task) { + return (User) task.args.get(USER_KEY); + } + + private static Map addTo(Map properties, User user) { + LinkedHashMap result = new LinkedHashMap<>(properties); + result.put(USER_KEY, user); + return result; + } +} diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/DatashareTaskManager.java b/datashare-app/src/main/java/org/icij/datashare/tasks/DatashareTaskManager.java new file mode 100644 index 000000000..62ca5bd43 --- /dev/null +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/DatashareTaskManager.java @@ -0,0 +1,46 @@ +package org.icij.datashare.tasks; + +import static java.util.stream.Collectors.toMap; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.icij.datashare.asynctasks.Group; +import org.icij.datashare.asynctasks.Task; +import org.icij.datashare.asynctasks.TaskGroup; +import org.icij.datashare.asynctasks.TaskManager; +import org.icij.datashare.batch.WebQueryPagination; +import org.icij.datashare.user.User; + +public interface DatashareTaskManager extends TaskManager { + // TODO: can we do better using generics instead of casts ? + default String startTask(Class taskClass, User user, Map properties) throws IOException { + return startTask(DatashareTask.task(taskClass.getName(), user, properties), new Group(taskClass.getAnnotation( + TaskGroup.class).value())); + } + + default String startTask(String id, Class taskClass, User user, Map properties) throws IOException { + return startTask(DatashareTask.task(id, taskClass.getName(), user, properties), new Group(taskClass.getAnnotation(TaskGroup.class).value())); + } + + default List> getTasks(User user, Map filters, WebQueryPagination pagination) throws IOException { + return getTasks(filters, pagination).stream().map( t -> ( Task ) t) + .filter(t -> user.equals(DatashareTask.getUser(t))).collect(Collectors.toList()); + } + + default Map stopAllTasks(User user) throws IOException { + return getTasks().stream() + .filter(t -> user.equals(DatashareTask.getUser(t))) + .filter(t -> t.getState() == Task.State.RUNNING || t.getState() == Task.State.QUEUED).collect( + toMap(t -> t.id, t -> { + try { + return stopTask(t.id); + } catch (IOException e) { + logger.error("cannot stop task {}", t.id, e); + return false; + } + })); + } +} diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/DeduplicateTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/DeduplicateTask.java index 481dfaaef..418db0d6d 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/DeduplicateTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/DeduplicateTask.java @@ -25,7 +25,7 @@ public class DeduplicateTask extends PipelineTask { @Inject public DeduplicateTask(final DocumentCollectionFactory factory, @Assisted Task taskView, @Assisted final Function updateCallback) { - super(Stage.DEDUPLICATE, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class); + super(Stage.DEDUPLICATE, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), Path.class); this.factory = factory; } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/EnqueueFromIndexTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/EnqueueFromIndexTask.java index e7ba3fb58..17a6e78a9 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/EnqueueFromIndexTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/EnqueueFromIndexTask.java @@ -3,7 +3,6 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -import java.util.Optional; import java.util.function.Function; import org.icij.datashare.Entity; import org.icij.datashare.PropertiesProvider; @@ -12,7 +11,6 @@ import org.icij.datashare.asynctasks.TaskGroup; import org.icij.datashare.extract.DocumentCollectionFactory; import org.icij.datashare.text.Document; -import org.icij.datashare.text.ProjectProxy; import org.icij.datashare.text.indexing.Indexer; import org.icij.datashare.text.indexing.SearchQuery; import org.icij.datashare.text.nlp.Pipeline; @@ -46,9 +44,8 @@ public class EnqueueFromIndexTask extends PipelineTask { private final int scrollSize; @Inject - public EnqueueFromIndexTask(final DocumentCollectionFactory factory, final Indexer indexer, - @Assisted Task taskView, @Assisted final Function updateCallback) { - super(Stage.ENQUEUEIDX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class); + public EnqueueFromIndexTask(final DocumentCollectionFactory factory, final Indexer indexer, @Assisted Task taskView, @Assisted final Function updateCallback) { + super(Stage.ENQUEUEIDX, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), String.class); this.factory = factory; this.indexer = indexer; this.nlpPipeline = Pipeline.Type.parse((String) taskView.args.getOrDefault(NLP_PIPELINE_OPT, Pipeline.Type.CORENLP.name())); diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/ExtractNlpTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/ExtractNlpTask.java index a182a163e..56393408e 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/ExtractNlpTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/ExtractNlpTask.java @@ -47,7 +47,7 @@ public ExtractNlpTask(Indexer indexer, PipelineRegistry registry, final Document ExtractNlpTask(Indexer indexer, Pipeline pipeline, final DocumentCollectionFactory factory, @Assisted Task taskView, @Assisted final Function updateCallback) { - super(Stage.NLP, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class); + super(Stage.NLP, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), String.class); this.nlpPipeline = pipeline; project = Project.project(ofNullable((String)taskView.args.get(DEFAULT_PROJECT_OPT)).orElse(DEFAULT_DEFAULT_PROJECT)); maxContentLengthChars = (int) HumanReadableSize.parse(ofNullable((String)taskView.args.get(MAX_CONTENT_LENGTH_OPT)).orElse(valueOf(DEFAULT_MAX_CONTENT_LENGTH))); diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/IndexTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/IndexTask.java index d9d6c7980..1835e5197 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/IndexTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/IndexTask.java @@ -44,7 +44,7 @@ public class IndexTask extends PipelineTask implements Monitorable{ @Inject public IndexTask(final ElasticsearchSpewer spewer, final DocumentCollectionFactory factory, @Assisted Task taskView, @Assisted final Function updateCallback) throws IOException { - super(Stage.INDEX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class); + super(Stage.INDEX, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), Path.class); parallelism = propertiesProvider.get(PARALLELISM_OPT).map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors()); Options allTaskOptions = options().createFrom(Options.from(taskView.args)); diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/ScanIndexTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/ScanIndexTask.java index 08e87dc75..3bb4a74fc 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/ScanIndexTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/ScanIndexTask.java @@ -52,9 +52,8 @@ public class ScanIndexTask extends PipelineTask { private final int scrollSlices; @Inject - public ScanIndexTask(DocumentCollectionFactory factory, final Indexer indexer, - @Assisted Task taskView, @Assisted Function updateCallback) { - super(Stage.SCANIDX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class); + public ScanIndexTask(DocumentCollectionFactory factory, final Indexer indexer, @Assisted Task taskView, @Assisted Function updateCallback) { + super(Stage.SCANIDX, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), Path.class); this.scrollDuration = propertiesProvider.get(SCROLL_DURATION_OPT).orElse(DEFAULT_SCROLL_DURATION); this.scrollSize = parseInt(propertiesProvider.get(SCROLL_SIZE_OPT).orElse(valueOf(DEFAULT_SCROLL_SIZE))); this.scrollSlices = parseInt(propertiesProvider.get(SCROLL_SLICES_OPT).orElse(valueOf(DEFAULT_SCROLL_SLICES))); diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/ScanTask.java b/datashare-app/src/main/java/org/icij/datashare/tasks/ScanTask.java index ed76db18e..ee1b86007 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/ScanTask.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/ScanTask.java @@ -25,7 +25,7 @@ public class ScanTask extends PipelineTask { @Inject public ScanTask(DocumentCollectionFactory factory, @Assisted Task task, @Assisted Function updateCallback) { - super(Stage.SCAN, task.getUser(), factory, new PropertiesProvider(task.args), Path.class); + super(Stage.SCAN, DatashareTask.getUser(task), factory, new PropertiesProvider(task.args), Path.class); scanner = new Scanner(outputQueue).configure(options().createFrom(Options.from(task.args))); path = Paths.get((String)task.args.get(DatashareCliOptions.DATA_DIR_OPT)); } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerAmqp.java b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerAmqp.java index b85eed1ba..b2bc6f115 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerAmqp.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerAmqp.java @@ -6,11 +6,9 @@ import org.icij.datashare.PropertiesProvider; import org.icij.datashare.asynctasks.TaskManagerRedis; -import org.icij.datashare.asynctasks.Task; +import org.icij.datashare.asynctasks.TaskMetadata; import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor; -import org.icij.datashare.cli.DatashareCliOptions; import org.icij.datashare.mode.CommonMode; -import org.jetbrains.annotations.NotNull; import org.redisson.Redisson; import org.redisson.RedissonMap; import org.redisson.api.RedissonClient; @@ -18,7 +16,7 @@ import org.redisson.liveobject.core.RedissonObjectBuilder; @Singleton -public class TaskManagerAmqp extends org.icij.datashare.asynctasks.TaskManagerAmqp { +public class TaskManagerAmqp extends org.icij.datashare.asynctasks.TaskManagerAmqp implements DatashareTaskManager { @Inject public TaskManagerAmqp(AmqpInterlocutor amqp, RedissonClient redissonClient, PropertiesProvider propertiesProvider) @@ -26,14 +24,14 @@ public TaskManagerAmqp(AmqpInterlocutor amqp, RedissonClient redissonClient, Pro this(amqp, redissonClient, propertiesProvider, null); } - TaskManagerAmqp(AmqpInterlocutor amqp, RedissonClient redissonClient, PropertiesProvider propertiesProvider, Runnable eventCallback) throws IOException { + TaskManagerAmqp(AmqpInterlocutor amqp, RedissonClient redissonClient, PropertiesProvider propertiesProvider, Runnable eventCallback) throws IOException { // We start with a fresh list of known task everytime, we could decide to allow inheriting // existing tasks super(amqp, createTaskQueue(redissonClient), Utils.getRoutingStrategy(propertiesProvider), eventCallback); } - private static RedissonMap> createTaskQueue(RedissonClient redissonClient) { - return new RedissonMap<>(new TaskManagerRedis.TaskViewCodec(), + private static RedissonMap> createTaskQueue(RedissonClient redissonClient) { + return new RedissonMap<>(new TaskManagerRedis.RedisCodec<>(TaskMetadata.class), new CommandSyncService(((Redisson) redissonClient).getConnectionManager(), new RedissonObjectBuilder(redissonClient)), CommonMode.DS_TASK_MANAGER_QUEUE_NAME, @@ -42,4 +40,5 @@ private static RedissonMap> createTaskQueue(RedissonClient redis null ); } + } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerMemory.java b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerMemory.java index 9da07b7f2..620256302 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerMemory.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerMemory.java @@ -8,14 +8,14 @@ @Singleton -public class TaskManagerMemory extends org.icij.datashare.asynctasks.TaskManagerMemory { +public class TaskManagerMemory extends org.icij.datashare.asynctasks.TaskManagerMemory implements DatashareTaskManager { @Inject public TaskManagerMemory(DatashareTaskFactory taskFactory, PropertiesProvider propertiesProvider) { this(taskFactory, propertiesProvider, new CountDownLatch(1)); } - TaskManagerMemory(DatashareTaskFactory taskFactory, PropertiesProvider propertiesProvider, CountDownLatch latch) { + TaskManagerMemory(DatashareTaskFactory taskFactory, PropertiesProvider propertiesProvider, CountDownLatch latch) { super(taskFactory, propertiesProvider, latch); } } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerRedis.java b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerRedis.java index e687f7750..97286eac5 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerRedis.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskManagerRedis.java @@ -9,7 +9,7 @@ import org.redisson.api.RedissonClient; @Singleton -public class TaskManagerRedis extends org.icij.datashare.asynctasks.TaskManagerRedis { +public class TaskManagerRedis extends org.icij.datashare.asynctasks.TaskManagerRedis implements DatashareTaskManager { private final int pollingIntervalMs; // for tests // Convenience class made to ease injection and test @@ -22,11 +22,11 @@ public TaskManagerRedis(PropertiesProvider propertiesProvider) { this(propertiesProvider, CommonMode.DS_TASK_MANAGER_MAP_NAME, null, POLLING_INTERVAL); } - TaskManagerRedis(PropertiesProvider propertiesProvider, String taskMapName, Runnable eventCallback, int pollingIntervalMs) { + TaskManagerRedis(PropertiesProvider propertiesProvider, String taskMapName, Runnable eventCallback, int pollingIntervalMs) { this(new RedissonClientFactory().withOptions(Options.from(propertiesProvider.getProperties())).create(), taskMapName, Utils.getRoutingStrategy(propertiesProvider), eventCallback, pollingIntervalMs); } - TaskManagerRedis(RedissonClient redissonClient, String taskMapName, RoutingStrategy routingStrategy, Runnable eventCallback, int pollingIntervalMs) { + TaskManagerRedis(RedissonClient redissonClient, String taskMapName, RoutingStrategy routingStrategy, Runnable eventCallback, int pollingIntervalMs) { super(redissonClient, taskMapName, routingStrategy, eventCallback); this.pollingIntervalMs = pollingIntervalMs; } diff --git a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskSupplierRedis.java b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskSupplierRedis.java index 3ddddef41..ccaa69756 100644 --- a/datashare-app/src/main/java/org/icij/datashare/tasks/TaskSupplierRedis.java +++ b/datashare-app/src/main/java/org/icij/datashare/tasks/TaskSupplierRedis.java @@ -16,7 +16,7 @@ public TaskSupplierRedis(RedissonClient redissonClient, PropertiesProvider prope super(redissonClient, Utils.getRoutingKey(propertiesProvider)); } - TaskSupplierRedis(PropertiesProvider propertiesProvider) { + TaskSupplierRedis(PropertiesProvider propertiesProvider) { this(new RedissonClientFactory().withOptions(Options.from(propertiesProvider.getProperties())).create(), propertiesProvider); } } diff --git a/datashare-app/src/main/java/org/icij/datashare/web/BatchSearchResource.java b/datashare-app/src/main/java/org/icij/datashare/web/BatchSearchResource.java index 62a9ce4a1..e2c8e881a 100644 --- a/datashare-app/src/main/java/org/icij/datashare/web/BatchSearchResource.java +++ b/datashare-app/src/main/java/org/icij/datashare/web/BatchSearchResource.java @@ -26,9 +26,7 @@ import java.util.*; import java.util.stream.Collectors; -import static java.lang.Integer.parseInt; import static java.lang.String.format; -import static java.util.Arrays.stream; import static java.util.Optional.ofNullable; import static net.codestory.http.payload.Payload.*; import static org.icij.datashare.function.ThrowingFunctions.parseBoolean; diff --git a/datashare-app/src/main/java/org/icij/datashare/web/TaskResource.java b/datashare-app/src/main/java/org/icij/datashare/web/TaskResource.java index 41dad0498..ccf604d16 100644 --- a/datashare-app/src/main/java/org/icij/datashare/web/TaskResource.java +++ b/datashare-app/src/main/java/org/icij/datashare/web/TaskResource.java @@ -25,7 +25,6 @@ import net.codestory.http.payload.Payload; import org.icij.datashare.PropertiesProvider; import org.icij.datashare.asynctasks.Task; -import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.asynctasks.bus.amqp.UriResult; import org.icij.datashare.batch.BatchDownload; import org.icij.datashare.batch.BatchSearch; @@ -36,7 +35,9 @@ import org.icij.datashare.json.JsonObjectMapper; import org.icij.datashare.tasks.BatchDownloadRunner; import org.icij.datashare.tasks.BatchSearchRunner; +import org.icij.datashare.tasks.DatashareTask; import org.icij.datashare.tasks.DatashareTaskFactory; +import org.icij.datashare.tasks.DatashareTaskManager; import org.icij.datashare.tasks.EnqueueFromIndexTask; import org.icij.datashare.tasks.ExtractNlpTask; import org.icij.datashare.tasks.IndexTask; @@ -94,13 +95,13 @@ @Prefix("/api/task") public class TaskResource { private final DatashareTaskFactory taskFactory; - private final TaskManager taskManager; + private final DatashareTaskManager taskManager; private final PropertiesProvider propertiesProvider; private final BatchSearchRepository batchSearchRepository; private final int MAX_BATCH_SIZE = 60000; @Inject - public TaskResource(final DatashareTaskFactory taskFactory, final TaskManager taskManager, + public TaskResource(final DatashareTaskFactory taskFactory, final DatashareTaskManager taskManager, final PropertiesProvider propertiesProvider, final BatchSearchRepository batchSearchRepository) { this.taskFactory = taskFactory; this.taskManager = taskManager; @@ -125,7 +126,7 @@ public TaskResource(final DatashareTaskFactory taskFactory, final TaskManager ta @Parameter(name = "name", description = "as an example: pattern contained in the task name", in = ParameterIn.QUERY)}) @ApiResponse(responseCode = "200", description = "returns the list of tasks", useReturnTypeSchema = true) @Get("/all") - public List> tasks(Context context) throws IOException { + public List< Task> tasks(Context context) throws IOException { Set paginationFields = WebQueryPagination.fields(); Map paginationMap = context.query() .keys() @@ -169,7 +170,7 @@ public Payload createTask(@Parameter(name = "id", description = "task id", r @ApiResponse(responseCode = "404", description = "returns 404 if the task doesn't exist") @Get("/:id/result") public Payload getTaskResult(@Parameter(name = "id", description = "task id", in = ParameterIn.PATH) String id, Context context) throws IOException { - Task task = forbiddenIfNotSameUser(context, notFoundIfNull(taskManager.getTask(id))); + Task task = forbiddenIfNotSameUser(context, ( Task) notFoundIfNull(taskManager.getTask(id))); Object result = task.getResult(); if (result instanceof UriResult uriResult) { Path filePath = Path.of(uriResult.uri().getPath()); @@ -367,13 +368,13 @@ public Payload indexFile(@Parameter(name = "filePath", description = "path of th taskIds.add(scanResponse.taskId); Properties properties = applyProjectProperties(optionsWrapper); User user = (User) context.currentUser(); - Task scanIndex; + Task scanIndex; // Use a report map only if the request's body contains a "filter" attribute if (properties.get("filter") != null && Boolean.parseBoolean(properties.getProperty("filter"))) { // TODO remove taskFactory.createScanIndexTask would allow to get rid of taskfactory dependency in taskresource // problem for now is that if we call taskManager.startTask(ScanIndexTask.class.getName(), user, propertiesToMap(properties)) // the task will be run as a background task that will have race conditions with indexTask report loading - scanIndex = new Task<>(ScanIndexTask.class.getName(), user, propertiesToMap(properties)); + scanIndex = DatashareTask.task(ScanIndexTask.class.getName(), user, propertiesToMap(properties)); taskFactory.createScanIndexTask(scanIndex, (p) -> null).call(); taskIds.add(scanIndex.id); } else { @@ -408,7 +409,7 @@ public List> cleanDoneTasks() throws IOException { @ApiResponse(responseCode = "403", description = "returns 403 if the task is still in RUNNING state") @Delete("/clean/:taskName:") public Payload cleanTask(@Parameter(name = "taskName", description = "name of the task to delete", in = ParameterIn.PATH) final String taskId, Context context) throws IOException { - Task task = forbiddenIfNotSameUser(context, notFoundIfNull(taskManager.getTask(taskId))); + Task task = forbiddenIfNotSameUser(context, notFoundIfNull(( Task) taskManager.getTask(taskId))); if (task.getState() == Task.State.RUNNING) { return forbidden(); } else { @@ -499,8 +500,8 @@ public static String getReportMapNameFor(Properties properties) { return "extract:report:" + projectName; } - private static Task forbiddenIfNotSameUser(Context context, Task task) { - if (!task.getUser().equals(context.currentUser())) throw new ForbiddenException(); + private static Task forbiddenIfNotSameUser(Context context, Task task) { + if (!DatashareTask.getUser(task).equals(context.currentUser())) throw new ForbiddenException(); return task; } diff --git a/datashare-app/src/test/java/org/icij/datashare/mode/CliModeWorkerAcceptanceTest.java b/datashare-app/src/test/java/org/icij/datashare/mode/CliModeWorkerAcceptanceTest.java index 6b613ec3f..900e1ccb0 100644 --- a/datashare-app/src/test/java/org/icij/datashare/mode/CliModeWorkerAcceptanceTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/mode/CliModeWorkerAcceptanceTest.java @@ -1,10 +1,10 @@ package org.icij.datashare.mode; -import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.asynctasks.TaskSupplier; import org.icij.datashare.asynctasks.TaskWorkerLoop; import org.icij.datashare.cli.QueueType; import org.icij.datashare.tasks.DatashareTaskFactory; +import org.icij.datashare.tasks.DatashareTaskManager; import org.icij.datashare.text.indexing.Indexer; import org.junit.Test; import org.junit.runner.RunWith; @@ -56,8 +56,8 @@ public void test_task_worker() throws Exception { workerApp.start(); workerStarted.await(); - mode.get(TaskManager.class).shutdown(); // to send shutdown - mode.get(TaskManager.class).awaitTermination(1, TimeUnit.SECONDS); + mode.get(DatashareTaskManager.class).shutdown(); // to send shutdown + mode.get(DatashareTaskManager.class).awaitTermination(1, TimeUnit.SECONDS); workerApp.join(); mode.get(Indexer.class).close(); } diff --git a/datashare-app/src/test/java/org/icij/datashare/mode/CommonModeTest.java b/datashare-app/src/test/java/org/icij/datashare/mode/CommonModeTest.java index 6bec668e5..ea6cf9a10 100644 --- a/datashare-app/src/test/java/org/icij/datashare/mode/CommonModeTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/mode/CommonModeTest.java @@ -3,11 +3,11 @@ import com.google.inject.Guice; import com.google.inject.Injector; import org.icij.datashare.PropertiesProvider; -import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.asynctasks.TaskModifier; import org.icij.datashare.asynctasks.TaskSupplier; import org.icij.datashare.cli.Mode; import org.icij.datashare.cli.QueueType; +import org.icij.datashare.tasks.DatashareTaskManager; import org.junit.Test; import java.util.HashMap; @@ -38,8 +38,8 @@ public void test_tasks_instances() { put("mode", Mode.LOCAL.name()); put("queueType", QueueType.MEMORY.name()); }})); - assertThat(injector.getInstance(TaskManager.class)).isSameAs(injector.getInstance(TaskModifier.class)); - assertThat(injector.getInstance(TaskManager.class)).isSameAs(injector.getInstance(TaskSupplier.class)); + assertThat(injector.getInstance(DatashareTaskManager.class)).isSameAs(injector.getInstance(TaskModifier.class)); + assertThat(injector.getInstance(DatashareTaskManager.class)).isSameAs(injector.getInstance(TaskSupplier.class)); } @Test public void test_check_queue_type_with_default_value() { diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java index 53d82921a..674d48d1b 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/ArtifactTaskTest.java @@ -29,7 +29,7 @@ public class ArtifactTaskTest { @Test(expected = IllegalArgumentException.class) public void test_missing_artifact_dir() { - new ArtifactTask(factory, mockEs, new PropertiesProvider(Map.of()), new Task<>(ArtifactTask.class.getName(), User.local(), Map.of()), null); + new ArtifactTask(factory, mockEs, new PropertiesProvider(Map.of()), DatashareTask.task(ArtifactTask.class.getName(), User.local(), Map.of()), null); } @Test(timeout = 5000) @@ -46,7 +46,7 @@ public void test_create_artifact_cache_one_file() throws Exception { "defaultProject", "prj", "pollingInterval", "1" )), - new Task<>(ArtifactTask.class.getName(), User.local(), new HashMap<>()), null) + DatashareTask.task(ArtifactTask.class.getName(), User.local(), new HashMap<>()), null) .call(); assertThat(numberOfDocuments).isEqualTo(1); diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerEncryptedIntTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerEncryptedIntTest.java index 06298a1ef..1ee0366e9 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerEncryptedIntTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerEncryptedIntTest.java @@ -40,8 +40,8 @@ public void test_zip_with_password_should_encrypt_file_and_send_mail() throws Ex new IndexerHelper(es.client).indexFile("mydoc.txt", "content", fs); BatchDownload batchDownload = createBatchDownload("*"); MailSender mailSender = mock(MailSender.class); - Task taskView = - new Task<>(BatchDownloadRunner.class.getName(), batchDownload.user, + Task taskView = + DatashareTask.task(BatchDownloadRunner.class.getName(), batchDownload.user, new HashMap<>() {{ put("batchDownload", batchDownload); }}); diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerTest.java index 2fbceb8e3..be04e5617 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchDownloadRunnerTest.java @@ -43,7 +43,7 @@ public class BatchDownloadRunnerTest { @Test(expected = AssertionError.class) public void test_task_with_no_batch_download() { - new BatchDownloadRunner(indexer, new PropertiesProvider(), new Task("name", User.local(), new HashMap<>()), null); + new BatchDownloadRunner(indexer, new PropertiesProvider(), DatashareTask.task("name", User.local(), new HashMap<>()), null); } @Test @@ -51,7 +51,7 @@ public void test_max_default_results() throws Exception { Document[] documents = IntStream.range(0, 3).mapToObj(i -> createDoc("doc" + i).with(createFile(i)).build()).toArray(Document[]::new); mockSearch.willReturn(2, documents); BatchDownload batchDownload = new BatchDownload(singletonList(project("test-datashare")), User.local(), "query"); - Task taskView = getTaskView(batchDownload); + Task taskView = getTaskView(batchDownload); UriResult result = new BatchDownloadRunner(indexer, new PropertiesProvider(new HashMap<>() {{ put(BATCH_DOWNLOAD_MAX_NB_FILES_OPT, "3"); put(SCROLL_SIZE_OPT, "3"); @@ -64,7 +64,7 @@ public void test_max_default_results() throws Exception { public void test_max_zip_size() throws Exception { Document[] documents = IntStream.range(0, 3).mapToObj(i -> createDoc("doc" + i).with(createFile(i)).with("hello world " + i).build()).toArray(Document[]::new); mockSearch.willReturn(2, documents); - Task taskView = getTaskView(new BatchDownload(singletonList(project("test-datashare")), User.local(), "query")); + Task taskView = getTaskView(new BatchDownload(singletonList(project("test-datashare")), User.local(), "query")); UriResult result = new BatchDownloadRunner(indexer, new PropertiesProvider(new HashMap<>() {{ put(BATCH_DOWNLOAD_MAX_SIZE_OPT, valueOf("hello world 1".getBytes(StandardCharsets.UTF_8).length * 3 - 1)); // to avoid adding the 4th doc put(SCROLL_SIZE_OPT, "3"); @@ -76,7 +76,7 @@ public void test_max_zip_size() throws Exception { @Test(expected = ElasticsearchException.class) public void test_elasticsearch_status_exception__should_be_sent() throws Exception { mockSearch.willThrow(new ElasticsearchException("error", RestStatus.BAD_REQUEST, new RuntimeException())); - Task taskView = getTaskView(new BatchDownload(singletonList(project("test-datashare")), User.local(), "query")); + Task taskView = getTaskView(new BatchDownload(singletonList(project("test-datashare")), User.local(), "query")); new BatchDownloadRunner(indexer, new PropertiesProvider(), taskView, taskView.progress(updater::progress)).call(); } @@ -92,8 +92,8 @@ private Path createFile(int index) { } @NotNull - private static Task getTaskView(BatchDownload batchDownload) { - return new Task<>(BatchDownloadRunner.class.toString(), batchDownload.user, new HashMap<>() {{ + private static Task getTaskView(BatchDownload batchDownload) { + return DatashareTask.task(BatchDownloadRunner.class.toString(), batchDownload.user, new HashMap<>() {{ put("batchDownload", batchDownload); }}); } diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerIntTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerIntTest.java index 567e161ec..2c954eaf2 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerIntTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerIntTest.java @@ -65,7 +65,7 @@ public void test_search_with_file_types_ok() throws Exception { } private Task taskView(BatchSearch search) { - return new Task<>(search.uuid, BatchSearchRunner.class.getName(), User.local(), new Group("TestGroup")); + return DatashareTask.task(search.uuid, BatchSearchRunner.class.getName(), User.local()); } @Test diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerTest.java index 3c1628638..8614a0676 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/BatchSearchRunnerTest.java @@ -64,8 +64,8 @@ public void test_run_batch_search() throws Exception { verify(progressCb).apply( 1.0); } - private Task taskView(BatchSearch search) { - return new Task<>(search.uuid, BatchSearchRunner.class.getName(), local(), new Group("TestGroup")); + private Task taskView(BatchSearch search) { + return DatashareTask.task(search.uuid, BatchSearchRunner.class.getName(), local()); } @Test(expected = RuntimeException.class) diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/DeduplicateTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/DeduplicateTaskTest.java index ba73cb652..f5080c312 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/DeduplicateTaskTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/DeduplicateTaskTest.java @@ -17,12 +17,12 @@ public class DeduplicateTaskTest { DocumentCollectionFactory docCollectionFactory = new MemoryDocumentCollectionFactory<>(); Map defaultOpts = Map.of("queueName", "test:queue", "stages", "DEDUPLICATE"); - DeduplicateTask task = new DeduplicateTask(docCollectionFactory, new Task<>(DeduplicateTask.class.getName(), User.local(), defaultOpts), null); + DeduplicateTask task = new DeduplicateTask(docCollectionFactory, DatashareTask.task(DeduplicateTask.class.getName(), User.local(), defaultOpts), null); @Test(timeout = 2000) public void test_filter_empty() throws Exception { docCollectionFactory.createQueue("test:queue:deduplicate", Path.class).add(PATH_POISON); - assertThat(new DeduplicateTask(docCollectionFactory, new Task<>(DeduplicateTask.class.getName(), User.local(), defaultOpts), null).call()).isEqualTo(0); + assertThat(new DeduplicateTask(docCollectionFactory, DatashareTask.task(DeduplicateTask.class.getName(), User.local(), defaultOpts), null).call()).isEqualTo(0); } @Test(timeout = 2000) @@ -31,7 +31,7 @@ public void test_filter_queue_removes_duplicates() throws Exception { docCollectionFactory.createQueue("test:queue:deduplicate", Path.class).put(get("/path/to/doc")); docCollectionFactory.createQueue("test:queue:deduplicate", Path.class).add(PATH_POISON); - assertThat(new DeduplicateTask(docCollectionFactory, new Task<>(DeduplicateTask.class.getName(), User.local(), defaultOpts), null).call()).isEqualTo(1); + assertThat(new DeduplicateTask(docCollectionFactory, DatashareTask.task(DeduplicateTask.class.getName(), User.local(), defaultOpts), null).call()).isEqualTo(1); assertThat(docCollectionFactory.createQueue("test:queue:index", Path.class).size()).isEqualTo(2); // with POISON } diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/EnqueueFromIndexTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/EnqueueFromIndexTaskTest.java index f0c6a9609..13715f638 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/EnqueueFromIndexTaskTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/EnqueueFromIndexTaskTest.java @@ -39,7 +39,7 @@ public void test_size_of_search() throws Exception { "queueName", "test:queue", NLP_PIPELINE_OPT, Pipeline.Type.OPENNLP.name()); MemoryDocumentCollectionFactory factory = new MemoryDocumentCollectionFactory<>(); - EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, new Task<>(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null); + EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, DatashareTask.task(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null); enqueueFromIndex.call(); assertThat(factory.queues.get("test:queue:nlp")).hasSize(20); } @@ -61,7 +61,7 @@ public void test_with_query_body() throws Exception { """); MemoryDocumentCollectionFactory factory = new MemoryDocumentCollectionFactory<>(); - EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, new Task<>(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null); + EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, DatashareTask.task(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null); enqueueFromIndex.call(); assertThat(factory.queues.get("test:queue:nlp")).hasSize(1); } @@ -77,7 +77,7 @@ public void test_with_query_string() throws Exception { "searchQuery", "extractionLevel:0"); MemoryDocumentCollectionFactory factory = new MemoryDocumentCollectionFactory<>(); - EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, new Task<>(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null); + EnqueueFromIndexTask enqueueFromIndex = new EnqueueFromIndexTask(factory, indexer, DatashareTask.task(EnqueueFromIndexTask.class.getName(), new User("test"), properties), null); enqueueFromIndex.call(); assertThat(factory.queues.get("test:queue:nlp")).hasSize(1); } diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskIntTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskIntTest.java index 9377d517e..38bcb27b1 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskIntTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskIntTest.java @@ -92,7 +92,7 @@ protected void configure() { @Before public void setUp() { initMocks(this); - nlpTask = new ExtractNlpTask(indexer, pipeline, factory, new Task<>(ExtractNlpTask.class.getName(), User.local(), new HashMap<>(){{ + nlpTask = new ExtractNlpTask(indexer, pipeline, factory, DatashareTask.task(ExtractNlpTask.class.getName(), User.local(), new HashMap<>(){{ put("maxContentLength", "32"); }}), null); } diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskTest.java index e34ce6c0d..9060d5115 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/ExtractNlpTaskTest.java @@ -35,7 +35,7 @@ public class ExtractNlpTaskTest { @Before public void setUp() { initMocks(this); - nlpTask = new ExtractNlpTask(indexer, pipeline, factory, new Task<>(ExtractNlpTask.class.getName(), User.local(), new HashMap<>(){{ + nlpTask = new ExtractNlpTask(indexer, pipeline, factory, DatashareTask.task(ExtractNlpTask.class.getName(), User.local(), new HashMap<>(){{ put("maxContentLength", "32"); }}), null); } diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskIntTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskIntTest.java index c2c27acc8..c2bdffa31 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskIntTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskIntTest.java @@ -42,7 +42,7 @@ public void index_task_should_enqueue_poison_pill() throws Exception { DocumentQueue queue = inputQueueFactory.createQueue(new PipelineHelper(propertiesProvider).getQueueNameFor(Stage.INDEX), Path.class); queue.add(Paths.get(ClassLoader.getSystemResource("docs/doc.txt").getPath())); - Long nbDocs = new IndexTask(spewer, inputQueueFactory, new Task<>(IndexTask.class.getName(), User.local(), map), null).call(); + Long nbDocs = new IndexTask(spewer, inputQueueFactory, DatashareTask.task(IndexTask.class.getName(), User.local(), map), null).call(); assertThat(nbDocs).isEqualTo(1); DocumentQueue outputQueue = outputQueueFactory.createQueue(new PipelineHelper(propertiesProvider).getOutputQueueNameFor(Stage.INDEX), String.class); diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskTest.java index ccd4ceb7d..4653e4b4e 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/IndexTaskTest.java @@ -24,7 +24,7 @@ public class IndexTaskTest { public void test_options_include_ocr() throws Exception { ElasticsearchSpewer spewer = mock(ElasticsearchSpewer.class); Mockito.when(spewer.configure(Mockito.any())).thenReturn(spewer); - IndexTask indexTask = new IndexTask(spewer, mock(DocumentCollectionFactory.class), new Task<>(IndexTask.class.getName(), nullUser(), new HashMap<>(){{ + IndexTask indexTask = new IndexTask(spewer, mock(DocumentCollectionFactory.class), DatashareTask.task(IndexTask.class.getName(), nullUser(), new HashMap<>(){{ put("queueName", "test:queue"); }}), null); Options options = indexTask.options(); @@ -35,7 +35,7 @@ public void test_options_include_ocr() throws Exception { public void test_options_include_ocr_language() throws Exception { ElasticsearchSpewer spewer = mock(ElasticsearchSpewer.class); Mockito.when(spewer.configure(Mockito.any())).thenReturn(spewer); - IndexTask indexTask = new IndexTask(spewer, mock(DocumentCollectionFactory.class), new Task<>(IndexTask.class.getName(), nullUser(), new HashMap<>(){{ + IndexTask indexTask = new IndexTask(spewer, mock(DocumentCollectionFactory.class), DatashareTask.task(IndexTask.class.getName(), nullUser(), new HashMap<>(){{ put("queueName", "test:queue"); }}), null); Options options = indexTask.options(); @@ -46,7 +46,7 @@ public void test_options_include_ocr_language() throws Exception { public void test_options_include_language() throws Exception { ElasticsearchSpewer spewer = mock(ElasticsearchSpewer.class); Mockito.when(spewer.configure(Mockito.any())).thenReturn(spewer); - IndexTask indexTask = new IndexTask(spewer, mock(DocumentCollectionFactory.class), new Task<>(IndexTask.class.getName(), nullUser(), new HashMap<>(){{ + IndexTask indexTask = new IndexTask(spewer, mock(DocumentCollectionFactory.class), DatashareTask.task(IndexTask.class.getName(), nullUser(), new HashMap<>(){{ put("language", "FRENCH"); put("queueName", "test:queue"); }}), null); @@ -60,7 +60,7 @@ public void test_configure_called_on_spewer() throws Exception { ElasticsearchSpewer spewer = mock(ElasticsearchSpewer.class); Mockito.when(spewer.configure(Mockito.any())).thenReturn(spewer); - new IndexTask(spewer, mock(DocumentCollectionFactory.class), new Task<>(IndexTask.class.getName(), nullUser(), Map.of("charset", "UTF-16")), null); + new IndexTask(spewer, mock(DocumentCollectionFactory.class), DatashareTask.task(IndexTask.class.getName(), nullUser(), Map.of("charset", "UTF-16")), null); ArgumentCaptor captor = ArgumentCaptor.forClass(Options.class); verify(spewer).configure(captor.capture()); @@ -73,7 +73,7 @@ public void test_configure_project_on_spewer() throws Exception { ElasticsearchSpewer spewer = mock(ElasticsearchSpewer.class); Mockito.when(spewer.configure(Mockito.any())).thenReturn(spewer); - new IndexTask(spewer, mock(DocumentCollectionFactory.class), new Task<>(IndexTask.class.getName(), nullUser(), Map.of("defaultProject", "foo", "projectName", "bar")), null); + new IndexTask(spewer, mock(DocumentCollectionFactory.class), DatashareTask.task(IndexTask.class.getName(), nullUser(), Map.of("defaultProject", "foo", "projectName", "bar")), null); ArgumentCaptor captor = ArgumentCaptor.forClass(Options.class); verify(spewer).configure(captor.capture()); diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/ScanIndexTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/ScanIndexTaskTest.java index c31a0ed57..5ff5c53e3 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/ScanIndexTaskTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/ScanIndexTaskTest.java @@ -38,7 +38,7 @@ public class ScanIndexTaskTest { @Test public void test_empty_index() throws Exception { - assertThat(new ScanIndexTask(documentCollectionFactory, indexer, new Task<>( + assertThat(new ScanIndexTask(documentCollectionFactory, indexer, DatashareTask.task( ScanIndexTask.class.getName(), User.nullUser(), propertiesToMap(propertiesProvider.getProperties())), null).call()).isEqualTo(0); } @@ -47,7 +47,7 @@ public void test_transfer_indexed_paths_to_filter_set() throws Exception { indexer.add(TEST_INDEX, DocumentBuilder.createDoc("id1").build()); indexer.add(TEST_INDEX, DocumentBuilder.createDoc("id2").build()); - assertThat(new ScanIndexTask(documentCollectionFactory, indexer, new Task<>( + assertThat(new ScanIndexTask(documentCollectionFactory, indexer, DatashareTask.task( ScanIndexTask.class.getName(), User.nullUser(), propertiesToMap(propertiesProvider.getProperties())), null).call()).isEqualTo(2); ReportMap actualReportMap = documentCollectionFactory.createMap("test:report"); diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/ScanTaskTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/ScanTaskTest.java index 605701650..f511bc327 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/ScanTaskTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/ScanTaskTest.java @@ -1,7 +1,6 @@ package org.icij.datashare.tasks; import junit.framework.TestCase; -import org.icij.datashare.asynctasks.Task; import org.icij.datashare.extract.MemoryDocumentCollectionFactory; import org.icij.datashare.user.User; import org.icij.extract.queue.DocumentQueue; @@ -18,14 +17,14 @@ public class ScanTaskTest extends TestCase { private final MemoryDocumentCollectionFactory documentCollectionFactory = new MemoryDocumentCollectionFactory<>(); public void test_scan() throws Exception { - assertThat(new ScanTask(documentCollectionFactory, new Task<>("org.icij.datashare.tasks.ScanTask", User.local(), + assertThat(new ScanTask(documentCollectionFactory, DatashareTask.task("org.icij.datashare.tasks.ScanTask", User.local(), Map.of(DATA_DIR_OPT, Paths.get(ClassLoader.getSystemResource("docs").getPath()).toString())), null).call()).isEqualTo(3); DocumentQueue queue = documentCollectionFactory.createQueue("extract:queue:index", Path.class); assertThat(queue.size()).isEqualTo(4); // with POISON } public void test_scan_with_queue_name() throws Exception { - assertThat(new ScanTask(documentCollectionFactory, new Task<>("org.icij.datashare.tasks.ScanTask", User.local(), + assertThat(new ScanTask(documentCollectionFactory, DatashareTask.task("org.icij.datashare.tasks.ScanTask", User.local(), Map.of(DATA_DIR_OPT, Paths.get(ClassLoader.getSystemResource("docs").getPath()).toString(), QUEUE_NAME_OPT, "foo")), null).call()).isEqualTo(3); DocumentQueue queue = documentCollectionFactory.createQueue("foo:index", Path.class); diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskManagerMemoryForBatchSearchTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskManagerMemoryForBatchSearchTest.java index d88c69346..50b0d1b6d 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskManagerMemoryForBatchSearchTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskManagerMemoryForBatchSearchTest.java @@ -92,7 +92,7 @@ public void test_run_batch_search_failure() throws Exception { when(factory.createBatchSearchRunner(any(), any())).thenReturn(batchSearchRunner); mockSearch.willThrow(new IOException("io exception")); - taskManager.startTask(new Task<>(testBatchSearch.uuid, BatchSearchRunner.class.getName(), local(), new Group("TestGroup"), Map.of("batchRecord", new BatchSearchRecord(testBatchSearch)))); + taskManager.startTask(DatashareTask.task(testBatchSearch.uuid, BatchSearchRunner.class.getName(), local(), Map.of("batchRecord", new BatchSearchRecord(testBatchSearch))), new Group("TestGroup")); taskManager.awaitTermination(1, TimeUnit.SECONDS); verify(repository).setState(testBatchSearch.uuid, BatchSearch.State.RUNNING); @@ -141,7 +141,7 @@ public void setUp() throws IOException { taskManager = new TaskManagerMemory(factory, new PropertiesProvider(), startLoop); mockSearch = new MockSearch<>(indexer, Indexer.QueryBuilderSearcher.class); - Task taskView = new Task<>(testBatchSearch.uuid, BatchSearchRunner.class.getName(), local(), new Group("TestGroup")); + Task taskView = DatashareTask.task(testBatchSearch.uuid, BatchSearchRunner.class.getName(), local()); batchSearchRunner = new BatchSearchRunner(indexer, new PropertiesProvider(), repository, taskView, taskView.progress(taskManager::progress)); when(repository.get(eq(local()), anyString())).thenReturn(testBatchSearch); when(factory.createBatchSearchRunner(any(), any())).thenReturn(batchSearchRunner); @@ -157,7 +157,7 @@ private class SleepingBatchSearchRunner extends BatchSearchRunner { private final CountDownLatch countDownLatch; public SleepingBatchSearchRunner(int sleepingMilliseconds, CountDownLatch countDownLatch, BatchSearch bs) { - super(mock(Indexer.class), new PropertiesProvider(), repository, new Task<>(bs.uuid, BatchSearchRunner.class.getName(), local(), new Group("TestGroup")), (b) -> null); + super(mock(Indexer.class), new PropertiesProvider(), repository, DatashareTask.task(bs.uuid, BatchSearchRunner.class.getName(), local()), (b) -> null); this.sleepingMilliseconds = sleepingMilliseconds; this.countDownLatch = countDownLatch; } diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerIntTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerIntTest.java index 06aa15478..ba98e5a88 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerIntTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerIntTest.java @@ -59,7 +59,7 @@ public void test_one_result() throws Exception { File file = new IndexerHelper(es.client).indexFile("mydoc.txt", content, fs); BatchDownload bd = createBatchDownload("fox"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(bd.filename.toFile()).isFile(); @@ -75,7 +75,7 @@ public void test_one_result_with_json_query() throws Exception { new IndexerHelper(es.client).indexFile("mydoc.txt", content, fs); BatchDownload bd = createBatchDownload("{\"match_all\":{}}"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(bd.filename.toFile()).isFile(); @@ -88,7 +88,7 @@ public void test_two_results() throws Exception { new IndexerHelper(es.client).indexFile("doc2.txt", "Portez ce vieux whisky au juge blond qui fume", fs); BatchDownload bd = createBatchDownload("*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(bd.filename.toFile()).isFile(); @@ -102,7 +102,7 @@ public void test_update_batch_download_zip_size() throws Exception { new IndexerHelper(es.client).indexFile("doc2.txt", "Portez ce vieux whisky au juge blond qui fume", fs); BatchDownload bd = createBatchDownload("*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); UriResult result = new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(result.size()).isGreaterThan(0); @@ -114,7 +114,7 @@ public void test_two_files_one_result() throws Exception { new IndexerHelper(es.client).indexFile("doc2.txt", "Portez ce vieux whisky au juge blond qui fume", fs); BatchDownload bd = createBatchDownload("juge"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(bd.filename.toFile()).isFile(); @@ -127,7 +127,7 @@ public void test_two_results_two_dirs() throws Exception { File doc2 = new IndexerHelper(es.client).indexFile("dir2/doc2.txt", "Portez ce vieux whisky au juge blond qui fume", fs); BatchDownload bd = createBatchDownload("*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(bd.filename.toFile()).isFile(); @@ -142,7 +142,7 @@ public void test_two_results_two_indexes() throws Exception { File doc2 = new IndexerHelper(es.client).indexFile("dir2/doc2.txt", "Portez ce vieux whisky au juge blond qui fume", fs, TEST_INDEXES[2]); BatchDownload bd = createBatchDownload(asList(project(TEST_INDEXES[1]), project(TEST_INDEXES[2])),"*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(bd.filename.toFile()).isFile(); @@ -154,7 +154,7 @@ public void test_two_results_two_indexes() throws Exception { @Test public void test_progress_rate() throws Exception { new IndexerHelper(es.client).indexFile("mydoc.txt", "content", fs); - Task taskView = createTaskView(createBatchDownload("*")); + Task taskView = createTaskView(createBatchDownload("*")); BatchDownloadRunner batchDownloadRunner = new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)); assertThat(batchDownloadRunner.getProgressRate()).isEqualTo(0); batchDownloadRunner.call(); @@ -166,7 +166,7 @@ public void test_embedded_doc_should_not_interrupt_zip_creation() throws Excepti File file = new IndexerHelper(es.client).indexEmbeddedFile(TEST_INDEX, "/docs/embedded_doc.eml"); BatchDownload bd = createBatchDownload("*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(new ZipFile(bd.filename.toFile()).size()).isEqualTo(2); @@ -178,7 +178,7 @@ public void test_embedded_doc_with_not_found_embedded_should_not_interrupt_zip_c new IndexerHelper(es.client).indexEmbeddedFile("bad_project_name", "/docs/embedded_doc.eml"); BatchDownload bd = createBatchDownload("*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)).call(); assertThat(new ZipFile(bd.filename.toFile()).size()).isEqualTo(1); @@ -187,7 +187,7 @@ public void test_embedded_doc_with_not_found_embedded_should_not_interrupt_zip_c @Test public void test_to_string_contains_batch_download_uuid() { BatchDownload bd = createBatchDownload("*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); BatchDownloadRunner batchDownloadRunner = new BatchDownloadRunner(indexer, createProvider(), taskView, taskView.progress(taskModifier::progress)); assertThat(batchDownloadRunner.toString()).startsWith("BatchDownloadRunner@"); @@ -199,7 +199,7 @@ public void test_cancel_current_batch_download() throws Exception { new IndexerHelper(es.client).indexFile("mydoc.txt", "content", fs); ExecutorService executor = Executors.newFixedThreadPool(1); CountDownLatch countDownLatch = new CountDownLatch(1); - Task taskView = createTaskView(createBatchDownload("*")); + Task taskView = createTaskView(createBatchDownload("*")); BatchDownloadRunner batchDownloadRunner = new BatchDownloadRunner(indexer, createProvider(), taskView.progress(taskModifier::progress), taskView, null, countDownLatch); Future result = executor.submit(batchDownloadRunner); @@ -214,7 +214,7 @@ public void test_cancel_current_batch_download() throws Exception { @Test(expected = ElasticSearchAdapterException.class) public void test_use_batch_download_scroll_size_value_over_scroll_size_value() throws Exception { BatchDownload bd = createBatchDownload("*"); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); PropertiesProvider propertiesProvider = new PropertiesProvider(new HashMap<>() {{ put("downloadFolder", fs.getRoot().toString()); put(SCROLL_SIZE_OPT, "100"); @@ -230,7 +230,7 @@ public void test_use_scroll_size_value() throws Exception { put("downloadFolder", fs.getRoot().toString()); put(SCROLL_SIZE_OPT, "0"); }}); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, propertiesProvider, taskView, taskView.progress(taskModifier::progress)).call(); } @@ -240,7 +240,7 @@ public void test_elasticsearch_exception() throws Exception { PropertiesProvider propertiesProvider = new PropertiesProvider( Map.of("downloadFolder", "/unused", BATCH_DOWNLOAD_SCROLL_DURATION_OPT, "10foo")); - Task taskView = createTaskView(bd); + Task taskView = createTaskView(bd); new BatchDownloadRunner(indexer, propertiesProvider, taskView, taskView.progress(taskModifier::progress)).call(); } @@ -249,7 +249,7 @@ private BatchDownload createBatchDownload(String query) { return new BatchDownload(asList(project(TEST_INDEX)), local(), query, null, fs.getRoot().toPath(), false); } private Task createTaskView(BatchDownload bd) { - return new Task<>(BatchDownloadRunner.class.getName(), bd.user, new HashMap<>() {{ + return new DatashareTask().task(BatchDownloadRunner.class.getName(), bd.user, new HashMap<>() {{ put("batchDownload", bd); }}); } diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopForPipelineTasksTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopForPipelineTasksTest.java index 34cad9187..cf1d5f878 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopForPipelineTasksTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopForPipelineTasksTest.java @@ -14,8 +14,6 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import static org.fest.assertions.Assertions.assertThat; @@ -29,12 +27,11 @@ public class TaskWorkerLoopForPipelineTasksTest { DatashareTaskFactory taskFactory; @Mock Function updateCallback; @Mock ElasticsearchSpewer spewer; - private final BlockingQueue> taskQueue = new LinkedBlockingQueue<>(); - private final TaskManagerMemory taskSupplier = new TaskManagerMemory(taskFactory, new PropertiesProvider()); + private final TaskManagerMemory taskManager = new TaskManagerMemory(taskFactory, new PropertiesProvider()); @Test public void test_scan_task() throws Exception { - Task task = new Task<>(ScanTask.class.getName(), User.local(), Map.of("dataDir", "/path/to/files")); + Task task = DatashareTask.task(ScanTask.class.getName(), User.local(), Map.of("dataDir", "/path/to/files")); ScanTask taskRunner = new ScanTask(mock(DocumentCollectionFactory.class), task, updateCallback); when(taskFactory.createScanTask(any(), any())).thenReturn(taskRunner); @@ -43,7 +40,7 @@ public void test_scan_task() throws Exception { @Test public void test_index_task() throws Exception { - Task task = new Task<>(IndexTask.class.getName(), User.local(), new HashMap<>()); + Task task = DatashareTask.task(IndexTask.class.getName(), User.local(), new HashMap<>()); IndexTask taskRunner = new IndexTask(spewer, mock(DocumentCollectionFactory.class), task, updateCallback); when(taskFactory.createIndexTask(any(), any())).thenReturn(taskRunner); @@ -52,7 +49,7 @@ public void test_index_task() throws Exception { @Test public void test_extract_nlp_task() throws Exception { - Task task = new Task<>(ExtractNlpTask.class.getName(), User.local(), Map.of("nlpPipeline", "EMAIL")); + Task task = DatashareTask.task(ExtractNlpTask.class.getName(), User.local(), Map.of("nlpPipeline", "EMAIL")); ExtractNlpTask taskRunner = new ExtractNlpTask(mock(Indexer.class), new PipelineRegistry(new PropertiesProvider()), mock(DocumentCollectionFactory.class), task, updateCallback); when(taskFactory.createExtractNlpTask(any(), any())).thenReturn(taskRunner); @@ -61,7 +58,7 @@ public void test_extract_nlp_task() throws Exception { @Test public void test_scan_index_task() throws Exception { - Task task = new Task<>(ScanIndexTask.class.getName(), User.local(), new HashMap<>()); + Task task = DatashareTask.task(ScanIndexTask.class.getName(), User.local(), new HashMap<>()); ScanIndexTask taskRunner = new ScanIndexTask(mock(DocumentCollectionFactory.class), mock(Indexer.class), task, updateCallback); when(taskFactory.createScanIndexTask(any(), any())).thenReturn(taskRunner); @@ -70,7 +67,7 @@ public void test_scan_index_task() throws Exception { @Test public void test_enqueue_from_index_task() throws Exception { - Task task = new Task<>(EnqueueFromIndexTask.class.getName(), User.local(), Map.of("nlpPipeline", "EMAIL")); + Task task = DatashareTask.task(EnqueueFromIndexTask.class.getName(), User.local(), Map.of("nlpPipeline", "EMAIL")); EnqueueFromIndexTask taskRunner = new EnqueueFromIndexTask(mock(DocumentCollectionFactory.class), mock(Indexer.class), task, updateCallback); when(taskFactory.createEnqueueFromIndexTask(any(), any())).thenReturn(taskRunner); @@ -79,19 +76,19 @@ public void test_enqueue_from_index_task() throws Exception { @Test public void test_deduplicate_task() throws Exception { - Task task = new Task<>(DeduplicateTask.class.getName(), User.local(), new HashMap<>()); + Task task = DatashareTask.task(DeduplicateTask.class.getName(), User.local(), new HashMap<>()); DeduplicateTask taskRunner = new DeduplicateTask(mock(DocumentCollectionFactory.class), task, updateCallback); when(taskFactory.createDeduplicateTask(any(), any())).thenReturn(taskRunner); testTaskWithTaskRunner(task); } - private void testTaskWithTaskRunner(Task task) throws Exception { - taskSupplier.startTask(task.name, User.local(), task.args); - taskSupplier.awaitTermination(1, TimeUnit.SECONDS); + private void testTaskWithTaskRunner( Task task) throws Exception { + taskManager.startTask(task.name, task.args); + taskManager.awaitTermination(1, TimeUnit.SECONDS); - assertThat(taskSupplier.getTasks()).hasSize(1); - assertThat(taskSupplier.getTasks().get(0).name).isEqualTo(task.name); + assertThat(taskManager.getTasks()).hasSize(1); + assertThat(taskManager.getTasks().get(0).name).isEqualTo(task.name); } @Before diff --git a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopIntTest.java b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopIntTest.java index 7ee37c21d..6bfa4b55d 100644 --- a/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopIntTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/tasks/TaskWorkerLoopIntTest.java @@ -32,7 +32,7 @@ public void test_batch_download_task_view_properties() throws Exception { DatashareTaskFactory factory = mock(DatashareTaskFactory.class); BatchDownload batchDownload = new BatchDownload(singletonList(project("prj")), User.local(), "foo"); Map properties = Map.of("batchDownload", batchDownload); - Task taskView = new Task<>(BatchDownloadRunner.class.getName(), batchDownload.user, properties); + Task taskView = DatashareTask.task(BatchDownloadRunner.class.getName(), batchDownload.user, properties); BatchDownloadRunner runner = new BatchDownloadRunner(mock(Indexer.class), new PropertiesProvider(), taskView, taskView.progress(taskSupplier::progress)); when(factory.createBatchDownloadRunner(any(), any())).thenReturn(runner); @@ -41,7 +41,7 @@ public void test_batch_download_task_view_properties() throws Exception { Thread worker = new Thread(taskWorkerLoop::call); worker.start(); workerStarted.await(); - taskManager.startTask(BatchDownloadRunner.class.getName(), User.local(), properties); + taskManager.startTask(BatchDownloadRunner.class, User.local(), properties); Thread.sleep(100); // this is a symptom of a possible flaky test but for now I can't figure out how to be event driven taskManager.awaitTermination(1, TimeUnit.SECONDS); @@ -51,7 +51,7 @@ public void test_batch_download_task_view_properties() throws Exception { assertThat(taskManager.getTasks().get(0).getError()).isNotNull(); assertThat(taskManager.getTasks().get(0).getProgress()).isEqualTo(1); assertThat(taskManager.getTasks().get(0).args).hasSize(2); - assertThat(taskManager.getTasks().get(0).getUser()).isEqualTo(User.local()); + assertThat(DatashareTask.getUser(taskManager.getTasks().get(0))).isEqualTo(User.local()); } @Before diff --git a/datashare-app/src/test/java/org/icij/datashare/web/ApiKeyResourceTest.java b/datashare-app/src/test/java/org/icij/datashare/web/ApiKeyResourceTest.java index 15f317f77..ef581f802 100644 --- a/datashare-app/src/test/java/org/icij/datashare/web/ApiKeyResourceTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/web/ApiKeyResourceTest.java @@ -4,17 +4,16 @@ import org.icij.datashare.PropertiesProvider; import org.icij.datashare.db.JooqRepository; import org.icij.datashare.session.LocalUserFilter; +import org.icij.datashare.tasks.DatashareTaskFactory; import org.icij.datashare.tasks.DelApiKeyTask; import org.icij.datashare.tasks.GenApiKeyTask; import org.icij.datashare.tasks.GetApiKeyTask; -import org.icij.datashare.tasks.DatashareTaskFactory; import org.icij.datashare.user.User; import org.icij.datashare.web.testhelpers.AbstractProdWebServerTest; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; diff --git a/datashare-app/src/test/java/org/icij/datashare/web/TaskResourceTest.java b/datashare-app/src/test/java/org/icij/datashare/web/TaskResourceTest.java index 1d307bd38..2f16c04c2 100644 --- a/datashare-app/src/test/java/org/icij/datashare/web/TaskResourceTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/web/TaskResourceTest.java @@ -214,7 +214,6 @@ public void test_index_and_scan_directory_with_options() { defaultProperties.put("foo", "baz"); defaultProperties.put("key", "val"); defaultProperties.put("user", User.local()); - defaultProperties.put("group", new Group("Java")); defaultProperties.remove(REPORT_NAME_OPT); assertThat(taskManager.getTasks()).hasSize(2); @@ -235,7 +234,6 @@ public void test_index_queue_with_options() { defaultProperties.put("key1", "val1"); defaultProperties.put("key2", "val2"); defaultProperties.put("user", User.local()); - defaultProperties.put("group", new Group("Java")); assertThat(taskManager.getTasks()).hasSize(1); assertThat(taskManager.getTasks().get(0).name).isEqualTo("org.icij.datashare.tasks.IndexTask"); @@ -496,6 +494,8 @@ public void test_create_new_task() { "arguments": {"user":{"@type":"org.icij.datashare.user.User", "id":"local","name":null,"email":null,"provider":"local","details":{"uid":"local","groups_by_applications":{"datashare":["local-datashare"]}} }}}""", TaskCreation.class.getName())) .should().respond(201); + // Cancel the all tasks to avoid side-effects with other tests + put("/api/task/stopAll").should().respond(200); } @NotNull diff --git a/datashare-app/src/test/java/org/icij/datashare/web/UserTaskResourceTest.java b/datashare-app/src/test/java/org/icij/datashare/web/UserTaskResourceTest.java index 9ec788eca..84c34e7e3 100644 --- a/datashare-app/src/test/java/org/icij/datashare/web/UserTaskResourceTest.java +++ b/datashare-app/src/test/java/org/icij/datashare/web/UserTaskResourceTest.java @@ -9,7 +9,6 @@ import org.icij.datashare.PropertiesProvider; import org.icij.datashare.asynctasks.Task; import org.icij.datashare.asynctasks.TaskGroup; -import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.asynctasks.TaskModifier; import org.icij.datashare.asynctasks.bus.amqp.UriResult; import org.icij.datashare.tasks.DatashareTaskFactory; diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java index 1f0d2ec5a..d3185dd44 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java @@ -11,34 +11,28 @@ import org.icij.datashare.asynctasks.bus.amqp.Event; import org.icij.datashare.asynctasks.bus.amqp.TaskError; import org.icij.datashare.asynctasks.bus.amqp.UriResult; -import org.icij.datashare.batch.WebQueryPagination; -import org.icij.datashare.user.User; -import java.io.Serial; import java.io.Serializable; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; +import org.icij.datashare.batch.WebQueryPagination; import static java.util.Optional.ofNullable; import static java.util.UUID.randomUUID; import static org.icij.datashare.batch.WebQueryPagination.OrderDirection.ASC; -import static org.icij.datashare.batch.WebQueryPagination.OrderDirection.DESC; @JsonInclude(JsonInclude.Include.NON_NULL) -public class Task extends Event implements Entity, Comparable> { - public static final String USER_KEY = "user"; - public static final String GROUP_KEY = "group"; +public class Task extends Event implements Entity, Comparable>{ @JsonIgnore private StateLatch stateLatch; @JsonIgnore private final Object lock = new Object(); - public enum State {CREATED, QUEUED, RUNNING, CANCELLED, ERROR, DONE;} + public enum State {CREATED, QUEUED, RUNNING, CANCELLED, ERROR, DONE} @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "@type") public final Map args; public final String id; @@ -54,24 +48,16 @@ public enum State {CREATED, QUEUED, RUNNING, CANCELLED, ERROR, DONE;} @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type") private volatile V result; - public Task(String name, User user, Map args) { - this(randomUUID().toString(), name, user, args); + public Task(String name, Map args) { + this(randomUUID().toString(), name, State.CREATED, 0, null, args); } - public Task(String name, User user, Group group, Map args) { - this(randomUUID().toString(), name, State.CREATED, 0, null, addTo(args, user, group)); + public Task(String id, String name) { + this(id, name, new HashMap<>()); } - public Task(String id, String name, User user, Group group) { - this(id, name, user,addTo(new HashMap<>(), user, group)); - } - - public Task(String id, String name, User user, Group group, Map args) { - this(id, name, State.CREATED, 0, null, addTo(args, user, group)); - } - - public Task(String id, String name, User user, Map args) { - this(id, name, State.CREATED, 0, null, addTo(args, user)); + public Task(String id, String name, Map args) { + this(id, name, State.CREATED, 0, null, args); } @JsonCreator @@ -94,16 +80,6 @@ public V getResult() { return result; } - /** - * Beware that the lock is working only on a "local" use. - * If the task is serialized/deserialized then the lock won't do anything - * because the lock instance will change. - * - * @param timeout - * @param unit - * @return - * @throws InterruptedException - */ public V getResult(int timeout, TimeUnit unit) throws InterruptedException { synchronized (lock) { if (!isFinished()) { @@ -197,16 +173,6 @@ public boolean isNull() { return id == null; } - @JsonIgnore - public User getUser() { - return (User) args.get(USER_KEY); - } - - @JsonIgnore - public Group getGroup() { - return (Group) args.get(GROUP_KEY); - } - public static String getId(Callable task) { return task.toString(); } @@ -228,11 +194,9 @@ public int compareTo(Task task) { public record Comparator(String field, WebQueryPagination.OrderDirection order) implements java.util.Comparator> { public static Map, ?>> SORT_FIELDS = Map.of( "id", Task::getId, - "user", Task::getUser, "createdAt", t -> t.createdAt, "name", t -> t.name, "state", Task::getState, - "group", Task::getGroup, "finished", Task::isFinished ); @@ -263,17 +227,4 @@ private void setState(State state) { this.state = state; ofNullable(stateLatch).ifPresent(sl -> sl.setTaskState(state)); } - - private static Map addTo(Map properties, User user) { - LinkedHashMap result = new LinkedHashMap<>(properties); - result.put(USER_KEY, user); - return result; - } - - private static Map addTo(Map properties, User user, Group group) { - LinkedHashMap result = new LinkedHashMap<>(properties); - result.put(USER_KEY, user); - result.put(GROUP_KEY, group); - return result; - } } \ No newline at end of file diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskAlreadyExists.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskAlreadyExists.java new file mode 100644 index 000000000..b7aa671c7 --- /dev/null +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskAlreadyExists.java @@ -0,0 +1,9 @@ +package org.icij.datashare.asynctasks; + +public class TaskAlreadyExists extends Exception { + final String taskId; + + public TaskAlreadyExists(String taskId) { + this.taskId = taskId; + } +} diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java index b418bedc2..5d7d9fa16 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java @@ -1,5 +1,7 @@ package org.icij.datashare.asynctasks; +import static java.util.stream.Collectors.toMap; + import org.icij.datashare.asynctasks.bus.amqp.CancelledEvent; import org.icij.datashare.asynctasks.bus.amqp.ErrorEvent; import org.icij.datashare.asynctasks.bus.amqp.ProgressEvent; @@ -7,7 +9,6 @@ import org.icij.datashare.asynctasks.bus.amqp.TaskEvent; import org.icij.datashare.batch.WebQueryPagination; import org.icij.datashare.json.JsonObjectMapper; -import org.icij.datashare.user.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +23,6 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import static org.icij.datashare.text.StringUtils.getValue; /** @@ -36,20 +36,22 @@ public interface TaskManager extends Closeable { Task clearTask(String taskId) throws IOException; boolean stopTask(String taskId) throws IOException; - List> getTasks() throws IOException; List> clearDoneTasks() throws IOException; + Group getTaskGroup(String taskId); boolean shutdown() throws IOException; void clear() throws IOException; - default List> getTasks(User user) throws IOException { - return getTasks(user, new HashMap<>(), new WebQueryPagination()); + default List> getTasks() throws IOException { + return getTasks(new HashMap<>(), new WebQueryPagination()); } - default List> getTasks(User user, Map filters) throws IOException { - return getTasks(user, filters, new WebQueryPagination()); + default List> getTasks(Map filters) throws IOException { + return getTasks(filters, new WebQueryPagination()); } + void saveMetadata(TaskMetadata taskMetadata) throws IOException, TaskAlreadyExists; + void persistUpdate(Task task) throws IOException, UnknownTask; - default List> getTasks(User user, Map filters, WebQueryPagination pagination) throws IOException { + default List> getTasks(Map filters, WebQueryPagination pagination) throws IOException { Stream> taskStream = getTasks().stream().sorted(new Task.Comparator(pagination.sort, pagination.order)); for (Map.Entry filter : filters.entrySet()) { taskStream = taskStream.filter(task -> { @@ -57,7 +59,7 @@ default List> getTasks(User user, Map filters, WebQuery return filter.getValue().matcher(String.valueOf(getValue(objectMap, filter.getKey()))).matches(); }); } - return taskStream.filter(t -> user.equals(t.getUser())).skip(pagination.from).limit(pagination.size).collect(toList()); + return taskStream.skip(pagination.from).limit(pagination.size).collect(toList()); } default int getTerminationPollingInterval() {return POLLING_INTERVAL;} @@ -65,10 +67,9 @@ default boolean awaitTermination(int timeout, TimeUnit timeUnit) throws Interrup return !waitTasksToBeDone(timeout, timeUnit).isEmpty(); } - default Map stopAllTasks(User user) throws IOException { - return getTasks().stream(). - filter(t -> user.equals(t.getUser())). - filter(t -> t.getState() == Task.State.RUNNING || t.getState() == Task.State.QUEUED).collect( + default Map stopAllTasks() throws IOException { + return getTasks().stream() + .filter(t -> t.getState() == Task.State.RUNNING || t.getState() == Task.State.QUEUED).collect( toMap(t -> t.id, t -> { try { return stopTask(t.id); @@ -79,16 +80,6 @@ default Map stopAllTasks(User user) throws IOException { })); } - /** - * This is a "inner method" that is used in the template method for start(task). - * It saves the method in the inner persistent state of TaskManagers implementations. - * - * @param task to be saved in persistent state - * @return true if task has been saved - * @throws IOException if a network error occurs - */ - boolean save(Task task) throws IOException; - /** * This is a "inner method" that is used in the template method for start(task). * It put the task in the task queue for workers. @@ -98,22 +89,28 @@ default Map stopAllTasks(User user) throws IOException { void enqueue(Task task) throws IOException; // TaskResource and pipeline tasks - default String startTask(Class taskClass, User user, Map properties) throws IOException { - return startTask(new Task<>(taskClass.getName(), user, new Group(taskClass.getAnnotation(TaskGroup.class).value()), properties)); + default String startTask(Class taskClass, Map properties) throws IOException { + return startTask(new Task<>(taskClass.getName(), properties), new Group(taskClass.getAnnotation(TaskGroup.class).value())); } // BatchSearchResource and WebApp for batch searches - default String startTask(String uuid, Class taskClass, User user, Map properties) throws IOException { - return startTask(new Task<>(uuid, taskClass.getName(), user, new Group(taskClass.getAnnotation(TaskGroup.class).value()), properties)); + default String startTask(String uuid, Class taskClass, Map properties) throws IOException { + return startTask(new Task<>(uuid, taskClass.getName(), properties), new Group(taskClass.getAnnotation(TaskGroup.class).value())); } // for tests - default String startTask(String taskName, User user, Map properties) throws IOException { - return startTask(new Task<>(taskName, user, properties)); + default String startTask(String taskName, Map properties) throws IOException { + return startTask(new Task<>(taskName, properties), null); } + // for tests - default String startTask(String taskName, User user, Group group, Map properties) throws IOException { - return startTask(new Task<>(taskName, user, group, properties)); + default String startTask(String taskName, Group group, Map properties) throws IOException { + return startTask(new Task<>(taskName, properties), group); + } + + // BatchSearchResource and WebApp for batch searches + default String startTask(String id, Class taskClass) throws IOException { + return startTask(new Task<>(id, taskClass.getName()), new Group(taskClass.getAnnotation(TaskGroup.class).value())); } /** @@ -121,17 +118,35 @@ default String startTask(String taskName, User user, Group group, Map String startTask(Task taskView, Group group) throws IOException { + try { + save(taskView, group); + } catch (TaskAlreadyExists ignored) { + return null; + } + taskView.queue(); + enqueue(taskView); + return taskView.id; + } + default String startTask(Task taskView) throws IOException { - boolean saved = save(taskView); - if (saved) { - taskView.queue(); - enqueue(taskView); - return taskView.id; + return startTask(taskView, null); + } + + default void save(Task task, Group group) throws IOException, TaskAlreadyExists { + saveMetadata(new TaskMetadata<>(task, group)); + } + + default void update(Task task) throws IOException { + try { + persistUpdate(task); + } catch (UnknownTask e) { + throw new RuntimeException("task " + task.id + " is unknown, save it first !"); } - return null; } default Task setResult(ResultEvent e) throws IOException { @@ -139,7 +154,7 @@ default Task setResult(ResultEvent e) throws IOEx if (taskView != null) { logger.info("result event for {}", e.taskId); taskView.setResult(e.result); - save(taskView); + update(taskView); } else { logger.warn("no task found for result event {}", e.taskId); } @@ -151,7 +166,7 @@ default Task setError(ErrorEvent e) throws IOExcepti if (taskView != null) { logger.info("error event for {}", e.taskId); taskView.setError(e.error); - save(taskView); + update(taskView); } else { logger.warn("no task found for error event {}", e.taskId); } @@ -163,7 +178,7 @@ default Task setCanceled(CancelledEvent e) throws IOException { if (taskView != null) { logger.info("canceled event for {}", e.taskId); taskView.cancel(); - save(taskView); + update(taskView); if (e.requeue) { try { enqueue(taskView); @@ -182,7 +197,7 @@ default Task setProgress(ProgressEvent e) throws IOException { Task taskView = getTask(e.taskId); if (taskView != null) { taskView.setProgress(e.progress); - save(taskView); + update(taskView); } return taskView; } diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java index c74dae1be..2643bc7fe 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java @@ -1,5 +1,6 @@ package org.icij.datashare.asynctasks; +import java.util.Optional; import org.icij.datashare.asynctasks.bus.amqp.AmqpConsumer; import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor; import org.icij.datashare.asynctasks.bus.amqp.AmqpQueue; @@ -8,35 +9,32 @@ import org.icij.datashare.asynctasks.bus.amqp.TaskEvent; import org.icij.datashare.tasks.RoutingStrategy; -import org.icij.datashare.user.User; import java.io.IOException; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.Consumer; -import java.util.regex.Pattern; import static java.util.Optional.ofNullable; import static java.util.stream.Collectors.toList; public class TaskManagerAmqp implements TaskManager { - private final Map> tasks; + private final Map> taskMetas; private final RoutingStrategy routingStrategy; private final AmqpInterlocutor amqp; private final AmqpConsumer> eventConsumer; - public TaskManagerAmqp(AmqpInterlocutor amqp, Map> tasks) throws IOException { - this(amqp, tasks, RoutingStrategy.UNIQUE); + public TaskManagerAmqp(AmqpInterlocutor amqp, Map> taskMetas) throws IOException { + this(amqp, taskMetas, RoutingStrategy.UNIQUE); } - public TaskManagerAmqp(AmqpInterlocutor amqp, Map> tasks, RoutingStrategy routingStrategy) throws IOException { - this(amqp, tasks, routingStrategy, null); + public TaskManagerAmqp(AmqpInterlocutor amqp, Map> taskMetas, RoutingStrategy routingStrategy) throws IOException { + this(amqp, taskMetas, routingStrategy, null); } - public TaskManagerAmqp(AmqpInterlocutor amqp, Map> tasks, RoutingStrategy routingStrategy, Runnable eventCallback) throws IOException { + public TaskManagerAmqp(AmqpInterlocutor amqp, Map> taskMetas, RoutingStrategy routingStrategy, Runnable eventCallback) throws IOException { this.amqp = amqp; - this.tasks = tasks; + this.taskMetas = taskMetas; this.routingStrategy = routingStrategy; eventConsumer = new AmqpConsumer<>(amqp, event -> ofNullable(TaskManager.super.handleAck(event)).flatMap(t -> @@ -45,7 +43,7 @@ public TaskManagerAmqp(AmqpInterlocutor amqp, Map> tasks, Routin @Override public boolean stopTask(String taskId) { - Task taskView = tasks.get(taskId); + Task taskView = this.getTask(taskId); if (taskView != null) { try { logger.info("sending cancel event for {}", taskId); @@ -62,11 +60,11 @@ public boolean stopTask(String taskId) { @Override public Task clearTask(String taskId) { - if (tasks.get(taskId).getState() == Task.State.RUNNING) { + if (this.getTask(taskId).getState() == Task.State.RUNNING) { throw new IllegalStateException(String.format("task id <%s> is already in RUNNING state", taskId)); } logger.info("deleting task id <{}>", taskId); - return (Task) tasks.remove(taskId); + return (Task) taskMetas.remove(taskId).task(); } @Override @@ -75,15 +73,29 @@ public boolean shutdown() throws IOException { return true; } - public boolean save(Task task) { - Task oldVal = tasks.put(task.id, task); - return oldVal == null; + @Override + public void saveMetadata(TaskMetadata taskMetadata) throws TaskAlreadyExists { + String taskId = taskMetadata.taskId(); + if (taskMetas.containsKey(taskId)) { + throw new TaskAlreadyExists(taskId); + } + this.taskMetas.put(taskId, taskMetadata); + } + + @Override + public void persistUpdate(Task task) throws UnknownTask { + TaskMetadata updated = (TaskMetadata) taskMetas.get(task.id); + if (updated == null) { + throw new UnknownTask(task.id); + } + updated = updated.withTask(task); + this.taskMetas.put(task.id, updated); } @Override public void enqueue(Task task) throws IOException { switch (routingStrategy) { - case GROUP -> amqp.publish(AmqpQueue.TASK, task.getGroup().id(), task); + case GROUP -> amqp.publish(AmqpQueue.TASK, this.taskMetas.get(task.id).group().id(), task); case NAME -> amqp.publish(AmqpQueue.TASK, task.name, task); default -> amqp.publish(AmqpQueue.TASK, task); } @@ -91,17 +103,23 @@ public void enqueue(Task task) throws IOException { @Override public Task getTask(String taskId) { - return (Task) tasks.get(taskId); + return (Task) Optional.ofNullable(taskMetas.get(taskId)).map(TaskMetadata::task).orElse(null); } @Override public List> getTasks() { - return new LinkedList<>(tasks.values()); + return taskMetas.values().stream().map(TaskMetadata::task).collect(toList()); + } + + @Override + public Group getTaskGroup(String taskId) { + return taskMetas.get(taskId).group(); } @Override public List> clearDoneTasks() { - return tasks.values().stream().filter(f -> f.getState() != Task.State.RUNNING).map(t -> tasks.remove(t.id)).collect(toList()); + return taskMetas.values().stream().map(TaskMetadata::task).filter(Task::isFinished) + .map(t -> taskMetas.remove(t.id).task()).collect(toList()); } public void close() throws IOException { @@ -111,6 +129,6 @@ public void close() throws IOException { @Override public void clear() { - tasks.clear(); + taskMetas.clear(); } } diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerMemory.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerMemory.java index d806238c3..02e8f61c5 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerMemory.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerMemory.java @@ -1,34 +1,30 @@ package org.icij.datashare.asynctasks; +import java.util.Optional; import org.apache.commons.lang3.NotImplementedException; import org.icij.datashare.PropertiesProvider; import org.icij.datashare.asynctasks.bus.amqp.Event; import org.icij.datashare.asynctasks.bus.amqp.TaskError; -import org.icij.datashare.user.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.lang.Integer.parseInt; import static java.util.stream.Collectors.toList; -import static org.icij.datashare.asynctasks.Task.State.RUNNING; public class TaskManagerMemory implements TaskManager, TaskSupplier { private final Logger logger = LoggerFactory.getLogger(getClass()); private final ExecutorService executor; - private final ConcurrentMap> tasks = new ConcurrentHashMap<>(); + private final ConcurrentMap> taskMetas = new ConcurrentHashMap<>(); private final BlockingQueue> taskQueue; private final List loops; private final AtomicInteger executedTasks = new AtomicInteger(0); @@ -49,17 +45,17 @@ public TaskManagerMemory(TaskFactory taskFactory, PropertiesProvider propertiesP } public Task getTask(final String taskId) { - return (Task) tasks.get(taskId); + return (Task) Optional.ofNullable(taskMetas.get(taskId)).map(TaskMetadata::task).orElse(null); } @Override public List> getTasks() { - return new LinkedList<>(tasks.values()); + return taskMetas.values().stream().map(TaskMetadata::task).collect(toList()); } @Override public Void progress(String taskId, double rate) { - Task taskView = tasks.get(taskId); + Task taskView = getTask(taskId); if (taskView != null) { taskView.setProgress(rate); } else { @@ -70,7 +66,7 @@ public Void progress(String taskId, double rate) { @Override public void result(String taskId, V result) { - Task taskView = (Task) tasks.get(taskId); + Task taskView = getTask(taskId); if (taskView != null) { taskView.setResult(result); executedTasks.incrementAndGet(); @@ -81,7 +77,7 @@ public void result(String taskId, V result) { @Override public void canceled(Task task, boolean requeue) { - Task taskView = tasks.get(task.id); + Task taskView = taskMetas.get(task.id).task(); if (taskView != null) { taskView.cancel(); if (requeue) { @@ -92,7 +88,7 @@ public void canceled(Task task, boolean requeue) { @Override public void error(String taskId, TaskError reason) { - Task taskView = tasks.get(taskId); + Task taskView = taskMetas.get(taskId).task(); if (taskView != null) { taskView.setError(reason); executedTasks.incrementAndGet(); @@ -101,9 +97,28 @@ public void error(String taskId, TaskError reason) { } } - public boolean save(Task taskView) { - Task oldTask = tasks.put(taskView.id, taskView); - return oldTask == null; + @Override + public void saveMetadata(TaskMetadata taskMetadata) throws TaskAlreadyExists { + String taskId = taskMetadata.taskId(); + if (taskMetas.containsKey(taskId)) { + throw new TaskAlreadyExists(taskId); + } + this.taskMetas.put(taskId, taskMetadata); + } + + @Override + public void persistUpdate(Task task) throws UnknownTask { + TaskMetadata updated = (TaskMetadata) taskMetas.get(task.id); + if (updated == null) { + throw new UnknownTask(task.id); + } + updated = updated.withTask(task); + this.taskMetas.put(task.id, updated); + } + + @Override + public Group getTaskGroup(String taskId) { + return taskMetas.get(taskId).group(); } @Override @@ -123,20 +138,21 @@ public boolean shutdown() throws IOException { } public List> clearDoneTasks() { - return tasks.values().stream().filter(taskView -> taskView.getState() != RUNNING).map(t -> tasks.remove(t.id)).collect(toList()); + return taskMetas.values().stream().map(TaskMetadata::task).filter(Task::isFinished) + .map(t -> taskMetas.remove(t.id).task()).collect(toList()); } @Override - public Task clearTask(String taskName) { - if (tasks.get(taskName).getState() == Task.State.RUNNING) { - throw new IllegalStateException(String.format("task id <%s> is already in RUNNING state", taskName)); + public Task clearTask(String taskId) { + if (getTask(taskId).getState() == Task.State.RUNNING) { + throw new IllegalStateException(String.format("task id <%s> is already in RUNNING state", taskId)); } - logger.info("deleting task id <{}>", taskName); - return (Task) tasks.remove(taskName); + logger.info("deleting task id <{}>", taskId); + return (Task) taskMetas.remove(taskId).task(); } public boolean stopTask(String taskId) { - Task taskView = tasks.get(taskId); + Task taskView = getTask(taskId); if (taskView != null) { switch (taskView.getState()) { case QUEUED: @@ -188,7 +204,7 @@ int numberOfExecutedTasks() { public void clear() { executedTasks.set(0); taskQueue.clear(); - tasks.clear(); + taskMetas.clear(); } @Override diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerRedis.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerRedis.java index 868338776..f1b4e105b 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerRedis.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerRedis.java @@ -5,13 +5,14 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; +import java.util.Optional; +import java.util.stream.Collectors; import org.icij.datashare.asynctasks.bus.amqp.AmqpQueue; import org.icij.datashare.asynctasks.bus.amqp.CancelEvent; import org.icij.datashare.asynctasks.bus.amqp.ShutdownEvent; import org.icij.datashare.asynctasks.bus.amqp.TaskEvent; import org.icij.datashare.json.JsonObjectMapper; import org.icij.datashare.tasks.RoutingStrategy; -import org.icij.datashare.user.User; import org.redisson.Redisson; import org.redisson.RedissonBlockingQueue; import org.redisson.RedissonMap; @@ -30,9 +31,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; -import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.regex.Pattern; import java.util.stream.StreamSupport; @@ -43,7 +42,7 @@ public class TaskManagerRedis implements TaskManager { private final Runnable eventCallback; // for test public static final String EVENT_CHANNEL_NAME = "EVENT"; - private final RedissonMap> tasks; + private final RedissonMap> taskMetas; private final RTopic eventTopic; private final RedissonClient redissonClient; private final RoutingStrategy routingStrategy; @@ -56,34 +55,39 @@ public TaskManagerRedis(RedissonClient redissonClient, String taskMapName, Routi this.redissonClient = redissonClient; this.routingStrategy = routingStrategy; CommandSyncService commandSyncService = getCommandSyncService(); - this.tasks = new RedissonMap<>(new TaskViewCodec(), commandSyncService, taskMapName, redissonClient, null, null); + this.taskMetas = new RedissonMap<>(new RedisCodec<>(TaskMetadata.class), commandSyncService, taskMapName, redissonClient, null, null); this.eventTopic = redissonClient.getTopic(EVENT_CHANNEL_NAME); this.eventCallback = eventCallback; eventTopic.addListener(TaskEvent.class, (channelString, message) -> handleEvent(message)); } - @Override - public Task getTask(String id) { - return (Task) tasks.get(id); + public Task getTask(final String taskId) { + return (Task) Optional.ofNullable(taskMetas.get(taskId)).map(TaskMetadata::task).orElse(null); } @Override public List> getTasks() { - return new LinkedList<>(tasks.values()); + return taskMetas.values().stream().map(TaskMetadata::task).collect(Collectors.toList()); + } + + @Override + public Group getTaskGroup(String taskId) { + return taskMetas.get(taskId).group(); } @Override public List> clearDoneTasks() { - return tasks.values().stream().filter(Task::isFinished).map(t -> tasks.remove(t.id)).collect(toList()); + return taskMetas.values().stream().map(TaskMetadata::task).filter(Task::isFinished) + .map(t -> taskMetas.remove(t.id).task()).collect(toList()); } @Override public Task clearTask(String taskId) { - if (tasks.get(taskId).getState() == Task.State.RUNNING) { + if (getTask(taskId).getState() == Task.State.RUNNING) { throw new IllegalStateException(String.format("task id <%s> is already in RUNNING state", taskId)); } logger.info("deleting task id <{}>", taskId); - return (Task) tasks.remove(taskId); + return (Task) taskMetas.remove(taskId).task(); } @Override @@ -110,13 +114,13 @@ public boolean shutdown() throws IOException { BlockingQueue> taskQueue(Task task) { switch (routingStrategy) { case GROUP -> { - return new RedissonBlockingQueue<>(new TaskViewCodec(), getCommandSyncService(), String.format("%s.%s", AmqpQueue.TASK.name(), task.getGroup().id()), redissonClient); + return new RedissonBlockingQueue<>(new RedisCodec<>(Task.class), getCommandSyncService(), String.format("%s.%s", AmqpQueue.TASK.name(), this.taskMetas.get(task.id).group().id()), redissonClient); } case NAME -> { - return new RedissonBlockingQueue<>(new TaskViewCodec(), getCommandSyncService(), String.format("%s.%s", AmqpQueue.TASK.name(), task.name), redissonClient); + return new RedissonBlockingQueue<>(new RedisCodec<>(Task.class), getCommandSyncService(), String.format("%s.%s", AmqpQueue.TASK.name(), task.name), redissonClient); } default -> { - return new RedissonBlockingQueue<>(new TaskViewCodec(), getCommandSyncService(), AmqpQueue.TASK.name(), redissonClient); + return new RedissonBlockingQueue<>(new RedisCodec<>(Task.class), getCommandSyncService(), AmqpQueue.TASK.name(), redissonClient); } } } @@ -134,7 +138,7 @@ public void close() throws IOException { @Override public void clear() { - tasks.clear(); + taskMetas.clear(); clearTaskQueues(); } @@ -147,9 +151,23 @@ private void clearTaskQueues() { .forEach(k -> redissonClient.getQueue(k).delete()); } - public boolean save(Task task) { - Task oldVal = tasks.put(task.id, task); - return oldVal == null; + @Override + public void saveMetadata(TaskMetadata taskMetadata) throws TaskAlreadyExists { + String taskId = taskMetadata.taskId(); + if (taskMetas.containsKey(taskId)) { + throw new TaskAlreadyExists(taskId); + } + this.taskMetas.put(taskId, taskMetadata); + } + + @Override + public void persistUpdate(Task task) throws UnknownTask { + TaskMetadata updated = (TaskMetadata) taskMetas.get(task.id); + if (updated == null) { + throw new UnknownTask(task.id); + } + updated = updated.withTask(task); + this.taskMetas.put(task.id, updated); } @Override @@ -157,14 +175,16 @@ public void enqueue(Task task) { taskQueue(task).add(task); } - public static class TaskViewCodec extends BaseCodec { + public static class RedisCodec extends BaseCodec { + private final Class clazz; private final Encoder keyEncoder; private final Decoder keyDecoder; protected final ObjectMapper mapObjectMapper; - public TaskViewCodec() { + public RedisCodec(Class clazz) { + // Ugly but this doesn't work with type ref directly + this.clazz = clazz; this.mapObjectMapper = JsonObjectMapper.MAPPER; - this.keyEncoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); out.writeCharSequence(in.toString(), Charset.defaultCharset()); @@ -194,8 +214,8 @@ public ByteBuf encode(Object in) throws IOException { private final Decoder decoder = new Decoder<>() { @Override - public Object decode(ByteBuf buf, State state) throws IOException { - return mapObjectMapper.readValue((InputStream) new ByteBufInputStream(buf), Task.class); + public T decode(ByteBuf buf, State state) throws IOException { + return mapObjectMapper.readValue((InputStream) new ByteBufInputStream(buf), clazz); } }; diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskMetadata.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskMetadata.java new file mode 100644 index 000000000..29b804152 --- /dev/null +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskMetadata.java @@ -0,0 +1,11 @@ +package org.icij.datashare.asynctasks; + +public record TaskMetadata(Task task, Group group) { + String taskId() { + return task.id; + } + + TaskMetadata withTask(Task task) { + return new TaskMetadata<>(task, this.group); + } +} diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplier.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplier.java index 504523d4f..27d315f84 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplier.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplier.java @@ -2,7 +2,6 @@ import org.icij.datashare.asynctasks.bus.amqp.Event; import org.icij.datashare.asynctasks.bus.amqp.TaskError; -import org.icij.datashare.asynctasks.bus.amqp.TaskEvent; import java.io.Closeable; import java.io.Serializable; diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplierRedis.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplierRedis.java index a5640588a..9fc6c8f3d 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplierRedis.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskSupplierRedis.java @@ -1,5 +1,6 @@ package org.icij.datashare.asynctasks; +import java.util.Optional; import org.apache.commons.lang3.NotImplementedException; import org.icij.datashare.asynctasks.bus.amqp.AmqpQueue; import org.icij.datashare.asynctasks.bus.amqp.CancelledEvent; @@ -81,8 +82,8 @@ public void waitForConsumer() {} private BlockingQueue> taskQueue() { return this.taskQueueKey == null ? - new RedissonBlockingQueue<>(new TaskManagerRedis.TaskViewCodec(), getCommandSyncService(), AmqpQueue.TASK.name(), redissonClient): - new RedissonBlockingQueue<>(new TaskManagerRedis.TaskViewCodec(), getCommandSyncService(), String.format("%s.%s", AmqpQueue.TASK.name(), taskQueueKey), redissonClient); + new RedissonBlockingQueue<>(new TaskManagerRedis.RedisCodec<>(Task.class), getCommandSyncService(), AmqpQueue.TASK.name(), redissonClient): + new RedissonBlockingQueue<>(new TaskManagerRedis.RedisCodec<>(Task.class), getCommandSyncService(), String.format("%s.%s", AmqpQueue.TASK.name(), taskQueueKey), redissonClient); } private CommandSyncService getCommandSyncService() { diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/UnknownTask.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/UnknownTask.java new file mode 100644 index 000000000..6a75fad0f --- /dev/null +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/UnknownTask.java @@ -0,0 +1,9 @@ +package org.icij.datashare.asynctasks; + +public class UnknownTask extends Exception { + final String taskId; + + public UnknownTask(String taskId) { + this.taskId = taskId; + } +} diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskComparatorTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskComparatorTest.java index 16f0a01b6..d5b3d197f 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskComparatorTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskComparatorTest.java @@ -1,6 +1,5 @@ package org.icij.datashare.asynctasks; -import org.icij.datashare.user.User; import org.junit.Test; import java.util.LinkedHashMap; @@ -16,8 +15,8 @@ public class TaskComparatorTest { @Test public void test_default_comparator() { assertThat(Stream.of( - new Task<>("t2", User.local(), new LinkedHashMap<>()), - new Task<>("t1", User.local(), new LinkedHashMap<>()) + new Task<>("t2", new LinkedHashMap<>()), + new Task<>("t1", new LinkedHashMap<>()) ).sorted().map(t -> t.name).collect(Collectors.toList())).isEqualTo(List.of("t1", "t2")); } @@ -29,16 +28,16 @@ public void test_unknown_field() { @Test public void test_comparator_with_field() { assertThat(Stream.of( - new Task<>("t1", User.localUser("b_user"), new LinkedHashMap<>()), - new Task<>("t2", User.localUser("a_user"), new LinkedHashMap<>()) - ).sorted(new Task.Comparator("user", ASC)).map(t -> t.getUser().id).collect(Collectors.toList())).isEqualTo(List.of("a_user", "b_user")); + new Task<>("t1", new LinkedHashMap<>()), + new Task<>("t2", new LinkedHashMap<>()) + ).sorted(new Task.Comparator("name", ASC)).map(t -> t.name).collect(Collectors.toList())).isEqualTo(List.of("t1", "t2")); } @Test public void test_comparator_with_desc_sort_direction() { assertThat(Stream.of( - new Task<>("t1", User.local(), new LinkedHashMap<>()), - new Task<>("t2", User.local(), new LinkedHashMap<>()) + new Task<>("t1", new LinkedHashMap<>()), + new Task<>("t2", new LinkedHashMap<>()) ).sorted(new Task.Comparator("name", DESC)).map(t -> t.name).collect(Collectors.toList())).isEqualTo(List.of("t2", "t1")); } } diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java index cdf3111e8..8f8da4c7f 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java @@ -6,27 +6,25 @@ import org.icij.datashare.asynctasks.bus.amqp.AmqpServerRule; import org.icij.datashare.asynctasks.bus.amqp.TaskError; import org.icij.datashare.tasks.RoutingStrategy; -import org.icij.datashare.user.User; -import org.icij.extract.redis.RedissonClientFactory; -import org.icij.task.Options; -import org.junit.*; -import org.redisson.Redisson; -import org.redisson.RedissonMap; -import org.redisson.api.RedissonClient; -import org.redisson.command.CommandSyncService; -import org.redisson.liveobject.core.RedissonObjectBuilder; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; import static org.fest.assertions.Assertions.assertThat; public class TaskManagerAmqpTest { private static AmqpInterlocutor AMQP; - @ClassRule static public AmqpServerRule qpid = new AmqpServerRule(5672); + @ClassRule + static public AmqpServerRule qpid = new AmqpServerRule(5672); BlockingQueue> taskQueue = new LinkedBlockingQueue<>(); TaskManagerAmqp taskManager; TaskSupplierAmqp taskSupplier; @@ -34,7 +32,7 @@ public class TaskManagerAmqpTest { @Test(timeout = 2000) public void test_new_task() throws Exception { - String expectedTaskViewId = taskManager.startTask("taskName", User.local(), Map.of("key", "value")); + String expectedTaskViewId = taskManager.startTask("taskName", Map.of("key", "value")); assertThat(taskManager.getTask(expectedTaskViewId)).isNotNull(); Task actualTaskView = taskQueue.poll(1, TimeUnit.SECONDS); @@ -48,12 +46,12 @@ public void test_new_task_with_group_routing() throws Exception { try (TaskManagerAmqp groupTaskManager = new TaskManagerAmqp(AMQP, new ConcurrentHashMap<>(), RoutingStrategy.GROUP, () -> nextMessage.countDown()); TaskSupplierAmqp groupTaskSupplier = new TaskSupplierAmqp(AMQP, key)) { groupTaskSupplier.consumeTasks(t -> taskQueue.add(t)); - String expectedTaskViewId = groupTaskManager.startTask("taskName", User.local(), new Group(key), Map.of()); + String expectedTaskViewId = groupTaskManager.startTask("taskName", new Group(key), Map.of()); assertThat(groupTaskManager.getTask(expectedTaskViewId)).isNotNull(); Task actualTaskView = taskQueue.poll(1, TimeUnit.SECONDS); assertThat(actualTaskView).isNotNull(); - assertThat(actualTaskView.getGroup()).isEqualTo(new Group(key)); + assertThat(groupTaskManager.getTaskGroup(actualTaskView.id)).isEqualTo(new Group(key)); } } @@ -62,7 +60,7 @@ public void test_new_task_with_name_routing() throws Exception { try (TaskManagerAmqp groupTaskManager = new TaskManagerAmqp(AMQP, new ConcurrentHashMap<>(), RoutingStrategy.NAME, () -> nextMessage.countDown()); TaskSupplierAmqp groupTaskSupplier = new TaskSupplierAmqp(AMQP, "TaskName")) { groupTaskSupplier.consumeTasks(t -> taskQueue.add(t)); - String expectedTaskViewId = groupTaskManager.startTask("TaskName", User.local(), Map.of()); + String expectedTaskViewId = groupTaskManager.startTask("TaskName", Map.of()); assertThat(groupTaskManager.getTask(expectedTaskViewId)).isNotNull(); Task actualTaskView = taskQueue.poll(1, TimeUnit.SECONDS); @@ -75,8 +73,8 @@ public void test_new_task_with_name_routing() throws Exception { public void test_new_task_two_workers() throws Exception { try (TaskSupplierAmqp otherConsumer = new TaskSupplierAmqp(AMQP)) { otherConsumer.consumeTasks(t -> taskQueue.add(t)); - taskManager.startTask("taskName1", User.local(), new HashMap<>()); - taskManager.startTask("taskName2", User.local(), new HashMap<>()); + taskManager.startTask("taskName1", new HashMap<>()); + taskManager.startTask("taskName2", new HashMap<>()); Task actualTask1 = taskQueue.poll(1, TimeUnit.SECONDS); Task actualTask2 = taskQueue.poll(1, TimeUnit.SECONDS); @@ -88,7 +86,7 @@ public void test_new_task_two_workers() throws Exception { @Test(timeout = 2000) public void test_task_progress() throws Exception { - taskManager.startTask("taskName", User.local(), new HashMap<>()); + taskManager.startTask("taskName", new HashMap<>()); // in the task runner loop Task task = taskQueue.poll(2, TimeUnit.SECONDS); // to sync @@ -100,7 +98,7 @@ public void test_task_progress() throws Exception { @Test(timeout = 2000) public void test_task_result() throws Exception { - taskManager.startTask("taskName", User.local(), new HashMap<>()); + taskManager.startTask("taskName", new HashMap<>()); // in the task runner loop Task task = taskQueue.poll(2, TimeUnit.SECONDS); // to sync @@ -113,7 +111,7 @@ public void test_task_result() throws Exception { @Test(timeout = 2000) public void test_task_error() throws Exception { - taskManager.startTask("taskName", User.local(), new HashMap<>()); + taskManager.startTask("taskName", new HashMap<>()); // in the task runner loop Task task = taskQueue.poll(2, TimeUnit.SECONDS); // to sync @@ -127,8 +125,8 @@ public void test_task_error() throws Exception { @Test public void test_clear_task_among_two_tasks() throws Exception { - String taskView1Id = taskManager.startTask("taskName1", User.local(), new HashMap<>()); - taskManager.startTask("taskName2", User.local(), new HashMap<>()); + String taskView1Id = taskManager.startTask("taskName1", new HashMap<>()); + taskManager.startTask("taskName2", new HashMap<>()); assertThat(taskManager.getTasks()).hasSize(2); @@ -141,7 +139,7 @@ public void test_clear_task_among_two_tasks() throws Exception { @Test(expected = IllegalStateException.class) public void test_clear_running_task_should_throw_exception() throws Exception { - taskManager.startTask("taskName", User.local(), new HashMap<>()); + taskManager.startTask("taskName", new HashMap<>()); assertThat(taskManager.getTasks()).hasSize(1); @@ -157,7 +155,7 @@ public void test_clear_running_task_should_throw_exception() throws Exception { @Test(timeout = 2000) public void test_task_canceled() throws Exception { - taskManager.startTask("taskName", User.local(), new HashMap<>()); + taskManager.startTask("taskName", new HashMap<>()); // in the task runner loop Task task = taskQueue.poll(2, TimeUnit.SECONDS); // to sync @@ -168,6 +166,30 @@ public void test_task_canceled() throws Exception { assertThat(taskManager.getTask(task.id).getState()).isEqualTo(Task.State.CANCELLED); } + @Test + public void test_save_task() throws TaskAlreadyExists, IOException { + Task task = new Task<>("name", new HashMap<>()); + + taskManager.save(task, null); + + assertThat(taskManager.getTasks()).hasSize(1); + assertThat(taskManager.getTask(task.id)).isNotNull(); + } + + @Test + public void test_update_task() throws TaskAlreadyExists, IOException { + // Given + Task task = new Task<>("HelloWorld", Map.of("greeted", "world")); + TaskMetadata meta = new TaskMetadata<>(task, null); + Task update = new Task<>(task.id, task.name, task.getState(), 0.5, null, task.args); + // When + taskManager.saveMetadata(meta); + taskManager.update(update); + Task updated = taskManager.getTask(task.id); + // Then + assertThat(updated).isEqualTo(update); + } + @BeforeClass public static void beforeClass() throws Exception { AMQP = new AmqpInterlocutor(new PropertiesProvider(new HashMap<>() {{ @@ -180,16 +202,7 @@ public static void beforeClass() throws Exception { @Before public void setUp() throws IOException { nextMessage = new CountDownLatch(1); - final RedissonClient redissonClient = new RedissonClientFactory().withOptions( - Options.from(new PropertiesProvider(Map.of("redisAddress", "redis://redis:6379")).getProperties())).create(); - Map> tasks = new RedissonMap<>(new TaskManagerRedis.TaskViewCodec(), - new CommandSyncService(((Redisson) redissonClient).getConnectionManager(), - new RedissonObjectBuilder(redissonClient)), - "tasks:queue:test", - redissonClient, - null, - null - ); + Map> tasks = new ConcurrentHashMap<>(); taskManager = new TaskManagerAmqp(AMQP, tasks, RoutingStrategy.UNIQUE, () -> nextMessage.countDown()); taskSupplier = new TaskSupplierAmqp(AMQP); taskSupplier.consumeTasks(t -> taskQueue.add(t)); @@ -199,7 +212,7 @@ public void setUp() throws IOException { public void tearDown() throws Exception { taskQueue.clear(); taskManager.clear(); - taskManager.stopAllTasks(User.local()); + taskManager.stopAllTasks(); taskSupplier.close(); taskManager.close(); } diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerMemoryTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerMemoryTest.java index 10ee8f6c7..37831464c 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerMemoryTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerMemoryTest.java @@ -1,8 +1,9 @@ package org.icij.datashare.asynctasks; +import java.io.IOException; +import java.util.HashMap; import org.icij.datashare.PropertiesProvider; import org.icij.datashare.test.LogbackCapturingRule; -import org.icij.datashare.user.User; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -38,7 +39,7 @@ public void setUp() throws Exception { @Test public void test_run_task() throws Exception { - Task task = new Task<>(TestFactory.HelloWorld.class.getName(), User.local(), Map.of("greeted", "world")); + Task task = new Task<>(TestFactory.HelloWorld.class.getName(), Map.of("greeted", "world")); String tid = taskManager.startTask(task); taskManager.awaitTermination(100, TimeUnit.MILLISECONDS); @@ -50,7 +51,7 @@ public void test_run_task() throws Exception { @Test public void test_stop_current_task() throws Exception { - Task task = new Task<>(TestFactory.SleepForever.class.getName(), User.local(), Map.of("intParameter", 2000)); + Task task = new Task<>(TestFactory.SleepForever.class.getName(), Map.of("intParameter", 2000)); String taskId = taskManager.startTask(task); taskInspector.awaitToBeStarted(taskId, 10000); @@ -63,8 +64,8 @@ public void test_stop_current_task() throws Exception { @Test public void test_stop_queued_task() throws Exception { - Task t1 = new Task<>(TestFactory.SleepForever.class.getName(), User.local(), Map.of()); - Task t2 = new Task<>(TestFactory.HelloWorld.class.getName(), User.local(), Map.of("greeted", "stucked task")); + Task t1 = new Task<>(TestFactory.SleepForever.class.getName(), Map.of()); + Task t2 = new Task<>(TestFactory.HelloWorld.class.getName(), Map.of("greeted", "stucked task")); taskManager.startTask(t1); taskManager.startTask(t2); @@ -81,7 +82,7 @@ public void test_stop_queued_task() throws Exception { @Test public void test_clear_the_only_task() throws Exception { - Task task = new Task<>("sleep", User.local(), Map.of("intParameter", 12)); + Task task = new Task<>("sleep", Map.of("intParameter", 12)); taskManager.startTask(task); taskManager.awaitTermination(1, TimeUnit.SECONDS); @@ -94,7 +95,7 @@ public void test_clear_the_only_task() throws Exception { @Test(expected = IllegalStateException.class) public void test_clear_running_task_should_throw_exception() throws Exception { - Task task = new Task<>("sleep", User.local(), Map.of("intParameter", 12)); + Task task = new Task<>("sleep", Map.of("intParameter", 12)); taskManager.startTask(task); taskManager.awaitTermination(1, TimeUnit.SECONDS); @@ -120,9 +121,32 @@ public void test_result_on_unknown_task() throws Exception { "unknown task id for result=0.5 call"); } + @Test + public void test_save_task() throws TaskAlreadyExists, IOException { + Task task = new Task<>("name", new HashMap<>()); + + taskManager.save(task, null); + + assertThat(taskManager.getTasks()).hasSize(1); + assertThat(taskManager.getTask(task.id)).isNotNull(); + } + + @Test + public void test_update_task() throws TaskAlreadyExists, IOException { + // Given + Task task = new Task<>("HelloWorld", Map.of("greeted", "world")); + TaskMetadata meta = new TaskMetadata<>(task, null); + Task update = new Task<>(task.id, task.name, task.getState(), 0.5, null, task.args); + // When + taskManager.saveMetadata(meta); + taskManager.update(update); + Task updated = taskManager.getTask(task.id); + // Then + assertThat(updated).isEqualTo(update); + } @Test public void test_wait_task_to_be_done() throws Exception { - taskManager.startTask(TestFactory.Sleep.class, User.local(), Map.of("duration", 100)); + taskManager.startTask(TestFactory.Sleep.class, Map.of("duration", 100)); List> tasks = taskManager.waitTasksToBeDone(200, TimeUnit.MILLISECONDS); assertThat(tasks).hasSize(1); } diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisCodecTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisCodecTest.java index 6722e9abb..6ecd986e3 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisCodecTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisCodecTest.java @@ -3,7 +3,6 @@ import io.netty.buffer.Unpooled; import org.fest.assertions.Assertions; import org.icij.datashare.asynctasks.bus.amqp.UriResult; -import org.icij.datashare.user.User; import org.junit.Test; import org.redisson.client.handler.State; @@ -15,14 +14,13 @@ import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.MapAssert.entry; -import static org.icij.datashare.json.JsonObjectMapper.MAPPER; public class TaskManagerRedisCodecTest { - TaskManagerRedis.TaskViewCodec codec = new TaskManagerRedis.TaskViewCodec(); + TaskManagerRedis.RedisCodec codec = new TaskManagerRedis.RedisCodec<>(Task.class); @Test public void test_json_serialize_deserialize_with_inline_properties_map() throws Exception { - Task taskView = new Task<>("name", User.local(), Map.of("key", "value")); + Task taskView = new Task<>("name", Map.of("key", "value")); String json = codec.getValueEncoder().encode(taskView).toString(Charset.defaultCharset()); assertThat(json).contains("\"key\":\"value\""); @@ -30,14 +28,13 @@ public void test_json_serialize_deserialize_with_inline_properties_map() throws Task actualTask = (Task) codec.getValueDecoder().decode(Unpooled.wrappedBuffer(json.getBytes()), new State()); Assertions.assertThat(actualTask.name).isEqualTo("name"); - Assertions.assertThat(actualTask.args).hasSize(2); + Assertions.assertThat(actualTask.args).hasSize(1); Assertions.assertThat(actualTask.args).includes(entry("key", "value")); - Assertions.assertThat(actualTask.getUser()).isEqualTo(User.local()); } @Test public void test_uri_result() throws Exception { - Task task = new Task<>("name", User.local(), new HashMap<>()); + Task task = new Task<>("name", new HashMap<>()); task.setResult(new UriResult(new URI("file://uri"), 123L)); assertThat(encodeDecode(task).getResult()).isInstanceOf(UriResult.class); @@ -45,7 +42,7 @@ public void test_uri_result() throws Exception { @Test public void test_simple_results() throws Exception { - Task task = new Task<>("name", User.local(), new HashMap<>()); + Task task = new Task<>("name", new HashMap<>()); task.setResult(123L); assertThat(encodeDecode(task).getResult()).isInstanceOf(Long.class); diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisTest.java index d7e1e7c97..c2867d701 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerRedisTest.java @@ -2,7 +2,6 @@ import org.icij.datashare.PropertiesProvider; import org.icij.datashare.tasks.RoutingStrategy; -import org.icij.datashare.user.User; import org.icij.extract.redis.RedissonClientFactory; import org.icij.task.Options; import org.junit.After; @@ -38,21 +37,34 @@ public class TaskManagerRedisTest { taskSupplier = new TaskSupplierRedis(redissonClient); @Test - public void test_save_task() { - Task task = new Task<>("name", User.local(), new HashMap<>()); + public void test_save_task() throws TaskAlreadyExists, IOException { + Task task = new Task<>("name", new HashMap<>()); - taskManager.save(task); + taskManager.save(task, null); assertThat(taskManager.getTasks()).hasSize(1); assertThat(taskManager.getTask(task.id)).isNotNull(); } + @Test + public void test_update_task() throws TaskAlreadyExists, IOException { + // Given + Task task = new Task<>("HelloWorld", Map.of("greeted", "world")); + TaskMetadata meta = new TaskMetadata<>(task, null); + Task update = new Task<>(task.id, task.name, task.getState(), 0.5, null, task.args); + // When + taskManager.saveMetadata(meta); + taskManager.update(update); + Task updated = taskManager.getTask(task.id); + // Then + assertThat(updated).isEqualTo(update); + } + @Test public void test_start_task() throws IOException { - assertThat(taskManager.startTask("HelloWorld", User.local(), - new HashMap<>() {{ put("greeted", "world"); }})).isNotNull(); + String taskId = taskManager.startTask("HelloWorld", Map.of("greeted", "world")); + assertThat(taskId).isNotNull(); assertThat(taskManager.getTasks()).hasSize(1); - assertThat(taskManager.getTasks().get(0).getUser()).isEqualTo(User.local()); } @Test @@ -63,10 +75,10 @@ public void test_start_task_with_group_routing() throws Exception { this::callback); TaskSupplierRedis taskSupplier = new TaskSupplierRedis(redissonClient, "Group")) { - assertThat(groupTaskManager.startTask("HelloWorld", User.local(), new Group("Group"),Map.of("greeted", "world"))).isNotNull(); + assertThat(groupTaskManager.startTask("HelloWorld", new Group("Group"),Map.of("greeted", "world"))).isNotNull(); Task task = taskSupplier.get(2, TimeUnit.SECONDS); - assertThat(task.getGroup()).isEqualTo(new Group("Group")); + assertThat(groupTaskManager.getTaskGroup(task.id)).isEqualTo(new Group("Group")); assertThat(((RedissonBlockingQueue) groupTaskManager.taskQueue(task)).getName()).isEqualTo("TASK.Group"); } } @@ -78,7 +90,7 @@ public void test_start_task_with_name_routing() throws Exception { "test:task:manager", RoutingStrategy.NAME, this::callback); TaskSupplierRedis taskSupplier = new TaskSupplierRedis(redissonClient, "HelloWorld")) { - assertThat(nameTaskManager.startTask("HelloWorld", User.local(), Map.of("greeted", "world"))).isNotNull(); + assertThat(nameTaskManager.startTask("HelloWorld", Map.of("greeted", "world"))).isNotNull(); Task task = taskSupplier.get(2, TimeUnit.SECONDS); assertThat(((RedissonBlockingQueue) nameTaskManager.taskQueue(task)).getName()).isEqualTo("TASK.HelloWorld"); @@ -88,7 +100,7 @@ public void test_start_task_with_name_routing() throws Exception { @Test @Ignore("remove is async and clearTasks is not always done before the size is changed") public void test_done_tasks() throws Exception { - String taskViewId = taskManager.startTask("sleep", User.local(), new HashMap<>()); + String taskViewId = taskManager.startTask("sleep", new HashMap<>()); assertThat(taskManager.getTasks()).hasSize(1); @@ -103,8 +115,8 @@ public void test_done_tasks() throws Exception { @Test @Ignore("remove is async and clearTasks is not always done before the size is changed") public void test_clear_task_among_two_tasks() throws Exception { - String taskView1Id = taskManager.startTask("sleep", User.local(), new HashMap<>()); - String taskView2Id = taskManager.startTask("sleep", User.local(), new HashMap<>()); + String taskView1Id = taskManager.startTask("sleep", new HashMap<>()); + String taskView2Id = taskManager.startTask("sleep", new HashMap<>()); taskSupplier.result(taskView1Id, 123); assertThat(waitForEvent.await(1, TimeUnit.SECONDS)).isTrue(); @@ -119,7 +131,7 @@ public void test_clear_task_among_two_tasks() throws Exception { @Test(expected = IllegalStateException.class) public void test_clear_running_task_should_throw_exception() throws Exception { - String taskViewId = taskManager.startTask("sleep", User.local(), new HashMap<>()); + String taskViewId = taskManager.startTask("sleep", new HashMap<>()); taskSupplier.progress(taskViewId,0.5); assertThat(waitForEvent.await(1, TimeUnit.SECONDS)).isTrue(); @@ -130,7 +142,7 @@ public void test_clear_running_task_should_throw_exception() throws Exception { @Test public void test_done_task_result_for_file() throws Exception { - String taskViewId = taskManager.startTask("HelloWorld", User.local(), new HashMap<>() {{ + String taskViewId = taskManager.startTask("HelloWorld", new HashMap<>() {{ put("greeted", "world"); }}); String expectedResult = "Hello world !"; diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagersIntTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagersIntTest.java index 3175d5962..621be85ed 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagersIntTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagersIntTest.java @@ -6,7 +6,6 @@ import org.icij.datashare.asynctasks.bus.amqp.AmqpQueue; import org.icij.datashare.asynctasks.bus.amqp.AmqpServerRule; import org.icij.datashare.tasks.RoutingStrategy; -import org.icij.datashare.user.User; import org.icij.extract.redis.RedissonClientFactory; import org.icij.task.Options; import org.junit.After; @@ -28,7 +27,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; @@ -59,7 +57,7 @@ public static Collection taskServices() throws Exception { "messageBusAddress", "amqp://admin:admin@rabbitmq")); final RedissonClient redissonClient = new RedissonClientFactory().withOptions( Options.from(propertiesProvider.getProperties())).create(); - Map> amqpTasks = new RedissonMap<>(new TaskManagerRedis.TaskViewCodec(), + Map> amqpTasks = new RedissonMap<>(new TaskManagerRedis.RedisCodec<>(TaskMetadata.class), new CommandSyncService(((Redisson) redissonClient).getConnectionManager(), new RedissonObjectBuilder(redissonClient)), "tasks:queue:test", @@ -99,7 +97,7 @@ public TaskManagersIntTest(Creator managerCreator, Creator()); + String taskViewId = taskManager.startTask(TestFactory.SleepForever.class, new HashMap<>()); taskInspector.awaitStatus(taskViewId, Task.State.RUNNING, 1, SECONDS); taskManager.stopTask(taskViewId); @@ -112,8 +110,8 @@ public void test_stop_running_task() throws Exception { @Test(timeout = 10000) public void test_stop_queued_task() throws Exception { - String tv1Id = taskManager.startTask(TestFactory.SleepForever.class, User.local(), new HashMap<>()); - String tv2Id = taskManager.startTask(TestFactory.SleepForever.class, User.local(), new HashMap<>()); + String tv1Id = taskManager.startTask(TestFactory.SleepForever.class, new HashMap<>()); + String tv2Id = taskManager.startTask(TestFactory.SleepForever.class, new HashMap<>()); taskInspector.awaitStatus(tv1Id, Task.State.RUNNING, 1, SECONDS); taskManager.stopTask(tv2Id); @@ -128,8 +126,8 @@ public void test_stop_queued_task() throws Exception { @Test(timeout = 10000) public void test_await_tasks_termination() throws Exception { - String tv1Id = taskManager.startTask(TestFactory.Sleep.class, User.local(), Map.of("duration", 100)); - String tv2Id = taskManager.startTask(TestFactory.Sleep.class, User.local(), Map.of("duration", 200)); + String tv1Id = taskManager.startTask(TestFactory.Sleep.class, Map.of("duration", 100)); + String tv2Id = taskManager.startTask(TestFactory.Sleep.class, Map.of("duration", 200)); taskManager.awaitTermination(2, SECONDS); assertThat(taskManager.getTask(tv1Id).getState()).isEqualTo(Task.State.DONE); diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskTest.java index c5226c284..fe893fca3 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskTest.java @@ -2,7 +2,6 @@ import org.fest.assertions.Assertions; import org.icij.datashare.json.JsonObjectMapper; -import org.icij.datashare.user.User; import org.junit.Test; import java.util.HashMap; @@ -12,14 +11,13 @@ import java.util.concurrent.TimeUnit; import static org.fest.assertions.Assertions.assertThat; -import static org.fest.assertions.MapAssert.entry; public class TaskTest { private final ExecutorService executor = Executors.newSingleThreadExecutor(); @Test public void test_get_result_sync_when_task_is_running() throws InterruptedException { - Task taskView = new Task<>("name", User.local(), new HashMap<>()); + Task taskView = new Task<>("name", new HashMap<>()); executor.execute(() -> { try { taskView.getResult(1, TimeUnit.SECONDS); @@ -44,15 +42,9 @@ public void test_get_result_sync_when_task_is_not_local() { assertThat(taskView.getProgress()).isEqualTo(1); } - @Test - public void test_user_group_parameters() { - Task taskView = new Task<>("foo", User.local(), new Group("bar"), Map.of("baz", "qux")); - assertThat(taskView.args).includes(entry("group", new Group("bar")), entry("user", User.local()), entry("baz", "qux")); - } - @Test public void test_progress() { - Task taskView = new Task<>("name", User.local(), new HashMap<>()); + Task taskView = new Task<>("name", new HashMap<>()); assertThat(taskView.getProgress()).isEqualTo(0); assertThat(taskView.getState()).isEqualTo(Task.State.CREATED); @@ -90,10 +82,9 @@ public void test_json_deserialize() throws Exception { @Test public void test_serialize_deserialize() throws Exception { - Task taskView = new Task<>("name", User.local(), Map.of("key", "value")); + Task taskView = new Task<>("name", Map.of("key", "value")); String json = JsonObjectMapper.MAPPER.writeValueAsString(taskView); assertThat(json).contains("\"@type\":\"Task\""); - assertThat(json).contains("\"user\":{\"@type\":\"org.icij.datashare.user.User\""); Task taskCreation = JsonObjectMapper.MAPPER.readValue(json, Task.class); assertThat(taskCreation).isEqualTo(taskView); diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskWorkerLoopTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskWorkerLoopTest.java index 4cda5e2a9..bc189e044 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskWorkerLoopTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskWorkerLoopTest.java @@ -1,6 +1,5 @@ package org.icij.datashare.asynctasks; -import org.icij.datashare.user.User; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatchers; @@ -28,7 +27,7 @@ public class TaskWorkerLoopTest { @Test(timeout = 2000) public void test_loop() throws Exception { TaskWorkerLoop app = new TaskWorkerLoop(registry, supplier); - Task taskView = new Task<>(TestFactory.HelloWorld.class.getName(), User.local(), Map.of("greeted", "world")); + Task taskView = new Task<>(TestFactory.HelloWorld.class.getName(), Map.of("greeted", "world")); Mockito.when(supplier.get(ArgumentMatchers.anyInt(), ArgumentMatchers.any())).thenReturn(taskView); CountDownLatch taskStarted = whenTaskHasStarted(taskView.id); @@ -42,9 +41,9 @@ public void test_loop() throws Exception { } @Test(timeout = 2000) - public void test_unknown_task() throws Exception { + public void test_unknown_task() { TaskWorkerLoop app = new TaskWorkerLoop(registry, supplier); - Task taskView = new Task<>("unknown_task", User.local(), Map.of()); + Task taskView = new Task<>("unknown_task", Map.of()); try { app.handle(taskView); @@ -57,7 +56,7 @@ public void test_unknown_task() throws Exception { @Test(timeout = 2000) public void test_cancel_task() throws Exception { TaskWorkerLoop app = new TaskWorkerLoop(registry, supplier); - Task taskView = new Task<>(TestFactory.SleepForever.class.getName(), User.local(), Map.of()); + Task taskView = new Task<>(TestFactory.SleepForever.class.getName(), Map.of()); Mockito.when(supplier.get(ArgumentMatchers.anyInt(), ArgumentMatchers.any())).thenReturn(taskView); boolean requeue = false; CountDownLatch taskStarted = whenTaskHasStarted(taskView.id); @@ -75,7 +74,7 @@ public void test_cancel_task() throws Exception { @Test(timeout = 2000) public void test_cancel_task_and_requeue() throws Exception { TaskWorkerLoop app = new TaskWorkerLoop(registry, supplier); - Task taskView = new Task<>(TestFactory.SleepForever.class.getName(), User.local(), Map.of()); + Task taskView = new Task<>(TestFactory.SleepForever.class.getName(), Map.of()); Mockito.when(supplier.get(ArgumentMatchers.anyInt(), ArgumentMatchers.any())).thenReturn(taskView); boolean requeue = true; CountDownLatch taskStarted = whenTaskHasStarted(taskView.id); @@ -93,7 +92,7 @@ public void test_cancel_task_and_requeue() throws Exception { @Test(timeout = 2000) public void test_task_interrupted() throws Exception { TaskWorkerLoop app = new TaskWorkerLoop(registry, supplier); - Task taskView = new Task<>(TestFactory.SleepForever.class.getName(), User.local(), Map.of()); + Task taskView = new Task<>(TestFactory.SleepForever.class.getName(), Map.of()); Mockito.when(supplier.get(ArgumentMatchers.anyInt(), ArgumentMatchers.any())).thenReturn(taskView); CountDownLatch taskStarted = whenTaskHasStarted(taskView.id);