Skip to content

Commit

Permalink
chore: remove group from task args and move user to datashare app
Browse files Browse the repository at this point in the history
  • Loading branch information
ClemDoum committed Nov 27, 2024
1 parent 4771cd5 commit e5a3b89
Show file tree
Hide file tree
Showing 59 changed files with 561 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.icij.datashare.asynctasks.TaskWorkerLoop;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.tasks.DatashareTaskFactory;
import org.icij.datashare.text.indexing.Indexer;
import org.redisson.api.RedissonClient;

Expand All @@ -13,7 +14,7 @@
public class BatchDownloadApp {
public static void start(Properties properties) throws Exception {
CommonMode commonMode = CommonMode.create(properties);
TaskWorkerLoop taskWorkerLoop = new TaskWorkerLoop(commonMode.get(TaskFactory.class), commonMode.get(TaskSupplier.class));
TaskWorkerLoop taskWorkerLoop = new TaskWorkerLoop(commonMode.get(DatashareTaskFactory.class), commonMode.get(TaskSupplier.class));
taskWorkerLoop.call();
commonMode.get(Indexer.class).close();
commonMode.get(RedissonClient.class).shutdown();
Expand Down
18 changes: 8 additions & 10 deletions datashare-app/src/main/java/org/icij/datashare/CliApp.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.icij.datashare;

import com.google.inject.ConfigurationException;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.cli.CliExtensionService;
import org.icij.datashare.cli.spi.CliExtension;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.tasks.ArtifactTask;
import org.icij.datashare.tasks.DatashareTask;
import org.icij.datashare.tasks.DeduplicateTask;
import org.icij.datashare.tasks.EnqueueFromIndexTask;
import org.icij.datashare.tasks.ExtractNlpTask;
Expand All @@ -20,7 +20,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -111,38 +110,37 @@ private static void runTaskWorker(CommonMode mode, Properties properties) throws
PipelineHelper pipeline = new PipelineHelper(new PropertiesProvider(properties));
logger.info("executing {}", pipeline);
if (pipeline.has(Stage.DEDUPLICATE)) {
taskManager.startTask(
new Task<>(DeduplicateTask.class.getName(), nullUser(), propertiesToMap(properties)));
taskManager.startTask(DatashareTask.task(DeduplicateTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.SCANIDX)) {
taskManager.startTask(
new Task<>(ScanIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ScanIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.SCAN)) {
taskManager.startTask(
new Task<>(ScanTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ScanTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.INDEX)) {
taskManager.startTask(
new Task<>(IndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(IndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.ENQUEUEIDX)) {
taskManager.startTask(
new Task<>(EnqueueFromIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(EnqueueFromIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.NLP)) {
taskManager.startTask(
new Task<>(ExtractNlpTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ExtractNlpTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.ARTIFACT)) {
taskManager.startTask(
new Task<>(ArtifactTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ArtifactTask.class.getName(), nullUser(), propertiesToMap(properties)));
}
taskManager.shutdownAndAwaitTermination(Integer.MAX_VALUE, SECONDS);
indexer.close();
Expand Down
6 changes: 3 additions & 3 deletions datashare-app/src/main/java/org/icij/datashare/WebApp.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.icij.datashare;

import net.codestory.http.WebServer;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.bus.amqp.QpidAmqpServer;
import org.icij.datashare.batch.BatchSearch;
import org.icij.datashare.batch.BatchSearchRepository;
Expand All @@ -16,6 +15,7 @@
import java.net.Socket;
import java.net.URI;
import java.util.Properties;
import org.icij.datashare.tasks.DatashareTaskManager;

import static java.lang.Boolean.parseBoolean;
import static java.lang.Integer.parseInt;
Expand Down Expand Up @@ -48,7 +48,7 @@ static void start(Properties properties) throws Exception {
waitForServerToBeUp(parseInt(mode.properties().getProperty(PropertiesProvider.TCP_LISTEN_PORT)));
Desktop.getDesktop().browse(URI.create(new URI("http://localhost:")+mode.properties().getProperty(PropertiesProvider.TCP_LISTEN_PORT)));
}
requeueDatabaseBatchSearches(mode.get(BatchSearchRepository.class), mode.get(TaskManager.class));
requeueDatabaseBatchSearches(mode.get(BatchSearchRepository.class), mode.get(DatashareTaskManager.class));
webServerThread.join();
}

Expand All @@ -66,7 +66,7 @@ private static void waitForServerToBeUp(int tcpListenPort) throws InterruptedExc
}
}

private static void requeueDatabaseBatchSearches(BatchSearchRepository repository, TaskManager taskManager) throws IOException {
private static void requeueDatabaseBatchSearches(BatchSearchRepository repository, DatashareTaskManager taskManager) throws IOException {
for (String batchSearchUuid: repository.getQueued()) {
BatchSearch batchSearch = repository.get(batchSearchUuid);
taskManager.startTask(batchSearchUuid, BatchSearchRunner.class, batchSearch.user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.icij.datashare.Repository;
import org.icij.datashare.TesseractOCRParserWrapper;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.TaskModifier;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor;
Expand All @@ -37,6 +36,7 @@
import org.icij.datashare.nlp.EmailPipeline;
import org.icij.datashare.nlp.OptimaizeLanguageGuesser;
import org.icij.datashare.tasks.DatashareTaskFactory;
import org.icij.datashare.tasks.DatashareTaskManager;
import org.icij.datashare.tasks.TaskManagerAmqp;
import org.icij.datashare.tasks.TaskManagerMemory;
import org.icij.datashare.tasks.TaskManagerRedis;
Expand Down Expand Up @@ -156,19 +156,19 @@ protected void configure() {
switch ( batchQueueType ) {
case REDIS:
configureBatchQueuesRedis(redissonClient);
bind(TaskManager.class).to(TaskManagerRedis.class);
bind(DatashareTaskManager.class).to(TaskManagerRedis.class);
bind(TaskModifier.class).to(TaskSupplierRedis.class);
bind(TaskSupplier.class).to(TaskSupplierRedis.class);
break;
case AMQP:
configureBatchQueuesRedis(redissonClient);
bind(TaskManager.class).to(TaskManagerAmqp.class);
bind(DatashareTaskManager.class).to(TaskManagerAmqp.class);
bind(TaskSupplier.class).to(TaskSupplierAmqp.class);
bind(TaskModifier.class).to(TaskSupplierAmqp.class);
break;
default:
configureBatchQueuesMemory();
bind(TaskManager.class).to(TaskManagerMemory.class);
bind(DatashareTaskManager.class).to(TaskManagerMemory.class);
bind(TaskModifier.class).to(TaskManagerMemory.class);
bind(TaskSupplier.class).to(TaskManagerMemory.class);
}
Expand Down Expand Up @@ -199,7 +199,7 @@ private void configureBatchQueuesMemory() {
}

private void configureBatchQueuesRedis(RedissonClient redissonClient) {
bind(new TypeLiteral<BlockingQueue<Task<?>>>(){}).toInstance(new RedisBlockingQueue<>(redissonClient, DS_TASKS_QUEUE_NAME, new org.icij.datashare.asynctasks.TaskManagerRedis.TaskViewCodec()));
bind(new TypeLiteral<BlockingQueue<Task<?>>>(){}).toInstance(new RedisBlockingQueue<>(redissonClient, DS_TASKS_QUEUE_NAME, new org.icij.datashare.asynctasks.TaskManagerRedis.RedisCodec<>(Task.class)));
}

public Properties properties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.function.Pair;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Project;
import org.icij.datashare.text.indexing.Indexer;
Expand All @@ -20,7 +19,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Optional.ofNullable;
import static org.icij.datashare.cli.DatashareCliOptions.ARTIFACT_DIR_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_DEFAULT_PROJECT;
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_PROJECT_OPT;
Expand All @@ -34,7 +32,7 @@ public class ArtifactTask extends PipelineTask<String> {

@Inject
public ArtifactTask(DocumentCollectionFactory<String> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, String.class);
super(Stage.ARTIFACT, DatashareTask.getUser(taskView), factory, propertiesProvider, String.class);
this.indexer = indexer;
project = Project.project(propertiesProvider.get(DEFAULT_PROJECT_OPT).orElse(DEFAULT_DEFAULT_PROJECT));
artifactDir = Path.of(propertiesProvider.get(ARTIFACT_DIR_OPT).orElseThrow(() -> new IllegalArgumentException(String.format("cannot create artifact task with empty %s", ARTIFACT_DIR_OPT))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,23 @@ public class BatchSearchRunner implements CancellableTask, UserTask, Callable<In
private final CountDownLatch callWaiterLatch;
private final BatchSearchRepository repository;
protected final Task<String> taskView;
private final User user;
protected volatile boolean cancelAsked = false;
protected volatile Thread callThread;
protected volatile boolean requeueCancel;

@Inject
public BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository,
@Assisted Task<?> taskView, @Assisted Function<Double, Void> updateCallback) {
public BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository, @Assisted Task<?> taskView, @Assisted Function<Double, Void> updateCallback) {
this(indexer, propertiesProvider, repository, taskView, updateCallback, new CountDownLatch(1));
}

BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository,
Task<?> taskView, Function<Double, Void> updateCallback, CountDownLatch latch) {
Task<?> taskView, Function<Double, Void> updateCallback, CountDownLatch latch) {
this.indexer = indexer;
this.propertiesProvider = propertiesProvider;
this.repository = repository;
this.taskView = (Task<String>) taskView;
this.user = DatashareTask.getUser(this.taskView);
this.updateCallback = updateCallback;
this.callWaiterLatch = latch;
}
Expand All @@ -100,7 +101,7 @@ public Integer call() throws Exception {
int scrollSize = min(scrollSizeFromParams, MAX_SCROLL_SIZE);
callThread = Thread.currentThread();
callWaiterLatch.countDown(); // for tests
BatchSearch batchSearch = repository.get(taskView.getUser(), taskView.id);
BatchSearch batchSearch = repository.get(user, taskView.id);
if (batchSearch == null) {
logger.warn("batch search {} not found in database (check that database url is the same as datashare backend)", taskView.id);
return 0;
Expand Down Expand Up @@ -164,7 +165,7 @@ public Integer call() throws Exception {

@Override
public User getUser() {
return taskView.getUser();
return user;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.icij.datashare.tasks;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.user.User;

/**
 * Static helpers that build {@link Task} instances whose arguments carry the owning
 * {@link User}, and that read this user back from a task's arguments.
 */
public class DatashareTask {
    /** Key under which the user is stored in the task arguments. */
    public static final String USER_KEY = "user";

    /**
     * Creates a task with the given name whose arguments are a copy of {@code args}
     * completed with the user.
     *
     * @param name task name
     * @param user user stored under {@link #USER_KEY}
     * @param args task arguments; copied, the map passed in is not modified
     * @param <V>  result type of the task
     * @return the new task
     */
    public static <V> Task<V> task(String name, User user, Map<String, Object> args) {
        Map<String, Object> argsWithUser = copyWithUser(args, user);
        return new Task<>(name, argsWithUser);
    }

    /**
     * Creates a task with an explicit id whose only argument is the user.
     *
     * @param id   task id
     * @param name task name
     * @param user user stored under {@link #USER_KEY}
     * @param <V>  result type of the task
     * @return the new task
     */
    public static <V> Task<V> task(String id, String name, User user) {
        Map<String, Object> userOnlyArgs = copyWithUser(new HashMap<>(), user);
        return new Task<>(id, name, userOnlyArgs);
    }

    /**
     * Reads the value stored under {@link #USER_KEY} in the task arguments and casts it to {@link User}.
     *
     * @param task task to read the user from
     * @return the stored user, or {@code null} when the lookup yields nothing
     */
    public static User getUser(Task<?> task) {
        Object storedUser = task.args.get(USER_KEY);
        return (User) storedUser;
    }

    /** Copies {@code properties} into a new insertion-ordered map and sets the user entry on the copy. */
    private static Map<String, Object> copyWithUser(Map<String, Object> properties, User user) {
        Map<String, Object> withUser = new LinkedHashMap<>(properties);
        withUser.put(USER_KEY, user);
        return withUser;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.icij.datashare.tasks;

import static java.util.stream.Collectors.toMap;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.icij.datashare.asynctasks.Group;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.user.User;

/**
 * {@link TaskManager} extension adding user-aware helpers: tasks are created through
 * {@link DatashareTask} so that the user travels in the task arguments, and the task group
 * is read from the {@link TaskGroup} annotation of the task class.
 */
public interface DatashareTaskManager extends TaskManager {
    /**
     * Starts a task named after {@code taskClass} for the given user.
     *
     * @param taskClass  task class; must be annotated with {@link TaskGroup}
     * @param user       user stored in the task arguments
     * @param properties task arguments
     * @return the value returned by the underlying {@code startTask(task, group)} call
     * @throws IOException              propagated from the underlying {@code startTask} call
     * @throws IllegalArgumentException if {@code taskClass} has no {@link TaskGroup} annotation
     */
    default String startTask(Class<?> taskClass, User user, Map<String, Object> properties) throws IOException {
        return startTask(DatashareTask.task(taskClass.getName(), user, properties), groupOf(taskClass));
    }

    /**
     * Starts a task with an explicit id, named after {@code taskClass}, whose only argument is the user.
     *
     * @param id        task id
     * @param taskClass task class; must be annotated with {@link TaskGroup}
     * @param user      user stored in the task arguments
     * @return the value returned by the underlying {@code startTask(task, group)} call
     * @throws IOException              propagated from the underlying {@code startTask} call
     * @throws IllegalArgumentException if {@code taskClass} has no {@link TaskGroup} annotation
     */
    default String startTask(String id, Class<?> taskClass, User user) throws IOException {
        return startTask(DatashareTask.task(id, taskClass.getName(), user), groupOf(taskClass));
    }

    /**
     * Applies {@code getTasks(stream, pattern)} (not declared in this interface; presumably
     * inherited from {@link TaskManager}) and keeps only the tasks whose stored user equals {@code user}.
     *
     * @param stream  tasks to filter
     * @param user    user the tasks must belong to; must not be null since {@code user.equals} is called
     * @param pattern pattern handed to {@code getTasks(stream, pattern)}
     * @return the matching tasks
     */
    // TODO: can we do better using generics instead of casts ?
    default List<Task<?>> getTasks(Stream<Task<?>> stream, User user, Pattern pattern) {
        return getTasks(stream, pattern).stream()
                .map(t -> (Task<?>) t)
                .filter(t -> user.equals(DatashareTask.getUser(t)))
                .collect(Collectors.toList());
    }

    /**
     * Same as {@link #getTasks(Stream, User, Pattern)}, applied to all the tasks returned by {@code getTasks()}.
     *
     * @throws IOException propagated from {@code getTasks()}
     */
    default List<Task<?>> getTasks(User user, Pattern pattern) throws IOException {
        return getTasks(this.getTasks().stream(), user, pattern);
    }

    /**
     * Asks to stop every RUNNING or QUEUED task whose stored user equals {@code user}.
     *
     * @param user user whose tasks are stopped; must not be null since {@code user.equals} is called
     * @return for each task id, the result of {@code stopTask}, or {@code false} when it threw an {@link IOException}
     * @throws IOException propagated from {@code getTasks()}
     */
    default Map<String, Boolean> stopAllTasks(User user) throws IOException {
        return getTasks().stream()
                .filter(t -> user.equals(DatashareTask.getUser(t)))
                .filter(t -> t.getState() == Task.State.RUNNING || t.getState() == Task.State.QUEUED)
                .collect(toMap(t -> t.id, t -> {
                    try {
                        return stopTask(t.id);
                    } catch (IOException e) {
                        // best effort: a failure on one task must not prevent stopping the others
                        // (logger is not declared here; presumably a constant inherited from TaskManager)
                        logger.error("cannot stop task {}", t.id, e);
                        return false;
                    }
                }));
    }

    /**
     * Builds the {@link Group} declared by the {@link TaskGroup} annotation of the task class.
     * Fails with an explicit message instead of a bare NullPointerException when the annotation is missing.
     */
    private static Group groupOf(Class<?> taskClass) {
        TaskGroup taskGroup = taskClass.getAnnotation(TaskGroup.class);
        if (taskGroup == null) {
            throw new IllegalArgumentException(
                    "task class " + taskClass.getName() + " must be annotated with @TaskGroup");
        }
        return new Group(taskGroup.value());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class DeduplicateTask extends PipelineTask<Path> {

@Inject
public DeduplicateTask(final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.DEDUPLICATE, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class);
super(Stage.DEDUPLICATE, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), Path.class);
this.factory = factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;

import java.util.Optional;
import java.util.function.Function;
import org.icij.datashare.Entity;
import org.icij.datashare.PropertiesProvider;
Expand All @@ -12,7 +11,6 @@
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.ProjectProxy;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.text.indexing.SearchQuery;
import org.icij.datashare.text.nlp.Pipeline;
Expand Down Expand Up @@ -46,9 +44,8 @@ public class EnqueueFromIndexTask extends PipelineTask<String> {
private final int scrollSize;

@Inject
public EnqueueFromIndexTask(final DocumentCollectionFactory<String> factory, final Indexer indexer,
@Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ENQUEUEIDX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class);
public EnqueueFromIndexTask(final DocumentCollectionFactory<String> factory, final Indexer indexer, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ENQUEUEIDX, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), String.class);
this.factory = factory;
this.indexer = indexer;
this.nlpPipeline = Pipeline.Type.parse((String) taskView.args.getOrDefault(NLP_PIPELINE_OPT, Pipeline.Type.CORENLP.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ExtractNlpTask(Indexer indexer, PipelineRegistry registry, final Document


ExtractNlpTask(Indexer indexer, Pipeline pipeline, final DocumentCollectionFactory<String> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.NLP, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class);
super(Stage.NLP, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), String.class);
this.nlpPipeline = pipeline;
project = Project.project(ofNullable((String)taskView.args.get(DEFAULT_PROJECT_OPT)).orElse(DEFAULT_DEFAULT_PROJECT));
maxContentLengthChars = (int) HumanReadableSize.parse(ofNullable((String)taskView.args.get(MAX_CONTENT_LENGTH_OPT)).orElse(valueOf(DEFAULT_MAX_CONTENT_LENGTH)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class IndexTask extends PipelineTask<Path> implements Monitorable{

@Inject
public IndexTask(final ElasticsearchSpewer spewer, final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) throws IOException {
super(Stage.INDEX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class);
super(Stage.INDEX, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), Path.class);
parallelism = propertiesProvider.get(PARALLELISM_OPT).map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors());

Options<String> allTaskOptions = options().createFrom(Options.from(taskView.args));
Expand Down
Loading

0 comments on commit e5a3b89

Please sign in to comment.