
Fix tests
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
abellina committed Jan 14, 2025
1 parent 43674d8 commit 6c1178e
Showing 4 changed files with 97 additions and 97 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -131,18 +131,23 @@ object GpuShuffleEnv extends Logging {
def isRapidsShuffleAvailable(conf: RapidsConf): Boolean = {
// the driver has `mgr` defined when this is checked
val sparkEnv = SparkEnv.get
val isRapidsManager = sparkEnv.shuffleManager.isInstanceOf[RapidsShuffleManagerLike]
if (isRapidsManager) {
validateRapidsShuffleManager(sparkEnv.shuffleManager.getClass.getName)
if (sparkEnv == null) {
// we may hit this in some tests that don't need to use the RAPIDS shuffle manager.
false
} else {
val isRapidsManager = sparkEnv.shuffleManager.isInstanceOf[RapidsShuffleManagerLike]
if (isRapidsManager) {
validateRapidsShuffleManager(sparkEnv.shuffleManager.getClass.getName)
}
// executors have `env` defined when this is checked
// in tests
val isConfiguredInEnv = Option(env).exists(_.isRapidsShuffleConfigured)
(isConfiguredInEnv || isRapidsManager) &&
(conf.isMultiThreadedShuffleManagerMode ||
(conf.isGPUShuffle && !isExternalShuffleEnabled &&
!isSparkAuthenticateEnabled)) &&
conf.isSqlExecuteOnGPU
}
// executors have `env` defined when this is checked
// in tests
val isConfiguredInEnv = Option(env).exists(_.isRapidsShuffleConfigured)
(isConfiguredInEnv || isRapidsManager) &&
(conf.isMultiThreadedShuffleManagerMode ||
(conf.isGPUShuffle && !isExternalShuffleEnabled &&
!isSparkAuthenticateEnabled)) &&
conf.isSqlExecuteOnGPU
}

def useGPUShuffle(conf: RapidsConf): Boolean = {
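The change to GpuShuffleEnv.isRapidsShuffleAvailable above adds a guard for the case where no SparkEnv exists, so unit tests that never start a Spark environment return false instead of hitting a NullPointerException on sparkEnv.shuffleManager. Below is a minimal standalone sketch of that guard shape; it is illustrative only, and ShuffleEnvGuardSketch / isRapidsShuffleManagerInUse are hypothetical names, not the plugin's actual code.

import org.apache.spark.SparkEnv

object ShuffleEnvGuardSketch {
  // Only inspect the shuffle manager when a SparkEnv actually exists. Some unit
  // tests never start one, so treat that case as "RAPIDS shuffle not available".
  def isRapidsShuffleManagerInUse(): Boolean = {
    val sparkEnv = SparkEnv.get
    if (sparkEnv == null) {
      false
    } else {
      sparkEnv.shuffleManager.getClass.getName.contains("RapidsShuffleManager")
    }
  }
}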
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ import org.apache.spark.unsafe.types.UTF8String
/**
* Unit tests for utility methods in [[ BatchWithPartitionDataUtils ]]
*/
class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQueryCompareTestSuite {
class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase {

test("test splitting partition data into groups") {
val maxGpuColumnSizeBytes = 1000L
@@ -55,48 +55,46 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
// This test uses single-row partition values that should throw a GpuSplitAndRetryOOM exception
// when a retry is forced.
val maxGpuColumnSizeBytes = 1000L
withGpuSparkSession(_ => {
val (_, partValues, _, partSchema) = getSamplePartitionData
closeOnExcept(buildBatch(getSampleValueData)) { valueBatch =>
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
Array(1), partValues.take(1), partSchema, maxGpuColumnSizeBytes)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
withResource(resultBatchIter) { _ =>
assertThrows[GpuSplitAndRetryOOM] {
resultBatchIter.next()
}
val (_, partValues, _, partSchema) = getSamplePartitionData
closeOnExcept(buildBatch(getSampleValueData)) { valueBatch =>
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
Array(1), partValues.take(1), partSchema, maxGpuColumnSizeBytes)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
withResource(resultBatchIter) { _ =>
assertThrows[GpuSplitAndRetryOOM] {
resultBatchIter.next()
}
}
})
}
}

test("test adding partition values to batch with OOM split and retry") {
// This test should split the input batch and process them when a retry is forced.
val maxGpuColumnSizeBytes = 1000L
withGpuSparkSession(_ => {
val (partCols, partValues, partRows, partSchema) = getSamplePartitionData
withResource(buildBatch(getSampleValueData)) { valueBatch =>
withResource(buildBatch(partCols)) { partBatch =>
withResource(GpuColumnVector.combineColumns(valueBatch, partBatch)) { expectedBatch =>
// we incRefCounts here because `addPartitionValuesToBatch` takes ownership of
// `valueBatch`, but we are keeping it alive since its columns are part of
// `expectedBatch`
GpuColumnVector.incRefCounts(valueBatch)
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
partRows, partValues, partSchema, maxGpuColumnSizeBytes)
withResource(resultBatchIter) { _ =>
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
// Assert that the final count of rows matches expected batch
// We also need to close each batch coming from `resultBatchIter`.
val rowCounts = resultBatchIter.map(withResource(_){_.numRows()}).sum
assert(rowCounts == expectedBatch.numRows())
}
val (partCols, partValues, partRows, partSchema) = getSamplePartitionData
withResource(buildBatch(getSampleValueData)) { valueBatch =>
withResource(buildBatch(partCols)) { partBatch =>
withResource(GpuColumnVector.combineColumns(valueBatch, partBatch)) { expectedBatch =>
// we incRefCounts here because `addPartitionValuesToBatch` takes ownership of
// `valueBatch`, but we are keeping it alive since its columns are part of
// `expectedBatch`
GpuColumnVector.incRefCounts(valueBatch)
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
partRows, partValues, partSchema, maxGpuColumnSizeBytes)
withResource(resultBatchIter) { _ =>
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
// Assert that the final count of rows matches expected batch
// We also need to close each batch coming from `resultBatchIter`.
val rowCounts = resultBatchIter.map(withResource(_) {
_.numRows()
}).sum
assert(rowCounts == expectedBatch.numRows())
}
}
}
})
}
}

private def getSamplePartitionData: (Array[Array[String]], Array[InternalRow], Array[Long],
@@ -140,4 +138,4 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
GpuColumnVector.from(ColumnVector.fromStrings(v: _*), StringType))
new ColumnarBatch(colVectors.toArray, numRows)
}
}
}
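The suite above drops the SparkQueryCompareTestSuite mix-in and the withGpuSparkSession(_ => { ... }) wrappers: these tests only exercise the retry machinery, so RmmSparkRetrySuiteBase alone provides the setup they need. Schematically, with the test body elided (this is not the actual file contents):

// Before: the body was wrapped in a GPU Spark session it never used.
test("test adding partition values to batch with OOM split and retry") {
  withGpuSparkSession(_ => {
    // ... build batches, force the OOM, assert on the results ...
  })
}

// After: the body runs directly; RmmSparkRetrySuiteBase handles the RMM setup.
test("test adding partition values to batch with OOM split and retry") {
  // ... build batches, force the OOM, assert on the results ...
}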
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@ import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, ExprId, SortOrder, SpecificInternalRow}
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -36,49 +35,45 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase {
}

private def testRoundRobinPartitioner(partNum: Int) = {
TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
val rrp = GpuRoundRobinPartitioning(partNum)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(partNum === ret.size)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
val rrp = GpuRoundRobinPartitioning(partNum)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(partNum === ret.size)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
}
}

test("GPU range partition with retry") {
TestUtils.withGpuSparkSession(new SparkConf()) { _ =>
// Initialize range bounds
val fieldTypes: Array[DataType] = Array(IntegerType)
val bounds = new SpecificInternalRow(fieldTypes)
bounds.setInt(0, 3)
// Initialize GPU sorter
val ref = GpuBoundReference(0, IntegerType, nullable = true)(ExprId(0), "a")
val sortOrder = SortOrder(ref, Ascending)
val attrs = AttributeReference(ref.name, ref.dataType, ref.nullable)()
val gpuSorter = new GpuSorter(Seq(sortOrder), Array(attrs))
// Initialize range bounds
val fieldTypes: Array[DataType] = Array(IntegerType)
val bounds = new SpecificInternalRow(fieldTypes)
bounds.setInt(0, 3)
// Initialize GPU sorter
val ref = GpuBoundReference(0, IntegerType, nullable = true)(ExprId(0), "a")
val sortOrder = SortOrder(ref, Ascending)
val attrs = AttributeReference(ref.name, ref.dataType, ref.nullable)()
val gpuSorter = new GpuSorter(Seq(sortOrder), Array(attrs))

val rp = GpuRangePartitioner(Array.apply(bounds), gpuSorter)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(ret.length === 2)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
val rp = GpuRangePartitioner(Array.apply(bounds), gpuSorter)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
assert(ret.length === 2)
} finally {
if (ret != null) {
ret.map(_._1).safeClose()
}
}
}
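ShufflePartitionerRetrySuite gets the same treatment: the TestUtils.withGpuSparkSession(new SparkConf()) wrappers go away (along with the now-unused SparkConf import), and each test keeps its try/finally cleanup so every batch produced by the partitioner is closed. A condensed sketch of that cleanup pattern, using only calls that appear above; the partitioner value stands in for either GpuRoundRobinPartitioning or GpuRangePartitioner:

// Sketch: evaluate a GPU partitioner under a forced retry OOM, then close
// all returned batches whether or not the assertions pass.
val batch = buildBatch // closed inside columnarEvalAny
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
  RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
  ret = partitioner.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
} finally {
  if (ret != null) {
    ret.map(_._1).safeClose()
  }
}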
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
* Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,10 +27,9 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.MapData
import org.apache.spark.sql.types._

class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite
with RmmSparkRetrySuiteBase {
class InternalColumnarRDDConverterSuite extends RmmSparkRetrySuiteBase {

def compareMapAndMapDate[K,V](map: collection.Map[K, V], mapData: MapData): Assertion = {
def compareMapAndMapDate[K, V](map: collection.Map[K, V], mapData: MapData): Assertion = {
assert(map.size == mapData.numElements())
val outputMap = mutable.Map[Any, Any]()
// Only String now, TODO: support other data types in Map
@@ -66,12 +65,12 @@ class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite
}

test("transform boolean, byte, short, int, float, long, double, date, timestamp data" +
" back and forth between Row and Columnar") {
" back and forth between Row and Columnar") {
val schema = StructType(Seq(
StructField("Boolean", BooleanType),
StructField("BinaryNotNull", BooleanType, nullable = false),
StructField("Byte", ByteType),
StructField("ByteNotNull",ByteType, nullable = false),
StructField("ByteNotNull", ByteType, nullable = false),
StructField("Short", ShortType),
StructField("ShortNotNull", ShortType, nullable = false),
StructField("Int", IntegerType),
@@ -86,8 +85,8 @@
StructField("DateNotNull", DateType, nullable = false),
StructField("Timestamp", TimestampType),
StructField("TimestampNotNull", TimestampType, nullable = false),
StructField("Decimal", DecimalType(20,10)),
StructField("DecimalNotNull", DecimalType(20,10), nullable = false)
StructField("Decimal", DecimalType(20, 10)),
StructField("DecimalNotNull", DecimalType(20, 10), nullable = false)
))
val numRows = 100
val rows = GpuBatchUtilsSuite.createExternalRows(schema, numRows)
Expand All @@ -113,7 +112,7 @@ class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite
} else {
if (f.dataType.isInstanceOf[DecimalType]) {
assert(input.get(i) == output.get(i, f.dataType)
.asInstanceOf[Decimal].toJavaBigDecimal)
.asInstanceOf[Decimal].toJavaBigDecimal)
} else {
assert(input.get(i) == output.get(i, f.dataType))
}
@@ -272,15 +271,19 @@
assert(outputStructRow.isNullAt(2))
} else {
assert(inputStructRow.getSeq(2) sameElements outputStructRow.getArray(2)
.toDoubleArray())
.toDoubleArray())
}
}
}
assert(!c2rIterator.hasNext)
}
}
}
}

// these tests are separated out because they are really just spark tests, and they should skip
// the initialization done in `RmmSparkRetrySuiteBase`
class InternalColumnarRDDConverterSparkSessionSuite extends SparkQueryCompareTestSuite {
test("InternalColumnarRddConverter should extractRDDTable RDD[ColumnarBatch]") {
withGpuSparkSession(spark => {
val path = TestResourceFinder.getResourcePath("disorder-read-schema.parquet")
@@ -308,5 +311,4 @@
assert(result.forall(_ == true))
}, new SparkConf().set("spark.rapids.sql.test.allowedNonGpu", "DeserializeToObjectExec"))
}

}
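The new InternalColumnarRDDConverterSparkSessionSuite carries the pattern this commit applies across the test code: retry-oriented tests stay in suites derived from RmmSparkRetrySuiteBase and run without a Spark session, while tests that genuinely need one move into a separate suite derived from SparkQueryCompareTestSuite. A schematic outline follows; the test names and bodies are placeholders, not the real tests.

// Retry-focused tests: no Spark session; RMM state comes from the base suite.
class InternalColumnarRDDConverterSuite extends RmmSparkRetrySuiteBase {
  test("row to columnar round trip") {
    // ... exercise the converter directly, no withGpuSparkSession wrapper ...
  }
}

// Session-backed tests: "really just spark tests", so they skip the RMM setup
// and run inside a GPU Spark session instead.
class InternalColumnarRDDConverterSparkSessionSuite extends SparkQueryCompareTestSuite {
  test("extractRDDTable over RDD[ColumnarBatch]") {
    withGpuSparkSession(spark => {
      // ... read a parquet file and exercise InternalColumnarRddConverter ...
    })
  }
}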
