diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java index d62a4b0f69d..28562e8246e 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java @@ -34,11 +34,13 @@ public class Iterations { * @return a List containing all elements obtained from the specified iteration. */ public static List asList(CloseableIteration iter) { - // stream.collect is slightly slower than addAll for lists - List list = new ArrayList<>(); + try (iter) { + // stream.collect is slightly slower than addAll for lists + List list = new ArrayList<>(); - // addAll closes the iteration - return addAll(iter, list); + // addAll closes the iteration + return addAll(iter, list); + } } /** @@ -107,9 +109,11 @@ public static Stream stream(CloseableIteration iteration) { * @return A String representation of the objects provided by the supplied iteration. */ public static String toString(CloseableIteration iteration, String separator) { - StringBuilder sb = new StringBuilder(); - toString(iteration, separator, sb); - return sb.toString(); + try (iteration) { + StringBuilder sb = new StringBuilder(); + toString(iteration, separator, sb); + return sb.toString(); + } } /** @@ -124,11 +128,13 @@ public static String toString(CloseableIteration iterat public static void toString(CloseableIteration iteration, String separator, StringBuilder sb) throws X { - while (iteration.hasNext()) { - sb.append(iteration.next()); + try (iteration) { + while (iteration.hasNext()) { + sb.append(iteration.next()); - if (iteration.hasNext()) { - sb.append(separator); + if (iteration.hasNext()) { + sb.append(separator); + } } } @@ -149,4 +155,5 @@ public static Set asSet(CloseableIteration value; private final Shape shape; @@ -73,7 +77,18 @@ public ValidationResult(Value focusNode, Value value, Shape shape, if (sourceConstraintComponent.producesValidationResultValue()) { assert value != null; - this.value = Optional.of(value); + + // value could be null if assertions are disabled + // noinspection ConstantValue + if (value == null) { + logger.error( + "Source constraint component {} was expected to produce a value, but value is null! Shape: {}", + sourceConstraintComponent, shape); + } + + // value could be null if assertions are disabled + // noinspection OptionalOfNullableMisuse + this.value = Optional.ofNullable(value); } else { assert scope != ConstraintComponent.Scope.propertyShape || value == null; this.value = Optional.empty(); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java index 33ad2c0f6b2..50c868a0398 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java @@ -166,7 +166,9 @@ protected CloseableIteration performTaskInternal() throws Exception // This is basically required to avoid processing background tuple // request (i.e. HTTP slots) in the correct order. Service service1 = service.getService(); - return new CollectionIteration<>(Iterations.asList(strategy.precompile(service1).evaluate(bindings))); + try (CloseableIteration evaluate = strategy.precompile(service1).evaluate(bindings)) { + return new CollectionIteration<>(Iterations.asList(evaluate)); + } } @Override diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java index 251383fbb2b..12208e72979 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java @@ -107,7 +107,23 @@ public void close() { logger.debug("Attempting to cancel task {}", this); boolean successfullyCanceled = scheduledFuture.cancel(true); if (!successfullyCanceled) { - logger.debug("Task {} could not be cancelled properly.", this); + logger.debug("Task {} could not be cancelled properly. Maybe it has already completed.", + this); + } + + int timeout = 100; + for (int i = 0; i < timeout && !scheduledFuture.isDone(); i++) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + + if (!scheduledFuture.isDone()) { + logger.error("Timeout while waiting for task {} to terminate after it was cancelled.", + this); } } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/CloseDependentConnectionIteration.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/CloseDependentConnectionIteration.java index 3ca07c63637..c0bbff2f4c1 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/CloseDependentConnectionIteration.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/CloseDependentConnectionIteration.java @@ -38,6 +38,12 @@ public CloseDependentConnectionIteration(CloseableIteration inner, @Override public boolean hasNext() throws QueryEvaluationException { try { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + close(); + return false; + } + boolean res = inner.hasNext(); if (!res) { close(); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/ConsumingIteration.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/ConsumingIteration.java index df48cc827bd..40e857f7af5 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/ConsumingIteration.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/ConsumingIteration.java @@ -55,6 +55,11 @@ public ConsumingIteration(CloseableIteration iter, int max) try { while (consumed.size() < max && iter.hasNext()) { consumed.add(iter.next()); + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + close(); + return; + } } if (!iter.hasNext()) { iter.close(); @@ -62,7 +67,7 @@ public ConsumingIteration(CloseableIteration iter, int max) completed = true; } finally { if (!completed) { - iter.close(); + close(); } } @@ -70,6 +75,12 @@ public ConsumingIteration(CloseableIteration iter, int max) @Override public boolean hasNext() throws QueryEvaluationException { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + close(); + return false; + } + return currentIndex < consumed.size() || innerIter.hasNext(); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelServiceJoinTask.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelServiceJoinTask.java index 5ab154e80bf..dea485aa843 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelServiceJoinTask.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelServiceJoinTask.java @@ -48,7 +48,9 @@ protected CloseableIteration performTaskInternal() throws Exception // Note: in order two avoid deadlocks we consume the SERVICE result. // This is basically required to avoid processing background tuple // request (i.e. HTTP slots) in the correct order. - return new CollectionIteration<>(Iterations.asList(strategy.evaluateService(expr, bindings))); + try (var iter = strategy.evaluateService(expr, bindings)) { + return new CollectionIteration<>(Iterations.asList(iter)); + } } @Override diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java index 599f15304b2..5d07a6c5433 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java @@ -320,8 +320,8 @@ public ParallelCheckTask(Endpoint endpoint, StatementPattern stmt, QueryInfo que protected CloseableIteration performTaskInternal() throws Exception { try { TripleSource t = endpoint.getTripleSource(); - boolean hasResults; - hasResults = t.hasStatements(stmt, EmptyBindingSet.getInstance(), queryInfo, queryInfo.getDataset()); + boolean hasResults = t.hasStatements(stmt, EmptyBindingSet.getInstance(), queryInfo, + queryInfo.getDataset()); SourceSelection sourceSelection = control.sourceSelection; sourceSelection.cache.updateInformation(new SubQuery(stmt, queryInfo.getDataset()), endpoint,