Scope aggregate columns in SparkConnectPlanner
EnricoMi committed Jan 18, 2024
1 parent 682c7a3 commit e396fb0
Showing 2 changed files with 5 additions and 1 deletion.
@@ -2422,6 +2422,9 @@ class SparkConnectPlanner(
     val keyColumn = TypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes)
     val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq
       .map(expr => transformExpressionWithTypedReduceExpression(expr, input))
+      // SPARK-42199: resolve these aggregate expressions only against dataAttributes
+      // this is to hide key column from expression resolution
+      .map(ScopedExpression(_, ds.dataAttributes))
       .map(toNamedExpression)
     logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed)
   }
@@ -675,7 +675,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
     val encoders = columns.map(_.encoder)
     val namedColumns =
       columns
-        // SPARK-42199: resolve these sort expressions only against dataAttributes
+        // SPARK-42199: resolve these aggregate expressions only against dataAttributes
+        // this is to hide key column from expression resolution
         .map(scopeTypedColumn(dataAttributes))
         .map(_.withInputType(vExprEnc, dataAttributes).named)
     val keyColumn = TypedAggUtils.aggKeyColumn(kExprEnc, groupingAttributes)
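The comment change mirrors the first file: the rationale is to resolve aggregate (not sort) expressions only against the value attributes, keeping the grouping key column invisible to resolution. A minimal, self-contained sketch of the kind of query this scoping protects is shown below; the class, column, and object names are illustrative and are not taken from the commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{max, sum}

    // Illustrative names only; this is not test code from the commit.
    case class Record(key: String, value: Long)

    object ScopedAggregationExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("scoped-agg").getOrCreate()
        import spark.implicits._

        val ds = Seq(Record("a", 1L), Record("a", 2L), Record("b", 3L)).toDS()

        // The data column "key" shares its name with the grouping key column that
        // groupByKey adds to the plan. Resolving the aggregate expressions only
        // against the data attributes keeps $"key" and $"value" bound to Record's
        // columns rather than to the hidden key column.
        val result = ds.groupByKey(_.key)
          .agg(max($"key").as[String], sum($"value").as[Long])

        result.show()
        spark.stop()
      }
    }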
