[SPARK-51987][SQL] DSv2 expressions in column defaults on write #50792
Conversation
@@ -49,7 +49,7 @@ import org.apache.spark.util.ArrayImplicits._
  */
 abstract class InMemoryBaseTable(
     val name: String,
-    val schema: StructType,
+    override val columns: Array[Column],
I had to change this to expose the passed columns directly, rather than going through a round of conversion.
+1
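For context, here is a hedged sketch of what the signature change implies. The class and field names follow the diff above; deriving `schema` via `CatalogV2Util.v2ColumnsToStructType` is my assumption about how the table now obtains its schema.

```scala
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Table}
import org.apache.spark.sql.types.StructType

// Sketch only: the table keeps the DS v2 columns it was given, so
// expression-based defaults survive instead of being lost in a
// Column -> StructType -> Column round trip.
abstract class InMemoryBaseTable(
    val name: String,
    override val columns: Array[Column]) extends Table {

  // Assumption: the schema is now derived from the columns, not stored.
  override def schema: StructType = CatalogV2Util.v2ColumnsToStructType(columns)
}
```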
     ordering, numPartitions, advisoryPartitionSize, isDistributionStrictlyRequired,
     numRowsPerSplit) with SupportsDelete {

  def this(
      name: String,
      schema: StructType,
Thank you for adding this constructor.
+1, LGTM. Thank you, @aokolnychyi.
cc @huaxingao, too.
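A minimal sketch of what such a compatibility constructor might look like. The delegation through `CatalogV2Util.structTypeToV2Columns` is an assumption, and only the two parameters visible in the diff are shown; the real constructor takes more.

```scala
// Sketch: existing callers that pass a StructType keep compiling; the
// StructType is converted to DS v2 columns before delegating to the
// primary constructor.
def this(name: String, schema: StructType) =
  this(name, CatalogV2Util.structTypeToV2Columns(schema))
```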
@@ -483,6 +488,32 @@ class DataSourceV2DataFrameSuite
   }
 }

  test("write with expression-based default values") {
This test would previously fail.
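As a rough illustration, the test has this shape. The table name, provider, and literal values here are hypothetical, not the PR's exact test body.

```scala
test("write with expression-based default values") {
  val tableName = "testcat.ns1.ns2.tbl"
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT) USING foo")
    // the connector reports this column's default as a DS v2 expression
    sql(s"ALTER TABLE $tableName ADD COLUMN value INT DEFAULT (100 + 23)")
    // the write must resolve the default for the missing `value` column
    sql(s"INSERT INTO $tableName (id) VALUES (1)")
    checkAnswer(sql(s"SELECT * FROM $tableName"), Row(1, 123))
  }
}
```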
      case None if defaultValue.getSql != null =>
        defaultValue.getSql
      case _ =>
        throw SparkException.internalError(
Shall we add a test for this branch?
Added.
  }.getOrElse(f)
}

private def toSql(defaultValue: DefaultValue): String = {
  if (defaultValue.getExpression != null) {
    V2ExpressionUtils.toCatalyst(defaultValue.getExpression) match {
Hmm, when is it possible that a default expression cannot be converted back to a Catalyst one?
Dynamic expressions other than literals?
Can we also add a test case that falls back to `defaultValue.getSql` when `toCatalyst` returns `None`?
V2 expressions are quite flexible: users can represent a function with an arbitrary name. In practice, I expect it to be rare, but you never know.
I'll add a test.
Added more tests.
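Piecing the fragments in this thread together, the helper under discussion is roughly the following; the body of the `Some` branch and the exact error message are assumptions.

```scala
private def toSql(defaultValue: DefaultValue): String = {
  if (defaultValue.getExpression != null) {
    V2ExpressionUtils.toCatalyst(defaultValue.getExpression) match {
      case Some(catalystExpr) =>
        // trust Spark's SQL generation for the converted Catalyst expression
        catalystExpr.sql
      case None if defaultValue.getSql != null =>
        // fall back to the connector-provided SQL string
        defaultValue.getSql
      case _ =>
        throw SparkException.internalError(
          s"Can't generate SQL for default value: $defaultValue")
    }
  } else {
    defaultValue.getSql
  }
}
```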
@@ -598,10 +598,28 @@ private[sql] object CatalogV2Util {
        // data unchanged and let the data reader to return "exist default" for missing
        // columns.
        val existingDefault = Literal(default.getValue.value(), default.getValue.dataType()).sql
-       f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(default.getSql)
+       f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(toSql(defaultValue))
Is this method comment outdated (it's about built-in file sources)?
It's for all data sources that can add column default values with existing data.
It seems to still apply, as it describes the concept of the "exists" default, which we don't change here.
I mean the method javadoc:
// For built-in file sources, we encode the default value in StructField metadata. An analyzer
// rule will check the special metadata and change the DML input plan to fill the default value.
Do we need to update it?
That still seems accurate even after this PR. Spark always converts `DefaultValue` to `StructField` metadata. It is a good question whether that's strictly required or desirable, but reconsidering that will be a big effort.
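For reference, this is the encoding being described, sketched with the `StructField` helpers visible in the diff above; the literal values are illustrative.

```scala
import org.apache.spark.sql.types.{IntegerType, StructField}

// "current" default: applied to future INSERTs that omit the column.
// "existence" default: returned when reading rows written before the
// column existed.
val field = StructField("value", IntegerType)
  .withCurrentDefaultValue("100 + 23")
  .withExistenceDefaultValue("123")
```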
  }.getOrElse(f)
}

private def toSql(defaultValue: DefaultValue): String = {
Style question: did we consider putting this logic in `DefaultValue`?
`DefaultValue` is a user-facing DS v2 API. We should put these internal util functions in internal classes.
Correct, the idea is to keep `DefaultValue` as simple as possible.
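For readers unfamiliar with the API, `DefaultValue` pairs a SQL string with an optional v2 expression, roughly as below. The import path and constructor shape are my assumptions; `getSql` and `getExpression` appear in the diffs above.

```scala
import org.apache.spark.sql.connector.catalog.DefaultValue
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.types.IntegerType

// Assumed constructor: (sql, expression); either part may be null.
val dv = new DefaultValue("123", LiteralValue(123, IntegerType))
dv.getSql         // the original SQL string form
dv.getExpression  // the connector's v2 expression form
```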
  }.getOrElse(f)
}

private def toSql(defaultValue: DefaultValue): String = {
  if (defaultValue.getExpression != null) {
IIUC, this is prioritizing the expression over the SQL string if we get both. Is that desired? We could put a comment somewhere if so.
Yeah, I have the same question. It seems more reasonable to use the original SQL string first.
Based on the `DefaultValue` documentation, it uses the expression first and falls back to the SQL string if the expression cannot be converted.
When we need to evaluate, it makes sense to use the expression first and fall back to the SQL string if the expression cannot be converted. But here we just need to get the SQL string.
The problem is that whatever we encode here will be picked up and used on write. Spark doesn't currently use expressions to represent default values; we persist the expression SQL in struct field metadata. What this logic says is: if a connector provides an expression, convert it to Catalyst and trust Spark to generate an accurate SQL representation. If we flip this, it means we prioritize the connector-provided SQL string over our own SQL generation for Catalyst expressions.
Does this make sense, @szehon-ho @viirya @cloud-fan?
But Spark is not capable of generating an executable SQL string from a query plan/expression. The `.sql` method only generates a pretty string for display. We don't even have a test that parses back the `.sql` result.
On the other hand, `DefaultValue.sql` is usually the original user-specified SQL, or the connector is responsible for providing an executable SQL string if it generates one by itself.
Ah, I misinterpreted the promise behind `.sql`. It was my understanding that it is actually reliable. Let me think more about this tomorrow. If `.sql` is not reliable, then it does change things a bit, I guess. The problem is that if we prefer the string over the expression, it kind of breaks the promise behind `DefaultValue`, which states that the expression takes precedence.
There is a way to produce reliable SQL strings from v2 expressions: `V2ExpressionSQLBuilder`.
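A minimal sketch of that approach, assuming `V2ExpressionSQLBuilder.build` accepts a v2 expression and returns its SQL text; the expression construction is illustrative.

```scala
import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder
import org.apache.spark.sql.types.IntegerType

// Build SQL text directly from the v2 expression, avoiding the detour
// through Catalyst and its pretty-printed SQL generation.
val expr = new GeneralScalarExpression(
  "+", Array[Expression](LiteralValue(100, IntegerType), LiteralValue(23, IntegerType)))
val sqlText = new V2ExpressionSQLBuilder().build(expr)  // e.g. "100 + 23"
```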
What changes were proposed in this pull request?
This PR allows connectors to expose expression-based defaults on write.
Why are the changes needed?
These changes are needed so that connectors are not required to produce SQL strings in the Spark SQL dialect.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This PR comes with tests.
Was this patch authored or co-authored using generative AI tooling?
No.