Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ case class CreateDeltaTableCommand(
CoordinatedCommitsUtils.validateConfigurationsForCreateDeltaTableCommand(
sparkSession, deltaLog.tableExists, query, tableWithLocation.properties)
CatalogOwnedTableUtils.validatePropertiesForCreateDeltaTableCommand(
sparkSession, deltaLog.tableExists, query, tableWithLocation.properties)
spark = sparkSession,
tableExists = deltaLog.tableExists,
query = query,
catalogTableProperties = tableWithLocation.properties,
existingTableSnapshotOpt =
if (deltaLog.tableExists) Some(deltaLog.unsafeVolatileSnapshot) else None)

recordDeltaOperation(deltaLog, "delta.ddl.createTable") {
val result = handleCommit(sparkSession, deltaLog, tableWithLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,25 @@ object CatalogOwnedTableUtils extends DeltaLogging {
}

/**
* Validates the Catalog-Owned properties in explicit command overrides and default
* Validates the CatalogManaged properties in explicit command overrides and default
* SparkSession properties for `CreateDeltaTableCommand`.
*
* @param spark The SparkSession.
* @param tableExists Whether the table already exists.
* @param query The query to be executed (e.g., CloneTableCommand).
* @param catalogTableProperties The table properties from the catalog table.
* @param existingTableSnapshotOpt The snapshot of the existing table, if it exists.
*/
def validatePropertiesForCreateDeltaTableCommand(
spark: SparkSession,
tableExists: Boolean,
query: Option[LogicalPlan],
catalogTableProperties: Map[String, String]): Unit = {
catalogTableProperties: Map[String, String],
existingTableSnapshotOpt: Option[Snapshot] = None): Unit = {
val (command, propertyOverrides) = query match {
// For CLONE, we cannot use the properties from the catalog table, because they are already
// the result of merging the source table properties with the overrides, but we do not
// consider the source table properties for Catalog-Owned tables.
// consider the source table properties for CatalogManaged tables.
case Some(cmd: CloneTableCommand) =>
(if (tableExists) "REPLACE with CLONE" else "CREATE with CLONE",
cmd.tablePropertyOverrides)
Expand All @@ -256,13 +263,21 @@ object CatalogOwnedTableUtils extends DeltaLogging {
assert(command == "REPLACE with CLONE" || command == "REPLACE",
s"Unexpected command: $command")
validateUCTableIdNotPresent(property = propertyOverrides)
// Blocks explicit enablements of Catalog-Owned through REPLACE commands.
// We *ignore* default enablement of Catalog-Owned for REPLACE commands.
if (TableFeatureProtocolUtils.getSupportedFeaturesFromTableConfigs(propertyOverrides)
.contains(CatalogOwnedTableFeature)) {

val isSpecifyingCatalogManaged = TableFeatureProtocolUtils
.getSupportedFeaturesFromTableConfigs(propertyOverrides)
.contains(CatalogOwnedTableFeature)
val existingTableIsCatalogManaged = existingTableSnapshotOpt.exists(_.isCatalogOwned)

// Allow specifying CatalogManaged in REPLACE TABLE if the existing table is already
// CatalogManaged. In this case, the commit coordinator properties are treated as a no-op.
// Block if trying to enable CatalogManaged on a non-CatalogManaged table via REPLACE.
Copy link
Collaborator

@openinx openinx Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @xzhseh , seems like you are adding an enforced checking to prevent REPLACE an existing non-catalogManaged table to be a managed one.

I will suggest to add an integration tests to cover the changes you made. https://github.com/delta-io/delta/blob/master/spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableCreationTest.java

// Users should either upgrade the existing table or create a fresh CatalogManaged table.
if (isSpecifyingCatalogManaged && !existingTableIsCatalogManaged) {
throw new IllegalStateException(
"Specifying Catalog-Owned in REPLACE TABLE command is not supported. " +
"Please use CREATE TABLE command to create a Catalog-Owned table.")
"Specifying CatalogManaged in REPLACE TABLE command is not supported " +
"for tables that are not already CatalogManaged. " +
"Please either upgrade the existing table or create a fresh CatalogManaged table.")
}
}
}
Expand Down
Loading