Commit
use vectorized parameter where possible
Signed-off-by: Hongbin Ma (Mahone) <[email protected]>
binmahone committed May 22, 2024
1 parent 7237cb6 commit eac691c
Showing 2 changed files with 157 additions and 11 deletions.
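For context, a minimal sketch of the two parameter styles referenced by the commit title (illustrative only; the expression and values are arbitrary and are not part of the changed files):

// Vectorized Parameter: the argument stays a bound column reference, so the whole
// input column (including any NULLs) reaches the evaluation kernel.
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Length, Literal}
import org.apache.spark.sql.types.StringType

val vectorizedParam = Length(BoundReference(0, StringType, nullable = true))

// Scalar Parameter: the bound reference is replaced by a literal taken from a single
// input row, so only a constant expression tree is evaluated.
val scalarParam = Length(Literal("abc"))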
GpuOverrides.scala
@@ -945,6 +945,19 @@ object GpuOverrides extends Logging {
override def convertToGpu(child: Expression): GpuExpression =
GpuAlias(child, a.name)(a.exprId, a.qualifier, a.explicitMetadata)
}),
expr[BoundReference](
"Reference to a bound variable",
ExprChecks.projectAndAst(
TypeSig.astTypes + GpuTypeShims.additionalCommonOperatorSupportedTypes,
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.MAP + TypeSig.ARRAY + TypeSig.STRUCT +
TypeSig.DECIMAL_128 + TypeSig.BINARY +
GpuTypeShims.additionalCommonOperatorSupportedTypes).nested(),
TypeSig.all),
(currentRow, conf, p, r) => new ExprMeta[BoundReference](currentRow, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuBoundReference(
currentRow.ordinal, currentRow.dataType, currentRow.nullable)(
NamedExpression.newExprId, "")
}),
expr[AttributeReference](
"References an input column",
ExprChecks.projectAndAst(
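The hunk above registers BoundReference as a GPU-overridable expression for project and AST contexts. A hypothetical check (assuming the spark-rapids classes are on the test classpath) of how that registration becomes visible to the test code further below:

import com.nvidia.spark.rapids.GpuOverrides
import org.apache.spark.sql.catalyst.expressions.BoundReference

// The new entry makes BoundReference discoverable in the expression-rule map,
// which isQualifiedForVectorizedParams in RapidsTestsTrait relies on.
val hasRule = GpuOverrides.expressions.contains(classOf[BoundReference])
println(s"BoundReference has a GPU override rule: $hasRule")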
RapidsTestsTrait.scala
@@ -22,13 +22,14 @@ package org.apache.spark.sql.rapids.utils
import java.io.File
import java.util.TimeZone

-import com.nvidia.spark.rapids.{GpuProjectExec, TestStats}
import com.nvidia.spark.rapids.{ ExprChecksImpl, GpuOverrides, GpuProjectExec, ProjectExprContext, TestStats, TypeEnum, TypeSig}
import org.apache.commons.io.{FileUtils => fu}
import org.apache.commons.math3.util.Precision
import org.scalactic.TripleEqualsSupport.Spread
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

-import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
import org.apache.spark.sql.catalyst.expressions._
@@ -37,6 +38,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData, MapData,
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.utils.RapidsQueryTestUtil.isNaNOrInf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String


trait RapidsTestsTrait extends RapidsTestsCommonTrait {
@@ -221,17 +223,98 @@
RapidsTestConstants.SUPPORTED_DATA_TYPE.acceptsType(expr.dataType)
}

/**
* Many of the expressions in RAPIDS do not support vectorized parameters (e.g. regexp_replace),
* so we need to check whether the expression being evaluated qualifies for vectorized parameters.
*
* If yes, we pass the parameters of the expression as vectors (Vectorized Parameters).
*
* If no, we replace all the parameters with literals (Scalar Parameters) and evaluate
* the expression. We are actually evaluating a constant expression tree in this case,
* but that is fine for testing purposes. Note that we then need to make sure Constant
* Folding is disabled.
*
* We always prefer Vectorized Parameters when evaluating expressions, because Scalar
* Parameters may hide bugs. For example, an expression `some_expr(NULL)` may correctly
* return NULL only because NullPropagation is working; evaluating the same expression
* against a vector that contains NULL might still fail.
*
* @param e the expression being evaluated
* @return true if the expression qualifies for vectorized parameters
*/
def isQualifiedForVectorizedParams(e: Expression): Boolean = {
val map = GpuOverrides.expressions
e.foreachUp(expr => {
logDebug(s"Checking expr $expr :\n")
if (!map.contains(expr.getClass)) {
logDebug(s"Check failed because ${expr.getClass} not found in GpuOverrides.expressions\n")
return false
}
map(expr.getClass).getChecks.foreach(check => {
if (check.isInstanceOf[ExprChecksImpl]) {
val exprChecksImpl = check.asInstanceOf[ExprChecksImpl]
if (!exprChecksImpl.contexts.contains(ProjectExprContext)) {
logDebug(s"Check failed because $exprChecksImpl does not contain ProjectExprContext\n")
return false
}
val context = exprChecksImpl.contexts(ProjectExprContext)
(context.paramCheck.map(_.cudf) ++ context.repeatingParamCheck.map(_.cudf))
.foreach(sig => {
// use reflection to get the private field litOnlyTypes
import scala.reflect.runtime.universe._
val mirror = runtimeMirror(sig.getClass.getClassLoader)
val privateFieldSymbol = typeOf[TypeSig].decl(TermName("litOnlyTypes")).asTerm
val privateFieldMirror =
mirror.reflect(sig).reflectField(privateFieldSymbol)
val litOnlyTypes = privateFieldMirror.get.asInstanceOf[TypeEnum.ValueSet]
if (litOnlyTypes.nonEmpty) {
logDebug(s"Check failed because non empty litOnlyTypes: $litOnlyTypes \n")
return false
}
})
} else {
logDebug(s"Check continues by skipping ${check.getClass}")
}
})
})
logDebug(s"Check succeed")
true
}
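The NullPropagation concern described in the comment above can be made concrete with a short, illustrative sketch (not part of the changed files; assumes a SparkSession named `spark` with the RAPIDS plugin configured):

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{Literal, Upper}
import org.apache.spark.sql.functions.upper
import org.apache.spark.sql.types.StringType

// Scalar Parameter: NullPropagation may fold this to a null literal before any GPU code
// runs, so a null-handling bug in the GPU kernel could go unnoticed. This is also why
// Constant Folding has to be disabled for the scalar path, presumably via
// spark.sql.optimizer.excludedRules.
spark.range(0, 1).select(new Column(Upper(Literal(null, StringType)))).collect()

// Vectorized Parameter: the NULL travels inside the input column, so the GPU
// implementation itself must handle it.
import spark.implicits._
val df = Seq(Some("a"), None).toDF("col0")
df.select(upper(df("col0"))).collect()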

def rapidsCheckExpression(origExpr: Expression, expected: Any, inputRow: InternalRow): Unit = {
-// many of the expressions in RAPIDS do not support vectorized parameters(e.g. regexp_replace).
-// So we downgrade all expression evaluation to use scalar parameters.
-// In a follow-up issue (https://github.com/NVIDIA/spark-rapids/issues/10859),
-// we'll take care of the expressions those already support vectorized parameters.
-val expression = origExpr.transformUp {
-case BoundReference(ordinal, dataType, _) =>
-Literal(inputRow.asInstanceOf[GenericInternalRow].get(ordinal, dataType), dataType)
var result : Array[Row] = null
var resultDF : DataFrame = null
var expression = origExpr

if(!isQualifiedForVectorizedParams(origExpr)) {
logInfo(s"$origExpr is being evaluated with Scalar Parameter")
println(s"$origExpr is being evaluated with Scalar Parameter")
expression = origExpr.transformUp {
case BoundReference(ordinal, dataType, _) =>
Literal(inputRow.asInstanceOf[GenericInternalRow].get(ordinal, dataType), dataType)
}
resultDF = _spark.range(0, 1).select(Column(expression))
result = resultDF.collect()
} else {
logInfo(s"$expression is being evaluated with Vectorized Parameter")
println(s"$expression is being evaluated with Vectorized Parameter")
val typeHintForOrdinal : Map[Int, DataType] = expression.collect {
// Spark UT expression tests typically use `val s = 's.string.at(0)`
// to define a bound reference of string type.
case b: BoundReference => b.ordinal -> b.dataType
}.toMap
val df = if (inputRow != EmptyRow && inputRow != InternalRow.empty) {
convertInternalRowToDataFrame(inputRow, typeHintForOrdinal)
} else {
// create a placeholder one-row DataFrame for expressions that take no input columns
val schema = StructType(StructField("a", IntegerType, nullable = true) :: Nil)
val empData = Seq(Row(1))
_spark.createDataFrame(_spark.sparkContext.parallelize(empData), schema)
}
resultDF = df.select(Column(expression))
result = resultDF.collect()
}
-val resultDF = _spark.range(0, 1).select(Column(expression))
-val result = resultDF.collect()

TestStats.testUnitNumber = TestStats.testUnitNumber + 1
if (
checkDataTypeSupported(expression) &&
@@ -293,4 +376,54 @@
}
true
}

def convertInternalRowToDataFrame(
inputRow: InternalRow, typeHintForOrdinal: Map[Int, DataType]) : DataFrame = {
val structFileSeq = new ArrayBuffer[StructField]()
val values = inputRow match {
case genericInternalRow: GenericInternalRow =>
genericInternalRow.values
case _ => throw new UnsupportedOperationException("Unsupported InternalRow.")
}
values.zipWithIndex.foreach { pair => {
if (typeHintForOrdinal.contains(pair._2)) {
structFileSeq.append(
StructField(s"col${pair._2}", typeHintForOrdinal(pair._2), pair._1 == null))
} else {
pair._1 match {
case boolean: java.lang.Boolean =>
structFileSeq.append(StructField(s"col${pair._2}", BooleanType, boolean == null))
case short: java.lang.Short =>
structFileSeq.append(StructField(s"col${pair._2}", ShortType, short == null))
case byte: java.lang.Byte =>
structFileSeq.append(StructField(s"col${pair._2}", ByteType, byte == null))
case integer: java.lang.Integer =>
structFileSeq.append(StructField(s"col${pair._2}", IntegerType, integer == null))
case long: java.lang.Long =>
structFileSeq.append(StructField(s"col${pair._2}", LongType, long == null))
case float: java.lang.Float =>
structFileSeq.append(StructField(s"col${pair._2}", FloatType, float == null))
case double: java.lang.Double =>
structFileSeq.append(StructField(s"col${pair._2}", DoubleType, double == null))
case utf8String: UTF8String =>
structFileSeq.append(StructField(s"col${pair._2}", StringType, utf8String == null))
case byteArr: Array[Byte] =>
structFileSeq.append(StructField(s"col${pair._2}", BinaryType, byteArr == null))
case decimal: Decimal =>
structFileSeq.append(
StructField(s"col${pair._2}", DecimalType(decimal.precision, decimal.scale),
decimal == null))
case null =>
structFileSeq.append(StructField(s"col${pair._2}", NullType, nullable = true))
case unsupported@_ =>
throw new UnsupportedOperationException(s"Unsupported type: ${unsupported.getClass}")
}
}
}
}
val fields = structFileSeq.toSeq
_spark.internalCreateDataFrame(
_spark.sparkContext.parallelize(Seq(inputRow)),
StructType(fields))
}
}
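A hypothetical usage sketch of convertInternalRowToDataFrame (illustrative only; assumes a test suite that mixes in RapidsTestsTrait so the method and its underlying SparkSession are in scope):

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// One input row holding a string and an int; ordinal 0 takes its type from the
// BoundReference hint, ordinal 1 falls back to inference from the runtime value.
val row = new GenericInternalRow(Array[Any](UTF8String.fromString("abc"), 42))
val df = convertInternalRowToDataFrame(row, Map(0 -> StringType))
df.printSchema() // col0: string (from the hint), col1: int (inferred)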
