Skip to content

Commit

Permalink
GH-4711 Improve chances that iterations are closed on time in FedX (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad authored Aug 5, 2023
2 parents 48881cb + ab04ad7 commit 5abd412
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ public static <E, X extends Exception> List<E> asList(Iteration<? extends E, X>
* @return a List containing all elements obtained from the specified iteration.
*/
public static <E, X extends Exception> List<E> asList(CloseableIteration<? extends E, X> iter) throws X {
// stream.collect is slightly slower than addAll for lists
List<E> list = new ArrayList<>();
try (iter) {
// stream.collect is slightly slower than addAll for lists
List<E> list = new ArrayList<>();

// addAll closes the iteration
return addAll(iter, list);
// addAll closes the iteration
return addAll(iter, list);
}
}

/**
Expand Down Expand Up @@ -115,15 +117,12 @@ public static <E, X extends Exception, C extends Collection<E>> C addAll(Iterati
*/
public static <E, X extends Exception, C extends Collection<E>> C addAll(CloseableIteration<? extends E, X> iter,
C collection) throws X {
try {
try (iter) {
while (iter.hasNext()) {
collection.add(iter.next());
}
} finally {
closeCloseable(iter);
return collection;
}

return collection;
}

/**
Expand Down Expand Up @@ -215,9 +214,11 @@ public static <X extends Exception> String toString(Iteration<?, X> iteration, S
* @return A String representation of the objects provided by the supplied iteration.
*/
public static <X extends Exception> String toString(CloseableIteration<?, X> iteration, String separator) throws X {
StringBuilder sb = new StringBuilder();
toString(iteration, separator, sb);
return sb.toString();
try (iteration) {
StringBuilder sb = new StringBuilder();
toString(iteration, separator, sb);
return sb.toString();
}
}

/**
Expand Down Expand Up @@ -253,11 +254,13 @@ public static <X extends Exception> void toString(Iteration<?, X> iteration, Str
public static <X extends Exception> void toString(CloseableIteration<?, X> iteration, String separator,
StringBuilder sb)
throws X {
while (iteration.hasNext()) {
sb.append(iteration.next());
try (iteration) {
while (iteration.hasNext()) {
sb.append(iteration.next());

if (iteration.hasNext()) {
sb.append(separator);
if (iteration.hasNext()) {
sb.append(separator);
}
}
}

Expand Down Expand Up @@ -295,4 +298,5 @@ public static <E, X extends Exception> Set<E> asSet(CloseableIteration<? extends
}
return set;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ protected CloseableIteration<BindingSet, QueryEvaluationException> performTaskIn
// Note: in order two avoid deadlocks we consume the SERVICE result.
// This is basically required to avoid processing background tuple
// request (i.e. HTTP slots) in the correct order.
return new CollectionIteration<>(Iterations.asList(strategy.evaluate(service.getService(), bindings)));
try (var evaluate = strategy.evaluate(service.getService(), bindings)) {
return new CollectionIteration<>(Iterations.asList(evaluate));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,23 @@ public void close() {
logger.debug("Attempting to cancel task {}", this);
boolean successfullyCanceled = scheduledFuture.cancel(true);
if (!successfullyCanceled) {
logger.debug("Task {} could not be cancelled properly.", this);
logger.debug("Task {} could not be cancelled properly. Maybe it has already completed.",
this);
}

int timeout = 100;
for (int i = 0; i < timeout && !scheduledFuture.isDone(); i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}

if (!scheduledFuture.isDone()) {
logger.error("Timeout while waiting for task {} to terminate after it was cancelled.",
this);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public CloseDependentConnectionIteration(CloseableIteration<T, QueryEvaluationEx
@Override
public boolean hasNext() throws QueryEvaluationException {
try {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
close();
return false;
}

boolean res = inner.hasNext();
if (!res) {
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,32 @@ public ConsumingIteration(CloseableIteration<BindingSet, QueryEvaluationExceptio
try {
while (consumed.size() < max && iter.hasNext()) {
consumed.add(iter.next());
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
close();
return;
}
}
if (!iter.hasNext()) {
iter.close();
}
completed = true;
} finally {
if (!completed) {
iter.close();
close();
}
}

}

@Override
public boolean hasNext() throws QueryEvaluationException {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
close();
return false;
}

return currentIndex < consumed.size() || innerIter.hasNext();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ protected CloseableIteration<BindingSet, QueryEvaluationException> performTaskIn
// Note: in order two avoid deadlocks we consume the SERVICE result.
// This is basically required to avoid processing background tuple
// request (i.e. HTTP slots) in the correct order.
return new CollectionIteration<>(Iterations.asList(strategy.evaluateService(expr, bindings)));
try (var iter = strategy.evaluateService(expr, bindings)) {
return new CollectionIteration<>(Iterations.asList(iter));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ public ParallelCheckTask(Endpoint endpoint, StatementPattern stmt, QueryInfo que
protected CloseableIteration<BindingSet, QueryEvaluationException> performTaskInternal() throws Exception {
try {
TripleSource t = endpoint.getTripleSource();
boolean hasResults;
hasResults = t.hasStatements(stmt, EmptyBindingSet.getInstance(), queryInfo, queryInfo.getDataset());
boolean hasResults = t.hasStatements(stmt, EmptyBindingSet.getInstance(), queryInfo,
queryInfo.getDataset());

SourceSelection sourceSelection = control.sourceSelection;
sourceSelection.cache.updateInformation(new SubQuery(stmt, queryInfo.getDataset()), endpoint,
Expand Down

0 comments on commit 5abd412

Please sign in to comment.