Skip to content

Commit

Permalink
[SPARK-51119][SQL][FOLLOW-UP] Add fallback to ResolveDefaultColumnsUtil.existenceDefaultValues
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Feb 15, 2025
1 parent e397207 commit 64b62ca
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package org.apache.spark.sql.catalyst.util

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -340,12 +341,43 @@ object ResolveDefaultColumns extends QueryErrorsBase
throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
"", field.name, defaultSQL)
}
if (!expr.resolved) {
throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
"", field.name, defaultSQL, null)

val resolvedExpr = expr match {
case _: ExprLiteral | _: Cast => expr
case _ =>
fallbackResolveExistenceDefaultValue(field, defaultSQL)
}

coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
}

// In most cases, column existsDefault should already be persisted as resolved
// and constant-folded literal sql, but because they are fetched from external catalog,
// it is possible that this assumption does not hold, so we fallback to full analysis
// if we encounter an unresolved existsDefault
private def fallbackResolveExistenceDefaultValue(
field: StructField,
defaultSQL: String): Expression = {
logWarning(log"Encountered unresolved exists default value: " +
log"'${MDC(COLUMN_DEFAULT_VALUE, defaultSQL)}' " +
log"for column ${MDC(COLUMN_NAME, field.name)} " +
log"with ${MDC(COLUMN_DATA_TYPE_SOURCE, field.dataType)}, " +
log"falling back to full analysis.")

field.getExistenceDefaultValue().map { text: String =>
val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
val literal = expr match {
case _: ExprLiteral | _: Cast => expr
case _ => throw SparkException.internalError(s"parse existence default as literal err," +
s" field name: ${field.name}, value: $text")
}
// sanity check
if (!literal.resolved) {
throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
"", field.name, defaultSQL, null)
}
literal
}.orNull
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,4 +831,22 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
validateConvertedDefaults("c4", VariantType, "parse_json(null)", "CAST(NULL AS VARIANT)")

}

test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
val source = StructType(
Array(
StructField("c1", VariantType, true,
new MetadataBuilder()
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
.build()),
StructField("c0", StringType, true,
new MetadataBuilder()
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
.build())))
val res = ResolveDefaultColumns.existenceDefaultValues(source)
assert(res(0) == null)
assert(res(1) == "spark_catalog")
}
}

0 comments on commit 64b62ca

Please sign in to comment.