@@ -22,12 +22,12 @@ import java.util.{Collections, Locale}

import scala.jdk.CollectionConverters._

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.{Literal, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
@@ -598,10 +598,28 @@ private[sql] object CatalogV2Util {
// data unchanged and let the data reader to return "exist default" for missing
// columns.
val existingDefault = Literal(default.getValue.value(), default.getValue.dataType()).sql
f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(default.getSql)
f.withExistenceDefaultValue(existingDefault).withCurrentDefaultValue(toSql(defaultValue))
Member:

Is this method comment outdated (it's about built-in file sources)?

Contributor:

It's for all data sources that can add column default values with existing data

Contributor Author:

It seems to still apply, as it describes the concept of the "exists" default, which we don't change here.

Member (@szehon-ho, May 7, 2025):

I mean the method javadoc:

// For built-in file sources, we encode the default value in StructField metadata. An analyzer
// rule will check the special metadata and change the DML input plan to fill the default value.

Do we need to update it?

Contributor Author:

That still seems accurate even after this PR. Spark always converts DefaultValue to StructField metadata. It is a good question whether that's strictly required or desirable, but reconsidering that will be a big effort.
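For reference, a minimal sketch of that StructField encoding, assuming the CURRENT_DEFAULT and EXISTS_DEFAULT metadata keys used by ResolveDefaultColumns (column name and values are hypothetical):

import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField}

// Hypothetical column declared with DEFAULT 100 + 23. The analyzer rule mentioned in the
// javadoc reads these keys: CURRENT_DEFAULT applies to newly written rows, while
// EXISTS_DEFAULT is returned for rows written before the column existed.
val metadata = new MetadataBuilder()
  .putString("CURRENT_DEFAULT", "100 + 23")
  .putString("EXISTS_DEFAULT", "123")
  .build()
val field = StructField("c2", IntegerType, nullable = false, metadata)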

}.getOrElse(f)
}

private def toSql(defaultValue: DefaultValue): String = {
if (defaultValue.getExpression != null) {
Member:

IIUC, this is prioritizing the expression over the SQL string if we get both. Is that desired? If so, we could add a comment somewhere.

Contributor:

Yeah, I have the same question. It seems more reasonable to use the original SQL string first.

Member:

Based on the DefaultValue documentation, it uses the expression first and falls back to the SQL string if the expression cannot be converted.

Contributor:

When we need to evaluate, it makes sense to use the expression first and fall back to the SQL string if the expression cannot be converted. But here we just need to get the SQL string.

Contributor Author:

The problem is that whatever we encode here will be picked up and used on write. Spark doesn't currently use expressions to represent default values; we persist the expression SQL in struct field metadata. What this logic says is: if a connector provides an expression, convert it to Catalyst and trust Spark to generate an accurate SQL representation. If we flip this, it means we prioritize the connector-provided SQL string over our own SQL generation for Catalyst expressions.

Contributor:

But Spark is not capable of generating an executable SQL string from a query plan/expression. The .sql method only generates a pretty string for display; we don't even have a test that parses the .sql result back.

On the other hand, DefaultValue.sql is usually the original user-specified SQL, or the connector is responsible for providing an executable SQL string if it generates the default itself.

Contributor Author (@aokolnychyi, May 8, 2025):

Ah, I misinterpreted the promise behind .sql; it was my understanding that it is actually reliable. Let me think more about this tomorrow. If .sql is not reliable, then it does change things a bit. The problem is that if we prefer the string over the expression, it kind of breaks the promise behind DefaultValue, which states that the expression takes precedence.

Contributor:

There is a way to produce reliable SQL strings from v2 expressions: V2ExpressionSQLBuilder

Contributor Author (@aokolnychyi, May 9, 2025):

@cloud-fan, that could be an option. However, it doesn't produce the Spark SQL dialect for things like DAY_OF_WEEK and similar expressions; it seems to generate a user-friendly representation.

The fundamental problem is that we have a Catalyst expression and we need to persist it in the struct field metadata. I wonder if we can avoid the need to put the expression in the metadata. It is going to be challenging for sure, as it requires changing StructField, which is a very stable API.

Any thoughts? Any way to avoid converting a Catalyst expression to a string just to parse it back soon after?

Contributor:

If it's just for the legacy v1 code path (e.g. built-in file sources), I'd say let's keep the old behavior and save the original SQL string only. We can refactor them to use ColumnDefinition instead of StructType, but that is non-trivial and we can leave it for the future.
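To make the precedence concrete, a rough sketch of the two inputs the new toSql helper distinguishes (values are arbitrary; both ColumnDefaultValue constructors already exist):

import org.apache.spark.sql.connector.catalog.ColumnDefaultValue
import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.types.IntegerType

// Expression provided: toSql converts it to Catalyst and stores Spark's generated SQL
// (e.g. "(100 + 23)") as the current default in StructField metadata.
val withExpr = new ColumnDefaultValue(
  new GeneralScalarExpression(
    "+",
    Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
  LiteralValue(123, IntegerType))

// SQL-only default: there is no expression, so toSql simply returns the original
// connector-provided string "100 + 23".
val sqlOnly = new ColumnDefaultValue("100 + 23", LiteralValue(123, IntegerType))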

V2ExpressionUtils.toCatalyst(defaultValue.getExpression) match {
Member:

Hmm, when is it possible that a default expression cannot be converted back to a Catalyst one?

Member:

Dynamic expressions other than literals?

Member:

Can we also add a test case that falls back to defaultValue.getSql when toCatalyst returns None?

Contributor Author:

V2 expressions are quite flexible; users can represent a function with an arbitrary name. In practice, I expect this to be rare, but you never know.

I'll add a test.

Contributor Author:

Added more tests.
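For illustration, the kind of v2 expression that exercises the fallback (the function name is made up): toCatalyst has no Catalyst counterpart for it, so the conversion is expected to return None and toSql falls back to defaultValue.getSql, or raises the internal error below if the connector supplied no SQL either.

import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
import org.apache.spark.sql.types.IntegerType

// "my_custom_fn" is a hypothetical connector-specific function with no Catalyst equivalent.
val unconvertible = new GeneralScalarExpression(
  "my_custom_fn",
  Array(LiteralValue(1, IntegerType)))
val converted = V2ExpressionUtils.toCatalyst(unconvertible) // expected: None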

case Some(catalystExpr) =>
catalystExpr.sql
case None if defaultValue.getSql != null =>
defaultValue.getSql
case _ =>
throw SparkException.internalError(
Member:

Shall we add a test for this branch?

Contributor Author:

Added.

s"""Can't generate SQL for $defaultValue. The connector expression couldn't
|be converted to Catalyst and there is no provided SQL representation.
|""".stripMargin)
}
} else {
defaultValue.getSql
}
}

/**
* Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column
* comment and default value or generation expression. This is mainly used to generate DS v2
@@ -49,7 +49,7 @@ import org.apache.spark.util.ArrayImplicits._
*/
abstract class InMemoryBaseTable(
val name: String,
val schema: StructType,
override val columns: Array[Column],
Contributor Author (@aokolnychyi, May 5, 2025):

I had to change this to expose passed columns directly, rather than going through a round of conversion.

Member:

+1
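For context, a sketch of the round trip this avoids (CatalogV2Util is private[sql], so this only compiles inside Spark's own packages): deriving columns back from a StructType goes through the metadata encoding, so an expression-based DefaultValue would presumably survive only as its SQL form.

import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column}
import org.apache.spark.sql.types.IntegerType

// The previous shape of the in-memory tables: accept columns, convert to a StructType,
// then convert back whenever columns are requested.
val original: Array[Column] = Array(Column.create("c1", IntegerType))
val roundTripped: Array[Column] = CatalogV2Util.structTypeToV2Columns(
  CatalogV2Util.v2ColumnsToStructType(original))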

override val partitioning: Array[Transform],
override val properties: util.Map[String, String],
val distribution: Distribution = Distributions.unspecified(),
@@ -88,6 +88,8 @@ abstract class InMemoryBaseTable(
}
}

override val schema: StructType = CatalogV2Util.v2ColumnsToStructType(columns)

// purposely exposes a metadata column that conflicts with a data column in some tests
override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
private val metadataColumnNames = metadataColumns.map(_.name).toSet
@@ -38,8 +38,13 @@ class InMemoryRowLevelOperationTable(
partitioning: Array[Transform],
properties: util.Map[String, String],
constraints: Array[Constraint] = Array.empty)
extends InMemoryTable(name, schema, partitioning, properties, constraints)
with SupportsRowLevelOperations {
extends InMemoryTable(
name,
CatalogV2Util.structTypeToV2Columns(schema),
partitioning,
properties,
constraints)
with SupportsRowLevelOperations {

private final val PARTITION_COLUMN_REF = FieldReference(PartitionKeyColumn.name)
private final val INDEX_COLUMN_REF = FieldReference(IndexColumn.name)
@@ -33,7 +33,7 @@ import org.apache.spark.util.ArrayImplicits._
*/
class InMemoryTable(
name: String,
schema: StructType,
columns: Array[Column],
override val partitioning: Array[Transform],
override val properties: util.Map[String, String],
override val constraints: Array[Constraint] = Array.empty,
@@ -43,10 +43,22 @@
advisoryPartitionSize: Option[Long] = None,
isDistributionStrictlyRequired: Boolean = true,
override val numRowsPerSplit: Int = Int.MaxValue)
extends InMemoryBaseTable(name, schema, partitioning, properties, distribution,
extends InMemoryBaseTable(name, columns, partitioning, properties, distribution,
ordering, numPartitions, advisoryPartitionSize, isDistributionStrictlyRequired,
numRowsPerSplit) with SupportsDelete {

def this(
name: String,
schema: StructType,
Member:

Thank you for adding this constructor.
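A usage sketch of the new convenience constructor (the table name, schema, and empty partitioning/properties are arbitrary):

import java.util.Collections
import org.apache.spark.sql.connector.catalog.InMemoryTable
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{IntegerType, StructType}

// Callers that only have a StructType can keep constructing the test table as before;
// the constructor converts the schema to v2 columns internally.
val table = new InMemoryTable(
  "testcat.ns.t",
  new StructType().add("id", IntegerType),
  Array.empty[Transform],
  Collections.emptyMap[String, String]())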

partitioning: Array[Transform],
properties: util.Map[String, String]) = {
this(
name,
CatalogV2Util.structTypeToV2Columns(schema),
partitioning,
properties)
}

override def canDeleteWhere(filters: Array[Filter]): Boolean = {
InMemoryTable.supportsFilters(filters)
}
@@ -118,15 +118,14 @@ class BasicInMemoryTableCatalog extends TableCatalog {
distributionStrictlyRequired: Boolean = true,
numRowsPerSplit: Int = Int.MaxValue): Table = {
// scalastyle:on argcount
val schema = CatalogV2Util.v2ColumnsToStructType(columns)
if (tables.containsKey(ident)) {
throw new TableAlreadyExistsException(ident.asMultipartIdentifier)
}

InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)

val tableName = s"$name.${ident.quoted}"
val table = new InMemoryTable(tableName, schema, partitions, properties, constraints,
val table = new InMemoryTable(tableName, columns, partitions, properties, constraints,
distribution, ordering, requiredNumPartitions, advisoryPartitionSize,
distributionStrictlyRequired, numRowsPerSplit)
tables.put(ident, table)
@@ -152,7 +151,7 @@

val newTable = new InMemoryTable(
name = table.name,
schema = schema,
columns = CatalogV2Util.structTypeToV2Columns(schema),
partitioning = finalPartitioning,
properties = properties,
constraints = constraints)
@@ -31,10 +31,10 @@ import org.apache.spark.util.ArrayImplicits._

class InMemoryTableWithV2Filter(
name: String,
schema: StructType,
columns: Array[Column],
partitioning: Array[Transform],
properties: util.Map[String, String])
extends InMemoryBaseTable(name, schema, partitioning, properties) with SupportsDeleteV2 {
extends InMemoryBaseTable(name, columns, partitioning, properties) with SupportsDeleteV2 {

override def canDeleteWhere(predicates: Array[Predicate]): Boolean = {
InMemoryTableWithV2Filter.supportsPredicates(predicates)
@@ -37,8 +37,7 @@ class InMemoryTableWithV2FilterCatalog extends InMemoryTableCatalog {
InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)

val tableName = s"$name.${ident.quoted}"
val schema = CatalogV2Util.v2ColumnsToStructType(columns)
val table = new InMemoryTableWithV2Filter(tableName, schema, partitions, properties)
val table = new InMemoryTableWithV2Filter(tableName, columns, partitions, properties)
tables.put(ident, table)
namespaces.putIfAbsent(ident.namespace.toList, Map())
table
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, Identifier, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, Identifier, InMemoryTableCatalog, TableInfo}
import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, GeneralScalarExpression, LiteralValue, Transform}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan
@@ -51,6 +51,11 @@ class DataSourceV2DataFrameSuite
override protected val catalogAndNamespace: String = "testcat.ns1.ns2.tbls"
override protected val v2Format: String = classOf[FakeV2Provider].getName

protected def catalog(name: String): InMemoryTableCatalog = {
val catalog = spark.sessionState.catalogManager.catalog(name)
catalog.asInstanceOf[InMemoryTableCatalog]
}

override def verifyTable(tableName: String, expected: DataFrame): Unit = {
checkAnswer(spark.table(tableName), expected)
}
@@ -483,6 +488,32 @@
}
}

test("write with expression-based default values") {
Contributor Author:

This test would previously fail.

val tableName = "testcat.ns1.ns2.tbl"
withTable(tableName) {
val columns = Array(
Column.create("c1", IntegerType),
Column.create(
"c2",
IntegerType,
false, /* not nullable */
null, /* no comment */
new ColumnDefaultValue(
new GeneralScalarExpression(
"+",
Array(LiteralValue(100, IntegerType), LiteralValue(23, IntegerType))),
LiteralValue(123, IntegerType)),
"{}"))
val tableInfo = new TableInfo.Builder().withColumns(columns).build()
catalog("testcat").createTable(Identifier.of(Array("ns1", "ns2"), "tbl"), tableInfo)
val df = Seq(1, 2, 3).toDF("c1")
df.writeTo(tableName).append()
checkAnswer(
spark.table(tableName),
Seq(Row(1, 123), Row(2, 123), Row(3, 123)))
}
}

private def executeAndKeepPhysicalPlan[T <: SparkPlan](func: => Unit): T = {
val qe = withQueryExecutionsCaptured(spark) {
func