Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
child
case (child, BroadcastDistribution(mode)) =>
BroadcastExchangeExec(mode, child)
case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) =>
ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child)
Comment on lines +58 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only handles the special case of OrderedDistribution. More generally, if a ShuffleExchangeExec is followed by any distribution that it does not satisfy, shouldn't we always trim the ShuffleExchangeExec and apply the distribution's partitioning instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable. Are there any suitable cases?

Copy link
Member

@viirya viirya Dec 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried a few possible cases, but could not come up with another concrete case like this. Maybe this is the only possible case, so I think this should be fine.

case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
.getOrElse(defaultNumPreShufflePartitions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {
def computeChiSquareTest(): Double = {
val n = 10000
// Trigger a sort
// Range has range partitioning in its output now. To have a range shuffle, we
// need to run a repartition first.
val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc)
val data = spark.range(0, n, 1, 10).sort($"id".desc)
.selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

// Compute histogram for the number of records per partition post sort
Expand All @@ -55,12 +53,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession {

withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
// The default chi-sq value should be low
assert(computeChiSquareTest() < 100)
assert(computeChiSquareTest() < 10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The physical plan is the same as before; what caused this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not the same: we had two shuffles before, one using RoundRobinPartitioning and the other using RangePartitioning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see


withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
// If we only sample one point, the range boundaries will be pretty bad and the
// chi-sq value would be very high.
assert(computeChiSquareTest() > 300)
assert(computeChiSquareTest() > 100)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,52 @@ class PlannerSuite extends SharedSparkSession {
}
}

test("SPARK-30036: Romove unnecessary RoundRobinPartitioning " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Romove -> Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"if SortExec is followed by RoundRobinPartitioning") {
val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
val partitioning = RoundRobinPartitioning(5)
assert(!partitioning.satisfies(distribution))

val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
global = true,
child = ShuffleExchangeExec(
partitioning,
DummySparkPlan(outputPartitioning = partitioning)))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assert(outputPlan.find {
case ShuffleExchangeExec(_: RoundRobinPartitioning, _, _) => true
case _ => false
}.isEmpty,
"RoundRobinPartitioning should be changed to RangePartitioning")

val query = testData.select('key, 'value).repartition(2).sort('key.asc)
assert(query.rdd.getNumPartitions == 2)
assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 50))
}

// Regression test for SPARK-30036: a hash shuffle sitting directly under a global sort
// does not satisfy the sort's ordered distribution, so EnsureRequirements is expected to
// replace it rather than keep it in the plan.
test("SPARK-30036: Remove unnecessary HashPartitioning " +
  "if SortExec is followed by HashPartitioning") {
  val distribution = OrderedDistribution(SortOrder(Literal(1), Ascending) :: Nil)
  val partitioning = HashPartitioning(Literal(1) :: Nil, 5)
  // Precondition: the hash partitioning must NOT already satisfy the ordered distribution,
  // otherwise the plan-level assertion below would pass vacuously.
  assert(!partitioning.satisfies(distribution))

  // Plan-level check: a global SortExec whose child is a hash ShuffleExchangeExec.
  val inputPlan = SortExec(SortOrder(Literal(1), Ascending) :: Nil,
    global = true,
    child = ShuffleExchangeExec(
      partitioning,
      DummySparkPlan(outputPartitioning = partitioning)))
  val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
  // No hash-partitioned exchange may remain anywhere in the rewritten plan.
  assert(outputPlan.find {
    case ShuffleExchangeExec(_: HashPartitioning, _, _) => true
    case _ => false
  }.isEmpty,
    "HashPartitioning should be changed to RangePartitioning")

  // End-to-end check: the user-requested partition count (5) must be preserved.
  val query = testData.select('key, 'value).repartition(5, 'key).sort('key.asc)
  assert(query.rdd.getNumPartitions == 5)
  // NOTE(review): presumably testData holds keys 1..100, so the first of the 5 sorted
  // partitions is expected to contain exactly 1..20 — confirm against the fixture.
  assert(query.rdd.collectPartitions()(0).map(_.get(0)).toSeq == (1 to 20))
}

test("EnsureRequirements does not eliminate Exchange with different partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val partitioning = HashPartitioning(Literal(2) :: Nil, 5)
Expand Down