From d7e91e4d2c11d465ff7b191dc57f8c0298cb5ebd Mon Sep 17 00:00:00 2001 From: Attila Kelemen Date: Sun, 16 Jul 2017 00:21:58 +0200 Subject: [PATCH] Added a methods for TaskGraphExecutor to be able to track individual nodes. --- .../java/org/jtrim2/taskgraph/BuiltGraph.java | 62 +++++++++++++ .../jtrim2/taskgraph/TaskGraphExecutor.java | 37 +++++++- .../basic/RestrictableTaskGraphExecutor.java | 45 +++++++++- .../basic/CollectingTaskGraphBuilderTest.java | 15 ++++ .../RestrictableTaskGraphExecutorTest.java | 89 +++++++++++++++++++ 5 files changed, 244 insertions(+), 4 deletions(-) create mode 100644 subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/BuiltGraph.java diff --git a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/BuiltGraph.java b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/BuiltGraph.java new file mode 100644 index 00000000..15629b7d --- /dev/null +++ b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/BuiltGraph.java @@ -0,0 +1,62 @@ +package org.jtrim2.taskgraph; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import org.jtrim2.taskgraph.basic.DependencyDag; + +import static java.util.Collections.*; +import static org.jtrim2.utils.ExceptionHelper.*; + +/** + * Defines a whole task execution graph. That is, the set of nodes and the + * edges. + * + *

Thread safety

+ * Instances of {@code BuiltGraph} are immutable and so can be used safely by + * multiple threads concurrently. + * + *

Synchronization transparency

+ * The methods of {@code BuiltGraph} are synchronization transparent. + */ +public final class BuiltGraph { + private final Set> nodes; + private final DependencyDag> graph; + + /** + * Creates a new task execution graph with the given edges and nodes. + * + * @param nodes specifies the set of all the nodes of the task execution + * graph. This argument cannot be {@code null} and cannot contain + * {@code null} elements. + * @param graph defines the edges of the task execution graph. That is, + * the dependencies between the nodes. This argument cannot be {@code null}. + */ + public BuiltGraph( + Set> nodes, + DependencyDag> graph) { + this.nodes = unmodifiableSet(checkNotNullElements(new HashSet<>(nodes), "nodes")); + this.graph = Objects.requireNonNull(graph, "graph"); + } + + /** + * Returns the nodes of the task execution graph. + * + * @return the nodes of the task execution graph. The returned set is never + * {@code null} and is not modifiable. + */ + public Set> getNodes() { + return nodes; + } + + /** + * Returns the edges of the task execution graph. That is, the dependencies + * between the nodes. + * + * @return the edges of the task execution graph. This method never returns + * {@code null}. + */ + public DependencyDag> getGraph() { + return graph; + } +} diff --git a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskGraphExecutor.java b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskGraphExecutor.java index b562246c..b61ed140 100644 --- a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskGraphExecutor.java +++ b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskGraphExecutor.java @@ -31,6 +31,41 @@ public interface TaskGraphExecutor { */ public TaskGraphExecutorProperties.Builder properties(); + /** + * Returns the whole task graph to be executed. This method may only be called + * before executing the task graph. + * + * @return the whole task graph to be executed. This method never returns + * {@code null}. + * + * @throws IllegalStateException thrown if the graph was already started + */ + public BuiltGraph getBuiltGraph(); + + /** + * Returns the {@code CompletionStage} tracking the completion of the given + * task node. + *

+ * Note that the returned {@code CompletionStage} might be notified after + * the task graph execution terminates. Therefore, it is usually recommended + * to combine the returned {@code CompletionStage} with future of the task + * graph execution. + * + * @param the type of the result of the requested node + * @param nodeKey the node key identifying the task node whose + * {@code CompletionStage} is requested. The node with this id must + * exist. This argument cannot be {@code null}. + * @return the {@code CompletionStage} tracking the completion of the given + * task node. This method never returns {@code null}. + * + * @throws IllegalArgumentException thrown if there is no node in the graph + * with the given key + * @throws IllegalStateException thrown if the graph was already started + * + * @see #getBuiltGraph() + */ + public CompletionStage futureOf(TaskNodeKey nodeKey); + /** * Starts executing the associated task graph and will notify the returned {@code CompletionStage} * once task execution terminates. @@ -49,7 +84,7 @@ public interface TaskGraphExecutor { * Any other exception: When some unexpected issues prevented the task graph execution * to complete. * - * //TaskGraphExecutionException + * * * @param cancelToken the {@code CancellationToken} which can be used to cancel the execution * of the task graph. The framework will make a best effort to cancel the execution. diff --git a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java index 78db5296..aa661bd9 100644 --- a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java +++ b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java @@ -20,6 +20,7 @@ import org.jtrim2.cancel.OperationCanceledException; import org.jtrim2.concurrent.Tasks; import org.jtrim2.event.CountDownEvent; +import org.jtrim2.taskgraph.BuiltGraph; import org.jtrim2.taskgraph.ExecutionResultType; import org.jtrim2.taskgraph.TaskGraphExecutionException; import org.jtrim2.taskgraph.TaskGraphExecutionResult; @@ -74,15 +75,53 @@ public TaskGraphExecutorProperties.Builder properties() { return properties; } + private void verifyNotExecuted(StaticInput staticInput) { + if (staticInput == null) { + throw new IllegalStateException("Already executed."); + } + } + + private StaticInput getStaticInput() { + StaticInput staticInput = staticInputRef.get(); + verifyNotExecuted(staticInput); + return staticInput; + } + + /** + * {@inheritDoc } + */ + @Override + public BuiltGraph getBuiltGraph() { + StaticInput staticInput = getStaticInput(); + + return new BuiltGraph(staticInput.nodes.keySet(), staticInput.graph); + } + + /** + * {@inheritDoc } + */ + @Override + public CompletionStage futureOf(TaskNodeKey nodeKey) { + Objects.requireNonNull(nodeKey, "nodeKey"); + + StaticInput staticInput = getStaticInput(); + + @SuppressWarnings("unchecked") + TaskNode node = (TaskNode) staticInput.nodes.get(nodeKey); + if (node == null) { + throw new IllegalArgumentException("Unknown node: " + nodeKey); + } + + return node.taskFuture(); + } + /** * {@inheritDoc } */ @Override public CompletionStage execute(CancellationToken cancelToken) { StaticInput staticInput = staticInputRef.getAndSet(null); - if (staticInput == null) { - throw new IllegalStateException("Already executed."); - } + verifyNotExecuted(staticInput); GraphExecutor executor = new GraphExecutor(cancelToken, properties.build(), staticInput); return executor.execute(); diff --git a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/CollectingTaskGraphBuilderTest.java b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/CollectingTaskGraphBuilderTest.java index 773e9ce9..5a8afeef 100644 --- a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/CollectingTaskGraphBuilderTest.java +++ b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/CollectingTaskGraphBuilderTest.java @@ -28,6 +28,7 @@ import org.jtrim2.executor.TaskExecutor; import org.jtrim2.executor.TaskExecutors; import org.jtrim2.logs.LogCollector; +import org.jtrim2.taskgraph.BuiltGraph; import org.jtrim2.taskgraph.TaskErrorHandler; import org.jtrim2.taskgraph.TaskFactory; import org.jtrim2.taskgraph.TaskFactoryConfig; @@ -836,6 +837,20 @@ public TestOutput computeAndVerifyResult(Object factoryKey, Object nodeKey, Obje return nodesMap; } + @Override + public BuiltGraph getBuiltGraph() { + return new BuiltGraph(nodesMap.keySet(), graph); + } + + @Override + public CompletionStage futureOf(TaskNodeKey nodeKey) { + @SuppressWarnings("unchecked") + TaskNode node = (TaskNode) nodesMap.get(nodeKey); + assertNotNull("node", node); + + return node.taskFuture(); + } + @Override public TaskGraphExecutorProperties.Builder properties() { throw new UnsupportedOperationException("Not supported yet."); diff --git a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutorTest.java b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutorTest.java index d9b0362b..bf132b02 100644 --- a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutorTest.java +++ b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutorTest.java @@ -27,12 +27,14 @@ import org.jtrim2.executor.SyncTaskExecutor; import org.jtrim2.executor.TaskExecutor; import org.jtrim2.logs.LogCollector; +import org.jtrim2.taskgraph.BuiltGraph; import org.jtrim2.taskgraph.ExecutionResultType; import org.jtrim2.taskgraph.TaskErrorHandler; import org.jtrim2.taskgraph.TaskGraphExecutionException; import org.jtrim2.taskgraph.TaskGraphExecutionResult; import org.jtrim2.taskgraph.TaskNodeKey; import org.jtrim2.taskgraph.TaskNodeProperties; +import org.jtrim2.testutils.TestUtils; import org.jtrim2.utils.ExceptionHelper; import org.junit.Before; import org.junit.Test; @@ -683,6 +685,85 @@ public void testFailureResultWithStopOnFailure() { verifyNotRequestedResult(result, "child2.child2"); } + private static RestrictableTaskGraphExecutor createDummyExecutor() { + DependencyDag> graph = doubleSplitLeafsGraph(); + + Object[] nodeKeys = { + "root", "child1", "child2", "child1.child1", "child1.child2", "child2.child1", "child2.child2" + }; + TaskNodes nodes = new TaskNodes(SyncTaskExecutor.getSimpleExecutor(), nodeKeys); + + return new RestrictableTaskGraphExecutor( + graph, + nodes.getAllNodes(), + TaskExecutionRestrictionStrategies.eagerStrategy()); + } + + @Test + public void testGetBuiltGraph() { + DependencyDag> graph = doubleSplitLeafsGraph(); + + Object[] nodeKeys = { + "X", "root", "child1", "child2", "child1.child1", "child1.child2", "child2.child1", "child2.child2" + }; + TaskNodes nodes = new TaskNodes(SyncTaskExecutor.getSimpleExecutor(), nodeKeys); + + RestrictableTaskGraphExecutor executor = new RestrictableTaskGraphExecutor( + graph, + nodes.getAllNodes(), + TaskExecutionRestrictionStrategies.eagerStrategy()); + + BuiltGraph builtGraph = executor.getBuiltGraph(); + assertEquals("nodes", + Arrays.stream(nodeKeys).map(TestNodes::node).collect(Collectors.toSet()), + builtGraph.getNodes()); + assertSame("graph", graph, builtGraph.getGraph()); + } + + @Test + public void testIllegalGetBuiltGraph() { + RestrictableTaskGraphExecutor executor = createDummyExecutor(); + executor.execute(Cancellation.UNCANCELABLE_TOKEN); + TestUtils.expectError(IllegalStateException.class, () -> { + executor.getBuiltGraph(); + }); + } + + @Test + public void testFutureOf() { + DependencyDag> graph = doubleSplitLeafsGraph(); + + Object[] nodeKeys = { + "X", "root", "child1", "child2", "child1.child1", "child1.child2", "child2.child1", "child2.child2" + }; + TaskNodes nodes = new TaskNodes(SyncTaskExecutor.getSimpleExecutor(), nodeKeys); + + RestrictableTaskGraphExecutor executor = new RestrictableTaskGraphExecutor( + graph, + nodes.getAllNodes(), + TaskExecutionRestrictionStrategies.eagerStrategy()); + + for (Object key : nodeKeys) { + CompletionStage executorFuture = executor.futureOf(node(key)); + if (executorFuture != nodes.getFutureFor(key)) { + throw new AssertionError("Wrong future for " + key); + } + } + + TestUtils.expectError(IllegalArgumentException.class, () -> { + executor.futureOf(node("MissingNode")); + }); + } + + @Test + public void testIllegalFutureOf() { + RestrictableTaskGraphExecutor executor = createDummyExecutor(); + executor.execute(Cancellation.UNCANCELABLE_TOKEN); + TestUtils.expectError(IllegalStateException.class, () -> { + executor.futureOf(node("root")); + }); + } + private static final class CollectorErrorHandler implements TaskErrorHandler { private final Map, Throwable> errors; private AssertionError overwrittenErrors; @@ -810,6 +891,14 @@ public TaskNodes(TaskExecutor executor, Object... keys) { } } + public CompletionStage getFutureFor(Object key) { + TestTaskNode taskNode = tasks.get(key); + if (taskNode == null) { + throw new AssertionError("No TaskNode: " + key); + } + return taskNode.node.taskFuture(); + } + public List> getAllNodes() { return tasks.values().stream() .map(TestTaskNode::getNode)