Skip to content

Commit c46c4e6

Browse files
committed
Add enumset to refactor ensure_supported_features
1 parent 72e369f commit c46c4e6

File tree

5 files changed

+61
-88
lines changed

5 files changed

+61
-88
lines changed

kernel/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ tokio = { version = "1.40", optional = true, features = ["rt-multi-thread"] }
8787
# Used in integration tests
8888
hdfs-native = { workspace = true, optional = true }
8989
walkdir = { workspace = true, optional = true }
90+
enumset = "1.1.5"
9091

9192
[features]
9293
# The default version to be expected

kernel/src/actions/mod.rs

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
//! Provides parsing and manipulation of the various actions defined in the [Delta
22
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)
33
4-
use std::any::type_name;
5-
use std::collections::{HashMap, HashSet};
6-
use std::fmt::{Debug, Display};
7-
use std::hash::Hash;
8-
use std::str::FromStr;
4+
use std::collections::HashMap;
5+
use std::fmt::Debug;
96
use std::sync::LazyLock;
107

118
use self::deletion_vector::DeletionVectorDescriptor;
129
use crate::actions::schemas::GetStructField;
1310
use crate::schema::{SchemaRef, StructType};
1411
use crate::table_features::{
15-
ReaderFeature, WriterFeature, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
12+
ensure_supported_features, ReaderFeature, WriterFeature, SUPPORTED_READER_FEATURES,
13+
SUPPORTED_WRITER_FEATURES,
1614
};
1715
use crate::table_properties::TableProperties;
1816
use crate::utils::require;
@@ -321,44 +319,6 @@ impl Protocol {
321319
}
322320
}
323321

324-
// given unparsed `table_features`, parse and check if they are subset of `supported_features`
325-
pub(crate) fn ensure_supported_features<T>(
326-
table_features: &[String],
327-
supported_features: &HashSet<T>,
328-
) -> DeltaResult<()>
329-
where
330-
<T as FromStr>::Err: Display,
331-
T: Debug + FromStr + Hash + Eq,
332-
{
333-
let error = |unsupported, unsupported_or_unknown| {
334-
let supported = supported_features.iter().collect::<Vec<_>>();
335-
let features_type = type_name::<T>()
336-
.rsplit("::")
337-
.next()
338-
.unwrap_or("table features");
339-
Error::Unsupported(format!(
340-
"{} {} {:?}. Supported {} are {:?}",
341-
unsupported_or_unknown, features_type, unsupported, features_type, supported
342-
))
343-
};
344-
let parsed_features: HashSet<T> = table_features
345-
.iter()
346-
.map(|s| T::from_str(s).map_err(|_| error(vec![s.to_string()], "Unknown")))
347-
.collect::<Result<_, Error>>()?;
348-
349-
// check that parsed features are a subset of supported features
350-
parsed_features
351-
.is_subset(supported_features)
352-
.then_some(())
353-
.ok_or_else(|| {
354-
let unsupported = parsed_features
355-
.difference(supported_features)
356-
.map(|f| format!("{:?}", f))
357-
.collect::<Vec<_>>();
358-
error(unsupported, "Unsupported")
359-
})
360-
}
361-
362322
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
363323
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
364324
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]

kernel/src/table_changes/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,19 @@
3131
//! let table_change_batches = table_changes_scan.execute(engine.clone())?;
3232
//! # Ok::<(), Error>(())
3333
//! ```
34-
use std::collections::HashSet;
3534
use std::sync::{Arc, LazyLock};
3635

3736
use scan::TableChangesScanBuilder;
3837
use url::Url;
3938

40-
use crate::actions::{ensure_supported_features, Protocol};
39+
use crate::actions::Protocol;
4140
use crate::log_segment::LogSegment;
4241
use crate::path::AsUrl;
4342
use crate::schema::{DataType, Schema, StructField, StructType};
4443
use crate::snapshot::Snapshot;
45-
use crate::table_features::{ColumnMappingMode, ReaderFeature};
44+
use crate::table_features::{
45+
ensure_supported_features, ColumnMappingMode, CDF_SUPPORTED_READER_FEATURES,
46+
};
4647
use crate::table_properties::TableProperties;
4748
use crate::utils::require;
4849
use crate::{DeltaResult, Engine, Error, Version};
@@ -252,8 +253,6 @@ fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult
252253
/// Ensures that Change Data Feed is supported for a table with this [`Protocol`] .
253254
/// See the documentation of [`TableChanges`] for more details.
254255
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
255-
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
256-
LazyLock::new(|| HashSet::from([ReaderFeature::DeletionVectors]));
257256
match &protocol.reader_features() {
258257
// if min_reader_version = 3 and all reader features are subset of supported => OK
259258
Some(reader_features) if protocol.min_reader_version() == 3 => {

kernel/src/table_configuration.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,14 @@
88
//! [`TableProperties`].
99
//!
1010
//! [`Schema`]: crate::schema::Schema
11-
use std::collections::HashSet;
12-
use std::sync::{Arc, LazyLock};
13-
11+
use std::sync::Arc;
1412
use url::Url;
1513

16-
use crate::actions::{ensure_supported_features, Metadata, Protocol};
14+
use crate::actions::{Metadata, Protocol};
1715
use crate::schema::{InvariantChecker, SchemaRef};
1816
use crate::table_features::{
19-
column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode, ReaderFeature,
20-
WriterFeature,
17+
column_mapping_mode, ensure_supported_features, validate_schema_column_mapping,
18+
ColumnMappingMode, ReaderFeature, WriterFeature, CDF_SUPPORTED_READER_FEATURES,
2119
};
2220
use crate::table_properties::TableProperties;
2321
use crate::{DeltaResult, Error, Version};
@@ -156,8 +154,6 @@ impl TableConfiguration {
156154
/// [`TableChanges`]: crate::table_changes::TableChanges
157155
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
158156
pub(crate) fn is_cdf_read_supported(&self) -> bool {
159-
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
160-
LazyLock::new(|| HashSet::from([ReaderFeature::DeletionVectors]));
161157
let protocol_supported = match self.protocol.reader_features() {
162158
// if min_reader_version = 3 and all reader features are subset of supported => OK
163159
Some(reader_features) if self.protocol.min_reader_version() == 3 => {

kernel/src/table_features/mod.rs

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use std::collections::HashSet;
2-
use std::sync::LazyLock;
3-
4-
use serde::{Deserialize, Serialize};
1+
use crate::{DeltaResult, Error};
2+
use enumset::{enum_set, EnumSet, EnumSetType};
3+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
4+
use std::fmt::Display;
55
use strum::{AsRefStr, Display as StrumDisplay, EnumString, VariantNames};
66

77
pub(crate) use column_mapping::column_mapping_mode;
@@ -18,14 +18,12 @@ mod column_mapping;
1818
Serialize,
1919
Deserialize,
2020
Debug,
21-
Clone,
22-
Eq,
23-
PartialEq,
2421
EnumString,
2522
StrumDisplay,
2623
AsRefStr,
2724
VariantNames,
2825
Hash,
26+
EnumSetType,
2927
)]
3028
#[strum(serialize_all = "camelCase")]
3129
#[serde(rename_all = "camelCase")]
@@ -59,14 +57,12 @@ pub enum ReaderFeature {
5957
Serialize,
6058
Deserialize,
6159
Debug,
62-
Clone,
63-
Eq,
64-
PartialEq,
6560
EnumString,
6661
StrumDisplay,
6762
AsRefStr,
6863
VariantNames,
6964
Hash,
65+
EnumSetType,
7066
)]
7167
#[strum(serialize_all = "camelCase")]
7268
#[serde(rename_all = "camelCase")]
@@ -123,30 +119,51 @@ impl From<WriterFeature> for String {
123119
}
124120
}
125121

126-
pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeature>> =
127-
LazyLock::new(|| {
128-
HashSet::from([
129-
ReaderFeature::ColumnMapping,
130-
ReaderFeature::DeletionVectors,
131-
ReaderFeature::TimestampWithoutTimezone,
132-
ReaderFeature::TypeWidening,
133-
ReaderFeature::TypeWideningPreview,
134-
ReaderFeature::VacuumProtocolCheck,
135-
ReaderFeature::V2Checkpoint,
136-
])
137-
});
138-
139-
pub(crate) static SUPPORTED_WRITER_FEATURES: LazyLock<HashSet<WriterFeature>> =
122+
pub(crate) static SUPPORTED_READER_FEATURES: EnumSet<ReaderFeature> = enum_set!(
123+
ReaderFeature::ColumnMapping
124+
| ReaderFeature::DeletionVectors
125+
| ReaderFeature::TimestampWithoutTimezone
126+
| ReaderFeature::TypeWidening
127+
| ReaderFeature::TypeWideningPreview
128+
| ReaderFeature::VacuumProtocolCheck
129+
| ReaderFeature::V2Checkpoint
130+
);
131+
132+
pub(crate) static SUPPORTED_WRITER_FEATURES: EnumSet<WriterFeature> =
140133
// note: we 'support' Invariants, but only insofar as we check that they are not present.
141134
// we support writing to tables that have Invariants enabled but not used. similarly, we only
142135
// support DeletionVectors in that we never write them (no DML).
143-
LazyLock::new(|| {
144-
HashSet::from([
145-
WriterFeature::AppendOnly,
146-
WriterFeature::DeletionVectors,
147-
WriterFeature::Invariants,
148-
])
149-
});
136+
enum_set!(
137+
WriterFeature::AppendOnly | WriterFeature::DeletionVectors | WriterFeature::Invariants
138+
);
139+
140+
pub(crate) static CDF_SUPPORTED_READER_FEATURES: EnumSet<ReaderFeature> =
141+
enum_set!(ReaderFeature::DeletionVectors);
142+
143+
pub(crate) fn ensure_supported_features<F>(
144+
features: &[String],
145+
supported: &EnumSet<F>,
146+
) -> DeltaResult<()>
147+
where
148+
F: DeserializeOwned + EnumSetType + Display,
149+
{
150+
for feature_str in features {
151+
match serde_json::from_str::<F>(feature_str) {
152+
Ok(feature_enum) => {
153+
if !supported.contains(feature_enum) {
154+
return Err(Error::Unsupported(format!(
155+
"Unsupported feature: {}",
156+
feature_enum
157+
)));
158+
}
159+
}
160+
Err(err) => {
161+
return Err(Error::Unsupported(format!("Unknown feature: {}", err)));
162+
}
163+
}
164+
}
165+
Ok(())
166+
}
150167

151168
#[cfg(test)]
152169
mod tests {

0 commit comments

Comments
 (0)