
Implement percentile aggregation #9296

Merged: 90 commits, Oct 20, 2023
Changes from 6 commits
Commits (90)
e01233e
Implementing GpuPercentile
ttnghia Sep 25, 2023
3a17c02
WIP
ttnghia Sep 25, 2023
ff5acf5
Cleanup
ttnghia Sep 25, 2023
12280d1
Fix class serialization
ttnghia Sep 25, 2023
f366a60
Checking if in reduction context
ttnghia Sep 26, 2023
00d775f
Fix conversion of output
ttnghia Sep 26, 2023
36c301d
Working
ttnghia Sep 26, 2023
37b2f74
Implement GpuPercentileEvaluation
ttnghia Sep 26, 2023
ddbbace
Add `isReduction` parameter
ttnghia Sep 26, 2023
e4b926c
Single percentage value is working
ttnghia Sep 26, 2023
91fe05c
Working for both literal and array `percentages`
ttnghia Sep 26, 2023
e19777f
Fix conversion
ttnghia Sep 27, 2023
17066f7
Debugging
ttnghia Sep 27, 2023
b03790d
Fix running issues
ttnghia Sep 28, 2023
df412ab
Add initial test for reduction
ttnghia Sep 28, 2023
49034e3
Fix memory leak
ttnghia Sep 28, 2023
b1ddc78
Move `CudfHistogram` and `CudfMergeHistogram` classes
ttnghia Sep 28, 2023
22f85c0
Cleanup
ttnghia Sep 28, 2023
709c2a6
Fix initialization
ttnghia Sep 28, 2023
a3ee56d
Cleanup
ttnghia Sep 29, 2023
d6de152
Testing
ttnghia Sep 29, 2023
cffad08
Integration test passed
ttnghia Sep 29, 2023
afdc340
Fix data type
ttnghia Sep 29, 2023
2b6b6f7
Fix type
ttnghia Sep 29, 2023
bae641c
Add groupby test
ttnghia Oct 1, 2023
b04fe3e
Add tests
ttnghia Oct 2, 2023
1f60285
Cleanup
ttnghia Oct 2, 2023
25247d1
Add more tests
ttnghia Oct 2, 2023
b0e33a9
Add more tests with frequencies
ttnghia Oct 2, 2023
e93a7e8
WIP
ttnghia Oct 2, 2023
c42f6f1
Implement input projection for values with frequencies
ttnghia Oct 3, 2023
ecdeeaa
Add tests
ttnghia Oct 3, 2023
f2716b1
All tests passed!!!
ttnghia Oct 3, 2023
a02205c
Cleanup tests
ttnghia Oct 3, 2023
41b8991
Add more tests (not pass yet)
ttnghia Oct 3, 2023
77d9242
Fix tests
ttnghia Oct 3, 2023
3781e61
Adding fallback test
ttnghia Oct 3, 2023
9a21cd0
Add error check for percentage input
ttnghia Oct 4, 2023
0fc2f1f
Declare incompat
ttnghia Oct 4, 2023
33137bb
Implement buffer conversion
ttnghia Oct 4, 2023
dc9ac69
Fix compile issue
ttnghia Oct 4, 2023
c7c2f6b
Change test
ttnghia Oct 4, 2023
80aec1f
Update code
ttnghia Oct 5, 2023
1b41ddb
WIP for buffer conversion
ttnghia Oct 5, 2023
89bb7c2
Debugging
ttnghia Oct 6, 2023
082b74a
Change class name, and testing...
ttnghia Oct 6, 2023
eb004a8
Fix CPU-to-GPU deserialization
ttnghia Oct 10, 2023
f255020
Fix empty input projection issue
ttnghia Oct 10, 2023
89edc6f
Rename function and change test
ttnghia Oct 11, 2023
8815911
Fix null handling
ttnghia Oct 11, 2023
7c47a35
Cleanup
ttnghia Oct 11, 2023
e2b238b
Change type check
ttnghia Oct 11, 2023
96710ab
Add comment
ttnghia Oct 11, 2023
70aa9a0
Change comment
ttnghia Oct 11, 2023
1f0c813
WIP
ttnghia Oct 12, 2023
524ab1c
Merge branch 'branch-23.10' into percentile
ttnghia Oct 12, 2023
c2c2950
Complete tests
ttnghia Oct 13, 2023
3e71019
Fix all bugs
ttnghia Oct 13, 2023
1d7dbd2
Merge branch 'branch-23.12' into percentile
ttnghia Oct 13, 2023
6e81087
Move file and cleanup
ttnghia Oct 13, 2023
85cd162
Further cleanup
ttnghia Oct 13, 2023
8233d6e
Finish cleaning up
ttnghia Oct 13, 2023
a5eb50f
Add comments
ttnghia Oct 13, 2023
48f5292
Revert test
ttnghia Oct 13, 2023
741cdf0
Simplify code
ttnghia Oct 13, 2023
73db187
Add generated docs
ttnghia Oct 13, 2023
1df7a94
Some more cleaning up
ttnghia Oct 14, 2023
f1117ba
Cleanup test
ttnghia Oct 16, 2023
e7043ac
Add comment
ttnghia Oct 16, 2023
22c7959
Use `extractLit`
ttnghia Oct 17, 2023
53c433d
Revert "Use `extractLit`"
ttnghia Oct 17, 2023
9e30443
Merge branch 'branch-23.12' into percentile
ttnghia Oct 17, 2023
55da2fb
Use `extractLit`
ttnghia Oct 17, 2023
7db16a9
Merge branch 'branch-23.12' into percentile
ttnghia Oct 17, 2023
32040ce
Fix tests
ttnghia Oct 17, 2023
3729da0
Merge branch 'branch-23.12' into percentile
ttnghia Oct 18, 2023
c188955
Remove incompat
ttnghia Oct 18, 2023
f46f94b
Merge branch 'branch-23.12' into percentile
ttnghia Oct 18, 2023
24c227f
Updated docs
ttnghia Oct 18, 2023
0963303
Remove `@incompat` and `approximate_float`
ttnghia Oct 18, 2023
1ccdcdc
Temporarily run only new tests
ttnghia Oct 18, 2023
d78e7f2
Testing fallback
ttnghia Oct 18, 2023
7836bdb
Revert "Testing fallback"
ttnghia Oct 18, 2023
2b019bd
Revert "Temporarily run only new tests"
ttnghia Oct 18, 2023
2afab91
Revert "Revert "Testing fallback""
ttnghia Oct 18, 2023
6b93243
Cleanup test
ttnghia Oct 18, 2023
3533ee9
Merge branch 'branch-23.12' into percentile
ttnghia Oct 18, 2023
e1136bc
Revert "Testing fallback"
ttnghia Oct 18, 2023
3757b60
xfail tests on databricks
ttnghia Oct 19, 2023
141c19d
Merge branch 'branch-23.12' into percentile
ttnghia Oct 19, 2023
@@ -3328,13 +3328,13 @@ object GpuOverrides extends Logging {
"Collect a set of unique elements, not supported in reduction",
ExprChecks.fullAgg(
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY),
TypeSig.ARRAY.nested(TypeSig.all),
Seq(ParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
TypeSig.NULL +
TypeSig.STRUCT +
TypeSig.ARRAY).nested(),
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {

@@ -3402,6 +3402,53 @@ object GpuOverrides extends Logging {
GpuVarianceSamp(childExprs.head, !legacyStatisticalAggregate)
}
}),
expr[Percentile](
"Aggregation computing exact percentile",
ExprChecks.reductionAndGroupByAgg(
// The output can be a single number or array depending on whether percentiles param
// is a single number or an array.
TypeSig.gpuNumeric +
TypeSig.ARRAY.nested(TypeSig.gpuNumeric),
TypeSig.cpuNumeric + TypeSig.DATE + TypeSig.TIMESTAMP + TypeSig.ARRAY.nested(
TypeSig.cpuNumeric + TypeSig.DATE + TypeSig.TIMESTAMP),
Seq(
ParamCheck("input",
TypeSig.gpuNumeric,
TypeSig.cpuNumeric + TypeSig.DATE + TypeSig.TIMESTAMP),
ParamCheck("percentage",
TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE),
TypeSig.DOUBLE + TypeSig.ARRAY.nested(TypeSig.DOUBLE)),
ParamCheck("frequency",
TypeSig.LONG + TypeSig.ARRAY.nested(TypeSig.LONG),
TypeSig.LONG + TypeSig.ARRAY.nested(TypeSig.LONG)))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[Percentile](c, conf, p, r) {
override def convertToGpu(childExprs: Seq[Expression]): GpuExpression = {
val exprMeta = p.get.asInstanceOf[BaseExprMeta[_]]
val context = exprMeta.context
context match {
case ReductionAggExprContext => GpuPercentile(childExprs, isReduction = true)
case GroupByAggExprContext => GpuPercentile(childExprs, isReduction = false)
case _ => throw new IllegalStateException(s"Invalid aggregation context: $context")
}
// The choice between GpuPercentileDefault and GpuPercentileWithFrequency is made inside
// GpuPercentile.apply, based on the frequency child expression.
}
override def aggBufferAttribute: AttributeReference = {
val aggBuffer = c.aggBufferAttributes.head
aggBuffer.copy(dataType = c.dataType)(aggBuffer.exprId, aggBuffer.qualifier)
}
override def createCpuToGpuBufferConverter(): CpuToGpuAggregateBufferConverter =
new CpuToGpuPercentileBufferConverter(c.child.dataType)
override def createGpuToCpuBufferConverter(): GpuToCpuAggregateBufferConverter =
new GpuToCpuPercentileBufferConverter()
override val supportBufferConversion: Boolean = true
override val needsAnsiCheck: Boolean = false
}),
expr[ApproximatePercentile](
"Approximate percentile",
ExprChecks.reductionAndGroupByAgg(
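For context, the `Percentile` override above routes Spark's exact percentile aggregate onto the GPU for both reduction and group-by plans. Below is a minimal, illustrative Spark snippet exercising the shapes described by the type checks; the table name, column names, and object name are made up for this sketch:

// Illustrative usage only; any DataFrame with a numeric column works the same way.
import org.apache.spark.sql.SparkSession

object PercentileUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("percentile-usage").getOrCreate()
    import spark.implicits._

    Seq((1, 10L), (1, 20L), (2, 30L)).toDF("k", "v").createOrReplaceTempView("t")

    // Reduction with a single percentage: the result is a single DOUBLE.
    spark.sql("SELECT percentile(v, 0.5) FROM t").show()

    // Group-by with an array of percentages: the result is an ARRAY<DOUBLE> per group.
    spark.sql("SELECT k, percentile(v, array(0.25, 0.5, 0.75)) FROM t GROUP BY k").show()

    // Optional frequency argument, matching the "frequency" parameter checked above.
    spark.sql("SELECT percentile(v, 0.5, 2) FROM t").show()

    spark.stop()
  }
}
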
@@ -18,8 +18,9 @@ package org.apache.spark.sql.rapids

import ai.rapids.cudf
import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, GroupByAggregation, GroupByScanAggregation, NaNEquality, NullEquality, NullPolicy, NvtxColor, NvtxRange, ReductionAggregation, ReplacePolicy, RollingAggregation, RollingAggregationOnColumn, Scalar, ScanAggregation}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed}
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
import com.nvidia.spark.rapids.shims.{GpuDeterministicFirstLastCollectShim, ShimExpression, ShimUnaryExpression, TypeUtilsShims}

@@ -388,9 +389,9 @@ class CudfMergeLists(override val dataType: DataType) extends CudfAggregate {
}

/**
* Spark handles NaN equality differently for non-nested float/double and for float/double
* in nested types. When we use non-nested versions of floats and doubles, NaN values are
* considered unequal, but when we collect sets of nested versions, NaNs are considered equal
* on the CPU. So we set NaNEquality dynamically in CudfCollectSet and CudfMergeSets.
* Note that dataType is ArrayType(child.dataType) here.
*/
@@ -401,7 +402,7 @@ class CudfCollectSet(override val dataType: DataType) extends CudfAggregate {
case ArrayType(FloatType | DoubleType, _) =>
ReductionAggregation.collectSet(
NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.UNEQUAL)
case _: DataType =>
ReductionAggregation.collectSet(
NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.ALL_EQUAL)
}
@@ -1981,17 +1982,17 @@ case class GpuCollectSet(
override def aggBufferAttributes: Seq[AttributeReference] = outputBuf :: Nil

override def prettyName: String = "collect_set"

// Spark handles NaN equality differently for non-nested float/double and for float/double
// in nested types. When we use non-nested versions of floats and doubles, NaN values are
// considered unequal, but when we collect sets of nested versions, NaNs are considered equal
// on the CPU. So we set NaNEquality dynamically here.
override def windowAggregation(
inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn = child.dataType match {
case FloatType | DoubleType =>
RollingAggregation.collectSet(NullPolicy.EXCLUDE, NullEquality.EQUAL,
NaNEquality.UNEQUAL).onColumn(inputs.head._2)
case _ =>
RollingAggregation.collectSet(NullPolicy.EXCLUDE, NullEquality.EQUAL,
NaNEquality.ALL_EQUAL).onColumn(inputs.head._2)
}
@@ -2011,16 +2012,16 @@ trait GpuToCpuBufferTransition extends ShimUnaryExpression with CodegenFallback
override def dataType: DataType = BinaryType
}

class CpuToGpuCollectBufferConverter(elementType: DataType)
extends CpuToGpuAggregateBufferConverter {
def createExpression(child: Expression): CpuToGpuBufferTransition = {
CpuToGpuCollectBufferTransition(child, elementType)
}
}

case class CpuToGpuCollectBufferTransition(override val child: Expression,
private val elementType: DataType)
extends CpuToGpuBufferTransition {

private lazy val row = new UnsafeRow(1)

@@ -2044,8 +2045,199 @@ class GpuToCpuCollectBufferConverter extends GpuToCpuAggregateBufferConverter {
}
}

case class GpuToCpuCollectBufferTransition(override val child: Expression)
extends GpuToCpuBufferTransition {

private lazy val projection = UnsafeProjection.create(Array(child.dataType))

override protected def nullSafeEval(input: Any): Array[Byte] = {
// Converts UnSafeArrayData into binary buffer, according to the serialize method of Collect.
// The binary buffer is the binary view of a UnsafeRow, which only contains single field
// with ArrayType of elementType. As Collect.serialize, we create an UnsafeProjection to
// transform ArrayData to binary view of the single field UnsafeRow. Unlike Collect.serialize,
// we don't have to build ArrayData from on-heap array, since the input is already formatted
// in ArrayData(UnsafeArrayData).
val arrayData = input.asInstanceOf[ArrayData]
projection.apply(InternalRow.apply(arrayData)).getBytes
}
}

case class GpuIdentity(child: Expression) extends GpuUnaryExpression {
override def prettyName: String = "identity"

override def dataType: DataType = child.dataType

override def doColumnar(input: GpuColumnVector): ColumnVector = {
// Pass the input column through unchanged; just bump its reference count.
input.getBase.incRefCount()
}

override def nullable: Boolean = child.nullable

}

/**
* Base class for computing exact percentile(s) of the input values.
*
* The aggregation buffer is a histogram of the input values; the percentile(s) are evaluated
* from that histogram in the final step.
*/
abstract class GpuPercentile(childExprs: Seq[Expression], isReduction: Boolean)
extends GpuAggregateFunction with Serializable {
protected class CudfHistogram(override val dataType: DataType) extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(col: cudf.ColumnVector) => col.reduce(ReductionAggregation.histogram(), DType.LIST)
override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.histogram()
override val name: String = "CudfHistogram"
}

protected class CudfMergeHistogram(override val dataType: DataType) extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(col: cudf.ColumnVector) => col.reduce(ReductionAggregation.mergeHistogram(), DType.LIST)
override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHistogram()
override val name: String = "CudfMergeHistogram"
}

override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(new CudfMergeHistogram(dataType))
override lazy val evaluateExpression: Expression = {
// TODO: compute the percentile(s) from the histogram buffer (a percentileFromHistogram-style
// helper is still to be implemented). For now this returns a placeholder result.
GpuLiteral(1.0, DoubleType)
}
private final lazy val histogramBuff: AttributeReference =
AttributeReference("histogramBuff", dataType)()

override def dataType: DataType = childExprs(1).dataType match {
case _: ArrayType => ArrayType(DoubleType, containsNull = false)
case _ => DoubleType
}
override def aggBufferAttributes: Seq[AttributeReference] = histogramBuff :: Nil
override def prettyName: String = "percentile"
override def nullable: Boolean = false

override val initialValues: Seq[Expression] = Seq(GpuLiteral.create(null, dataType))
override def children: Seq[Expression] = childExprs
}
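
The `evaluateExpression` above is still a placeholder at this point in the PR. For reference, here is a plain-Scala sketch of the closest-ranks linear interpolation that Spark's CPU `Percentile` applies to a sorted (value, frequency) histogram, which the final GPU evaluation is expected to mirror; `percentileFromHistogram` and the object name are illustrative, not the plugin's API:

// Illustrative CPU-side sketch: exact percentile over a (value, frequency) histogram using
// linear interpolation between the closest ranks.
object PercentileFromHistogramSketch {
  def percentileFromHistogram(histogram: Seq[(Double, Long)], percentage: Double): Double = {
    require(percentage >= 0.0 && percentage <= 1.0, "percentage must be in [0, 1]")
    val sorted = histogram.sortBy(_._1)
    val count = sorted.map(_._2).sum
    val position = percentage * (count - 1) // fractional rank into the expanded, sorted values
    val lower = math.floor(position).toLong
    val higher = math.ceil(position).toLong

    // Value whose cumulative frequency first exceeds the given rank.
    def valueAtRank(rank: Long): Double = {
      var cumulative = 0L
      var result = sorted.head._1
      var found = false
      for ((value, freq) <- sorted if !found) {
        cumulative += freq
        if (cumulative > rank) { result = value; found = true }
      }
      result
    }

    val lowerValue = valueAtRank(lower)
    if (higher == lower) lowerValue
    else lowerValue + (position - lower) * (valueAtRank(higher) - lowerValue)
  }

  def main(args: Array[String]): Unit = {
    // Median of {10, 20, 20, 30}: position 1.5 falls between two 20s, so the result is 20.0.
    println(percentileFromHistogram(Seq((10.0, 1L), (20.0, 2L), (30.0, 1L)), 0.5))
  }
}

An array of percentages simply maps this computation over each requested percentage, which is why the output type in the override above becomes an array when the percentage argument is an array.
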

case class GpuGetListChild(child: Expression) extends GpuExpression {


override def columnarEvalAny(batch: ColumnarBatch): Any = {
val dt = dataType
withResourceIfAllowed(child.columnarEvalAny(batch)) {
case cv: GpuColumnVector =>
withResource(cv.getBase.getChildColumnView(0)) { view =>
GpuColumnVector.from(view.copyToColumnVector(), dt)
}

case other =>
throw new IllegalArgumentException(s"Got an unexpected type out of columnarEvalAny $other")
}
}

override def columnarEval(batch: ColumnarBatch): GpuColumnVector =
GpuExpressionsUtils.resolveColumnVector(columnarEvalAny(batch), batch.numRows())

override def nullable: Boolean = child.nullable

// The result is the list's child column, so expose the list's element type.
override def dataType: DataType = child.dataType match {
case ArrayType(elementType, _) => elementType
case other => other
}

override def children: Seq[Expression] = Seq(child)
}

/**
* Compute exact percentile(s) when every frequency is the literal 1 (the common case):
* the histogram is built directly from the input values.
*/
case class GpuPercentileDefault(childExprs: Seq[Expression], isReduction: Boolean)
extends GpuPercentile(childExprs, isReduction) {

override val inputProjection: Seq[Expression] = Seq(childExprs.head)

private lazy val histogramUpdate = new CudfHistogram(dataType)
override lazy val updateAggregates: Seq[CudfAggregate] = Seq(histogramUpdate)

override lazy val postUpdate: Seq[Expression] = {
if (isReduction) {
val reductionResult = histogramUpdate.attr
Seq(GpuGetListChild(reductionResult))
} else {
Seq(histogramUpdate.attr)
}
}
}
/**
* Compute exact percentile(s) when an explicit frequency child is provided: each input value
* is packed together with its frequency into a struct before being merged into the histogram.
*/
case class GpuPercentileWithFrequency(childExprs: Seq[Expression], isReduction: Boolean)
extends GpuPercentile(childExprs, isReduction) {

override val inputProjection: Seq[Expression] = {
val childrenWithNames = GpuLiteral("value", StringType) :: childExprs.head ::
GpuLiteral("frequency", StringType) :: childExprs(2) :: Nil
GpuCreateNamedStruct(childrenWithNames) :: Nil
}
override lazy val updateAggregates: Seq[CudfAggregate] = Seq(new CudfMergeHistogram(dataType))
}

object GpuPercentile {
def apply(childExprs: Seq[Expression], isReduction: Boolean): GpuPercentile = {
val Seq(_, _, frequency) = childExprs
frequency match {
case GpuLiteral(freq, LongType) if freq == 1 =>
GpuPercentileDefault(childExprs, isReduction)
case _: Any =>
GpuPercentileWithFrequency(childExprs, isReduction)
}
}
}

class CpuToGpuPercentileBufferConverter(elementType: DataType)
extends CpuToGpuAggregateBufferConverter {
def createExpression(child: Expression): CpuToGpuBufferTransition = {
CpuToGpuPercentileBufferTransition(child, elementType)
}
}

case class CpuToGpuPercentileBufferTransition(override val child: Expression,
private val elementType: DataType)
extends CpuToGpuBufferTransition {

private lazy val row = new UnsafeRow(1)

override def dataType: DataType = ArrayType(elementType, containsNull = false)

override protected def nullSafeEval(input: Any): ArrayData = {
// Converts binary buffer into UnSafeArrayData, according to the deserialize method of Collect.
// The input binary buffer is the binary view of a UnsafeRow, which only contains single field
// with ArrayType of elementType. Since array of elements exactly matches the GPU format, we
// don't need to do any conversion in memory level. Instead, we simply bind the binary data to
// a reused UnsafeRow. Then, fetch the only field as ArrayData.
val bytes = input.asInstanceOf[Array[Byte]]
row.pointTo(bytes, bytes.length)
row.getArray(0).copy()
}
}

class GpuToCpuPercentileBufferConverter extends GpuToCpuAggregateBufferConverter {
def createExpression(child: Expression): GpuToCpuBufferTransition = {
GpuToCpuPercentileBufferTransition(child)
}
}

case class GpuToCpuPercentileBufferTransition(override val child: Expression)
extends GpuToCpuBufferTransition {

private lazy val projection = UnsafeProjection.create(Array(child.dataType))

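
To make the buffer format used by the percentile converters above concrete, here is a small standalone sketch of the round trip their comments describe: the GPU-to-CPU direction projects an `ArrayData` into the binary view of a single-field `UnsafeRow`, and the CPU-to-GPU direction points a reused `UnsafeRow` at those bytes and reads the only field back. This is an illustration against Spark's catalyst classes, with `DoubleType` assumed as the element type; it is not plugin code:

// Standalone illustration of the UnsafeRow-based buffer round trip described above.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType}

object PercentileBufferRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val elementType: DataType = DoubleType
    val bufferType: DataType = ArrayType(elementType, containsNull = false)

    // GPU -> CPU: project the ArrayData into a single-field UnsafeRow and keep its bytes.
    val projection = UnsafeProjection.create(Array(bufferType))
    val gpuSide: ArrayData = ArrayData.toArrayData(Array(1.0, 2.0, 3.0))
    val bytes: Array[Byte] = projection.apply(InternalRow(gpuSide)).getBytes

    // CPU -> GPU: bind a reused UnsafeRow to the bytes and fetch its only field as ArrayData.
    val row = new UnsafeRow(1)
    row.pointTo(bytes, bytes.length)
    val restored: ArrayData = row.getArray(0).copy()

    println(restored.toDoubleArray().mkString(", ")) // 1.0, 2.0, 3.0
  }
}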