From e396fb0dccef060da4e144f3b93a2b73ba8f3bcb Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 18 Jan 2024 10:02:34 +0100 Subject: [PATCH] Scope aggregate columns in SparkConnectPlanner --- .../apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 3 +++ .../scala/org/apache/spark/sql/KeyValueGroupedDataset.scala | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index dc57adc90c425..69f30b7b9da6f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2422,6 +2422,9 @@ class SparkConnectPlanner( val keyColumn = TypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes) val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq .map(expr => transformExpressionWithTypedReduceExpression(expr, input)) + // SPARK-42199: resolve these aggregate expressions only against dataAttributes + // this is to hide key column from expression resolution + .map(ScopedExpression(_, ds.dataAttributes)) .map(toNamedExpression) logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index c8abfb054ad81..31bb28c8eae59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -675,7 +675,8 @@ class KeyValueGroupedDataset[K, V] private[sql]( val encoders = columns.map(_.encoder) val namedColumns = columns - // SPARK-42199: resolve these sort expressions only against dataAttributes + // SPARK-42199: resolve these aggregate expressions only against dataAttributes + // this is to hide key column from expression resolution .map(scopeTypedColumn(dataAttributes)) .map(_.withInputType(vExprEnc, dataAttributes).named) val keyColumn = TypedAggUtils.aggKeyColumn(kExprEnc, groupingAttributes)