Commit bb66898

[VARIANT] Fix shredding-related test failures and respect variant shredding table property (#5838)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR introduces three key fixes to enable Delta-Spark to work properly and legally with the variant shredding feature in Spark 4.1.

1. Fixes Delta column mapping when variant pushdown is enabled (i.e. `spark.sql.variant.pushVariantIntoScan = true`, which is the default in Spark 4.1). This fix is in SchemaMergingUtils.scala.
2. Fixes data skipping based on null counts over variant data when variant pushdown is enabled. This fix is in DataSkippingReader.scala.
3. Makes Delta-Spark respect the `delta.enableVariantShredding` table property when writing shredded files into Delta tables. Before this fix, Spark 4.1 would always write shredded files regardless of the table property, because the `spark.sql.variant.inferShreddingSchema` config is set to true. This fix is essential since it is illegal to write shredded files into a Delta table when the table property is disabled.

## How was this patch tested?

1. Re-enabled previously skipped tests related to column mapping and data skipping that failed because `spark.sql.variant.pushVariantIntoScan = true` by default in Spark 4.1.
2. Added tests in `DeltaVariantShreddingSuite.scala` verifying that shredded writes are performed based on the `delta.enableVariantShredding` table property.

## Does this PR introduce _any_ user-facing changes?

Yes, Delta-Spark now works properly with shredded reads and writes.
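For context, a minimal usage sketch of the behavior this PR enforces. The table name and data are illustrative (not taken from the change), and an active SparkSession named `spark` is assumed:

```scala
// Illustrative sketch only: table name and data are hypothetical.
// Spark 4.1 enables spark.sql.variant.pushVariantIntoScan and sets
// spark.sql.variant.inferShreddingSchema to true, so without this PR the write
// below could produce shredded Parquet files even though the table never opted in.
spark.sql("CREATE TABLE t (id LONG, v VARIANT) USING delta")
spark.sql("INSERT INTO t SELECT id, parse_json('{\"a\": ' || cast(id AS STRING) || '}') FROM range(10)")

// With this PR, shredded files are only written after the table property opts in:
spark.sql("ALTER TABLE t SET TBLPROPERTIES ('delta.enableVariantShredding' = 'true')")
```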
1 parent c2e10a5 commit bb66898

14 files changed (+390 −72 lines)
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.shims

/**
 * Shim for variant shredding configs to handle API changes between Spark versions.
 * In Spark 4.0, VARIANT_INFER_SHREDDING_SCHEMA config does not exist.
 *
 * This shim provides a way to conditionally add the config to the options map
 * when writing files.
 */
object VariantShreddingShims {
  /**
   * Returns a Map containing variant shredding related configs for file writing.
   * In Spark 4.0, this returns an empty map since the config doesn't exist.
   */
  def getVariantInferShreddingSchemaOptions(enableVariantShredding: Boolean)
    : Map[String, String] = {
    // In Spark 4.0, VARIANT_INFER_SHREDDING_SCHEMA does not exist, so return empty map
    Map.empty[String, String]
  }
}
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.shims

import org.apache.spark.sql.internal.SQLConf

/**
 * Shim for variant shredding configs to handle API changes between Spark versions.
 * In Spark 4.1, VARIANT_INFER_SHREDDING_SCHEMA config exists.
 *
 * This shim provides a way to conditionally add the config to the options map
 * when writing files.
 */
object VariantShreddingShims {
  /**
   * Returns a Map containing variant shredding related configs for file writing.
   * In Spark 4.1, this returns the VARIANT_INFER_SHREDDING_SCHEMA config.
   */
  def getVariantInferShreddingSchemaOptions(enableVariantShredding: Boolean)
    : Map[String, String] = {
    Map(SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> enableVariantShredding.toString)
  }
}
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.shims

import org.apache.spark.sql.internal.SQLConf

/**
 * Shim for variant shredding configs to handle API changes between Spark versions.
 * In Spark 4.2, VARIANT_INFER_SHREDDING_SCHEMA config exists.
 *
 * This shim provides a way to conditionally add the config to the options map
 * when writing files.
 */
object VariantShreddingShims {
  /**
   * Returns a Map containing variant shredding related configs for file writing.
   * In Spark 4.2, this returns the VARIANT_INFER_SHREDDING_SCHEMA config.
   */
  def getVariantInferShreddingSchemaOptions(enableVariantShredding: Boolean): Map[String, String] = {
    Map(SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key -> enableVariantShredding.toString)
  }
}
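The three shims above differ only in whether `SQLConf.VARIANT_INFER_SHREDDING_SCHEMA` exists in that Spark version. A hedged sketch of the shim's contract, with the expected results shown in comments (the key string is the one quoted in the PR description):

```scala
// Illustration only: what a caller gets back on each source branch.
val opts = VariantShreddingShims.getVariantInferShreddingSchemaOptions(
  enableVariantShredding = false)
// Spark 4.0 shim:      Map()  -- the config does not exist, so nothing is added
// Spark 4.1/4.2 shims: Map("spark.sql.variant.inferShreddingSchema" -> "false")
```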

spark/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala

Lines changed: 4 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompact
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec
 import org.apache.spark.sql.delta.schema._
+import org.apache.spark.sql.delta.shims.VariantShreddingShims
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.sources.DeltaSQLConf.DELTA_COLLECT_STATS_USING_TABLE_SCHEMA
 import org.apache.spark.sql.delta.stats.{
@@ -478,7 +479,9 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
         key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
           key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
       }.toMap
-    }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)
+    }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString) ++
+      VariantShreddingShims.getVariantInferShreddingSchemaOptions(
+        DeltaConfigs.ENABLE_VARIANT_SHREDDING.fromMetaData(metadata))

     try {
       DeltaFileFormatWriter.write(
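A small sketch of how the appended shim options combine with the existing writer options above; with `Map`'s `++`, entries appended last win, so the table-property-derived value always decides whether a shredding schema is inferred. The concrete key strings below are illustrative:

```scala
// Sketch of the merge performed in TransactionalWrite above (keys are illustrative).
val writerOptions = Map(
  "compression" -> "snappy",            // e.g. DeltaOptions.COMPRESSION
  "writePartitionColumns" -> "false")   // DeltaOptions.WRITE_PARTITION_COLUMNS
val shimOptions = Map(
  "spark.sql.variant.inferShreddingSchema" -> "false")  // derived from the table property
val options = writerOptions ++ shimOptions
// `options` is what reaches DeltaFileFormatWriter.write, so the table property
// governs shredding regardless of the session default.
```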

spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala

Lines changed: 5 additions & 0 deletions
@@ -339,6 +339,11 @@ object SchemaMergingUtils {
       tf: (Seq[String], StructField, Resolver) => StructField): T = {
     def transform[E <: DataType](path: Seq[String], dt: E): E = {
       val newDt = dt match {
+        case s: StructType
+            if org.apache.spark.sql.execution.datasources.VariantMetadata.isVariantStruct(s) =>
+          // A variant struct is logically still a variant, so we should not recurse into its
+          // fields like a normal struct.
+          s
         case StructType(fields) =>
           StructType(fields.map { field =>
             val newField = tf(path, field, DELTA_COL_RESOLVER)
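To make the intent of the new case concrete, here is a minimal, self-contained sketch (not Delta's actual implementation) of a schema transform that stops at variant structs. `isVariantStructStandIn` is a stand-in for Spark's `VariantMetadata.isVariantStruct`, and the metadata key it checks is made up for illustration:

```scala
import org.apache.spark.sql.types._

// Stand-in for VariantMetadata.isVariantStruct: the real check inspects the variant
// metadata Spark attaches when pushVariantIntoScan rewrites a VARIANT column into a
// struct. The marker key below is hypothetical.
def isVariantStructStandIn(dt: DataType): Boolean = dt match {
  case s: StructType => s.fields.exists(_.metadata.contains("hypothetical.variant.marker"))
  case _ => false
}

// Recursive field transform in the spirit of SchemaMergingUtils: a variant struct is
// treated as a leaf, so column-mapping logic never descends into its internal fields.
def transformFields(dt: DataType)(tf: StructField => StructField): DataType = dt match {
  case s: StructType if isVariantStructStandIn(s) =>
    s // a variant struct is logically one variant value: do not recurse
  case StructType(fields) =>
    StructType(fields.map { f =>
      val nf = tf(f)
      nf.copy(dataType = transformFields(nf.dataType)(tf))
    })
  case other => other
}
```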

spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala

Lines changed: 7 additions & 1 deletion
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.execution.InSubqueryExec
+import org.apache.spark.sql.execution.datasources.VariantMetadata
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{AtomicType, BooleanType, CalendarIntervalType, DataType, DateType, LongType, NumericType, StringType, StructField, StructType, TimestampNTZType, TimestampType}
@@ -129,7 +130,12 @@ private [sql] object DataSkippingPredicate {
 object SkippingEligibleColumn {
   def unapply(arg: Expression): Option[(Seq[String], DataType)] = {
     // Only atomic types are eligible for skipping, and args should always be resolved by now.
-    val eligible = arg.resolved && arg.dataType.isInstanceOf[AtomicType]
+    // When `pushVariantIntoScan` is true, Variants in the read schema are transformed into Structs
+    // to facilitate shredded reads. Therefore, filters like `v is not null` where `v` is a variant
+    // column look like the filters on struct data. `VariantMetadata.isVariantStruct` helps in
+    // distinguishing between "true structs" and "variant structs".
+    val eligible = arg.resolved && (arg.dataType.isInstanceOf[AtomicType] ||
+      VariantMetadata.isVariantStruct(arg.dataType))
     if (eligible) searchChain(arg).map(_ -> arg.dataType) else None
   }
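A hedged illustration (hypothetical table and data, assuming an active SparkSession named `spark`) of the query shape this change keeps eligible for null-count-based data skipping when `pushVariantIntoScan` rewrites a variant column into a struct:

```scala
// Hypothetical example only.
spark.sql("CREATE TABLE events (id LONG, v VARIANT) USING delta")
spark.sql("INSERT INTO events VALUES (1, parse_json('{\"k\": 1}')), (2, null)")

// Under Spark 4.1 defaults, `v` resolves to a "variant struct" rather than an AtomicType,
// so before this fix SkippingEligibleColumn rejected it and files whose stats show `v`
// is always null could not be pruned for the filter below.
spark.table("events").where("v IS NOT NULL").show()
```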

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.test.shims

/**
 * Test shim for variant shredding to handle differences between Spark versions.
 * In Spark 4.0, VARIANT_INFER_SHREDDING_SCHEMA does not exist.
 */
object VariantShreddingTestShims {
  /**
   * Returns true if VARIANT_INFER_SHREDDING_SCHEMA config is supported in this Spark version.
   * In Spark 4.0, this returns false.
   */
  val variantInferShreddingSchemaSupported: Boolean = false

  /**
   * Returns a dummy config key for VARIANT_INFER_SHREDDING_SCHEMA.
   * In Spark 4.0, since this config doesn't exist, we return a dummy key that won't affect tests.
   * This allows tests to compile but the config will have no effect.
   */
  val variantInferShreddingSchemaKey: String = "spark.sql.dummy.variantInferShreddingSchema"
}
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.test.shims

import org.apache.spark.sql.internal.SQLConf

/**
 * Test shim for variant shredding to handle differences between Spark versions.
 * In Spark 4.1, VARIANT_INFER_SHREDDING_SCHEMA exists.
 */
object VariantShreddingTestShims {
  /**
   * Returns true if VARIANT_INFER_SHREDDING_SCHEMA config is supported in this Spark version.
   * In Spark 4.1, this returns true.
   */
  val variantInferShreddingSchemaSupported: Boolean = true

  /**
   * Returns the config key for VARIANT_INFER_SHREDDING_SCHEMA.
   * In Spark 4.1, this returns the actual SQLConf key.
   */
  val variantInferShreddingSchemaKey: String = SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key
}
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
/*
 * Copyright (2025) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.test.shims

import org.apache.spark.sql.internal.SQLConf

/**
 * Test shim for variant shredding to handle differences between Spark versions.
 * In Spark 4.2, VARIANT_INFER_SHREDDING_SCHEMA exists.
 */
object VariantShreddingTestShims {
  /**
   * Returns true if VARIANT_INFER_SHREDDING_SCHEMA config is supported in this Spark version.
   * In Spark 4.2, this returns true.
   */
  val variantInferShreddingSchemaSupported: Boolean = true

  /**
   * Returns the config key for VARIANT_INFER_SHREDDING_SCHEMA.
   * In Spark 4.2, this returns the actual SQLConf key.
   */
  val variantInferShreddingSchemaKey: String = SQLConf.VARIANT_INFER_SHREDDING_SCHEMA.key
}
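A sketch of how a suite can use the test shim to stay source-compatible across Spark versions; it assumes a test class that mixes in Spark's `SQLTestUtils` (for `withSQLConf`), which is not shown here:

```scala
import org.apache.spark.sql.delta.test.shims.VariantShreddingTestShims

// On Spark 4.0 the key is a harmless dummy, so this compiles and runs everywhere;
// on Spark 4.1+ it actually toggles shredding-schema inference.
withSQLConf(VariantShreddingTestShims.variantInferShreddingSchemaKey -> "true") {
  // ... write variant data to a Delta table with delta.enableVariantShredding set ...
  if (VariantShreddingTestShims.variantInferShreddingSchemaSupported) {
    // only assert on shredded file layout when the underlying config exists
  }
}
```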

spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala

Lines changed: 2 additions & 14 deletions
@@ -239,8 +239,7 @@ class AutoCompactExecutionSuite extends
     checkAutoCompactionWorks(dir, spark.range(10).toDF("id"))
   }

-  // TODO: Re-enable this test after fixing Variant data skipping in Spark 4.1.0+
-  ignore("variant auto compact kicks in when enabled - table config") {
+  test("variant auto compact kicks in when enabled - table config") {
     withTempDir { dir =>
       withSQLConf(
         "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact" -> "true",
@@ -252,8 +251,7 @@ class AutoCompactExecutionSuite extends
     }
   }

-  // TODO: Re-enable this test after fixing Variant data skipping in Spark 4.1.0+
-  ignore("variant auto compact kicks in when enabled - session config") {
+  test("variant auto compact kicks in when enabled - session config") {
     withTempDir { dir =>
       withSQLConf(
         DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> "true",
@@ -377,11 +375,6 @@ class AutoCompactConfigurationIdColumnMappingSuite extends AutoCompactConfigurat
 class AutoCompactExecutionIdColumnMappingSuite extends AutoCompactExecutionSuite
   with DeltaColumnMappingEnableIdMode {
   override def runAllTests: Boolean = true
-  // TODO: these tests need to be fixed for Spark master
-  override def skipTests: Seq[String] = Seq(
-    "variant auto compact kicks in when enabled - table config",
-    "variant auto compact kicks in when enabled - session config"
-  )
 }

 class AutoCompactConfigurationNameColumnMappingSuite extends AutoCompactConfigurationSuite
@@ -392,10 +385,5 @@ class AutoCompactConfigurationNameColumnMappingSuite extends AutoCompactConfigur
 class AutoCompactExecutionNameColumnMappingSuite extends AutoCompactExecutionSuite
   with DeltaColumnMappingEnableNameMode {
   override def runAllTests: Boolean = true
-  // TODO: these tests need to be fixed for Spark master
-  override def skipTests: Seq[String] = Seq(
-    "variant auto compact kicks in when enabled - table config",
-    "variant auto compact kicks in when enabled - session config"
-  )
 }
