Fix two potential OOM issues in agg.
The first is fixed by taking nested literals into account when calculating the output
size for the pre-split.

The second is fixed by using the correct size in the buffer-size comparison when collecting
the next bundle of batches to aggregate.

Signed-off-by: Firestarman <[email protected]>
firestarman committed Jan 2, 2025
1 parent 9e2b28a commit c580b85
Showing 3 changed files with 105 additions and 15 deletions.
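For context on the first fix: PreProjectSplitIterator (changed below) estimates the output size of a projection so an input batch can be split up front into pieces that fit the target batch size. The sketch below is a minimal, self-contained illustration of why leaving a large nested literal out of that estimate can lead to too few splits, and hence an OOM. The ceil-division split count, the target size, and the per-row literal size are illustrative assumptions, not the plugin's actual code.

// A minimal sketch (not the plugin's code) of how an under-estimated output size
// can starve the pre-split of splits.
object PreSplitSizeSketch {
  def numSplits(estimatedOutputBytes: Long, targetBytes: Long): Int =
    math.max(1, math.ceil(estimatedOutputBytes.toDouble / targetBytes).toInt)

  def main(args: Array[String]): Unit = {
    val targetBytes = 512L * 1024 * 1024      // assumed target batch size: 512 MiB
    val numRows = 10000000                    // rows in the incoming batch
    val perRowLiteralBytes = 1024L            // a nested array literal worth ~1 KiB per output row

    // Old-style estimate: the literal's expanded size is missing (treated as 0 here for illustration).
    println(numSplits(0L, targetBytes))                           // 1 -> one huge (~10 GB) output batch
    // New-style estimate: the literal's expanded size is part of the total.
    println(numSplits(perRowLiteralBytes * numRows, targetBytes)) // 20 -> bounded output batches
  }
}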
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2021-2024, NVIDIA CORPORATION.
+* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,11 @@ object DataTypeUtils {
case _ => false
}

+def hasOffset(dataType: DataType): Boolean = dataType match {
+case _: ArrayType | _: StringType | _: BinaryType => true
+case _ => false
+}
+
def hasNestedTypes(schema: StructType): Boolean =
schema.exists(f => isNestedType(f.dataType))

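The hasOffset helper added above flags the Spark types whose cudf columns carry an offset buffer (variable-width data: strings, binary, and arrays). A few illustrative expectations follow, assuming the spark-rapids classes are on the classpath; this snippet is not part of the commit.

// Illustrative checks only; DataTypeUtils comes from the spark-rapids plugin.
import org.apache.spark.sql.types._
import com.nvidia.spark.rapids.DataTypeUtils

object HasOffsetSketch {
  def main(args: Array[String]): Unit = {
    // Variable-width types carry an offset buffer...
    assert(DataTypeUtils.hasOffset(StringType))
    assert(DataTypeUtils.hasOffset(BinaryType))
    assert(DataTypeUtils.hasOffset(ArrayType(IntegerType)))
    // ...while fixed-width and struct types do not.
    assert(!DataTypeUtils.hasOffset(IntegerType))
    assert(!DataTypeUtils.hasOffset(StructType(Seq(StructField("a", LongType)))))
  }
}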
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2019-2024, NVIDIA CORPORATION.
+* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -1095,16 +1095,13 @@ class GpuMergeAggregateIterator(
closeOnExcept(new ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]) {
toAggregateBuckets =>
var currentSize = 0L
-while (batchesByBucket.nonEmpty &&
-(
-// for some test cases targetMergeBatchSize is too small to fit any bucket,
-// in this case we put the first bucket into toAggregateBuckets anyway
-// refer to https://github.com/NVIDIA/spark-rapids/issues/11790 for examples
-toAggregateBuckets.isEmpty ||
-batchesByBucket.last.size() + currentSize <= targetMergeBatchSize)) {
-val bucket = batchesByBucket.remove(batchesByBucket.size - 1)
-currentSize += bucket.map(_.sizeInBytes).sum
-toAggregateBuckets += bucket
+var keepGoing = true
+while (batchesByBucket.nonEmpty && keepGoing) {
+currentSize += batchesByBucket.last.map(_.sizeInBytes).sum
+keepGoing = currentSize <= targetMergeBatchSize || toAggregateBuckets.isEmpty
+if (keepGoing) {
+toAggregateBuckets += batchesByBucket.remove(batchesByBucket.size - 1)
+}
}

AggregateUtils.concatenateAndMerge(
@@ -2225,4 +2222,4 @@ class DynamicGpuPartialAggregateIterator(
throw new NoSuchElementException()
}
}
}
}
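For the second fix, the removed condition above compared batchesByBucket.last.size(), which appears to count the batches in the bucket rather than their bytes, against targetMergeBatchSize, a byte threshold, so far more data than intended could be collected for a single merge. The rewritten loop adds the bucket's sizeInBytes to the running total before the comparison, while still always taking at least one bucket. Below is a standalone sketch of that accumulation with placeholder types and made-up sizes; it mirrors the new control flow but is not the plugin's code.

import scala.collection.mutable.ArrayBuffer

object BucketCollectSketch {
  // Placeholder for SpillableColumnarBatch: only the byte size matters here.
  final case class FakeBatch(sizeInBytes: Long)

  def collectBuckets(
      batchesByBucket: ArrayBuffer[ArrayBuffer[FakeBatch]],
      targetMergeBatchSize: Long): ArrayBuffer[ArrayBuffer[FakeBatch]] = {
    val toAggregateBuckets = new ArrayBuffer[ArrayBuffer[FakeBatch]]
    var currentSize = 0L
    var keepGoing = true
    while (batchesByBucket.nonEmpty && keepGoing) {
      // Accumulate the candidate bucket's bytes first, then decide.
      currentSize += batchesByBucket.last.map(_.sizeInBytes).sum
      // Always take the first bucket, even if it alone exceeds the target.
      keepGoing = currentSize <= targetMergeBatchSize || toAggregateBuckets.isEmpty
      if (keepGoing) {
        toAggregateBuckets += batchesByBucket.remove(batchesByBucket.size - 1)
      }
    }
    toAggregateBuckets
  }

  def main(args: Array[String]): Unit = {
    val buckets = ArrayBuffer(
      ArrayBuffer(FakeBatch(600L)),                   // left behind: would push the total to 1200
      ArrayBuffer(FakeBatch(300L)),
      ArrayBuffer(FakeBatch(200L), FakeBatch(100L)))  // buckets are drained from the end
    val taken = collectBuckets(buckets, targetMergeBatchSize = 700L)
    println(taken.map(_.map(_.sizeInBytes).sum))      // ArrayBuffer(300, 300): 600 bytes collected
  }
}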
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2019-2024, NVIDIA CORPORATION.
+* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import ai.rapids.cudf
import ai.rapids.cudf._
import com.nvidia.spark.Retryable
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.DataTypeUtils.hasOffset
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit}
@@ -35,11 +36,13 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SampleExec, SparkPlan}
import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
-import org.apache.spark.sql.types.{DataType, LongType}
+import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.random.BernoulliCellSampler

class GpuProjectExecMeta(
@@ -308,12 +311,97 @@ object PreProjectSplitIterator {
} else {
boundExprs.getPassThroughIndex(index).map { inputIndex =>
cb.column(inputIndex).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
}.orElse {
// A literal has an exact size that should be taken into account.
extractGpuLit(boundExprs.exprTiers.last(index)).map { gpuLit =>
calcMemorySizeForLiteral(gpuLit.value, gpuLit.dataType, numRows)
}
}.getOrElse {
GpuBatchUtils.minGpuMemory(dataType, true, numRows)
}
}
}.sum
}

private def calcMemorySizeForLiteral(litVal: Any, litType: DataType, numRows: Int): Long = {
// One literal size = value size + additional buffers size (offset and/or validity)
val litSize = calcLitValueSize(litVal, litType) + {
val pickRowNum: Int => Int = rowNum => if (litVal == null) 0 else rowNum
litType match {
case ArrayType(elemType, hasNullElem) =>
val numElems = pickRowNum(litVal.asInstanceOf[ArrayData].numElements())
// A GPU array literal requires only one column as the child
estimateLitAdditionSize(hasNullElem, hasOffset(elemType), numElems)
case StructType(fields) =>
val childrenNumRows = pickRowNum(1)
// A GPU struct literal requires "fields.size" columns as the children.
fields.map(f =>
estimateLitAdditionSize(f.nullable, hasOffset(f.dataType), childrenNumRows)
).sum
case MapType(keyType, valType, hasNullValue) =>
val mapRowsNum = pickRowNum(litVal.asInstanceOf[MapData].numElements())
// A GPU map literal requires 4 columns as the children.
// the key and value column, along with two wrapper columns as below
// " list<struct{key, value}> "
estimateLitAdditionSize(false, hasOffset(keyType), mapRowsNum) + // key
estimateLitAdditionSize(hasNullValue, hasOffset(valType), mapRowsNum) + // value
estimateLitAdditionSize(false, false, mapRowsNum) + // struct
estimateLitAdditionSize(false, true, mapRowsNum) // top list
case _ => 0L // primitive types have no nested additional buffers
}
}
// totalSize = litSize * numRows + top additional buffers size after expanding to a column.
litSize * numRows + estimateLitAdditionSize(litVal == null, hasOffset(litType), numRows)
}

private def estimateLitAdditionSize(hasNull: Boolean, hasOffset: Boolean, rows: Int): Long = {
// Additional buffers size (validity and/or offset); it is not nested for literals,
// so there is no need to compute it recursively.
var totalSize = 0L
if (hasNull) {
totalSize += GpuBatchUtils.calculateValidityBufferSize(rows)
}
if (hasOffset) {
totalSize += GpuBatchUtils.calculateOffsetBufferSize(rows)
}
totalSize
}

private def calcLitValueSize(lit: Any, litTp: DataType): Long = if (lit == null) {
if (GpuBatchUtils.isFixedWidth(litTp)) {
litTp.defaultSize
} else {
0L
}
} else {
litTp match {
case StringType => lit.asInstanceOf[UTF8String].numBytes()
case BinaryType => lit.asInstanceOf[Array[Byte]].length
case ArrayType(elemType, _) =>
lit.asInstanceOf[ArrayData].array.map(calcLitValueSize(_, elemType)).sum
case StructType(fields) =>
val stData = lit.asInstanceOf[InternalRow]
fields.zipWithIndex.map { case (f, i) =>
calcLitValueSize(stData.get(i, f.dataType), f.dataType)
}.sum
case MapType(keyType, valType, _) =>
val mapData = lit.asInstanceOf[MapData]
val keyData = mapData.keyArray()
val valData = mapData.valueArray()
(0 until mapData.numElements()).map { i =>
calcLitValueSize(keyData.get(i, keyType), keyType) +
calcLitValueSize(valData.get(i, valType), valType)
}.sum
case _ => litTp.defaultSize
}
}

@scala.annotation.tailrec
def extractGpuLit(exp: Expression): Option[GpuLiteral] = exp match {
case gl: GpuLiteral => Some(gl)
case ga: GpuAlias => extractGpuLit(ga.child)
case _ => None
}
}

/**
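To make the new estimate concrete, here is a back-of-the-envelope version of calcMemorySizeForLiteral for one case: an ArrayType(IntegerType, containsNull = false) literal holding [1, 2, 3], expanded across one million output rows. The 4-bytes-per-entry offset-buffer formula below is an assumption used for illustration; the real code delegates to GpuBatchUtils.calculateOffsetBufferSize, whose exact result may differ.

// A standalone arithmetic sketch of the new literal sizing for one concrete case.
// It mirrors litSize * numRows + top-level additional buffers.
object LiteralSizeSketch {
  def main(args: Array[String]): Unit = {
    val numRows = 1000000L
    val elemBytes = 4L                        // IntegerType.defaultSize
    val litValueBytes = 3 * elemBytes         // calcLitValueSize of the [1, 2, 3] array value

    // Non-null Int elements: the child column needs neither validity nor offsets,
    // so the per-literal addition is 0 and litSize equals litValueBytes.
    val litSize = litValueBytes

    // The expanded column is an array column, so it carries a top-level offset buffer.
    val topOffsetBytes = (numRows + 1) * 4L   // assumed: one 4-byte offset per row plus a final entry

    val total = litSize * numRows + topOffsetBytes
    println(s"estimated bytes: $total")       // 16000004 (~16 MB) instead of a generic minimum guess
  }
}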
