make maxCpuBatchSize in GpuPartitioning configurable #11929

Open. Wants to merge 2 commits into base branch branch-25.02.
Changes from 1 commit
@@ -29,15 +29,12 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.GpuShuffleEnv
 import org.apache.spark.sql.vectorized.ColumnarBatch

-object GpuPartitioning {
-  // The maximum size of an Array minus a bit for overhead for metadata
-  val MaxCpuBatchSize = 2147483639L - 2048L
-}
-
 trait GpuPartitioning extends Partitioning {
-  private[this] val (maxCompressionBatchSize, _useGPUShuffle, _useMultiThreadedShuffle) = {
+  private[this] val (
+    maxCpuBatchSize, maxCompressionBatchSize, _useGPUShuffle, _useMultiThreadedShuffle) = {
     val rapidsConf = new RapidsConf(SQLConf.get)
-    (rapidsConf.shuffleCompressionMaxBatchMemory,
+    (rapidsConf.shuffleParitioningMaxCpuBatchSize,
+     rapidsConf.shuffleCompressionMaxBatchMemory,
      GpuShuffleEnv.useGPUShuffle(rapidsConf),
      GpuShuffleEnv.useMultiThreadedShuffle(rapidsConf))
   }
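The hunk above threads the new value through the trait's existing tuple-initializer pattern: one `RapidsConf` lookup feeds several private fields at once. A minimal standalone sketch of that pattern (`FakeConf`, `PartitioningSketch`, and `limits` are illustrative names, not the plugin's API):

```scala
// Sketch: initialize several private fields from one config lookup by
// destructuring a tuple, mirroring the pattern in the hunk above.
// FakeConf stands in for RapidsConf; every name here is illustrative.
class FakeConf {
  val maxCpuBatchSize: Long = 2147483639L - 2048L
  val maxCompressionBatchSize: Long = 64L * 1024 * 1024
  val useGpuShuffle: Boolean = false
}

trait PartitioningSketch {
  private[this] val (maxCpuBatchSize, maxCompressionBatchSize, useGpuShuffle) = {
    val conf = new FakeConf // one lookup feeds every field
    (conf.maxCpuBatchSize, conf.maxCompressionBatchSize, conf.useGpuShuffle)
  }
  // Expose the values so the sketch can be exercised.
  def limits: (Long, Long, Boolean) =
    (maxCpuBatchSize, maxCompressionBatchSize, useGpuShuffle)
}
```

Reading the config once into a tuple keeps all the shuffle-related settings consistent with each other, since they come from the same `SQLConf` snapshot.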
@@ -124,7 +121,7 @@ trait GpuPartitioning extends Partitioning {
       // This should be a temp work around.
       partitionColumns.foreach(_.getBase.getNullCount)
       val totalInputSize = GpuColumnVector.getTotalDeviceMemoryUsed(partitionColumns)
-      val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize
+      val mightNeedToSplit = totalInputSize > maxCpuBatchSize

       // We have to wrap the NvtxWithMetrics over both copyToHostAsync and corresponding CudaSync,
       // because the copyToHostAsync calls above are not guaranteed to be asynchronous (e.g.: when
@@ -164,7 +161,7 @@ trait GpuPartitioning extends Partitioning {
         case (batch, part) =>
           val totalSize = SlicedGpuColumnVector.getTotalHostMemoryUsed(batch)
           val numOutputBatches =
-            math.ceil(totalSize.toDouble / GpuPartitioning.MaxCpuBatchSize).toInt
+            math.ceil(totalSize.toDouble / maxCpuBatchSize).toInt
           if (numOutputBatches > 1) {
             // For now we are going to slice it on number of rows instead of looking
             // at each row to try and decide. If we get in trouble we can probably
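The hunk above computes how many CPU-side batches a sliced host batch must be split into once its size exceeds the limit. A standalone sketch of that arithmetic (`SplitMathSketch` is an illustrative name, not part of the plugin):

```scala
// Standalone sketch of the batch-splitting arithmetic in the hunk above.
// SplitMathSketch is an illustrative name, not part of the plugin.
object SplitMathSketch {
  // Default limit: max JVM array size minus ~2KB of metadata overhead.
  val DefaultMaxCpuBatchSize: Long = 2147483639L - 2048L

  // How many CPU-side output batches a sliced host batch must be split into.
  def numOutputBatches(totalSize: Long, maxCpuBatchSize: Long): Int =
    math.ceil(totalSize.toDouble / maxCpuBatchSize).toInt

  def main(args: Array[String]): Unit = {
    val fiveGiB = 5L * 1024 * 1024 * 1024
    // 5 GiB against the ~2 GiB default limit needs 3 output batches.
    println(numOutputBatches(fiveGiB, DefaultMaxCpuBatchSize))
    // Anything at or under the limit stays as a single batch.
    println(numOutputBatches(1024L, DefaultMaxCpuBatchSize))
  }
}
```

Lowering `maxCpuBatchSize` raises `numOutputBatches` for the same input, which is how the new config bounds the size of any single host batch handed to CPU serialization.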
13 changes: 13 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala

@@ -1976,6 +1976,17 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
     .integerConf
     .createWithDefault(20)

+  val SHUFFLE_PARTITIONING_MAX_CPU_BATCH_SIZE =
Review comment (Collaborator): copyright needs to be updated.
Reply (Author): Fixed. A question, though: why don't we update the license headers for all files in one batch? It's troublesome to take care of the license header every time.

+    conf("spark.rapids.shuffle.partitioning.maxCpuBatchSize")
+      .doc("The maximum size of a sliced batch output to the CPU side " +
+        "when GPU partitioning shuffle data. This is used to limit the peak on-heap memory used " +
Review comment (Collaborator): One thing I would note is that this wasn't introduced to limit peak memory per se; it was introduced because we cannot go above this number (#45). If we are going to make it configurable, we should perhaps add a max check so we don't go above it. The comment could reflect this constraint.

+1 on the internal comment from @winningsix

Reply (Author, @binmahone, Jan 8, 2025): "it was introduced because we cannot go above this number": yes, I'm also aware of this, but it happens that we can take advantage of it to limit peak memory. I changed the wording from "This is used to" to "This can be used to" to be more precise.

+        "by CPU to serialize the shuffle data, especially for skew data cases. " +
+        "The default value is maximum size of an Array minus 2k overhead (2147483639L - 2048L), " +
+        "user should only set a smaller value than default value.")
+      .bytesConf(ByteUnit.BYTE)
+      // The maximum size of an Array minus a bit for overhead for metadata
+      .createWithDefault(2147483639L - 2048L)
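The reviewers suggest enforcing the hard upper bound when the value is user-configured. A standalone sketch of such a check (`MaxCpuBatchSizeCheck`, `validate`, and `HardCeiling` are hypothetical names; this is not the plugin's conf-builder API):

```scala
// Hypothetical bound check (not the plugin's conf-builder API): reject any
// configured maxCpuBatchSize above the hard JVM array-size ceiling, since a
// batch larger than that cannot be copied into a single array.
object MaxCpuBatchSizeCheck {
  // Max JVM array size minus ~2KB of metadata overhead.
  val HardCeiling: Long = 2147483639L - 2048L

  def validate(requested: Long): Long = {
    require(requested > 0, s"maxCpuBatchSize must be positive, got $requested")
    require(requested <= HardCeiling,
      s"maxCpuBatchSize must not exceed $HardCeiling, got $requested")
    requested
  }
}
```

Failing fast at config-read time, rather than when an oversized batch is materialized, makes the constraint visible to the user immediately.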

   val SHUFFLE_MULTITHREADED_READER_THREADS =
     conf("spark.rapids.shuffle.multiThreaded.reader.threads")
       .doc("The number of threads to use for reading shuffle blocks per executor in the " +

@@ -3176,6 +3187,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

   lazy val shuffleMultiThreadedReaderThreads: Int = get(SHUFFLE_MULTITHREADED_READER_THREADS)

+  lazy val shuffleParitioningMaxCpuBatchSize: Long = get(SHUFFLE_PARTITIONING_MAX_CPU_BATCH_SIZE)

   lazy val shuffleKudoSerializerEnabled: Boolean = get(SHUFFLE_KUDO_SERIALIZER_ENABLED)

   def isUCXShuffleManagerMode: Boolean =
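For completeness, a hedged sketch of how a user might set the new key once this PR merges (assumes the spark-rapids plugin is on the classpath; the 1 GiB value is purely illustrative):

```scala
// Illustrative only: lower the CPU-side batch cap to 1 GiB at session start.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.shuffle.partitioning.maxCpuBatchSize", "1073741824")
  .getOrCreate()
```

Per the config's doc text, only values smaller than the default (2147483639L - 2048L) make sense, since the default is already the hard array-size ceiling.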