[SPARK-51987][SQL] DSv2 expressions in column defaults on write #50792
In `CatalogV2Util`:

    @@ -22,12 +22,12 @@ import java.util.{Collections, Locale}
     
     import scala.jdk.CollectionConverters._
     
    -import org.apache.spark.SparkIllegalArgumentException
    +import org.apache.spark.{SparkException, SparkIllegalArgumentException}
     import org.apache.spark.sql.AnalysisException
     import org.apache.spark.sql.catalyst.CurrentUserContext
     import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, TimeTravelSpec}
     import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
    -import org.apache.spark.sql.catalyst.expressions.Literal
    +import org.apache.spark.sql.catalyst.expressions.{Literal, V2ExpressionUtils}
     import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
     import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn}
     import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
    
    @@ -598,10 +598,27 @@ private[sql] object CatalogV2Util {
           // data unchanged and let the data reader to return "exist default" for missing
           // columns.
           val existingDefault = Literal(default.getValue.value(), default.getValue.dataType()).sql
    -      f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(default.getSql)
    +      f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(toSql(defaultValue))
         }.getOrElse(f)
       }
     
    +  private def toSql(defaultValue: DefaultValue): String = {
    +    if (defaultValue.getExpression != null) {
    +      V2ExpressionUtils.toCatalyst(defaultValue.getExpression) match {
    +        case Some(catalystExpr) =>
    +          catalystExpr.sql
    +        case None if defaultValue.getSql != null =>
    +          defaultValue.getSql
    +        case _ =>
    +          throw SparkException.internalError(
    +            s"Can't generate SQL for $defaultValue. The connector expression couldn't be " +
    +              "converted to Catalyst and there is no provided SQL representation.")
    +      }
    +    } else {
    +      defaultValue.getSql
    +    }
    +  }
    +
       /**
        * Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column
        * comment and default value or generation expression. This is mainly used to generate DS v2

Review discussion on the new `toSql` helper (cloud-fan marked this conversation as resolved): should the connector-provided expression take priority over the SQL string when both are set?

- iiuc, this is prioritizing expression over sql, if we get both. Is that desired? We could put a comment somewhere if so.
- Yea I have the same question. It seems more reasonable to use the original SQL string first.
- Based on ...
- When we need to evaluate, it makes sense to use the expression first and fall back to the SQL string if the expression cannot be converted. But here we just need to get the SQL string.
- The problem is that whatever we encode here will be picked up and used on write. Spark doesn't currently use expressions to represent default values. We persist the expression SQL in struct field metadata. What this logic says is: if a connector provides an expression, I am going to convert it to Catalyst and trust Spark to generate an accurate SQL representation. If we flip this, it means we prioritize the connector-provided SQL string over our own SQL generation for Catalyst expressions.
- But Spark is not capable of generating an executable SQL string from a query plan/expression. The ... On the other hand, the ...
- Ah, I misinterpreted the promise behind ...
- There is a way to produce reliable SQL strings from v2 expressions: ...
- @cloud-fan, that could be an option. However, it doesn't produce Spark SQL dialect... for things like DAY_OF_WEEK and similar expressions. It seems to generate a user-friendly representation. The fundamental problem is that we have a Catalyst expression and we need to persist it in the struct field metadata. I wonder if we can avoid the need to put the expression in the metadata. It is going to be challenging for sure, as it requires changing ... Any thoughts? Any ways to avoid converting a Catalyst expression to a string just to parse it back soon after?
- If it's just for the legacy v1 code path (e.g. built-in file sources), I'd say let's keep the old behavior and save the original SQL string only. We can refactor them to use ...

Review discussion on `V2ExpressionUtils.toCatalyst`: when can a default expression fail to convert?

- Hmm, when is it possible that a default expression cannot be converted back to a Catalyst one?
- Dynamic expressions other than literals?
- Can we also add a test case that falls back to ...
- V2 expressions are quite flexible; users can represent a function with an arbitrary name. In practice, I expect it to be rare, but you never know. I'll add a test.
- Added more tests.

Review discussion on the internal-error branch:

- Shall we add a test for this branch?
- Added.
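To make the fallback order concrete, here is a minimal sketch (not part of the PR) of the two outcomes `toSql` distinguishes. It assumes the single-argument `V2ExpressionUtils.toCatalyst` overload used in the diff above (a Spark-internal API, so this only compiles inside Spark's own packages), and `MY_CONNECTOR_FUNC` is a hypothetical function name that presumably has no Catalyst counterpart.

```scala
import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.types.IntegerType

object ToSqlSketch {
  def main(args: Array[String]): Unit = {
    // A literal converts cleanly to a Catalyst Literal, so toSql can render its SQL.
    val literal: V2Expression = LiteralValue(42, IntegerType)
    println(V2ExpressionUtils.toCatalyst(literal).map(_.sql))   // expected: Some(42)

    // An arbitrary connector function may not convert; toSql then falls back to
    // DefaultValue.getSql, or raises an internal error when no SQL string is set.
    val custom: V2Expression =
      new GeneralScalarExpression("MY_CONNECTOR_FUNC", Array[V2Expression](literal))
    println(V2ExpressionUtils.toCatalyst(custom).map(_.sql))    // expected: None
  }
}
```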
In `InMemoryBaseTable`:

    @@ -49,7 +49,7 @@ import org.apache.spark.util.ArrayImplicits._
      */
     abstract class InMemoryBaseTable(
         val name: String,
    -    val schema: StructType,
    +    override val columns: Array[Column],
         override val partitioning: Array[Transform],
         override val properties: util.Map[String, String],
         val distribution: Distribution = Distributions.unspecified(),
    
    @@ -88,6 +88,8 @@ abstract class InMemoryBaseTable(
         }
       }
     
    +  override val schema: StructType = CatalogV2Util.v2ColumnsToStructType(columns)
    +
       // purposely exposes a metadata column that conflicts with a data column in some tests
       override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
       private val metadataColumnNames = metadataColumns.map(_.name).toSet

Review discussion on exposing `columns` directly:

- I had to change this to expose passed columns directly, rather than going through a round of conversion.
- +1
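The two helpers named in this hunk already live in `CatalogV2Util`, which is why the base table can now derive `schema` from the passed columns rather than the reverse. A minimal sketch of that round trip, assuming access to the `private[sql]` `CatalogV2Util` object and using made-up column names:

```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical v2 columns, for illustration only.
val columns: Array[Column] = Array(
  Column.create("id", IntegerType),
  Column.create("data", StringType))

// What the overridden `schema` member now does: derive a StructType from v2 columns.
val schema: StructType = CatalogV2Util.v2ColumnsToStructType(columns)

// The reverse direction, used by the auxiliary constructor added to InMemoryTable below.
val roundTripped: Array[Column] = CatalogV2Util.structTypeToV2Columns(schema)
```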
In `InMemoryTable`:

    @@ -33,7 +33,7 @@ import org.apache.spark.util.ArrayImplicits._
      */
     class InMemoryTable(
         name: String,
    -    schema: StructType,
    +    columns: Array[Column],
         override val partitioning: Array[Transform],
         override val properties: util.Map[String, String],
         override val constraints: Array[Constraint] = Array.empty,
    
    @@ -43,10 +43,22 @@ class InMemoryTable(
         advisoryPartitionSize: Option[Long] = None,
         isDistributionStrictlyRequired: Boolean = true,
         override val numRowsPerSplit: Int = Int.MaxValue)
    -  extends InMemoryBaseTable(name, schema, partitioning, properties, distribution,
    +  extends InMemoryBaseTable(name, columns, partitioning, properties, distribution,
         ordering, numPartitions, advisoryPartitionSize, isDistributionStrictlyRequired,
         numRowsPerSplit) with SupportsDelete {
     
    +  def this(
    +      name: String,
    +      schema: StructType,
    +      partitioning: Array[Transform],
    +      properties: util.Map[String, String]) = {
    +    this(
    +      name,
    +      CatalogV2Util.structTypeToV2Columns(schema),
    +      partitioning,
    +      properties)
    +  }
    +
       override def canDeleteWhere(filters: Array[Filter]): Boolean = {
         InMemoryTable.supportsFilters(filters)
       }

Review comment on the new auxiliary constructor:

- Thank you for adding this constructor.
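A hedged usage sketch of the new auxiliary constructor: callers that still build the test table from a `StructType` keep working, with the schema converted to v2 columns internally. The table name and fields below are made up, and `InMemoryTable` is a test-only class in Spark's catalyst test sources.

```scala
import java.util.Collections

import org.apache.spark.sql.connector.catalog.InMemoryTable
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType()
  .add("id", IntegerType)
  .add("data", StringType)

// Delegates to the primary constructor via CatalogV2Util.structTypeToV2Columns(schema).
val table = new InMemoryTable(
  "test_table",
  schema,
  Array.empty[Transform],
  Collections.emptyMap[String, String]())
```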
Review discussion on the existing method comment:

- is this method comment outdated (it's about built-in file sources)?
- It's for all data sources that can add column default values with existing data.
- It seems to still apply, as it describes the concept of the "exists" default that we don't change here.
- I mean the method javadoc: do we need to update it?
- That still seems accurate even after this PR. Spark always converts `DefaultValue` to `StructField` metadata. It is a good question whether that's strictly required or desirable, but reconsidering that will be a big effort.
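For context on that last point, here is a minimal sketch of what "converting `DefaultValue` to `StructField` metadata" looks like. The metadata key names are assumed to be the ones `ResolveDefaultColumns` uses ("CURRENT_DEFAULT" for new writes, "EXISTS_DEFAULT" for rows written before the column existed), and the column itself is made up.

```scala
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField}

// Both defaults are persisted as plain SQL strings, not as expressions, which is
// why toSql has to produce a parseable Spark SQL representation.
val metadata = new MetadataBuilder()
  .putString("CURRENT_DEFAULT", "42")
  .putString("EXISTS_DEFAULT", "42")
  .build()

val field = StructField("points", IntegerType, nullable = true, metadata)
```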