Commit cfb1706

fuwhu authored and cloud-fan committed
[SPARK-15616][SQL] Add optimizer rule PruneHiveTablePartitions
### What changes were proposed in this pull request?

Add the optimizer rule PruneHiveTablePartitions, which prunes Hive table partitions based on filters on the partition columns. After pruning, the total size of the remaining partitions may be small enough for a broadcast join in the JoinSelection strategy.

### Why are the changes needed?

In the JoinSelection strategy, Spark uses `plan.stats.sizeInBytes` to decide whether a plan is suitable for a broadcast join. Currently, `plan.stats.sizeInBytes` does not take pruned partitions into account, so it may miss broadcast-join opportunities and pick a sort-merge join instead, which hurts join performance. This PR makes `plan.stats.sizeInBytes` account for pruned partitions of Hive tables, improving performance by using a broadcast join where possible.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added unit tests.

This is based on #25919; credits should go to lianhuiwang and advancedxy.

Closes #26805 from fuwhu/SPARK-15616.

Authored-by: fuwhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
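To make the motivation concrete, here is a hedged sketch (assuming a live Hive-enabled `spark` session; the table and column names are hypothetical):

```scala
// Hypothetical schema: `events` is a large Hive table partitioned by `dt`,
// joined against another large table `logs`. Before this rule, the stats of
// `events` covered every partition, so JoinSelection could pick a sort-merge
// join even though one day's worth of data is tiny. With pruned-partition
// stats, `events` may fall below spark.sql.autoBroadcastJoinThreshold
// (10 MB by default) and be broadcast instead.
val q = spark.sql(
  """SELECT l.url, e.user_id
    |FROM logs l JOIN events e ON l.user_id = e.user_id
    |WHERE e.dt = '2019-12-16'""".stripMargin)
q.explain() // expect BroadcastHashJoin with the pruned `events` as build side
```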
1 parent ff39c92 commit cfb1706

7 files changed (+193, -8 lines changed)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 3 additions & 1 deletion
@@ -651,7 +651,9 @@ case class HiveTableRelation(
     tableMeta: CatalogTable,
     dataCols: Seq[AttributeReference],
     partitionCols: Seq[AttributeReference],
-    tableStats: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
+    tableStats: Option[Statistics] = None,
+    @transient prunedPartitions: Option[Seq[CatalogTablePartition]] = None)
+  extends LeafNode with MultiInstanceRelation {
   assert(tableMeta.identifier.database.isDefined)
   assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
   assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
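One compatibility note on this signature change: because the new field defaults to `None`, existing constructor call sites compile unchanged, but positional pattern matches on `HiveTableRelation` gain one slot (the TPCDSQueryBenchmark fix below is exactly this). A minimal sketch, assuming a `relation: HiveTableRelation` is in scope:

```scala
// After this change, matches need five positional slots instead of four.
val prunedCount = relation match {
  case HiveTableRelation(_, _, _, _, Some(parts)) => parts.size // rule already ran
  case HiveTableRelation(_, _, _, _, None) => 0 // not pruned (or not partitioned)
}
```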

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan}
 import org.apache.spark.sql.types.StructType

 private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala

Lines changed: 1 addition & 1 deletion
@@ -84,7 +84,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark {
         queryRelations.add(alias.identifier)
       case LogicalRelation(_, _, Some(catalogTable), _) =>
         queryRelations.add(catalogTable.identifier.table)
-      case HiveTableRelation(tableMeta, _, _, _) =>
+      case HiveTableRelation(tableMeta, _, _, _, _) =>
         queryRelations.add(tableMeta.identifier.table)
       case _ =>
     }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

Lines changed: 17 additions & 1 deletion
@@ -21,13 +21,15 @@ import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkPlanner}
 import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck
 import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.execution.PruneHiveTablePartitions
 import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}

 /**
@@ -93,6 +95,20 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
     customCheckRules
   }

+  /**
+   * Logical query plan optimizer that takes into account Hive.
+   */
+  override protected def optimizer: Optimizer = {
+    new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
+      override def postHocOptimizationBatches: Seq[Batch] = Seq(
+        Batch("Prune Hive Table Partitions", Once, new PruneHiveTablePartitions(session))
+      )
+
+      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
+        super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
+    }
+  }
+
   /**
    * Planner that takes into account Hive-specific strategies.
    */
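Two details of this wiring are worth noting: the rule is installed as a post-hoc batch so it runs once, after the main operator-optimization batches have finished pushing filters down toward the relation, and the anonymous `SparkOptimizer` forwards `customOperatorOptimizationRules`, so rules injected through `SparkSessionExtensions` still run. A minimal sketch of the latter, with a placeholder no-op rule:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule for illustration only: it leaves the plan unchanged.
object NoOpRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// User-injected rules still reach the Hive-aware optimizer built above.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .withExtensions(_.injectOptimizerRule(_ => NoOpRule))
  .getOrCreate()
```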

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala

Lines changed: 4 additions & 4 deletions
@@ -166,14 +166,14 @@ case class HiveTableScanExec(
   @transient lazy val rawPartitions = {
     val prunedPartitions =
       if (sparkSession.sessionState.conf.metastorePartitionPruning &&
-          partitionPruningPred.size > 0) {
+          partitionPruningPred.nonEmpty) {
         // Retrieve the original attributes based on expression ID so that capitalization matches.
         val normalizedFilters = partitionPruningPred.map(_.transform {
           case a: AttributeReference => originalAttributes(a)
         })
-        sparkSession.sessionState.catalog.listPartitionsByFilter(
-          relation.tableMeta.identifier,
-          normalizedFilters)
+        relation.prunedPartitions.getOrElse(
+          sparkSession.sessionState.catalog
+            .listPartitionsByFilter(relation.tableMeta.identifier, normalizedFilters))
       } else {
         sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
       }
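Note that the `getOrElse` above means the scan reuses the partitions already pruned by the optimizer rule and only falls back to a metastore lookup when `prunedPartitions` is empty. The pruning branch is gated by `spark.sql.hive.metastorePartitionPruning` (true by default); a sketch, assuming a live `spark` session:

```scala
// When disabled, all partitions are listed and filtered on the Spark side
// rather than pushing partition predicates down to the Hive metastore.
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")
```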
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala

Lines changed: 110 additions & 0 deletions

@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.hive.common.StatsSetupConst
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.CastSupport
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
+ */
+private[sql] class PruneHiveTablePartitions(session: SparkSession)
+  extends Rule[LogicalPlan] with CastSupport {
+
+  override val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Extract the partition filters from the filters on the table.
+   */
+  private def getPartitionKeyFilters(
+      filters: Seq[Expression],
+      relation: HiveTableRelation): ExpressionSet = {
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(
+      filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
+    val partitionColumnSet = AttributeSet(relation.partitionCols)
+    ExpressionSet(normalizedFilters.filter { f =>
+      !f.references.isEmpty && f.references.subsetOf(partitionColumnSet)
+    })
+  }
+
+  /**
+   * Prune the hive table using filters on the partitions of the table.
+   */
+  private def prunePartitions(
+      relation: HiveTableRelation,
+      partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
+    if (conf.metastorePartitionPruning) {
+      session.sessionState.catalog.listPartitionsByFilter(
+        relation.tableMeta.identifier, partitionFilters.toSeq)
+    } else {
+      ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
+        session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
+        partitionFilters.toSeq, conf.sessionLocalTimeZone)
+    }
+  }
+
+  /**
+   * Update the statistics of the table.
+   */
+  private def updateTableMeta(
+      tableMeta: CatalogTable,
+      prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
+    val sizeOfPartitions = prunedPartitions.map { partition =>
+      val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
+      val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+      if (rawDataSize.isDefined && rawDataSize.get > 0) {
+        rawDataSize.get
+      } else if (totalSize.isDefined && totalSize.get > 0L) {
+        totalSize.get
+      } else {
+        0L
+      }
+    }
+    if (sizeOfPartitions.forall(_ > 0)) {
+      val sizeInBytes = sizeOfPartitions.sum
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else {
+      tableMeta
+    }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
+      if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
+      val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
+      if (partitionKeyFilters.nonEmpty) {
+        val newPartitions = prunePartitions(relation, partitionKeyFilters)
+        val newTableMeta = updateTableMeta(relation.tableMeta, newPartitions)
+        val newRelation = relation.copy(
+          tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
+        // Keep partition filters so that they are visible in physical planning
+        Project(projections, Filter(filters.reduceLeft(And), newRelation))
+      } else {
+        op
+      }
+  }
+}
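To observe the rule end to end, one informal check (a sketch assuming a Hive-enabled session and an existing partitioned table `t` with int partition column `p`) is to inspect the optimized plan:

```scala
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation

val optimized = spark.sql("SELECT * FROM t WHERE p = 1").queryExecution.optimizedPlan
optimized.collect { case r: HiveTableRelation => r }.foreach { r =>
  // The rule attaches the surviving partitions and rewrites the table stats,
  // so sizeInBytes should now reflect only the partitions matching p = 1.
  println(r.prunedPartitions.map(_.size))
  println(r.tableMeta.stats.map(_.sizeInBytes))
}
```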
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("PruneHiveTablePartitions", Once,
+        EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
+  }
+
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
+    withTable("test", "temp") {
+      sql(
+        s"""
+          |CREATE TABLE test(i int)
+          |PARTITIONED BY (p int)
+          |STORED AS textfile""".stripMargin)
+      spark.range(0, 1000, 1).selectExpr("id as col")
+        .createOrReplaceTempView("temp")
+
+      for (part <- Seq(1, 2, 3, 4)) {
+        sql(
+          s"""
+            |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+            |select col from temp""".stripMargin)
+      }
+      val analyzed1 = sql("select i from test where p > 0").queryExecution.analyzed
+      val analyzed2 = sql("select i from test where p = 1").queryExecution.analyzed
+      assert(Optimize.execute(analyzed1).stats.sizeInBytes / 4 ===
+        Optimize.execute(analyzed2).stats.sizeInBytes)
+    }
+  }
+}
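The assertion holds because each of the four partitions is loaded with the same 1000 rows from `temp`: `p > 0` keeps all four partitions while `p = 1` keeps exactly one, so after the rule rewrites the statistics the first plan's `sizeInBytes` is exactly four times the second's.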
