diff --git a/integration_tests/src/main/python/rand_test.py b/integration_tests/src/main/python/rand_test.py new file mode 100644 index 00000000000..a91bc518b04 --- /dev/null +++ b/integration_tests/src/main/python/rand_test.py @@ -0,0 +1,85 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect +from data_gen import * +from marks import * +from spark_session import is_before_spark_351 + +import pyspark.sql.functions as f + + +@ignore_order(local=True) +@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114 +def test_group_agg_with_rand(): + # GPU and CPU produce the same grouping rows but in different orders after Shuffle, + # while the rand() always generates the same sequence. Then CPU and GPU will produce + # different final rows after aggregation. See as below: + # GPU output: + # +---+-------------------+ + # | a| random| + # +---+-------------------+ + # | 3| 0.619189370225301| + # | 5| 0.5096018842446481| + # | 2| 0.8325259388871524| + # | 4|0.26322809041172357| + # | 1| 0.6702867696264135| + # +---+-------------------+ + # CPU output: + # +---+-------------------+ + # | a| random| + # +---+-------------------+ + # | 1| 0.619189370225301| + # | 2| 0.5096018842446481| + # | 3| 0.8325259388871524| + # | 4|0.26322809041172357| + # | 5| 0.6702867696264135| + # +---+-------------------+ + # To make the output comparable, here builds a generator to generate only one group. + const_int_gen = IntegerGen(nullable=False, min_val=1, max_val=1, special_cases=[]) + + def test(spark): + return unary_op_df(spark, const_int_gen, num_slices=1).groupby('a').agg(f.rand(42)) + assert_gpu_and_cpu_are_equal_collect(test) + + +@ignore_order(local=True) +def test_project_with_rand(): + # To make the output comparable, here build a generator to generate only one value. + # Not sure if Project could have the same order issue as groupBy, but still just in case. + const_int_gen = IntegerGen(nullable=False, min_val=1, max_val=1, special_cases=[]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, const_int_gen, num_slices=1).select('a', f.rand(42)) + ) + + +@ignore_order(local=True) +def test_filter_with_rand(): + const_int_gen = IntegerGen(nullable=False, min_val=1, max_val=1, special_cases=[]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, const_int_gen, num_slices=1).filter(f.rand(42) > 0.5) + ) + +# See https://github.com/apache/spark/commit/9c0b803ba124a6e70762aec1e5559b0d66529f4d +@ignore_order(local=True) +@pytest.mark.skipif(is_before_spark_351(), + reason='Generate supports nondeterministic inputs from Spark 3.5.1') +def test_generate_with_rand(): + const_int_gen = IntegerGen(nullable=False, min_val=1, max_val=1, special_cases=[]) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, const_int_gen, num_slices=1).select( + f.explode(f.array(f.rand(42)))) + ) diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 831680e4feb..d292dba46d3 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -41,6 +41,7 @@ def _from_scala_map(scala_map): # Many of these are redundant with default settings for the configs but are set here explicitly # to ensure any cluster settings do not interfere with tests that assume the defaults. _default_conf = { + 'spark.rapids.sql.test.retryContextCheck.enabled': 'true', 'spark.rapids.sql.castDecimalToFloat.enabled': 'false', 'spark.rapids.sql.castFloatToDecimal.enabled': 'false', 'spark.rapids.sql.castFloatToIntegralTypes.enabled': 'false', diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index 7ca3e84e9ba..8060198c789 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -16,7 +16,7 @@ from pyspark import BarrierTaskContext, TaskContext from conftest import is_at_least_precommit_run, is_databricks_runtime -from spark_session import is_before_spark_330, is_before_spark_350, is_spark_341 +from spark_session import is_before_spark_330, is_before_spark_331, is_before_spark_350, is_spark_341 from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version @@ -474,3 +474,19 @@ def add_one(a): lambda spark: unary_op_df(spark, int_gen, num_slices=4, length=52345) .select(my_udf(f.lit(0))), conf=arrow_udf_conf) + + +# Python UDFs support nondeterministic expressions from Spark 3.3.1. +# See https://github.com/apache/spark/commit/1a01a492c051bb861c480f224a3c310e133e4d01 +@ignore_order(local=True) +@pytest.mark.skipif(is_before_spark_331(), + reason='Nondeterministic expressions are supported from Spark 3.3.1') +def test_pandas_math_udf_with_rand(): + def add(rand_value): + return rand_value + my_udf = f.pandas_udf(add, returnType=IntegerType()) + assert_gpu_and_cpu_are_equal_collect( + # Ensure there is only one partition to make the output comparable. + lambda spark: unary_op_df(spark, int_gen, length=10, num_slices=1).select( + my_udf(f.rand(42))), + conf=arrow_udf_conf) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala index e3ca330b409..0b8896b4a63 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala @@ -816,13 +816,13 @@ object GpuAggFinalPassIterator { boundResultReferences) } - private[this] def reorderFinalBatch(finalBatch: ColumnarBatch, + private[this] def reorderFinalBatch(finalBatch: SpillableColumnarBatch, boundExpressions: BoundExpressionsModeAggregates, metrics: GpuHashAggregateMetrics): ColumnarBatch = { // Perform the last project to get the correct shape that Spark expects. Note this may // add things like literals that were not part of the aggregate into the batch. - closeOnExcept(GpuProjectExec.projectAndClose(finalBatch, - boundExpressions.boundResultReferences, NoopMetric)) { ret => + closeOnExcept(GpuProjectExec.projectAndCloseWithRetrySingleBatch(finalBatch, + boundExpressions.boundResultReferences)) { ret => metrics.numOutputRows += ret.numRows() metrics.numOutputBatches += 1 ret @@ -838,9 +838,12 @@ object GpuAggFinalPassIterator { withResource(new NvtxWithMetrics("finalize agg", NvtxColor.DARK_GREEN, aggTime, opTime)) { _ => val finalBatch = boundExpressions.boundFinalProjections.map { exprs => - GpuProjectExec.projectAndClose(batch, exprs, NoopMetric) + GpuProjectExec.projectAndCloseWithRetrySingleBatch( + SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY), exprs) }.getOrElse(batch) - reorderFinalBatch(finalBatch, boundExpressions, metrics) + val finalSCB = + SpillableColumnarBatch(finalBatch, SpillPriorities.ACTIVE_BATCHING_PRIORITY) + reorderFinalBatch(finalSCB, boundExpressions, metrics) } } } @@ -854,12 +857,10 @@ object GpuAggFinalPassIterator { withResource(new NvtxWithMetrics("finalize agg", NvtxColor.DARK_GREEN, aggTime, opTime)) { _ => val finalBatch = boundExpressions.boundFinalProjections.map { exprs => - GpuProjectExec.projectAndCloseWithRetrySingleBatch(sb, exprs) - }.getOrElse { - withRetryNoSplit(sb) { _ => - sb.getColumnarBatch() - } - } + SpillableColumnarBatch( + GpuProjectExec.projectAndCloseWithRetrySingleBatch(sb, exprs), + SpillPriorities.ACTIVE_BATCHING_PRIORITY) + }.getOrElse(sb) reorderFinalBatch(finalBatch, boundExpressions, metrics) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index c1ede7554a6..c23728ba43a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2432,7 +2432,8 @@ object GpuOverrides extends Logging { (TypeSig.INT + TypeSig.LONG).withAllLit(), (TypeSig.INT + TypeSig.LONG).withAllLit()))), (a, conf, p, r) => new UnaryExprMeta[Rand](a, conf, p, r) { - override def convertToGpu(child: Expression): GpuExpression = GpuRand(child) + override def convertToGpu(child: Expression): GpuExpression = + GpuRand(child, this.conf.isRetryContextCheckEnabled) }), expr[SparkPartitionID] ( "Returns the current partition id", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index c4199e3ea75..609155e4b86 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1643,6 +1643,15 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern") .booleanConf .createWithDefault(false) + val TEST_RETRY_CONTEXT_CHECK_ENABLED = conf("spark.rapids.sql.test.retryContextCheck.enabled") + .doc("Only to be used in tests. When set to true, enable the context check for " + + "GPU nondeterministic expressions but declaring to be retryable. A GPU retryable " + + "nondeterministic expression should run inside a checkpoint-restore context. And it " + + "will blow up when the context does not satisfy.") + .internal() + .booleanConf + .createWithDefault(false) + val TEST_CONF = conf("spark.rapids.sql.test.enabled") .doc("Intended to be used by unit tests, if enabled all operations must run on the " + "GPU or an error happens.") @@ -2733,6 +2742,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isTestEnabled: Boolean = get(TEST_CONF) + lazy val isRetryContextCheckEnabled: Boolean = get(TEST_RETRY_CONTEXT_CHECK_ENABLED) + lazy val isFoldableNonLitAllowed: Boolean = get(FOLDABLE_NON_LIT_ALLOWED) lazy val asyncWriteMaxInFlightHostMemoryBytes: Long = diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala index 04bc56af0c4..68a83cafd86 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RmmRapidsRetryIterator.scala @@ -606,6 +606,7 @@ object RmmRapidsRetryIterator extends Logging { var doSplit = false var isFromGpuOom = true while (result.isEmpty && attemptIter.hasNext) { + RetryStateTracker.setCurThreadRetrying(!firstAttempt) if (!firstAttempt) { // call thread block API try { @@ -685,6 +686,7 @@ object RmmRapidsRetryIterator extends Logging { // else another exception wrapped a retry. So we are going to try again } } + RetryStateTracker.clearCurThreadRetrying() if (result.isEmpty) { // then lastException must be set, throw it. throw lastException @@ -791,3 +793,21 @@ object RmmRapidsRetryIterator extends Logging { case class AutoCloseableTargetSize(targetSize: Long, minSize: Long) extends AutoCloseable { override def close(): Unit = () } + +/** + * This leverages a ThreadLocal of boolean to track if a task thread is currently + * executing a retry. And the boolean state will be used by all the + * `GpuExpressionRetryable`s to determine if the context is safe to retry the evaluation. + */ +object RetryStateTracker { + private val localIsRetrying = new ThreadLocal[java.lang.Boolean]() + + def isCurThreadRetrying: Boolean = { + val ret = localIsRetrying.get() + ret != null && ret + } + + def setCurThreadRetrying(retrying: Boolean): Unit = localIsRetrying.set(retrying) + + def clearCurThreadRetrying(): Unit = localIsRetrying.remove() +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala index efc59749d2d..415a171b034 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids.catalyst.expressions import ai.rapids.cudf.{DType, HostColumnVector, NvtxColor, NvtxRange} import com.nvidia.spark.Retryable -import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuLiteral} +import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuLiteral, RetryStateTracker} import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.shims.ShimUnaryExpression @@ -30,13 +30,51 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils import org.apache.spark.util.random.rapids.RapidsXORShiftRandom +/** + * An expression expected to be evaluated inside a retry with checkpoint-restore context. + * It will throw an exception if it is retried without being checkpointed. + * All the nondeterministic GPU expressions that support Retryable should extend from + * this trait. + */ +trait GpuExpressionRetryable extends GpuExpression with Retryable { + private var checked = false + + def doColumnarEval(batch: ColumnarBatch): GpuColumnVector + def doCheckpoint(): Unit + def doRestore(): Unit + + def doContextCheck(): Boolean // For tests + + override final def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + if (doContextCheck && !checked) { // This is for tests + throw new IllegalStateException( + "The Retryable was called outside of a checkpoint-restore context") + } + if (!checked && RetryStateTracker.isCurThreadRetrying) { + // It is retrying the evaluation without checkpointing, which is not allowed. + throw new IllegalStateException( + "The Retryable should be retried only inside a checkpoint-restore context") + } + doColumnarEval(batch) + } + + override final def checkpoint(): Unit = { + checked = true + doCheckpoint() + } + + override final def restore(): Unit = doRestore() +} + /** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */ -case class GpuRand(child: Expression) extends ShimUnaryExpression with GpuExpression - with ExpectsInputTypes with ExpressionWithRandomSeed with Retryable { +case class GpuRand(child: Expression, doContextCheck: Boolean) extends ShimUnaryExpression + with ExpectsInputTypes with ExpressionWithRandomSeed with GpuExpressionRetryable { - def this() = this(GpuLiteral(Utils.random.nextLong(), LongType)) + def this(doContextCheck: Boolean) = this(GpuLiteral(Utils.random.nextLong(), LongType), + doContextCheck) - override def withNewSeed(seed: Long): GpuRand = GpuRand(GpuLiteral(seed, LongType)) + override def withNewSeed(seed: Long): GpuRand = GpuRand(GpuLiteral(seed, LongType), + doContextCheck) def seedExpression: Expression = child @@ -76,7 +114,7 @@ case class GpuRand(child: Expression) extends ShimUnaryExpression with GpuExpres } } - override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + override def doColumnarEval(batch: ColumnarBatch): GpuColumnVector = { if (curXORShiftRandomSeed.isEmpty) { // checkpoint not called, need to init the random generator here initRandom() @@ -93,14 +131,14 @@ case class GpuRand(child: Expression) extends ShimUnaryExpression with GpuExpres } } - override def checkpoint(): Unit = { + override def doCheckpoint(): Unit = { // In a task, checkpoint is called before columnarEval, so need to try to // init the random generator here. initRandom() curXORShiftRandomSeed = Some(rng.currentSeed) } - override def restore(): Unit = { + override def doRestore(): Unit = { assert(wasInitialized && curXORShiftRandomSeed.isDefined) rng.setHashedSeed(curXORShiftRandomSeed.get) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala index c99d0403ed0..50dcdfb5ab5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala @@ -434,7 +434,9 @@ case class GpuArrowEvalPythonExec( new RebatchingRoundoffIterator(iter, inputSchema, targetBatchSize, numInputRows, numInputBatches)) val pyInputIterator = batchProducer.asIterator.map { batch => - withResource(batch)(GpuProjectExec.project(_, boundReferences)) + GpuProjectExec.projectAndCloseWithRetrySingleBatch( + SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY), + boundReferences) } if (isPythonOnGpuEnabled) { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala index d018726ef35..1b91d9736ee 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/NonDeterministicRetrySuite.scala @@ -38,8 +38,11 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase { Array(GpuColumnVector.from(ColumnVector.fromInts(ints: _*), IntegerType)), ints.length) } + private def newGpuRand(ctxCheck: Boolean=false) = + GpuRand(GpuLiteral(RAND_SEED, IntegerType), ctxCheck) + test("GPU rand outputs the same sequence with checkpoint and restore") { - val gpuRand = GpuRand(GpuLiteral(RAND_SEED, IntegerType)) + val gpuRand = newGpuRand() withResource(buildBatch()) { inputCB => // checkpoint the state gpuRand.checkpoint() @@ -65,8 +68,7 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase { } test("GPU project retry with GPU rand") { - def projectRand(): Seq[GpuExpression] = Seq( - GpuAlias(GpuRand(GpuLiteral(RAND_SEED)), "rand")()) + def projectRand(): Seq[GpuExpression] = Seq(GpuAlias(newGpuRand(), "rand")()) Seq(true, false).foreach { useTieredProject => val conf = new SQLConf() @@ -109,7 +111,7 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase { test("GPU filter retry with GPU rand") { def filterRand(): Seq[GpuExpression] = Seq( GpuGreaterThan( - GpuRand(GpuLiteral.create(RAND_SEED, IntegerType)), + newGpuRand(), GpuLiteral.create(0.1d, DoubleType))) Seq(true, false).foreach { useTieredProject => @@ -155,4 +157,70 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase { } } + test("GPU project with GPU rand for context check enabled") { + // We dont check the output correctness, so it is ok to reuse the bound expressions. + val boundCheckExprs = GpuBindReferences.bindGpuReferences( + Seq(GpuAlias(newGpuRand(true), "rand")()), + batchAttrs) + + // 1) Context check + no-retry + no checkpoint-restore + assertThrows[IllegalStateException] { + GpuProjectExec.projectAndClose(buildBatch(), boundCheckExprs, NoopMetric) + } + + // 2) Context check + retry + no checkpoint-restore + assertThrows[IllegalStateException] { + RmmRapidsRetryIterator.withRetryNoSplit(buildBatch()) { cb => + GpuProjectExec.project(cb, boundCheckExprs) + } + } + + // 3) Context check + retry + checkpoint-restore + // This is the expected usage for the GPU Rand. + Seq(true, false).foreach { forceOOM => + val scb = SpillableColumnarBatch(buildBatch(), SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + if (forceOOM) { // make a retrying really happen during the projection + RmmSpark.forceRetryOOM( + RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) + } + GpuProjectExec.projectAndCloseWithRetrySingleBatch(scb, boundCheckExprs).close() + } + } + + test("GPU project with GPU rand for context check disabled") { + // We dont check the output correctness, so it is ok to reuse the bound expressions. + val boundExprs = GpuBindReferences.bindGpuReferences( + Seq(GpuAlias(newGpuRand(false), "rand")()), + batchAttrs) + + // 1) No context check + no retry + no checkpoint-restore + // It works but not the expected usage for the GPU Rand + GpuProjectExec.projectAndClose(buildBatch(), boundExprs, NoopMetric).close() + + // 2) No context check + retry (no real retrying) + no checkpoint-restore + // It works but not the expected usage for the GPU Rand + RmmRapidsRetryIterator.withRetryNoSplit(buildBatch()) { cb => + GpuProjectExec.project(cb, boundExprs) + }.close() + + // 3) No context check + retry (A retrying happens) + no checkpoint-restore + assertThrows[IllegalStateException] { + val cb = buildBatch() + // Make a retrying really happen + RmmSpark.forceRetryOOM( + RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) + RmmRapidsRetryIterator.withRetryNoSplit(cb)(GpuProjectExec.project(_, boundExprs)) + } + + // 4) No context check + retry + checkpoint-restore + // This is the expected usage for the GPU Rand + Seq(true, false).foreach { forceOOM => + val scb = SpillableColumnarBatch(buildBatch(), SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + if (forceOOM) { // make a retrying really happen during the projection + RmmSpark.forceRetryOOM( + RmmSpark.getCurrentThreadId, 1, RmmSpark.OomInjectionType.GPU.ordinal, 0) + } + GpuProjectExec.projectAndCloseWithRetrySingleBatch(scb, boundExprs).close() + } + } }