Skip to content

Commit

Permalink
add Stream operators prepend iterable, concatWith iterable, doOnCompl…
Browse files Browse the repository at this point in the history
…ete with count (#97)
  • Loading branch information
davidmoten authored Mar 4, 2024
1 parent 20c2fbe commit a1eca8a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
21 changes: 19 additions & 2 deletions kool/src/main/java/org/davidmoten/kool/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ static <T> Stream<T> merge(Stream<? extends T>... streams) {
static Stream<Integer> interval(long duration, TimeUnit unit) {
return range(1, Integer.MAX_VALUE).doOnNext(x -> unit.sleep(duration)).prepend(0);
}

static InputStream inputStream(Stream<? extends byte[]> stream) {
return StreamUtils.toInputStream(stream);
}
Expand Down Expand Up @@ -671,15 +671,23 @@ default Stream<T> prepend(T[] values) {
default Stream<T> prepend(StreamIterable<? extends T> values) {
return new Concat<T>(values, this);
}

default Stream<T> prepend(Iterable<? extends T> values) {
return prepend(Stream.from(values));
}

default Stream<T> concatWith(StreamIterable<? extends T> values) {
return new Concat<T>(this, values);
}

default Stream<T> concatWith(Iterable<? extends T> values) {
return concatWith(Stream.from(values));
}

default <R> Stream<R> flatMap(Function<? super T, ? extends StreamIterable<? extends R>> function) {
return new FlatMap<T, R>(function, this);
}

default <R> Stream<R> flatMap(BiConsumer<? super T, ? super Consumer<R>> generator,
Consumer<? super Consumer<R>> onFinish) {
return new FlatMapGenerator<T, R>(generator, onFinish, this);
Expand Down Expand Up @@ -752,6 +760,15 @@ default Stream<T> doAfterDispose(Action action) {
default Stream<T> doOnEmpty(Action action) {
return new DoOnEmpty<T>(this, action);
}

default Stream<T> doOnComplete(Consumer<? super Long> countAction) {
return defer(() -> {
long[] count = new long[1];
return this //
.doOnNext(t -> count[0]++) //
.doOnComplete(() -> countAction.accept(count[0]));
});
}

default Maybe<T> last() {
return new Last<T>(this);
Expand Down
21 changes: 20 additions & 1 deletion kool/src/test/java/org/davidmoten/kool/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ private static void checkTrue(Consumer<AtomicBoolean> consumer) {
public void testPrepend() {
Stream.of(1, 2, 3).prepend(0).test().assertValuesOnly(0, 1, 2, 3);
}

@Test
public void testPrependIterable() {
Stream.of(1, 2, 3).prepend(Arrays.asList(5, 6, 7)).test().assertValuesOnly(5, 6, 7, 1, 2, 3);
}

@Test
public void testPrependMany() {
Expand Down Expand Up @@ -425,9 +430,14 @@ public Integer next() {
}

@Test
public void testConcat() {
public void testConcatWith() {
Stream.of(1, 2).concatWith(Stream.of(3, 4)).test().assertValuesOnly(1, 2, 3, 4);
}

@Test
public void testConcatWithIterable() {
Stream.of(1, 2).concatWith(Arrays.asList(3, 4)).test().assertValuesOnly(1, 2, 3, 4);
}

@Test
public void testConcatEmpties() {
Expand All @@ -448,6 +458,15 @@ public void testDoOnComplete() {
.forEach();
assertEquals(Lists.newArrayList(1, 2, 3), list);
}

@Test
public void testDoOnCompleteCount() {
List<Integer> list = new ArrayList<>();
Stream.of(1, 1, 1) //
.doOnComplete(count -> list.add(count.intValue())) //
.forEach();
assertEquals(Lists.newArrayList(3), list);
}

@Test
public void testZip() {
Expand Down

0 comments on commit a1eca8a

Please sign in to comment.