Skip to content

Commit

Permalink
Added a methods for TaskGraphExecutor to be able to track individual …
Browse files Browse the repository at this point in the history
…nodes.
  • Loading branch information
kelemen committed Jul 15, 2017
1 parent 0ad4abf commit d7e91e4
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.jtrim2.taskgraph;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.jtrim2.taskgraph.basic.DependencyDag;

import static java.util.Collections.*;
import static org.jtrim2.utils.ExceptionHelper.*;

/**
* Defines a whole task execution graph. That is, the set of nodes and the
* edges.
*
* <h3>Thread safety</h3>
* Instances of {@code BuiltGraph} are immutable and so can be used safely by
* multiple threads concurrently.
*
* <h4>Synchronization transparency</h4>
* The methods of {@code BuiltGraph} are <I>synchronization transparent</I>.
*/
public final class BuiltGraph {
private final Set<TaskNodeKey<?, ?>> nodes;
private final DependencyDag<TaskNodeKey<?, ?>> graph;

/**
* Creates a new task execution graph with the given edges and nodes.
*
* @param nodes specifies the set of all the nodes of the task execution
* graph. This argument cannot be {@code null} and cannot contain
* {@code null} elements.
* @param graph defines the edges of the task execution graph. That is,
* the dependencies between the nodes. This argument cannot be {@code null}.
*/
public BuiltGraph(
Set<TaskNodeKey<?, ?>> nodes,
DependencyDag<TaskNodeKey<?, ?>> graph) {
this.nodes = unmodifiableSet(checkNotNullElements(new HashSet<>(nodes), "nodes"));
this.graph = Objects.requireNonNull(graph, "graph");
}

/**
* Returns the nodes of the task execution graph.
*
* @return the nodes of the task execution graph. The returned set is never
* {@code null} and is not modifiable.
*/
public Set<TaskNodeKey<?, ?>> getNodes() {
return nodes;
}

/**
* Returns the edges of the task execution graph. That is, the dependencies
* between the nodes.
*
* @return the edges of the task execution graph. This method never returns
* {@code null}.
*/
public DependencyDag<TaskNodeKey<?, ?>> getGraph() {
return graph;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,41 @@ public interface TaskGraphExecutor {
*/
public TaskGraphExecutorProperties.Builder properties();

/**
* Returns the whole task graph to be executed. This method may only be called
* before executing the task graph.
*
* @return the whole task graph to be executed. This method never returns
* {@code null}.
*
* @throws IllegalStateException thrown if the graph was already started
*/
public BuiltGraph getBuiltGraph();

/**
* Returns the {@code CompletionStage} tracking the completion of the given
* task node.
* <P>
* Note that the returned {@code CompletionStage} might be notified <I>after</I>
* the task graph execution terminates. Therefore, it is usually recommended
* to combine the returned {@code CompletionStage} with future of the task
* graph execution.
*
* @param <R> the type of the result of the requested node
* @param nodeKey the node key identifying the task node whose
* {@code CompletionStage} is requested. The node with this id must
* exist. This argument cannot be {@code null}.
* @return the {@code CompletionStage} tracking the completion of the given
* task node. This method never returns {@code null}.
*
* @throws IllegalArgumentException thrown if there is no node in the graph
* with the given key
* @throws IllegalStateException thrown if the graph was already started
*
* @see #getBuiltGraph()
*/
public <R> CompletionStage<R> futureOf(TaskNodeKey<R, ?> nodeKey);

/**
* Starts executing the associated task graph and will notify the returned {@code CompletionStage}
* once task execution terminates.
Expand All @@ -49,7 +84,7 @@ public interface TaskGraphExecutor {
* Any other exception: When some unexpected issues prevented the task graph execution
* to complete.
* </li>
* </ul>//TaskGraphExecutionException
* </ul>
*
* @param cancelToken the {@code CancellationToken} which can be used to cancel the execution
* of the task graph. The framework will make a best effort to cancel the execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.jtrim2.cancel.OperationCanceledException;
import org.jtrim2.concurrent.Tasks;
import org.jtrim2.event.CountDownEvent;
import org.jtrim2.taskgraph.BuiltGraph;
import org.jtrim2.taskgraph.ExecutionResultType;
import org.jtrim2.taskgraph.TaskGraphExecutionException;
import org.jtrim2.taskgraph.TaskGraphExecutionResult;
Expand Down Expand Up @@ -74,15 +75,53 @@ public TaskGraphExecutorProperties.Builder properties() {
return properties;
}

private void verifyNotExecuted(StaticInput staticInput) {
if (staticInput == null) {
throw new IllegalStateException("Already executed.");
}
}

private StaticInput getStaticInput() {
StaticInput staticInput = staticInputRef.get();
verifyNotExecuted(staticInput);
return staticInput;
}

/**
* {@inheritDoc }
*/
@Override
public BuiltGraph getBuiltGraph() {
StaticInput staticInput = getStaticInput();

return new BuiltGraph(staticInput.nodes.keySet(), staticInput.graph);
}

/**
* {@inheritDoc }
*/
@Override
public <R> CompletionStage<R> futureOf(TaskNodeKey<R, ?> nodeKey) {
Objects.requireNonNull(nodeKey, "nodeKey");

StaticInput staticInput = getStaticInput();

@SuppressWarnings("unchecked")
TaskNode<R, ?> node = (TaskNode<R, ?>) staticInput.nodes.get(nodeKey);
if (node == null) {
throw new IllegalArgumentException("Unknown node: " + nodeKey);
}

return node.taskFuture();
}

/**
* {@inheritDoc }
*/
@Override
public CompletionStage<TaskGraphExecutionResult> execute(CancellationToken cancelToken) {
StaticInput staticInput = staticInputRef.getAndSet(null);
if (staticInput == null) {
throw new IllegalStateException("Already executed.");
}
verifyNotExecuted(staticInput);

GraphExecutor executor = new GraphExecutor(cancelToken, properties.build(), staticInput);
return executor.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.jtrim2.executor.TaskExecutor;
import org.jtrim2.executor.TaskExecutors;
import org.jtrim2.logs.LogCollector;
import org.jtrim2.taskgraph.BuiltGraph;
import org.jtrim2.taskgraph.TaskErrorHandler;
import org.jtrim2.taskgraph.TaskFactory;
import org.jtrim2.taskgraph.TaskFactoryConfig;
Expand Down Expand Up @@ -836,6 +837,20 @@ public TestOutput computeAndVerifyResult(Object factoryKey, Object nodeKey, Obje
return nodesMap;
}

@Override
public BuiltGraph getBuiltGraph() {
return new BuiltGraph(nodesMap.keySet(), graph);
}

@Override
public <R> CompletionStage<R> futureOf(TaskNodeKey<R, ?> nodeKey) {
@SuppressWarnings("unchecked")
TaskNode<R, ?> node = (TaskNode<R, ?>) nodesMap.get(nodeKey);
assertNotNull("node", node);

return node.taskFuture();
}

@Override
public TaskGraphExecutorProperties.Builder properties() {
throw new UnsupportedOperationException("Not supported yet.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.jtrim2.executor.SyncTaskExecutor;
import org.jtrim2.executor.TaskExecutor;
import org.jtrim2.logs.LogCollector;
import org.jtrim2.taskgraph.BuiltGraph;
import org.jtrim2.taskgraph.ExecutionResultType;
import org.jtrim2.taskgraph.TaskErrorHandler;
import org.jtrim2.taskgraph.TaskGraphExecutionException;
import org.jtrim2.taskgraph.TaskGraphExecutionResult;
import org.jtrim2.taskgraph.TaskNodeKey;
import org.jtrim2.taskgraph.TaskNodeProperties;
import org.jtrim2.testutils.TestUtils;
import org.jtrim2.utils.ExceptionHelper;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -683,6 +685,85 @@ public void testFailureResultWithStopOnFailure() {
verifyNotRequestedResult(result, "child2.child2");
}

private static RestrictableTaskGraphExecutor createDummyExecutor() {
DependencyDag<TaskNodeKey<?, ?>> graph = doubleSplitLeafsGraph();

Object[] nodeKeys = {
"root", "child1", "child2", "child1.child1", "child1.child2", "child2.child1", "child2.child2"
};
TaskNodes nodes = new TaskNodes(SyncTaskExecutor.getSimpleExecutor(), nodeKeys);

return new RestrictableTaskGraphExecutor(
graph,
nodes.getAllNodes(),
TaskExecutionRestrictionStrategies.eagerStrategy());
}

@Test
public void testGetBuiltGraph() {
DependencyDag<TaskNodeKey<?, ?>> graph = doubleSplitLeafsGraph();

Object[] nodeKeys = {
"X", "root", "child1", "child2", "child1.child1", "child1.child2", "child2.child1", "child2.child2"
};
TaskNodes nodes = new TaskNodes(SyncTaskExecutor.getSimpleExecutor(), nodeKeys);

RestrictableTaskGraphExecutor executor = new RestrictableTaskGraphExecutor(
graph,
nodes.getAllNodes(),
TaskExecutionRestrictionStrategies.eagerStrategy());

BuiltGraph builtGraph = executor.getBuiltGraph();
assertEquals("nodes",
Arrays.stream(nodeKeys).map(TestNodes::node).collect(Collectors.toSet()),
builtGraph.getNodes());
assertSame("graph", graph, builtGraph.getGraph());
}

@Test
public void testIllegalGetBuiltGraph() {
RestrictableTaskGraphExecutor executor = createDummyExecutor();
executor.execute(Cancellation.UNCANCELABLE_TOKEN);
TestUtils.expectError(IllegalStateException.class, () -> {
executor.getBuiltGraph();
});
}

@Test
public void testFutureOf() {
DependencyDag<TaskNodeKey<?, ?>> graph = doubleSplitLeafsGraph();

Object[] nodeKeys = {
"X", "root", "child1", "child2", "child1.child1", "child1.child2", "child2.child1", "child2.child2"
};
TaskNodes nodes = new TaskNodes(SyncTaskExecutor.getSimpleExecutor(), nodeKeys);

RestrictableTaskGraphExecutor executor = new RestrictableTaskGraphExecutor(
graph,
nodes.getAllNodes(),
TaskExecutionRestrictionStrategies.eagerStrategy());

for (Object key : nodeKeys) {
CompletionStage<Object> executorFuture = executor.futureOf(node(key));
if (executorFuture != nodes.getFutureFor(key)) {
throw new AssertionError("Wrong future for " + key);
}
}

TestUtils.expectError(IllegalArgumentException.class, () -> {
executor.futureOf(node("MissingNode"));
});
}

@Test
public void testIllegalFutureOf() {
RestrictableTaskGraphExecutor executor = createDummyExecutor();
executor.execute(Cancellation.UNCANCELABLE_TOKEN);
TestUtils.expectError(IllegalStateException.class, () -> {
executor.futureOf(node("root"));
});
}

private static final class CollectorErrorHandler implements TaskErrorHandler {
private final Map<TaskNodeKey<?, ?>, Throwable> errors;
private AssertionError overwrittenErrors;
Expand Down Expand Up @@ -810,6 +891,14 @@ public TaskNodes(TaskExecutor executor, Object... keys) {
}
}

public CompletionStage<?> getFutureFor(Object key) {
TestTaskNode taskNode = tasks.get(key);
if (taskNode == null) {
throw new AssertionError("No TaskNode: " + key);
}
return taskNode.node.taskFuture();
}

public List<TaskNode<?, ?>> getAllNodes() {
return tasks.values().stream()
.map(TestTaskNode::getNode)
Expand Down

0 comments on commit d7e91e4

Please sign in to comment.