Speed up streamed-proto query output by distributing work to multiple threads #12
Conversation
Sorry, one more concern, with H/T to @michajlo. Can you please make sure that the non-determinism from the parallel iterations doesn't break builds that depend on query output ordering? There are relevant tests in https://cs.opensource.google/bazel/bazel/+/master:src/test/java/com/google/devtools/build/lib/buildtool/QueryIntegrationTest.java to validate this, or add more if the existing tests aren't sufficient to cover the new behavior. cc @zhengwei143 too, who worked on query output ordering before.
@michaeledgar too.

Good point. I updated the code to use

Do you think this is something we could add behind a flag to start off? I agree it's useful to have, but it also comes with misc subtleties that are hard to reason about all at once, so having an escape hatch in case we get it wrong the first time would be nice.

@michajlo adding an "escape hatch" flag is a reasonable thing to do, but only if it defaults to true and has an appropriately scary name, so that it doesn't grow uses that we don't want and so that it can be removed at will.

I had actually been thinking of it as more of an opt-in that makes it clearer it's experimental, depending on how solid we can get it on the first try, then eventually flip the default. The whole parallel-streaming-query thing gets tricky once exception handling is involved; an opt-in will let us get the code in and iterate with less risk of breakages.
Hello @arvi18, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
This pull request addresses issue bazelbuild#24304 by speeding up the `bazel query ...` command when used with the `--output=streamed_proto` option. The changes introduce multi-threading to distribute the work of writing the proto output, resulting in a reported 54% reduction in wall time for a fully warm query. The original implementation was single-threaded; this change leverages a `ForkJoinPool` to parallelize the writing of target proto buffers to the output stream.
Highlights
- Performance Improvement: The primary goal of this PR is to improve the performance of `bazel query` with `--output=streamed_proto` by distributing the workload across multiple threads.
- Multi-threading Implementation: A `ForkJoinPool` is used to parallelize the process of converting `Target` objects to proto buffers and writing them to the output stream. The number of threads defaults to the number of available processors.
- Chunking and Queuing: The input `Iterable<Target>` is partitioned into chunks of 500 targets each. These chunks are then submitted to the `ForkJoinPool` via a `LinkedBlockingQueue` to be processed in parallel.
- Error Handling: `WrappedIOException` and `WrappedInterruptedException` are introduced to wrap exceptions thrown during the parallel processing and re-throw them as `RuntimeException`s.
Changelog
- src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
- Introduces multi-threading using
ForkJoinPool
to parallelize proto output writing. - Partitions the input targets into chunks for parallel processing.
- Uses a
LinkedBlockingQueue
to manage the chunks to be processed by the thread pool. - Adds
WrappedIOException
andWrappedInterruptedException
to handle exceptions during parallel processing. - Implements
writeTargetsDelimitedToByteArrays
to convert a list of targets to byte arrays in parallel. - Implements
toProto
to convert a target to a proto object. - Implements
writeDelimited
to write a proto object to a byte array in a delimited format. - Adds constants for
MAX_CHUNKS_IN_QUEUE
andTARGETS_PER_CHUNK
to control the chunking and queuing behavior.
- Introduces multi-threading using
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant OutputFormatterCallback
    participant ForkJoinPool
    participant BlockingQueue
    participant OutputStream
    Client->>OutputFormatterCallback: processOutput(targets)
    OutputFormatterCallback->>ForkJoinPool: Submit chunked serialization tasks
    ForkJoinPool-->>BlockingQueue: Place futures for each chunk
    loop For each chunk future in order
        BlockingQueue->>OutputFormatterCallback: Take next completed future
        OutputFormatterCallback->>OutputStream: Write serialized bytes
    end
    OutputFormatterCallback-->>Client: Completion
```
Code Review
This pull request introduces multi-threading to the streamed proto output formatter, significantly improving query performance. The approach of using a `ForkJoinPool` and a blocking queue seems reasonable for distributing the work. However, there are a few areas that could be improved, particularly around error handling and resource management.
Summary of Findings
- Error Propagation from Executor: The code uses `RuntimeException` to wrap exceptions from the executor, losing valuable information about the original exception. It's crucial to propagate the original exception to provide better debugging insights.
- Resource Management of ForkJoinPool: While the `ForkJoinPool` is created within a try-with-resources block, ensuring proper shutdown, the tasks submitted to the pool might still be running after the block completes if they are long-running or blocked. Consider explicitly shutting down the executor and awaiting termination to ensure all tasks are completed before proceeding.
- Wrapped Exception Classes: The `WrappedIOException` and `WrappedInterruptedException` classes are simple wrappers around checked exceptions. Consider whether these wrappers are truly necessary, or whether the exceptions can be handled directly.
Merge Readiness
The performance improvements introduced by this pull request are significant, making it a valuable contribution. However, the error handling and resource management aspects need to be addressed before merging. I recommend resolving the issues raised in the review comments, particularly regarding exception propagation and `ForkJoinPool` shutdown. I am unable to approve this pull request; other reviewers should review and approve this code before merging.
```java
  // TODO: figure out what might be in here and propagate
  throw new RuntimeException(e);
}
```
This `TODO` indicates that the exception handling here is incomplete. It's important to propagate the original `ExecutionException`'s cause to provide more context about the failure. Consider using `e.getCause()` to get the underlying exception and wrap it in a more specific exception type, or rethrow it if appropriate. Also, consider logging the exception with sufficient context to aid debugging.
```java
Throwable cause = e.getCause();
if (cause instanceof IOException) {
  throw new WrappedIOException((IOException) cause);
} else if (cause instanceof InterruptedException) {
  throw new WrappedInterruptedException((InterruptedException) cause);
} else {
  throw new RuntimeException("Error during target processing", cause);
}
```
```java
  // TODO: propagate
  throw new RuntimeException(e);
}
```
Similar to the previous `TODO`, this one also requires proper exception propagation. The `ExecutionException` should be unwrapped to get the root cause and either rethrown as a more specific exception or wrapped in a `RuntimeException` with the cause attached. Logging the exception is also recommended.
```java
} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof IOException) {
    throw new WrappedIOException((IOException) cause);
  } else if (cause instanceof InterruptedException) {
    throw new WrappedInterruptedException((InterruptedException) cause);
  } else {
    throw new RuntimeException("Error during target processing", cause);
  }
}
```
```java
    stillAddingTargetsToQueue.set(false);
  }
}
```
After setting `stillAddingTargetsToQueue` to `false`, it's good practice to explicitly shut down the `executor` and wait for its termination. This ensures that all submitted tasks are completed before proceeding. Otherwise, there is a risk that the `executor` will not shut down cleanly and resources may be leaked. Consider adding `executor.shutdown()` and `executor.awaitTermination()` calls within the `finally` block.
```java
} finally {
  stillAddingTargetsToQueue.set(false);
  executor.shutdown(); // Initiate shutdown
  try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for tasks to complete
      System.err.println("ForkJoinPool did not terminate in the specified timeout.");
      // Optionally, log the state of the tasks that are still running.
    }
  } catch (InterruptedException ie) {
    // (Re-)Cancel if current thread also interrupted
    executor.shutdownNow();
    // Preserve interrupt status
    Thread.currentThread().interrupt();
  }
}
```
```java
@Override
public IOException getCause() {
  return (IOException) super.getCause();
}
```
Consider whether wrapping the `IOException` in a `RuntimeException` is truly necessary. If the calling code can handle `IOException` directly, it might be better to avoid the wrapping. If wrapping is necessary, ensure that the original exception is accessible via `getCause()`.
```java
@Override
public IOException getCause() {
  return cause; // assumes the wrapper stores the IOException in a `cause` field
}
```
```java
public InterruptedException getCause() {
  return (InterruptedException) super.getCause();
}
```
Similar to `WrappedIOException`, evaluate the necessity of wrapping `InterruptedException`. If the calling code can handle it directly, avoid wrapping. If wrapping is necessary, ensure the original exception is accessible via `getCause()`.
```java
@Override
public InterruptedException getCause() {
  return cause; // assumes the wrapper stores the InterruptedException in a `cause` field
}
```
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java (1)
Lines 92-96: Propagate checked exceptions instead of wrapping them in a generic `RuntimeException`.

`writeAllTargetsFuture.get()` can surface `WrappedIOException`/`WrappedInterruptedException`. By catching `ExecutionException` and blindly re-wrapping, callers lose the ability to handle `IOException` or honour the method's declared `InterruptedException`.

Suggestion:

```diff
- } catch (ExecutionException e) {
-   // TODO: propagate
-   throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+   Throwable cause = e.getCause();
+   if (cause instanceof WrappedIOException ioe) {
+     throw ioe.getCause(); // preserves declared throws IOException
+   }
+   if (cause instanceof WrappedInterruptedException ie) {
+     Thread.currentThread().interrupt();
+     throw ie.getCause();
+   }
+   throw new RuntimeException(cause);
  }
```

This keeps the public contract intact and avoids masking interruption.
📜 Review details

- Configuration used: CodeRabbit UI
- Review profile: CHILL
- Plan: Pro

📒 Files selected for processing (1)

- `src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java` (2 hunks)
🔇 Additional comments (1)
`src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java` (1)

Lines 63-90: Potential producer/consumer termination race.

`stillAddingTargetsToQueue` is set to `false` only in the `finally` block after the producer loop. If `partialResult` is empty, the queue remains empty and the consumer immediately blocks on `take()` forever, preventing `writeAllTargetsFuture` from completing.

Consider signalling completion explicitly:

```java
// after producing chunks
targetQueue.put(POISON_PILL_FUTURE); // a Future that returns an empty list
...
while (true) {
  Future<List<byte[]>> f = targetQueue.take();
  if (f == POISON_PILL_FUTURE) break;
  ...
}
```

or use a `BlockingQueue` with a `null` sentinel, ensuring graceful shutdown even for empty inputs.
```java
    new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        null,
        // we use asyncMode to ensure the queue is processed FIFO, which maximizes
        // throughput
        true)) {
  var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
  var stillAddingTargetsToQueue = new AtomicBoolean(true);
  writeAllTargetsFuture =
      executor.submit(
          () -> {
            try {
              while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
                Future<List<byte[]>> targets = targetQueue.take();
                for (byte[] target : targets.get()) {
                  out.write(target);
                }
```
🛠️ Refactor suggestion

ForkJoinPool worker is blocking on `take()`/`get()` – high risk of thread starvation

The consumer task runs inside the same `ForkJoinPool` that processes the producer tasks, yet:

- it performs unmanaged blocking (`targetQueue.take()` and `Future.get()`), and
- it uses a bounded queue that may fill up, causing the producer thread (the caller of `processOutput`) to block on `put()`.

Because `ForkJoinPool` counts blocked workers toward `parallelism`, the pool can deadlock or under-utilise CPUs when many long-running producer tasks occupy the limited workers while the single consumer waits, or vice versa.

Recommended approaches:

- Start the consumer on a dedicated thread (e.g. `Executors.newSingleThreadExecutor`) outside the FJP, or
- Wrap blocking calls with `ForkJoinPool.managedBlock`, or
- Replace the FJP entirely with a plain `ExecutorService` that tolerates blocking.

This will eliminate the starvation risk and make behaviour more predictable.
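The second recommendation, `ForkJoinPool.managedBlock`, can be sketched as follows. `takeManaged` is a hypothetical helper illustrating the pattern, not part of the PR; `managedBlock` lets the pool spawn a compensating worker while this one blocks:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ManagedBlockDemo {
    // Wrap a blocking take() so a ForkJoinPool can compensate with a spare
    // worker instead of losing one unit of parallelism.
    static <T> T takeManaged(BlockingQueue<T> queue) throws InterruptedException {
        var box = new Object() { T item; };
        ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
            @Override public boolean block() throws InterruptedException {
                if (box.item == null) box.item = queue.take();
                return true; // no further blocking necessary
            }
            @Override public boolean isReleasable() {
                // Try a non-blocking poll first; if it succeeds, block() is skipped.
                return box.item != null || (box.item = queue.poll()) != null;
            }
        });
        return box.item;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        ForkJoinPool pool = new ForkJoinPool(2);
        Future<String> consumer = pool.submit(() -> takeManaged(q));
        q.put("hello");
        System.out.println(consumer.get()); // hello
        pool.shutdown();
    }
}
```

A dedicated consumer thread outside the pool (the first recommendation) is usually simpler; `managedBlock` is the option that keeps everything inside one `ForkJoinPool`.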
```java
try {
  var serializedSize = targetProtoBuffer.getSerializedSize();
  var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
  var output = new byte[headerSize + serializedSize];
  var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
  targetProtoBuffer.writeTo(codedOut);
  codedOut.flush();
  return output;
```
Length-delimited framing is incorrect – header bytes are never written

`writeDelimited` allocates space for the var-int size prefix (`headerSize`) but never actually serialises the length into that region; the message bytes start at offset `headerSize`, leaving the prefix zero-filled. Any consumer calling `mergeDelimitedFrom` will treat the first byte(s) as the size field, interpret `0`, and immediately stop parsing.
```diff
- var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
- var output = new byte[headerSize + serializedSize];
- var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
- targetProtoBuffer.writeTo(codedOut);
+ int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
+ byte[] output = new byte[headerSize + serializedSize];
+
+ // 1. write the var-int length prefix
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+ headerOut.writeUInt32NoTag(serializedSize);
+ headerOut.flush();
+
+ // 2. write the message bytes immediately after the prefix
+ CodedOutputStream bodyOut =
+     CodedOutputStream.newInstance(output, headerSize, serializedSize);
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush();
```
Without this fix every emitted target is malformed, so downstream tools will silently break.
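The framing this fix restores can be checked without a protobuf dependency by hand-rolling the base-128 varint that `CodedOutputStream` would write. `DelimitedFraming` and its helpers are an illustrative round-trip sketch, not the PR's code:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DelimitedFraming {
    // Encode an unsigned int as a protobuf-style base-128 varint.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value); // final byte, continuation bit clear
    }

    // Correct framing: the size prefix is written first, then the message bytes.
    static byte[] writeDelimited(byte[] message) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, message.length);
        out.write(message, 0, message.length);
        return out.toByteArray();
    }

    // Decode: read the varint prefix, then exactly that many payload bytes.
    static byte[] readDelimited(byte[] frame) {
        int size = 0, shift = 0, pos = 0;
        int b;
        do {
            b = frame[pos++] & 0xFF;
            size |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return Arrays.copyOfRange(frame, pos, pos + size);
    }

    public static void main(String[] args) {
        byte[] msg = "//pkg:target".getBytes(StandardCharsets.UTF_8);
        byte[] frame = writeDelimited(msg);
        System.out.println(frame.length); // 13: 1 prefix byte + 12 payload bytes
        System.out.println(new String(readDelimited(frame), StandardCharsets.UTF_8));
    }
}
```

The buggy version corresponds to skipping the `writeVarint` call while still reserving its bytes: the decoder would read a zero size and stop, which is exactly the silent truncation described above.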
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```java
try {
  var serializedSize = targetProtoBuffer.getSerializedSize();
  int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
  byte[] output = new byte[headerSize + serializedSize];

  // 1. write the var-int length prefix
  CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
  headerOut.writeUInt32NoTag(serializedSize);
  headerOut.flush();

  // 2. write the message bytes immediately after the prefix
  CodedOutputStream bodyOut =
      CodedOutputStream.newInstance(output, headerSize, serializedSize);
  targetProtoBuffer.writeTo(bodyOut);
  bodyOut.flush();
  return output;
}
```
This is a proposed fix for bazelbuild#24304.

This speeds up a fully warm `bazel query ...` by 54%, reducing wall time from 1m49s to 50s.

Current state:

This PR:

💁‍♂️ Note: when combined with bazelbuild#24298, total wall time is 37s, an overall reduction of 66%.

Summary by CodeRabbit