diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 390db8db758..5d65408496c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -217,6 +217,16 @@ public static KernelException checkpointOnUnpublishedCommits( return new KernelException(message); } + public static KernelException checkpointOnProtection(String tablePath) { + String message = + String.format( + "Cannot create checkpoint for table `%s` because the table has " + + "checkpointProtection feature enabled. Kernel does not support checkpointing " + + "tables with checkpoint protection.", + tablePath); + return new KernelException(message); + } + public static KernelException unsupportedDataType(DataType dataType) { return new KernelException("Kernel doesn't support writing data of type: " + dataType); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java index 5b326f5e000..e49d2d61f8d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checkpoints/Checkpointer.java @@ -15,6 +15,7 @@ */ package io.delta.kernel.internal.checkpoints; +import static io.delta.kernel.internal.DeltaErrors.checkpointOnProtection; import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED; import static io.delta.kernel.internal.TableConfig.LOG_RETENTION; @@ -68,6 +69,12 @@ public static void checkpoint(Engine engine, Clock clock, SnapshotImpl snapshot) TableFeatures.validateKernelCanWriteToTable( snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getDataPath().toString()); + // Check if checkpoint protection is enabled - Kernel does not support checkpointing + // tables with checkpoint protection feature + if (snapshot.getProtocol().supportsFeature(TableFeatures.CHECKPOINT_PROTECTION_W_FEATURE)) { + throw checkpointOnProtection(tablePath.toString()); + } + final Path checkpointPath = FileNames.checkpointFileSingular(logPath, version); long numberOfAddFiles = 0; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index a3e6e1fd7d3..5f3fb7eec06 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -449,6 +449,15 @@ private static class VacuumProtocolCheckTableFeature extends TableFeature.Reader } } + public static final TableFeature CHECKPOINT_PROTECTION_W_FEATURE = + new CheckpointProtectionTableFeature(); + + private static class CheckpointProtectionTableFeature extends TableFeature.WriterFeature { + CheckpointProtectionTableFeature() { + super("checkpointProtection", /* minWriterVersion = */ 7); + } + } + /** * Support reading / metadata writes on tables with the feature. Don't support writing new data * rows with default values. Don't allow updating the types of columns with default values. @@ -517,6 +526,7 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me ALLOW_COLUMN_DEFAULTS_W_FEATURE, APPEND_ONLY_W_FEATURE, CATALOG_MANAGED_RW_FEATURE, + CHECKPOINT_PROTECTION_W_FEATURE, CHECKPOINT_V2_RW_FEATURE, CHANGE_DATA_FEED_W_FEATURE, CLUSTERING_W_FEATURE, diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala index 590235dd7a8..8a5262f65a0 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala @@ -55,6 +55,7 @@ class TableFeaturesSuite extends AnyFunSuite { val writerOnlyFeatures = Seq( "allowColumnDefaults", "appendOnly", + "checkpointProtection", "invariants", "checkConstraints", "generatedColumns", @@ -198,6 +199,7 @@ class TableFeaturesSuite extends AnyFunSuite { }) Seq( + "checkpointProtection", "domainMetadata", "vacuumProtocolCheck", "clustering", @@ -259,6 +261,7 @@ class TableFeaturesSuite extends AnyFunSuite { // these features are enabled val expected = Seq( "catalogManaged", + "checkpointProtection", "columnMapping", "allowColumnDefaults", "v2Checkpoint", diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala index 9da6cfab1e6..139a266b54e 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/CreateCheckpointSuite.scala @@ -23,6 +23,7 @@ import io.delta.kernel.defaults.engine.DefaultEngine import io.delta.kernel.defaults.utils.{TestRow, TestUtils, WriteUtils} import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions.{CheckpointAlreadyExistsException, TableNotFoundException} +import io.delta.kernel.expressions.Literal import org.apache.spark.sql.delta.{DeltaLog, VersionNotFoundException} import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate @@ -390,6 +391,30 @@ class CreateCheckpointSuite extends CheckpointBase { verifyResults(tablePath, expResults, checkpointVersion) } } + + test("try create a checkpoint when checkpointProtection enabled") { + withTempDirAndEngine { (tablePath, tc) => + // Create a table with checkpointProtection feature enabled + createEmptyTable( + tc, + tablePath, + testSchema, + tableProperties = Map("delta.feature.checkpointProtection" -> "supported")) + + // Add some data to create commits + val smallBatch = generateData(testSchema, Seq.empty, Map.empty, batchSize = 5, numBatches = 1) + for (_ <- 0 to 3) { + appendData(tc, tablePath, data = Seq(Map.empty[String, Literal] -> smallBatch)) + } + + // Attempting to checkpoint should fail + val ex = intercept[Exception] { + kernelCheckpoint(tc, tablePath, checkpointVersion = 4) + } + assert(ex.getMessage.contains("table has checkpointProtection feature enabled. " + + "Kernel does not support checkpointing tables with checkpoint protection.")) + } + } } /**