Scope aggregate columns in SparkConnectPlanner
EnricoMi committed Jul 30, 2024
1 parent 2443f8a commit c4fce40
Showing 2 changed files with 5 additions and 1 deletion.
@@ -2450,6 +2450,9 @@ class SparkConnectPlanner(
       val keyColumn = TypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes)
       val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq
         .map(expr => transformExpressionWithTypedReduceExpression(expr, input))
+        // SPARK-42199: resolve these aggregate expressions only against dataAttributes
+        // this is to hide key column from expression resolution
+        .map(ScopedExpression(_, ds.dataAttributes))
         .map(toNamedExpression)
       logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed)
     }
@@ -971,7 +971,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
     val encoders = columns.map(_.encoder)
     val namedColumns =
       columns
-        // SPARK-42199: resolve these sort expressions only against dataAttributes
+        // SPARK-42199: resolve these aggregate expressions only against dataAttributes
+        // this is to hide key column from expression resolution
         .map(scopeTypedColumn(dataAttributes))
         .map(_.withInputType(vExprEnc, dataAttributes).named)
     val keyColumn = TypedAggUtils.aggKeyColumn(kExprEnc, groupingAttributes)
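
The corrected comment above states the same rule the Connect planner now applies: typed aggregate columns are scoped to dataAttributes so the key column never participates in name resolution. The following conceptual sketch illustrates that rule in plain Scala; the Attr case class and the resolver are assumptions for illustration, not Spark internals.

// Conceptual sketch only: Attr and resolveAgainstData are illustrative, not Spark's API.
final case class Attr(name: String, fromKey: Boolean)

object ScopedResolution {
  // Resolution for aggregate expressions searches the data attributes only,
  // so a key column with the same name can never be matched by mistake.
  def resolveAgainstData(name: String, dataAttributes: Seq[Attr]): Option[Attr] =
    dataAttributes.find(_.name == name)

  def main(args: Array[String]): Unit = {
    val keyColumn = Attr("value", fromKey = true) // deliberately not passed to the resolver
    val dataAttributes = Seq(Attr("value", fromKey = false), Attr("id", fromKey = false))

    // Even though the key column is also named "value", the lookup is scoped
    // to the data attributes and unambiguously picks the data column.
    val resolved = resolveAgainstData("value", dataAttributes)
    assert(resolved.exists(a => !a.fromKey))
    println(resolved)
  }
}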
