
Commit

chore: Add custom metric for native shuffle fetching batches from JVM (
andygrove authored Nov 21, 2024
1 parent 19dd58d commit e602305
Showing 4 changed files with 48 additions and 13 deletions.
18 changes: 13 additions & 5 deletions docs/source/user-guide/tuning.md
@@ -23,7 +23,7 @@ Comet provides some tuning options to help you get the best performance from you

## Memory Tuning

Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.
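For example, a `spark-submit` invocation might enable the pool like this (the size shown is illustrative; choose a value appropriate for your workload):

```shell
spark-submit \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=8g \
  ...
```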

Each executor will have a single memory pool which will be shared by all native plans being executed within that
@@ -105,8 +105,16 @@ then any shuffle operations that cannot be supported in this mode will fall back

## Metrics

Some Comet metrics are not directly comparable to Spark metrics:

- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
  milliseconds _per batch_, which can result in a large loss of precision, making it difficult to compare scan times
  between Spark and Comet. The sketch below illustrates the effect.
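A toy Rust calculation (the batch count and per-batch timing are made up) shows how truncating to whole milliseconds per batch understates the total:

```rust
fn main() {
    let batches: u64 = 10_000;
    let per_batch_nanos: u64 = 1_900_000; // 1.9 ms of scan time per batch (hypothetical)

    // Summing nanoseconds and converting once preserves the total.
    let true_total_ms = batches * per_batch_nanos / 1_000_000;

    // Converting each batch to whole milliseconds first drops 0.9 ms per batch.
    let reported_total_ms: u64 = (0..batches)
        .map(|_| per_batch_nanos / 1_000_000)
        .sum();

    // Prints "true: 19000 ms, reported: 10000 ms"
    println!("true: {true_total_ms} ms, reported: {reported_total_ms} ms");
}
```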

Comet also adds some custom metrics:

### ShuffleWriterExec

| Metric           | Description |
| ---------------- | ----------- |
| `jvm_fetch_time` | Measures the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. |
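On the native side this metric is recorded with DataFusion's metrics API, as the `shuffle_writer.rs` changes below show. A minimal standalone sketch of that pattern (the metric name mirrors the diff; the sleep stands in for awaiting a batch from the JVM):

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    // Register a named timing metric for partition 0, as ShuffleWriterExec does.
    let jvm_fetch_time = MetricBuilder::new(&metrics).subset_time("jvm_fetch_time", 0);

    // Scope a measurement around just the work being timed; stop() accumulates
    // the elapsed nanoseconds into the metric.
    let mut timer = jvm_fetch_time.timer();
    std::thread::sleep(std::time::Duration::from_millis(5)); // stand-in for input.next().await
    timer.stop();

    println!("jvm_fetch_time = {} ns", jvm_fetch_time.value());
}
```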
26 changes: 20 additions & 6 deletions native/core/src/execution/datafusion/shuffle_writer.rs
@@ -139,6 +139,7 @@ impl ExecutionPlan for ShuffleWriterExec {
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, Arc::clone(&context))?;
let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);
let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
@@ -151,6 +152,7 @@
self.partitioning.clone(),
metrics,
context,
jvm_fetch_time,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -1083,6 +1085,7 @@ impl Debug for ShuffleRepartitioner {
}
}

#[allow(clippy::too_many_arguments)]
async fn external_shuffle(
mut input: SendableRecordBatchStream,
partition_id: usize,
@@ -1091,6 +1094,7 @@
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
jvm_fetch_time: Time,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
@@ -1104,13 +1108,23 @@
context.session_config().batch_size(),
);

loop {
let mut timer = jvm_fetch_time.timer();
let b = input.next().await;
timer.stop();

match b {
Some(batch_result) => {
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pulling the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch_result?))?;
}
_ => break,
}
}

repartitioner.shuffle_write().await
}

5 changes: 4 additions & 1 deletion native/core/src/execution/operators/scan.rs
@@ -52,7 +52,9 @@ use std::{
};

/// ScanExec reads batches of data from Spark via JNI. The source of the scan could be a file
/// scan or the result of reading a broadcast or shuffle exchange. ScanExec isn't invoked
/// until the data is already available in the JVM. When CometExecIterator invokes
/// Native.executePlan, it passes in the memory addresses of the input batches.
#[derive(Debug, Clone)]
pub struct ScanExec {
/// The ID of the execution context that owns this subquery. We use this ID to retrieve the JVM
@@ -73,6 +75,7 @@ pub struct ScanExec {
cache: PlanProperties,
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
}

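The doc comment on `ScanExec` above says batches are handed over by memory address. The underlying mechanism is the Arrow C Data Interface; the following is a rough, hypothetical sketch of importing one exported column from raw addresses (illustrative only, not Comet's actual code; the address parameters and safety contract are assumptions):

```rust
use arrow::array::{make_array, ArrayRef};
use arrow::error::ArrowError;
use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema};

/// Import one Arrow column that the JVM exported via the C Data Interface.
///
/// # Safety
/// Both addresses must point to valid, live FFI structs owned by the exporter.
unsafe fn import_column(array_addr: i64, schema_addr: i64) -> Result<ArrayRef, ArrowError> {
    // Take ownership of the exported array struct; the schema is only borrowed.
    let array = std::ptr::read(array_addr as *const FFI_ArrowArray);
    let schema = &*(schema_addr as *const FFI_ArrowSchema);
    let data = from_ffi(array, schema)?;
    Ok(make_array(data))
}
```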
12 changes: 11 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -77,6 +77,9 @@ case class CometShuffleExchangeExec(
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time fetching batches from JVM"),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
@@ -480,7 +483,14 @@ class CometShuffleWriteProcessor(
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"data_size" -> metrics("dataSize"),
"elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))

val nativeMetrics = if (metrics.contains("jvm_fetch_time")) {
CometMetricNode(
nativeSQLMetrics ++ Map("jvm_fetch_time" ->
metrics("jvm_fetch_time")))
} else {
CometMetricNode(nativeSQLMetrics)
}

// Getting rid of the fake partitionId
val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)
