Skip to content

Commit

Permalink
Return KeyGroupedPartitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Oct 9, 2023
1 parent 10b19c8 commit 84fb949
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, KeyGroupedPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode, SQLExecution}
Expand Down Expand Up @@ -99,8 +99,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
val keyGroupedParts = keyGroupedPartsInfo.groupedParts
KeyGroupedPartitioning(exprs, keyGroupedParts.size, keyGroupedParts.map(_.value),
keyGroupedPartsInfo.originalParts.map(_.partitionKey()))
}.getOrElse {
KeyGroupedPartitioning(exprs, inputPartitions.size)
}
.getOrElse(HashPartitioning(exprs, inputPartitions.size))
case _ =>
super.outputPartitioning
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val distribution = physical.ClusteredDistribution(
Seq(TransformExpression(BucketFunction, Seq(attr("ts")), Some(32))))

checkQueryPlan(df, distribution, physical.HashPartitioning(distribution.clustering, 32))
checkQueryPlan(df, distribution, physical.KeyGroupedPartitioning(distribution.clustering, 32))
}
}

Expand Down Expand Up @@ -175,7 +175,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val distribution = physical.ClusteredDistribution(
Seq(TransformExpression(BucketFunction, Seq(attr("ts")), Some(32))))

checkQueryPlan(df, distribution, physical.HashPartitioning(distribution.clustering, 32))
checkQueryPlan(df, distribution, physical.KeyGroupedPartitioning(distribution.clustering, 32))
}
}

Expand Down

0 comments on commit 84fb949

Please sign in to comment.