Skip to content

Commit 67af9e6

Browse files
committed
Add enumset to refactor ensure_supported_features
1 parent 72e369f commit 67af9e6

File tree

5 files changed

+92
-110
lines changed

5 files changed

+92
-110
lines changed

kernel/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ tracing = { version = "0.1", features = ["log"] }
5151
url = "2"
5252
uuid = "1.10.0"
5353
z85 = "3.0.5"
54+
enumset = "1.1.5"
5455

5556
# bring in our derive macros
5657
delta_kernel_derive = { path = "../derive-macros", version = "0.8.0" }

kernel/src/actions/mod.rs

Lines changed: 4 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
//! Provides parsing and manipulation of the various actions defined in the [Delta
22
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)
33
4-
use std::any::type_name;
5-
use std::collections::{HashMap, HashSet};
6-
use std::fmt::{Debug, Display};
7-
use std::hash::Hash;
8-
use std::str::FromStr;
4+
use std::collections::HashMap;
5+
use std::fmt::Debug;
96
use std::sync::LazyLock;
107

118
use self::deletion_vector::DeletionVectorDescriptor;
129
use crate::actions::schemas::GetStructField;
1310
use crate::schema::{SchemaRef, StructType};
1411
use crate::table_features::{
15-
ReaderFeature, WriterFeature, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
12+
ensure_supported_features, ReaderFeature, WriterFeature, SUPPORTED_READER_FEATURES,
13+
SUPPORTED_WRITER_FEATURES,
1614
};
1715
use crate::table_properties::TableProperties;
1816
use crate::utils::require;
@@ -321,44 +319,6 @@ impl Protocol {
321319
}
322320
}
323321

324-
// given unparsed `table_features`, parse and check if they are subset of `supported_features`
325-
pub(crate) fn ensure_supported_features<T>(
326-
table_features: &[String],
327-
supported_features: &HashSet<T>,
328-
) -> DeltaResult<()>
329-
where
330-
<T as FromStr>::Err: Display,
331-
T: Debug + FromStr + Hash + Eq,
332-
{
333-
let error = |unsupported, unsupported_or_unknown| {
334-
let supported = supported_features.iter().collect::<Vec<_>>();
335-
let features_type = type_name::<T>()
336-
.rsplit("::")
337-
.next()
338-
.unwrap_or("table features");
339-
Error::Unsupported(format!(
340-
"{} {} {:?}. Supported {} are {:?}",
341-
unsupported_or_unknown, features_type, unsupported, features_type, supported
342-
))
343-
};
344-
let parsed_features: HashSet<T> = table_features
345-
.iter()
346-
.map(|s| T::from_str(s).map_err(|_| error(vec![s.to_string()], "Unknown")))
347-
.collect::<Result<_, Error>>()?;
348-
349-
// check that parsed features are a subset of supported features
350-
parsed_features
351-
.is_subset(supported_features)
352-
.then_some(())
353-
.ok_or_else(|| {
354-
let unsupported = parsed_features
355-
.difference(supported_features)
356-
.map(|f| format!("{:?}", f))
357-
.collect::<Vec<_>>();
358-
error(unsupported, "Unsupported")
359-
})
360-
}
361-
362322
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
363323
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
364324
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
@@ -969,26 +929,4 @@ mod tests {
969929
.unwrap();
970930
assert!(protocol.ensure_write_supported().is_err());
971931
}
972-
973-
#[test]
974-
fn test_ensure_supported_features() {
975-
let supported_features = [ReaderFeature::ColumnMapping, ReaderFeature::DeletionVectors]
976-
.into_iter()
977-
.collect();
978-
let table_features = vec![ReaderFeature::ColumnMapping.to_string()];
979-
ensure_supported_features(&table_features, &supported_features).unwrap();
980-
981-
// test unknown features
982-
let table_features = vec![ReaderFeature::ColumnMapping.to_string(), "idk".to_string()];
983-
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
984-
match error {
985-
Error::Unsupported(e) if e ==
986-
"Unknown ReaderFeature [\"idk\"]. Supported ReaderFeature are [ColumnMapping, DeletionVectors]"
987-
=> {},
988-
Error::Unsupported(e) if e ==
989-
"Unknown ReaderFeature [\"idk\"]. Supported ReaderFeature are [DeletionVectors, ColumnMapping]"
990-
=> {},
991-
_ => panic!("Expected unsupported error"),
992-
}
993-
}
994932
}

kernel/src/table_changes/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,19 @@
3131
//! let table_change_batches = table_changes_scan.execute(engine.clone())?;
3232
//! # Ok::<(), Error>(())
3333
//! ```
34-
use std::collections::HashSet;
3534
use std::sync::{Arc, LazyLock};
3635

3736
use scan::TableChangesScanBuilder;
3837
use url::Url;
3938

40-
use crate::actions::{ensure_supported_features, Protocol};
39+
use crate::actions::Protocol;
4140
use crate::log_segment::LogSegment;
4241
use crate::path::AsUrl;
4342
use crate::schema::{DataType, Schema, StructField, StructType};
4443
use crate::snapshot::Snapshot;
45-
use crate::table_features::{ColumnMappingMode, ReaderFeature};
44+
use crate::table_features::{
45+
ensure_supported_features, ColumnMappingMode, CDF_SUPPORTED_READER_FEATURES,
46+
};
4647
use crate::table_properties::TableProperties;
4748
use crate::utils::require;
4849
use crate::{DeltaResult, Engine, Error, Version};
@@ -252,8 +253,6 @@ fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult
252253
/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
253254
/// See the documentation of [`TableChanges`] for more details.
254255
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
255-
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
256-
LazyLock::new(|| HashSet::from([ReaderFeature::DeletionVectors]));
257256
match &protocol.reader_features() {
258257
// if min_reader_version = 3 and all reader features are subset of supported => OK
259258
Some(reader_features) if protocol.min_reader_version() == 3 => {

kernel/src/table_configuration.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,14 @@
88
//! [`TableProperties`].
99
//!
1010
//! [`Schema`]: crate::schema::Schema
11-
use std::collections::HashSet;
12-
use std::sync::{Arc, LazyLock};
13-
11+
use std::sync::Arc;
1412
use url::Url;
1513

16-
use crate::actions::{ensure_supported_features, Metadata, Protocol};
14+
use crate::actions::{Metadata, Protocol};
1715
use crate::schema::{InvariantChecker, SchemaRef};
1816
use crate::table_features::{
19-
column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode, ReaderFeature,
20-
WriterFeature,
17+
column_mapping_mode, ensure_supported_features, validate_schema_column_mapping,
18+
ColumnMappingMode, ReaderFeature, WriterFeature, CDF_SUPPORTED_READER_FEATURES,
2119
};
2220
use crate::table_properties::TableProperties;
2321
use crate::{DeltaResult, Error, Version};
@@ -156,8 +154,6 @@ impl TableConfiguration {
156154
/// [`TableChanges`]: crate::table_changes::TableChanges
157155
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
158156
pub(crate) fn is_cdf_read_supported(&self) -> bool {
159-
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
160-
LazyLock::new(|| HashSet::from([ReaderFeature::DeletionVectors]));
161157
let protocol_supported = match self.protocol.reader_features() {
162158
// if min_reader_version = 3 and all reader features are subset of supported => OK
163159
Some(reader_features) if self.protocol.min_reader_version() == 3 => {

kernel/src/table_features/mod.rs

Lines changed: 79 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use std::collections::HashSet;
2-
use std::sync::LazyLock;
3-
4-
use serde::{Deserialize, Serialize};
1+
use crate::{DeltaResult, Error};
2+
use enumset::{enum_set, EnumSet, EnumSetType};
3+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
4+
use std::any::type_name;
5+
use std::fmt::Display;
56
use strum::{AsRefStr, Display as StrumDisplay, EnumString, VariantNames};
67

78
pub(crate) use column_mapping::column_mapping_mode;
@@ -18,14 +19,12 @@ mod column_mapping;
1819
Serialize,
1920
Deserialize,
2021
Debug,
21-
Clone,
22-
Eq,
23-
PartialEq,
2422
EnumString,
2523
StrumDisplay,
2624
AsRefStr,
2725
VariantNames,
2826
Hash,
27+
EnumSetType,
2928
)]
3029
#[strum(serialize_all = "camelCase")]
3130
#[serde(rename_all = "camelCase")]
@@ -59,14 +58,12 @@ pub enum ReaderFeature {
5958
Serialize,
6059
Deserialize,
6160
Debug,
62-
Clone,
63-
Eq,
64-
PartialEq,
6561
EnumString,
6662
StrumDisplay,
6763
AsRefStr,
6864
VariantNames,
6965
Hash,
66+
EnumSetType,
7067
)]
7168
#[strum(serialize_all = "camelCase")]
7269
#[serde(rename_all = "camelCase")]
@@ -123,30 +120,55 @@ impl From<WriterFeature> for String {
123120
}
124121
}
125122

126-
pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
127-
LazyLock::new(|| {
128-
HashSet::from([
129-
ReaderFeature::ColumnMapping,
130-
ReaderFeature::DeletionVectors,
131-
ReaderFeature::TimestampWithoutTimezone,
132-
ReaderFeature::TypeWidening,
133-
ReaderFeature::TypeWideningPreview,
134-
ReaderFeature::VacuumProtocolCheck,
135-
ReaderFeature::V2Checkpoint,
136-
])
137-
});
138-
139-
pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock<HashSet<WriterFeature>> =
123+
pub(crate) static SUPPORTED_READER_FEATURES: EnumSet<ReaderFeature> = enum_set!(
124+
ReaderFeature::ColumnMapping
125+
| ReaderFeature::DeletionVectors
126+
| ReaderFeature::TimestampWithoutTimezone
127+
| ReaderFeature::TypeWidening
128+
| ReaderFeature::TypeWideningPreview
129+
| ReaderFeature::VacuumProtocolCheck
130+
| ReaderFeature::V2Checkpoint
131+
);
132+
133+
pub(crate) static SUPPORTED_WRITER_FEATURES: EnumSet<WriterFeature> =
140134
// note: we 'support' Invariants, but only insofar as we check that they are not present.
141135
// we support writing to tables that have Invariants enabled but not used. similarly, we only
142136
// support DeletionVectors in that we never write them (no DML).
143-
LazyLock::new(|| {
144-
HashSet::from([
145-
WriterFeature::AppendOnly,
146-
WriterFeature::DeletionVectors,
147-
WriterFeature::Invariants,
148-
])
149-
});
137+
enum_set!(
138+
WriterFeature::AppendOnly | WriterFeature::DeletionVectors | WriterFeature::Invariants
139+
);
140+
141+
pub(crate) static CDF_SUPPORTED_READER_FEATURES: EnumSet<ReaderFeature> =
142+
enum_set!(ReaderFeature::DeletionVectors);
143+
144+
pub(crate) fn ensure_supported_features<F>(
145+
features: &[String],
146+
supported: &EnumSet<F>,
147+
) -> DeltaResult<()>
148+
where
149+
F: DeserializeOwned + EnumSetType + Display,
150+
{
151+
let features_type = type_name::<F>().rsplit("::").next().unwrap();
152+
for feature_str in features {
153+
match serde_json::from_str::<F>(&format!("\"{}\"", feature_str)) {
154+
Ok(feature_enum) => {
155+
if !supported.contains(feature_enum) {
156+
return Err(Error::Unsupported(format!(
157+
"Unsupported {} variant `{}`. Supported features: {}",
158+
features_type, feature_enum, supported
159+
)));
160+
}
161+
}
162+
Err(_) => {
163+
return Err(Error::Unsupported(format!(
164+
"Unknown {} variant `{}`. Supported features: {}",
165+
features_type, feature_str, supported
166+
)));
167+
}
168+
}
169+
}
170+
Ok(())
171+
}
150172

151173
#[cfg(test)]
152174
mod tests {
@@ -217,4 +239,30 @@ mod tests {
217239
assert_eq!(from_str, feature);
218240
}
219241
}
242+
243+
#[test]
244+
fn test_ensure_supported_features() {
245+
let supported_features =
246+
enum_set!(ReaderFeature::ColumnMapping | ReaderFeature::DeletionVectors);
247+
let table_features = vec![ReaderFeature::ColumnMapping.to_string()];
248+
ensure_supported_features(&table_features, &supported_features).unwrap();
249+
}
250+
251+
#[test]
252+
fn test_ensure_supported_features_unsupported() {
253+
let supported_features =
254+
enum_set!(ReaderFeature::ColumnMapping | ReaderFeature::DeletionVectors);
255+
let table_features = vec![ReaderFeature::TimestampWithoutTimezone.to_string()];
256+
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
257+
assert_eq!(error.to_string(), "Unsupported: Unsupported ReaderFeature variant `timestampNtz`. Supported features: columnMapping | deletionVectors".to_string());
258+
}
259+
260+
#[test]
261+
fn test_ensure_supported_features_unknown() {
262+
let supported_features =
263+
enum_set!(ReaderFeature::ColumnMapping | ReaderFeature::DeletionVectors);
264+
let table_features = vec![ReaderFeature::ColumnMapping.to_string(), "idk".to_string()];
265+
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
266+
assert_eq!(error.to_string(), "Unsupported: Unknown ReaderFeature variant `idk`. Supported features: columnMapping | deletionVectors".to_string());
267+
}
220268
}

0 commit comments

Comments
 (0)