Ensure an executor broadcast is in a single batch [databricks] #10660

Merged
37 changes: 37 additions & 0 deletions integration_tests/src/main/python/aqe_test.py
@@ -298,3 +298,40 @@ def do_it(spark):

    assert_gpu_and_cpu_are_equal_collect(do_it, conf=bhj_disable_conf)


# See https://github.com/NVIDIA/spark-rapids/issues/10645. Sometimes the exchange can provide multiple
# batches, so we need to coalesce them into a single batch for the broadcast hash join.
@ignore_order(local=True)
@pytest.mark.skipif(not is_databricks_runtime(),
                    reason="Executor side broadcast only supported on Databricks")
def test_aqe_join_executor_broadcast_enforce_single_batch():
    # Use a small batch to see if Databricks could send multiple batches
    conf = copy_and_update(_adaptive_conf, { "spark.rapids.sql.batchSizeBytes": "25" })
    def prep(spark):
        id_gen = RepeatSeqGen(IntegerGen(nullable=False), length=250)
        name_gen = RepeatSeqGen(["Adam", "Bob", "Cathy"], data_type=StringType())
        school_gen = RepeatSeqGen(["School1", "School2", "School3"], data_type=StringType())

        df = gen_df(spark, StructGen([('id', id_gen), ('name', name_gen)], nullable=False), length=1000)
        df.createOrReplaceTempView("df")

        df_school = gen_df(spark, StructGen([('id', id_gen), ('school', school_gen)], nullable=False), length=250)
        df_school.createOrReplaceTempView("df_school")

    with_cpu_session(prep)

    def do_it(spark):
        res = spark.sql(
            """
            select /*+ BROADCAST(df_school) */ * from df, df_school where df.id == df_school.id
            """
Comment on lines +325 to +327

Collaborator:
nit: The PySpark API looks nicer and would not require resorting to view registration, along the lines of:

df.join(broadcast(df_school), df.id == df_school.id) 

Collaborator Author:

Understood, but this test requires a join hint to ensure a broadcast hash join is used.
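
For reference, a runnable sketch of the two forms discussed above (hypothetical, not part of the change; it assumes spark, df, and df_school are in scope as built in prep()):

from pyspark.sql.functions import broadcast

# DataFrame form from the review suggestion: broadcast() attaches a broadcast
# hint to df_school, much like the /*+ BROADCAST(df_school) */ SQL hint.
res_df = df.join(broadcast(df_school), df.id == df_school.id)

# SQL form used by the test, relying on the temp views registered in prep().
res_sql = spark.sql(
    "select /*+ BROADCAST(df_school) */ * from df, df_school "
    "where df.id == df_school.id")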

        )
        res.explain()
        return res

    # Ensure this is an EXECUTOR_BROADCAST
    assert_cpu_and_gpu_are_equal_collect_with_capture(
        do_it,
        exist_classes="GpuShuffleExchangeExec,GpuBroadcastHashJoinExec",
        non_exist_classes="GpuBroadcastExchangeExec",
        conf=conf)
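
A note on the capture step: with AQE the physical plan is only finalized once the query actually runs, which is why the helper collects results before inspecting the plan. A rough, hypothetical illustration of that idea (not the real implementation of assert_cpu_and_gpu_are_equal_collect_with_capture):

# Hypothetical sketch only: collect() forces execution so AQE finalizes the
# plan; the plan string can then be scanned for the expected exec nodes.
def plan_contains(df, cls_name):
    df.collect()
    plan = df._jdf.queryExecution().executedPlan().toString()
    return cls_name in plan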

GpuExecutorBroadcastHelper.scala

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.execution

import com.nvidia.spark.rapids.{ConcatAndConsumeAll, GpuColumnVector, GpuMetric, GpuShuffleCoalesceIterator, HostShuffleCoalesceIterator}
import com.nvidia.spark.rapids.{ConcatAndConsumeAll, GpuCoalesceIterator, GpuColumnVector, GpuMetric, GpuShuffleCoalesceIterator, HostShuffleCoalesceIterator, NoopMetric, RequireSingleBatch}
import com.nvidia.spark.rapids.Arm.withResource

import org.apache.spark.TaskContext
@@ -40,6 +40,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* which means they require a format of the data that can be used on the GPU.
*/
object GpuExecutorBroadcastHelper {
import GpuMetric._

// This reads the shuffle data that we have retrieved using `getShuffleRDD` from the shuffle
// exchange. WARNING: Do not use this method outside of this context. This method can only be
@@ -65,11 +66,28 @@
// Use the GPU Shuffle Coalesce iterator to concatenate and load batches onto the
// host as needed. Since we don't have GpuShuffleCoalesceExec in the plan for the
// executor broadcast scenario, we have to use that logic here to efficiently
// grab and release the semaphore while doing I/O
// grab and release the semaphore while doing I/O. We wrap this with GpuCoalesceIterator
// to ensure this is always a single batch for the following step.
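// Only CONCAT_TIME and OP_TIME are wired to real metrics here; any other
// metric key the wrapped iterators look up falls back to NoopMetric.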
val shuffleMetrics = Map(
  CONCAT_TIME -> metricsMap(CONCAT_TIME),
  OP_TIME -> metricsMap(OP_TIME)
).withDefaultValue(NoopMetric)

val iter = shuffleDataIterator(shuffleData)
new GpuShuffleCoalesceIterator(
  new HostShuffleCoalesceIterator(iter, targetSize, metricsMap),
  dataTypes, metricsMap).asInstanceOf[Iterator[ColumnarBatch]]
new GpuCoalesceIterator(
  new GpuShuffleCoalesceIterator(
    new HostShuffleCoalesceIterator(iter, targetSize, shuffleMetrics),
    dataTypes, shuffleMetrics).asInstanceOf[Iterator[ColumnarBatch]],
  dataTypes,
  RequireSingleBatch,
  NoopMetric, // numInputRows
  NoopMetric, // numInputBatches
  NoopMetric, // numOutputRows
  NoopMetric, // numOutputBatches
  NoopMetric, // collectTime
  metricsMap(CONCAT_TIME), // concatTime
  metricsMap(OP_TIME), // opTime
  "GpuBroadcastHashJoinExec").asInstanceOf[Iterator[ColumnarBatch]]
}

/**