Skip to content

Commit

Permalink
Merge branch 'hash_agg_mr' into 'master'
Browse files Browse the repository at this point in the history
Basic hash aggregate support: min, max, avg, count, first, last

See merge request nvspark/rapids-plugin-4-spark!14
  • Loading branch information
abellina committed Aug 8, 2019
2 parents 9d03171 + b90db75 commit bdcdd3a
Show file tree
Hide file tree
Showing 12 changed files with 1,413 additions and 84 deletions.
12 changes: 8 additions & 4 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuExpressions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
package ai.rapids.spark

import ai.rapids.cudf.{BinaryOp, BinaryOperable, DType, Scalar, TimeUnit, UnaryOp}

import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, UnaryExpression}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, UnaryExpression, Unevaluable}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

trait GpuExpression extends Expression {
/**
Expand All @@ -39,12 +38,17 @@ trait GpuExpression extends Expression {
if (!super.equals(other)) {
return false
}
return other.isInstanceOf[GpuBinaryExpression]
return other.isInstanceOf[GpuExpression]
}

override def hashCode(): Int = super.hashCode()
}

/**
 * Mix-in for GPU expressions that must never be evaluated in columnar form.
 *
 * `columnarEval` is sealed off here (it is `final`) and always fails with an
 * [[UnsupportedOperationException]] that names the offending expression.
 */
trait GpuUnevaluable extends Unevaluable with GpuExpression {
  final override def columnarEval(batch: ColumnarBatch): Any = {
    throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this")
  }
}

trait GpuUnaryExpression extends UnaryExpression with GpuExpression {
def doColumnar(input: GpuColumnVector): GpuColumnVector
def outputTypeOverride: DType = null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.rapids.spark

import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.vectorized.ColumnarBatch

// GPU counterpart of Spark's NormalizeNaNAndZero. The stated goal of the normalization is:
// - input NaNs become Float.NaN, or Double.NaN
// - -0.0f and -0.0d become 0.0f and 0.0d respectively
// NOTE: the columnar path below performs no normalization itself; it only forwards
// the batch to the child expression and returns whatever the child produced.
// TODO: need coalesce as a feature request in cudf
class GpuNormalizeNaNAndZero(child: GpuExpression)
  extends NormalizeNaNAndZero(child) with GpuExpression {

  // Pass-through evaluation: the result is exactly the child's columnar result.
  override def columnarEval(input: ColumnarBatch): Any = {
    child.columnarEval(input)
  }
}
195 changes: 192 additions & 3 deletions sql-plugin/src/main/scala/ai/rapids/spark/Plugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ import ai.rapids.spark
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.catalyst.optimizer._

import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
Expand All @@ -35,7 +39,6 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.sources.v2.reader.Scan
import org.apache.spark.sql.types._
import org.apache.spark.{ExecutorPlugin, SparkEnv}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

trait GpuExec extends SparkPlan {
override def supportsColumnar = true
Expand Down Expand Up @@ -251,6 +254,28 @@ class ExprRuleBuilder[INPUT <: Expression](implicit val tag: ClassTag[INPUT])
})
}

/**
 * Set a simple conversion function for an [[AggregateExpression]].
 *
 * The wrapped aggregate function is converted first (through
 * `overrides.replaceWithGpuAggregate`) and then handed to `func` together with the
 * remaining fields of the original expression. Distinct aggregates are rejected with a
 * [[CannotReplaceException]] before any conversion is attempted.
 *
 * @param func takes the already converted aggregate function, the AggregateMode
 *             (Partial, Final, PartialMerge), whether it is distinct or not, and its
 *             expression id (resultId)
 * @return this for chaining
 * @throws IllegalStateException if INPUT is not an [[AggregateExpression]]
 */
final def aggregate(
    func: (GpuAggregateFunction, AggregateMode, Boolean, ExprId) => GpuExpression)
  : ExprRuleBuilder[INPUT] = {
  // Guard against registering this helper for an unrelated expression class.
  val targetsAggregateExpression =
    classOf[AggregateExpression].isAssignableFrom(tag.runtimeClass)
  if (!targetsAggregateExpression) {
    throw new IllegalStateException(s"aggregate called on a class that is not" +
      s" a AggregateExpression ${tag}")
  }

  convert { (exp, overrides) =>
    val aggExp = exp.asInstanceOf[AggregateExpression]
    if (aggExp.isDistinct) {
      throw new CannotReplaceException("distinct aggregates are not supported")
    }
    val gpuAggFunction = overrides.replaceWithGpuAggregate(aggExp.aggregateFunction)
    func(gpuAggFunction, aggExp.mode, aggExp.isDistinct, aggExp.resultId)
  }
}

/**
* Build the final rule.
* @return the rule along with the class it is replacing.
Expand Down Expand Up @@ -382,14 +407,55 @@ class ExecRuleBuilder[INPUT <: SparkPlan](implicit val tag: ClassTag[INPUT])
}
}

/**
 * Holds everything that is needed to replace an [[AggregateFunction]] with a GPU enabled
 * version.
 *
 * All constructor arguments are forwarded unchanged to [[ReplacementRule]]; this class only
 * pins the two rule-kind labels below.
 *
 * @tparam INPUT the type of [[AggregateFunction]] this rule replaces.
 */
class AggRule[INPUT <: AggregateFunction](
    doConvert: (INPUT, GpuOverrides) => GpuAggregateFunction,
    doAssertIsAllowed: (INPUT, RapidsConf) => Unit,
    isIncompat: Boolean,
    incompatDoc: String,
    desc: String,
    tag: ClassTag[INPUT])
  extends ReplacementRule[INPUT, AggregateFunction, GpuAggregateFunction](
    doConvert,
    doAssertIsAllowed,
    isIncompat,
    incompatDoc,
    desc,
    tag) {

  // NOTE(review): presumably the segment used when building this rule's config key -
  // confirm against ReplacementRule.
  override val confKeyPart: String = "agg"

  // NOTE(review): presumably the human readable rule kind used in messages/help -
  // confirm against ReplacementRule.
  override val operationName: String = "aggregation"
}
/**
 * Builds an [[AggRule]] from the given inputs.
 *
 * @param tag implicitly set value of the INPUT type
 * @tparam INPUT the type of INPUT [[AggregateFunction]] this rule will replace.
 */
class AggRuleBuilder[INPUT <: AggregateFunction](implicit val tag: ClassTag[INPUT])
  extends ReplacementRuleBuilder[INPUT, GpuAggregateFunction] {

  /**
   * Build the final rule.
   *
   * @return the class being replaced paired with the rule that replaces it.
   * @throws IllegalStateException if no conversion function was set on this builder.
   */
  final def build(): (Class[_ <: AggregateFunction], AggRule[_ <: AggregateFunction]) = {
    // A rule without a conversion function can never produce a GPU aggregate.
    if (doConvert == null) {
      throw new IllegalStateException(s"Conversion function for ${tag} was not set")
    }
    val replacedClass = tag.runtimeClass.asSubclass(classOf[AggregateFunction])
    val rule =
      new AggRule[INPUT](doConvert, doAssertIsAllowed, isIncompat, incompatDoc, desc, tag)
    (replacedClass, rule)
  }
}

object GpuOverrides {
val FLOAT_DIFFERS_INCOMPAT =
"floating point results in some cases may differ with the JVM version by a small amount"
val FLOAT_DIFFERS_GROUP_INCOMPAT =
"when enabling these, there may be extra groups produced for floating point grouping keys (e.g. -0.0, and 0.0)"
val DIVIDE_BY_ZERO_INCOMPAT = "divide by 0 does not result in null"

/**
 * Checks whether an expression should be treated as a string value by the replacement rules.
 *
 * Despite the name this matches more than literals: it is true for a string [[Literal]],
 * for an [[AttributeReference]] whose data type is [[StringType]], and for an [[Alias]]
 * wrapping either of those (aliases are unwrapped recursively).
 *
 * @param exp the expression to inspect
 * @return true if one of the cases above applies, false otherwise
 */
def isStringLit(exp: Expression): Boolean = {
  exp match {
    case alias: Alias => isStringLit(alias.child)
    case attr: AttributeReference => attr.dataType == StringType
    case Literal(_, StringType) => true
    case _ => false
  }
}

Expand All @@ -415,6 +481,10 @@ object GpuOverrides {
new ExecRuleBuilder[INPUT]()
}

/**
 * Entry point for declaring an aggregate replacement rule.
 *
 * @param tag implicitly supplied class tag for INPUT
 * @tparam INPUT the [[AggregateFunction]] type the resulting rule will replace
 * @return a fresh, unconfigured [[AggRuleBuilder]] for INPUT
 */
def agg[INPUT <: AggregateFunction](implicit tag: ClassTag[INPUT]): AggRuleBuilder[INPUT] =
  new AggRuleBuilder[INPUT]()

val expressions : Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Map(
expr[Literal]
.convert((lit, overrides) => new GpuLiteral(lit.value, lit.dataType))
Expand Down Expand Up @@ -527,6 +597,16 @@ object GpuOverrides {
.desc("tangent")
.incompat(FLOAT_DIFFERS_INCOMPAT)
.build(),
expr[NormalizeNaNAndZero]
.unary(new GpuNormalizeNaNAndZero(_))
.desc("normalize nan and zero")
.incompat(FLOAT_DIFFERS_GROUP_INCOMPAT)
.build(),
expr[KnownFloatingPointNormalized]
.unary(new GpuKnownFloatingPointNormalized(_))
.desc("tag to prevent redundant normalization")
.incompat(FLOAT_DIFFERS_GROUP_INCOMPAT)
.build(),
expr[Add]
.binary(new GpuAdd(_, _))
.desc("addition")
Expand Down Expand Up @@ -586,6 +666,10 @@ object GpuOverrides {
.binary(new GpuRemainder(_, _))
.desc("remainder or modulo")
.incompat(DIVIDE_BY_ZERO_INCOMPAT)
.build(),
expr[AggregateExpression]
.aggregate(new GpuAggregateExpression(_, _, _, _))
.desc("aggregate expression")
.build()
)

Expand Down Expand Up @@ -653,7 +737,7 @@ object GpuOverrides {
.desc("The backend for most select, withColumn and dropColumn statements")
.assertIsAllowed((proj, conf) =>
if (isAnyStringLit(proj.expressions)) {
throw new CannotReplaceException("String literal values are not supported in a projection")
throw new CannotReplaceException("string literal values are not supported in a projection")
})
.build(),
exec[BatchScanExec]
Expand All @@ -680,6 +764,89 @@ object GpuOverrides {
new GpuUnionExec(union.children.map(overrides.replaceWithGpuPlan)))
.desc("The backend for the union operator")
.build(),
exec[HashAggregateExec]
.convert((hashAgg, overrides) => {
new GpuHashAggregateExec(
hashAgg.requiredChildDistributionExpressions.map(_.map(overrides.replaceWithGpuExpression)),
hashAgg.groupingExpressions.map(overrides.replaceWithGpuExpression),
hashAgg.aggregateExpressions.map(overrides.replaceWithGpuExpression).asInstanceOf[Seq[GpuAggregateExpression]],
hashAgg.aggregateAttributes.map(overrides.replaceWithGpuExpression).asInstanceOf[Seq[GpuAttributeReference]],
hashAgg.initialInputBufferOffset,
hashAgg.resultExpressions.map(overrides.replaceWithGpuExpression),
overrides.replaceWithGpuPlan(hashAgg.child))
})
.assertIsAllowed((hashAgg, conf) => {
// The StringType check was added due to core dump in the latest branch while running the mortgage tests
// in ~NVStrings.
//
// Note that we replace only if resultExpressions is non-empty. This is due to a case
// where HashAggregateExec nodes can be added with empty result expressions (the MortgageTest exposes this).
// The only choice should be to group by the grouping expressions, and use the grouping
// key + the aggregate expressions as the result, but for now let's not handle it until
// we can shed some light (in our tests the HashAggregateExec node without result expressions appears
// to go away after the plugin anyway)
if (isAnyStringLit(hashAgg.groupingExpressions)) {
throw new CannotReplaceException("string literal values are not supported in a hash aggregate")
}
if (hashAgg.resultExpressions.isEmpty) {
throw new CannotReplaceException("result expressions is empty")
}
})
.desc("The backend for hash based aggregations")
.build()
)

// Replacement rules for aggregate functions, keyed by the Spark class being replaced.
val aggs: Map[Class[_ <: AggregateFunction], AggRule[_ <: AggregateFunction]] = Map(
  // declarative aggregates
  agg[Count]
    .assertIsAllowed { (count, _) =>
      // Only counts whose arguments are all literals are accepted.
      val onlyLiteralChildren = count.children.forall(_.isInstanceOf[Literal])
      if (!onlyLiteralChildren) {
        throw new CannotReplaceException("only count('*') or count(1) supported")
      }
    }
    .convert { (count, overrides) =>
      val gpuChildren = count.children.map(overrides.replaceWithGpuExpression)
      new GpuCount(gpuChildren)
    }
    .desc("count aggregate operator")
    .build(),
  agg[Max]
    .convert { (max, overrides) =>
      new GpuMax(overrides.replaceWithGpuExpression(max.child))
    }
    .desc("max aggregate operator")
    .build(),
  agg[Min]
    .convert { (min, overrides) =>
      new GpuMin(overrides.replaceWithGpuExpression(min.child))
    }
    .desc("min aggregate operator")
    .build(),
  agg[First]
    .assertIsAllowed { (first, _) =>
      // Rejected when the ignore-nulls flag is the literal `false`.
      val includesNulls = first.ignoreNullsExpr.semanticEquals(Literal(false))
      if (includesNulls) {
        throw new CannotReplaceException("including nulls is not supported, use first(col, true)")
      }
    }
    .convert { (first, overrides) =>
      new GpuFirst(
        overrides.replaceWithGpuExpression(first.child),
        isIgnoreNulls = overrides.replaceWithGpuExpression(first.ignoreNullsExpr))
    }
    .desc("first aggregate operator")
    .build(),
  agg[Last]
    .assertIsAllowed { (last, _) =>
      // Rejected when the ignore-nulls flag is the literal `false`.
      val includesNulls = last.ignoreNullsExpr.semanticEquals(Literal(false))
      if (includesNulls) {
        throw new CannotReplaceException("including nulls is not supported, use last(col, true)")
      }
    }
    .convert { (last, overrides) =>
      new GpuLast(
        overrides.replaceWithGpuExpression(last.child),
        isIgnoreNulls = overrides.replaceWithGpuExpression(last.ignoreNullsExpr))
    }
    .desc("last aggregate operator")
    .build(),
  agg[Sum]
    .convert { (sum, overrides) =>
      new GpuSum(overrides.replaceWithGpuExpression(sum.child))
    }
    .desc("sum aggregate operator")
    .build(),
  agg[Average]
    .convert { (avg, overrides) =>
      new GpuAverage(overrides.replaceWithGpuExpression(avg.child))
    }
    .desc("average aggregate operator")
    .build()
)
}

Expand Down Expand Up @@ -815,6 +982,29 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
plan.withNewChildren(plan.children.map(replaceWithGpuPlan))
}

/**
 * Convert a Spark [[AggregateFunction]] into its GPU enabled counterpart.
 *
 * The rule registered for the function's class in `GpuOverrides.aggs` is looked up,
 * checked against the current configuration, asked whether this particular instance is
 * allowed, and finally used to perform the conversion.
 *
 * @param agg the aggregate function to replace
 * @return the GPU version produced by the matching rule
 * @throws CannotReplaceException if no rule is registered for the class, or the rule is
 *                                disabled by the configuration
 */
def replaceWithGpuAggregate(agg: AggregateFunction): GpuAggregateFunction = {
  val rule = GpuOverrides.aggs.getOrElse(agg.getClass,
    throw new CannotReplaceException(s"no GPU enabled version of aggregate type" +
      s" ${agg.getClass.getName} could be found"))

  val operatorEnabled = conf.isOperatorEnabled(rule.confKey, rule.isIncompat)
  if (!operatorEnabled) {
    // Pick the explanation: incompatible ops get a longer message describing both ways
    // of enabling them, everything else is reported as plainly disabled.
    val reason =
      if (rule.isIncompat && !conf.isIncompatEnabled) {
        s"the GPU version of aggregation type ${agg.getClass.getName}" +
          s" is not 100% compatible with the Spark version. ${rule.incompatDoc}. To enable this" +
          s" operator despite the incompatibilities please set the config" +
          s" ${rule.confKey} to true. You could also set ${RapidsConf.INCOMPATIBLE_OPS} to true" +
          s" to enable all incompatible ops"
      } else {
        s"The input type ${agg.getClass} has been" +
          s" disabled. To enable it set ${rule.confKey} to true"
      }
    throw new CannotReplaceException(reason)
  }

  rule.assertIsAllowed(agg, conf)

  rule.convert(agg, this)
}

override def apply(plan: SparkPlan) :SparkPlan = {
conf = new RapidsConf(plan.conf)
if (conf.isSqlEnabled) {
Expand Down Expand Up @@ -872,7 +1062,6 @@ case class GpuTransitionOverrides() extends Rule[SparkPlan] {
}
case _: GpuColumnarToRowExec => () // Ignored
case _: ShuffleExchangeExec => () // Ignored for now
case _: HashAggregateExec => () // Ignored for now
case _ =>
if (!plan.supportsColumnar) {
throw new IllegalArgumentException(s"Part of the plan is not columnar ${plan.getClass}\n${plan}")
Expand Down
1 change: 1 addition & 0 deletions sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ object RapidsConf {
GpuOverrides.execs.values.foreach(_.confHelp())
GpuOverrides.scans.values.foreach(_.confHelp())
GpuOverrides.parts.values.foreach(_.confHelp())
GpuOverrides.aggs.values.foreach(_.confHelp())
}
}

Expand Down
Loading

0 comments on commit bdcdd3a

Please sign in to comment.