Skip to content

Commit cf03941

Browse files
sanujbasuSanuj Basu
authored andcommitted
feat: Add domain metadata support for create-table transactions
Adds support for user and system domain metadata (domains with 'delta.' prefix) during table creation. This enables features like clustering to be configured at table creation time. Invariants are validated and feature checks performed. Changes: - Refactors generate_domain_metadata_actions to consume domain metadata actions passed down by the create table builder. - Refactors validate_user_domain_operations() to validate_domain_metadata_operations() which enforces a myriad of domain metadata invariants. - Adds validate_system_domain_feature to make sure relevant features are supported when row tracking and clustering domain metadata are pushed down. Clustered table creation support will be added in a stacked PR. The PR with push down the domain metadata for clustered tables into the log. Integration tests to be added in kernel/tests/create_table.rs once clustered table creation support is implement since validation testing requires feature allow listing.
1 parent d09e389 commit cf03941

File tree

2 files changed

+139
-61
lines changed

2 files changed

+139
-61
lines changed

kernel/src/row_tracking.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ pub(crate) struct RowTrackingDomainMetadata {
1616
row_id_high_water_mark: i64,
1717
}
1818

19-
impl RowTrackingDomainMetadata {
20-
const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking";
19+
/// The domain name for row tracking metadata.
20+
pub(crate) const ROW_TRACKING_DOMAIN_NAME: &str = "delta.rowTracking";
2121

22+
impl RowTrackingDomainMetadata {
2223
pub(crate) fn new(row_id_high_water_mark: i64) -> Self {
2324
RowTrackingDomainMetadata {
2425
row_id_high_water_mark,
@@ -45,14 +46,16 @@ impl RowTrackingDomainMetadata {
4546
snapshot: &Snapshot,
4647
engine: &dyn Engine,
4748
) -> DeltaResult<Option<i64>> {
48-
Ok(domain_metadata_configuration(
49-
snapshot.log_segment(),
50-
Self::ROW_TRACKING_DOMAIN_NAME,
51-
engine,
52-
)?
53-
.map(|domain_metadata| serde_json::from_str::<Self>(&domain_metadata))
54-
.transpose()?
55-
.map(|metadata| metadata.row_id_high_water_mark))
49+
Ok(
50+
domain_metadata_configuration(
51+
snapshot.log_segment(),
52+
ROW_TRACKING_DOMAIN_NAME,
53+
engine,
54+
)?
55+
.map(|domain_metadata| serde_json::from_str::<Self>(&domain_metadata))
56+
.transpose()?
57+
.map(|metadata| metadata.row_id_high_water_mark),
58+
)
5659
}
5760
}
5861

@@ -61,7 +64,7 @@ impl TryFrom<RowTrackingDomainMetadata> for DomainMetadata {
6164

6265
fn try_from(metadata: RowTrackingDomainMetadata) -> DeltaResult<Self> {
6366
Ok(DomainMetadata::new(
64-
RowTrackingDomainMetadata::ROW_TRACKING_DOMAIN_NAME.to_string(),
67+
ROW_TRACKING_DOMAIN_NAME.to_string(),
6568
serde_json::to_string(&metadata)?,
6669
))
6770
}

kernel/src/transaction/mod.rs

Lines changed: 125 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use crate::error::Error;
2020
use crate::expressions::{column_name, ColumnName};
2121
use crate::expressions::{ArrayData, Scalar, StructData, Transform, UnaryExpressionOp::ToJson};
2222
use crate::path::{LogRoot, ParsedLogPath};
23-
use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor};
23+
use crate::row_tracking::{
24+
RowTrackingDomainMetadata, RowTrackingVisitor, ROW_TRACKING_DOMAIN_NAME,
25+
};
2426
use crate::scan::data_skipping::stats_schema::NullableStatsTransform;
2527
use crate::scan::log_replay::{
2628
get_scan_metadata_transform_expr, BASE_ROW_ID_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME,
@@ -647,19 +649,49 @@ impl Transaction {
647649
// PRE_COMMIT_VERSION (u64::MAX) + 1 wraps to 0, which is the correct first version
648650
self.read_snapshot.version().wrapping_add(1)
649651
}
650-
/// Validate that user domains don't conflict with system domains or each other.
651-
fn validate_user_domain_operations(&self) -> DeltaResult<()> {
652+
/// Validate domain metadata operations for both create-table and existing-table transactions.
653+
///
654+
/// Enforces the following rules:
655+
/// - DomainMetadata feature must be supported if any domain operations are present
656+
/// - System domains (delta.*) can only be added in create-table transactions
657+
/// - System domains must correspond to a known feature (e.g., rowTracking) and that feature must be enabled
658+
/// - User domains can be added in both create-table and existing-table transactions
659+
/// - Domain removals are not allowed in create-table transactions
660+
/// - No duplicate domains within a single transaction
661+
fn validate_domain_metadata_operations(&self) -> DeltaResult<()> {
662+
// Feature validation (applies to all transactions with domain operations)
663+
if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty())
664+
&& !self
665+
.read_snapshot
666+
.table_configuration()
667+
.is_feature_supported(&TableFeature::DomainMetadata)
668+
{
669+
return Err(Error::unsupported(
670+
"Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature",
671+
));
672+
}
673+
674+
let is_create = self.is_create_table();
652675
let mut seen_domains = HashSet::new();
653676

654677
// Validate domain additions
655678
for dm in &self.domain_metadata_additions {
656679
let domain = dm.domain();
657-
if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
680+
let is_system_domain = domain.starts_with(INTERNAL_DOMAIN_PREFIX);
681+
682+
// System domains (delta.*) only allowed in create-table
683+
if is_system_domain && !is_create {
658684
return Err(Error::generic(
659685
"Cannot modify domains that start with 'delta.' as those are system controlled",
660686
));
661687
}
662688

689+
// For create-table, validate system domains against their required features
690+
if is_system_domain && is_create {
691+
self.validate_system_domain_feature(domain)?;
692+
}
693+
694+
// Check for duplicates
663695
if !seen_domains.insert(domain) {
664696
return Err(Error::generic(format!(
665697
"Metadata for domain {} already specified in this transaction",
@@ -668,7 +700,14 @@ impl Transaction {
668700
}
669701
}
670702

671-
// Validate domain removals
703+
// No removals allowed for create-table
704+
if is_create && !self.domain_removals.is_empty() {
705+
return Err(Error::unsupported(
706+
"Domain metadata removals are not supported in create-table transactions",
707+
));
708+
}
709+
710+
// Validate domain removals (for non-create-table)
672711
for domain in &self.domain_removals {
673712
if domain.starts_with(INTERNAL_DOMAIN_PREFIX) {
674713
return Err(Error::generic(
@@ -687,6 +726,39 @@ impl Transaction {
687726
Ok(())
688727
}
689728

729+
/// Validate that a system domain corresponds to a known feature and that the feature is supported.
730+
///
731+
/// This prevents arbitrary `delta.*` domains from being added during table creation.
732+
/// Each known system domain must have its corresponding feature enabled in the protocol.
733+
fn validate_system_domain_feature(&self, domain: &str) -> DeltaResult<()> {
734+
let table_config = self.read_snapshot.table_configuration();
735+
736+
// Map domain to its required feature
737+
let required_feature = match domain {
738+
ROW_TRACKING_DOMAIN_NAME => Some(TableFeature::RowTracking),
739+
// Will be changed to a constant in a follow up clustering create table feature PR
740+
"delta.clustering" => Some(TableFeature::ClusteredTable),
741+
_ => {
742+
return Err(Error::generic(format!(
743+
"Unknown system domain '{}'. Only known system domains are allowed.",
744+
domain
745+
)));
746+
}
747+
};
748+
749+
// If the domain requires a feature, validate it's supported
750+
if let Some(feature) = required_feature {
751+
if !table_config.is_feature_supported(&feature) {
752+
return Err(Error::generic(format!(
753+
"System domain '{}' requires the '{}' feature to be enabled",
754+
domain, feature
755+
)));
756+
}
757+
}
758+
759+
Ok(())
760+
}
761+
690762
/// Helper function to convert scan metadata iterator to filtered engine data iterator.
691763
///
692764
/// This adapter extracts the `scan_files` field from each [`crate::scan::ScanMetadata`] item,
@@ -825,6 +897,36 @@ impl Transaction {
825897
Ok(())
826898
}
827899

900+
/// Generate removal actions for user domain metadata by scanning the log.
901+
///
902+
/// This performs an expensive log replay operation to fetch the previous configuration
903+
/// value for each domain being removed, as required by the Delta spec for tombstones.
904+
/// Returns an empty vector if there are no domain removals.
905+
fn generate_user_domain_removal_actions(
906+
&self,
907+
engine: &dyn Engine,
908+
) -> DeltaResult<Vec<DomainMetadata>> {
909+
if self.domain_removals.is_empty() {
910+
return Ok(vec![]);
911+
}
912+
913+
// Scan log to fetch existing configurations for tombstones
914+
let existing_domains =
915+
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?;
916+
917+
// Create removal tombstones with pre-image configurations
918+
Ok(self
919+
.domain_removals
920+
.iter()
921+
.filter_map(|domain| {
922+
// If domain doesn't exist in the log, this is a no-op (filter it out)
923+
existing_domains.get(domain).map(|existing| {
924+
DomainMetadata::remove(domain.clone(), existing.configuration().to_owned())
925+
})
926+
})
927+
.collect())
928+
}
929+
828930
/// Generate domain metadata actions with validation. Handle both user and system domains.
829931
///
830932
/// This function may perform an expensive log replay operation if there are any domain removals.
@@ -835,55 +937,28 @@ impl Transaction {
835937
engine: &'a dyn Engine,
836938
row_tracking_high_watermark: Option<RowTrackingDomainMetadata>,
837939
) -> DeltaResult<EngineDataResultIterator<'a>> {
838-
// For create-table transactions, domain metadata will be added in
839-
// a subsequent code commit
840-
if self.is_create_table() {
841-
if !self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty() {
842-
return Err(Error::unsupported(
843-
"Domain metadata operations are not supported in create-table transactions",
940+
let is_create = self.is_create_table();
941+
942+
// Validate domain operations (includes feature validation)
943+
self.validate_domain_metadata_operations()?;
944+
945+
// TODO(sanuj) Create-table must not have row tracking or removals
946+
// Defensive. Needs to be updated when row tracking support is added.
947+
if is_create {
948+
if row_tracking_high_watermark.is_some() {
949+
return Err(Error::internal_error(
950+
"CREATE TABLE cannot have row tracking domain metadata",
844951
));
845952
}
846-
return Ok(Box::new(iter::empty()));
953+
// domain_removals already validated above, but be explicit
954+
debug_assert!(self.domain_removals.is_empty());
847955
}
848956

849-
// Validate feature support for user domain operations
850-
if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty())
851-
&& !self
852-
.read_snapshot
853-
.table_configuration()
854-
.is_feature_supported(&TableFeature::DomainMetadata)
855-
{
856-
return Err(Error::unsupported("Domain metadata operations require writer version 7 and the 'domainMetadata' writer feature"));
857-
}
858-
859-
// Validate user domain operations
860-
self.validate_user_domain_operations()?;
861-
862-
// Generate user domain removals via log replay (expensive if non-empty)
863-
let removal_actions = if !self.domain_removals.is_empty() {
864-
// Scan log to fetch existing configurations for tombstones
865-
let existing_domains =
866-
scan_domain_metadatas(self.read_snapshot.log_segment(), None, engine)?;
867-
868-
// Create removal tombstones with pre-image configurations
869-
let removals: Vec<_> = self
870-
.domain_removals
871-
.iter()
872-
.filter_map(|domain| {
873-
// If domain doesn't exist in the log, this is a no-op (filter it out)
874-
existing_domains.get(domain).map(|existing| {
875-
DomainMetadata::remove(domain.clone(), existing.configuration().to_owned())
876-
})
877-
})
878-
.collect();
879-
880-
removals
881-
} else {
882-
vec![]
883-
};
957+
// Generate removal actions (empty for create-table due to validation above)
958+
let removal_actions = self.generate_user_domain_removal_actions(engine)?;
884959

885-
// Generate system domain actions (row tracking)
886-
let system_domain_actions = row_tracking_high_watermark
960+
// Generate row tracking domain action (None for create-table)
961+
let row_tracking_domain_action = row_tracking_high_watermark
887962
.map(DomainMetadata::try_from)
888963
.transpose()?
889964
.into_iter();
@@ -894,7 +969,7 @@ impl Transaction {
894969
.clone()
895970
.into_iter()
896971
.chain(removal_actions)
897-
.chain(system_domain_actions)
972+
.chain(row_tracking_domain_action)
898973
.map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)),
899974
))
900975
}

0 commit comments

Comments
 (0)