Skip to content

Commit 9e937c5

Browse files
committed
[SPARK-32620][SQL] Reset the numPartitions metric when DPP is enabled
### What changes were proposed in this pull request? This pr reset the `numPartitions` metric when DPP is enabled. Otherwise, it is always a [static value](https://github.com/apache/spark/blob/18cac6a9f0bf4a6d449393f1ee84004623b3c893/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L215). ### Why are the changes needed? Fix metric issue. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test and manual test For [this test case](https://github.com/apache/spark/blob/18cac6a9f0bf4a6d449393f1ee84004623b3c893/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala#L252-L280). Before this pr: ![image](https://user-images.githubusercontent.com/5399861/90301798-9310b480-ded4-11ea-9294-49bcaba46f83.png) After this pr: ![image](https://user-images.githubusercontent.com/5399861/90301709-0fef5e80-ded4-11ea-942d-4d45d1dd15bc.png) Closes #29436 from wangyum/SPARK-32620. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 1354cf0) Signed-off-by: Yuming Wang <[email protected]>
1 parent 68ff809 commit 9e937c5

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ case class FileSourceScanExec(
239239
setFilesNumAndSizeMetric(ret, false)
240240
val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
241241
driverMetrics("pruningTime") = timeTakenMs
242+
driverMetrics("numPartitions") = ret.length
242243
ret
243244
} else {
244245
selectedPartitions

sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,8 @@ abstract class DynamicPartitionPruningSuiteBase
319319
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
320320
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
321321
withTable("fact", "dim") {
322+
val numPartitions = 10
323+
322324
spark.range(10)
323325
.map { x => Tuple3(x, x + 1, 0) }
324326
.toDF("did", "d1", "d2")
@@ -328,7 +330,7 @@ abstract class DynamicPartitionPruningSuiteBase
328330
.saveAsTable("dim")
329331

330332
spark.range(100)
331-
.map { x => Tuple2(x, x % 10) }
333+
.map { x => Tuple2(x, x % numPartitions) }
332334
.toDF("f1", "fid")
333335
.write.partitionBy("fid")
334336
.format(tableFormat)
@@ -355,6 +357,7 @@ abstract class DynamicPartitionPruningSuiteBase
355357
assert(!scan1.metrics.contains("staticFilesSize"))
356358
val allFilesNum = scan1.metrics("numFiles").value
357359
val allFilesSize = scan1.metrics("filesSize").value
360+
assert(scan1.metrics("numPartitions").value === numPartitions)
358361

359362
// No dynamic partition pruning, so no static metrics
360363
// Only files from fid = 5 partition are scanned
@@ -367,6 +370,7 @@ abstract class DynamicPartitionPruningSuiteBase
367370
val partFilesSize = scan2.metrics("filesSize").value
368371
assert(0 < partFilesNum && partFilesNum < allFilesNum)
369372
assert(0 < partFilesSize && partFilesSize < allFilesSize)
373+
assert(scan2.metrics("numPartitions").value === 1)
370374

371375
// Dynamic partition pruning is used
372376
// Static metrics are as-if reading the whole fact table
@@ -378,6 +382,7 @@ abstract class DynamicPartitionPruningSuiteBase
378382
assert(scan3.metrics("staticFilesSize").value == allFilesSize)
379383
assert(scan3.metrics("numFiles").value == partFilesNum)
380384
assert(scan3.metrics("filesSize").value == partFilesSize)
385+
assert(scan3.metrics("numPartitions").value === 1)
381386
}
382387
}
383388
}

0 commit comments

Comments
 (0)