diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala
index a031a2aaeed..e3d71818315 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DataTypeUtils.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +25,11 @@ object DataTypeUtils {
     case _ => false
   }
 
+  def hasOffset(dataType: DataType): Boolean = dataType match {
+    case _: ArrayType | _: StringType | _: BinaryType => true
+    case _ => false
+  }
+
   def hasNestedTypes(schema: StructType): Boolean =
     schema.exists(f => isNestedType(f.dataType))
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
index 8fc5326705e..847a21d81e6 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1095,16 +1095,13 @@ class GpuMergeAggregateIterator(
     closeOnExcept(new ArrayBuffer[AutoClosableArrayBuffer[SpillableColumnarBatch]]) {
       toAggregateBuckets =>
         var currentSize = 0L
-        while (batchesByBucket.nonEmpty &&
-          (
-            // for some test cases targetMergeBatchSize is too small to fit any bucket,
-            // in this case we put the first bucket into toAggregateBuckets anyway
-            // refer to https://github.com/NVIDIA/spark-rapids/issues/11790 for examples
-            toAggregateBuckets.isEmpty ||
-              batchesByBucket.last.size() + currentSize <= targetMergeBatchSize)) {
-          val bucket = batchesByBucket.remove(batchesByBucket.size - 1)
-          currentSize += bucket.map(_.sizeInBytes).sum
-          toAggregateBuckets += bucket
+        var keepGoing = true
+        while (batchesByBucket.nonEmpty && keepGoing) {
+          currentSize += batchesByBucket.last.map(_.sizeInBytes).sum
+          keepGoing = currentSize <= targetMergeBatchSize || toAggregateBuckets.isEmpty
+          if (keepGoing) {
+            toAggregateBuckets += batchesByBucket.remove(batchesByBucket.size - 1)
+          }
         }
 
         AggregateUtils.concatenateAndMerge(
@@ -2225,4 +2222,4 @@ class DynamicGpuPartialAggregateIterator(
       throw new NoSuchElementException()
     }
   }
-}
\ No newline at end of file
+}
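For context on the GpuMergeAggregateIterator change above: the rewritten loop keeps the behavior the old inline comment described, namely that at least one bucket is always taken even when targetMergeBatchSize is too small to fit any bucket (issue #11790), while folding the size check into a single keepGoing flag. Below is a minimal, self-contained sketch of the same control flow over plain sizes; BucketTakeSketch and takeBuckets are illustrative names, not plugin APIs.

import scala.collection.mutable.ArrayBuffer

object BucketTakeSketch {
  // Pop buckets from the tail until adding the next one would blow past the
  // target, but always take at least one bucket so the merge makes progress
  // even when the target is smaller than every bucket.
  def takeBuckets(bucketSizes: ArrayBuffer[Long], target: Long): List[Long] = {
    val taken = ArrayBuffer.empty[Long]
    var currentSize = 0L
    var keepGoing = true
    while (bucketSizes.nonEmpty && keepGoing) {
      currentSize += bucketSizes.last
      keepGoing = currentSize <= target || taken.isEmpty
      if (keepGoing) {
        taken += bucketSizes.remove(bucketSizes.size - 1)
      }
    }
    taken.toList
  }

  def main(args: Array[String]): Unit = {
    // Takes 30 then 40 (70 <= 80), stops before 100 would push the total to 170.
    println(takeBuckets(ArrayBuffer(100L, 40L, 30L), target = 80L)) // List(30, 40)
    // An oversized lone bucket is still taken, mirroring the issue #11790 behavior.
    println(takeBuckets(ArrayBuffer(500L), target = 80L)) // List(500)
  }
}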
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
index 891e837d7e1..29e0a56e5dc 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import ai.rapids.cudf
 import ai.rapids.cudf._
 import com.nvidia.spark.Retryable
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.DataTypeUtils.hasOffset
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit}
@@ -35,11 +36,13 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SampleExec, SparkPlan}
 import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler}
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
-import org.apache.spark.sql.types.{DataType, LongType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.random.BernoulliCellSampler
 
 class GpuProjectExecMeta(
@@ -308,12 +311,97 @@ object PreProjectSplitIterator {
       } else {
         boundExprs.getPassThroughIndex(index).map { inputIndex =>
           cb.column(inputIndex).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize
+        }.orElse {
+          // A literal has an exact size that should be taken into account.
+          extractGpuLit(boundExprs.exprTiers.last(index)).map { gpuLit =>
+            calcMemorySizeForLiteral(gpuLit.value, gpuLit.dataType, numRows)
+          }
         }.getOrElse {
           GpuBatchUtils.minGpuMemory(dataType, true, numRows)
         }
       }
     }.sum
   }
+
+  private def calcMemorySizeForLiteral(litVal: Any, litType: DataType, numRows: Int): Long = {
+    // One literal size = value size + additional buffers size (offset and/or validity)
+    val litSize = calcLitValueSize(litVal, litType) + {
+      val pickRowNum: Int => Int = rowNum => if (litVal == null) 0 else rowNum
+      litType match {
+        case ArrayType(elemType, hasNullElem) =>
+          val numElems = pickRowNum(litVal.asInstanceOf[ArrayData].numElements())
+          // A GPU array literal requires only one column as the child.
+          estimateLitAdditionSize(hasNullElem, hasOffset(elemType), numElems)
+        case StructType(fields) =>
+          val childrenNumRows = pickRowNum(1)
+          // A GPU struct literal requires "fields.size" columns as the children.
+          fields.map(f =>
+            estimateLitAdditionSize(f.nullable, hasOffset(f.dataType), childrenNumRows)
+          ).sum
+        case MapType(keyType, valType, hasNullValue) =>
+          val mapRowsNum = pickRowNum(litVal.asInstanceOf[MapData].numElements())
+          // A GPU map literal requires 4 columns as the children:
+          // the key and value columns, along with two wrapper columns, as in
+          //    "list<struct<key, value>>".
+          estimateLitAdditionSize(false, hasOffset(keyType), mapRowsNum) + // key
+            estimateLitAdditionSize(hasNullValue, hasOffset(valType), mapRowsNum) + // value
+            estimateLitAdditionSize(false, false, mapRowsNum) + // struct
+            estimateLitAdditionSize(false, true, mapRowsNum) // top list
+        case _ => 0L // primitive types have no additional nested buffers
+      }
+    }
+    // totalSize = litSize * numRows + top additional buffers size after expanding to a column.
+    litSize * numRows + estimateLitAdditionSize(litVal == null, hasOffset(litType), numRows)
+  }
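As a sanity check on calcMemorySizeForLiteral's accounting above, here is a rough standalone sketch of the same math for a string literal. offsetBufferSize and validityBufferSize below are simplified stand-ins for GpuBatchUtils.calculateOffsetBufferSize and GpuBatchUtils.calculateValidityBufferSize (assumed: 32-bit offsets with one extra entry per column, one validity bit per row rounded up to whole bytes, no padding), so the real helpers may return slightly larger values; all names here are illustrative, not plugin APIs.

object LiteralSizeSketch {
  // Assumed stand-ins for the GpuBatchUtils buffer-size helpers.
  def offsetBufferSize(rows: Int): Long = (rows + 1L) * 4L
  def validityBufferSize(rows: Int): Long = (rows + 7L) / 8L

  // Mirrors the shape of the estimate for a string literal: the value bytes
  // repeated per row, plus the offset buffer a string column always carries,
  // plus a validity buffer only when the literal is null.
  def stringLiteralEstimate(utf8Bytes: Int, isNull: Boolean, numRows: Int): Long = {
    val litSize = if (isNull) 0L else utf8Bytes.toLong
    val validity = if (isNull) validityBufferSize(numRows) else 0L
    litSize * numRows + validity + offsetBufferSize(numRows)
  }

  def main(args: Array[String]): Unit = {
    // "hello" (5 bytes) over 1000 rows: 5 * 1000 + (1000 + 1) * 4 = 9004 bytes.
    println(stringLiteralEstimate(5, isNull = false, numRows = 1000)) // 9004
  }
}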
+
+  private def estimateLitAdditionSize(hasNull: Boolean, hasOffset: Boolean, rows: Int): Long = {
+    // The additional buffers are not nested for literals,
+    // so there is no need to compute them recursively.
+    var totalSize = 0L
+    if (hasNull) {
+      totalSize += GpuBatchUtils.calculateValidityBufferSize(rows)
+    }
+    if (hasOffset) {
+      totalSize += GpuBatchUtils.calculateOffsetBufferSize(rows)
+    }
+    totalSize
+  }
+
+  private def calcLitValueSize(lit: Any, litTp: DataType): Long = if (lit == null) {
+    if (GpuBatchUtils.isFixedWidth(litTp)) {
+      litTp.defaultSize
+    } else {
+      0L
+    }
+  } else {
+    litTp match {
+      case StringType => lit.asInstanceOf[UTF8String].numBytes()
+      case BinaryType => lit.asInstanceOf[Array[Byte]].length
+      case ArrayType(elemType, _) =>
+        lit.asInstanceOf[ArrayData].array.map(calcLitValueSize(_, elemType)).sum
+      case StructType(fields) =>
+        val stData = lit.asInstanceOf[InternalRow]
+        fields.zipWithIndex.map { case (f, i) =>
+          calcLitValueSize(stData.get(i, f.dataType), f.dataType)
+        }.sum
+      case MapType(keyType, valType, _) =>
+        val mapData = lit.asInstanceOf[MapData]
+        val keyData = mapData.keyArray()
+        val valData = mapData.valueArray()
+        (0 until mapData.numElements()).map { i =>
+          calcLitValueSize(keyData.get(i, keyType), keyType) +
+            calcLitValueSize(valData.get(i, valType), valType)
+        }.sum
+      case _ => litTp.defaultSize
+    }
+  }
+
+  @scala.annotation.tailrec
+  def extractGpuLit(exp: Expression): Option[GpuLiteral] = exp match {
+    case gl: GpuLiteral => Some(gl)
+    case ga: GpuAlias => extractGpuLit(ga.child)
+    case _ => None
+  }
 }
 
 /**
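One note on extractGpuLit above: it only unwraps GpuAlias layers, so a literal buried under any other expression is not sized as a literal and falls back to the minGpuMemory estimate. A toy analogue with stand-in case classes (ExtractLitSketch, Lit, Alias, and Add are illustrative, not the plugin's real expression tree) shows the shape of that walk.

object ExtractLitSketch {
  sealed trait Expr
  case class Lit(value: Any) extends Expr
  case class Alias(child: Expr, name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Same shape as extractGpuLit: peel off alias layers, stop at the first
  // literal, and give up on anything else. The @tailrec annotation verifies
  // the recursion compiles down to a loop.
  @scala.annotation.tailrec
  def extractLit(e: Expr): Option[Lit] = e match {
    case l: Lit => Some(l)
    case Alias(child, _) => extractLit(child)
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(extractLit(Alias(Alias(Lit(42), "a"), "b"))) // Some(Lit(42))
    println(extractLit(Add(Lit(1), Lit(2)))) // None
  }
}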