diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 86ce5e9ce5..bb9eddc5cd 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -103,6 +103,7 @@ pub mod table_changes; pub mod table_configuration; pub mod table_features; pub mod table_properties; +mod table_property_protocol_config; pub mod transaction; pub(crate) mod transforms; diff --git a/kernel/src/row_tracking.rs b/kernel/src/row_tracking.rs index e6c2db1049..dff850f3f6 100644 --- a/kernel/src/row_tracking.rs +++ b/kernel/src/row_tracking.rs @@ -16,9 +16,10 @@ pub(crate) struct RowTrackingDomainMetadata { row_id_high_water_mark: i64, } -impl RowTrackingDomainMetadata { - const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking"; +/// The domain name for row tracking metadata. +pub(crate) const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking"; +impl RowTrackingDomainMetadata { pub(crate) fn new(row_id_high_water_mark: i64) -> Self { RowTrackingDomainMetadata { row_id_high_water_mark, @@ -45,14 +46,16 @@ impl RowTrackingDomainMetadata { snapshot: &Snapshot, engine: &dyn Engine, ) -> DeltaResult> { - Ok(domain_metadata_configuration( - snapshot.log_segment(), - Self::ROW_TRACKING_DOMAIN_NAME, - engine, - )? - .map(|domain_metadata| serde_json::from_str::(&domain_metadata)) - .transpose()? - .map(|metadata| metadata.row_id_high_water_mark)) + Ok( + domain_metadata_configuration( + snapshot.log_segment(), + ROW_TRACKING_DOMAIN_NAME, + engine, + )? + .map(|domain_metadata| serde_json::from_str::(&domain_metadata)) + .transpose()? + .map(|metadata| metadata.row_id_high_water_mark), + ) } } @@ -61,7 +64,7 @@ impl TryFrom for DomainMetadata { fn try_from(metadata: RowTrackingDomainMetadata) -> DeltaResult { Ok(DomainMetadata::new( - RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(), + ROW_TRACKING_DOMAIN_NAME.to_string(), serde_json::to_string(&metadata)?, )) } diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs index 9080447084..f203269ed3 100644 --- a/kernel/src/table_configuration.rs +++ b/kernel/src/table_configuration.rs @@ -1,3 +1,5 @@ +//! Configuration for reading existing Delta tables. +//! //! This module defines [`TableConfiguration`], a high level api to check feature support and //! feature enablement for a table at a given version. This encapsulates [`Protocol`], [`Metadata`], //! [`Schema`], [`TableProperties`], and [`ColumnMappingMode`]. These structs in isolation should @@ -7,6 +9,12 @@ //! reader/writer features, and ensure that the deletion vector table property is enabled in the //! [`TableProperties`]. //! +//! # Related Modules +//! +//! - [`crate::table_property_protocol_config`]: For **creating/modifying** tables. Parses +//! user-provided properties to extract signal flags and create protocols. Use this when building +//! new tables or modifying existing table properties. +//! //! [`Schema`]: crate::schema::Schema use std::sync::Arc; diff --git a/kernel/src/table_features/mod.rs b/kernel/src/table_features/mod.rs index 147ba5979a..c30abf1e80 100644 --- a/kernel/src/table_features/mod.rs +++ b/kernel/src/table_features/mod.rs @@ -1,4 +1,6 @@ use itertools::Itertools; +use std::str::FromStr; + use serde::{Deserialize, Serialize}; use strum::{AsRefStr, Display as StrumDisplay, EnumCount, EnumString}; @@ -679,6 +681,20 @@ impl TableFeature { TableFeature::Unknown(_) => None, } } + + /// Parse a feature name string into a TableFeature. + /// + /// Known feature names are parsed into their corresponding variants. + /// Unknown feature names are wrapped in `TableFeature::Unknown`. + pub(crate) fn from_name(name: &str) -> Self { + TableFeature::from_str(name).unwrap_or_else(|_| TableFeature::Unknown(name.to_string())) + } + + /// Returns true if this is a ReaderWriter feature (appears in both reader and writer feature lists). + /// Returns false for Writer-only features and Unknown features. + pub(crate) fn is_reader_writer(&self) -> bool { + matches!(self.feature_type(), FeatureType::ReaderWriter) + } } impl ToDataType for TableFeature { @@ -829,4 +845,49 @@ mod tests { assert_eq!(from_str, feature); } } + + #[test] + fn test_from_name() { + // Known features + assert_eq!( + TableFeature::from_name("deletionVectors"), + TableFeature::DeletionVectors + ); + assert_eq!( + TableFeature::from_name("changeDataFeed"), + TableFeature::ChangeDataFeed + ); + assert_eq!( + TableFeature::from_name("columnMapping"), + TableFeature::ColumnMapping + ); + assert_eq!( + TableFeature::from_name("timestampNtz"), + TableFeature::TimestampWithoutTimezone + ); + + // Unknown features + assert_eq!( + TableFeature::from_name("unknownFeature"), + TableFeature::Unknown("unknownFeature".to_string()) + ); + } + + #[test] + fn test_is_reader_writer() { + // ReaderWriter features + assert!(TableFeature::DeletionVectors.is_reader_writer()); + assert!(TableFeature::ColumnMapping.is_reader_writer()); + assert!(TableFeature::TimestampWithoutTimezone.is_reader_writer()); + assert!(TableFeature::V2Checkpoint.is_reader_writer()); + + // Writer-only features + assert!(!TableFeature::ChangeDataFeed.is_reader_writer()); + assert!(!TableFeature::AppendOnly.is_reader_writer()); + assert!(!TableFeature::DomainMetadata.is_reader_writer()); + assert!(!TableFeature::RowTracking.is_reader_writer()); + + // Unknown features + assert!(!TableFeature::unknown("something").is_reader_writer()); + } } diff --git a/kernel/src/table_properties.rs b/kernel/src/table_properties.rs index 35f19b5bf1..77e2a4599f 100644 --- a/kernel/src/table_properties.rs +++ b/kernel/src/table_properties.rs @@ -26,6 +26,14 @@ pub use deserialize::ParseIntervalError; /// Prefix for delta table properties (e.g., `delta.enableChangeDataFeed`, `delta.appendOnly`). pub const DELTA_PROPERTY_PREFIX: &str = "delta."; +/// Table property key for specifying the minimum reader protocol version. +/// This is a signal flag property - it affects protocol creation but is not stored in metadata. +pub const MIN_READER_VERSION_PROP: &str = "delta.minReaderVersion"; + +/// Table property key for specifying the minimum writer protocol version. +/// This is a signal flag property - it affects protocol creation but is not stored in metadata. +pub const MIN_WRITER_VERSION_PROP: &str = "delta.minWriterVersion"; + /// Delta table properties. These are parsed from the 'configuration' map in the most recent /// 'Metadata' action of a table. /// diff --git a/kernel/src/table_property_protocol_config.rs b/kernel/src/table_property_protocol_config.rs new file mode 100644 index 0000000000..2579cb3afb --- /dev/null +++ b/kernel/src/table_property_protocol_config.rs @@ -0,0 +1,392 @@ +//! Configuration for table property-based protocol modifications. +//! +//! This module provides [`TablePropertyProtocolConfig`], which parses user-provided table +//! properties to extract signal flags and create protocol configurations for table operations. +//! +//! # Difference from [`TableConfiguration`](crate::table_configuration::TableConfiguration) +//! +//! - **[`TableConfiguration`](crate::table_configuration::TableConfiguration)**: Reads the +//! configuration of an *existing* table from a snapshot. It combines the Protocol and Metadata +//! actions to provide a unified view of a table's current state. +//! +//! - **[`TablePropertyProtocolConfig`]**: Parses *user-provided* properties during table creation +//! or modification operations (CREATE TABLE, ALTER TABLE SET TBLPROPERTIES). It extracts signal +//! flags (like `delta.feature.` and protocol versions) that affect the Protocol but are +//! not stored in metadata. +//! +//! In short: `TableConfiguration` is for **reading** existing tables, while +//! `TablePropertyProtocolConfig` is for **creating/modifying** tables. + +use std::collections::HashMap; + +use crate::actions::Protocol; +use crate::table_features::{ + TableFeature, SET_TABLE_FEATURE_SUPPORTED_PREFIX, SET_TABLE_FEATURE_SUPPORTED_VALUE, + TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION, +}; +use crate::table_properties::{MIN_READER_VERSION_PROP, MIN_WRITER_VERSION_PROP}; +use crate::{DeltaResult, Error}; + +/// Configuration parsed from user-provided table properties during table creation or modification. +/// +/// This struct extracts signal flags (protocol versions, feature overrides) from +/// the input properties and separates them from the table properties that will +/// be stored in metadata. +/// +/// Signal flags include: +/// - `delta.feature. = supported` - Explicit feature enablement +/// - `delta.minReaderVersion` - Protocol reader version hint +/// - `delta.minWriterVersion` - Protocol writer version hint +/// +/// These signal flags affect protocol creation but are not stored in the table's +/// metadata configuration. +/// +/// # Example +/// ```ignore +/// let props = HashMap::from([ +/// ("delta.feature.deletionVectors".to_string(), "supported".to_string()), +/// ("delta.minReaderVersion".to_string(), "3".to_string()), +/// ("myapp.version".to_string(), "1.0".to_string()), +/// ]); +/// let config = TablePropertyProtocolConfig::try_from(props)?; +/// config.validate_for_create()?; +/// // config.protocol contains Protocol with reader/writer features +/// // config.table_properties = {"myapp.version": "1.0"} +/// ``` +#[derive(Debug)] +pub(crate) struct TablePropertyProtocolConfig { + /// The resolved protocol (always uses table features protocol: v3/v7). + /// Created during parsing with user-specified features. + pub(crate) protocol: Protocol, + /// Remaining properties to store in Metadata.configuration. + /// Signal flags (delta.feature.*) and version props are stripped out. + pub(crate) table_properties: HashMap, +} + +impl TryFrom> for TablePropertyProtocolConfig { + type Error = Error; + + fn try_from(properties: HashMap) -> DeltaResult { + let mut all_features = Vec::new(); + let mut user_reader_version: Option = None; + let mut user_writer_version: Option = None; + let mut table_properties = HashMap::new(); + + for (key, value) in properties { + if let Some(feature_name) = key.strip_prefix(SET_TABLE_FEATURE_SUPPORTED_PREFIX) { + // Parse delta.feature.* signal flags + if value != SET_TABLE_FEATURE_SUPPORTED_VALUE { + return Err(Error::generic(format!( + "Invalid value '{}' for '{}'. Only '{}' is allowed.", + value, key, SET_TABLE_FEATURE_SUPPORTED_VALUE + ))); + } + let feature = TableFeature::from_name(feature_name); + if matches!(feature, TableFeature::Unknown(_)) { + return Err(Error::generic(format!( + "Unknown table feature '{}'. Cannot create table with unsupported features.", + feature_name + ))); + } + all_features.push(feature); + } else if key == MIN_READER_VERSION_PROP { + // Parse delta.minReaderVersion + let version: i32 = value.parse().map_err(|_| { + Error::generic(format!( + "Invalid value '{}' for '{}'. Must be an integer.", + value, key + )) + })?; + // Validate immediately: if specified, must be the required version + if version != TABLE_FEATURES_MIN_READER_VERSION { + return Err(Error::generic(format!( + "Invalid value '{}' for '{}'. Only '{}' is supported.", + version, MIN_READER_VERSION_PROP, TABLE_FEATURES_MIN_READER_VERSION + ))); + } + user_reader_version = Some(version); + } else if key == MIN_WRITER_VERSION_PROP { + // Parse delta.minWriterVersion + let version: i32 = value.parse().map_err(|_| { + Error::generic(format!( + "Invalid value '{}' for '{}'. Must be an integer.", + value, key + )) + })?; + // Validate immediately: if specified, must be the required version + if version != TABLE_FEATURES_MIN_WRITER_VERSION { + return Err(Error::generic(format!( + "Invalid value '{}' for '{}'. Only '{}' is supported.", + version, MIN_WRITER_VERSION_PROP, TABLE_FEATURES_MIN_WRITER_VERSION + ))); + } + user_writer_version = Some(version); + } else { + // Pass through to table properties + table_properties.insert(key, value); + } + } + + // Partition features: ReaderWriter -> reader list, all -> writer list + let reader_features: Vec<_> = all_features + .iter() + .filter(|f| f.is_reader_writer()) + .cloned() + .collect(); + + // Create Protocol with resolved versions. + // User-specified versions (if any) were validated above; default to required versions. + let protocol = Protocol::try_new( + user_reader_version.unwrap_or(TABLE_FEATURES_MIN_READER_VERSION), + user_writer_version.unwrap_or(TABLE_FEATURES_MIN_WRITER_VERSION), + Some(reader_features.iter()), + Some(all_features.iter()), + )?; + + Ok(Self { + protocol, + table_properties, + }) + } +} + +impl TablePropertyProtocolConfig { + /// Delta properties allowed during CREATE TABLE. + /// Expand this list as more features are supported (e.g., column mapping, clustering). + const ALLOWED_DELTA_PROPERTIES: &[&str] = &[ + // Currently empty - no delta.* properties allowed yet + // Future: "delta.enableDeletionVectors", "delta.columnMapping.mode", etc. + ]; + + /// Table features allowed during CREATE TABLE. + /// Expand this list as more features are supported. + const ALLOWED_DELTA_FEATURES: &[TableFeature] = &[ + // Currently empty - no explicit feature overrides allowed yet + // Future: TableFeature::DeletionVectors, TableFeature::ColumnMapping, etc. + ]; + + /// Validates the configuration for CREATE TABLE. + /// + /// Checks: + /// - Only allowed delta properties can be set + /// - Only allowed features can be explicitly enabled + /// + /// Note: Protocol version validation happens during parsing in `try_from`. + pub(crate) fn validate_for_create(&self) -> DeltaResult<()> { + use crate::table_properties::DELTA_PROPERTY_PREFIX; + + // Validate delta properties against allow-list + for key in self.table_properties.keys() { + if key.starts_with(DELTA_PROPERTY_PREFIX) + && !Self::ALLOWED_DELTA_PROPERTIES.contains(&key.as_str()) + { + return Err(Error::generic(format!( + "Setting delta property '{}' is not supported during CREATE TABLE", + key + ))); + } + } + + // Validate features against allow-list + if let Some(writer_features) = self.protocol.writer_features() { + for feature in writer_features { + if !Self::ALLOWED_DELTA_FEATURES.contains(feature) { + return Err(Error::generic(format!( + "Enabling feature '{}' is not supported during CREATE TABLE", + feature + ))); + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::table_properties::{MIN_READER_VERSION_PROP, MIN_WRITER_VERSION_PROP}; + use crate::utils::test_utils::assert_result_error_with_message; + + /// Helper to construct a HashMap from string slice pairs. + fn props(pairs: [(&str, &str); N]) -> HashMap { + pairs + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect() + } + + #[test] + fn test_table_property_protocol_config_basic() { + let config = TablePropertyProtocolConfig::try_from(props([ + ("delta.feature.deletionVectors", "supported"), + ("delta.enableDeletionVectors", "true"), + ])) + .unwrap(); + + // DeletionVectors is a ReaderWriter feature - check via protocol + let reader_features = config.protocol.reader_features().unwrap(); + let writer_features = config.protocol.writer_features().unwrap(); + assert_eq!(reader_features.len(), 1); + assert_eq!(reader_features[0], TableFeature::DeletionVectors); + assert_eq!(writer_features.len(), 1); + assert_eq!(writer_features[0], TableFeature::DeletionVectors); + + // Feature override should be removed from table_properties + assert!(!config + .table_properties + .contains_key("delta.feature.deletionVectors")); + // Regular property should be retained + assert_eq!( + config.table_properties.get("delta.enableDeletionVectors"), + Some(&"true".to_string()) + ); + } + + #[test] + fn test_table_property_protocol_config_multiple_features() { + let config = TablePropertyProtocolConfig::try_from(props([ + ("delta.feature.deletionVectors", "supported"), + ("delta.feature.changeDataFeed", "supported"), + ("delta.feature.appendOnly", "supported"), + ("delta.enableDeletionVectors", "true"), + ])) + .unwrap(); + + // All 3 features go into writer_features - check via protocol + let writer_features = config.protocol.writer_features().unwrap(); + assert_eq!(writer_features.len(), 3); + assert!(writer_features.contains(&TableFeature::DeletionVectors)); + assert!(writer_features.contains(&TableFeature::ChangeDataFeed)); + assert!(writer_features.contains(&TableFeature::AppendOnly)); + + // Only ReaderWriter features go into reader_features (DeletionVectors) + // ChangeDataFeed and AppendOnly are Writer-only features + let reader_features = config.protocol.reader_features().unwrap(); + assert_eq!(reader_features.len(), 1); + assert!(reader_features.contains(&TableFeature::DeletionVectors)); + + // Only the regular property should remain + assert_eq!(config.table_properties.len(), 1); + assert_eq!( + config.table_properties.get("delta.enableDeletionVectors"), + Some(&"true".to_string()) + ); + } + + #[test] + fn test_table_property_protocol_config_no_features() { + let config = TablePropertyProtocolConfig::try_from(props([ + ("delta.enableDeletionVectors", "true"), + ("delta.appendOnly", "true"), + ("custom.property", "value"), + ])) + .unwrap(); + + // No features specified - protocol should have empty feature lists + let reader_features = config.protocol.reader_features().unwrap(); + let writer_features = config.protocol.writer_features().unwrap(); + assert!(reader_features.is_empty()); + assert!(writer_features.is_empty()); + assert_eq!(config.table_properties.len(), 3); + } + + #[test] + fn test_table_property_protocol_config_parsing_failures() { + // Invalid feature value ("enabled" instead of "supported") + assert_result_error_with_message( + TablePropertyProtocolConfig::try_from(props([( + "delta.feature.deletionVectors", + "enabled", + )])), + "Invalid value 'enabled'", + ); + + // Unknown feature name + assert_result_error_with_message( + TablePropertyProtocolConfig::try_from(props([( + "delta.feature.futureFeature", + "supported", + )])), + "Unknown table feature 'futureFeature'", + ); + } + + #[test] + fn test_table_property_protocol_config_valid_versions() { + let config = TablePropertyProtocolConfig::try_from(props([ + (MIN_READER_VERSION_PROP, "3"), + (MIN_WRITER_VERSION_PROP, "7"), + ("custom.property", "value"), + ])) + .unwrap(); + + // Protocol created with correct versions + assert_eq!(config.protocol.min_reader_version(), 3); + assert_eq!(config.protocol.min_writer_version(), 7); + // Protocol version properties stripped from table_properties + assert!(!config + .table_properties + .contains_key(MIN_READER_VERSION_PROP)); + assert!(!config + .table_properties + .contains_key(MIN_WRITER_VERSION_PROP)); + // Other properties remain + assert_eq!( + config.table_properties.get("custom.property"), + Some(&"value".to_string()) + ); + // Validation passes + assert!(config.validate_for_create().is_ok()); + } + + #[test] + fn test_table_property_protocol_config_validation_errors() { + // Each case: (description, input properties, expected error substring, fails_at_parse) + let error_cases: &[(&str, &[(&str, &str)], &str, bool)] = &[ + ( + "Invalid reader version (only 3 supported)", + &[(MIN_READER_VERSION_PROP, "2")], + "Only '3' is supported", + true, // fails at parsing + ), + ( + "Invalid writer version (only 7 supported)", + &[(MIN_WRITER_VERSION_PROP, "5")], + "Only '7' is supported", + true, // fails at parsing + ), + ( + "Feature not on allow list", + &[("delta.feature.deletionVectors", "supported")], + "Enabling feature 'deletionVectors' is not supported", + false, // fails at validation + ), + ( + "Delta property not on allow list", + &[("delta.appendOnly", "true")], + "Setting delta property 'delta.appendOnly' is not supported", + false, // fails at validation + ), + ]; + + for (description, input, expected_error, fails_at_parse) in error_cases { + let input_map: HashMap = input + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + + if *fails_at_parse { + assert_result_error_with_message( + TablePropertyProtocolConfig::try_from(input_map), + expected_error, + ); + } else { + let config = TablePropertyProtocolConfig::try_from(input_map) + .unwrap_or_else(|e| panic!("{}: unexpected parse error: {}", description, e)); + assert_result_error_with_message(config.validate_for_create(), expected_error); + } + } + } +} diff --git a/kernel/src/transaction/create_table.rs b/kernel/src/transaction/create_table.rs index 406a897b9c..54a2afa25a 100644 --- a/kernel/src/transaction/create_table.rs +++ b/kernel/src/transaction/create_table.rs @@ -36,31 +36,17 @@ use std::sync::Arc; use url::Url; -use crate::actions::{Metadata, Protocol}; +use crate::actions::Metadata; use crate::committer::Committer; use crate::log_segment::LogSegment; use crate::schema::SchemaRef; use crate::snapshot::Snapshot; use crate::table_configuration::TableConfiguration; -use crate::table_features::{ - SET_TABLE_FEATURE_SUPPORTED_PREFIX, TABLE_FEATURES_MIN_READER_VERSION, - TABLE_FEATURES_MIN_WRITER_VERSION, -}; -use crate::table_properties::DELTA_PROPERTY_PREFIX; +use crate::table_property_protocol_config::TablePropertyProtocolConfig; use crate::transaction::Transaction; use crate::utils::{current_time_ms, try_parse_uri}; use crate::{DeltaResult, Engine, Error, StorageHandler, PRE_COMMIT_VERSION}; -/// Properties that are allowed to be set during create table. -/// This list will expand as more features are supported (e.g., column mapping, clustering). -/// The allow list will be deprecated once auto feature enablement is implemented -/// like the Java Kernel. -const ALLOWED_DELTA_PROPERTIES: &[&str] = &[ - // Empty for now - will add properties as features are implemented: - // - "delta.columnMapping.mode" (for column mapping) - // - etc. -]; - /// Ensures that no Delta table exists at the given path. /// /// This function checks the `_delta_log` directory to determine if a table already exists. @@ -114,35 +100,6 @@ fn ensure_table_does_not_exist( } } -/// Validates that table properties are allowed during CREATE TABLE. -/// -/// This function enforces an allow list for delta properties: -/// - Feature override properties (`delta.feature.*`) are never allowed -/// - Delta properties (`delta.*`) must be on the allow list -/// - Non-delta properties (user/application properties) are always allowed -fn validate_table_properties(properties: &HashMap) -> DeltaResult<()> { - for key in properties.keys() { - // Block all delta.feature.* properties (feature override properties) - if key.starts_with(SET_TABLE_FEATURE_SUPPORTED_PREFIX) { - return Err(Error::generic(format!( - "Setting feature override property '{}' is not supported during CREATE TABLE", - key - ))); - } - // For delta.* properties, check against allow list - if key.starts_with(DELTA_PROPERTY_PREFIX) - && !ALLOWED_DELTA_PROPERTIES.contains(&key.as_str()) - { - return Err(Error::generic(format!( - "Setting delta property '{}' is not supported during CREATE TABLE", - key - ))); - } - // Non-delta properties (user/application properties) are always allowed - } - Ok(()) -} - /// Creates a builder for creating a new Delta table. /// /// This function returns a [`CreateTableTransactionBuilder`] that can be configured with table @@ -284,27 +241,22 @@ impl CreateTableTransactionBuilder { ) -> DeltaResult { // Validate path let table_url = try_parse_uri(&self.path)?; + // Check if table already exists by looking for _delta_log directory + let delta_log_url = table_url.join("_delta_log/")?; + let storage = engine.storage_handler(); + ensure_table_does_not_exist(storage.as_ref(), &delta_log_url, &self.path)?; // Validate schema is non-empty if self.schema.fields().len() == 0 { return Err(Error::generic("Schema cannot be empty")); } - // Validate table properties against allow list - validate_table_properties(&self.table_properties)?; - - // Check if table already exists by looking for _delta_log directory - let delta_log_url = table_url.join("_delta_log/")?; - let storage = engine.storage_handler(); - ensure_table_does_not_exist(storage.as_ref(), &delta_log_url, &self.path)?; + // Parse and validate table properties + let config = TablePropertyProtocolConfig::try_from(self.table_properties)?; + config.validate_for_create()?; - // Create Protocol action with table features support - let protocol = Protocol::try_new( - TABLE_FEATURES_MIN_READER_VERSION, - TABLE_FEATURES_MIN_WRITER_VERSION, - Some(Vec::::new()), // readerFeatures (empty for now) - Some(Vec::::new()), // writerFeatures (empty for now) - )?; + // Use protocol from config (already created with table features support) + let protocol = config.protocol; // Create Metadata action let metadata = Metadata::try_new( @@ -313,7 +265,7 @@ impl CreateTableTransactionBuilder { (*self.schema).clone(), Vec::new(), // partition_columns - added with data layout support current_time_ms()?, - self.table_properties, + config.table_properties, )?; // Create pre-commit snapshot from protocol/metadata @@ -335,8 +287,11 @@ impl CreateTableTransactionBuilder { #[cfg(test)] mod tests { use super::*; + use crate::committer::FileSystemCommitter; + use crate::engine::sync::SyncEngine; use crate::schema::{DataType, StructField, StructType}; - use crate::utils::test_utils::assert_result_error_with_message; + use crate::table_properties::{MIN_READER_VERSION_PROP, MIN_WRITER_VERSION_PROP}; + use crate::utils::test_utils::assert_result_error_with_messages; use std::sync::Arc; fn test_schema() -> SchemaRef { @@ -402,46 +357,113 @@ mod tests { } #[test] - fn test_validate_supported_properties() { - // Empty properties are allowed + fn test_table_creation_config_parsing() { + // Empty properties are allowed - protocol is created with default features let properties = HashMap::new(); - assert!(validate_table_properties(&properties).is_ok()); + let config = TablePropertyProtocolConfig::try_from(properties).unwrap(); + assert!(config.table_properties.is_empty()); + // Protocol always has features (even if empty) at version 3/7 + assert_eq!(config.protocol.min_reader_version(), 3); + assert_eq!(config.protocol.min_writer_version(), 7); + assert!(config.protocol.reader_features().unwrap().is_empty()); + assert!(config.protocol.writer_features().unwrap().is_empty()); + + // User/application properties are passed through + let properties = HashMap::from([ + ("myapp.version".to_string(), "1.0".to_string()), + ("custom.setting".to_string(), "value".to_string()), + ]); + let config = TablePropertyProtocolConfig::try_from(properties).unwrap(); + assert_eq!( + config.table_properties.get("myapp.version"), + Some(&"1.0".to_string()) + ); + assert_eq!( + config.table_properties.get("custom.setting"), + Some(&"value".to_string()) + ); - // User/application properties are allowed - let mut properties = HashMap::new(); - properties.insert("myapp.version".to_string(), "1.0".to_string()); - properties.insert("custom.setting".to_string(), "value".to_string()); - assert!(validate_table_properties(&properties).is_ok()); + // Protocol version properties are parsed and stripped from table_properties + let properties = HashMap::from([ + (MIN_READER_VERSION_PROP.to_string(), "3".to_string()), + (MIN_WRITER_VERSION_PROP.to_string(), "7".to_string()), + ("myapp.version".to_string(), "1.0".to_string()), + ]); + let config = TablePropertyProtocolConfig::try_from(properties).unwrap(); + // Protocol created with specified versions (always 3/7) + assert_eq!(config.protocol.min_reader_version(), 3); + assert_eq!(config.protocol.min_writer_version(), 7); + // Protocol properties should be stripped from table_properties + assert!(!config + .table_properties + .contains_key(MIN_READER_VERSION_PROP)); + assert!(!config + .table_properties + .contains_key(MIN_WRITER_VERSION_PROP)); + // User properties should remain + assert_eq!( + config.table_properties.get("myapp.version"), + Some(&"1.0".to_string()) + ); } #[test] - fn test_validate_unsupported_properties() { - // Delta properties not on allow list are rejected - let mut properties = HashMap::new(); - properties.insert("delta.enableChangeDataFeed".to_string(), "true".to_string()); - assert_result_error_with_message( - validate_table_properties(&properties), - "Setting delta property 'delta.enableChangeDataFeed' is not supported", + fn test_protocol_version_validation() { + let engine = SyncEngine::new(); + + // Helper to build a table with given properties and return the result + let try_create = |props: HashMap| { + let temp_dir = tempfile::tempdir().unwrap(); + let table_path = temp_dir.path().to_str().unwrap(); + CreateTableTransactionBuilder::new(table_path, test_schema(), "TestApp/1.0") + .with_table_properties(props) + .build(&engine, Box::new(FileSystemCommitter::new())) + }; + + // Valid: both versions (3, 7) + let props = HashMap::from([ + (MIN_READER_VERSION_PROP.to_string(), "3".to_string()), + (MIN_WRITER_VERSION_PROP.to_string(), "7".to_string()), + ]); + assert!( + try_create(props).is_ok(), + "Valid protocol versions (3, 7) should succeed" ); - // Feature override properties are rejected - let mut properties = HashMap::new(); - properties.insert( - "delta.feature.domainMetadata".to_string(), - "supported".to_string(), + // Valid: only reader version (3) + let props = HashMap::from([(MIN_READER_VERSION_PROP.to_string(), "3".to_string())]); + assert!( + try_create(props).is_ok(), + "Only reader version (3) should succeed" ); - assert_result_error_with_message( - validate_table_properties(&properties), - "Setting feature override property 'delta.feature.domainMetadata' is not supported", + + // Valid: only writer version (7) + let props = HashMap::from([(MIN_WRITER_VERSION_PROP.to_string(), "7".to_string())]); + assert!( + try_create(props).is_ok(), + "Only writer version (7) should succeed" ); - // Mixed properties with unsupported delta property are rejected - let mut properties = HashMap::new(); - properties.insert("myapp.version".to_string(), "1.0".to_string()); - properties.insert("delta.appendOnly".to_string(), "true".to_string()); - assert_result_error_with_message( - validate_table_properties(&properties), - "Setting delta property 'delta.appendOnly' is not supported", + // Invalid: reader version 2 (only 3 is supported) + let props = HashMap::from([(MIN_READER_VERSION_PROP.to_string(), "2".to_string())]); + assert_result_error_with_messages( + try_create(props), + &["delta.minReaderVersion", "Only '3' is supported"], ); + + // Invalid: writer version 5 (only 7 is supported) + let props = HashMap::from([(MIN_WRITER_VERSION_PROP.to_string(), "5".to_string())]); + assert_result_error_with_messages( + try_create(props), + &["delta.minWriterVersion", "Only '7' is supported"], + ); + + // Invalid: non-integer reader version + let props = HashMap::from([(MIN_READER_VERSION_PROP.to_string(), "abc".to_string())]); + assert_result_error_with_messages(try_create(props), &["Must be an integer"]); + + // Invalid: non-integer writer version + let props = HashMap::from([(MIN_WRITER_VERSION_PROP.to_string(), "xyz".to_string())]); + assert_result_error_with_messages(try_create(props), &["Must be an integer"]); } } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 687ea429b5..4197aa7064 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -20,7 +20,9 @@ use crate::error::Error; use crate::expressions::{column_name, ColumnName}; use crate::expressions::{ArrayData, Scalar, StructData, Transform, UnaryExpressionOp::ToJson}; use crate::path::{LogRoot, ParsedLogPath}; -use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor}; +use crate::row_tracking::{ + RowTrackingDomainMetadata, RowTrackingVisitor, ROW_TRACKING_DOMAIN_NAME, +}; use crate::scan::data_skipping::stats_schema::NullableStatsTransform; use crate::scan::log_replay::{ get_scan_metadata_transform_expr, BASE_ROW_ID_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME, @@ -647,19 +649,49 @@ impl Transaction { // PRE_COMMIT_VERSION (u64::MAX) + 1 wraps to 0, which is the correct first version self.read_snapshot.version().wrapping_add(1) } - /// Validate that user domains don't conflict with system domains or each other. - fn validate_user_domain_operations(&self) -> DeltaResult<()> { + /// Validate domain metadata operations for both create-table and existing-table transactions. + /// + /// Enforces the following rules: + /// - DomainMetadata feature must be supported if any domain operations are present + /// - System domains (delta.*) can only be added in create-table transactions + /// - System domains must correspond to a known feature (e.g., rowTracking) and that feature must be enabled + /// - User domains can be added in both create-table and existing-table transactions + /// - Domain removals are not allowed in create-table transactions + /// - No duplicate domains within a single transaction + fn validate_domain_metadata_operations(&self) -> DeltaResult<()> { + // Feature validation (applies to all transactions with domain operations) + if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty()) + && !self + .read_snapshot + .table_configuration() + .is_feature_supported(&TableFeature::DomainMetadata) + { + return Err(Error::unsupported( + "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature", + )); + } + + let is_create = self.is_create_table(); let mut seen_domains = HashSet::new(); // Validate domain additions for dm in &self.domain_metadata_additions { let domain = dm.domain(); - if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { + let is_system_domain = domain.starts_with(INTERNAL_DOMAIN_PREFIX); + + // System domains (delta.*) only allowed in create-table + if is_system_domain && !is_create { return Err(Error::generic( "Cannot modify domains that start with 'delta.' as those are system controlled", )); } + // For create-table, validate system domains against their required features + if is_system_domain && is_create { + self.validate_system_domain_feature(domain)?; + } + + // Check for duplicates if !seen_domains.insert(domain) { return Err(Error::generic(format!( "Metadata for domain {} already specified in this transaction", @@ -668,7 +700,14 @@ impl Transaction { } } - // Validate domain removals + // No removals allowed for create-table + if is_create && !self.domain_removals.is_empty() { + return Err(Error::unsupported( + "Domain metadata removals are not supported in create-table transactions", + )); + } + + // Validate domain removals (for non-create-table) for domain in &self.domain_removals { if domain.starts_with(INTERNAL_DOMAIN_PREFIX) { return Err(Error::generic( @@ -687,6 +726,39 @@ impl Transaction { Ok(()) } + /// Validate that a system domain corresponds to a known feature and that the feature is supported. + /// + /// This prevents arbitrary `delta.*` domains from being added during table creation. + /// Each known system domain must have its corresponding feature enabled in the protocol. + fn validate_system_domain_feature(&self, domain: &str) -> DeltaResult<()> { + let table_config = self.read_snapshot.table_configuration(); + + // Map domain to its required feature + let required_feature = match domain { + ROW_TRACKING_DOMAIN_NAME => Some(TableFeature::RowTracking), + // Will be changed to a constant in a follow up clustering create table feature PR + "delta.clustering" => Some(TableFeature::ClusteredTable), + _ => { + return Err(Error::generic(format!( + "Unknown system domain '{}'. Only known system domains are allowed.", + domain + ))); + } + }; + + // If the domain requires a feature, validate it's supported + if let Some(feature) = required_feature { + if !table_config.is_feature_supported(&feature) { + return Err(Error::generic(format!( + "System domain '{}' requires the '{}' feature to be enabled", + domain, feature + ))); + } + } + + Ok(()) + } + /// Helper function to convert scan metadata iterator to filtered engine data iterator. /// /// This adapter extracts the `scan_files` field from each [`crate::scan::ScanMetadata`] item, @@ -825,6 +897,36 @@ impl Transaction { Ok(()) } + /// Generate removal actions for user domain metadata by scanning the log. + /// + /// This performs an expensive log replay operation to fetch the previous configuration + /// value for each domain being removed, as required by the Delta spec for tombstones. + /// Returns an empty vector if there are no domain removals. + fn generate_user_domain_removal_actions( + &self, + engine: &dyn Engine, + ) -> DeltaResult> { + if self.domain_removals.is_empty() { + return Ok(vec![]); + } + + // Scan log to fetch existing configurations for tombstones + let existing_domains = + scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?; + + // Create removal tombstones with pre-image configurations + Ok(self + .domain_removals + .iter() + .filter_map(|domain| { + // If domain doesn't exist in the log, this is a no-op (filter it out) + existing_domains.get(domain).map(|existing| { + DomainMetadata::remove(domain.clone(), existing.configuration().to_owned()) + }) + }) + .collect()) + } + /// Generate domain metadata actions with validation. Handle both user and system domains. /// /// This function may perform an expensive log replay operation if there are any domain removals. @@ -835,55 +937,28 @@ impl Transaction { engine: &'a dyn Engine, row_tracking_high_watermark: Option, ) -> DeltaResult> { - // For create-table transactions, domain metadata will be added in - // a subsequent code commit - if self.is_create_table() { - if !self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty() { - return Err(Error::unsupported( - "Domain metadata operations are not supported in create-table transactions", + let is_create = self.is_create_table(); + + // Validate domain operations (includes feature validation) + self.validate_domain_metadata_operations()?; + + // TODO(sanuj) Create-table must not have row tracking or removals + // Defensive. Needs to be updated when row tracking support is added. + if is_create { + if row_tracking_high_watermark.is_some() { + return Err(Error::internal_error( + "CREATE TABLE cannot have row tracking domain metadata", )); } - return Ok(Box::new(iter::empty())); + // domain_removals already validated above, but be explicit + debug_assert!(self.domain_removals.is_empty()); } - // Validate feature support for user domain operations - if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty()) - && !self - .read_snapshot - .table_configuration() - .is_feature_supported(&TableFeature::DomainMetadata) - { - return Err(Error::unsupported("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature")); - } - - // Validate user domain operations - self.validate_user_domain_operations()?; - - // Generate user domain removals via log replay (expensive if non-empty) - let removal_actions = if !self.domain_removals.is_empty() { - // Scan log to fetch existing configurations for tombstones - let existing_domains = - scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?; - - // Create removal tombstones with pre-image configurations - let removals: Vec<_> = self - .domain_removals - .iter() - .filter_map(|domain| { - // If domain doesn't exist in the log, this is a no-op (filter it out) - existing_domains.get(domain).map(|existing| { - DomainMetadata::remove(domain.clone(), existing.configuration().to_owned()) - }) - }) - .collect(); - - removals - } else { - vec![] - }; + // Generate removal actions (empty for create-table due to validation above) + let removal_actions = self.generate_user_domain_removal_actions(engine)?; - // Generate system domain actions (row tracking) - let system_domain_actions = row_tracking_high_watermark + // Generate row tracking domain action (None for create-table) + let row_tracking_domain_action = row_tracking_high_watermark .map(DomainMetadata::try_from) .transpose()? .into_iter(); @@ -894,7 +969,7 @@ impl Transaction { .clone() .into_iter() .chain(removal_actions) - .chain(system_domain_actions) + .chain(row_tracking_domain_action) .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)), )) } diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs index 097a2b1e02..796a2060f9 100644 --- a/kernel/src/utils.rs +++ b/kernel/src/utils.rs @@ -292,6 +292,25 @@ pub(crate) mod test_utils { } } + /// Assert that a Result is an error and that the error message contains all expected substrings. + pub(crate) fn assert_result_error_with_messages( + res: Result, + messages: &[&str], + ) { + match res { + Ok(_) => panic!("Expected error with messages {:?}, but got Ok result", messages), + Err(error) => { + let error_str = error.to_string(); + for message in messages { + assert!( + error_str.contains(message), + "Error message does not contain expected substring.\nExpected substring:\t{message}\nActual message:\t\t{error_str}" + ); + } + } + } + } + /// Helper to get a field from a StructType by name, panicking if not found. pub(crate) fn get_schema_field(struct_type: &StructType, name: &str) -> StructField { struct_type diff --git a/kernel/tests/create_table.rs b/kernel/tests/create_table.rs index ffd2d2807c..33ba20b0f2 100644 --- a/kernel/tests/create_table.rs +++ b/kernel/tests/create_table.rs @@ -263,3 +263,76 @@ async fn test_create_table_log_actions() -> DeltaResult<()> { Ok(()) } + +#[test] +fn test_feature_overrides_and_delta_properties_rejected_until_on_allow_list() { + let temp_dir = tempdir().expect("Failed to create temp dir"); + let table_path = temp_dir.path().to_str().expect("Invalid path").to_string(); + + let engine = + create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL")) + .expect("Failed to create engine"); + + let schema = Arc::new( + StructType::try_new(vec![StructField::new("id", DataType::LONG, false)]) + .expect("Invalid schema"), + ); + + // Feature overrides are parsed but rejected during validation (not on allow-list) + let result = create_table(&table_path, schema.clone(), "FeatureTest/1.0") + .with_table_properties([("delta.feature.deletionVectors", "supported")]) + .build(engine.as_ref(), Box::new(FileSystemCommitter::new())); + + assert_result_error_with_message( + result, + "Enabling feature 'deletionVectors' is not supported during CREATE TABLE", + ); + + // Delta properties are rejected during validation (not on allow-list) + let result = create_table(&table_path, schema.clone(), "FeatureTest/1.0") + .with_table_properties([("delta.enableDeletionVectors", "true")]) + .build(engine.as_ref(), Box::new(FileSystemCommitter::new())); + + assert_result_error_with_message( + result, + "Setting delta property 'delta.enableDeletionVectors' is not supported during CREATE TABLE", + ); + + // User/application properties (non-delta.*) are allowed + let result = create_table(&table_path, schema, "FeatureTest/1.0") + .with_table_properties([("myapp.version", "1.0")]) + .build(engine.as_ref(), Box::new(FileSystemCommitter::new())); + + assert!(result.is_ok(), "User properties should be allowed"); +} + +#[test] +fn test_feature_override_rejects_invalid_value_only_supported_allowed() { + let temp_dir = tempdir().expect("Failed to create temp dir"); + let table_path = temp_dir.path().to_str().expect("Invalid path").to_string(); + + let engine = + create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL")) + .expect("Failed to create engine"); + + let schema = Arc::new( + StructType::try_new(vec![StructField::new("id", DataType::LONG, false)]) + .expect("Invalid schema"), + ); + + // "enabled" is not valid - only "supported" is allowed + let result = create_table(&table_path, schema, "FeatureTest/1.0") + .with_table_properties([("delta.feature.deletionVectors", "enabled")]) + .build(engine.as_ref(), Box::new(FileSystemCommitter::new())); + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Invalid value"), + "Error should mention invalid value" + ); + assert!( + err_msg.contains("supported"), + "Error should mention 'supported' as valid value" + ); +}