Skip to content

Commit

Permalink
chore: remove user and group from task args
Browse files (browse the repository at this point in the history)
  • Loading branch information
ClemDoum committed Nov 26, 2024
1 parent 4771cd5 commit bb4f471
Show file tree
Hide file tree
Showing 28 changed files with 398 additions and 254 deletions.
15 changes: 7 additions & 8 deletions datashare-app/src/main/java/org/icij/datashare/CliApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -112,37 +111,37 @@ private static void runTaskWorker(CommonMode mode, Properties properties) throws
logger.info("executing {}", pipeline);
if (pipeline.has(Stage.DEDUPLICATE)) {
taskManager.startTask(
new Task<>(DeduplicateTask.class.getName(), nullUser(), propertiesToMap(properties)));
new Task<>(DeduplicateTask.class.getName(), propertiesToMap(properties)), nullUser());
}

if (pipeline.has(Stage.SCANIDX)) {
taskManager.startTask(
new Task<>(ScanIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
new Task<>(ScanIndexTask.class.getName(), propertiesToMap(properties)), nullUser());
}

if (pipeline.has(Stage.SCAN)) {
taskManager.startTask(
new Task<>(ScanTask.class.getName(), nullUser(), propertiesToMap(properties)));
new Task<>(ScanTask.class.getName(), propertiesToMap(properties)), nullUser());
}

if (pipeline.has(Stage.INDEX)) {
taskManager.startTask(
new Task<>(IndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
new Task<>(IndexTask.class.getName(), propertiesToMap(properties)), nullUser());
}

if (pipeline.has(Stage.ENQUEUEIDX)) {
taskManager.startTask(
new Task<>(EnqueueFromIndexTask.class.getName(), nullUser(), propertiesToMap(properties)));
new Task<>(EnqueueFromIndexTask.class.getName(), propertiesToMap(properties)), nullUser());
}

if (pipeline.has(Stage.NLP)) {
taskManager.startTask(
new Task<>(ExtractNlpTask.class.getName(), nullUser(), propertiesToMap(properties)));
new Task<>(ExtractNlpTask.class.getName(), propertiesToMap(properties)), nullUser());
}

if (pipeline.has(Stage.ARTIFACT)) {
taskManager.startTask(
new Task<>(ArtifactTask.class.getName(), nullUser(), propertiesToMap(properties)));
new Task<>(ArtifactTask.class.getName(), propertiesToMap(properties)), nullUser());
}
taskManager.shutdownAndAwaitTermination(Integer.MAX_VALUE, SECONDS);
indexer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private void configureBatchQueuesMemory() {
}

// Binds BlockingQueue<Task<?>> to a RedisBlockingQueue created from the given Redisson client,
// the DS_TASKS_QUEUE_NAME queue name and a codec.
// NOTE(review): this is a diff hunk rendered without +/- markers. The two bind(...) statements below
// look like the removed and the added version of the same line; presumably only one of them exists
// in the real file at any given revision — confirm against the commit.
private void configureBatchQueuesRedis(RedissonClient redissonClient) {
// presumably the pre-commit line: the codec is TaskManagerRedis.TaskViewCodec
bind(new TypeLiteral<BlockingQueue<Task<?>>>(){}).toInstance(new RedisBlockingQueue<>(redissonClient, DS_TASKS_QUEUE_NAME, new org.icij.datashare.asynctasks.TaskManagerRedis.TaskViewCodec()));
// presumably the post-commit line: the codec is TaskManagerRedis.RedisCodec constructed with Task.class
bind(new TypeLiteral<BlockingQueue<Task<?>>>(){}).toInstance(new RedisBlockingQueue<>(redissonClient, DS_TASKS_QUEUE_NAME, new org.icij.datashare.asynctasks.TaskManagerRedis.RedisCodec<>(Task.class)));
}

public Properties properties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.function.Pair;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.Project;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.text.indexing.elasticsearch.SourceExtractor;
import org.icij.datashare.user.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -20,7 +20,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Optional.ofNullable;
import static org.icij.datashare.cli.DatashareCliOptions.ARTIFACT_DIR_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_DEFAULT_PROJECT;
import static org.icij.datashare.cli.DatashareCliOptions.DEFAULT_PROJECT_OPT;
Expand All @@ -33,8 +32,8 @@ public class ArtifactTask extends PipelineTask<String> {
private final Path artifactDir;

@Inject
public ArtifactTask(DocumentCollectionFactory<String> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ARTIFACT, taskView.getUser(), factory, propertiesProvider, String.class);
public ArtifactTask(DocumentCollectionFactory<String> factory, Indexer indexer, PropertiesProvider propertiesProvider, @Assisted Task<Long> taskView, @Assisted final User user, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ARTIFACT, user, factory, propertiesProvider, String.class);
this.indexer = indexer;
project = Project.project(propertiesProvider.get(DEFAULT_PROJECT_OPT).orElse(DEFAULT_DEFAULT_PROJECT));
artifactDir = Path.of(propertiesProvider.get(ARTIFACT_DIR_OPT).orElseThrow(() -> new IllegalArgumentException(String.format("cannot create artifact task with empty %s", ARTIFACT_DIR_OPT))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,24 @@ public class BatchSearchRunner implements CancellableTask, UserTask, Callable<In
private final CountDownLatch callWaiterLatch;
private final BatchSearchRepository repository;
protected final Task<String> taskView;
private final User user;
protected volatile boolean cancelAsked = false;
protected volatile Thread callThread;
protected volatile boolean requeueCancel;

/**
 * Injection constructor: delegates to the package-private constructor, supplying a new
 * {@code CountDownLatch(1)} as the last argument.
 *
 * NOTE(review): this is a diff hunk rendered without +/- markers, so two versions of the
 * parameter-list continuation and of the {@code this(...)} delegation are interleaved below.
 * Presumably the first pair is the removed code and the second pair the added code — confirm
 * against the commit.
 */
@Inject
public BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository,
// presumably removed: signature and delegation without a separate user argument
@Assisted Task<?> taskView, @Assisted Function<Double, Void> updateCallback) {
this(indexer, propertiesProvider, repository, taskView, updateCallback, new CountDownLatch(1));
// presumably added: the user is received as its own @Assisted parameter and forwarded
@Assisted Task<?> taskView, @Assisted final User user, @Assisted Function<Double, Void> updateCallback) {
this(indexer, propertiesProvider, repository, taskView, user, updateCallback, new CountDownLatch(1));
}

/**
 * Package-private constructor that stores every collaborator in the matching field and lets the
 * caller provide the latch stored in {@code callWaiterLatch}.
 *
 * NOTE(review): this is a diff hunk rendered without +/- markers; the two parameter-list
 * continuation lines below are presumably the removed (no user) and the added (with user)
 * version of the same line — confirm against the commit.
 */
BatchSearchRunner(Indexer indexer, PropertiesProvider propertiesProvider, BatchSearchRepository repository,
// presumably removed: no user parameter
Task<?> taskView, Function<Double, Void> updateCallback, CountDownLatch latch) {
// presumably added: user passed explicitly
Task<?> taskView, User user, Function<Double, Void> updateCallback, CountDownLatch latch) {
this.indexer = indexer;
this.propertiesProvider = propertiesProvider;
this.repository = repository;
// unchecked narrowing from Task<?> to Task<String>
this.taskView = (Task<String>) taskView;
// only meaningful with the signature that declares the user parameter
this.user = user;
this.updateCallback = updateCallback;
this.callWaiterLatch = latch;
}
Expand All @@ -100,7 +102,7 @@ public Integer call() throws Exception {
int scrollSize = min(scrollSizeFromParams, MAX_SCROLL_SIZE);
callThread = Thread.currentThread();
callWaiterLatch.countDown(); // for tests
BatchSearch batchSearch = repository.get(taskView.getUser(), taskView.id);
BatchSearch batchSearch = repository.get(user, taskView.id);
if (batchSearch == null) {
logger.warn("batch search {} not found in database (check that database url is the same as datashare backend)", taskView.id);
return 0;
Expand Down Expand Up @@ -164,7 +166,7 @@ public Integer call() throws Exception {

/**
 * Returns the user associated with this runner.
 *
 * NOTE(review): this is a diff hunk rendered without +/- markers; the two return statements
 * below are presumably the removed and the added version of the same line — confirm against
 * the commit.
 */
@Override
public User getUser() {
// presumably removed: the user was read from the task view
return taskView.getUser();
// presumably added: the user is returned from this object's own user field
return user;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@


public interface DatashareTaskFactory extends org.icij.datashare.asynctasks.TaskFactory {
BatchSearchRunner createBatchSearchRunner(Task<?> taskView, Function<Double, Void> updateCallback);
BatchDownloadRunner createBatchDownloadRunner(Task<?> taskView, Function<Double, Void> updateCallback);
BatchSearchRunner createBatchSearchRunner(Task<?> taskView, User user, Function<Double, Void> updateCallback);
BatchDownloadRunner createBatchDownloadRunner(Task<?> taskView, User user, Function<Double, Void> updateCallback);

ScanTask createScanTask(Task<Long> taskView, Function<Double, Void> updateCallback);
IndexTask createIndexTask(Task<Long> taskView, Function<Double, Void> updateCallback);
ScanIndexTask createScanIndexTask(Task<Long> taskView, Function<Double, Void> updateCallback);
ExtractNlpTask createExtractNlpTask(Task<Long> taskView, Function<Double, Void> updateCallback);
EnqueueFromIndexTask createEnqueueFromIndexTask(Task<Long> taskView, Function<Double, Void> updateCallback);
DeduplicateTask createDeduplicateTask(Task<Long> taskView, Function<Double, Void> updateCallback);
ArtifactTask createArtifactTask(Task<Long> taskView, Function<Double, Void> updateCallback);
ScanTask createScanTask(Task<Long> taskView, User user, Function<Double, Void> updateCallback);
IndexTask createIndexTask(Task<Long> taskView, User user, Function<Double, Void> updateCallback);
ScanIndexTask createScanIndexTask(Task<Long> taskView, User user, Function<Double, Void> updateCallback);
ExtractNlpTask createExtractNlpTask(Task<Long> taskView, User user, Function<Double, Void> updateCallback);
EnqueueFromIndexTask createEnqueueFromIndexTask(Task<Long> taskView, User user, Function<Double, Void> updateCallback);
DeduplicateTask createDeduplicateTask(Task<Long> taskView, User user, Function<Double, Void> updateCallback);
ArtifactTask createArtifactTask(Task<Long> taskView, User user, Function<Double, Void> updateCallback);

GenApiKeyTask createGenApiKey(User user);
DelApiKeyTask createDelApiKey(User user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.user.User;
import org.icij.extract.queue.DocumentQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,8 +25,8 @@ public class DeduplicateTask extends PipelineTask<Path> {
private final DocumentCollectionFactory<Path> factory;

@Inject
public DeduplicateTask(final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.DEDUPLICATE, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class);
public DeduplicateTask(final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView, @Assisted final User user, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.DEDUPLICATE, user, factory, new PropertiesProvider(taskView.args), Path.class);
this.factory = factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;

import java.util.Optional;
import java.util.function.Function;
import org.icij.datashare.Entity;
import org.icij.datashare.PropertiesProvider;
Expand All @@ -12,10 +11,10 @@
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.ProjectProxy;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.text.indexing.SearchQuery;
import org.icij.datashare.text.nlp.Pipeline;
import org.icij.datashare.user.User;
import org.icij.extract.queue.DocumentQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -47,8 +46,8 @@ public class EnqueueFromIndexTask extends PipelineTask<String> {

@Inject
public EnqueueFromIndexTask(final DocumentCollectionFactory<String> factory, final Indexer indexer,
@Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ENQUEUEIDX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class);
@Assisted Task<Long> taskView, @Assisted final User user, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.ENQUEUEIDX, user, factory, new PropertiesProvider(taskView.args), String.class);
this.factory = factory;
this.indexer = indexer;
this.nlpPipeline = Pipeline.Type.parse((String) taskView.args.getOrDefault(NLP_PIPELINE_OPT, Pipeline.Type.CORENLP.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.icij.datashare.text.Project;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.text.nlp.Pipeline;
import org.icij.datashare.user.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,13 +42,14 @@ public class ExtractNlpTask extends PipelineTask<String> implements Monitorable
private final int maxContentLengthChars;

@Inject
public ExtractNlpTask(Indexer indexer, PipelineRegistry registry, final DocumentCollectionFactory<String> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
this(indexer, registry.get(Pipeline.Type.parse((String)taskView.args.get(NLP_PIPELINE_OPT))), factory, taskView, updateCallback);
public ExtractNlpTask(Indexer indexer, PipelineRegistry registry, final DocumentCollectionFactory<String> factory, @Assisted Task<Long> taskView, @Assisted final User user, @Assisted final Function<Double, Void> updateCallback) {
this(indexer, registry.get(Pipeline.Type.parse((String)taskView.args.get(NLP_PIPELINE_OPT))), factory, taskView, user, updateCallback);
}


ExtractNlpTask(Indexer indexer, Pipeline pipeline, final DocumentCollectionFactory<String> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.NLP, taskView.getUser(), factory, new PropertiesProvider(taskView.args), String.class);
ExtractNlpTask(Indexer indexer, Pipeline pipeline, final DocumentCollectionFactory<String> factory, @Assisted Task<Long> taskView,
@Assisted User user, @Assisted final Function<Double, Void> updateCallback) {
super(Stage.NLP, user, factory, new PropertiesProvider(taskView.args), String.class);
this.nlpPipeline = pipeline;
project = Project.project(ofNullable((String)taskView.args.get(DEFAULT_PROJECT_OPT)).orElse(DEFAULT_DEFAULT_PROJECT));
maxContentLengthChars = (int) HumanReadableSize.parse(ofNullable((String)taskView.args.get(MAX_CONTENT_LENGTH_OPT)).orElse(valueOf(DEFAULT_MAX_CONTENT_LENGTH)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.monitoring.Monitorable;
import org.icij.datashare.text.indexing.elasticsearch.ElasticsearchSpewer;
import org.icij.datashare.user.User;
import org.icij.extract.document.DocumentFactory;
import org.icij.extract.extractor.DocumentConsumer;
import org.icij.extract.extractor.Extractor;
Expand Down Expand Up @@ -43,8 +44,9 @@ public class IndexTask extends PipelineTask<Path> implements Monitorable{
private final Integer parallelism;

@Inject
public IndexTask(final ElasticsearchSpewer spewer, final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView, @Assisted final Function<Double, Void> updateCallback) throws IOException {
super(Stage.INDEX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class);
public IndexTask(final ElasticsearchSpewer spewer, final DocumentCollectionFactory<Path> factory, @Assisted Task<Long> taskView,
@Assisted final User user, @Assisted final Function<Double, Void> updateCallback) throws IOException {
super(Stage.INDEX, user, factory, new PropertiesProvider(taskView.args), Path.class);
parallelism = propertiesProvider.get(PARALLELISM_OPT).map(Integer::parseInt).orElse(Runtime.getRuntime().availableProcessors());

Options<String> allTaskOptions = options().createFrom(Options.from(taskView.args));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.text.Document;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.user.User;
import org.icij.extract.extractor.ExtractionStatus;
import org.icij.extract.report.Report;
import org.icij.extract.report.ReportMap;
Expand Down Expand Up @@ -53,8 +54,8 @@ public class ScanIndexTask extends PipelineTask<Path> {

@Inject
public ScanIndexTask(DocumentCollectionFactory<Path> factory, final Indexer indexer,
@Assisted Task<Long> taskView, @Assisted Function<Double, Void> updateCallback) {
super(Stage.SCANIDX, taskView.getUser(), factory, new PropertiesProvider(taskView.args), Path.class);
@Assisted Task<Long> taskView, @Assisted final User user, @Assisted Function<Double, Void> updateCallback) {
super(Stage.SCANIDX, user, factory, new PropertiesProvider(taskView.args), Path.class);
this.scrollDuration = propertiesProvider.get(SCROLL_DURATION_OPT).orElse(DEFAULT_SCROLL_DURATION);
this.scrollSize = parseInt(propertiesProvider.get(SCROLL_SIZE_OPT).orElse(valueOf(DEFAULT_SCROLL_SIZE)));
this.scrollSlices = parseInt(propertiesProvider.get(SCROLL_SLICES_OPT).orElse(valueOf(DEFAULT_SCROLL_SLICES)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.icij.datashare.asynctasks.TaskGroup;
import org.icij.datashare.cli.DatashareCliOptions;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.user.User;
import org.icij.extract.Scanner;
import org.icij.extract.ScannerVisitor;
import org.icij.task.Options;
Expand All @@ -24,8 +25,8 @@ public class ScanTask extends PipelineTask<Path> {
private final Path path;

@Inject
public ScanTask(DocumentCollectionFactory<Path> factory, @Assisted Task<Long> task, @Assisted Function<Double, Void> updateCallback) {
super(Stage.SCAN, task.getUser(), factory, new PropertiesProvider(task.args), Path.class);
public ScanTask(DocumentCollectionFactory<Path> factory, @Assisted Task<Long> task, @Assisted final User user, @Assisted Function<Double, Void> updateCallback) {
super(Stage.SCAN, user, factory, new PropertiesProvider(task.args), Path.class);
scanner = new Scanner(outputQueue).configure(options().createFrom(Options.from(task.args)));
path = Paths.get((String)task.args.get(DatashareCliOptions.DATA_DIR_OPT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@

import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.asynctasks.TaskManagerRedis;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor;
import org.icij.datashare.cli.DatashareCliOptions;
import org.icij.datashare.mode.CommonMode;
import org.jetbrains.annotations.NotNull;
import org.redisson.Redisson;
import org.redisson.RedissonMap;
import org.redisson.api.RedissonClient;
Expand All @@ -32,8 +29,8 @@ public TaskManagerAmqp(AmqpInterlocutor amqp, RedissonClient redissonClient, Pro
super(amqp, createTaskQueue(redissonClient), Utils.getRoutingStrategy(propertiesProvider), eventCallback);
}

private static RedissonMap<String, Task<?>> createTaskQueue(RedissonClient redissonClient) {
return new RedissonMap<>(new TaskManagerRedis.TaskViewCodec(),
private static RedissonMap<String, TaskMetadata<?>> createTaskQueue(RedissonClient redissonClient) {
return new RedissonMap<>(new TaskManagerRedis.RedisCodec<>(TaskMetadata.class),
new CommandSyncService(((Redisson) redissonClient).getConnectionManager(),
new RedissonObjectBuilder(redissonClient)),
CommonMode.DS_TASK_MANAGER_QUEUE_NAME,
Expand Down
Loading

0 comments on commit bb4f471

Please sign in to comment.