diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs
index 710a6e285d..c8960db685 100644
--- a/kernel/src/actions/mod.rs
+++ b/kernel/src/actions/mod.rs
@@ -3,9 +3,12 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
+use std::hash::Hash;
 use std::str::FromStr;
 use std::sync::{Arc, LazyLock};
+
+use indexmap::IndexSet;
+
 use self::deletion_vector::DeletionVectorDescriptor;
 use crate::expressions::{MapData, Scalar, StructData};
 use crate::schema::{DataType, MapType, SchemaRef, StructField, StructType, ToSchema as _};
@@ -400,19 +403,26 @@ pub(crate) struct Protocol {
     writer_features: Option<Vec<String>>,
 }
 
+/// Parse features from strings and deduplicate them while preserving insertion order.
+///
+/// This function converts feature strings to the target type and removes duplicates
+/// to ensure the protocol doesn't contain redundant feature entries. Strings that fail
+/// to parse are silently skipped. Uses an IndexSet to maintain the order of first
+/// occurrence for each unique feature.
 fn parse_features<T>(features: Option<impl IntoIterator<Item = impl ToString>>) -> Option<Vec<T>>
 where
-    T: FromStr,
+    T: FromStr + Hash + Eq,
     T::Err: Debug,
 {
-    features
-        .map(|fs| {
-            fs.into_iter()
-                .map(|f| T::from_str(&f.to_string()))
-                .collect()
-        })
-        .transpose()
-        .ok()?
+    features.map(|fs| {
+        // Parse all features and collect into an IndexSet to deduplicate
+        // while preserving insertion order
+        let unique_features: IndexSet<T> = fs
+            .into_iter()
+            .filter_map(|f| T::from_str(&f.to_string()).ok())
+            .collect();
+        // Convert to Vec for storage
+        unique_features.into_iter().collect()
+    })
 }
 
 impl Protocol {
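For reference, a self-contained sketch of the dedup behavior the new `parse_features` relies on, using the `indexmap` crate this diff adds as a dependency (the feature names here are just illustrative values):

```rust
use indexmap::IndexSet;

fn main() {
    // IndexSet keeps the position of the first occurrence and drops later
    // duplicates, so the resulting feature list is stable and redundancy-free.
    let raw = ["deletionVectors", "appendOnly", "deletionVectors"];
    let unique: Vec<&str> = raw
        .into_iter()
        .collect::<IndexSet<_>>()
        .into_iter()
        .collect();
    assert_eq!(unique, ["deletionVectors", "appendOnly"]);
}
```
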
diff --git a/kernel/src/clustering.rs b/kernel/src/clustering.rs
new file mode 100644
index 0000000000..365b7bb34b
--- /dev/null
+++ b/kernel/src/clustering.rs
@@ -0,0 +1,148 @@
+//! Clustering support for Delta tables.
+//!
+//! Clustering allows data to be physically co-located based on column values, enabling
+//! efficient data skipping without the directory overhead of partitioning.
+//!
+//! Clustering metadata is stored as domain metadata with the domain name `delta.clustering`.
+
+use serde::{Deserialize, Serialize};
+
+use crate::actions::DomainMetadata;
+use crate::schema::ColumnName;
+use crate::DeltaResult;
+
+/// The domain name for clustering metadata in Delta tables.
+pub(crate) const CLUSTERING_DOMAIN_NAME: &str = "delta.clustering";
+
+/// Represents the clustering metadata stored as domain metadata in Delta tables.
+///
+/// This struct is serialized to JSON and stored in the `delta.clustering` domain
+/// metadata action. The clustering columns are stored as a list of column paths,
+/// where each path is a list of field names (to support nested columns).
+///
+/// If column mapping is enabled on the table, physical column names are stored;
+/// otherwise, logical column names are used.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct ClusteringMetadataDomain {
+    /// The columns used for clustering, stored as paths (list of field names).
+    /// For example: `[["col1"], ["nested", "field"]]`
+    clustering_columns: Vec<Vec<String>>,
+}
+
+impl ClusteringMetadataDomain {
+    /// Creates a new clustering metadata domain from column names.
+    ///
+    /// # Arguments
+    ///
+    /// * `columns` - The columns to cluster by. Each column is represented as a
+    ///   [`ColumnName`], which supports nested paths.
+    pub(crate) fn new(columns: &[ColumnName]) -> Self {
+        let clustering_columns = columns
+            .iter()
+            .map(|col| col.path().iter().map(|s| s.to_string()).collect())
+            .collect();
+        Self { clustering_columns }
+    }
+
+    /// Returns the clustering columns as a slice of column paths.
+    #[cfg(test)]
+    pub(crate) fn clustering_columns(&self) -> &[Vec<String>] {
+        &self.clustering_columns
+    }
+
+    /// Converts this clustering metadata to a [`DomainMetadata`] action.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the metadata cannot be serialized to JSON.
+    pub(crate) fn to_domain_metadata(&self) -> DeltaResult<DomainMetadata> {
+        let configuration = serde_json::to_string(self)?;
+        Ok(DomainMetadata::new(
+            CLUSTERING_DOMAIN_NAME.to_string(),
+            configuration,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    impl ClusteringMetadataDomain {
+        /// Creates a clustering metadata domain from a JSON configuration string.
+        fn from_json(json: &str) -> DeltaResult<Self> {
+            Ok(serde_json::from_str(json)?)
+        }
+    }
+
+    #[test]
+    fn test_clustering_metadata_new() {
+        let columns = vec![
+            ColumnName::new(["col1"]),
+            ColumnName::new(["nested", "field"]),
+        ];
+        let metadata = ClusteringMetadataDomain::new(&columns);
+
+        assert_eq!(
+            metadata.clustering_columns(),
+            &[
+                vec!["col1".to_string()],
+                vec!["nested".to_string(), "field".to_string()]
+            ]
+        );
+    }
+
+    #[test]
+    fn test_clustering_metadata_empty() {
+        let metadata = ClusteringMetadataDomain::new(&[]);
+        assert!(metadata.clustering_columns().is_empty());
+    }
+
+    #[test]
+    fn test_clustering_metadata_serialization() {
+        let columns = vec![
+            ColumnName::new(["col1"]),
+            ColumnName::new(["nested", "field"]),
+        ];
+        let metadata = ClusteringMetadataDomain::new(&columns);
+
+        let json = serde_json::to_string(&metadata).unwrap();
+        assert!(json.contains("clusteringColumns"));
+        assert!(json.contains("col1"));
+        assert!(json.contains("nested"));
+        assert!(json.contains("field"));
+
+        // Round-trip test
+        let deserialized: ClusteringMetadataDomain = serde_json::from_str(&json).unwrap();
+        assert_eq!(deserialized, metadata);
+    }
+
+    #[test]
+    fn test_clustering_metadata_to_domain_metadata() {
+        let columns = vec![ColumnName::new(["col1"])];
+        let metadata = ClusteringMetadataDomain::new(&columns);
+
+        let domain_metadata = metadata.to_domain_metadata().unwrap();
+        assert_eq!(domain_metadata.domain(), CLUSTERING_DOMAIN_NAME);
+
+        // Verify the configuration can be parsed back
+        let parsed = ClusteringMetadataDomain::from_json(domain_metadata.configuration()).unwrap();
+        assert_eq!(parsed, metadata);
+    }
+
+    #[test]
+    fn test_clustering_metadata_from_json() {
+        let json = r#"{"clusteringColumns":[["col1"],["nested","field"]]}"#;
+        let metadata = ClusteringMetadataDomain::from_json(json).unwrap();
+
+        assert_eq!(
+            metadata.clustering_columns(),
+            &[
+                vec!["col1".to_string()],
+                vec!["nested".to_string(), "field".to_string()]
+            ]
+        );
+    }
+}
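To make the on-disk shape concrete: assuming the standard Delta `domainMetadata` action envelope from the protocol spec (this envelope is not defined in this diff), the committed log entry wrapping this configuration would look roughly like the following illustrative sketch:

```rust
use serde_json::json;

fn main() {
    // The `configuration` field carries the ClusteringMetadataDomain JSON as an
    // embedded string, exactly as serde_json::to_string produces it above.
    let action = json!({
        "domainMetadata": {
            "domain": "delta.clustering",
            "configuration": "{\"clusteringColumns\":[[\"col1\"],[\"nested\",\"field\"]]}",
            "removed": false
        }
    });
    println!("{action}");
}
```
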
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 86ce5e9ce5..4fff952b22 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -103,11 +103,13 @@
 pub mod table_changes;
 pub mod table_configuration;
 pub mod table_features;
 pub mod table_properties;
+mod table_property_protocol_config;
 pub mod transaction;
 pub(crate) mod transforms;
 
 pub use log_path::LogPath;
+pub(crate) mod clustering;
 mod row_tracking;
 
 mod arrow_compat;
@@ -161,6 +163,10 @@
 pub use log_compaction::{should_compact, LogCompactionWriter};
 pub use metrics::MetricsReporter;
 pub use snapshot::Snapshot;
 pub use snapshot::SnapshotRef;
+#[cfg(feature = "internal-api")]
+pub use transaction::create_table::{create_table, CreateTableTransactionBuilder};
+#[cfg(feature = "internal-api")]
+pub use transaction::DataLayout;
 
 use expressions::literal_expression_transform::LiteralExpressionTransform;
 use expressions::Scalar;
diff --git a/kernel/src/row_tracking.rs b/kernel/src/row_tracking.rs
index e6c2db1049..dff850f3f6 100644
--- a/kernel/src/row_tracking.rs
+++ b/kernel/src/row_tracking.rs
@@ -16,9 +16,10 @@
     row_id_high_water_mark: i64,
 }
 
-impl RowTrackingDomainMetadata {
-    const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking";
+/// The domain name for row tracking metadata.
+pub(crate) const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking";
 
+impl RowTrackingDomainMetadata {
     pub(crate) fn new(row_id_high_water_mark: i64) -> Self {
         RowTrackingDomainMetadata {
             row_id_high_water_mark,
@@ -45,14 +46,16 @@
         snapshot: &Snapshot,
         engine: &dyn Engine,
     ) -> DeltaResult<Option<i64>> {
-        Ok(domain_metadata_configuration(
-            snapshot.log_segment(),
-            Self::ROW_TRACKING_DOMAIN_NAME,
-            engine,
-        )?
-        .map(|domain_metadata| serde_json::from_str::<RowTrackingDomainMetadata>(&domain_metadata))
-        .transpose()?
-        .map(|metadata| metadata.row_id_high_water_mark))
+        Ok(
+            domain_metadata_configuration(
+                snapshot.log_segment(),
+                ROW_TRACKING_DOMAIN_NAME,
+                engine,
+            )?
+            .map(|domain_metadata| serde_json::from_str::<RowTrackingDomainMetadata>(&domain_metadata))
+            .transpose()?
+            .map(|metadata| metadata.row_id_high_water_mark),
+        )
     }
 }
 
@@ -61,7 +64,7 @@ impl TryFrom<RowTrackingDomainMetadata> for DomainMetadata {
 
     fn try_from(metadata: RowTrackingDomainMetadata) -> DeltaResult<Self> {
         Ok(DomainMetadata::new(
-            RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(),
+            ROW_TRACKING_DOMAIN_NAME.to_string(),
             serde_json::to_string(&metadata)?,
         ))
     }
diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs
index 9080447084..f203269ed3 100644
--- a/kernel/src/table_configuration.rs
+++ b/kernel/src/table_configuration.rs
@@ -1,3 +1,5 @@
+//! Configuration for reading existing Delta tables.
+//!
 //! This module defines [`TableConfiguration`], a high level api to check feature support and
 //! feature enablement for a table at a given version. This encapsulates [`Protocol`], [`Metadata`],
 //! [`Schema`], [`TableProperties`], and [`ColumnMappingMode`]. These structs in isolation should
@@ -7,6 +9,12 @@
 //! reader/writer features, and ensure that the deletion vector table property is enabled in the
 //! [`TableProperties`].
 //!
+//! # Related Modules
+//!
+//! - [`crate::table_property_protocol_config`]: For **creating/modifying** tables. Parses
+//!   user-provided properties to extract signal flags and create protocols. Use this when building
+//!   new tables or modifying existing table properties.
+//!
 //! [`Schema`]: crate::schema::Schema
 
 use std::sync::Arc;
diff --git a/kernel/src/table_features/mod.rs b/kernel/src/table_features/mod.rs
index 147ba5979a..a34a852deb 100644
--- a/kernel/src/table_features/mod.rs
+++ b/kernel/src/table_features/mod.rs
@@ -1,4 +1,6 @@
 use itertools::Itertools;
+use std::str::FromStr;
+
 use serde::{Deserialize, Serialize};
 use strum::{AsRefStr, Display as StrumDisplay, EnumCount, EnumString};
@@ -417,14 +419,13 @@ static ICEBERG_COMPAT_V2_INFO: FeatureInfo = FeatureInfo {
     }),
 };
 
-#[allow(dead_code)]
 static CLUSTERED_TABLE_INFO: FeatureInfo = FeatureInfo {
     name: "clustering",
     min_reader_version: 1,
     min_writer_version: 7,
     feature_type: FeatureType::Writer,
     feature_requirements: &[FeatureRequirement::Supported(TableFeature::DomainMetadata)],
-    kernel_support: KernelSupport::NotSupported,
+    kernel_support: KernelSupport::Supported,
     enablement_check: EnablementCheck::AlwaysIfSupported,
 };
 
@@ -679,6 +680,20 @@ impl TableFeature {
             TableFeature::Unknown(_) => None,
         }
     }
+
+    /// Parse a feature name string into a TableFeature.
+    ///
+    /// Known feature names are parsed into their corresponding variants.
+    /// Unknown feature names are wrapped in `TableFeature::Unknown`.
+    pub(crate) fn from_name(name: &str) -> Self {
+        TableFeature::from_str(name).unwrap_or_else(|_| TableFeature::Unknown(name.to_string()))
+    }
+
+    /// Returns true if this is a ReaderWriter feature (appears in both reader and writer feature lists).
+    /// Returns false for Writer-only features and Unknown features.
+    pub(crate) fn is_reader_writer(&self) -> bool {
+        matches!(self.feature_type(), FeatureType::ReaderWriter)
+    }
 }
 
 impl ToDataType for TableFeature {
@@ -829,4 +844,49 @@
             assert_eq!(from_str, feature);
         }
     }
+
+    #[test]
+    fn test_from_name() {
+        // Known features
+        assert_eq!(
+            TableFeature::from_name("deletionVectors"),
+            TableFeature::DeletionVectors
+        );
+        assert_eq!(
+            TableFeature::from_name("changeDataFeed"),
+            TableFeature::ChangeDataFeed
+        );
+        assert_eq!(
+            TableFeature::from_name("columnMapping"),
+            TableFeature::ColumnMapping
+        );
+        assert_eq!(
+            TableFeature::from_name("timestampNtz"),
+            TableFeature::TimestampWithoutTimezone
+        );
+
+        // Unknown features
+        assert_eq!(
+            TableFeature::from_name("unknownFeature"),
+            TableFeature::Unknown("unknownFeature".to_string())
+        );
+    }
+
+    #[test]
+    fn test_is_reader_writer() {
+        // ReaderWriter features
+        assert!(TableFeature::DeletionVectors.is_reader_writer());
+        assert!(TableFeature::ColumnMapping.is_reader_writer());
+        assert!(TableFeature::TimestampWithoutTimezone.is_reader_writer());
+        assert!(TableFeature::V2Checkpoint.is_reader_writer());
+
+        // Writer-only features
+        assert!(!TableFeature::ChangeDataFeed.is_reader_writer());
+        assert!(!TableFeature::AppendOnly.is_reader_writer());
+        assert!(!TableFeature::DomainMetadata.is_reader_writer());
+        assert!(!TableFeature::RowTracking.is_reader_writer());
+
+        // Unknown features
+        assert!(!TableFeature::unknown("something").is_reader_writer());
+    }
 }
diff --git a/kernel/src/table_properties.rs b/kernel/src/table_properties.rs
index 35f19b5bf1..77e2a4599f 100644
--- a/kernel/src/table_properties.rs
+++ b/kernel/src/table_properties.rs
@@ -26,6 +26,14 @@ pub use deserialize::ParseIntervalError;
 
 /// Prefix for delta table properties (e.g., `delta.enableChangeDataFeed`, `delta.appendOnly`).
 pub const DELTA_PROPERTY_PREFIX: &str = "delta.";
 
+/// Table property key for specifying the minimum reader protocol version.
+/// This is a signal flag property - it affects protocol creation but is not stored in metadata.
+pub const MIN_READER_VERSION_PROP: &str = "delta.minReaderVersion";
+
+/// Table property key for specifying the minimum writer protocol version.
+/// This is a signal flag property - it affects protocol creation but is not stored in metadata.
+pub const MIN_WRITER_VERSION_PROP: &str = "delta.minWriterVersion";
+
 /// Delta table properties. These are parsed from the 'configuration' map in the most recent
 /// 'Metadata' action of a table.
 ///
diff --git a/kernel/src/table_property_protocol_config.rs b/kernel/src/table_property_protocol_config.rs
new file mode 100644
index 0000000000..b4a9ce642e
--- /dev/null
+++ b/kernel/src/table_property_protocol_config.rs
@@ -0,0 +1,433 @@
+//! Configuration for table property-based protocol modifications.
+//!
+//! This module provides [`TablePropertyProtocolConfig`], which parses user-provided table
+//! properties to extract signal flags and create protocol configurations for table operations.
+//!
+//! # Difference from [`TableConfiguration`](crate::table_configuration::TableConfiguration)
+//!
+//! - **[`TableConfiguration`](crate::table_configuration::TableConfiguration)**: Reads the
+//!   configuration of an *existing* table from a snapshot. It combines the Protocol and Metadata
+//!   actions to provide a unified view of a table's current state.
+//!
+//! - **[`TablePropertyProtocolConfig`]**: Parses *user-provided* properties during table creation
+//!   or modification operations (CREATE TABLE, ALTER TABLE SET TBLPROPERTIES). It extracts signal
+//!   flags (like `delta.feature.*` and protocol versions) that affect the Protocol but are
+//!   not stored in metadata.
+//!
+//! In short: `TableConfiguration` is for **reading** existing tables, while
+//! `TablePropertyProtocolConfig` is for **creating/modifying** tables.
+
+use std::collections::HashMap;
+
+use crate::actions::Protocol;
+use crate::table_features::{
+    TableFeature, SET_TABLE_FEATURE_SUPPORTED_PREFIX, SET_TABLE_FEATURE_SUPPORTED_VALUE,
+    TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
+};
+use crate::table_properties::{MIN_READER_VERSION_PROP, MIN_WRITER_VERSION_PROP};
+use crate::{DeltaResult, Error};
+
+/// Configuration parsed from user-provided table properties during table creation or modification.
+///
+/// This struct extracts signal flags (protocol versions, feature overrides) from
+/// the input properties and separates them from the table properties that will
+/// be stored in metadata.
+///
+/// Signal flags include:
+/// - `delta.feature.<feature_name> = supported` - Explicit feature enablement
+/// - `delta.minReaderVersion` - Protocol reader version hint
+/// - `delta.minWriterVersion` - Protocol writer version hint
+///
+/// These signal flags affect protocol creation but are not stored in the table's
+/// metadata configuration.
+///
+/// # Example
+/// ```ignore
+/// let props = HashMap::from([
+///     ("delta.feature.deletionVectors".to_string(), "supported".to_string()),
+///     ("delta.minReaderVersion".to_string(), "3".to_string()),
+///     ("myapp.version".to_string(), "1.0".to_string()),
+/// ]);
+/// let config = TablePropertyProtocolConfig::try_from(props)?;
+/// config.validate_for_create()?;
+/// // config.protocol contains Protocol with reader/writer features
+/// // config.table_properties = {"myapp.version": "1.0"}
+/// ```
+#[derive(Debug)]
+pub(crate) struct TablePropertyProtocolConfig {
+    /// The resolved protocol (always uses table features protocol: v3/v7).
+    /// Created during parsing with user-specified features.
+    pub(crate) protocol: Protocol,
+    /// Remaining properties to store in Metadata.configuration.
+    /// Signal flags (delta.feature.*) and version props are stripped out.
+    pub(crate) table_properties: HashMap<String, String>,
+}
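The `TryFrom` impl below applies a simple partitioning rule when assembling the protocol. A minimal standalone sketch of that rule, with toy types rather than the kernel's:

```rust
#[derive(Clone, Copy, PartialEq)]
enum Kind {
    Writer,
    ReaderWriter,
}

fn main() {
    // Every requested feature lands in writerFeatures; only ReaderWriter
    // features are mirrored into readerFeatures.
    let requested = [
        ("deletionVectors", Kind::ReaderWriter),
        ("appendOnly", Kind::Writer),
    ];

    let writer: Vec<_> = requested.iter().map(|(name, _)| *name).collect();
    let reader: Vec<_> = requested
        .iter()
        .filter(|(_, kind)| *kind == Kind::ReaderWriter)
        .map(|(name, _)| *name)
        .collect();

    assert_eq!(writer, ["deletionVectors", "appendOnly"]);
    assert_eq!(reader, ["deletionVectors"]);
}
```
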
+
+impl TryFrom<HashMap<String, String>> for TablePropertyProtocolConfig {
+    type Error = Error;
+
+    fn try_from(properties: HashMap<String, String>) -> DeltaResult<Self> {
+        let mut all_features = Vec::new();
+        let mut user_reader_version: Option<i32> = None;
+        let mut user_writer_version: Option<i32> = None;
+        let mut table_properties = HashMap::new();
+
+        for (key, value) in properties {
+            if let Some(feature_name) = key.strip_prefix(SET_TABLE_FEATURE_SUPPORTED_PREFIX) {
+                // Parse delta.feature.* signal flags
+                if value != SET_TABLE_FEATURE_SUPPORTED_VALUE {
+                    return Err(Error::generic(format!(
+                        "Invalid value '{}' for '{}'. Only '{}' is allowed.",
+                        value, key, SET_TABLE_FEATURE_SUPPORTED_VALUE
+                    )));
+                }
+                let feature = TableFeature::from_name(feature_name);
+                if matches!(feature, TableFeature::Unknown(_)) {
+                    return Err(Error::generic(format!(
+                        "Unknown table feature '{}'. Cannot create table with unsupported features.",
+                        feature_name
+                    )));
+                }
+                all_features.push(feature);
+            } else if key == MIN_READER_VERSION_PROP {
+                // Parse delta.minReaderVersion
+                let version: i32 = value.parse().map_err(|_| {
+                    Error::generic(format!(
+                        "Invalid value '{}' for '{}'. Must be an integer.",
+                        value, key
+                    ))
+                })?;
+                // Validate immediately: if specified, must be the required version
+                if version != TABLE_FEATURES_MIN_READER_VERSION {
+                    return Err(Error::generic(format!(
+                        "Invalid value '{}' for '{}'. Only '{}' is supported.",
+                        version, MIN_READER_VERSION_PROP, TABLE_FEATURES_MIN_READER_VERSION
+                    )));
+                }
+                user_reader_version = Some(version);
+            } else if key == MIN_WRITER_VERSION_PROP {
+                // Parse delta.minWriterVersion
+                let version: i32 = value.parse().map_err(|_| {
+                    Error::generic(format!(
+                        "Invalid value '{}' for '{}'. Must be an integer.",
+                        value, key
+                    ))
+                })?;
+                // Validate immediately: if specified, must be the required version
+                if version != TABLE_FEATURES_MIN_WRITER_VERSION {
+                    return Err(Error::generic(format!(
+                        "Invalid value '{}' for '{}'. Only '{}' is supported.",
+                        version, MIN_WRITER_VERSION_PROP, TABLE_FEATURES_MIN_WRITER_VERSION
+                    )));
+                }
+                user_writer_version = Some(version);
+            } else {
+                // Pass through to table properties
+                table_properties.insert(key, value);
+            }
+        }
+
+        // Partition features: ReaderWriter -> reader list, all -> writer list
+        let reader_features: Vec<_> = all_features
+            .iter()
+            .filter(|f| f.is_reader_writer())
+            .cloned()
+            .collect();
+
+        // Create Protocol with resolved versions.
+        // User-specified versions (if any) were validated above; default to required versions.
+        let protocol = Protocol::try_new(
+            user_reader_version.unwrap_or(TABLE_FEATURES_MIN_READER_VERSION),
+            user_writer_version.unwrap_or(TABLE_FEATURES_MIN_WRITER_VERSION),
+            Some(reader_features.iter()),
+            Some(all_features.iter()),
+        )?;
+
+        Ok(Self {
+            protocol,
+            table_properties,
+        })
+    }
+}
+
+impl TablePropertyProtocolConfig {
+    /// Delta properties allowed during CREATE TABLE.
+    /// Expand this list as more features are supported (e.g., column mapping, clustering).
+    const ALLOWED_DELTA_PROPERTIES: &[&str] = &[
+        // Currently empty - no delta.* properties allowed yet
+        // Future: "delta.enableDeletionVectors", "delta.columnMapping.mode", etc.
+    ];
+
+    /// Table features allowed during CREATE TABLE.
+    /// Expand this list as more features are supported.
+    const ALLOWED_DELTA_FEATURES: &[TableFeature] = &[
+        TableFeature::DomainMetadata,
+        TableFeature::ClusteredTable,
+    ];
+
+    /// Add a writer feature to the protocol. If the feature is ReaderWriter,
+    /// it will also be added to reader features. Creates a new Protocol with
+    /// the added feature.
+    pub(crate) fn add_writer_feature(&mut self, feature: TableFeature) -> DeltaResult<()> {
+        use crate::table_features::{
+            TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
+        };
+
+        // Get current features
+        let mut reader_features: Vec<_> = self
+            .protocol
+            .reader_features()
+            .map(|f| f.to_vec())
+            .unwrap_or_default();
+        let mut writer_features: Vec<_> = self
+            .protocol
+            .writer_features()
+            .map(|f| f.to_vec())
+            .unwrap_or_default();
+
+        // Add feature to writer features if not already present
+        if !writer_features.contains(&feature) {
+            writer_features.push(feature.clone());
+        }
+
+        // If it's a ReaderWriter feature, also add to reader features
+        if feature.is_reader_writer() && !reader_features.contains(&feature) {
+            reader_features.push(feature);
+        }
+
+        // Create new protocol with updated features
+        self.protocol = Protocol::try_new(
+            TABLE_FEATURES_MIN_READER_VERSION,
+            TABLE_FEATURES_MIN_WRITER_VERSION,
+            Some(reader_features.iter()),
+            Some(writer_features.iter()),
+        )?;
+
+        Ok(())
+    }
+
+    /// Validates the configuration for CREATE TABLE.
+    ///
+    /// Checks:
+    /// - Only allowed delta properties can be set
+    /// - Only allowed features can be explicitly enabled
+    ///
+    /// Note: Protocol version validation happens during parsing in `try_from`.
+    pub(crate) fn validate_for_create(&self) -> DeltaResult<()> {
+        use crate::table_properties::DELTA_PROPERTY_PREFIX;
+
+        // Validate delta properties against allow-list
+        for key in self.table_properties.keys() {
+            if key.starts_with(DELTA_PROPERTY_PREFIX)
+                && !Self::ALLOWED_DELTA_PROPERTIES.contains(&key.as_str())
+            {
+                return Err(Error::generic(format!(
+                    "Setting delta property '{}' is not supported during CREATE TABLE",
+                    key
+                )));
+            }
+        }
+
+        // Validate features against allow-list
+        if let Some(writer_features) = self.protocol.writer_features() {
+            for feature in writer_features {
+                if !Self::ALLOWED_DELTA_FEATURES.contains(feature) {
+                    return Err(Error::generic(format!(
+                        "Enabling feature '{}' is not supported during CREATE TABLE",
+                        feature
+                    )));
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::table_properties::{MIN_READER_VERSION_PROP, MIN_WRITER_VERSION_PROP};
+    use crate::utils::test_utils::assert_result_error_with_message;
+
+    /// Helper to construct a HashMap from string slice pairs.
+    fn props<const N: usize>(pairs: [(&str, &str); N]) -> HashMap<String, String> {
+        pairs
+            .into_iter()
+            .map(|(k, v)| (k.into(), v.into()))
+            .collect()
+    }
+
+    #[test]
+    fn test_table_property_protocol_config_basic() {
+        let config = TablePropertyProtocolConfig::try_from(props([
+            ("delta.feature.deletionVectors", "supported"),
+            ("delta.enableDeletionVectors", "true"),
+        ]))
+        .unwrap();
+
+        // DeletionVectors is a ReaderWriter feature - check via protocol
+        let reader_features = config.protocol.reader_features().unwrap();
+        let writer_features = config.protocol.writer_features().unwrap();
+        assert_eq!(reader_features.len(), 1);
+        assert_eq!(reader_features[0], TableFeature::DeletionVectors);
+        assert_eq!(writer_features.len(), 1);
+        assert_eq!(writer_features[0], TableFeature::DeletionVectors);
+
+        // Feature override should be removed from table_properties
+        assert!(!config
+            .table_properties
+            .contains_key("delta.feature.deletionVectors"));
+        // Regular property should be retained
+        assert_eq!(
+            config.table_properties.get("delta.enableDeletionVectors"),
+            Some(&"true".to_string())
+        );
+    }
+
+    #[test]
+    fn test_table_property_protocol_config_multiple_features() {
+        let config = TablePropertyProtocolConfig::try_from(props([
+            ("delta.feature.deletionVectors", "supported"),
+            ("delta.feature.changeDataFeed", "supported"),
+            ("delta.feature.appendOnly", "supported"),
+            ("delta.enableDeletionVectors", "true"),
+        ]))
+        .unwrap();
+
+        // All 3 features go into writer_features - check via protocol
+        let writer_features = config.protocol.writer_features().unwrap();
+        assert_eq!(writer_features.len(), 3);
+        assert!(writer_features.contains(&TableFeature::DeletionVectors));
+        assert!(writer_features.contains(&TableFeature::ChangeDataFeed));
+        assert!(writer_features.contains(&TableFeature::AppendOnly));
+
+        // Only ReaderWriter features go into reader_features (DeletionVectors)
+        // ChangeDataFeed and AppendOnly are Writer-only features
+        let reader_features = config.protocol.reader_features().unwrap();
+        assert_eq!(reader_features.len(), 1);
+        assert!(reader_features.contains(&TableFeature::DeletionVectors));
+
+        // Only the regular property should remain
+        assert_eq!(config.table_properties.len(), 1);
+        assert_eq!(
+            config.table_properties.get("delta.enableDeletionVectors"),
+            Some(&"true".to_string())
+        );
+    }
+
+    #[test]
+    fn test_table_property_protocol_config_no_features() {
+        let config = TablePropertyProtocolConfig::try_from(props([
+            ("delta.enableDeletionVectors", "true"),
+            ("delta.appendOnly", "true"),
+            ("custom.property", "value"),
+        ]))
+        .unwrap();
+
+        // No features specified - protocol should have empty feature lists
+        let reader_features = config.protocol.reader_features().unwrap();
+        let writer_features = config.protocol.writer_features().unwrap();
+        assert!(reader_features.is_empty());
+        assert!(writer_features.is_empty());
+        assert_eq!(config.table_properties.len(), 3);
+    }
+
+    #[test]
+    fn test_table_property_protocol_config_parsing_failures() {
+        // Invalid feature value ("enabled" instead of "supported")
+        assert_result_error_with_message(
+            TablePropertyProtocolConfig::try_from(props([(
+                "delta.feature.deletionVectors",
+                "enabled",
+            )])),
+            "Invalid value 'enabled'",
+        );
+
+        // Unknown feature name
+        assert_result_error_with_message(
+            TablePropertyProtocolConfig::try_from(props([(
+                "delta.feature.futureFeature",
+                "supported",
+            )])),
+            "Unknown table feature 'futureFeature'",
+        );
+    }
+
+    #[test]
+    fn test_table_property_protocol_config_valid_versions() {
+        let config = TablePropertyProtocolConfig::try_from(props([
+            (MIN_READER_VERSION_PROP, "3"),
+            (MIN_WRITER_VERSION_PROP, "7"),
+            ("custom.property", "value"),
+        ]))
+        .unwrap();
+
+        // Protocol created with correct versions
+        assert_eq!(config.protocol.min_reader_version(), 3);
+        assert_eq!(config.protocol.min_writer_version(), 7);
+        // Protocol version properties stripped from table_properties
+        assert!(!config
+            .table_properties
+            .contains_key(MIN_READER_VERSION_PROP));
+        assert!(!config
+            .table_properties
+            .contains_key(MIN_WRITER_VERSION_PROP));
+        // Other properties remain
+        assert_eq!(
+            config.table_properties.get("custom.property"),
+            Some(&"value".to_string())
+        );
+        // Validation passes
+        assert!(config.validate_for_create().is_ok());
+    }
+
+    #[test]
+    fn test_table_property_protocol_config_validation_errors() {
+        // Each case: (description, input properties, expected error substring, fails_at_parse)
+        let error_cases: &[(&str, &[(&str, &str)], &str, bool)] = &[
+            (
+                "Invalid reader version (only 3 supported)",
+                &[(MIN_READER_VERSION_PROP, "2")],
+                "Only '3' is supported",
+                true, // fails at parsing
+            ),
+            (
+                "Invalid writer version (only 7 supported)",
+                &[(MIN_WRITER_VERSION_PROP, "5")],
+                "Only '7' is supported",
+                true, // fails at parsing
+            ),
+            (
+                "Feature not on allow list",
+                &[("delta.feature.deletionVectors", "supported")],
+                "Enabling feature 'deletionVectors' is not supported",
+                false, // fails at validation
+            ),
+            (
+                "Delta property not on allow list",
+                &[("delta.appendOnly", "true")],
+                "Setting delta property 'delta.appendOnly' is not supported",
+                false, // fails at validation
+            ),
+        ];
+
+        for (description, input, expected_error, fails_at_parse) in error_cases {
+            let input_map: HashMap<String, String> = input
+                .iter()
+                .map(|(k, v)| (k.to_string(), v.to_string()))
+                .collect();
+
+            if *fails_at_parse {
+                assert_result_error_with_message(
+                    TablePropertyProtocolConfig::try_from(input_map),
+                    expected_error,
+                );
+            } else {
+                let config = TablePropertyProtocolConfig::try_from(input_map)
+                    .unwrap_or_else(|e| panic!("{}: unexpected parse error: {}", description, e));
+                assert_result_error_with_message(config.validate_for_create(), expected_error);
+            }
+        }
+    }
+}
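A hypothetical test (a sketch as it would sit in this module's test block) showing how `add_writer_feature` interacts with the feature-type rule: writer-only features touch only the writer list, while ReaderWriter features are mirrored into both:

```rust
#[test]
fn add_writer_feature_mirrors_reader_writer_features() -> DeltaResult<()> {
    // Start from an empty property map: protocol v3/v7 with empty feature lists.
    let mut config = TablePropertyProtocolConfig::try_from(HashMap::new())?;

    config.add_writer_feature(TableFeature::ClusteredTable)?; // writer-only
    config.add_writer_feature(TableFeature::DeletionVectors)?; // reader-writer

    let readers = config.protocol.reader_features().unwrap();
    let writers = config.protocol.writer_features().unwrap();
    assert!(writers.contains(&TableFeature::ClusteredTable));
    assert!(!readers.contains(&TableFeature::ClusteredTable));
    assert!(readers.contains(&TableFeature::DeletionVectors));
    assert!(writers.contains(&TableFeature::DeletionVectors));
    Ok(())
}
```
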
diff --git a/kernel/src/transaction/create_table.rs b/kernel/src/transaction/create_table.rs
index 406a897b9c..38e4123253 100644
--- a/kernel/src/transaction/create_table.rs
+++ b/kernel/src/transaction/create_table.rs
@@ -1,18 +1,20 @@
 //! Create table transaction implementation (internal API).
 //!
-//! This module provides a type-safe API for creating Delta tables.
-//! Use the [`create_table`] function to get a [`CreateTableTransactionBuilder`] that can be
-//! configured with table properties and other options before building the [`Transaction`].
+//! This module provides the [`CreateTableTransactionBuilder`] for creating new Delta tables
+//! with optional features like partitioning, clustering, table properties, and table features.
+//! Use the [`create_table`] function to get a builder that can be configured before building
+//! the [`Transaction`].
 //!
 //! # Example
 //!
 //! ```rust,no_run
+//! # use delta_kernel::DeltaResult;
+//! # use delta_kernel::Engine;
+//! # fn example(engine: &dyn Engine) -> DeltaResult<()> {
 //! use delta_kernel::transaction::create_table::create_table;
 //! use delta_kernel::schema::{StructType, StructField, DataType};
 //! use delta_kernel::committer::FileSystemCommitter;
 //! use std::sync::Arc;
-//! # use delta_kernel::Engine;
-//! # fn example(engine: &dyn Engine) -> delta_kernel::DeltaResult<()> {
 //!
 //! let schema = Arc::new(StructType::try_new(vec![
 //!     StructField::new("id", DataType::INTEGER, false),
@@ -24,7 +26,6 @@
 //!     .commit(engine)?;
 //! # Ok(())
 //! # }
-//! ```
 
 // Allow `pub` items in this module even though the module itself may be `pub(crate)`.
 // The module visibility controls external access; items are `pub` for use within the crate
@@ -36,31 +37,20 @@
 use std::sync::Arc;
 
 use url::Url;
 
-use crate::actions::{Metadata, Protocol};
+use crate::actions::{DomainMetadata, Metadata};
+use crate::clustering::ClusteringMetadataDomain;
 use crate::committer::Committer;
 use crate::log_segment::LogSegment;
-use crate::schema::SchemaRef;
+use crate::schema::{ColumnName, SchemaRef};
 use crate::snapshot::Snapshot;
 use crate::table_configuration::TableConfiguration;
-use crate::table_features::{
-    SET_TABLE_FEATURE_SUPPORTED_PREFIX, TABLE_FEATURES_MIN_READER_VERSION,
-    TABLE_FEATURES_MIN_WRITER_VERSION,
-};
-use crate::table_properties::DELTA_PROPERTY_PREFIX;
+use crate::table_features::TableFeature;
+use crate::table_property_protocol_config::TablePropertyProtocolConfig;
+use crate::transaction::data_layout::{DataLayout, MAX_CLUSTERING_COLUMNS};
 use crate::transaction::Transaction;
 use crate::utils::{current_time_ms, try_parse_uri};
 use crate::{DeltaResult, Engine, Error, StorageHandler, PRE_COMMIT_VERSION};
 
-/// Properties that are allowed to be set during create table.
-/// This list will expand as more features are supported (e.g., column mapping, clustering).
-/// The allow list will be deprecated once auto feature enablement is implemented
-/// like the Java Kernel.
-const ALLOWED_DELTA_PROPERTIES: &[&str] = &[
-    // Empty for now - will add properties as features are implemented:
-    // - "delta.columnMapping.mode" (for column mapping)
-    // - etc.
-];
-
 /// Ensures that no Delta table exists at the given path.
 ///
 /// This function checks the `_delta_log` directory to determine if a table already exists.
@@ -114,33 +104,14 @@
     }
 }
 
-/// Validates that table properties are allowed during CREATE TABLE.
-///
-/// This function enforces an allow list for delta properties:
-/// - Feature override properties (`delta.feature.*`) are never allowed
-/// - Delta properties (`delta.*`) must be on the allow list
-/// - Non-delta properties (user/application properties) are always allowed
-fn validate_table_properties(properties: &HashMap<String, String>) -> DeltaResult<()> {
-    for key in properties.keys() {
-        // Block all delta.feature.* properties (feature override properties)
-        if key.starts_with(SET_TABLE_FEATURE_SUPPORTED_PREFIX) {
-            return Err(Error::generic(format!(
-                "Setting feature override property '{}' is not supported during CREATE TABLE",
-                key
-            )));
-        }
-        // For delta.* properties, check against allow list
-        if key.starts_with(DELTA_PROPERTY_PREFIX)
-            && !ALLOWED_DELTA_PROPERTIES.contains(&key.as_str())
-        {
-            return Err(Error::generic(format!(
-                "Setting delta property '{}' is not supported during CREATE TABLE",
-                key
-            )));
-        }
-        // Non-delta properties (user/application properties) are always allowed
-    }
-    Ok(())
+/// Result of processing the data layout specification.
+struct ProcessedDataLayout {
+    /// Partition columns for the table (empty if not partitioned).
+    partition_columns: Vec<String>,
+    /// Domain metadata for clustering (None if not clustered).
+    clustering_domain_metadata: Option<DomainMetadata>,
+    /// Additional writer features required by the layout.
+    additional_writer_features: Vec<TableFeature>,
 }
 
 /// Creates a builder for creating a new Delta table.
@@ -200,6 +171,7 @@ pub struct CreateTableTransactionBuilder {
     schema: SchemaRef,
     engine_info: String,
     table_properties: HashMap<String, String>,
+    data_layout: DataLayout,
 }
 
 impl CreateTableTransactionBuilder {
@@ -212,9 +184,59 @@
             schema,
             engine_info: engine_info.into(),
             table_properties: HashMap::new(),
+            data_layout: DataLayout::None,
         }
     }
 
+    /// Sets the data layout for the new Delta table.
+    ///
+    /// The data layout determines how data files are organized within the table:
+    ///
+    /// - [`DataLayout::None`]: No special organization (default)
+    /// - [`DataLayout::Partitioned`]: Data files are organized by partition column values
+    /// - [`DataLayout::Clustered`]: Data files are optimized for queries on clustering columns
+    ///
+    /// Note: Partitioning and clustering are mutually exclusive. A table can have one or the
+    /// other, but not both.
+    ///
+    /// # Arguments
+    ///
+    /// * `layout` - The data layout specification
+    ///
+    /// # Example
+    ///
+    /// ## Partitioned Table
+    /// ```rust,ignore
+    /// # use delta_kernel::transaction::create_table::create_table;
+    /// # use delta_kernel::transaction::DataLayout;
+    /// # use delta_kernel::schema::{StructType, DataType, StructField};
+    /// # use std::sync::Arc;
+    /// # let schema = Arc::new(StructType::try_new(vec![
+    /// #     StructField::new("id", DataType::INTEGER, false),
+    /// #     StructField::new("date", DataType::STRING, false),
+    /// # ]).unwrap());
+    /// let builder = create_table("/path/to/table", schema, "MyApp/1.0")
+    ///     .with_data_layout(DataLayout::partitioned(["date"]).unwrap());
+    /// ```
+    ///
+    /// ## Clustered Table
+    /// ```rust,ignore
+    /// # use delta_kernel::transaction::create_table::create_table;
+    /// # use delta_kernel::transaction::DataLayout;
+    /// # use delta_kernel::schema::{StructType, DataType, StructField};
+    /// # use std::sync::Arc;
+    /// # let schema = Arc::new(StructType::try_new(vec![
+    /// #     StructField::new("id", DataType::INTEGER, false),
+    /// #     StructField::new("category", DataType::STRING, false),
+    /// # ]).unwrap());
+    /// let builder = create_table("/path/to/table", schema, "MyApp/1.0")
+    ///     .with_data_layout(DataLayout::clustered(["category"])?);
+    /// ```
+    pub fn with_data_layout(mut self, layout: DataLayout) -> Self {
+        self.data_layout = layout;
+        self
+    }
+
     /// Sets table properties for the new Delta table.
     ///
     /// Custom application properties (those not starting with `delta.`) are always allowed.
@@ -257,6 +279,116 @@
         self
     }
 
+    /// Validates clustering columns against the schema and constraints.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - More than 4 clustering columns are specified
+    /// - A clustering column doesn't exist in the schema
+    fn validate_clustering_columns(columns: &[ColumnName], schema: &SchemaRef) -> DeltaResult<()> {
+        use crate::schema::DataType;
+
+        // Check maximum clustering columns
+        if columns.len() > MAX_CLUSTERING_COLUMNS {
+            return Err(Error::generic(format!(
+                "Cannot specify more than {} clustering columns. Found {}.",
+                MAX_CLUSTERING_COLUMNS,
+                columns.len()
+            )));
+        }
+
+        // Validate each column exists in the schema by traversing the path
+        for col in columns {
+            let path = col.path();
+            if path.is_empty() {
+                return Err(Error::generic("Clustering column path cannot be empty"));
+            }
+
+            // Traverse the schema tree to validate the path
+            let mut current_schema = schema.as_ref();
+            for (i, field_name) in path.iter().enumerate() {
+                match current_schema.field(field_name) {
+                    Some(field) => {
+                        // If not the last element, we need to descend into a struct
+                        if i < path.len() - 1 {
+                            match field.data_type() {
+                                DataType::Struct(inner) => {
+                                    current_schema = inner;
+                                }
+                                _ => {
+                                    return Err(Error::generic(format!(
+                                        "Clustering column '{}': field '{}' is not a struct and cannot contain nested fields",
+                                        col, field_name
+                                    )));
+                                }
+                            }
+                        }
+                        // If it's the last element, we found the column - validation passes
+                    }
+                    None => {
+                        return Err(Error::generic(format!(
+                            "Clustering column '{}' not found in schema: field '{}' does not exist",
+                            col, field_name
+                        )));
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Processes the data layout specification and returns partition columns,
+    /// clustering metadata, and any additional features required.
+    ///
+    /// # Arguments
+    ///
+    /// * `data_layout` - The data layout specification
+    /// * `schema` - The table schema (for validation)
+    ///
+    /// # Returns
+    ///
+    /// A [`ProcessedDataLayout`] containing:
+    /// - Partition columns (empty if not partitioned)
+    /// - Clustering domain metadata (None if not clustered)
+    /// - Additional writer features required by the layout
+    fn process_data_layout(
+        data_layout: &DataLayout,
+        schema: &SchemaRef,
+    ) -> DeltaResult<ProcessedDataLayout> {
+        match data_layout {
+            DataLayout::None => Ok(ProcessedDataLayout {
+                partition_columns: vec![],
+                clustering_domain_metadata: None,
+                additional_writer_features: vec![],
+            }),
+            DataLayout::Partitioned(cols) => Ok(ProcessedDataLayout {
+                // Convert ColumnName to String for Metadata compatibility
+                partition_columns: cols.iter().map(|c| c.to_string()).collect(),
+                clustering_domain_metadata: None,
+                additional_writer_features: vec![],
+            }),
+            DataLayout::Clustered(cols) => {
+                // Validate clustering columns
+                Self::validate_clustering_columns(cols, schema)?;
+
+                // Create clustering domain metadata
+                let clustering_metadata = ClusteringMetadataDomain::new(cols);
+                let domain_metadata = clustering_metadata.to_domain_metadata()?;
+
+                // Clustering requires this feature (DomainMetadata is added in build())
+                let additional_features = vec![TableFeature::ClusteredTable];
+
+                Ok(ProcessedDataLayout {
+                    partition_columns: vec![],
+                    clustering_domain_metadata: Some(domain_metadata),
+                    additional_writer_features: additional_features,
+                })
+            }
+        }
+    }
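The path-traversal rule above ("every segment except the last must resolve to a struct") can be seen in isolation with a toy schema representation (illustrative types only, not the kernel's):

```rust
use std::collections::HashMap;

// Toy stand-in for the schema types.
enum Field {
    Leaf,
    Struct(HashMap<&'static str, Field>),
}

fn validate_path(
    schema: &HashMap<&'static str, Field>,
    path: &[&'static str],
) -> Result<(), String> {
    let mut fields = schema;
    for (i, name) in path.iter().enumerate() {
        match fields.get(name) {
            None => return Err(format!("field '{name}' does not exist")),
            Some(Field::Struct(inner)) => fields = inner,
            Some(Field::Leaf) if i + 1 == path.len() => {} // last segment: ok
            Some(Field::Leaf) => return Err(format!("'{name}' is not a struct")),
        }
    }
    Ok(())
}

fn main() {
    let address = HashMap::from([("city", Field::Leaf)]);
    let schema = HashMap::from([("id", Field::Leaf), ("address", Field::Struct(address))]);

    assert!(validate_path(&schema, &["address", "city"]).is_ok());
    assert!(validate_path(&schema, &["id", "nested"]).is_err()); // leaf can't nest
    assert!(validate_path(&schema, &["missing"]).is_err());
}
```
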
 
     /// Builds a [`Transaction`] that can be committed to create the table.
     ///
     /// This method performs validation:
@@ -264,6 +396,7 @@
     /// - Verifies the table doesn't already exist
     /// - Validates the schema is non-empty
     /// - Validates table properties against the allow list
+    /// - For clustered tables: validates clustering columns exist and adds required features
     ///
     /// # Arguments
     ///
@@ -277,6 +410,7 @@
     /// - A table already exists at the given path
     /// - The schema is empty
     /// - Unsupported delta properties or feature flags are specified
+    /// - Clustering columns don't exist in schema or exceed the limit (4)
     pub fn build(
         self,
         engine: &dyn Engine,
@@ -284,36 +418,46 @@
     ) -> DeltaResult<Transaction> {
         // Validate path
         let table_url = try_parse_uri(&self.path)?;
+        // Check if table already exists by looking for _delta_log directory
+        let delta_log_url = table_url.join("_delta_log/")?;
+        let storage = engine.storage_handler();
+        ensure_table_does_not_exist(storage.as_ref(), &delta_log_url, &self.path)?;
 
         // Validate schema is non-empty
         if self.schema.fields().len() == 0 {
             return Err(Error::generic("Schema cannot be empty"));
         }
 
-        // Validate table properties against allow list
-        validate_table_properties(&self.table_properties)?;
+        // Parse and validate table properties
+        let mut config = TablePropertyProtocolConfig::try_from(self.table_properties)?;
+        config.validate_for_create()?;
 
-        // Check if table already exists by looking for _delta_log directory
-        let delta_log_url = table_url.join("_delta_log/")?;
-        let storage = engine.storage_handler();
-        ensure_table_does_not_exist(storage.as_ref(), &delta_log_url, &self.path)?;
+        // Process data layout (partitioning or clustering)
+        let layout = Self::process_data_layout(&self.data_layout, &self.schema)?;
 
-        // Create Protocol action with table features support
-        let protocol = Protocol::try_new(
-            TABLE_FEATURES_MIN_READER_VERSION,
-            TABLE_FEATURES_MIN_WRITER_VERSION,
-            Some(Vec::<String>::new()), // readerFeatures (empty for now)
-            Some(Vec::<String>::new()), // writerFeatures (empty for now)
-        )?;
+        // Add any features required by the data layout to the protocol
+        for feature in &layout.additional_writer_features {
+            config.add_writer_feature(feature.clone())?;
+        }
+
+        // Add domainMetadata feature if clustering domain metadata is present.
+        // This centralizes the domain metadata feature logic in build() rather than
+        // scattering it across different layout processors.
+        if layout.clustering_domain_metadata.is_some() {
+            config.add_writer_feature(TableFeature::DomainMetadata)?;
+        }
+
+        // Use protocol from config (with any additional layout features added)
+        let protocol = config.protocol;
 
         // Create Metadata action
         let metadata = Metadata::try_new(
             None, // name
             None, // description
             (*self.schema).clone(),
-            Vec::new(), // partition_columns - added with data layout support
+            layout.partition_columns,
             current_time_ms()?,
-            self.table_properties,
+            config.table_properties,
         )?;
 
         // Create pre-commit snapshot from protocol/metadata
@@ -323,11 +467,17 @@
             TableConfiguration::try_new(metadata, protocol, table_url, PRE_COMMIT_VERSION)?;
 
         // Create Transaction with pre-commit snapshot
+        // Convert clustering domain metadata to Vec<DomainMetadata> for the transaction
+        let system_domain_metadata: Vec<DomainMetadata> = layout
+            .clustering_domain_metadata
+            .into_iter()
+            .collect();
+
         Transaction::try_new_create_table(
             Arc::new(Snapshot::new(log_segment, table_configuration)),
             self.engine_info,
            committer,
-            vec![], // system_domain_metadata - not supported in base API
+            system_domain_metadata,
         )
     }
 }
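Putting the pieces together, a hedged end-to-end sketch of creating a clustered table (mirrors the module doc example plus `with_data_layout`; assumes the `internal-api` exports added in this diff, and the path/engine setup are illustrative):

```rust
use std::sync::Arc;

use delta_kernel::committer::FileSystemCommitter;
use delta_kernel::schema::{DataType, StructField, StructType};
use delta_kernel::transaction::create_table::create_table;
use delta_kernel::transaction::DataLayout;
use delta_kernel::{DeltaResult, Engine};

fn create_clustered(engine: &dyn Engine) -> DeltaResult<()> {
    let schema = Arc::new(StructType::try_new(vec![
        StructField::new("id", DataType::INTEGER, false),
        StructField::new("category", DataType::STRING, false),
    ])?);

    // build() validates the clustering columns against the schema, stages the
    // delta.clustering domain metadata, and enables the clustering and
    // domainMetadata writer features on the protocol.
    create_table("/path/to/table", schema, "MyApp/1.0")
        .with_data_layout(DataLayout::clustered(["category"])?)
        .build(engine, Box::new(FileSystemCommitter::new()))?
        .commit(engine)?;
    Ok(())
}
```
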
@@ -335,8 +485,11 @@
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::committer::FileSystemCommitter;
+    use crate::engine::sync::SyncEngine;
     use crate::schema::{DataType, StructField, StructType};
-    use crate::utils::test_utils::assert_result_error_with_message;
+    use crate::table_properties::{MIN_READER_VERSION_PROP, MIN_WRITER_VERSION_PROP};
+    use crate::utils::test_utils::assert_result_error_with_messages;
     use std::sync::Arc;
 
     fn test_schema() -> SchemaRef {
         Arc::new(StructType::new_unchecked(vec![StructField::new(
         )]))
     }
 
+    fn test_schema_with_columns() -> SchemaRef {
+        Arc::new(StructType::new_unchecked(vec![
+            StructField::new("id", DataType::INTEGER, false),
+            StructField::new("name", DataType::STRING, true),
+            StructField::new("value", DataType::LONG, true),
+        ]))
+    }
+
     #[test]
     fn test_basic_builder_creation() {
         let schema = test_schema();
 
         assert_eq!(builder.path, "/path/to/table");
         assert_eq!(builder.engine_info, "TestApp/1.0");
         assert!(builder.table_properties.is_empty());
-    }
-
-    #[test]
-    fn test_nested_path_builder_creation() {
-        let schema = test_schema();
-        let builder = CreateTableTransactionBuilder::new(
-            "/path/to/table/nested",
-            schema.clone(),
-            "TestApp/1.0",
-        );
-
-        assert_eq!(builder.path, "/path/to/table/nested");
+        assert!(builder.data_layout.is_none());
     }
 
     #[test]
@@ -402,46 +552,218 @@
     }
 
     #[test]
-    fn test_validate_supported_properties() {
-        // Empty properties are allowed
+    fn test_table_creation_config_parsing() {
+        // Empty properties are allowed - protocol is created with default features
         let properties = HashMap::new();
-        assert!(validate_table_properties(&properties).is_ok());
+        let config = TablePropertyProtocolConfig::try_from(properties).unwrap();
+        assert!(config.table_properties.is_empty());
+        // Protocol always has features (even if empty) at version 3/7
+        assert_eq!(config.protocol.min_reader_version(), 3);
+        assert_eq!(config.protocol.min_writer_version(), 7);
+        assert!(config.protocol.reader_features().unwrap().is_empty());
+        assert!(config.protocol.writer_features().unwrap().is_empty());
+
+        // User/application properties are passed through
+        let properties = HashMap::from([
+            ("myapp.version".to_string(), "1.0".to_string()),
+            ("custom.setting".to_string(), "value".to_string()),
+        ]);
+        let config = TablePropertyProtocolConfig::try_from(properties).unwrap();
+        assert_eq!(
+            config.table_properties.get("myapp.version"),
+            Some(&"1.0".to_string())
+        );
+        assert_eq!(
+            config.table_properties.get("custom.setting"),
+            Some(&"value".to_string())
+        );
 
-        // User/application properties are allowed
-        let mut properties = HashMap::new();
-        properties.insert("myapp.version".to_string(), "1.0".to_string());
-        properties.insert("custom.setting".to_string(), "value".to_string());
-        assert!(validate_table_properties(&properties).is_ok());
+        // Protocol version properties are parsed and stripped from table_properties
+        let properties = HashMap::from([
+            (MIN_READER_VERSION_PROP.to_string(), "3".to_string()),
+            (MIN_WRITER_VERSION_PROP.to_string(), "7".to_string()),
+            ("myapp.version".to_string(), "1.0".to_string()),
+        ]);
+        let config = TablePropertyProtocolConfig::try_from(properties).unwrap();
+        // Protocol created with specified versions (always 3/7)
+        assert_eq!(config.protocol.min_reader_version(), 3);
+        assert_eq!(config.protocol.min_writer_version(), 7);
+        // Protocol properties should be stripped from table_properties
+        assert!(!config
+            .table_properties
+            .contains_key(MIN_READER_VERSION_PROP));
+        assert!(!config
+            .table_properties
+            .contains_key(MIN_WRITER_VERSION_PROP));
+        // User properties should remain
+        assert_eq!(
+            config.table_properties.get("myapp.version"),
+            Some(&"1.0".to_string())
+        );
     }
 
     #[test]
-    fn test_validate_unsupported_properties() {
-        // Delta properties not on allow list are rejected
-        let mut properties = HashMap::new();
-        properties.insert("delta.enableChangeDataFeed".to_string(), "true".to_string());
-        assert_result_error_with_message(
-            validate_table_properties(&properties),
-            "Setting delta property 'delta.enableChangeDataFeed' is not supported",
+    fn test_protocol_version_validation() {
+        let engine = SyncEngine::new();
+
+        // Helper to build a table with given properties and return the result
+        let try_create = |props: HashMap<String, String>| {
+            let temp_dir = tempfile::tempdir().unwrap();
+            let table_path = temp_dir.path().to_str().unwrap();
+            CreateTableTransactionBuilder::new(table_path, test_schema(), "TestApp/1.0")
+                .with_table_properties(props)
+                .build(&engine, Box::new(FileSystemCommitter::new()))
+        };
+
+        // Valid: both versions (3, 7)
+        let props = HashMap::from([
+            (MIN_READER_VERSION_PROP.to_string(), "3".to_string()),
+            (MIN_WRITER_VERSION_PROP.to_string(), "7".to_string()),
+        ]);
+        assert!(
+            try_create(props).is_ok(),
+            "Valid protocol versions (3, 7) should succeed"
         );
 
-        // Feature override properties are rejected
-        let mut properties = HashMap::new();
-        properties.insert(
-            "delta.feature.domainMetadata".to_string(),
-            "supported".to_string(),
-        );
-        assert_result_error_with_message(
-            validate_table_properties(&properties),
-            "Setting feature override property 'delta.feature.domainMetadata' is not supported",
+        // Valid: only reader version (3)
+        let props = HashMap::from([(MIN_READER_VERSION_PROP.to_string(), "3".to_string())]);
+        assert!(
+            try_create(props).is_ok(),
+            "Only reader version (3) should succeed"
         );
 
+        // Valid: only writer version (7)
+        let props = HashMap::from([(MIN_WRITER_VERSION_PROP.to_string(), "7".to_string())]);
+        assert!(
+            try_create(props).is_ok(),
+            "Only writer version (7) should succeed"
        );
 
-        // Mixed properties with unsupported delta property are rejected
-        let mut properties = HashMap::new();
-        properties.insert("myapp.version".to_string(), "1.0".to_string());
-        properties.insert("delta.appendOnly".to_string(), "true".to_string());
-        assert_result_error_with_message(
-            validate_table_properties(&properties),
-            "Setting delta property 'delta.appendOnly' is not supported",
+        // Invalid: reader version 2 (only 3 supported)
+        let props = HashMap::from([(MIN_READER_VERSION_PROP.to_string(), "2".to_string())]);
+        assert_result_error_with_messages(
+            try_create(props),
+            &["delta.minReaderVersion", "Only '3' is supported"],
         );
+
+        // Invalid: writer version 5 (only 7 is supported)
+        let props = HashMap::from([(MIN_WRITER_VERSION_PROP.to_string(), "5".to_string())]);
+        assert_result_error_with_messages(
+            try_create(props),
+            &["delta.minWriterVersion", "Only '7' is supported"],
+        );
+
+        // Invalid: non-integer reader version
+        let props = HashMap::from([(MIN_READER_VERSION_PROP.to_string(), "abc".to_string())]);
+        assert_result_error_with_messages(try_create(props), &["Must be an integer"]);
+
+        // Invalid: non-integer writer version
+        let props = HashMap::from([(MIN_WRITER_VERSION_PROP.to_string(), "xyz".to_string())]);
+        assert_result_error_with_messages(try_create(props), &["Must be an integer"]);
+    }
+
+    #[test]
+    fn test_with_data_layout_partitioned() {
+        let schema = test_schema_with_columns();
+
+        let builder = CreateTableTransactionBuilder::new("/path/to/table", schema, "TestApp/1.0")
+            .with_data_layout(DataLayout::partitioned(["name"]).unwrap());
+
+        assert!(matches!(builder.data_layout, DataLayout::Partitioned(_)));
+    }
+
+    #[test]
+    fn test_with_data_layout_clustered() {
+        let schema = test_schema_with_columns();
+
+        let builder = CreateTableTransactionBuilder::new("/path/to/table", schema, "TestApp/1.0")
+            .with_data_layout(DataLayout::clustered(["name"]).unwrap());
+
+        assert!(matches!(builder.data_layout, DataLayout::Clustered(_)));
+    }
+
+    #[test]
+    fn test_clustering_too_many_columns() {
+        let engine = SyncEngine::new();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let table_path = temp_dir.path().to_str().unwrap();
+
+        // Create a schema with 5 columns
+        let schema = Arc::new(StructType::new_unchecked(vec![
+            StructField::new("a", DataType::STRING, false),
+            StructField::new("b", DataType::STRING, false),
+            StructField::new("c", DataType::STRING, false),
+            StructField::new("d", DataType::STRING, false),
+            StructField::new("e", DataType::STRING, false),
+        ]));
+
+        // Try to cluster on 5 columns (max is 4)
+        let result = CreateTableTransactionBuilder::new(table_path, schema, "TestApp/1.0")
+            .with_data_layout(DataLayout::clustered(["a", "b", "c", "d", "e"]).unwrap())
+            .build(&engine, Box::new(FileSystemCommitter::new()));
+
+        assert!(result.is_err());
+        let err = result.unwrap_err().to_string();
+        assert!(err.contains("Cannot specify more than 4 clustering columns"));
+    }
+
+    #[test]
+    fn test_clustering_column_not_in_schema() {
+        let engine = SyncEngine::new();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let table_path = temp_dir.path().to_str().unwrap();
+
+        let schema = test_schema_with_columns();
+
+        // Try to cluster on a non-existent column
+        let result = CreateTableTransactionBuilder::new(table_path, schema, "TestApp/1.0")
+            .with_data_layout(DataLayout::clustered(["nonexistent"]).unwrap())
+            .build(&engine, Box::new(FileSystemCommitter::new()));
+
+        assert!(result.is_err());
+        let err = result.unwrap_err().to_string();
+        assert!(err.contains("not found in schema"));
+    }
+
+    #[test]
+    fn test_clustering_nested_column() {
+        let engine = SyncEngine::new();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let table_path = temp_dir.path().to_str().unwrap();
+
+        // Create a schema with nested struct
+        let inner_struct = StructType::new_unchecked(vec![
+            StructField::new("city", DataType::STRING, false),
+            StructField::new("zip", DataType::STRING, false),
+        ]);
+        let schema = Arc::new(StructType::new_unchecked(vec![
+            StructField::new("id", DataType::INTEGER, false),
+            StructField::new("address", DataType::Struct(Box::new(inner_struct)), false),
+        ]));
+
+        // Cluster on nested column using dot notation
+        let result = CreateTableTransactionBuilder::new(table_path, schema, "TestApp/1.0")
+            .with_data_layout(DataLayout::clustered(["address.city"]).unwrap())
+            .build(&engine, Box::new(FileSystemCommitter::new()));
+
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_clustering_invalid_nested_path() {
+        let engine = SyncEngine::new();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let table_path = temp_dir.path().to_str().unwrap();
+
+        let schema = test_schema_with_columns();
+
+        // Try to access nested field on a non-struct column
+        let result = CreateTableTransactionBuilder::new(table_path, schema, "TestApp/1.0")
+            .with_data_layout(DataLayout::clustered(["name.nested"]).unwrap())
+            .build(&engine, Box::new(FileSystemCommitter::new()));
+
+        assert!(result.is_err());
+        let err = result.unwrap_err().to_string();
+        assert!(err.contains("is not a struct"));
+    }
 }
diff --git a/kernel/src/transaction/data_layout.rs b/kernel/src/transaction/data_layout.rs
new file mode 100644
index 0000000000..df873deae3
--- /dev/null
+++ b/kernel/src/transaction/data_layout.rs
@@ -0,0 +1,214 @@
+//! Data layout specification for Delta tables.
+//!
+//! This module provides the [`DataLayout`] enum for specifying how data is organized
+//! in a Delta table - either partitioned by columns or clustered by columns.
+//!
+//! # Example
+//!
+//! ```rust,ignore
+//! use delta_kernel::transaction::DataLayout;
+//!
+//! // Create a partitioned table layout
+//! let layout = DataLayout::partitioned(["date", "region"])?;
+//!
+//! // Create a clustered table layout (supports nested columns via dot notation)
+//! let layout = DataLayout::clustered(["col1", "nested.field"])?;
+//! ```
+
+use crate::schema::ColumnName;
+use crate::DeltaResult;
+
+/// Maximum number of clustering columns allowed per Delta specification.
+pub const MAX_CLUSTERING_COLUMNS: usize = 4;
+
+/// Specifies how data is organized in a Delta table.
+///
+/// Delta tables can organize data in different ways to optimize query performance:
+/// - **Partitioned**: Data files are organized into directories based on partition column values.
+/// - **Clustered**: Data is physically co-located based on clustering column values within files.
+///
+/// Partitioning and clustering are mutually exclusive - a table cannot have both.
+/// This is enforced at the type level by this enum.
+#[derive(Debug, Clone, Default)]
+pub enum DataLayout {
+    /// No special data layout (default for new tables).
+    #[default]
+    None,
+
+    /// Data is partitioned by the specified columns.
+    ///
+    /// Partition columns must be top-level columns in the schema (no nested fields).
+    /// The partition column values are extracted from data and used to organize files
+    /// into directories like `partition_col=value/`.
+    Partitioned(Vec<ColumnName>),
+
+    /// Data is clustered by the specified columns.
+    ///
+    /// Clustering columns can be nested (e.g., `["nested", "field"]`).
+    /// The clustering columns are stored as domain metadata and used by OPTIMIZE
+    /// to physically co-locate similar data for better data skipping.
+    ///
+    /// Maximum of 4 clustering columns are allowed per the Delta specification.
+    Clustered(Vec<ColumnName>),
+}
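Because the layout is a single enum value, a table spec cannot be both partitioned and clustered; consumers match exhaustively. A hypothetical in-crate helper as a sketch:

```rust
// Illustrative only: exhaustive match over the enum defined above.
fn describe(layout: &DataLayout) -> String {
    match layout {
        DataLayout::None => "no special layout".to_string(),
        DataLayout::Partitioned(cols) => format!("partitioned by {} column(s)", cols.len()),
        DataLayout::Clustered(cols) => format!("clustered by {} column(s)", cols.len()),
    }
}
```
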
+
+impl DataLayout {
+    /// Returns `true` if this layout specifies partitioning.
+    pub fn is_partitioned(&self) -> bool {
+        matches!(self, DataLayout::Partitioned(_))
+    }
+
+    /// Returns `true` if this layout specifies clustering.
+    pub fn is_clustered(&self) -> bool {
+        matches!(self, DataLayout::Clustered(_))
+    }
+
+    /// Returns `true` if this is a default layout with no special organization.
+    pub fn is_none(&self) -> bool {
+        matches!(self, DataLayout::None)
+    }
+
+    /// Returns the partition columns if this is a partitioned layout.
+    pub fn partition_columns(&self) -> Option<&[ColumnName]> {
+        match self {
+            DataLayout::Partitioned(cols) => Some(cols),
+            _ => None,
+        }
+    }
+
+    /// Returns the clustering columns if this is a clustered layout.
+    pub fn clustering_columns(&self) -> Option<&[ColumnName]> {
+        match self {
+            DataLayout::Clustered(cols) => Some(cols),
+            _ => None,
+        }
+    }
+
+    /// Creates a partitioned data layout from column names.
+    ///
+    /// Partition columns must be top-level columns in the schema (no nested fields).
+    /// Column names are parsed using standard Delta column name syntax.
+    ///
+    /// # Arguments
+    ///
+    /// * `columns` - An iterable of column names (accepts `&str`, `String`, etc.)
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use delta_kernel::transaction::DataLayout;
+    ///
+    /// let layout = DataLayout::partitioned(["date", "region"])?;
+    /// ```
+    pub fn partitioned<I, S>(columns: I) -> DeltaResult<Self>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let cols: Vec<ColumnName> = columns
+            .into_iter()
+            .map(|s| s.as_ref().parse())
+            .collect::<DeltaResult<_>>()?;
+        Ok(DataLayout::Partitioned(cols))
+    }
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use delta_kernel::transaction::DataLayout;
+    ///
+    /// // Simple columns
+    /// let layout = DataLayout::clustered(["col1", "col2"])?;
+    ///
+    /// // Nested columns using dot notation
+    /// let layout = DataLayout::clustered(["address.city", "user.profile.name"])?;
+    /// ```
+    pub fn clustered<I, S>(columns: I) -> DeltaResult<Self>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let cols: Vec<ColumnName> = columns
+            .into_iter()
+            .map(|s| s.as_ref().parse())
+            .collect::<DeltaResult<_>>()?;
+        Ok(DataLayout::Clustered(cols))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_data_layout_none() {
+        let layout = DataLayout::None;
+        assert!(layout.is_none());
+        assert!(!layout.is_partitioned());
+        assert!(!layout.is_clustered());
+        assert!(layout.partition_columns().is_none());
+        assert!(layout.clustering_columns().is_none());
+    }
+
+    #[test]
+    fn test_data_layout_partitioned() {
+        let layout = DataLayout::partitioned(["date", "region"]).unwrap();
+        assert!(!layout.is_none());
+        assert!(layout.is_partitioned());
+        assert!(!layout.is_clustered());
+
+        let cols = layout.partition_columns().unwrap();
+        assert_eq!(cols.len(), 2);
+        assert_eq!(cols[0], ColumnName::new(["date"]));
+        assert_eq!(cols[1], ColumnName::new(["region"]));
+        assert!(layout.clustering_columns().is_none());
+    }
+
+    #[test]
+    fn test_data_layout_partitioned_with_strings() {
+        // Test with Vec<String>
+        let col_names = vec!["date".to_string(), "region".to_string()];
+        let layout = DataLayout::partitioned(col_names).unwrap();
+        assert!(layout.is_partitioned());
+        assert_eq!(layout.partition_columns().unwrap().len(), 2);
+    }
+
+    #[test]
+    fn test_data_layout_clustered() {
+        let layout = DataLayout::clustered(["col1", "nested.field"]).unwrap();
+        assert!(!layout.is_none());
+        assert!(!layout.is_partitioned());
+        assert!(layout.is_clustered());
+        assert!(layout.partition_columns().is_none());
+
+        let cols = layout.clustering_columns().unwrap();
+        assert_eq!(cols.len(), 2);
+        assert_eq!(cols[0], ColumnName::new(["col1"]));
+        assert_eq!(cols[1], ColumnName::new(["nested", "field"]));
+    }
+
+    #[test]
+    fn test_data_layout_clustered_nested_columns() {
+        // Test deeply nested columns
+        let layout = DataLayout::clustered(["a.b.c", "x.y"]).unwrap();
+        let cols = layout.clustering_columns().unwrap();
+        assert_eq!(cols[0], ColumnName::new(["a", "b", "c"]));
+        assert_eq!(cols[1], ColumnName::new(["x", "y"]));
+    }
+
+    #[test]
+    fn test_data_layout_default() {
+        let layout = DataLayout::default();
+        assert!(layout.is_none());
+    }
+}
diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs
index 687ea429b5..6819c2836e 100644
--- a/kernel/src/transaction/mod.rs
+++ b/kernel/src/transaction/mod.rs
@@ -20,7 +20,9 @@ use crate::error::Error;
 use crate::expressions::{column_name, ColumnName};
 use crate::expressions::{ArrayData, Scalar, StructData, Transform, UnaryExpressionOp::ToJson};
 use crate::path::{LogRoot, ParsedLogPath};
-use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor};
+use crate::row_tracking::{
+    RowTrackingDomainMetadata, RowTrackingVisitor, ROW_TRACKING_DOMAIN_NAME,
+};
 use crate::scan::data_skipping::stats_schema::NullableStatsTransform;
 use crate::scan::log_replay::{
     get_scan_metadata_transform_expr, BASE_ROW_ID_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME,
@@ -44,6 +46,10 @@ use delta_kernel_derive::internal_api;
 pub mod create_table;
 #[cfg(not(feature = "internal-api"))]
 pub(crate) mod create_table;
+pub mod data_layout;
+
+pub use create_table::create_table;
+pub use data_layout::DataLayout;
 
 /// Type alias for an iterator of [`EngineData`] results.
 pub(crate) type EngineDataResultIterator<'a> =
@@ -647,19 +653,49 @@ impl Transaction {
         // PRE_COMMIT_VERSION (u64::MAX) + 1 wraps to 0, which is the correct first version
         self.read_snapshot.version().wrapping_add(1)
     }
-    /// Validate that user domains don't conflict with system domains or each other.
-    fn validate_user_domain_operations(&self) -> DeltaResult<()> {
+    /// Validate domain metadata operations for both create-table and existing-table transactions.
+    ///
+    /// Enforces the following rules:
+    /// - The DomainMetadata feature must be supported if any domain operations are present
+    /// - System domains (delta.*) can only be added in create-table transactions
+    /// - System domains must correspond to a known feature (e.g., rowTracking), and that feature must be enabled
+    /// - User domains can be added in both create-table and existing-table transactions
+    /// - Domain removals are not allowed in create-table transactions
+    /// - No duplicate domains within a single transaction
+    fn validate_domain_metadata_operations(&self) -> DeltaResult<()> {
+        // Feature validation (applies to all transactions with domain operations)
+        if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty())
+            && !self
+                .read_snapshot
+                .table_configuration()
+                .is_feature_supported(&TableFeature::DomainMetadata)
+        {
+            return Err(Error::unsupported(
+                "Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature",
+            ));
+        }
+
+        let is_create = self.is_create_table();
         let mut seen_domains = HashSet::new();
 
         // Validate domain additions
         for dm in &self.domain_metadata_additions {
             let domain = dm.domain();
-            if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
+            let is_system_domain = domain.starts_with(INTERNAL_DOMAIN_PREFIX);
+
+            // System domains (delta.*) are only allowed in create-table transactions
+            if is_system_domain && !is_create {
                 return Err(Error::generic(
                     "Cannot modify domains that start with 'delta.' as those are system controlled",
                 ));
             }
 
+            // For create-table, validate system domains against their required features
+            if is_system_domain && is_create {
+                self.validate_system_domain_feature(domain)?;
+            }
+
+            // Check for duplicates
             if !seen_domains.insert(domain) {
                 return Err(Error::generic(format!(
                     "Metadata for domain {} already specified in this transaction",
@@ -668,7 +704,14 @@ impl Transaction {
             }
         }
 
-        // Validate domain removals
+        // No removals are allowed for create-table
+        if is_create && !self.domain_removals.is_empty() {
+            return Err(Error::unsupported(
+                "Domain metadata removals are not supported in create-table transactions",
+            ));
+        }
+
+        // Validate domain removals (for non-create-table)
         for domain in &self.domain_removals {
             if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
                 return Err(Error::generic(
@@ -687,6 +730,39 @@ impl Transaction {
         Ok(())
     }
 
+    /// Validate that a system domain corresponds to a known feature and that the feature is supported.
+    ///
+    /// This prevents arbitrary `delta.*` domains from being added during table creation.
+    /// Each known system domain must have its corresponding feature enabled in the protocol.
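+    ///
+    /// For example, [`ROW_TRACKING_DOMAIN_NAME`] requires [`TableFeature::RowTracking`]
+    /// and `delta.clustering` requires [`TableFeature::ClusteredTable`]; any other
+    /// `delta.*` domain is rejected outright.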
+    fn validate_system_domain_feature(&self, domain: &str) -> DeltaResult<()> {
+        let table_config = self.read_snapshot.table_configuration();
+
+        // Map the domain to its required feature
+        let required_feature = match domain {
+            ROW_TRACKING_DOMAIN_NAME => Some(TableFeature::RowTracking),
+            // Will be changed to a constant in a follow-up clustering create-table feature PR
+            "delta.clustering" => Some(TableFeature::ClusteredTable),
+            _ => {
+                return Err(Error::generic(format!(
+                    "Unknown system domain '{}'. Only known system domains are allowed.",
+                    domain
+                )));
+            }
+        };
+
+        // If the domain requires a feature, validate that it is supported
+        if let Some(feature) = required_feature {
+            if !table_config.is_feature_supported(&feature) {
+                return Err(Error::generic(format!(
+                    "System domain '{}' requires the '{}' feature to be enabled",
+                    domain, feature
+                )));
+            }
+        }
+
+        Ok(())
+    }
+
     /// Helper function to convert scan metadata iterator to filtered engine data iterator.
     ///
     /// This adapter extracts the `scan_files` field from each [`crate::scan::ScanMetadata`] item,
@@ -825,6 +901,36 @@ impl Transaction {
         Ok(())
     }
 
+    /// Generate removal actions for user domain metadata by scanning the log.
+    ///
+    /// This performs an expensive log replay operation to fetch the previous configuration
+    /// value for each domain being removed, as required by the Delta spec for tombstones.
+    /// Returns an empty vector if there are no domain removals.
+    fn generate_user_domain_removal_actions(
+        &self,
+        engine: &dyn Engine,
+    ) -> DeltaResult<Vec<DomainMetadata>> {
+        if self.domain_removals.is_empty() {
+            return Ok(vec![]);
+        }
+
+        // Scan the log to fetch existing configurations for tombstones
+        let existing_domains =
+            scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?;
+
+        // Create removal tombstones with pre-image configurations
+        Ok(self
+            .domain_removals
+            .iter()
+            .filter_map(|domain| {
+                // If the domain doesn't exist in the log, removal is a no-op (filter it out)
+                existing_domains.get(domain).map(|existing| {
+                    DomainMetadata::remove(domain.clone(), existing.configuration().to_owned())
+                })
+            })
+            .collect())
+    }
+
     /// Generate domain metadata actions with validation. Handle both user and system domains.
     ///
     /// This function may perform an expensive log replay operation if there are any domain removals.
@@ -835,55 +941,28 @@ impl Transaction {
         engine: &'a dyn Engine,
         row_tracking_high_watermark: Option<RowTrackingDomainMetadata>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a>> {
-        // For create-table transactions, domain metadata will be added in
-        // a subsequent code commit
-        if self.is_create_table() {
-            if !self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty() {
-                return Err(Error::unsupported(
-                    "Domain metadata operations are not supported in create-table transactions",
+        let is_create = self.is_create_table();
+
+        // Validate domain operations (includes feature validation)
+        self.validate_domain_metadata_operations()?;
+
+        // TODO(sanuj): create-table must not have row tracking or removals. This check is
+        // defensive and needs to be updated when row tracking support is added.
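+        // Note: the checks below guard internal invariants. validate_domain_metadata_operations
+        // has already rejected removals for create-table, and row tracking is not yet wired up
+        // for create-table, so hitting either branch indicates a kernel bug rather than user
+        // error (hence internal_error/debug_assert instead of a user-facing error).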
+        if is_create {
+            if row_tracking_high_watermark.is_some() {
+                return Err(Error::internal_error(
+                    "CREATE TABLE cannot have row tracking domain metadata",
                 ));
             }
-            return Ok(Box::new(iter::empty()));
+            // domain_removals already validated above, but be explicit
+            debug_assert!(self.domain_removals.is_empty());
         }
 
-        // Validate feature support for user domain operations
-        if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty())
-            && !self
-                .read_snapshot
-                .table_configuration()
-                .is_feature_supported(&TableFeature::DomainMetadata)
-        {
-            return Err(Error::unsupported("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"));
-        }
-
-        // Validate user domain operations
-        self.validate_user_domain_operations()?;
-
-        // Generate user domain removals via log replay (expensive if non-empty)
-        let removal_actions = if !self.domain_removals.is_empty() {
-            // Scan log to fetch existing configurations for tombstones
-            let existing_domains =
-                scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?;
-
-            // Create removal tombstones with pre-image configurations
-            let removals: Vec<_> = self
-                .domain_removals
-                .iter()
-                .filter_map(|domain| {
-                    // If domain doesn't exist in the log, this is a no-op (filter it out)
-                    existing_domains.get(domain).map(|existing| {
-                        DomainMetadata::remove(domain.clone(), existing.configuration().to_owned())
-                    })
-                })
-                .collect();
-
-            removals
-        } else {
-            vec![]
-        };
+        // Generate removal actions (empty for create-table due to validation above)
+        let removal_actions = self.generate_user_domain_removal_actions(engine)?;
 
-        // Generate system domain actions (row tracking)
-        let system_domain_actions = row_tracking_high_watermark
+        // Generate the row tracking domain action (None for create-table)
+        let row_tracking_domain_action = row_tracking_high_watermark
             .map(DomainMetadata::try_from)
             .transpose()?
             .into_iter();
@@ -894,7 +973,7 @@ impl Transaction {
             .clone()
             .into_iter()
             .chain(removal_actions)
-            .chain(system_domain_actions)
+            .chain(row_tracking_domain_action)
             .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)),
         ))
     }
diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs
index 097a2b1e02..796a2060f9 100644
--- a/kernel/src/utils.rs
+++ b/kernel/src/utils.rs
@@ -292,6 +292,25 @@ pub(crate) mod test_utils {
         }
     }
 
+    /// Assert that a Result is an error and that the error message contains all expected substrings.
+    pub(crate) fn assert_result_error_with_messages<T, E: std::fmt::Display>(
+        res: Result<T, E>,
+        messages: &[&str],
+    ) {
+        match res {
+            Ok(_) => panic!("Expected error with messages {:?}, but got Ok result", messages),
+            Err(error) => {
+                let error_str = error.to_string();
+                for message in messages {
+                    assert!(
+                        error_str.contains(message),
+                        "Error message does not contain expected substring.\nExpected substring:\t{message}\nActual message:\t\t{error_str}"
+                    );
+                }
+            }
+        }
+    }
+
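+    // A hypothetical usage sketch (the result and substrings are illustrative):
+    //   assert_result_error_with_messages(result, &["Invalid value", "supported"]);
+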
     /// Helper to get a field from a StructType by name, panicking if not found.
     pub(crate) fn get_schema_field(struct_type: &StructType, name: &str) -> StructField {
         struct_type
diff --git a/kernel/tests/create_table.rs b/kernel/tests/create_table.rs
index ffd2d2807c..33ba20b0f2 100644
--- a/kernel/tests/create_table.rs
+++ b/kernel/tests/create_table.rs
@@ -263,3 +263,76 @@ async fn test_create_table_log_actions() -> DeltaResult<()> {
 
     Ok(())
 }
+
+#[test]
+fn test_feature_overrides_and_delta_properties_rejected_until_on_allow_list() {
+    let temp_dir = tempdir().expect("Failed to create temp dir");
+    let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
+
+    let engine =
+        create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))
+            .expect("Failed to create engine");
+
+    let schema = Arc::new(
+        StructType::try_new(vec![StructField::new("id", DataType::LONG, false)])
+            .expect("Invalid schema"),
+    );
+
+    // Feature overrides are parsed but rejected during validation (not on the allow-list)
+    let result = create_table(&table_path, schema.clone(), "FeatureTest/1.0")
+        .with_table_properties([("delta.feature.deletionVectors", "supported")])
+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
+
+    assert_result_error_with_message(
+        result,
+        "Enabling feature 'deletionVectors' is not supported during CREATE TABLE",
+    );
+
+    // Delta properties are rejected during validation (not on the allow-list)
+    let result = create_table(&table_path, schema.clone(), "FeatureTest/1.0")
+        .with_table_properties([("delta.enableDeletionVectors", "true")])
+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
+
+    assert_result_error_with_message(
+        result,
+        "Setting delta property 'delta.enableDeletionVectors' is not supported during CREATE TABLE",
+    );
+
+    // User/application properties (non-delta.*) are allowed
+    let result = create_table(&table_path, schema, "FeatureTest/1.0")
+        .with_table_properties([("myapp.version", "1.0")])
+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
+
+    assert!(result.is_ok(), "User properties should be allowed");
+}
+
+#[test]
+fn test_feature_override_rejects_invalid_value_only_supported_allowed() {
+    let temp_dir = tempdir().expect("Failed to create temp dir");
+    let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
+
+    let engine =
+        create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))
+            .expect("Failed to create engine");
+
+    let schema = Arc::new(
+        StructType::try_new(vec![StructField::new("id", DataType::LONG, false)])
+            .expect("Invalid schema"),
+    );
+
+    // "enabled" is not valid - only "supported" is allowed
+    let result = create_table(&table_path, schema, "FeatureTest/1.0")
+        .with_table_properties([("delta.feature.deletionVectors", "enabled")])
+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
+
+    assert!(result.is_err());
+    let err_msg = result.unwrap_err().to_string();
+    assert!(
+        err_msg.contains("Invalid value"),
+        "Error should mention invalid value"
+    );
+    assert!(
+        err_msg.contains("supported"),
+        "Error should mention 'supported' as valid value"
+    );
+}