[SPARK-51119][SQL][FOLLOW-UP] Add fallback to ResolveDefaultColumnsUtil existenceDefaultValues #49962

Open · wants to merge 1 commit into master
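This follow-up hardens ResolveDefaultColumns.existenceDefaultValues. An EXISTS_DEFAULT is normally persisted as resolved, constant-folded literal SQL, but since these strings are fetched back from an external catalog, that guarantee can be violated. Rather than failing immediately with defaultValuesUnresolvedExprError, the utility now falls back to a full analysis of the stored expression and only errors out if the result is still not a literal (or a cast of one).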
@@ -19,14 +19,15 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
 import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -340,12 +341,43 @@ object ResolveDefaultColumns extends QueryErrorsBase
       throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
         "", field.name, defaultSQL)
     }
-    if (!expr.resolved) {
-      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
-        "", field.name, defaultSQL, null)
+
+    val resolvedExpr = expr match {
+      case _: ExprLiteral | _: Cast => expr
+      case _ =>
+        fallbackResolveExistenceDefaultValue(field, defaultSQL)
     }
 
-    coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
+    coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
   }
+
+  // In most cases a column's existsDefault should already be persisted as resolved,
+  // constant-folded literal SQL. Because these values are fetched from an external
+  // catalog, however, that assumption may not hold, so we fall back to full analysis
+  // whenever we encounter an unresolved existsDefault.
+  private def fallbackResolveExistenceDefaultValue(
+      field: StructField,
+      defaultSQL: String): Expression = {
+    logWarning(log"Encountered unresolved exists default value: " +
+      log"'${MDC(COLUMN_DEFAULT_VALUE, defaultSQL)}' " +
+      log"for column ${MDC(COLUMN_NAME, field.name)} " +
+      log"with ${MDC(COLUMN_DATA_TYPE_SOURCE, field.dataType)}, " +
+      log"falling back to full analysis.")
+
+    field.getExistenceDefaultValue().map { text: String =>
+      val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+      val literal = expr match {
+        case _: ExprLiteral | _: Cast => expr
+        case _ => throw SparkException.internalError(s"Failed to analyze the existence " +
+          s"default value as a literal; field name: ${field.name}, value: $text")
+      }
+      // Sanity check: the re-analyzed expression must be fully resolved.
+      if (!literal.resolved) {
+        throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
+          "", field.name, defaultSQL, null)
+      }
+      literal
+    }.orNull
+  }
 
   /**
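The change above boils down to a two-tier resolution: expressions that are already constant (a Literal, or a Cast of one) are accepted as-is, and only anything else pays for a full re-analysis. A minimal standalone sketch of that dispatch, using hypothetical toy expression types rather than Catalyst's real ones:

// Hypothetical toy model, not Catalyst's classes; it only illustrates
// the dispatch pattern used by existenceDefaultValues above.
sealed trait Expr
case class Literal(value: Any) extends Expr
case class Cast(child: Expr, toType: String) extends Expr
case class UnresolvedFunction(name: String) extends Expr

object ExistsDefaultSketch {
  // Constants (and casts of constants) pass through untouched; anything
  // else pays for a full re-analysis supplied by the caller.
  def resolveExistsDefault(expr: Expr, fullAnalysis: () => Expr): Expr =
    expr match {
      case _: Literal | _: Cast => expr
      case _ => fullAnalysis()
    }

  def main(args: Array[String]): Unit = {
    // Fast path: the persisted default is already a literal.
    println(resolveExistsDefault(Literal(42), () => sys.error("unreachable")))
    // Fallback: pretend the analyzer constant-folds current_catalog().
    println(resolveExistsDefault(
      UnresolvedFunction("current_catalog"), () => Literal("spark_catalog")))
  }
}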
@@ -831,4 +831,22 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
validateConvertedDefaults("c4", VariantType, "parse_json(null)", "CAST(NULL AS VARIANT)")

}

test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
val source = StructType(
Array(
StructField("c1", VariantType, true,
new MetadataBuilder()
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
.build()),
StructField("c0", StringType, true,
new MetadataBuilder()
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
.build())))
val res = ResolveDefaultColumns.existenceDefaultValues(source)
assert(res(0) == null)
assert(res(1) == "spark_catalog")
}
}
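In this test, neither persisted EXISTS_DEFAULT is a plain literal, so both columns exercise the new fallback: parse_json(null) is re-analyzed and constant-folded to CAST(NULL AS VARIANT) (consistent with the validateConvertedDefaults case for c4 above), which evaluates to a null existence default, while current_catalog() folds to the session's default catalog name, spark_catalog.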