Change AQE default minPartitionNum to defaultParallelism #674

Merged 6 commits on May 1, 2020
@@ -299,11 +299,12 @@ object SQLConf {

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution. " +
"If not set, the default value is the default parallelism of the Spark cluster.")
.intConf
.checkValue(_ > 0, "The minimum shuffle partition number " +
"must be a positive integer.")
.createWithDefault(1)
.createOptional

val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
@@ -1830,9 +1831,6 @@ class SQLConf extends Serializable with Logging {

def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

def maxNumPostShufflePartitions: Int =
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
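
For context on the createOptional change above: a minimal sketch, assuming a running SparkSession named `spark`, of how the now-optional entry resolves after this PR. The getOrElse fallback mirrors the rule change further down; the snippet is illustrative and not part of the diff.

    // Sketch: with createOptional the entry yields Option[Int]; when unset, the effective
    // minimum falls back to the cluster's default parallelism.
    import org.apache.spark.sql.internal.SQLConf

    val effectiveMinPartitions: Int = spark.sessionState.conf
      .getConf(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)  // Option[Int]; None when unset
      .getOrElse(spark.sparkContext.defaultParallelism)         // fallback introduced by this PR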

@@ -90,7 +90,7 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReuseAdaptiveSubquery(conf, subqueryCache),
ReduceNumShufflePartitions(conf),
ReduceNumShufflePartitions(queryExecution.sparkSession),
CollapseCodegenStages(conf)
)
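
A note on the constructor change above: the rule now receives the whole SparkSession rather than a SQLConf, because the default-parallelism fallback lives on the SparkContext. Below is a hedged sketch of the resulting rule shape; ExampleCoalesceRule is a made-up name, the real rule is ReduceNumShufflePartitions further down.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.internal.SQLConf

    case class ExampleCoalesceRule(session: SparkSession) extends Rule[SparkPlan] {
      private def sessionConf = session.sessionState.conf
      override def apply(plan: SparkPlan): SparkPlan = {
        val minPartitionNum = sessionConf.getConf(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
          .getOrElse(session.sparkContext.defaultParallelism)
        // A real rule would coalesce shuffle partitions here while honouring minPartitionNum.
        plan
      }
    }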

@@ -21,7 +21,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration

import org.apache.spark.MapOutputStatistics
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
@@ -51,7 +53,8 @@ import org.apache.spark.util.ThreadUtils
* - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
* - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
*/
case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
case class ReduceNumShufflePartitions(session: SparkSession) extends Rule[SparkPlan] {
private def conf = session.sessionState.conf

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.reducePostShufflePartitionsEnabled) {
@@ -88,7 +91,14 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
val distinctNumPreShufflePartitions =
validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
// We fall back to Spark default parallelism if the minimum number of coalesced partitions
// is not set, so as to avoid performance regressions compared to no coalescing.
val minPartitionNum = conf.getConf(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
.getOrElse(session.sparkContext.defaultParallelism)
val partitionStartIndices = ReduceNumShufflePartitions.estimatePartitionStartIndices(
validMetrics.toArray,
conf.targetPostShuffleInputSize,
minPartitionNum)
// This transformation adds new nodes, so we must use `transformUp` here.
plan.transformUp {
// even for shuffle exchange whose input RDD has 0 partition, we should still update its
@@ -102,19 +112,20 @@
}
}
}
}

object ReduceNumShufflePartitions extends Logging {
/**
* Estimates partition start indices for post-shuffle partitions based on
* mapOutputStatistics provided by all pre-shuffle stages.
*/
// visible for testing.
private[sql] def estimatePartitionStartIndices(
mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
// If minNumPostShufflePartitions is defined, it is possible that we need to use a
// value less than advisoryTargetPostShuffleInputSize as the target input size of
// a post shuffle task.
mapOutputStatistics: Array[MapOutputStatistics],
advisoryTargetPostShuffleInputSize: Long,
minNumPostShufflePartitions: Int): Array[Int] = {
// If `minNumPostShufflePartitions` is very large, it is possible that we need to use a value
// less than `advisoryTargetPostShuffleInputSize` as the target size of a coalesced task.
val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
// The max at here is to make sure that when we have an empty table, we
// only have a single post-shuffle partition.
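
The hunks above move the estimation into a companion object and pass the advisory target size and minimum partition count explicitly. As a worked illustration of the greedy estimate, here is a standalone sketch, simplified and not the exact Spark implementation, with sizes chosen to match the 170 MiB and 50 MiB figures in the Scaladoc excerpt above:

    // Standalone sketch of the coalescing estimate: walk pre-shuffle partitions in order and
    // start a new post-shuffle partition whenever adding the next one would exceed the target.
    object CoalesceSketch {
      def estimateStartIndices(
          bytesByPartition: Array[Long],  // combined size of each pre-shuffle partition
          advisoryTargetSize: Long,
          minNumPartitions: Int): Array[Int] = {
        // If minNumPartitions is large, shrink the target so we end up with at least that many.
        val totalSize = bytesByPartition.sum
        val maxTargetSize =
          math.max(math.ceil(totalSize / math.max(minNumPartitions, 1).toDouble).toLong, 16L)
        val targetSize = math.min(maxTargetSize, advisoryTargetSize)

        val startIndices = scala.collection.mutable.ArrayBuffer(0)
        var currentSize = 0L
        bytesByPartition.zipWithIndex.foreach { case (size, i) =>
          if (i > 0 && currentSize + size > targetSize) {
            startIndices += i  // begin a new post-shuffle partition at pre-shuffle index i
            currentSize = size
          } else {
            currentSize += size
          }
        }
        startIndices.toArray
      }
    }

    // Illustrative run with combined sizes of 110, 30, 170, 15, 35 MiB and a 128 MiB target:
    // CoalesceSketch.estimateStartIndices(Array(110, 30, 170, 15, 35), 128, 1)
    //   returns Array(0, 1, 2, 3), i.e. four post-shuffle partitions, with partition 2 holding
    //   the 170 MiB input and partition 3 combining the last two (50 MiB), as described above.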
@@ -52,87 +52,79 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
}

private def checkEstimation(
rule: ReduceNumShufflePartitions,
bytesByPartitionIdArray: Array[Array[Long]],
expectedPartitionStartIndices: Array[Int]): Unit = {
expectedPartitionStartIndices: Seq[Int],
targetSize: Long,
minNumPartitions: Int = 1): Unit = {
val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
case (bytesByPartitionId, index) =>
new MapOutputStatistics(index, bytesByPartitionId)
}
val estimatedPartitionStartIndices =
rule.estimatePartitionStartIndices(mapOutputStatistics)
val estimatedPartitionStartIndices = ReduceNumShufflePartitions.estimatePartitionStartIndices(
mapOutputStatistics,
targetSize,
minNumPartitions)
assert(estimatedPartitionStartIndices === expectedPartitionStartIndices)
}

private def createReduceNumShufflePartitionsRule(
advisoryTargetPostShuffleInputSize: Long,
minNumPostShufflePartitions: Int = 1): ReduceNumShufflePartitions = {
val conf = new SQLConf().copy(
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE -> advisoryTargetPostShuffleInputSize,
SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS -> minNumPostShufflePartitions)
ReduceNumShufflePartitions(conf)
}

test("test estimatePartitionStartIndices - 1 Exchange") {
val rule = createReduceNumShufflePartitionsRule(100L)
val targetSize = 100

{
// All bytes per partition are 0.
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// Some bytes per partition are 0 and total size is less than the target size.
// 1 post-shuffle partition is needed.
val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// 2 post-shuffle partitions are needed.
val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
val expectedPartitionStartIndices = Array[Int](0, 3)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// There are a few large pre-shuffle partitions.
val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// All pre-shuffle partitions are larger than the targeted size.
val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}

{
// The last pre-shuffle partition is in a single post-shuffle partition.
val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110)
val expectedPartitionStartIndices = Array[Int](0, 4)
checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
checkEstimation(Array(bytesByPartitionId), expectedPartitionStartIndices, targetSize)
}
}

test("test estimatePartitionStartIndices - 2 Exchanges") {
val rule = createReduceNumShufflePartitionsRule(100L)
val targetSize = 100

{
// If there are multiple values of the number of pre-shuffle partitions,
// we should see an assertion error.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0, 0)
val mapOutputStatistics =
Array(
new MapOutputStatistics(0, bytesByPartitionId1),
new MapOutputStatistics(1, bytesByPartitionId2))
intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics))
intercept[AssertionError]{
checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize)
}
}

{
@@ -141,9 +133,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
@@ -153,9 +145,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
@@ -164,9 +156,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 2, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
@@ -175,9 +167,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
@@ -186,9 +178,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
@@ -197,9 +189,9 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}

{
@@ -208,14 +200,15 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize)
}
}

test("test estimatePartitionStartIndices and enforce minimal number of reducers") {
val rule = createReduceNumShufflePartitionsRule(100L, 2)
val targetSize = 100
val minNumPartitions = 2

{
// The minimal number of post-shuffle partitions is not enforced because
@@ -224,9 +217,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize,
minNumPartitions)
}

{
@@ -235,9 +229,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5)
val expectedPartitionStartIndices = Array[Int](0, 3)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize,
minNumPartitions)
}

{
@@ -246,9 +241,10 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4)
checkEstimation(
rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
expectedPartitionStartIndices,
targetSize,
minNumPartitions)
}
}

@@ -268,7 +264,8 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
def withSparkSession(
f: SparkSession => Unit,
targetPostShuffleInputSize: Int,
minNumPostShufflePartitions: Option[Int]): Unit = {
minNumPostShufflePartitions: Option[Int],
unsetMinMax: Boolean = false): Unit = {
val sparkConf =
new SparkConf(false)
.setMaster("local[*]")
@@ -288,6 +285,11 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
}

if (unsetMinMax) {
sparkConf.remove(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key)
sparkConf.remove(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key)
}

val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
@@ -523,6 +525,38 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
}
}

// TODO(rshkv): Remove after taking SPARK-31124 (#676)
test("number of reducers is lower-bound by default parallelism without configured minimum") {
val test = { spark: SparkSession =>
val df =
spark
.range(0, 1000, 1, numInputPartitions)
.selectExpr("id % 20 as key", "id as value")
val agg = df.groupBy("key").count()

agg.collect()

val finalPlan = agg.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
val shuffleReaders = finalPlan.collect {
case reader: CoalescedShuffleReaderExec => reader
}

assert(shuffleReaders.length === 1)
shuffleReaders.foreach { reader =>
// Assert that there is no configured minimum
assert(!spark.conf.contains(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key))
// The number of output partitions will be slightly larger than defaultParallelism because
// input partitions don't exactly fit. The key point is that we have more than one partition.
assert(reader.outputPartitioning.numPartitions >= spark.sparkContext.defaultParallelism)
}
}
// Pick an advisory partition size such that we'd have one partition
// if min = defaultParallelism didn't work
val targetSizeForOnePartition = 1000000000
withSparkSession(test, targetSizeForOnePartition, None, unsetMinMax = true)
}
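
For reference, a hedged user-facing sketch of the behaviour the new test above pins down. It assumes a SparkSession named `spark` with adaptive execution and partition coalescing enabled; exact partition counts depend on data sizes and the configured target size.

    // With the key unset, coalescing is lower-bounded by spark.sparkContext.defaultParallelism.
    spark.conf.unset("spark.sql.adaptive.minNumPostShufflePartitions")
    val agg = spark.range(0, 1000, 1, 100).selectExpr("id % 20 as key").groupBy("key").count()
    agg.collect()

    // With an explicit value, that value is the floor instead, as it was before this PR.
    spark.conf.set("spark.sql.adaptive.minNumPostShufflePartitions", "2")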

test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
val test: SparkSession => Unit = { spark: SparkSession =>
spark.sql("SET spark.sql.exchange.reuse=true")