Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ tracing = { version = "0.1", features = ["log"] }
url = "2"
uuid = "1.10.0"
z85 = "3.0.5"
enumset = "1.1.5"

# bring in our derive macros
delta_kernel_derive = { path = "../derive-macros", version = "0.8.0" }
Expand Down
70 changes: 4 additions & 66 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
//! Provides parsing and manipulation of the various actions defined in the [Delta
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)

use std::any::type_name;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::str::FromStr;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use self::deletion_vector::DeletionVectorDescriptor;
use crate::actions::schemas::GetStructField;
use crate::schema::{SchemaRef, StructType};
use crate::table_features::{
ReaderFeature, WriterFeature, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
ensure_supported_features, ReaderFeature, WriterFeature, SUPPORTED_READER_FEATURES,
SUPPORTED_WRITER_FEATURES,
};
use crate::table_properties::TableProperties;
use crate::utils::require;
Expand Down Expand Up @@ -321,44 +319,6 @@ impl Protocol {
}
}

// given unparsed `table_features`, parse and check if they are subset of `supported_features`
pub(crate) fn ensure_supported_features<T>(
table_features: &[String],
supported_features: &HashSet<T>,
) -> DeltaResult<()>
where
<T as FromStr>::Err: Display,
T: Debug + FromStr + Hash + Eq,
{
let error = |unsupported, unsupported_or_unknown| {
let supported = supported_features.iter().collect::<Vec<_>>();
let features_type = type_name::<T>()
.rsplit("::")
.next()
.unwrap_or("table features");
Error::Unsupported(format!(
"{} {} {:?}. Supported {} are {:?}",
unsupported_or_unknown, features_type, unsupported, features_type, supported
))
};
let parsed_features: HashSet<T> = table_features
.iter()
.map(|s| T::from_str(s).map_err(|_| error(vec![s.to_string()], "Unknown")))
.collect::<Result<_, Error>>()?;

// check that parsed features are a subset of supported features
parsed_features
.is_subset(supported_features)
.then_some(())
.ok_or_else(|| {
let unsupported = parsed_features
.difference(supported_features)
.map(|f| format!("{:?}", f))
.collect::<Vec<_>>();
error(unsupported, "Unsupported")
})
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
Expand Down Expand Up @@ -969,26 +929,4 @@ mod tests {
.unwrap();
assert!(protocol.ensure_write_supported().is_err());
}

#[test]
fn test_ensure_supported_features() {
let supported_features = [ReaderFeature::ColumnMapping, ReaderFeature::DeletionVectors]
.into_iter()
.collect();
let table_features = vec![ReaderFeature::ColumnMapping.to_string()];
ensure_supported_features(&table_features, &supported_features).unwrap();

// test unknown features
let table_features = vec![ReaderFeature::ColumnMapping.to_string(), "idk".to_string()];
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
match error {
Error::Unsupported(e) if e ==
"Unknown ReaderFeature [\"idk\"]. Supported ReaderFeature are [ColumnMapping, DeletionVectors]"
=> {},
Error::Unsupported(e) if e ==
"Unknown ReaderFeature [\"idk\"]. Supported ReaderFeature are [DeletionVectors, ColumnMapping]"
=> {},
_ => panic!("Expected unsupported error"),
}
}
}
9 changes: 4 additions & 5 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,19 @@
//! let table_change_batches = table_changes_scan.execute(engine.clone())?;
//! # Ok::<(), Error>(())
//! ```
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};

use scan::TableChangesScanBuilder;
use url::Url;

use crate::actions::{ensure_supported_features, Protocol};
use crate::actions::Protocol;
use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::{ColumnMappingMode, ReaderFeature};
use crate::table_features::{
ensure_supported_features, ColumnMappingMode, CDF_SUPPORTED_READER_FEATURES,
};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, Version};
Expand Down Expand Up @@ -252,8 +253,6 @@ fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult
/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
/// See the documentation of [`TableChanges`] for more details.
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
LazyLock::new(|| HashSet::from([ReaderFeature::DeletionVectors]));
match &protocol.reader_features() {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if protocol.min_reader_version() == 3 => {
Expand Down
12 changes: 4 additions & 8 deletions kernel/src/table_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@
//! [`TableProperties`].
//!
//! [`Schema`]: crate::schema::Schema
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};

use std::sync::Arc;
use url::Url;

use crate::actions::{ensure_supported_features, Metadata, Protocol};
use crate::actions::{Metadata, Protocol};
use crate::schema::{InvariantChecker, SchemaRef};
use crate::table_features::{
column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode, ReaderFeature,
WriterFeature,
column_mapping_mode, ensure_supported_features, validate_schema_column_mapping,
ColumnMappingMode, ReaderFeature, WriterFeature, CDF_SUPPORTED_READER_FEATURES,
};
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error, Version};
Expand Down Expand Up @@ -156,8 +154,6 @@ impl TableConfiguration {
/// [`TableChanges`]: crate::table_changes::TableChanges
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn is_cdf_read_supported(&self) -> bool {
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
LazyLock::new(|| HashSet::from([ReaderFeature::DeletionVectors]));
let protocol_supported = match self.protocol.reader_features() {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if self.protocol.min_reader_version() == 3 => {
Expand Down
110 changes: 79 additions & 31 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashSet;
use std::sync::LazyLock;

use serde::{Deserialize, Serialize};
use crate::{DeltaResult, Error};
use enumset::{enum_set, EnumSet, EnumSetType};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::any::type_name;
use std::fmt::Display;
use strum::{AsRefStr, Display as StrumDisplay, EnumString, VariantNames};

pub(crate) use column_mapping::column_mapping_mode;
Expand All @@ -18,14 +19,12 @@ mod column_mapping;
Serialize,
Deserialize,
Debug,
Clone,
Eq,
PartialEq,
EnumString,
StrumDisplay,
AsRefStr,
VariantNames,
Hash,
EnumSetType,
)]
#[strum(serialize_all = "camelCase")]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -59,14 +58,12 @@ pub enum ReaderFeature {
Serialize,
Deserialize,
Debug,
Clone,
Eq,
PartialEq,
EnumString,
StrumDisplay,
AsRefStr,
VariantNames,
Hash,
EnumSetType,
)]
#[strum(serialize_all = "camelCase")]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -123,30 +120,55 @@ impl From<WriterFeature> for String {
}
}

pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
LazyLock::new(|| {
HashSet::from([
ReaderFeature::ColumnMapping,
ReaderFeature::DeletionVectors,
ReaderFeature::TimestampWithoutTimezone,
ReaderFeature::TypeWidening,
ReaderFeature::TypeWideningPreview,
ReaderFeature::VacuumProtocolCheck,
ReaderFeature::V2Checkpoint,
])
});

pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock<HashSet<WriterFeature>> =
pub(crate) static SUPPORTED_READER_FEATURES: EnumSet<ReaderFeature> = enum_set!(
ReaderFeature::ColumnMapping
| ReaderFeature::DeletionVectors
| ReaderFeature::TimestampWithoutTimezone
| ReaderFeature::TypeWidening
| ReaderFeature::TypeWideningPreview
| ReaderFeature::VacuumProtocolCheck
| ReaderFeature::V2Checkpoint
);

pub(crate) static SUPPORTED_WRITER_FEATURES: EnumSet<WriterFeature> =
// note: we 'support' Invariants, but only insofar as we check that they are not present.
// we support writing to tables that have Invariants enabled but not used. similarly, we only
// support DeletionVectors in that we never write them (no DML).
LazyLock::new(|| {
HashSet::from([
WriterFeature::AppendOnly,
WriterFeature::DeletionVectors,
WriterFeature::Invariants,
])
});
enum_set!(
WriterFeature::AppendOnly | WriterFeature::DeletionVectors | WriterFeature::Invariants
);

pub(crate) static CDF_SUPPORTED_READER_FEATURES: EnumSet<ReaderFeature> =
enum_set!(ReaderFeature::DeletionVectors);

pub(crate) fn ensure_supported_features<F>(
features: &[String],
supported: &EnumSet<F>,
) -> DeltaResult<()>
where
F: DeserializeOwned + EnumSetType + Display,
{
let features_type = type_name::<F>().rsplit("::").next().unwrap();
for feature_str in features {
match serde_json::from_str::<F>(&format!("\"{}\"", feature_str)) {
Ok(feature_enum) => {
if !supported.contains(feature_enum) {
return Err(Error::Unsupported(format!(
"Unsupported {} variant `{}`. Supported features: {}",
features_type, feature_enum, supported
)));
}
}
Err(_) => {
return Err(Error::Unsupported(format!(
"Unknown {} variant `{}`. Supported features: {}",
features_type, feature_str, supported
)));
}
}
}
Ok(())
}

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -217,4 +239,30 @@ mod tests {
assert_eq!(from_str, feature);
}
}

#[test]
fn test_ensure_supported_features() {
let supported_features =
enum_set!(ReaderFeature::ColumnMapping | ReaderFeature::DeletionVectors);
let table_features = vec![ReaderFeature::ColumnMapping.to_string()];
ensure_supported_features(&table_features, &supported_features).unwrap();
}

#[test]
fn test_ensure_supported_features_unsupported() {
let supported_features =
enum_set!(ReaderFeature::ColumnMapping | ReaderFeature::DeletionVectors);
let table_features = vec![ReaderFeature::TimestampWithoutTimezone.to_string()];
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
assert_eq!(error.to_string(), "Unsupported: Unsupported ReaderFeature variant `timestampNtz`. Supported features: columnMapping | deletionVectors".to_string());
}

#[test]
fn test_ensure_supported_features_unknown() {
let supported_features =
enum_set!(ReaderFeature::ColumnMapping | ReaderFeature::DeletionVectors);
let table_features = vec![ReaderFeature::ColumnMapping.to_string(), "idk".to_string()];
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
assert_eq!(error.to_string(), "Unsupported: Unknown ReaderFeature variant `idk`. Supported features: columnMapping | deletionVectors".to_string());
}
}