-
Notifications
You must be signed in to change notification settings - Fork 60
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Streaming collectors for stable performance (#536)
* Streaming collectors for stable performance * Fix checkstyle --------- Co-authored-by: Karthik Ramgopal <[email protected]>
- Loading branch information
1 parent
88a00c1
commit 62510e8
Showing
4 changed files
with
70 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
53 changes: 53 additions & 0 deletions
53
avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/util/StreamUtil.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
* Copyright 2024 LinkedIn Corp. | ||
* Licensed under the BSD 2-Clause License (the "License"). | ||
* See License in the project root for license information. | ||
*/ | ||
|
||
package com.linkedin.avroutil1.builder.util; | ||
|
||
import com.pivovarit.collectors.ParallelCollectors; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.SynchronousQueue; | ||
import java.util.concurrent.ThreadPoolExecutor; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Function; | ||
import java.util.stream.Collector; | ||
import java.util.stream.Stream; | ||
|
||
|
||
/** | ||
* Utilities for dealing with java streams. | ||
*/ | ||
public final class StreamUtil { | ||
|
||
/** | ||
* An (effectively) unbounded {@link ExecutorService} used for parallel processing. This is kept unbounded to avoid | ||
* deadlocks caused when using {@link #toParallelStream(Function, int)} recursively. Callers are supposed to set | ||
* sane values for parallelism to avoid spawning a crazy number of concurrent threads. | ||
*/ | ||
private static final ExecutorService WORK_EXECUTOR = | ||
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<>()); | ||
|
||
private StreamUtil() { | ||
// Disallow external instantiation. | ||
} | ||
|
||
/** | ||
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} | ||
* and returning a {@link Stream} instance returning results as they arrive. | ||
* <p> | ||
* For the parallelism of 1, the stream is executed by the calling thread. | ||
* | ||
* @param mapper a transformation to be performed in parallel | ||
* @param parallelism the max parallelism level | ||
* @param <T> the type of the collected elements | ||
* @param <R> the result returned by {@code mapper} | ||
* | ||
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel. | ||
*/ | ||
public static <T, R> Collector<T, ?, Stream<R>> toParallelStream(Function<T, R> mapper, int parallelism) { | ||
return ParallelCollectors.parallelToStream(mapper, WORK_EXECUTOR, parallelism); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters