Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ public static KernelException checkpointOnUnpublishedCommits(
return new KernelException(message);
}

public static KernelException checkpointOnProtection(String tablePath) {
String message =
String.format(
"Cannot create checkpoint for table `%s` because the table has "
+ "checkpointProtection feature enabled. Kernel does not support checkpointing "
+ "tables with checkpoint protection.",
tablePath);
return new KernelException(message);
}

public static KernelException unsupportedDataType(DataType dataType) {
return new KernelException("Kernel doesn't support writing data of type: " + dataType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.delta.kernel.internal.checkpoints;

import static io.delta.kernel.internal.DeltaErrors.checkpointOnProtection;
import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED;
import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;
Expand Down Expand Up @@ -68,6 +69,12 @@ public static void checkpoint(Engine engine, Clock clock, SnapshotImpl snapshot)
TableFeatures.validateKernelCanWriteToTable(
snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getDataPath().toString());

// Check if checkpoint protection is enabled - Kernel does not support checkpointing
// tables with checkpoint protection feature
if (snapshot.getProtocol().supportsFeature(TableFeatures.CHECKPOINT_PROTECTION_W_FEATURE)) {
throw checkpointOnProtection(tablePath.toString());
}

final Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);

long numberOfAddFiles = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,15 @@ private static class VacuumProtocolCheckTableFeature extends TableFeature.Reader
}
}

public static final TableFeature CHECKPOINT_PROTECTION_W_FEATURE =
new CheckpointProtectionTableFeature();

private static class CheckpointProtectionTableFeature extends TableFeature.WriterFeature {
CheckpointProtectionTableFeature() {
super("checkpointProtection", /* minWriterVersion = */ 7);
}
}

/**
* Support reading / metadata writes on tables with the feature. Don't support writing new data
* rows with default values. Don't allow updating the types of columns with default values.
Expand Down Expand Up @@ -517,6 +526,7 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me
ALLOW_COLUMN_DEFAULTS_W_FEATURE,
APPEND_ONLY_W_FEATURE,
CATALOG_MANAGED_RW_FEATURE,
CHECKPOINT_PROTECTION_W_FEATURE,
CHECKPOINT_V2_RW_FEATURE,
CHANGE_DATA_FEED_W_FEATURE,
CLUSTERING_W_FEATURE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class TableFeaturesSuite extends AnyFunSuite {
val writerOnlyFeatures = Seq(
"allowColumnDefaults",
"appendOnly",
"checkpointProtection",
"invariants",
"checkConstraints",
"generatedColumns",
Expand Down Expand Up @@ -198,6 +199,7 @@ class TableFeaturesSuite extends AnyFunSuite {
})

Seq(
"checkpointProtection",
"domainMetadata",
"vacuumProtocolCheck",
"clustering",
Expand Down Expand Up @@ -259,6 +261,7 @@ class TableFeaturesSuite extends AnyFunSuite {
// these features are enabled
val expected = Seq(
"catalogManaged",
"checkpointProtection",
"columnMapping",
"allowColumnDefaults",
"v2Checkpoint",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,25 @@ class CreateCheckpointSuite extends CheckpointBase {
verifyResults(tablePath, expResults, checkpointVersion)
}
}

test("try create a checkpoint when checkpointProtection enabled") {
withTempDirAndEngine { (tablePath, tc) =>
// Create a table with checkpointProtection feature enabled
spark.sql(s"CREATE TABLE delta.`$tablePath` (id INT) USING delta " +
"TBLPROPERTIES ('delta.feature.checkpointProtection' = 'supported')")

// Add some data to create commits
for (_ <- 0 to 3) {
spark.sql(s"INSERT INTO delta.`$tablePath` VALUES (1), (2), (3)")
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to use delta-spark? Couldn't you use kernel to create the table? and look at other tests for how we append/insert data?


// Attempting to checkpoint should fail
val ex = intercept[Exception] {
kernelCheckpoint(tc, tablePath, checkpointVersion = 4)
}
assert(ex.getMessage.contains("checkpointProtection"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assert more of the error message please. this makes reading this single test even cleaner.

}
}
}

/**
Expand Down
Loading