-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-51987][SQL] DSv2 expressions in column defaults on write #51002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -263,7 +263,12 @@ object ResolveDefaultColumns extends QueryErrorsBase | |
| field: StructField, | ||
| statementType: String, | ||
| metadataKey: String = CURRENT_DEFAULT_COLUMN_METADATA_KEY): Expression = { | ||
| analyze(field.name, field.dataType, field.metadata.getString(metadataKey), statementType) | ||
| field.metadata.getExpression[Expression](metadataKey) match { | ||
| case (sql, Some(expr)) => | ||
| analyze(field.name, field.dataType, expr, sql, statementType) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this branch, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
| case (sql, _) => | ||
| analyze(field.name, field.dataType, sql, statementType) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,12 +22,12 @@ import java.util.{Collections, Locale} | |
|
|
||
| import scala.jdk.CollectionConverters._ | ||
|
|
||
| import org.apache.spark.SparkIllegalArgumentException | ||
| import org.apache.spark.{SparkException, SparkIllegalArgumentException} | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.CurrentUserContext | ||
| import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, TimeTravelSpec} | ||
| import org.apache.spark.sql.catalyst.catalog.ClusterBySpec | ||
| import org.apache.spark.sql.catalyst.expressions.Literal | ||
| import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, V2ExpressionUtils} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} | ||
| import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn} | ||
| import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ | ||
|
|
@@ -597,11 +597,31 @@ private[sql] object CatalogV2Util { | |
| // Note: the back-fill here is a logical concept. The data source can keep the existing | ||
| // data unchanged and let the data reader to return "exist default" for missing | ||
| // columns. | ||
| val existingDefault = Literal(default.getValue.value(), default.getValue.dataType()).sql | ||
| f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(default.getSql) | ||
| val existsDefault = extractExistsDefault(default) | ||
| val (sql, expr) = extractCurrentDefault(default) | ||
| val newMetadata = new MetadataBuilder() | ||
| .withMetadata(f.metadata) | ||
| .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, existsDefault) | ||
| .putExpression(CURRENT_DEFAULT_COLUMN_METADATA_KEY, sql, expr) | ||
| .build() | ||
| f.copy(metadata = newMetadata) | ||
| }.getOrElse(f) | ||
| } | ||
|
|
||
| private def extractExistsDefault(default: ColumnDefaultValue): String = { | ||
| Literal(default.getValue.value, default.getValue.dataType).sql | ||
| } | ||
|
|
||
| private def extractCurrentDefault(default: ColumnDefaultValue): (String, Option[Expression]) = { | ||
| val expr = Option(default.getExpression).flatMap(V2ExpressionUtils.toCatalyst) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [doubt] presently There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Generally yes, but I am not sure we would want to allow them in default values. |
||
| val sql = Option(default.getSql).orElse(expr.map(_.sql)).getOrElse { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [doubt] my understanding was There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we mean these properties to be accessible by users. We do that for built-in tables but there are proper DSv2 APIs for this like |
||
| throw SparkException.internalError( | ||
| s"Can't generate SQL for $default. The connector expression couldn't be " + | ||
| "converted to Catalyst and there is no provided SQL representation.") | ||
| } | ||
| (sql, expr) | ||
| } | ||
|
|
||
| /** | ||
| * Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column | ||
| * comment and default value or generation expression. This is mainly used to generate DS v2 | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am adding a runtime-only map of configs that is not serialized or exposed to the user. It will allow me to store alternative in-memory representations for certain configs. In particular, it would allow me to store SQL as well as the expression itself for default values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Below is an example of how it is being used:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This allows me to use the expression directly without generating/parsing SQL for it.