Skip to content

Speed up streamed-proto query output by distributing work to multiple threads #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,26 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.query.output;

import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.packages.LabelPrinter;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.proto.proto2api.Build;
import com.google.protobuf.CodedOutputStream;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An output formatter that outputs a protocol buffer representation of a query result and outputs
* the proto bytes to the output print stream. By taking the bytes and calling {@code mergeFrom()}
* on a {@code Build.QueryResult} object the full result can be reconstructed.
*/
public class StreamedProtoOutputFormatter extends ProtoOutputFormatter {

@Override
public String getName() {
return "streamed_proto";
Expand All @@ -34,13 +42,107 @@ public String getName() {
public OutputFormatterCallback<Target> createPostFactoStreamCallback(
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
return new OutputFormatterCallback<Target>() {
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
private static final int TARGETS_PER_CHUNK = 500;

private final LabelPrinter ourLabelPrinter = labelPrinter;

@Override
public void processOutput(Iterable<Target> partialResult)
throws IOException, InterruptedException {
for (Target target : partialResult) {
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out);
ForkJoinTask<?> writeAllTargetsFuture;
try (ForkJoinPool executor =
new ForkJoinPool(
Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
// we use asyncMode to ensure the queue is processed FIFO, which maximizes
// throughput
true)) {
var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
var stillAddingTargetsToQueue = new AtomicBoolean(true);
writeAllTargetsFuture =
executor.submit(
() -> {
try {
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
Future<List<byte[]>> targets = targetQueue.take();
for (byte[] target : targets.get()) {
out.write(target);
}
Comment on lines +55 to +72
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

ForkJoinPool worker is blocking on take()/get() – high risk of thread starvation

The consumer task runs inside the same ForkJoinPool that processes the producer tasks yet:

  • performs unmanaged blocking (targetQueue.take() and Future.get()),
  • uses a bounded queue that may fill up, causing the producer thread (caller of processOutput) to block on put().

Because ForkJoinPool counts blocked workers toward parallelism, the pool can dead-lock or under-utilise CPUs when many long-running producer tasks occupy the limited workers while the single consumer waits, or vice-versa.

Recommended approaches:

  1. Start the consumer on a dedicated thread (e.g. Executors.newSingleThreadExecutor) outside the FJP, or
  2. Wrap blocking calls with ForkJoinPool.managedBlock, or
  3. Replace the FJP entirely with a plain ExecutorService that tolerates blocking.

This will eliminate the starvation risk and make behaviour more predictable.

}
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
} catch (IOException e) {
throw new WrappedIOException(e);
} catch (ExecutionException e) {
// TODO: figure out what might be in here and propagate
throw new RuntimeException(e);
}
Comment on lines +79 to +81

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This TODO indicates that the exception handling here is incomplete. It's important to propagate the original ExecutionException's cause to provide more context about the failure. Consider using e.getCause() to get the underlying exception and wrap it in a more specific exception type, or rethrow it if appropriate.

Also, consider logging the exception with sufficient context to aid debugging.

                      // TODO: figure out what might be in here and propagate
                      Throwable cause = e.getCause();
                      if (cause instanceof IOException) {
                        throw new WrappedIOException((IOException) cause);
                      } else if (cause instanceof InterruptedException) {
                        throw new WrappedInterruptedException((InterruptedException) cause);
                      } else {
                        throw new RuntimeException("Error during target processing", cause);
                      }

});
try {
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
}
} finally {
stillAddingTargetsToQueue.set(false);
}
}
Comment on lines +88 to +90

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

After setting stillAddingTargetsToQueue to false, it's good practice to explicitly shutdown the executor and wait for its termination. This ensures that all submitted tasks are completed before proceeding. Otherwise, there is a risk that the executor will not shutdown cleanly, and resources may be leaked. Consider adding executor.shutdown() and executor.awaitTermination() calls within the finally block.

          } finally {
            stillAddingTargetsToQueue.set(false);
            executor.shutdown(); // Initiate shutdown
            try {
              if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for tasks to complete
                System.err.println("ForkJoinPool did not terminate in the specified timeout.");
                // Optionally, log the state of the tasks that are still running.
              }
            } catch (InterruptedException ie) {
              // (Re-)Cancel if current thread also interrupted
              executor.shutdownNow();
              // Preserve interrupt status
              Thread.currentThread().interrupt();
            }
          }

try {
writeAllTargetsFuture.get();
} catch (ExecutionException e) {
// TODO: propagate
throw new RuntimeException(e);
}
Comment on lines +94 to +96

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to the previous TODO, this one also requires proper exception propagation. The ExecutionException should be unwrapped to get the root cause and either rethrown as a more specific exception or wrapped in a RuntimeException with the cause attached. Logging the exception is also recommended.

        } catch (ExecutionException e) {
          // TODO: propagate
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw new WrappedIOException((IOException) cause);
          } else if (cause instanceof InterruptedException) {
            throw new WrappedInterruptedException((InterruptedException) cause);
          } else {
            throw new RuntimeException("Error during target processing", cause);
          }

}

private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) {
return targets.stream().map(target -> writeDelimited(toProto(target))).toList();
}

private Build.Target toProto(Target target) {
try {
return toTargetProtoBuffer(target, ourLabelPrinter);
} catch (InterruptedException e) {
throw new WrappedInterruptedException(e);
}
}
};
}

private static byte[] writeDelimited(Build.Target targetProtoBuffer) {
try {
var serializedSize = targetProtoBuffer.getSerializedSize();
var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
var output = new byte[headerSize + serializedSize];
var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
targetProtoBuffer.writeTo(codedOut);
codedOut.flush();
return output;
Comment on lines +114 to +121
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Length-delimited framing is incorrect – header bytes are never written

writeDelimited allocates space for the var-int size prefix (headerSize) but never actually serialises the length into that region; the message bytes start at the offset headerSize, leaving the prefix zero-filled. Any consumer calling mergeDelimitedFrom will treat the first byte(s) as the size field, interpret 0, and immediately stop parsing.

-      var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
-      var output = new byte[headerSize + serializedSize];
-      var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
-      targetProtoBuffer.writeTo(codedOut);
+      int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
+      byte[] output = new byte[headerSize + serializedSize];
+
+      // 1. write the var-int length prefix
+      CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+      headerOut.writeUInt32NoTag(serializedSize);
+      headerOut.flush();
+
+      // 2. write the message bytes immediately after the prefix
+      CodedOutputStream bodyOut =
+          CodedOutputStream.newInstance(output, headerSize, serializedSize);
+      targetProtoBuffer.writeTo(bodyOut);
+      bodyOut.flush();

Without this fix every emitted target is malformed, so downstream tools will silently break.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
var serializedSize = targetProtoBuffer.getSerializedSize();
var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
var output = new byte[headerSize + serializedSize];
var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
targetProtoBuffer.writeTo(codedOut);
codedOut.flush();
return output;
try {
var serializedSize = targetProtoBuffer.getSerializedSize();
- var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
- var output = new byte[headerSize + serializedSize];
- var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
- targetProtoBuffer.writeTo(codedOut);
+ int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
+ byte[] output = new byte[headerSize + serializedSize];
+
+ // 1. write the var-int length prefix
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+ headerOut.writeUInt32NoTag(serializedSize);
+ headerOut.flush();
+
+ // 2. write the message bytes immediately after the prefix
+ CodedOutputStream bodyOut =
+ CodedOutputStream.newInstance(output, headerSize, serializedSize);
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush();
return output;
}

} catch (IOException e) {
throw new WrappedIOException(e);
}
}

private static class WrappedIOException extends RuntimeException {
private WrappedIOException(IOException cause) {
super(cause);
}

@Override
public IOException getCause() {
return (IOException) super.getCause();
Comment on lines +132 to +134

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider whether wrapping the IOException in a RuntimeException is truly necessary. If the calling code can handle IOException directly, it might be better to avoid the wrapping. If wrapping is necessary, ensure that the original exception is accessible via getCause().

    @Override
    public IOException getCause() {
      return cause;
    }

}
}

private static class WrappedInterruptedException extends RuntimeException {
private WrappedInterruptedException(InterruptedException cause) {
super(cause);
}

@Override
public InterruptedException getCause() {
return (InterruptedException) super.getCause();
}
Comment on lines +144 to +146

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to WrappedIOException, evaluate the necessity of wrapping InterruptedException. If the calling code can handle it directly, avoid wrapping. If wrapping is necessary, ensure the original exception is accessible via getCause().

    @Override
    public InterruptedException getCause() {
      return cause;
    }

}
}