Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ chore: remove group from task args and move task user to datashare app #1614

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.icij.datashare.asynctasks.TaskWorkerLoop;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.tasks.DatashareTaskFactory;
import org.icij.datashare.text.indexing.Indexer;
import org.redisson.api.RedissonClient;

Expand All @@ -13,7 +14,7 @@
public class BatchDownloadApp {
public static void start(Properties properties) throws Exception {
CommonMode commonMode = CommonMode.create(properties);
TaskWorkerLoop taskWorkerLoop = new TaskWorkerLoop(commonMode.get(TaskFactory.class), commonMode.get(TaskSupplier.class));
TaskWorkerLoop taskWorkerLoop = new TaskWorkerLoop(commonMode.get(DatashareTaskFactory.class), commonMode.get(TaskSupplier.class));
taskWorkerLoop.call();
commonMode.get(Indexer.class).close();
commonMode.get(RedissonClient.class).shutdown();
Expand Down
18 changes: 8 additions & 10 deletions datashare-app/src/main/java/org/icij/datashare/CliApp.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.icij.datashare;

import com.google.inject.ConfigurationException;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.cli.CliExtensionService;
import org.icij.datashare.cli.spi.CliExtension;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.tasks.ArtifactTask;
import org.icij.datashare.tasks.DatashareTask;
import org.icij.datashare.tasks.DeduplicateTask;
import org.icij.datashare.tasks.EnqueueFromIndexTask;
import org.icij.datashare.tasks.ExtractNlpTask;
Expand All @@ -20,7 +20,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -111,38 +110,37 @@ private static void runTaskWorker(CommonMode mode, Properties properties) throws
PipelineHelper pipeline = new PipelineHelper(new PropertiesProvider(properties));
logger.info("executing {}", pipeline);
if (pipeline.has(Stage.DEDUPLICATE)) {
taskManager.startTask(
new Task<>(DeduplicateTask.class.getName(), nullUser(), propertiesToMap(properties)));
taskManager.startTask(DatashareTask.task(DeduplicateTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.SCANIDX)) {
taskManager.startTask(
new Task<>(ScanIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ScanIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.SCAN)) {
taskManager.startTask(
new Task<>(ScanTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ScanTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.INDEX)) {
taskManager.startTask(
new Task<>(IndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(IndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.ENQUEUEIDX)) {
taskManager.startTask(
new Task<>(EnqueueFromIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(EnqueueFromIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.NLP)) {
taskManager.startTask(
new Task<>(ExtractNlpTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ExtractNlpTask.class.getName(), nullUser(), propertiesToMap(properties)));
}

if (pipeline.has(Stage.ARTIFACT)) {
taskManager.startTask(
new Task<>(ArtifactTask.class.getName(), nullUser(), propertiesToMap(properties)));
DatashareTask.task(ArtifactTask.class.getName(), nullUser(), propertiesToMap(properties)));
}
taskManager.shutdownAndAwaitTermination(Integer.MAX_VALUE, SECONDS);
indexer.close();
Expand Down
6 changes: 3 additions & 3 deletions datashare-app/src/main/java/org/icij/datashare/WebApp.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.icij.datashare;

import net.codestory.http.WebServer;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.bus.amqp.QpidAmqpServer;
import org.icij.datashare.batch.BatchSearch;
import org.icij.datashare.batch.BatchSearchRepository;
Expand All @@ -16,6 +15,7 @@
import java.net.Socket;
import java.net.URI;
import java.util.Properties;
import org.icij.datashare.tasks.DatashareTaskManager;

import static java.lang.Boolean.parseBoolean;
import static java.lang.Integer.parseInt;
Expand Down Expand Up @@ -48,7 +48,7 @@ static void start(Properties properties) throws Exception {
waitForServerToBeUp(parseInt(mode.properties().getProperty(PropertiesProvider.TCP_LISTEN_PORT)));
Desktop.getDesktop().browse(URI.create(new URI("http://localhost:")+mode.properties().getProperty(PropertiesProvider.TCP_LISTEN_PORT)));
}
requeueDatabaseBatchSearches(mode.get(BatchSearchRepository.class), mode.get(TaskManager.class));
requeueDatabaseBatchSearches(mode.get(BatchSearchRepository.class), mode.get(DatashareTaskManager.class));
webServerThread.join();
}

Expand All @@ -66,7 +66,7 @@ private static void waitForServerToBeUp(int tcpListenPort) throws InterruptedExc
}
}

private static void requeueDatabaseBatchSearches(BatchSearchRepository repository, TaskManager taskManager) throws IOException {
private static void requeueDatabaseBatchSearches(BatchSearchRepository repository, DatashareTaskManager taskManager) throws IOException {
for (String batchSearchUuid: repository.getQueued()) {
BatchSearch batchSearch = repository.get(batchSearchUuid);
taskManager.startTask(batchSearchUuid, BatchSearchRunner.class, batchSearch.user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.icij.datashare.Repository;
import org.icij.datashare.TesseractOCRParserWrapper;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.TaskModifier;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor;
Expand All @@ -37,6 +36,7 @@
import org.icij.datashare.nlp.EmailPipeline;
import org.icij.datashare.nlp.OptimaizeLanguageGuesser;
import org.icij.datashare.tasks.DatashareTaskFactory;
import org.icij.datashare.tasks.DatashareTaskManager;
import org.icij.datashare.tasks.TaskManagerAmqp;
import org.icij.datashare.tasks.TaskManagerMemory;
import org.icij.datashare.tasks.TaskManagerRedis;
Expand Down Expand Up @@ -156,19 +156,19 @@ protected void configure() {
switch ( batchQueueType ) {
case REDIS:
configureBatchQueuesRedis(redissonClient);
bind(TaskManager.class).to(TaskManagerRedis.class);
bind(DatashareTaskManager.class).to(TaskManagerRedis.class);
bind(TaskModifier.class).to(TaskSupplierRedis.class);
bind(TaskSupplier.class).to(TaskSupplierRedis.class);
break;
case AMQP:
configureBatchQueuesRedis(redissonClient);
bind(TaskManager.class).to(TaskManagerAmqp.class);
bind(DatashareTaskManager.class).to(TaskManagerAmqp.class);
bind(TaskSupplier.class).to(TaskSupplierAmqp.class);
bind(TaskModifier.class).to(TaskSupplierAmqp.class);
break;
default:
configureBatchQueuesMemory();
bind(TaskManager.class).to(TaskManagerMemory.class);
bind(DatashareTaskManager.class).to(TaskManagerMemory.class);
bind(TaskModifier.class).to(TaskManagerMemory.class);
bind(TaskSupplier.class).to(TaskManagerMemory.class);
}
Expand Down Expand Up @@ -199,7 +199,7 @@ private void configureBatchQueuesMemory() {
}

private void configureBatchQueuesRedis(RedissonClient redissonClient) {
bind(new TypeLiteral<BlockingQueue<Task<?>>>(){}).toInstance(new RedisBlockingQueue<>(redissonClient, DS_TASKS_QUEUE_NAME, new org.icij.datashare.asynctasks.TaskManagerRedis.TaskViewCodec()));
bind(new TypeLiteral<BlockingQueue<Task<?>>>(){}).toInstance(new RedisBlockingQueue<>(redissonClient, DS_TASKS_QUEUE_NAME, new org.icij.datashare.asynctasks.TaskManagerRedis.RedisCodec<>(Task.class)));
}

public Properties properties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.function.Pair;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Project;
import org.icij.datashare.text.indexing.Indexer;
Expand All @@ -20,7 +19,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Optional.ofNullable;
import static org.icij.datashare.cli.DatashareCliOptions.ARTIFACT_DIR_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_DEFAULT_PROJECT;
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_PROJECT_OPT;
Expand All @@ -34,7 +32,7 @@ public class ArtifactTask extends PipelineTask<String> {

@Inject
public ArtifactTask(DocumentCollectionFactory<String> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, String.class);
super(Stage.ARTIFACT, DatashareTask.getUser(taskView), factory, propertiesProvider, String.class);
this.indexer = indexer;
project = Project.project(propertiesProvider.get(DEFAULT_PROJECT_OPT).orElse(DEFAULT_DEFAULT_PROJECT));
artifactDir = Path.of(propertiesProvider.get(ARTIFACT_DIR_OPT).orElseThrow(() -> new IllegalArgumentException(String.format("cannot create artifact task with empty %s", ARTIFACT_DIR_OPT))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,23 @@ public class BatchSearchRunner implements CancellableTask, UserTask, Callable<In
private final CountDownLatch callWaiterLatch;
private final BatchSearchRepository repository;
protected final Task<String> taskView;
private final User user;
protected volatile boolean cancelAsked = false;
protected volatile Thread callThread;
protected volatile boolean requeueCancel;

@Inject
public BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository,
@Assisted Task<?> taskView, @Assisted Function<Double, Void> updateCallback) {
public BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository, @Assisted Task<?> taskView, @Assisted Function<Double, Void> updateCallback) {
this(indexer, propertiesProvider, repository, taskView, updateCallback, new CountDownLatch(1));
}

BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository,
Task<?> taskView, Function<Double, Void> updateCallback, CountDownLatch latch) {
Task<?> taskView, Function<Double, Void> updateCallback, CountDownLatch latch) {
this.indexer = indexer;
this.propertiesProvider = propertiesProvider;
this.repository = repository;
this.taskView = (Task<String>) taskView;
this.user = DatashareTask.getUser(this.taskView);
this.updateCallback = updateCallback;
this.callWaiterLatch = latch;
}
Expand All @@ -100,7 +101,7 @@ public Integer call() throws Exception {
int scrollSize = min(scrollSizeFromParams, MAX_SCROLL_SIZE);
callThread = Thread.currentThread();
callWaiterLatch.countDown(); // for tests
BatchSearch batchSearch = repository.get(taskView.getUser(), taskView.id);
BatchSearch batchSearch = repository.get(user, taskView.id);
if (batchSearch == null) {
logger.warn("batch search {} not found in database (check that database url is the same as datashare backend)", taskView.id);
return 0;
Expand Down Expand Up @@ -164,7 +165,7 @@ public Integer call() throws Exception {

@Override
public User getUser() {
return taskView.getUser();
return user;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.icij.datashare.tasks;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.user.User;

/**
 * Static helpers that carry a {@link User} inside the arguments of a {@link Task}.
 *
 * <p>The user is stored in the task argument map under {@link #USER_KEY} and read back
 * with {@link #getUser(Task)}.
 */
public class DatashareTask {
    /** Key of the task argument entry that holds the task's user. */
    public static final String USER_KEY = "user";

    /**
     * Creates a task whose arguments are a copy of {@code args} with the user added.
     *
     * @param name task name (callers in this project pass the task class name)
     * @param user user stored under {@link #USER_KEY}; replaces any entry already using that key
     * @param args task arguments; copied, never modified. {@code null} means "no arguments"
     * @param <V> result type of the task
     * @return the new task
     */
    public static <V> Task<V> task(String name, User user, Map<String, Object> args) {
        return new Task<>(name, addTo(args, user));
    }

    /**
     * Creates a task with an explicit id whose only argument is the user.
     *
     * @param id task id
     * @param name task name
     * @param user user stored under {@link #USER_KEY}
     * @param <V> result type of the task
     * @return the new task
     */
    public static <V> Task<V> task(String id, String name, User user) {
        return new Task<>(id, name, addTo(new HashMap<>(), user));
    }

    /**
     * Reads the user stored in the task arguments.
     *
     * @param task task to inspect
     * @return the value stored under {@link #USER_KEY}, or {@code null} when there is none
     * @throws ClassCastException if the stored value is not a {@link User}
     */
    public static User getUser(Task<?> task) {
        return (User) task.args.get(USER_KEY);
    }

    /**
     * Returns an insertion-ordered copy of {@code properties} with the user put under
     * {@link #USER_KEY}. A {@code null} map is treated as empty instead of failing with a
     * NullPointerException in the copy constructor.
     */
    private static Map<String, Object> addTo(Map<String, Object> properties, User user) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (properties != null) {
            result.putAll(properties);
        }
        result.put(USER_KEY, user);
        return result;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.icij.datashare.tasks;

import static java.util.stream.Collectors.toMap;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.icij.datashare.asynctasks.Group;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.user.User;

/**
 * {@link TaskManager} extension that adds user-aware convenience methods: the user is
 * stored in and read from the task arguments through {@link DatashareTask}.
 */
public interface DatashareTaskManager extends TaskManager {
    // TODO: can we do better using generics instead of casts ?

    /**
     * Starts a task of the given class for a user, in the group declared by the class's
     * {@link TaskGroup} annotation.
     *
     * @param taskClass task class; its name becomes the task name
     * @param user user attached to the task arguments
     * @param properties task arguments
     * @return the value returned by the underlying {@code startTask(task, group)}
     * @throws IOException if the underlying {@code startTask} fails
     * @throws IllegalArgumentException if {@code taskClass} has no {@link TaskGroup} annotation
     */
    default String startTask(Class<?> taskClass, User user, Map<String, Object> properties) throws IOException {
        return startTask(DatashareTask.task(taskClass.getName(), user, properties), taskGroupOf(taskClass));
    }

    /**
     * Starts a task with an explicit id for a user; the user is the task's only argument.
     *
     * @param id task id
     * @param taskClass task class; its name becomes the task name
     * @param user user attached to the task arguments
     * @return the value returned by the underlying {@code startTask(task, group)}
     * @throws IOException if the underlying {@code startTask} fails
     * @throws IllegalArgumentException if {@code taskClass} has no {@link TaskGroup} annotation
     */
    default String startTask(String id, Class<?> taskClass, User user) throws IOException {
        return startTask(DatashareTask.task(id, taskClass.getName(), user), taskGroupOf(taskClass));
    }

    /**
     * Applies {@code getTasks(stream, pattern)} and keeps only the tasks whose stored user
     * equals {@code user}.
     */
    default List<Task<?>> getTasks(Stream<Task<?>> stream, User user, Pattern pattern) {
        return getTasks(stream, pattern).stream().map(t -> (Task<?>) t)
                .filter(t -> user.equals(DatashareTask.getUser(t)))
                .collect(Collectors.toList());
    }

    /**
     * Same as {@link #getTasks(Stream, User, Pattern)}, applied to all tasks returned by
     * {@code getTasks()}.
     */
    default List<Task<?>> getTasks(User user, Pattern pattern) throws IOException {
        return getTasks(this.getTasks().stream(), user, pattern);
    }

    /**
     * Stops every RUNNING or QUEUED task whose stored user equals {@code user}.
     *
     * @return map from task id to the result of {@code stopTask}; {@code false} when
     *     stopping that task threw an {@link IOException} (the error is logged)
     */
    default Map<String, Boolean> stopAllTasks(User user) throws IOException {
        return getTasks().stream()
                .filter(t -> user.equals(DatashareTask.getUser(t)))
                .filter(t -> t.getState() == Task.State.RUNNING || t.getState() == Task.State.QUEUED)
                .collect(toMap(t -> t.id, t -> {
                    try {
                        return stopTask(t.id);
                    } catch (IOException e) {
                        // One failing task must not abort stopping the others.
                        // NOTE(review): logger is not declared here; presumably inherited from TaskManager.
                        logger.error("cannot stop task {}", t.id, e);
                        return false;
                    }
                }));
    }

    /**
     * Builds the {@link Group} named by the {@link TaskGroup} annotation of {@code taskClass}.
     * Fails with a message naming the class rather than a bare NullPointerException when
     * the annotation is missing.
     */
    private static Group taskGroupOf(Class<?> taskClass) {
        TaskGroup taskGroup = taskClass.getAnnotation(TaskGroup.class);
        if (taskGroup == null) {
            throw new IllegalArgumentException(
                    "task class " + taskClass.getName() + " is not annotated with @TaskGroup");
        }
        return new Group(taskGroup.value());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class DeduplicateTask extends PipelineTask<Path> {

@Inject
public DeduplicateTask(final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.DEDUPLICATE, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class);
super(Stage.DEDUPLICATE, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), Path.class);
this.factory = factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;

import java.util.Optional;
import java.util.function.Function;
import org.icij.datashare.Entity;
import org.icij.datashare.PropertiesProvider;
Expand All @@ -12,7 +11,6 @@
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.ProjectProxy;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.text.indexing.SearchQuery;
import org.icij.datashare.text.nlp.Pipeline;
Expand Down Expand Up @@ -46,9 +44,8 @@ public class EnqueueFromIndexTask extends PipelineTask<String> {
private final int scrollSize;

@Inject
public EnqueueFromIndexTask(final DocumentCollectionFactory<String> factory, final Indexer indexer,
@Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ENQUEUEIDX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class);
public EnqueueFromIndexTask(final DocumentCollectionFactory<String> factory, final Indexer indexer, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ENQUEUEIDX, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), String.class);
this.factory = factory;
this.indexer = indexer;
this.nlpPipeline = Pipeline.Type.parse((String) taskView.args.getOrDefault(NLP_PIPELINE_OPT, Pipeline.Type.CORENLP.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ExtractNlpTask(Indexer indexer, PipelineRegistry registry, final Document


ExtractNlpTask(Indexer indexer, Pipeline pipeline, final DocumentCollectionFactory<String> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.NLP, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class);
super(Stage.NLP, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), String.class);
this.nlpPipeline = pipeline;
project = Project.project(ofNullable((String)taskView.args.get(DEFAULT_PROJECT_OPT)).orElse(DEFAULT_DEFAULT_PROJECT));
maxContentLengthChars = (int) HumanReadableSize.parse(ofNullable((String)taskView.args.get(MAX_CONTENT_LENGTH_OPT)).orElse(valueOf(DEFAULT_MAX_CONTENT_LENGTH)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class IndexTask extends PipelineTask<Path> implements Monitorable{

@Inject
public IndexTask(final ElasticsearchSpewer spewer, final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) throws IOException {
super(Stage.INDEX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class);
super(Stage.INDEX, DatashareTask.getUser(taskView), factory, new PropertiesProvider(taskView.args), Path.class);
parallelism = propertiesProvider.get(PARALLELISM_OPT).map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors());

Options<String> allTaskOptions = options().createFrom(Options.from(taskView.args));
Expand Down
Loading