From b90db75d09eca928d3fa7553f76ff7f21437713c Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Thu, 8 Aug 2019 12:28:55 -0700 Subject: [PATCH] Basic hash aggregate support: min, max, avg, count, first, last --- .../ai/rapids/spark/GpuExpressions.scala | 12 +- .../spark/NormalizeFloatingNumbers.scala | 29 + .../main/scala/ai/rapids/spark/Plugin.scala | 195 ++++- .../scala/ai/rapids/spark/RapidsConf.scala | 1 + .../scala/ai/rapids/spark/aggregate.scala | 756 ++++++++++++++++++ .../rapids/spark/constraintExpressions.scala | 27 + sql-plugin/src/test/resources/dates.csv | 6 + sql-plugin/src/test/resources/floats.csv | 10 +- sql-plugin/src/test/resources/ints.csv | 7 + .../src/test/resources/nullable_floats.csv | 4 +- sql-plugin/src/test/resources/shorts.csv | 8 + .../spark/SparkQueryCompareTestSuite.scala | 442 ++++++++-- 12 files changed, 1413 insertions(+), 84 deletions(-) create mode 100644 sql-plugin/src/main/scala/ai/rapids/spark/NormalizeFloatingNumbers.scala create mode 100644 sql-plugin/src/main/scala/ai/rapids/spark/aggregate.scala create mode 100644 sql-plugin/src/main/scala/ai/rapids/spark/constraintExpressions.scala create mode 100644 sql-plugin/src/test/resources/dates.csv create mode 100644 sql-plugin/src/test/resources/ints.csv create mode 100644 sql-plugin/src/test/resources/shorts.csv diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuExpressions.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuExpressions.scala index c7d763690bb..ce07600f1a0 100644 --- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuExpressions.scala +++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuExpressions.scala @@ -17,9 +17,8 @@ package ai.rapids.spark import ai.rapids.cudf.{BinaryOp, BinaryOperable, DType, Scalar, TimeUnit, UnaryOp} - -import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, UnaryExpression} -import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression, UnaryExpression, Unevaluable} +import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} trait GpuExpression extends Expression { /** @@ -39,12 +38,17 @@ trait GpuExpression extends Expression { if (!super.equals(other)) { return false } - return other.isInstanceOf[GpuBinaryExpression] + return other.isInstanceOf[GpuExpression] } override def hashCode(): Int = super.hashCode() } +trait GpuUnevaluable extends Unevaluable with GpuExpression { + final override def columnarEval(batch: ColumnarBatch): Any = + throw new UnsupportedOperationException(s"Cannot columnar evaluate expression: $this") +} + trait GpuUnaryExpression extends UnaryExpression with GpuExpression { def doColumnar(input: GpuColumnVector): GpuColumnVector def outputTypeOverride: DType = null diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/NormalizeFloatingNumbers.scala b/sql-plugin/src/main/scala/ai/rapids/spark/NormalizeFloatingNumbers.scala new file mode 100644 index 00000000000..fb8a712a29a --- /dev/null +++ b/sql-plugin/src/main/scala/ai/rapids/spark/NormalizeFloatingNumbers.scala @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.spark + +import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero +import org.apache.spark.sql.vectorized.ColumnarBatch + +// This will ensure that: +// - input NaNs become Float.NaN, or Double.NaN +// - that -0.0f and -0.0d becomes 0.0f, and 0.0d respectively +// TODO: need coalesce as a feature request in cudf +class GpuNormalizeNaNAndZero(child: GpuExpression) extends NormalizeNaNAndZero(child) + with GpuExpression { + override def columnarEval(input: ColumnarBatch): Any = child.columnarEval(input) +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/Plugin.scala b/sql-plugin/src/main/scala/ai/rapids/spark/Plugin.scala index 0c3ad244351..790027005d2 100644 --- a/sql-plugin/src/main/scala/ai/rapids/spark/Plugin.scala +++ b/sql-plugin/src/main/scala/ai/rapids/spark/Plugin.scala @@ -24,9 +24,13 @@ import ai.rapids.spark import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.catalyst.optimizer._ + import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan @@ -35,7 +39,6 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.sources.v2.reader.Scan import org.apache.spark.sql.types._ import org.apache.spark.{ExecutorPlugin, SparkEnv} -import org.apache.spark.sql.execution.aggregate.HashAggregateExec trait GpuExec extends SparkPlan { override def supportsColumnar = true @@ -251,6 +254,28 @@ class ExprRuleBuilder[INPUT <: Expression](implicit val tag: ClassTag[INPUT]) }) } + /** + * + * Set a simple conversion function for a [[AggregateExpression]]. + * @param func takes the already converted aggregate function, the AggregateMode (Partial, Final, PartialMerge), + * whether it is distinct or not, its expression id (resultId) + * @return this for chaining + */ + final def aggregate(func: (GpuAggregateFunction, AggregateMode, Boolean, ExprId) => GpuExpression): ExprRuleBuilder[INPUT] = { + if (!classOf[AggregateExpression].isAssignableFrom(tag.runtimeClass)) { + throw new IllegalStateException(s"aggregate called on a class that is not" + + s" a AggregateExpression ${tag}") + } + convert((exp, overrides) => { + val aggExp = exp.asInstanceOf[AggregateExpression] + if (aggExp.isDistinct) { + throw new CannotReplaceException("distinct aggregates are not supported") + } + val aggFn = overrides.replaceWithGpuAggregate(aggExp.aggregateFunction) + func(aggFn, aggExp.mode, aggExp.isDistinct, aggExp.resultId) + }) + } + /** * Build the final rule. * @return the rule along with the class it is replacing. 
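For reference, this builder hook is consumed further down in this same patch; the converted aggregate function, the AggregateMode, the distinct flag and the resultId are forwarded straight into the GPU expression's constructor (distinct aggregates having already been rejected with CannotReplaceException):

    expr[AggregateExpression]
      .aggregate(new GpuAggregateExpression(_, _, _, _))
      .desc("aggregate expression")
      .build()
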
@@ -382,14 +407,55 @@ class ExecRuleBuilder[INPUT <: SparkPlan](implicit val tag: ClassTag[INPUT]) } } +/** + * Holds everything that is needed to replace a [[AggregateFunction]] with a GPU enabled version. + */ +class AggRule[INPUT <: AggregateFunction]( + doConvert: (INPUT, GpuOverrides) => GpuAggregateFunction, + doAssertIsAllowed: (INPUT, RapidsConf) => Unit, + isIncompat: Boolean, + incompatDoc: String, + desc: String, + tag: ClassTag[INPUT]) + extends ReplacementRule[INPUT, AggregateFunction, GpuAggregateFunction](doConvert, + doAssertIsAllowed, isIncompat, incompatDoc, desc, tag){ + + override val confKeyPart: String = "agg" + override val operationName: String = "aggregation" +} +/** + * Builds an [[AggRule]] from the given inputs. + * + * @param tag implicitly set value of the INPUT type + * @tparam INPUT the type of INPUT [[AggregateFunction]] this rule will replace. + */ +class AggRuleBuilder[INPUT <: AggregateFunction](implicit val tag: ClassTag[INPUT]) + extends ReplacementRuleBuilder[INPUT, GpuAggregateFunction] { + + /** + * Build the final rule. + * @return the rule along with the class it is replacing. + */ + final def build(): (Class[_ <: AggregateFunction], AggRule[_ <: AggregateFunction]) = { + if (doConvert == null) { + throw new IllegalStateException(s"Conversion function for ${tag} was not set") + } + (tag.runtimeClass.asSubclass(classOf[AggregateFunction]), + new AggRule[INPUT](doConvert, doAssertIsAllowed, isIncompat, incompatDoc, desc, tag)) + } +} + object GpuOverrides { val FLOAT_DIFFERS_INCOMPAT = "floating point results in some cases may differ with the JVM version by a small amount" + val FLOAT_DIFFERS_GROUP_INCOMPAT = + "when enabling these, there may be extra groups produced for floating point grouping keys (e.g. -0.0, and 0.0)" val DIVIDE_BY_ZERO_INCOMPAT = "divide by 0 does not result in null" def isStringLit(exp: Expression): Boolean = exp match { case Literal(_, StringType) => true case a: Alias => isStringLit(a.child) + case a: AttributeReference => a.dataType == StringType case _ => false } @@ -415,6 +481,10 @@ object GpuOverrides { new ExecRuleBuilder[INPUT]() } + def agg[INPUT <: AggregateFunction](implicit tag: ClassTag[INPUT]): AggRuleBuilder[INPUT] = { + new AggRuleBuilder[INPUT]() + } + val expressions : Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Map( expr[Literal] .convert((lit, overrides) => new GpuLiteral(lit.value, lit.dataType)) @@ -527,6 +597,16 @@ object GpuOverrides { .desc("tangent") .incompat(FLOAT_DIFFERS_INCOMPAT) .build(), + expr[NormalizeNaNAndZero] + .unary(new GpuNormalizeNaNAndZero(_)) + .desc("normalize nan and zero") + .incompat(FLOAT_DIFFERS_GROUP_INCOMPAT) + .build(), + expr[KnownFloatingPointNormalized] + .unary(new GpuKnownFloatingPointNormalized(_)) + .desc("tag to prevent redundant normalization") + .incompat(FLOAT_DIFFERS_GROUP_INCOMPAT) + .build(), expr[Add] .binary(new GpuAdd(_, _)) .desc("addition") @@ -586,6 +666,10 @@ object GpuOverrides { .binary(new GpuRemainder(_, _)) .desc("remainder or modulo") .incompat(DIVIDE_BY_ZERO_INCOMPAT) + .build(), + expr[AggregateExpression] + .aggregate(new GpuAggregateExpression(_, _, _, _)) + .desc("aggregate expression") .build() ) @@ -653,7 +737,7 @@ object GpuOverrides { .desc("The backend for most select, withColumn and dropColumn statements") .assertIsAllowed((proj, conf) => if (isAnyStringLit(proj.expressions)) { - throw new CannotReplaceException("String literal values are not supported in a projection") + throw new CannotReplaceException("string literal 
values are not supported in a projection") }) .build(), exec[BatchScanExec] @@ -680,6 +764,89 @@ object GpuOverrides { new GpuUnionExec(union.children.map(overrides.replaceWithGpuPlan))) .desc("The backend for the union operator") .build(), + exec[HashAggregateExec] + .convert((hashAgg, overrides) => { + new GpuHashAggregateExec( + hashAgg.requiredChildDistributionExpressions.map(_.map(overrides.replaceWithGpuExpression)), + hashAgg.groupingExpressions.map(overrides.replaceWithGpuExpression), + hashAgg.aggregateExpressions.map(overrides.replaceWithGpuExpression).asInstanceOf[Seq[GpuAggregateExpression]], + hashAgg.aggregateAttributes.map(overrides.replaceWithGpuExpression).asInstanceOf[Seq[GpuAttributeReference]], + hashAgg.initialInputBufferOffset, + hashAgg.resultExpressions.map(overrides.replaceWithGpuExpression), + overrides.replaceWithGpuPlan(hashAgg.child)) + }) + .assertIsAllowed((hashAgg, conf) => { + // The StringType check was added due to core dump in the latest branch while running the mortgage tests + // in ~NVStrings. + // + // Note that we replace only if resultExpressions is non empty. This is due to a case + // where HashAggregateExec nodes can be added with empty result expression (the MortgageTest exposes this). + // The only choice should be to group by the grouping expressions, and use the grouping + // key + the aggregate expressions as the result, but for now lets not handle it until + // we can shed some light (in our tests the HashAggregateExec node without result expression appears + // to go away after the plugin anyway) + if (isAnyStringLit(hashAgg.groupingExpressions)) { + throw new CannotReplaceException("string literal values are not supported in a hash aggregate") + } + if (hashAgg.resultExpressions.isEmpty) { + throw new CannotReplaceException("result expressions is empty") + } + }) + .desc("The backend for hash based aggregations") + .build() + ) + + val aggs: Map[Class[_ <: AggregateFunction], AggRule[_ <: AggregateFunction]] = Map( + // declarative aggregates + agg[Count] + .assertIsAllowed((count, conf) => + if (!count.children.forall(_.isInstanceOf[Literal])) { + throw new CannotReplaceException("only count('*') or count(1) supported") + }) + .convert((count, overrides) => + new GpuCount(count.children.map(overrides.replaceWithGpuExpression))) + .desc("count aggregate operator") + .build(), + agg[Max] + .convert((max, overrides) => + new GpuMax(overrides.replaceWithGpuExpression(max.child))) + .desc("max aggregate operator") + .build(), + agg[Min] + .convert((min, overrides) => + new GpuMin(overrides.replaceWithGpuExpression(min.child))) + .desc("min aggregate operator") + .build(), + agg[First] + .assertIsAllowed((first, conf) => + if (first.ignoreNullsExpr.semanticEquals(Literal(false))) { + throw new CannotReplaceException("including nulls is not supported, use first(col, true)") + }) + .convert((first, overrides) => + new GpuFirst(overrides.replaceWithGpuExpression(first.child), + isIgnoreNulls = overrides.replaceWithGpuExpression(first.ignoreNullsExpr))) + .desc("first aggregate operator") + .build(), + agg[Last] + .assertIsAllowed((last, conf) => + if (last.ignoreNullsExpr.semanticEquals(Literal(false))) { + throw new CannotReplaceException("including nulls is not supported, use last(col, true)") + }) + .convert((last, overrides) => + new GpuLast(overrides.replaceWithGpuExpression(last.child), + isIgnoreNulls = overrides.replaceWithGpuExpression(last.ignoreNullsExpr))) + .desc("last aggregate operator") + .build(), + agg[Sum] + .convert((sum, 
overrides) => + new GpuSum(overrides.replaceWithGpuExpression(sum.child))) + .desc("sum aggregate operator") + .build(), + agg[Average] + .convert((avg, overrides) => + new GpuAverage(overrides.replaceWithGpuExpression(avg.child))) + .desc("average aggregate operator") + .build(), ) } @@ -815,6 +982,29 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging { plan.withNewChildren(plan.children.map(replaceWithGpuPlan)) } + def replaceWithGpuAggregate(agg: AggregateFunction): GpuAggregateFunction = { + val rule = GpuOverrides.aggs.getOrElse(agg.getClass, + throw new CannotReplaceException(s"no GPU enabled version of aggregate type" + + s" ${agg.getClass.getName} could be found")) + + if (!conf.isOperatorEnabled(rule.confKey, rule.isIncompat)) { + if (rule.isIncompat && !conf.isIncompatEnabled) { + throw new CannotReplaceException(s"the GPU version of aggregation type ${agg.getClass.getName}" + + s" is not 100% compatible with the Spark version. ${rule.incompatDoc}. To enable this" + + s" operator despite the incompatibilities please set the config" + + s" ${rule.confKey} to true. You could also set ${RapidsConf.INCOMPATIBLE_OPS} to true" + + s" to enable all incompatible ops") + } else { + throw new CannotReplaceException(s"The input type ${agg.getClass} has been" + + s" disabled. To enable it set ${rule.confKey} to true") + } + } + + rule.assertIsAllowed(agg, conf) + + rule.convert(agg, this) + } + override def apply(plan: SparkPlan) :SparkPlan = { conf = new RapidsConf(plan.conf) if (conf.isSqlEnabled) { @@ -872,7 +1062,6 @@ case class GpuTransitionOverrides() extends Rule[SparkPlan] { } case _: GpuColumnarToRowExec => () // Ignored case _: ShuffleExchangeExec => () // Ignored for now - case _: HashAggregateExec => () // Ignored for now case _ => if (!plan.supportsColumnar) { throw new IllegalArgumentException(s"Part of the plan is not columnar ${plan.getClass}\n${plan}") diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala b/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala index 6205fc89200..86b79a5de0d 100644 --- a/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala +++ b/sql-plugin/src/main/scala/ai/rapids/spark/RapidsConf.scala @@ -175,6 +175,7 @@ object RapidsConf { GpuOverrides.execs.values.foreach(_.confHelp()) GpuOverrides.scans.values.foreach(_.confHelp()) GpuOverrides.parts.values.foreach(_.confHelp()) + GpuOverrides.aggs.values.foreach(_.confHelp()) } } diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/aggregate.scala b/sql-plugin/src/main/scala/ai/rapids/spark/aggregate.scala new file mode 100644 index 00000000000..0ac51905c9d --- /dev/null +++ b/sql-plugin/src/main/scala/ai/rapids/spark/aggregate.scala @@ -0,0 +1,756 @@ +/* + * Copyright (c) 2019, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package ai.rapids.spark
+
+import ai.rapids.cudf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, ExprId, Expression, NamedExpression}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.types.{ByteType, DataType, DoubleType, IntegerType, LongType, ShortType}
+import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+
+trait GpuAggregateFunction extends AggregateFunction with GpuUnevaluable {
+  // using the child reference, define the shape of the vectors sent to
+  // the update/merge expressions
+  val inputProjection: Seq[GpuExpression]
+
+  // update: first half of the aggregation (count = count)
+  val updateExpressions: Seq[GpuExpression]
+
+  // merge: second half of the aggregation (count = sum). Also used to merge multiple batches.
+  val mergeExpressions: Seq[GpuExpression]
+
+  // most likely a pass-through (count => the sum we merged above).
+  // average has a more interesting expression to compute the division of sum/count
+  val finalExpression: GpuExpression
+
+  // returns the attribute references associated with this function
+  // given a mode of aggregation
+  def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference]
+}
+
+class GpuAggregateExpression(override val aggregateFunction: GpuAggregateFunction,
+                             mode: AggregateMode,
+                             isDistinct: Boolean,
+                             resultId: ExprId)
+  extends AggregateExpression(aggregateFunction, mode, isDistinct, resultId)
+  with GpuUnevaluable {
+
+  override def equals(other: Any): Boolean = {
+    if (!super.equals(other)) {
+      return false
+    }
+    other.isInstanceOf[GpuAggregateExpression]
+  }
+}
+
+abstract case class CudfAggregate(ref: GpuExpression) extends GpuUnevaluable {
+  // we use this to get the ordinal of the bound reference, s.t. 
we can ask cudf to perform + // the aggregate on that column + def getOrdinal(ref: GpuExpression) = ref.asInstanceOf[GpuBoundReference].ordinal + val updateReductionAggregates: cudf.ColumnVector => Seq[cudf.Scalar] + val mergeReductionAggregates: cudf.ColumnVector => Seq[cudf.Scalar] + val updateAggregates: Seq[cudf.Aggregate] + val mergeAggregates: Seq[cudf.Aggregate] + + def dataType: DataType = ref.dataType + def nullable: Boolean = ref.nullable + def children: Seq[Expression] = ref :: Nil +} + +class CudfCount(ref: GpuExpression) extends CudfAggregate(ref) { + val updateReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => cudf.Scalar.fromLong(col.getRowCount) :: Nil + val mergeReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => col.sum :: Nil + lazy val updateAggregates: List[cudf.Aggregate] = + cudf.Table.count(getOrdinal(ref)) :: Nil + lazy val mergeAggregates: List[cudf.Aggregate] = + cudf.Table.sum(getOrdinal(ref)) :: Nil +} + +class CudfSum(ref: GpuExpression) extends CudfAggregate(ref) { + val updateReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => col.sum :: Nil + val mergeReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => col.sum :: Nil + lazy val updateAggregates: List[cudf.Aggregate] = + cudf.Table.sum(getOrdinal(ref)) :: Nil + lazy val mergeAggregates: List[cudf.Aggregate] = + cudf.Table.sum(getOrdinal(ref)) :: Nil +} + +class CudfMax(ref: GpuExpression) extends CudfAggregate(ref) { + val updateReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => col.max :: Nil + val mergeReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => col.max :: Nil + lazy val updateAggregates: List[cudf.Aggregate] = + cudf.Table.max(getOrdinal(ref)) :: Nil + lazy val mergeAggregates: List[cudf.Aggregate] = + cudf.Table.max(getOrdinal(ref)) :: Nil +} + +class CudfMin(ref: GpuExpression) extends CudfAggregate(ref) { + val updateReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => col.min :: Nil + val mergeReductionAggregates: cudf.ColumnVector => List[cudf.Scalar] = + (col: cudf.ColumnVector) => col.min :: Nil + lazy val updateAggregates: List[cudf.Aggregate] = + cudf.Table.min(getOrdinal(ref)) :: Nil + lazy val mergeAggregates: List[cudf.Aggregate] = + cudf.Table.min(getOrdinal(ref)):: Nil +} + +trait GpuDeclarativeAggregate extends GpuAggregateFunction + +class GpuMin(child: GpuExpression) extends Min(child) + with GpuDeclarativeAggregate { + private lazy val cudfMin = new GpuAttributeReference("cudf_min", child.dataType)() + + override lazy val inputProjection: Seq[GpuExpression] = Seq(child) + override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMin(cudfMin)) + override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfMin(cudfMin)) + override lazy val finalExpression: GpuExpression = cudfMin + + override def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference] = cudfMin :: Nil +} + +class GpuMax(child: GpuExpression) extends Max(child) + with GpuDeclarativeAggregate { + private lazy val cudfMax = new GpuAttributeReference("cudf_max", child.dataType)() + + override lazy val inputProjection: Seq[GpuExpression] = Seq(child) + override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax)) + override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax)) 
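+  // A sketch of how these three expression sets compose for max, assuming two
+  // partial batches whose projected child column holds [1, 5] and [7, 2]:
+  //   update (once per batch):  cudf max        => 5 and 7
+  //   merge (across batches):   cudf max again  => max(5, 7) = 7
+  //   final:                    cudfMax passes the merged value through => 7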
+  override lazy val finalExpression: GpuExpression = cudfMax
+
+  override def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference] = cudfMax :: Nil
+}
+
+class GpuSum(child: GpuExpression) extends Sum(child)
+  with GpuDeclarativeAggregate {
+  // copied this from Sum.scala, restricted to the integral DataTypes we support
+  private lazy val resultType = child.dataType match {
+    //case DecimalType.Fixed(precision, scale) =>
+    //  DecimalType.bounded(precision + 10, scale)
+    case ByteType | ShortType | IntegerType | LongType => LongType
+    case _ => DoubleType
+  }
+
+  private lazy val cudfSum = new GpuAttributeReference("cudf_sum", resultType)()
+
+  override lazy val inputProjection: Seq[GpuExpression] = Seq(child)
+  override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum))
+  override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum))
+  override lazy val finalExpression: GpuExpression = cudfSum
+
+  override def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference] = cudfSum :: Nil
+}
+
+class GpuCount(children: Seq[GpuExpression]) extends Count(children)
+  with GpuDeclarativeAggregate {
+  // counts are Long
+  private lazy val cudfCount = new GpuAttributeReference("cudf_count", LongType)()
+  private lazy val cudfSum = new GpuAttributeReference("cudf_sum", LongType)()
+
+  override lazy val inputProjection: Seq[GpuExpression] = Seq(children.head)
+  override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfCount(cudfCount))
+  override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum))
+  override lazy val finalExpression: GpuExpression = cudfSum
+
+  override def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference] = {
+    if (merge) {
+      cudfSum :: Nil
+    } else {
+      cudfCount :: Nil
+    }
+  }
+}
+
+class GpuAverage(child: GpuExpression) extends Average(child)
+  with GpuDeclarativeAggregate {
+  // averages are either Decimal or Double. We don't support decimal yet, so making this double.
+  private lazy val cudfSum = new GpuAttributeReference("cudf_sum", DoubleType)()
+  private lazy val cudfCount = new GpuAttributeReference("cudf_count", DoubleType)()
+  // this is the merge-side sum of counts
+  private lazy val cudfSumCount = new GpuAttributeReference("cudf_sum_count", DoubleType)()
+
+  override lazy val inputProjection: Seq[GpuExpression] = Seq(child,
+    if (child.isInstanceOf[GpuLiteral]) {
+      child
+    } else {
+      // takes any column and turns it into 1 for non null, and 0 for null
+      // a sum of this == the count
+      new GpuCast(new GpuIsNotNull(child), LongType)
+    })
+  override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum), new CudfSum(cudfSumCount))
+  override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfSum(cudfSum), new CudfSum(cudfCount))
+  override lazy val finalExpression: GpuExpression = new GpuDivide(
+    new GpuCast(cudfSum, DoubleType),
+    new GpuCast(cudfSumCount, DoubleType))
+
+  override def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference] = {
+    if (merge) {
+      cudfSum :: cudfSumCount :: Nil
+    } else {
+      cudfSum :: cudfCount :: Nil
+    }
+  }
+}
+
+/*
+ * First/Last are "good enough" for the hash aggregate, and should only be used when we
+ * want to collapse to a grouped key. The hash aggregate doesn't make guarantees on the
+ * ordering of how batches are processed, so this is as good as picking any old function
+ * (we picked max)
+ *
+ * These functions have an extra field they expect to be around in the aggregation buffer.
+ * So this adds a "max" of that, and currently sends it to the GPU. 
The CPU version uses it + * to check if the value was set (if we don't ignore nulls, valueSet is true, that's what we do here). + */ +class GpuFirst(child: GpuExpression, isIgnoreNulls: GpuExpression) extends First(child, isIgnoreNulls) + with GpuDeclarativeAggregate { + private lazy val cudfMax = new GpuAttributeReference("cudf_max", child.dataType)() + private lazy val valueSet = new GpuAttributeReference("valueSet", child.dataType)() + + override lazy val inputProjection: Seq[GpuExpression] = Seq(child, new GpuNot(isIgnoreNulls)) + override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax), new CudfMax(valueSet)) + override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax), new CudfMax(valueSet)) + override lazy val finalExpression: GpuExpression = cudfMax + + override def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference] = cudfMax :: valueSet :: Nil +} + +class GpuLast(child: GpuExpression, isIgnoreNulls: GpuExpression) extends Last(child, isIgnoreNulls) + with GpuDeclarativeAggregate { + private lazy val cudfMax = new GpuAttributeReference("cudf_max", child.dataType)() + private lazy val valueSet = new GpuAttributeReference("valueSet", child.dataType)() + + override lazy val inputProjection: Seq[GpuExpression] = Seq(child, new GpuNot(isIgnoreNulls)) + override lazy val updateExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax), new CudfMax(valueSet)) + override lazy val mergeExpressions: Seq[GpuExpression] = Seq(new CudfMax(cudfMax), new CudfMax(valueSet)) + override lazy val finalExpression: GpuExpression = cudfMax + + override def cudfBufferAttributes(merge: Boolean): Seq[AttributeReference] = cudfMax :: valueSet :: Nil +} + +/** + * GpuHashAggregateExec - is the GPU version of HashAggregateExec, with some major differences: + * - it doesn't support spilling to disk + * - it doesn't support strings in the grouping key + * - it doesn't support count(col1, col2, ..., colN) + * - it doesn't support distinct + * @param requiredChildDistributionExpressions - this is unchanged by the GPU. 
It is used in EnsureRequirements + * to be able to add shuffle nodes + * @param groupingExpressions - The expressions that, when applied to the input batch, return the grouping key + * @param aggregateExpressions - The GpuAggregateExpression instances for this node + * @param aggregateAttributes - References to each GpuAggregateExpression (attribute references) + * @param initialInputBufferOffset - this is not used in the GPU version, but it's used to offset the slot in the + * aggregation buffer that aggregates should start referencing + * @param resultExpressions - the expected output expression of this hash aggregate (which this node should project) + * @param child - incoming plan (where we get input columns from) + */ +class GpuHashAggregateExec(requiredChildDistributionExpressions: Option[Seq[GpuExpression]], + groupingExpressions: Seq[GpuExpression], + aggregateExpressions: Seq[GpuAggregateExpression], + aggregateAttributes: Seq[GpuAttributeReference], + initialInputBufferOffset: Int, + resultExpressions: Seq[GpuExpression], + child: SparkPlan) extends HashAggregateExec( + requiredChildDistributionExpressions, + groupingExpressions.asInstanceOf[Seq[NamedExpression]], + aggregateExpressions.asInstanceOf[Seq[AggregateExpression]], + aggregateAttributes.asInstanceOf[Seq[AttributeReference]], + initialInputBufferOffset, + resultExpressions.asInstanceOf[Seq[NamedExpression]], + child) with GpuExec { + + // Disable code generation for now... + override def supportCodegen: Boolean = false + + // This handles GPU hash aggregation without spilling. + // + // The CPU version of this is (assume no fallback to sort-based aggregation) + // Re: TungstenAggregationIterator.scala, and AggregationIterator.scala + // + // 1) Obtaining an input row and finding a buffer (by hash of the grouping key) where + // to aggregate on. + // 2) Once it has a buffer, it calls processRow on it with the incoming row. + // 3) This will in turn update the buffer, for each row received, agg function by agg function + // 4) In the happy case, we never spill, and an iterator (from the HashMap) is produced s.t. + // downstream nodes can access the aggregated and projected results. + // 5) Spill case (not handled in gpu case) + // a) we create an external sorter [[UnsafeKVExternalSorter]] from the hash map (spilling) + // this leaves room for more aggregations to happen. The external sorter first sorts, and then + // stores the results to disk, and last it clears the in-memory hash map. + // b) we merge external sorters as we spill the map further. + // c) after the hash agg is done with its incoming rows, it will switch to sort-based aggregation, + // because it detected a spill + // d) sort based aggregation is then the mode forward, and not covered in this. + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + val numOutputRows = longMetric("numOutputRows") + // These metrics are supported by the cpu hash aggregate + // + // We should eventually have gpu versions of: + // + // This is the peak memory used max of what the hash map has used + // and what the external sorter has used + // val peakMemory = longMetric("peakMemory") + // + // Byte amount spilled. 
+    // val spillSize = longMetric("spillSize")
+    //
+    // These don't make a lot of sense for the gpu case:
+    //
+    // This is the time that has passed while setting up the iterators for tungsten
+    // val aggTime = longMetric("aggTime")
+    //
+    // Avg number of bucket list iterations per lookup in the underlying map
+    // val avgHashProbe = longMetric("avgHashProbe")
+    //
+    val rdd = child.executeColumnar()
+    rdd.mapPartitions { cbIter => {
+      var batch: ColumnarBatch = null // incoming batch
+      //
+      // aggregated[Input]Cb
+      // This is the equivalent of the aggregation buffer for the cpu case with the grouping key.
+      // Its columns are: [key1, key2, ..., keyN, cudfAgg1, cudfAgg2, ..., cudfAggN]
+      //
+      // For aggregate expressions that are multiple cudf aggregates (like average),
+      // aggregated[Input]Cb can have one or more cudf aggregate columns. This isn't different from
+      // the cpu version, other than in the cpu version everything is a catalyst expression.
+      var aggregatedInputCb: ColumnarBatch = null // aggregated raw incoming batch
+      var aggregatedCb: ColumnarBatch = null // aggregated concatenated batches
+
+      var finalCb: ColumnarBatch = null // batch after the final projection for each aggregator
+      var resultCb: ColumnarBatch = null // after the result projection given in resultExpressions
+      var success = false
+
+      var childCvs: Seq[GpuColumnVector] = null
+      var concatCvs: Seq[GpuColumnVector] = null
+      var resultCvs: Seq[GpuColumnVector] = null
+
+      //
+      // For aggregate exec there are four stages of operation:
+      //   1) extract columns from input
+      //   2) aggregation (update/merge)
+      //   3) finalize aggregations (avg = sum/count)
+      //   4) result projection (resolve any expressions in the output)
+      //
+      // In the CPU hash aggregate, Spark is using a buffer to aggregate the results.
+      // This buffer has room for each aggregate it is computing (based on type).
+      // Note that some AggregateExpressions take more than one slot in this buffer
+      // (avg is a sum and a count slot).
+      //
+      // On the GPU, we don't have an aggregation buffer in Spark code (this happens behind the scenes
+      // in cudf), but we still need to be able to pick columns out of the input CB (for aggregation)
+      // and the aggregated CB (for the result projection).
+      //
+      // Partial mode:
+      //  1. boundInputReferences: picks columns from the raw input
+      //  2. boundUpdateAgg: performs the partial half of the aggregates (GpuCount => CudfCount)
+      //  3. boundMergeAgg: (if needed) performs a merge of partial aggregates (CudfCount => CudfSum)
+      //  4. boundResultReferences: is a pass-through of the merged aggregate
+      //
+      // Final mode:
+      //  1. boundInputReferences: is a pass-through of the merged aggregate
+      //  2. boundMergeAgg: performs merges of incoming, and subsequent batches if required.
+      //  3. boundFinalProjections: on merged batches, finalize aggregates (GpuAverage => CudfSum/CudfCount)
+      //  4. boundResultReferences: project the result expressions Spark expects in the output.
+      val (boundInputReferences,
+        boundUpdateAgg,
+        boundMergeAgg,
+        boundFinalProjections,
+        boundResultReferences) =
+        setupReferences(
+          finalMode, child.output, groupingExpressions, aggregateExpressions)
+      try {
+        while (cbIter.hasNext) {
+          // 1) Consume the raw incoming batch, evaluating nested expressions (e.g. avg(col1 + col2)),
+          //    obtaining ColumnVectors that can be aggregated
+          batch = cbIter.next()
+
+          childCvs = processIncomingBatch(batch, boundInputReferences)
+
+          // done with the batch, close it as soon as possible
+          batch.close()
+          batch = null
+
+          // 2) When a partition gets multiple batches, we need to do two things:
+          //     a) if this is the first batch, run aggregation and store the aggregated result
+          //     b) if this is a subsequent batch, we need to merge the previously aggregated results
+          //        with the incoming batch
+          aggregatedInputCb = computeAggregate(childCvs, finalMode, groupingExpressions,
+            if (finalMode) boundMergeAgg else boundUpdateAgg)
+
+          childCvs.foreach(_.close)
+          childCvs = null
+
+          if (aggregatedCb == null) {
+            // this is the first batch, regardless of mode.
+            aggregatedCb = aggregatedInputCb
+            aggregatedInputCb = null
+          } else {
+            // this is a subsequent batch, and we must:
+            // 1) concatenate aggregatedInputCb with the prior result (aggregatedCb)
+            // 2) perform a merge aggregate on the concatenated columns
+            //
+            // In the future, we could plug in spilling here, where if the concatenated
+            // batch sizes would go over a threshold, we'd spill the aggregatedCb,
+            // and perform aggregation on the new batch (which would need to be merged, with the
+            // spilled aggregates)
+            concatCvs = concatenateBatches(aggregatedInputCb, aggregatedCb)
+            aggregatedCb.close()
+            aggregatedCb = null
+            aggregatedInputCb.close()
+            aggregatedInputCb = null
+
+            // 3) Compute aggregate. In subsequent iterations we'll use this result
+            //    to concatenate against incoming batches (step 2)
+            aggregatedCb = computeAggregate(concatCvs, true, groupingExpressions, boundMergeAgg)
+            concatCvs.foreach(_.close)
+            concatCvs = null
+          }
+        }
+
+        // 4) Finally, project the result to the layout that Spark expects
+        //    i.e.: select avg(foo) from bar group by baz will produce:
+        //    Partial mode: 3 columns => [baz, sum(foo) as sum_foo, count(foo) as count_foo]
+        //    Final mode:   2 columns => [baz, sum(sum_foo) / sum(count_foo)]
+        if (aggregatedCb != null) {
+          finalCb = if (boundFinalProjections.isDefined) { // only defined in Final mode
+            val finalCvs =
+              boundFinalProjections.get.map { ref =>
+                // aggregatedCb is made up of ColumnVectors
+                // and the final projections from the aggregates won't change that,
+                // so we can assume they will be vectors after we eval
+                ref.columnarEval(aggregatedCb).asInstanceOf[GpuColumnVector]
+              }
+            aggregatedCb.close()
+            aggregatedCb = null
+            new ColumnarBatch(finalCvs.toArray, finalCvs.head.asInstanceOf[GpuColumnVector].getBase.getRowCount.toInt)
+          } else {
+            aggregatedCb
+          }
+          aggregatedCb = null
+
+          // Perform the last project to get the correct shape that Spark expects. Note this will add things like
+          // literals, that were not part of the aggregate, into the batch.
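+          // As a concrete sketch: for "select baz, avg(foo), 1 from bar group by baz",
+          // finalCb holds [baz, avg_foo] after the final projection, and the result
+          // references evaluate to [baz, avg_foo, 1], where the literal comes back as a
+          // scalar and is turned into a column below via GpuColumnVector.from.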
+          resultCvs = boundResultReferences.map { ref =>
+            val result = ref.columnarEval(finalCb)
+            // Result references can be virtually anything, we need to coerce
+            // them to be vectors since this is going into a ColumnarBatch
+            result match {
+              case cv: ColumnVector => cv.asInstanceOf[GpuColumnVector]
+              case _ => GpuColumnVector.from(GpuScalar.from(result), finalCb.numRows)
+            }
+          }
+
+          finalCb.close()
+          finalCb = null
+
+          resultCb = if (resultCvs.isEmpty) {
+            new ColumnarBatch(Seq().toArray, 0)
+          } else {
+            numOutputRows += resultCvs.head.getBase.getRowCount
+            new ColumnarBatch(resultCvs.toArray, resultCvs.head.getBase.getRowCount.toInt)
+          }
+        }
+        success = true
+        new Iterator[ColumnarBatch] {
+          var fullyConsumed: Boolean = false
+          override def hasNext: Boolean = !fullyConsumed && resultCb != null
+
+          override def next(): ColumnarBatch = {
+            fullyConsumed = true
+            resultCb
+          }
+        }
+      } finally {
+        if (!success) {
+          if (resultCvs != null) {
+            resultCvs.foreach(_.close)
+          }
+        }
+        if (batch != null) {
+          batch.close()
+        }
+        if (childCvs != null) {
+          childCvs.foreach(_.close)
+        }
+        if (aggregatedInputCb != null) {
+          aggregatedInputCb.close()
+        }
+        if (aggregatedCb != null) {
+          aggregatedCb.close()
+        }
+        if (concatCvs != null) {
+          concatCvs.foreach(_.close)
+        }
+        if (finalCb != null) {
+          finalCb.close()
+        }
+      }
+    }}
+  }
+
+  private def processIncomingBatch(batch: ColumnarBatch,
+      boundInputReferences: Seq[GpuExpression]): Seq[GpuColumnVector] = {
+    boundInputReferences.map { ref => {
+      // must be a var, so the finally block below sees the vector we need to clean up
+      var childCv: GpuColumnVector = null
+      var success = false
+      try {
+        val in = ref.columnarEval(batch)
+        childCv = in match {
+          case cv: ColumnVector => cv.asInstanceOf[GpuColumnVector]
+          case _ => GpuColumnVector.from(GpuScalar.from(in), batch.numRows)
+        }
+        val childCvCasted = if (childCv.dataType != ref.dataType) {
+          // cast to the type the bound reference expects
+          val newCv = GpuColumnVector.from(
+            childCv.asInstanceOf[GpuColumnVector].getBase.castTo(
+              GpuColumnVector.getRapidsType(ref.dataType),
+              GpuColumnVector.getTimeUnits(ref.dataType)))
+          // out with the old, in with the new
+          childCv.close()
+          newCv
+        } else {
+          childCv
+        }
+        success = true
+        childCvCasted
+      } finally {
+        if (!success && childCv != null) {
+          childCv.close()
+        }
+      }
+    }}
+  }
+
+  /**
+   * concatenateBatches - given two ColumnarBatch instances, return a sequence of GpuColumnVector that is
+   * the concatenated columns of the two input batches. 
+ * @param aggregatedInputCb - this is an incoming batch + * @param aggregatedCb - this is a batch that was kept for concatenation + * @return Seq[GpuColumnVector] with concatenated vectors + */ + private def concatenateBatches(aggregatedInputCb: ColumnarBatch, aggregatedCb: ColumnarBatch): Seq[GpuColumnVector] = { + // get tuples of columns to concatenate + val zipped = (0 until aggregatedCb.numCols()).map { i => + (aggregatedInputCb.column(i), aggregatedCb.column(i)) + } + + val concatCvs = zipped.map { + case (col1, col2) => { + GpuColumnVector.from( + cudf.ColumnVector.concatenate( + col1.asInstanceOf[GpuColumnVector].getBase, + col2.asInstanceOf[GpuColumnVector].getBase)) + }} + + concatCvs + } + + // this will need to change when we support distinct aggregates + // because in this case, a hash aggregate exec could be both a "final" (merge) + // and partial + private lazy val finalMode = { + val modes = aggregateExpressions.map(_.mode).distinct + modes.contains(Final) || modes.contains(Complete) + } + + /** + * getCudfAggregates returns a sequence of [[cudf.Aggregate]], given the current mode + * [[AggregateMode]], and a sequence of all expressions for this [[GpuHashAggregateExec]] + * node, we get all the expressions as that's important for us to be able to resolve the current + * ordinal for this cudf aggregate. + * + * Examples: + * fn = sum, min, max will always be Seq(fn) + * avg will be Seq(sum, count) for Partial mode, but Seq(sum, sum) for other modes + * count will be Seq(count) for Partial mode, but Seq(sum) for other modes + * + * @return - Seq of [[cudf.Aggregate]], with one or more aggregates that correspond to each expression + * in allExpressions + */ + def setupReferences(finalMode: Boolean, + childAttr: AttributeSeq, + groupingExpressions: Seq[GpuExpression], + aggregateExpressions: Seq[GpuAggregateExpression]): + (Seq[GpuExpression], Seq[CudfAggregate], Seq[CudfAggregate], Option[Seq[GpuExpression]], Seq[GpuExpression]) = { + + val groupingAttributes = groupingExpressions.map(_.asInstanceOf[NamedExpression].toAttribute) + // + // update expressions are those performed on the raw input data + // e.g. for count it's count, and for average it's sum and count. + // + val updateExpressions = + aggregateExpressions.flatMap(_.aggregateFunction.updateExpressions) + + val updateAggBufferAttributes = groupingAttributes ++ + aggregateExpressions.flatMap(_.aggregateFunction.cudfBufferAttributes(false)) + + val boundUpdateAgg = GpuBindReferences.bindReferences(updateExpressions, updateAggBufferAttributes) + + // + // merge expressions are used while merging multiple batches, or while on final mode + // e.g. for count it's sum, and for average it's sum and sum. 
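+    // As a concrete sketch for a single avg(x) grouped by k, the merge side binds
+    //   [CudfSum(cudf_sum), CudfSum(cudf_sum_count)]
+    // against the buffer attributes [k, cudf_sum, cudf_sum_count], so each CudfSum
+    // resolves to the ordinal of its column in the aggregated batch.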
+    //
+    val mergeExpressions =
+      aggregateExpressions.flatMap(_.aggregateFunction.mergeExpressions)
+
+    val mergeAggBufferAttributes = groupingAttributes ++
+      aggregateExpressions.flatMap(_.aggregateFunction.cudfBufferAttributes(true))
+
+    val boundMergeAgg = GpuBindReferences.bindReferences(mergeExpressions, mergeAggBufferAttributes)
+
+    //
+    // expressions to pick input to the aggregate, and finalize the output to the result projection
+    //
+    val inputProjections =
+      (groupingExpressions ++ aggregateExpressions.flatMap(_.aggregateFunction.inputProjection))
+
+    val finalProjections =
+      (groupingExpressions ++ aggregateExpressions.map(_.aggregateFunction.finalExpression))
+
+    // boundInputReferences is used to pick out of the input batch the appropriate columns for aggregation
+    // - Partial mode: we use the aggregateExpressions to pick out the correct columns.
+    // - Final mode: we pick the columns in the order they were handed to us.
+    val boundInputReferences = if (finalMode) {
+      GpuBindReferences.bindReferences(
+        child.output.asInstanceOf[Seq[GpuAttributeReference]],
+        child.output)
+    } else {
+      GpuBindReferences.bindReferences(inputProjections, childAttr)
+    }
+
+    val boundFinalProjections = if (finalMode) {
+      Some(GpuBindReferences.bindReferences(finalProjections, mergeAggBufferAttributes))
+    } else {
+      None
+    }
+
+    // boundResultReferences is used to project the aggregated input batch(es) for the result.
+    // - Partial mode: it's a pass through. We take whatever was aggregated and let it come
+    //   out of the node as is.
+    // - Final mode: we use resultExpressions to pick out the correct columns that finalReferences
+    //   has pre-processed for us
+    var boundResultReferences: Seq[GpuExpression] = null
+
+    // allAttributes can be different things, depending on aggregation mode:
+    // - Partial mode: grouping key + cudf aggregates (e.g. no avg, instead sum::count)
+    // - Final mode: grouping key + spark aggregates (e.g. avg)
+    val allAttributes = groupingAttributes ++ aggregateAttributes
+
+    if (finalMode) {
+      boundResultReferences =
+        GpuBindReferences.bindReferences(
+          resultExpressions,
+          allAttributes.asInstanceOf[Seq[GpuAttributeReference]])
+    } else {
+      boundResultReferences =
+        GpuBindReferences.bindReferences(
+          resultExpressions,
+          resultExpressions.map(_.asInstanceOf[NamedExpression].toAttribute))
+    }
+
+    (boundInputReferences,
+      boundUpdateAgg.asInstanceOf[Seq[CudfAggregate]],
+      boundMergeAgg.asInstanceOf[Seq[CudfAggregate]],
+      boundFinalProjections,
+      boundResultReferences)
+  }
+
+  def computeAggregate(toAggregateCvs: Seq[GpuColumnVector],
+      merge: Boolean,
+      groupingExpressions: Seq[GpuExpression],
+      aggregates: Seq[CudfAggregate]): ColumnarBatch = {
+    if (groupingExpressions.nonEmpty) {
+      // Perform group by aggregation
+      var tbl: cudf.Table = null
+      var result: cudf.Table = null
+      try {
+        // Create a cudf Table, which we use as the base of aggregations.
+        // At this point we are getting the cudf aggregate's merge or update version
+        //
+        // For example: GpuAverage has an update version of: (CudfSum, CudfCount)
+        // and CudfCount has an update version of AggregateOp.COUNT and a
+        // merge version of AggregateOp.SUM.
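+        // E.g. (a sketch) for partial-mode avg(x) group by k, the flatMap below yields
+        //   [cudf.Table.sum(1), cudf.Table.sum(2)]
+        // one cudf aggregate per bound CudfSum, each pointing at the ordinal of its
+        // input column ([k, cudf_sum, cudf_count] => ordinals 1 and 2).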
+ var cudfAggregates = aggregates.flatMap { agg => { + if (merge) { + agg.mergeAggregates + } else { + agg.updateAggregates + } + }} + + tbl = new cudf.Table(toAggregateCvs.map(_.getBase): _*) + + if (cudfAggregates.isEmpty) { + // we can't have empty aggregates, so pick a dummy max + // could be caused by something like: select 1 from table group by awesome_key + cudfAggregates = Seq(cudf.Table.max(0)) + } + + result = tbl.groupBy(groupingExpressions.indices: _*).aggregate(cudfAggregates: _*) + + // Turn aggregation into a ColumnarBatch for the result evaluation + // Note that the resulting ColumnarBatch has the following shape: + // + // [key1, key2, ..., keyN, cudfAgg1, cudfAgg2, ..., cudfAggN] + // + // where cudfAgg_i can be multiple columns foreach Spark aggregate + // (i.e. partial_gpuavg => cudf sum and cudf count) + GpuColumnVector.from(result) + } finally { + if (tbl != null) { + tbl.close() + } + if (result != null) { + result.close() + } + } + } else { + // Reduction aggregate + // we ask the appropriate merge or update CudfAggregates, what their + // reduction merge or update aggregates functions are + val cvs = aggregates.flatMap { agg => { + val aggFn = if (merge) { + agg.mergeReductionAggregates + } else { + agg.updateReductionAggregates + } + + val results = aggFn(toAggregateCvs(agg.getOrdinal(agg.ref)).getBase) + results.map(res => GpuColumnVector.from(cudf.ColumnVector.fromScalar(res, 1))) + }} + new ColumnarBatch(cvs.toArray, cvs.head.getBase.getRowCount.toInt) + } + } + + // HashAggregateExec isn't hard coding HashAggregate in its .toString functions + // this is an ugly way to get something out that says "Gpu" + override def verboseString(maxFields: Int): String = { + super.verboseString(maxFields).replace("HashAggregate", "GpuHashAggregate") + } + + override def simpleString(maxFields: Int): String = { + super.simpleString(maxFields).replace("HashAggregate", "GpuHashAggregate") + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/constraintExpressions.scala b/sql-plugin/src/main/scala/ai/rapids/spark/constraintExpressions.scala new file mode 100644 index 00000000000..d41661342e0 --- /dev/null +++ b/sql-plugin/src/main/scala/ai/rapids/spark/constraintExpressions.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2019, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.rapids.spark + +import org.apache.spark.sql.catalyst.expressions.KnownFloatingPointNormalized +import org.apache.spark.sql.vectorized.ColumnarBatch + +// this is a TaggingExpression in spark, which gets matched in NormalizeFloatingNumbers (which is a Rule) +// TODO: need coalesce as a feature request in cudf +class GpuKnownFloatingPointNormalized(child: GpuExpression) extends KnownFloatingPointNormalized(child) + with GpuExpression { + override def columnarEval(batch: ColumnarBatch): Any = child.columnarEval(batch) +} \ No newline at end of file diff --git a/sql-plugin/src/test/resources/dates.csv b/sql-plugin/src/test/resources/dates.csv new file mode 100644 index 00000000000..ed7dad39f4c --- /dev/null +++ b/sql-plugin/src/test/resources/dates.csv @@ -0,0 +1,6 @@ +2019-01-03,1 +2019-01-03,1 +2019-01-03,1 +2019-01-05,2 +2019-01-05,3 +2019-01-06,6 diff --git a/sql-plugin/src/test/resources/floats.csv b/sql-plugin/src/test/resources/floats.csv index 5ae6622d2c6..9a259a76fbf 100644 --- a/sql-plugin/src/test/resources/floats.csv +++ b/sql-plugin/src/test/resources/floats.csv @@ -1,8 +1,14 @@ -floats,more_floats 100.0,1.0 200.0,2.0 300.0,3.0 400.0,4.0 500.0,5.0 -100.0,6.0 --500.0,0.0 \ No newline at end of file +-500.0,0.0 +-600.0,0.0 +-700.0,0.0 +-800.0,0.0 +-1000.0,0.0 +-1500.0,0.0 +-2000.0,0.0 +-3000.0,0.0 diff --git a/sql-plugin/src/test/resources/ints.csv b/sql-plugin/src/test/resources/ints.csv new file mode 100644 index 00000000000..2b67fccd54a --- /dev/null +++ b/sql-plugin/src/test/resources/ints.csv @@ -0,0 +1,7 @@ +-500,0,5,6 +-600,0,5,6 +-700,0,5,6 +-800,0,5,6 +-1000,0,5,6 +2147483647,7,5,6 +-2147483648,7,5,6 diff --git a/sql-plugin/src/test/resources/nullable_floats.csv b/sql-plugin/src/test/resources/nullable_floats.csv index b54dcea6e47..634e0ddf699 100644 --- a/sql-plugin/src/test/resources/nullable_floats.csv +++ b/sql-plugin/src/test/resources/nullable_floats.csv @@ -1,8 +1,8 @@ -floats,more_floats 100.0,1.0, 200.0,, 300.0,3.0, +1.0,4.0, ,4.0, 500.0,, ,6.0, --500.0,50.5 \ No newline at end of file +-500.0,50.5 diff --git a/sql-plugin/src/test/resources/shorts.csv b/sql-plugin/src/test/resources/shorts.csv new file mode 100644 index 00000000000..c1d3a3d705d --- /dev/null +++ b/sql-plugin/src/test/resources/shorts.csv @@ -0,0 +1,8 @@ +-32768,0,5,6 +-1,0,5,6 +32767,1,5,6 +1,1,5,6 +32767,2,5,6 +32767,2,5,6 +16383,3,5,6 +32767,3,5,6 diff --git a/sql-plugin/src/test/scala/ai/rapids/spark/SparkQueryCompareTestSuite.scala b/sql-plugin/src/test/scala/ai/rapids/spark/SparkQueryCompareTestSuite.scala index b76e23e845d..4dd2c48f1b9 100644 --- a/sql-plugin/src/test/scala/ai/rapids/spark/SparkQueryCompareTestSuite.scala +++ b/sql-plugin/src/test/scala/ai/rapids/spark/SparkQueryCompareTestSuite.scala @@ -149,30 +149,25 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { testName: String, df: SparkSession => DataFrame, maxFloatDiff: Double = 0.0, - conf: SparkConf = new SparkConf()) + conf: SparkConf = new SparkConf(), + sort: Boolean = false, + repart: Integer = 1) (fun: DataFrame => DataFrame): Unit = { - val testConf = conf.clone().set(RapidsConf.INCOMPATIBLE_OPS.key, "true") - test("INCOMPAT: " + testName) { - val (fromCpu, fromGpu) = runOnCpuAndGpu(df, fun, conf=testConf) - - if (!compare(fromCpu, fromGpu, maxFloatDiff)) { - fail( - s""" - |Running on the GPU and on the CPU did not match (relaxed float comparison) - |CPU: ${fromCpu.toSeq} - |GPU: ${fromGpu.toSeq} - """.stripMargin) - } - } + testSparkResultsAreEqual(testName, df, + conf=conf, 
+ repart=repart, + sort=sort, + maxFloatDiff=maxFloatDiff, + incompat=true)(fun) } def ALLOW_NON_GPU_testSparkResultsAreEqual( testName: String, df: SparkSession => DataFrame, conf: SparkConf = new SparkConf())(fun: DataFrame => DataFrame): Unit = { - val testConf = conf.clone().set(RapidsConf.TEST_CONF.key, "false") - testSparkResultsAreEqual("NOT ALL ON GPU: " + testName, df, - conf=testConf)(fun) + testSparkResultsAreEqual(testName, df, + conf=conf, + allowNonGpu=true)(fun) } def IGNORE_ORDER_testSparkResultsAreEqual( @@ -180,9 +175,21 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { df: SparkSession => DataFrame, repart: Integer = 1, conf: SparkConf = new SparkConf())(fun: DataFrame => DataFrame): Unit = { - testSparkResultsAreEqual("IGNORE ORDER: " + testName, df, + testSparkResultsAreEqual(testName, df, + conf=conf, + repart=repart, + sort=true)(fun) + } + + def INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual( + testName: String, + df: SparkSession => DataFrame, + repart: Integer = 1, + conf: SparkConf = new SparkConf())(fun: DataFrame => DataFrame): Unit = { + testSparkResultsAreEqual(testName, df, conf=conf, repart=repart, + incompat=true, sort=true)(fun) } @@ -205,12 +212,18 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { return false } - if ((v1, v2) match { + return if ((v1, v2) match { case (i1: Int, i2:Int) => i1 < i2 case (i1: Long, i2:Long) => i1 < i2 + case (i1: Float, i2:Float) => i1 < i2 + case (i1: Date, i2:Date) => i1.before(i2) + case (i1: Double, i2:Double) => i1 < i2 + case (i1: Short, i2:Short) => i1 < i2 case (o1, o2) => throw new UnsupportedOperationException(o1.getClass + " is not supported yet") }) { - return true + true + } else { + false } } } @@ -222,19 +235,43 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { df: SparkSession => DataFrame, conf: SparkConf = new SparkConf(), repart: Integer = 1, - sort: Boolean = false) + sort: Boolean = false, + maxFloatDiff: Double = 0.0, + incompat: Boolean = false, + allowNonGpu: Boolean = false) (fun: DataFrame => DataFrame): Unit = { - test(testName) { + var qualifiers = Set[String]() + var testConf = conf + if (incompat) { + testConf = testConf.clone().set(RapidsConf.INCOMPATIBLE_OPS.key, "true") + qualifiers = qualifiers + "INCOMPAT" + } + if (sort) { + qualifiers = qualifiers + "IGNORE ORDER" + } + if (allowNonGpu) { + testConf = testConf.clone().set(RapidsConf.TEST_CONF.key, "false") + qualifiers = qualifiers + "NOT ALL ON GPU" + } + val qualifiedTestName = qualifiers.mkString("", + ", ", + (if (qualifiers.nonEmpty) ": " else "") + testName) + test(qualifiedTestName) { var (fromCpu, fromGpu) = runOnCpuAndGpu(df, fun, - conf=conf, + conf = testConf, repart = repart) + val relaxedFloatDisclaimer = if (maxFloatDiff > 0) { + "(relaxed float comparison)" + } else { + "" + } if (sort) { val cpu = fromCpu.map(_.toSeq).sortWith(seqLt) val gpu = fromGpu.map(_.toSeq).sortWith(seqLt) - if (!compare(cpu, gpu)) { + if (!compare(cpu, gpu, maxFloatDiff)) { fail( s""" - |Running on the GPU and on the CPU did not match + |Running on the GPU and on the CPU did not match $relaxedFloatDisclaimer |CPU: ${cpu.seq} |GPU: ${gpu.seq} @@ -242,10 +279,10 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { stripMargin) } } else { - if (!compare(fromCpu, fromGpu)) { + if (!compare(fromCpu, fromGpu, maxFloatDiff)) { fail( s""" - |Running on the GPU and on the CPU did not match + |Running on the GPU and on the CPU did not match 
$relaxedFloatDisclaimer |CPU: ${fromCpu.toSeq} |GPU: ${fromGpu.toSeq} @@ -279,14 +316,6 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { ).toDF("dates", "more_dates") } - def longsFromCSVDf(session: SparkSession): DataFrame = { - var path = this.getClass.getClassLoader.getResource("lots_o_longs.csv") - session.read.schema(StructType(Array( - StructField("longs", LongType, true), - StructField("more_longs", LongType, true) - ))).csv(path.toString) - } - def longsDf(session: SparkSession): DataFrame = { import session.sqlContext.implicits._ Seq( @@ -404,36 +433,6 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { ).toDF("doubles", "more_doubles") } - def intsFromCsv(session: SparkSession): DataFrame = { - val schema = StructType(Array( - StructField("ints_1", IntegerType), - StructField("ints_2", IntegerType), - StructField("ints_3", IntegerType), - StructField("ints_4", IntegerType), - StructField("ints_5", IntegerType) - )) - val path = this.getClass.getClassLoader.getResource("test.csv") - session.read.schema(schema).csv(path.toString) - } - - def intsFromPartitionedCsv(session: SparkSession): DataFrame = { - val schema = StructType(Array( - StructField("partKey", IntegerType), - StructField("ints_1", IntegerType), - StructField("ints_2", IntegerType), - StructField("ints_3", IntegerType), - StructField("ints_4", IntegerType), - StructField("ints_5", IntegerType) - )) - val path = this.getClass.getClassLoader.getResource("partitioned-csv") - session.read.schema(schema).csv(path.toString) - } - - def intsFromCsvInferredSchema(session: SparkSession): DataFrame = { - val path = this.getClass.getClassLoader.getResource("test.csv") - session.read.option("inferSchema", "true").csv(path.toString) - } - def nullableFloatDf(session: SparkSession): DataFrame = { import session.sqlContext.implicits._ Seq[(java.lang.Float, java.lang.Float)]( @@ -462,24 +461,65 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { // Note: some tests here currently use this to force Spark not to // push down expressions into the scan (e.g. 
GpuFilters need this)
-  def fromCsvDf(csvPath: String, schema: StructType)
+  def fromCsvDf(file: String, schema: StructType)
       (session: SparkSession): DataFrame = {
+    // resolve the file against the test classpath, not an absolute path
+    val resource = this.getClass.getClassLoader.getResource(file).toString
     var df = session.read.format("csv")
-      .option("header", "true")
-    df.schema(schema).load(csvPath).toDF()
+    df.schema(schema).load(resource).toDF()
+  }
+
+  def shortsFromCsv = {
+    fromCsvDf("shorts.csv", StructType(Array(
+      StructField("shorts", ShortType),
+      StructField("more_shorts", ShortType),
+      StructField("five", ShortType),
+      StructField("six", ShortType)
+    )))(_)
+  }
+
+  def intsFromCsv = {
+    fromCsvDf("test.csv", StructType(Array(
+      StructField("ints_1", IntegerType),
+      StructField("ints_2", IntegerType),
+      StructField("ints_3", IntegerType),
+      StructField("ints_4", IntegerType),
+      StructField("ints_5", IntegerType)
+    )))(_)
+  }
+
+  def intsFromPartitionedCsv = {
+    fromCsvDf("partitioned-csv", StructType(Array(
+      StructField("partKey", IntegerType),
+      StructField("ints_1", IntegerType),
+      StructField("ints_2", IntegerType),
+      StructField("ints_3", IntegerType),
+      StructField("ints_4", IntegerType),
+      StructField("ints_5", IntegerType)
+    )))(_)
+  }
+
+  def longsFromCSVDf = {
+    fromCsvDf("lots_o_longs.csv", StructType(Array(
+      StructField("longs", LongType, true),
+      StructField("more_longs", LongType, true)
+    )))(_)
+  }
+
+  def intsFromCsvInferredSchema(session: SparkSession): DataFrame = {
+    val path = this.getClass.getClassLoader.getResource("test.csv")
+    session.read.option("inferSchema", "true").csv(path.toString)
   }
 
   def nullableFloatCsvDf = {
-    var path = this.getClass.getClassLoader.getResource("nullable_floats.csv")
-    fromCsvDf(path.toString, StructType(Array(
+    fromCsvDf("nullable_floats.csv", StructType(Array(
       StructField("floats", FloatType, true),
       StructField("more_floats", FloatType, true)
     )))(_)
   }
 
   def floatCsvDf = {
-    var path = this.getClass.getClassLoader.getResource("floats.csv")
-    fromCsvDf(path.toString, StructType(Array(
+    fromCsvDf("floats.csv", StructType(Array(
       StructField("floats", FloatType, false),
       StructField("more_floats", FloatType, false)
     )))(_)
   }
@@ -490,6 +530,38 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach {
     Seq(frame.count()).toDF
   }
 
+  def intCsvDf = {
+    fromCsvDf("ints.csv", StructType(Array(
+      StructField("ints", IntegerType, false),
+      StructField("more_ints", IntegerType, false),
+      StructField("five", IntegerType, false),
+      StructField("six", IntegerType, false)
+    )))(_)
+  }
+
+  def longsCsvDf = {
+    fromCsvDf("ints.csv", StructType(Array(
+      StructField("longs", LongType, false),
+      StructField("more_longs", LongType, false),
+      StructField("five", IntegerType, false),
+      StructField("six", IntegerType, false)
+    )))(_)
+  }
+
+  def doubleCsvDf = {
+    fromCsvDf("floats.csv", StructType(Array(
+      StructField("doubles", DoubleType, false),
+      StructField("more_doubles", DoubleType, false)
+    )))(_)
+  }
+
+  def datesCsvDf = {
+    fromCsvDf("dates.csv", StructType(Array(
+      StructField("dates", DateType, false),
+      StructField("ints", IntegerType, false)
+    )))(_)
+  }
+
   testSparkResultsAreEqual("Test CSV", intsFromCsv) {
     frame => frame.select(col("ints_1"), col("ints_3"), col("ints_5"))
   }
@@ -877,7 +949,7 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach {
   testSparkResultsAreEqual("Greater than or equal longs", longsDf) {
     frame => frame.selectExpr("longs >= more_longs")
   }
-  
+
   //
   // 
(in)equality with doubles // @@ -1004,4 +1076,228 @@ class SparkQueryCompareTestSuite extends FunSuite with BeforeAndAfterEach { testSparkResultsAreEqual("Test unionByName doubles", doubleDf) { frame => frame.unionByName(frame.select(col("more_doubles"), col("doubles"))) } + + /* + * HASH AGGREGATE TESTS + */ + IGNORE_ORDER_testSparkResultsAreEqual("short reduction aggs", shortsFromCsv) { + frame => frame.agg( + (max("shorts") - min("more_shorts")) * lit(5), + sum("shorts"), + count("*"), + avg("shorts"), + avg(col("more_shorts") * lit("10"))) + } + + IGNORE_ORDER_testSparkResultsAreEqual("reduction aggs", longsCsvDf) { + frame => frame.agg( + (max("longs") - min("more_longs")) * lit(5), + sum("longs"), + count("*"), + avg("longs"), + avg(col("more_longs") * lit("10"))) + } + + IGNORE_ORDER_testSparkResultsAreEqual("test count, sum, max, min with shuffle", longsFromCSVDf, repart = 2) { + frame => frame.groupBy(col("more_longs")).agg( + count("*"), + sum("more_longs"), + sum("longs") * lit(2), + (max("more_longs") - min("more_longs")) * 3.0) + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("float basic aggregates group by floats", floatCsvDf) { + frame => frame.groupBy("floats").agg( + lit(456f), + min(col("floats")) + lit(123), + sum(col("more_floats") + lit(123.0)), + max(col("floats") * col("more_floats")), + max("floats") - min("more_floats"), + max("more_floats") - min("floats"), + sum("floats") + sum("more_floats"), + avg("floats"), + count("*")) + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("float basic aggregates group by more_floats", floatCsvDf) { + frame => frame.groupBy("more_floats").agg( + lit(456f), + min(col("floats")) + lit(123), + sum(col("more_floats") + lit(123.0)), + max(col("floats") * col("more_floats")), + max("floats") - min("more_floats"), + max("more_floats") - min("floats"), + sum("floats") + sum("more_floats"), + avg("floats"), + count("*")) + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("nullable float basic aggregates group by more_floats", nullableFloatCsvDf) { + frame => frame.groupBy("more_floats").agg( + lit(456f), + min(col("floats")) + lit(123), + sum(col("more_floats") + lit(123.0)), + max(col("floats") * col("more_floats")), + max("floats") - min("more_floats"), + max("more_floats") - min("floats"), + sum("floats") + sum("more_floats"), + avg("floats"), + count("*")) + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("shorts basic aggregates group by more_shorts", shortsFromCsv) { + frame => frame.groupBy("more_shorts").agg( + lit(456), + min(col("shorts")) + lit(123), + sum(col("more_shorts") + lit(123.0)), + max(col("shorts") * col("more_shorts")), + max("shorts") - min("more_shorts"), + max("more_shorts") - min("shorts"), + sum("shorts"), + avg("shorts"), + count("*")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("long basic aggregates group by longs", longsCsvDf) { + frame => frame.groupBy("longs").agg( + lit(456f), + min(col("longs")) + lit(123), + sum(col("more_longs") + lit(123.0)), + max(col("longs") * col("more_longs")), + max("longs") - min("more_longs"), + max("more_longs") - min("longs"), + sum("longs") + sum("more_longs"), + avg("longs"), + count("*")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("long basic aggregates group by more_longs", longsCsvDf) { + frame => frame.groupBy("more_longs").agg( + lit(456f), + min(col("longs")) + lit(123), + sum(col("more_longs") + lit(123.0)), + max(col("longs") * col("more_longs")), + max("longs") - min("more_longs"), + max("more_longs") - min("longs"), + sum("longs") + 
sum("more_longs"), + avg("longs"), + count("*")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("ints basic aggregates group by ints", intCsvDf) { + frame => frame.groupBy("ints").agg( + lit(456f), + min(col("ints")) + lit(123), + sum(col("more_ints") + lit(123.0)), + max(col("ints") * col("more_ints")), + max("ints") - min("more_ints"), + max("more_ints") - min("ints"), + sum("ints") + sum("more_ints"), + avg("ints"), + count("*")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("ints basic aggregates group by more_ints", intCsvDf) { + frame => frame.groupBy("more_ints").agg( + lit(456f), + min(col("ints")) + lit(123), + sum(col("more_ints") + lit(123.0)), + max(col("ints") * col("more_ints")), + max("ints") - min("more_ints"), + max("more_ints") - min("ints"), + sum("ints") + sum("more_ints"), + avg("ints"), + count("*")) + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("doubles basic aggregates group by doubles", doubleCsvDf) { + frame => frame.groupBy("doubles").agg( + lit(456f), + min(col("doubles")) + lit(123), + sum(col("more_doubles") + lit(123.0)), + max(col("doubles") * col("more_doubles")), + max("doubles") - min("more_doubles"), + max("more_doubles") - min("doubles"), + sum("doubles") + sum("more_doubles"), + avg("doubles"), + count("*")) + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("doubles basic aggregates group by more_doubles", doubleCsvDf) { + frame => frame.groupBy("more_doubles").agg( + lit(456f), + min(col("doubles")) + lit(123), + sum(col("more_doubles") + lit(123.0)), + max(col("doubles") * col("more_doubles")), + max("doubles") - min("more_doubles"), + max("more_doubles") - min("doubles"), + sum("doubles") + sum("more_doubles"), + avg("doubles"), + count("*")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("sum(longs) multi group by longs, more_longs", longsCsvDf) { + frame => frame.groupBy("longs", "more_longs").agg( + sum("longs"), count("*")) + } + + // misc aggregation tests + testSparkResultsAreEqual("sum(ints) group by literal", intCsvDf) { + frame => frame.groupBy(lit(1)).agg(sum("ints")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("sum(ints) group by dates", datesCsvDf) { + frame => frame.groupBy("dates").sum("ints") + } + + IGNORE_ORDER_testSparkResultsAreEqual("max(ints) group by month", datesCsvDf) { + frame => frame.withColumn("monthval", month(col("dates"))) + .groupBy(col("monthval")) + .agg(max("ints").as("max_ints_by_month")) + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("sum(floats) group by more_floats 2 partitions", floatCsvDf, repart = 2) { + frame => frame.groupBy("more_floats").sum("floats") + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("avg(floats) group by more_floats 4 partitions", floatCsvDf, repart = 4) { + frame => frame.groupBy("more_floats").avg("floats") + } + + INCOMPAT_IGNORE_ORDER_testSparkResultsAreEqual("avg(floats),count(floats) group by more_floats 4 partitions", floatCsvDf, repart = 4) { + frame => frame + .groupBy("more_floats") + .agg(avg("floats"), count("*")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("complex aggregate expressions", intCsvDf) { + frame => frame.groupBy(col("more_ints") * 2).agg( + lit(1000) + + (lit(100) * (avg("ints") * sum("ints") - min("ints")))) + } + + IGNORE_ORDER_testSparkResultsAreEqual("complex aggregate expressions 2", intCsvDf) { + frame => frame.groupBy("more_ints").agg( + min("ints") + + (lit(100) * (avg("ints") * sum("ints") - min("ints")))) + } + + IGNORE_ORDER_testSparkResultsAreEqual("complex aggregate expression 3", intCsvDf) { + frame => 
frame.groupBy("more_ints").agg( + min("ints"), avg("ints"), + max(col("ints") + col("more_ints")), lit(1), min("ints")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("grouping expressions", longsCsvDf) { + frame => frame.groupBy(col("more_longs") + lit(10)).agg(min("longs")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("grouping expressions 2", longsCsvDf) { + frame => frame.groupBy(col("more_longs") + col("longs")).agg(min("longs")) + } + + IGNORE_ORDER_testSparkResultsAreEqual("first/last aggregates", intCsvDf) { + frame => frame.groupBy(col("more_ints")).agg( + first("five", true), last("six", true)) + } }
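
A note on the derived test names above: testSparkResultsAreEqual folds the
qualifier set (INCOMPAT, IGNORE ORDER, NOT ALL ON GPU) into the test name with
a single mkString call, passing the base name through the `end` argument. A
minimal standalone sketch of that derivation, with hypothetical qualifier
values (this is illustrative only, not part of the patch):

object QualifiedNameSketch extends App {
  // hypothetical inputs standing in for incompat = true, sort = true
  val qualifiers = Set("INCOMPAT", "IGNORE ORDER")
  val testName = "float basic aggregates group by floats"

  // mkString(start, sep, end): the separator only appears between set
  // elements, and the base test name rides along in `end`, prefixed with
  // ": " only when at least one qualifier is present. An empty qualifier
  // set therefore yields the bare test name.
  val qualifiedTestName = qualifiers.mkString("", ", ",
    (if (qualifiers.nonEmpty) ": " else "") + testName)

  println(qualifiedTestName)
  // INCOMPAT, IGNORE ORDER: float basic aggregates group by floats
}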
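
Similarly, the CSV helpers (shortsFromCsv, intCsvDf, and friends) work by
partially applying fromCsvDf's first parameter list and eta-expanding the
second with (_), which yields a SparkSession => DataFrame value in the shape
every comparison helper expects. A reduced sketch of the same currying
pattern with plain stand-in types (all names here are illustrative, not from
the patch):

object CurriedFactorySketch extends App {
  // String stands in for both SparkSession and DataFrame
  def fromCsv(file: String, schema: List[String])(session: String): String =
    s"$session reads $file with columns ${schema.mkString(",")}"

  // partially apply the first parameter list and eta-expand the second
  // with (_); the result is a plain function value, String => String here
  val intsDf: String => String = fromCsv("test.csv", List("ints_1", "ints_2"))(_)

  println(intsDf("spark"))
  // spark reads test.csv with columns ints_1,ints_2
}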