Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};

use indexmap::IndexSet;

use self::deletion_vector::DeletionVectorDescriptor;
use crate::expressions::{MapData, Scalar, StructData};
use crate::schema::{DataType, MapType, SchemaRef, StructField, StructType, ToSchema as _};
Expand Down Expand Up @@ -400,19 +403,26 @@ pub(crate) struct Protocol {
writer_features: Option<Vec<TableFeature>>,
}

/// Parse features from strings and deduplicate them while preserving insertion order.
///
/// Each input item is stringified and parsed into the target feature type `T`.
/// Duplicates keep only their first occurrence, so the resulting protocol does
/// not contain redundant feature entries and the original ordering is stable.
///
/// NOTE(review): items that fail to parse are silently dropped rather than
/// failing the whole call — confirm this is the intended protocol behavior
/// (the `T::Err: Debug` bound is currently unused because errors are discarded).
///
/// Returns `None` when `features` is `None`; otherwise `Some` with the parsed,
/// deduplicated features (possibly empty).
fn parse_features<T>(features: Option<impl IntoIterator<Item = impl ToString>>) -> Option<Vec<T>>
where
    T: FromStr + Hash + Eq,
    T::Err: Debug,
{
    features.map(|fs| {
        let mut unique: Vec<T> = Vec::new();
        for feature in fs.into_iter().filter_map(|f| T::from_str(&f.to_string()).ok()) {
            // Feature lists are small (bounded by the number of table features),
            // so a linear scan preserves first-occurrence order without an
            // auxiliary set.
            if !unique.contains(&feature) {
                unique.push(feature);
            }
        }
        unique
    })
}

impl Protocol {
Expand Down
148 changes: 148 additions & 0 deletions kernel/src/clustering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//! Clustering support for Delta tables.
//!
//! This module provides support for clustering in Delta tables. Clustering
//! allows data to be physically co-located based on column values, enabling efficient
//! data skipping without the directory overhead of partitioning.
//!
//! Clustering metadata is stored as domain metadata with the domain name `delta.clustering`.

use serde::{Deserialize, Serialize};

use crate::actions::DomainMetadata;
use crate::schema::ColumnName;
use crate::DeltaResult;

/// The domain name for clustering metadata in Delta tables.
pub(crate) const CLUSTERING_DOMAIN_NAME: &str = "delta.clustering";

/// Represents the clustering metadata stored as domain metadata in Delta tables.
///
/// This struct is serialized to JSON and stored in the `delta.clustering` domain
/// metadata action. The clustering columns are stored as a list of column paths,
/// where each path is a list of field names (to support nested columns).
///
/// If column mapping is enabled on the table, physical column names are stored;
/// otherwise, logical column names are used.
///
/// NOTE(review): the physical-vs-logical naming rule above is enforced by
/// whoever constructs this struct, not by this type itself — confirm call
/// sites supply the correct flavor of name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ClusteringMetadataDomain {
    /// The columns used for clustering, stored as paths (list of field names).
    /// For example: `[["col1"], ["nested", "field"]]`
    /// Serialized under the JSON key `clusteringColumns` (camelCase rename above).
    clustering_columns: Vec<Vec<String>>,
}

impl ClusteringMetadataDomain {
    /// Builds clustering metadata from the given column names.
    ///
    /// Each column is a [`ColumnName`], which may describe a nested path; the
    /// path is flattened into its string segments (e.g. `["nested", "field"]`).
    pub(crate) fn new(columns: &[ColumnName]) -> Self {
        let mut clustering_columns = Vec::with_capacity(columns.len());
        for column in columns {
            let path: Vec<String> = column
                .path()
                .iter()
                .map(|segment| segment.to_string())
                .collect();
            clustering_columns.push(path);
        }
        Self { clustering_columns }
    }

    /// Returns the clustering columns as a slice of column paths.
    #[cfg(test)]
    pub(crate) fn clustering_columns(&self) -> &[Vec<String>] {
        self.clustering_columns.as_slice()
    }

    /// Converts this clustering metadata into a [`DomainMetadata`] action.
    ///
    /// # Errors
    ///
    /// Returns an error if JSON serialization of the metadata fails.
    pub(crate) fn to_domain_metadata(&self) -> DeltaResult<DomainMetadata> {
        Ok(DomainMetadata::new(
            CLUSTERING_DOMAIN_NAME.to_string(),
            serde_json::to_string(self)?,
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    impl ClusteringMetadataDomain {
        /// Creates a clustering metadata domain from a JSON configuration string.
        ///
        /// Test-only helper for round-tripping serialized configurations.
        fn from_json(json: &str) -> DeltaResult<Self> {
            Ok(serde_json::from_str(json)?)
        }
    }

    #[test]
    fn test_clustering_metadata_new() {
        let columns = vec![
            ColumnName::new(["col1"]),
            ColumnName::new(["nested", "field"]),
        ];
        let metadata = ClusteringMetadataDomain::new(&columns);

        assert_eq!(
            metadata.clustering_columns(),
            &[
                vec!["col1".to_string()],
                vec!["nested".to_string(), "field".to_string()]
            ]
        );
    }

    #[test]
    fn test_clustering_metadata_empty() {
        // An empty column list must produce an empty path list, not an error.
        let metadata = ClusteringMetadataDomain::new(&[]);
        assert!(metadata.clustering_columns().is_empty());
    }

    #[test]
    fn test_clustering_metadata_serialization() {
        let columns = vec![
            ColumnName::new(["col1"]),
            ColumnName::new(["nested", "field"]),
        ];
        let metadata = ClusteringMetadataDomain::new(&columns);

        // Assert the exact serialized form rather than substring checks:
        // `contains` assertions can pass even when the JSON shape (nesting,
        // key casing) is wrong. serde_json emits struct fields in declaration
        // order with no whitespace, so this string is stable.
        let json = serde_json::to_string(&metadata).unwrap();
        assert_eq!(
            json,
            r#"{"clusteringColumns":[["col1"],["nested","field"]]}"#
        );

        // Round-trip test
        let deserialized: ClusteringMetadataDomain = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, metadata);
    }

    #[test]
    fn test_clustering_metadata_to_domain_metadata() {
        let columns = vec![ColumnName::new(["col1"])];
        let metadata = ClusteringMetadataDomain::new(&columns);

        let domain_metadata = metadata.to_domain_metadata().unwrap();
        assert_eq!(domain_metadata.domain(), CLUSTERING_DOMAIN_NAME);

        // Verify the configuration can be parsed back
        let parsed = ClusteringMetadataDomain::from_json(domain_metadata.configuration()).unwrap();
        assert_eq!(parsed, metadata);
    }

    #[test]
    fn test_clustering_metadata_from_json() {
        let json = r#"{"clusteringColumns":[["col1"],["nested","field"]]}"#;
        let metadata = ClusteringMetadataDomain::from_json(json).unwrap();

        assert_eq!(
            metadata.clustering_columns(),
            &[
                vec!["col1".to_string()],
                vec!["nested".to_string(), "field".to_string()]
            ]
        );
    }
}
6 changes: 6 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ pub mod table_changes;
pub mod table_configuration;
pub mod table_features;
pub mod table_properties;
mod table_property_protocol_config;
pub mod transaction;
pub(crate) mod transforms;

pub use log_path::LogPath;

pub(crate) mod clustering;
mod row_tracking;

mod arrow_compat;
Expand Down Expand Up @@ -161,6 +163,10 @@ pub use log_compaction::{should_compact, LogCompactionWriter};
pub use metrics::MetricsReporter;
pub use snapshot::Snapshot;
pub use snapshot::SnapshotRef;
#[cfg(feature = "internal-api")]
pub use transaction::create_table::{create_table, CreateTableTransactionBuilder};
#[cfg(feature = "internal-api")]
pub use transaction::DataLayout;

use expressions::literal_expression_transform::LiteralExpressionTransform;
use expressions::Scalar;
Expand Down
25 changes: 14 additions & 11 deletions kernel/src/row_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ pub(crate) struct RowTrackingDomainMetadata {
row_id_high_water_mark: i64,
}

impl RowTrackingDomainMetadata {
const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking";
/// The domain name for row tracking metadata.
pub(crate) const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking";

impl RowTrackingDomainMetadata {
pub(crate) fn new(row_id_high_water_mark: i64) -> Self {
RowTrackingDomainMetadata {
row_id_high_water_mark,
Expand All @@ -45,14 +46,16 @@ impl RowTrackingDomainMetadata {
snapshot: &Snapshot,
engine: &dyn Engine,
) -> DeltaResult<Option<i64>> {
Ok(domain_metadata_configuration(
snapshot.log_segment(),
Self::ROW_TRACKING_DOMAIN_NAME,
engine,
)?
.map(|domain_metadata| serde_json::from_str::<Self>(&domain_metadata))
.transpose()?
.map(|metadata| metadata.row_id_high_water_mark))
Ok(
domain_metadata_configuration(
snapshot.log_segment(),
ROW_TRACKING_DOMAIN_NAME,
engine,
)?
.map(|domain_metadata| serde_json::from_str::<Self>(&domain_metadata))
.transpose()?
.map(|metadata| metadata.row_id_high_water_mark),
)
}
}

Expand All @@ -61,7 +64,7 @@ impl TryFrom<RowTrackingDomainMetadata> for DomainMetadata {

fn try_from(metadata: RowTrackingDomainMetadata) -> DeltaResult<Self> {
Ok(DomainMetadata::new(
RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(),
ROW_TRACKING_DOMAIN_NAME.to_string(),
serde_json::to_string(&metadata)?,
))
}
Expand Down
8 changes: 8 additions & 0 deletions kernel/src/table_configuration.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Configuration for reading existing Delta tables.
//!
//! This module defines [`TableConfiguration`], a high level api to check feature support and
//! feature enablement for a table at a given version. This encapsulates [`Protocol`], [`Metadata`],
//! [`Schema`], [`TableProperties`], and [`ColumnMappingMode`]. These structs in isolation should
Expand All @@ -7,6 +9,12 @@
//! reader/writer features, and ensure that the deletion vector table property is enabled in the
//! [`TableProperties`].
//!
//! # Related Modules
//!
//! - `table_property_protocol_config` (private module): For **creating/modifying** tables. Parses

Check failure on line 14 in kernel/src/table_configuration.rs

View workflow job for this annotation

GitHub Actions / docs

public documentation for `table_configuration` links to private item `crate::table_property_protocol_config`
//! user-provided properties to extract signal flags and create protocols. Use this when building
//! new tables or modifying existing table properties.
//!
//! [`Schema`]: crate::schema::Schema
use std::sync::Arc;

Expand Down
64 changes: 62 additions & 2 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use itertools::Itertools;
use std::str::FromStr;

use serde::{Deserialize, Serialize};
use strum::{AsRefStr, Display as StrumDisplay, EnumCount, EnumString};

Expand Down Expand Up @@ -417,14 +419,13 @@ static ICEBERG_COMPAT_V2_INFO: FeatureInfo = FeatureInfo {
}),
};

#[allow(dead_code)]
static CLUSTERED_TABLE_INFO: FeatureInfo = FeatureInfo {
name: "clustering",
min_reader_version: 1,
min_writer_version: 7,
feature_type: FeatureType::Writer,
feature_requirements: &[FeatureRequirement::Supported(TableFeature::DomainMetadata)],
kernel_support: KernelSupport::NotSupported,
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};

Expand Down Expand Up @@ -679,6 +680,20 @@ impl TableFeature {
TableFeature::Unknown(_) => None,
}
}

/// Parse a feature name string into a TableFeature.
///
/// Known feature names are parsed into their corresponding variants.
/// Unknown feature names are wrapped in `TableFeature::Unknown`.
pub(crate) fn from_name(name: &str) -> Self {
TableFeature::from_str(name).unwrap_or_else(|_| TableFeature::Unknown(name.to_string()))
}

/// Returns true if this is a ReaderWriter feature (appears in both reader and writer feature lists).
/// Returns false for Writer-only features and Unknown features.
///
/// Delegates entirely to `feature_type()`; this is a convenience predicate so
/// callers don't need to match on `FeatureType` themselves.
pub(crate) fn is_reader_writer(&self) -> bool {
    matches!(self.feature_type(), FeatureType::ReaderWriter)
}
}

impl ToDataType for TableFeature {
Expand Down Expand Up @@ -829,4 +844,49 @@ mod tests {
assert_eq!(from_str, feature);
}
}

#[test]
fn test_from_name() {
    // Known feature names resolve to their enum variants.
    let known = [
        ("deletionVectors", TableFeature::DeletionVectors),
        ("changeDataFeed", TableFeature::ChangeDataFeed),
        ("columnMapping", TableFeature::ColumnMapping),
        ("timestampNtz", TableFeature::TimestampWithoutTimezone),
    ];
    for (name, expected) in known {
        assert_eq!(TableFeature::from_name(name), expected);
    }

    // Anything unrecognized is preserved verbatim as an Unknown variant.
    assert_eq!(
        TableFeature::from_name("unknownFeature"),
        TableFeature::Unknown("unknownFeature".to_string())
    );
}

#[test]
fn test_is_reader_writer() {
    // Features that appear in both the reader and writer feature lists.
    for feature in [
        TableFeature::DeletionVectors,
        TableFeature::ColumnMapping,
        TableFeature::TimestampWithoutTimezone,
        TableFeature::V2Checkpoint,
    ] {
        assert!(
            feature.is_reader_writer(),
            "{feature:?} should be reader-writer"
        );
    }

    // Writer-only features never appear in the reader list.
    for feature in [
        TableFeature::ChangeDataFeed,
        TableFeature::AppendOnly,
        TableFeature::DomainMetadata,
        TableFeature::RowTracking,
    ] {
        assert!(
            !feature.is_reader_writer(),
            "{feature:?} should be writer-only"
        );
    }

    // Unknown features are never considered reader-writer.
    assert!(!TableFeature::unknown("something").is_reader_writer());
}
}
8 changes: 8 additions & 0 deletions kernel/src/table_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ pub use deserialize::ParseIntervalError;
/// Prefix for delta table properties (e.g., `delta.enableChangeDataFeed`, `delta.appendOnly`).
pub const DELTA_PROPERTY_PREFIX: &str = "delta.";

/// Table property key for specifying the minimum reader protocol version.
/// This is a signal flag property - it affects protocol creation but is not stored in metadata.
pub const MIN_READER_VERSION_PROP: &str = "delta.minReaderVersion";

/// Table property key for specifying the minimum writer protocol version.
/// This is a signal flag property - it affects protocol creation but is not stored in metadata.
pub const MIN_WRITER_VERSION_PROP: &str = "delta.minWriterVersion";

/// Delta table properties. These are parsed from the 'configuration' map in the most recent
/// 'Metadata' action of a table.
///
Expand Down
Loading
Loading