Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MoreStream utility methods that accept future suppliers #329

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions streams/src/main/java/com/palantir/common/streams/MoreStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@ public static <U, V> Stream<V> inCompletionOrder(
.map(Futures::getUnchecked);
}

/**
* Given a stream of arguments and a Function mapper to a listenable future, this function will return a blocking
* stream of the completed futures in completion order, looking at most {@code maxParallelism} arguments ahead in
* the stream.
*
* The function mapper must return a listenable future of the return type. Note that calls to the function mapper
* will be made serially.
*
* Note: the resulting stream may contain results in a different order than the input arguments. To receive results
* in the same order as input arguments, use {@link #blockingStreamWithParallelism(Stream, Function, Executor, int)}.
*/
public static <U, V, F extends ListenableFuture<V>> Stream<V> inCompletionOrder(
Stream<U> arguments, Function<U, F> mapper, int maxParallelism) {
return StreamSupport.stream(
new BufferingSpliterator<>(
InCompletionOrder.INSTANCE, arguments.spliterator(), mapper, maxParallelism),
NOT_PARALLEL)
.onClose(arguments::close)
.map(Futures::getUnchecked);
}

/**
* This function will return a blocking stream that waits for each future to complete before returning it,
* but which looks ahead {@code maxParallelism} futures to ensure a fixed parallelism rate.
Expand Down Expand Up @@ -115,6 +136,25 @@ public static <U, V> Stream<V> blockingStreamWithParallelism(
.map(Futures::getUnchecked);
}

/**
* Given a stream of arguments and a Function mapper to a listenable future, this function will return a blocking
* stream that waits for each future to complete before returning it, but which looks ahead {@code maxParallelism}
* arguments to ensure a fixed parallelism rate.
*
* The function mapper must return a listenable future of the return type. Note that calls to the function mapper
* will be made serially.
*/
public static <U, V, F extends ListenableFuture<V>> Stream<V> blockingStreamWithParallelism(
Stream<U> arguments, Function<U, F> mapper, int maxParallelism) {
return StreamSupport.stream(
new BufferingSpliterator<>(
InSourceOrder.INSTANCE, arguments.spliterator(), mapper, maxParallelism),
NOT_PARALLEL)
.onClose(arguments::close)
.map(MoreFutures::blockUntilCompletion)
.map(Futures::getUnchecked);
}

/**
* Returns a stream of the values returned by {@code iterable}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.time.Duration;
Expand All @@ -33,6 +35,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -111,6 +114,24 @@ public void testInCompletionOrder_transformWithExecutor() throws InterruptedExce
assertThat(streamClosed).isTrue();
}

@Test
public void testInCompletionOrder_transformWithFutureSupplier() throws InterruptedException {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
UnaryOperator<Integer> reorder = reorder();
Function<Integer, ListenableFuture<Integer>> futureSupplier =
i -> executorService.submit(() -> reorder.apply(i));

// 2 cannot start until a task has finished, 0 cannot start until 2 is running, so 1 must come first.
try (Stream<Integer> integerStream = MoreStreams.inCompletionOrder(
IntStream.range(0, 3).boxed().onClose(() -> streamClosed.set(true)), futureSupplier, 2)) {
assertThat(integerStream.collect(toList())).startsWith(1).containsExactlyInAnyOrder(0, 1, 2);
} finally {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
assertThat(streamClosed).isTrue();
}

@Test
public void testBlockingStreamWithParallelism_transformWithExecutor() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Expand All @@ -125,6 +146,23 @@ public void testBlockingStreamWithParallelism_transformWithExecutor() throws Int
assertThat(streamClosed).isTrue();
}

@Test
public void testBlockingStreamWithParallelism_transformWithFutureSupplier() throws InterruptedException {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
UnaryOperator<Integer> reorder = reorder();
Function<Integer, ListenableFuture<Integer>> futureSupplier =
i -> executorService.submit(() -> reorder.apply(i));
// due to size of thread pool, 1 must finish before 0, but 0 will return first.
try (Stream<Integer> integerStream = MoreStreams.blockingStreamWithParallelism(
IntStream.range(0, 3).boxed().onClose(() -> streamClosed.set(true)), futureSupplier, 3)) {
assertThat(integerStream.collect(toList())).containsExactly(0, 1, 2);
} finally {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
assertThat(streamClosed).isTrue();
}

@Test
public void testConcurrencySimpleStream() throws InterruptedException {
testConcurrency(IntStream.range(0, 3).boxed());
Expand Down Expand Up @@ -163,6 +201,43 @@ private void testConcurrency(Stream<Integer> value) throws InterruptedException
assertThat(streamClosed).isTrue();
}

@Test
public void testConcurrencySimpleStreamAndFutureSupplier() throws InterruptedException {
testConcurrencyWithFutureSupplier(IntStream.range(0, 3).boxed());
}

@Test
public void testConcurrencyWithFlatmapAndFutureSupplie() throws InterruptedException {
testConcurrencyWithFutureSupplier(
Stream.of(1).flatMap(_ignored -> IntStream.range(0, 3).boxed()));
}

private void testConcurrencyWithFutureSupplier(Stream<Integer> value) throws InterruptedException {
AtomicInteger maximum = new AtomicInteger(-1);
AtomicInteger current = new AtomicInteger();
int maxParallelism = 1;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
Function<Integer, ListenableFuture<Integer>> toFuture = input -> executorService.submit(() -> {
int running = current.incrementAndGet();
maximum.accumulateAndGet(running, Math::max);
try {
Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(200));
} finally {
current.decrementAndGet();
}
return input;
});
try (Stream<Integer> inCompletionOrder =
MoreStreams.inCompletionOrder(value.onClose(() -> streamClosed.set(true)), toFuture, maxParallelism)) {
assertThat(inCompletionOrder.collect(toList())).containsExactlyInAnyOrder(0, 1, 2);
assertThat(maximum).hasValue(maxParallelism);
} finally {
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
assertThat(streamClosed).isTrue();
}

private UnaryOperator<Integer> reorder() {
CyclicBarrier barrier = new CyclicBarrier(2);
return input -> {
Expand Down