
Commit 2080c21

Z1Wu authored and pan3793 committed
[KYUUBI #6990] Add rebalance before InsertIntoHiveDirCommand and InsertIntoDataSourceDirCommand to align with behaviors of hive
### Why are the changes needed?

When users switch from Hive to Spark, for SQL like INSERT OVERWRITE DIRECTORY AS SELECT, it would be great if small files could be automatically merged through simple configuration, just like in Hive.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #6991 from Z1Wu/feat/add_insert_dir_rebalance_support.

Closes #6990

2820bb2 [wuziyi] [fix] nit
a69c041 [wuziyi] [fix] nit
951a773 [wuziyi] [fix] nit
f75dfcb [wuziyi] [Feat] add rebalance before InsertIntoHiveDirCommand and InsertIntoDataSourceDirCommand to align with behaviors of hive

Authored-by: wuziyi <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
1 parent 338206e commit 2080c21
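For context, the user-facing effect of this commit is that, with the Kyuubi Spark SQL extension enabled, an INSERT OVERWRITE DIRECTORY ... SELECT gets a rebalance shuffle inserted above its query, so adaptive execution can coalesce small output files. A minimal sketch of the setup, assuming the extension jar is on the classpath; the string conf key is inferred from KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE referenced in the tests below and may differ across versions:

import org.apache.spark.sql.SparkSession

// Sketch only: enable the Kyuubi extension and the rebalance-before-write rule.
// The literal conf key string is an assumption; the tests below reference it
// via KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .config("spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle", "true")
  .enableHiveSupport()
  .getOrCreate()

spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
// Both the Hive dir write (STORED AS ...) and the datasource dir write
// (USING ...) are covered by this change.
spark.sql("INSERT OVERWRITE DIRECTORY '/tmp/out' STORED AS PARQUET SELECT * FROM tmp_table")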

File tree

6 files changed (+150 −9 lines)


extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala

Lines changed: 10 additions & 2 deletions

@@ -20,9 +20,9 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, InsertIntoDataSourceDirCommand}
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
 import org.apache.spark.sql.internal.StaticSQLConf

 trait RepartitionBuilder extends Rule[LogicalPlan] with RepartitionBeforeWriteHelper {
@@ -59,6 +59,10 @@ abstract class RepartitionBeforeWritingDatasourceBase extends RepartitionBuilder
         query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
       c.copy(query = buildRepartition(dynamicPartitionColumns, query))

+    case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      i.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))
@@ -101,6 +105,10 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder
         query.output.filter(attr => table.partitionColumnNames.contains(attr.name))
       c.copy(query = buildRepartition(dynamicPartitionColumns, query))

+    case c @ InsertIntoHiveDirCommand(_, _, query, _, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      c.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))
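Both new cases follow the pattern already used for table writes: if the child query is resolved and the guard allows it, wrap the query in a rebalance node. A simplified sketch of what buildRepartition plausibly reduces to for the Seq.empty case (the real helper in Kyuubi also handles partition expressions and config-driven variants, omitted here):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions}

// Simplified sketch. With no dynamic partition columns (the Seq.empty case in
// the diff above) this is a keyless rebalance whose only job is to introduce a
// shuffle that AQE can use to coalesce small partitions before the write.
def buildRepartitionSketch(
    dynamicPartitionColumns: Seq[Attribute],
    query: LogicalPlan): LogicalPlan =
  RebalancePartitions(dynamicPartitionColumns, query)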

extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala

Lines changed: 40 additions & 2 deletions

@@ -19,9 +19,9 @@ package org.apache.spark.sql

 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, Sort}
-import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.{DataWritingCommand, InsertIntoDataSourceDirCommand}
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, OptimizedCreateHiveTableAsSelectCommand}

 import org.apache.kyuubi.sql.KyuubiSQLConf

@@ -272,4 +272,42 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest
       }
     }
   }
+
+  test("Test rebalance in InsertIntoHiveDirCommand") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"STORED AS PARQUET SELECT * FROM tmp_table")
+        val insertHiveDirCommand = df.queryExecution.analyzed.collect {
+          case _: InsertIntoHiveDirCommand => true
+        }
+        assert(insertHiveDirCommand.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RebalancePartitions => true
+        }
+        assert(repartition.size == 1)
+      })
+    }
+  }
+
+  test("Test rebalance in InsertIntoDataSourceDirCommand") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"USING PARQUET SELECT * FROM tmp_table")
+        assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
+        val repartition =
+          df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect {
+            case _: RebalancePartitions => true
+          }
+        assert(repartition.size == 1)
+      })
+    }
+  }
 }
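A note on the test setup, as far as the diff lets one infer: CONVERT_METASTORE_PARQUET and CONVERT_METASTORE_CTAS are forced to false so the analyzed plan keeps an InsertIntoHiveDirCommand; with conversion enabled, Spark would presumably rewrite the write onto the datasource path, which is the case the second test exercises directly via USING PARQUET.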

extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala

Lines changed: 10 additions & 1 deletion

@@ -20,8 +20,9 @@ package org.apache.kyuubi.sql
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}
 import org.apache.spark.sql.internal.StaticSQLConf

 trait RepartitionBuilder extends Rule[LogicalPlan] with RepartitionBeforeWriteHelper {
@@ -52,6 +53,10 @@ abstract class RepartitionBeforeWritingDatasourceBase extends RepartitionBuilder
       val dynamicPartitionColumns = pc.filterNot(attr => sp.contains(attr.name))
       i.copy(query = buildRepartition(dynamicPartitionColumns, query))

+    case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      i.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))
@@ -82,6 +87,10 @@ abstract class RepartitionBeforeWritingHiveBase extends RepartitionBuilder
         .flatMap(name => query.output.find(_.name == name)).toSeq
       i.copy(query = buildRepartition(dynamicPartitionColumns, query))

+    case i @ InsertIntoHiveDirCommand(_, _, query, _, _)
+        if query.resolved && canInsertRepartitionByExpression(query) =>
+      i.copy(query = buildRepartition(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRepartition))

extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala

Lines changed: 40 additions & 2 deletions

@@ -19,10 +19,10 @@ package org.apache.spark.sql

 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, Sort}
-import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.{DataWritingCommand, InsertIntoDataSourceDirCommand}
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}

 import org.apache.kyuubi.sql.KyuubiSQLConf

@@ -267,4 +267,42 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest
       }
     }
   }
+
+  test("Test rebalance in InsertIntoHiveDirCommand") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"STORED AS PARQUET SELECT * FROM tmp_table")
+        val insertHiveDirCommand = df.queryExecution.analyzed.collect {
+          case _: InsertIntoHiveDirCommand => true
+        }
+        assert(insertHiveDirCommand.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RebalancePartitions => true
+        }
+        assert(repartition.size == 1)
+      })
+    }
+  }
+
+  test("Test rebalance in InsertIntoDataSourceDirCommand") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"USING PARQUET SELECT * FROM tmp_table")
+        assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
+        val repartition =
+          df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect {
+            case _: RebalancePartitions => true
+          }
+        assert(repartition.size == 1)
+      })
+    }
+  }
 }

extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala

Lines changed: 10 additions & 1 deletion

@@ -23,8 +23,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}
 import org.apache.spark.sql.internal.StaticSQLConf

 trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
@@ -112,6 +113,10 @@ case class RebalanceBeforeWritingDatasource(session: SparkSession)
       val dynamicPartitionColumns = pc.filterNot(attr => sp.contains(attr.name))
       i.copy(query = buildRebalance(dynamicPartitionColumns, query))

+    case i @ InsertIntoDataSourceDirCommand(_, _, query, _)
+        if query.resolved && canInsertRebalance(query) =>
+      i.copy(query = buildRebalance(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRebalance))
@@ -144,6 +149,10 @@ case class RebalanceBeforeWritingHive(session: SparkSession)
         .flatMap(name => query.output.find(_.name == name)).toSeq
       i.copy(query = buildRebalance(dynamicPartitionColumns, query))

+    case i @ InsertIntoHiveDirCommand(_, _, query, _, _)
+        if query.resolved && canInsertRebalance(query) =>
+      i.copy(query = buildRebalance(Seq.empty, query))
+
     case u @ Union(children, _, _) =>
       u.copy(children = children.map(addRebalance))
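Unlike the 3.3/3.4 abstract bases, the 3.5 rules are case classes taking the SparkSession directly, which matches the shape SparkSessionExtensions expects from a rule builder. A hypothetical wiring sketch; injectPostHocResolutionRule is a real SparkSessionExtensions API, but whether Kyuubi's KyuubiSparkSQLExtension uses exactly this hook for these rules is an assumption:

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.{RebalanceBeforeWritingDatasource, RebalanceBeforeWritingHive}

// Hypothetical sketch of how the rules above could be registered. The case
// class companions conform to SparkSession => Rule[LogicalPlan], so they can
// be passed to the injector as-is.
class ExampleRebalanceExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectPostHocResolutionRule(RebalanceBeforeWritingDatasource)
    extensions.injectPostHocResolutionRule(RebalanceBeforeWritingHive)
  }
}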

extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala

Lines changed: 40 additions & 1 deletion

@@ -20,9 +20,10 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RebalancePartitions, Sort}
 import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveDirCommand, InsertIntoHiveTable}

 import org.apache.kyuubi.sql.KyuubiSQLConf

@@ -292,4 +293,42 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest
       }
     }
   }
+
+  test("Test rebalance in InsertIntoHiveDirCommand") {
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_CTAS.key -> "false",
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"STORED AS PARQUET SELECT * FROM tmp_table")
+        val insertHiveDirCommand = df.queryExecution.analyzed.collect {
+          case _: InsertIntoHiveDirCommand => true
+        }
+        assert(insertHiveDirCommand.size == 1)
+        val repartition = df.queryExecution.analyzed.collect {
+          case _: RebalancePartitions => true
+        }
+        assert(repartition.size == 1)
+      })
+    }
+  }
+
+  test("Test rebalance in InsertIntoDataSourceDirCommand") {
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> "true") {
+      withTempDir(tmpDir => {
+        spark.range(0, 1000, 1, 10).createOrReplaceTempView("tmp_table")
+        val df = sql(s"INSERT OVERWRITE DIRECTORY '${tmpDir.getPath}' " +
+          s"USING PARQUET SELECT * FROM tmp_table")
+        assert(df.queryExecution.analyzed.isInstanceOf[InsertIntoDataSourceDirCommand])
+        val repartition =
+          df.queryExecution.analyzed.asInstanceOf[InsertIntoDataSourceDirCommand].query.collect {
+            case _: RebalancePartitions => true
+          }
+        assert(repartition.size == 1)
+      })
+    }
+  }
 }
