diff --git a/.gitignore b/.gitignore
index fcc5024b5f..d62e922012 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,8 @@
 .vscode/
 .vim
 .zed
+.cache/
+.clangd
 
 # Rust
 target/
diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs
index f083611c03..567b547a40 100644
--- a/kernel/examples/write-table/src/main.rs
+++ b/kernel/examples/write-table/src/main.rs
@@ -1,5 +1,5 @@
 use std::collections::HashMap;
-use std::fs::{create_dir_all, write};
+use std::fs::create_dir_all;
 use std::path::Path;
 use std::process::ExitCode;
 use std::sync::Arc;
@@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
 use clap::Parser;
 use common::{LocationArgs, ParseWithExamples};
 use itertools::Itertools;
-use serde_json::{json, to_vec};
 use url::Url;
-use uuid::Uuid;
 
 use delta_kernel::arrow::array::TimestampMicrosecondArray;
 use delta_kernel::committer::FileSystemCommitter;
@@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt};
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
 use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
+use delta_kernel::transaction::create_table::create_table as create_delta_table;
 use delta_kernel::transaction::{CommitResult, RetryableTransaction};
 use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
@@ -152,7 +151,7 @@ async fn create_or_get_base_snapshot(
             // Create new table
             println!("Creating new Delta table...");
             let schema = parse_schema(schema_str)?;
-            create_table(url, &schema).await?;
+            create_table(url, &schema, engine).await?;
             Snapshot::builder_for(url.clone()).build(engine)
         }
     }
@@ -192,66 +191,13 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
     Ok(Arc::new(StructType::try_new(fields)?))
 }
 
-/// Create a new Delta table with the given schema.
-///
-/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
-/// initial transaction log.
-async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
-    let table_id = Uuid::new_v4().to_string();
-    let schema_str = serde_json::to_string(&schema)?;
-
-    let (reader_features, writer_features) = {
-        let reader_features: Vec<&'static str> = vec![];
-        let writer_features: Vec<&'static str> = vec![];
-
-        // TODO: Support adding specific table features
-        (reader_features, writer_features)
-    };
-
-    let protocol = json!({
-        "protocol": {
-            "minReaderVersion": 3,
-            "minWriterVersion": 7,
-            "readerFeatures": reader_features,
-            "writerFeatures": writer_features,
-        }
-    });
-    let partition_columns: Vec<String> = vec![];
-    let metadata = json!({
-        "metaData": {
-            "id": table_id,
-            "format": {
-                "provider": "parquet",
-                "options": {}
-            },
-            "schemaString": schema_str,
-            "partitionColumns": partition_columns,
-            "configuration": {},
-            "createdTime": 1677811175819u64
-        }
-    });
-
-    let data = [
-        to_vec(&protocol).unwrap(),
-        b"\n".to_vec(),
-        to_vec(&metadata).unwrap(),
-    ]
-    .concat();
-
-    // Write the initial transaction with protocol and metadata to 0.json
-    let delta_log_path = table_url
-        .join("_delta_log/")?
-        .to_file_path()
-        .map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
-    let file_path = delta_log_path.join("00000000000000000000.json");
-
-    // Create the _delta_log directory if it doesn't exist
-    create_dir_all(&delta_log_path)
-        .map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;
-
-    // Write the file using standard filesystem operations
-    write(&file_path, data)
-        .map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
+/// Create a new Delta table with the given schema using the official CreateTable API.
+async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
+    // Use the create_table API to create the table
+    let table_path = table_url.as_str();
+    let _result = create_delta_table(table_path, schema.clone(), "write-table-example/1.0")
+        .build(engine, Box::new(FileSystemCommitter::new()))?
+        .commit(engine)?;
 
     println!("✓ Created Delta table with schema: {schema:#?}");
     Ok(())
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index d27a1c915a..86ce5e9ce5 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -175,6 +175,11 @@ pub mod engine;
 
 /// Delta table version is 8 byte unsigned int
 pub type Version = u64;
+
+/// Sentinel version indicating a pre-commit state (table does not exist yet).
+/// Used for create-table transactions before the first commit.
+pub const PRE_COMMIT_VERSION: Version = u64::MAX;
+
 pub type FileSize = u64;
 pub type FileIndex = u64;
diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs
index 0a4aca7a34..e1394b43cf 100644
--- a/kernel/src/log_segment.rs
+++ b/kernel/src/log_segment.rs
@@ -19,7 +19,7 @@ use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _}
 use crate::utils::require;
 use crate::{
     DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
-    StorageHandler, Version,
+    StorageHandler, Version, PRE_COMMIT_VERSION,
 };
 use delta_kernel_derive::internal_api;
 
@@ -77,6 +77,27 @@ pub(crate) struct LogSegment {
 }
 
 impl LogSegment {
+    /// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table).
+    /// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk.
+    /// This is used to construct a pre-commit snapshot that provides table configuration
+    /// (protocol, metadata, schema) for operations like CTAS.
+    #[allow(dead_code)] // Used by create_table module
+    pub(crate) fn for_pre_commit(log_root: Url) -> Self {
+        Self {
+            end_version: PRE_COMMIT_VERSION,
+            checkpoint_version: None,
+            log_root,
+            ascending_commit_files: vec![],
+            ascending_compaction_files: vec![],
+            checkpoint_parts: vec![],
+            latest_crc_file: None,
+            latest_commit_file: None,
+            checkpoint_schema: None,
+            max_published_version: None,
+        }
+    }
+
     #[internal_api]
     pub(crate) fn try_new(
         listed_files: ListedLogFiles,
@@ -734,8 +755,12 @@ impl LogSegment {
         self.read_actions(engine, schema, META_PREDICATE.clone())
     }
 
-    /// How many commits since a checkpoint, according to this log segment
+    /// How many commits since a checkpoint, according to this log segment.
+    /// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
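+    /// For example, a segment with `checkpoint_version == Some(10)` and `end_version == 12`
+    /// reports 2 commits since the last checkpoint.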
     pub(crate) fn commits_since_checkpoint(&self) -> u64 {
+        if self.end_version == PRE_COMMIT_VERSION {
+            return 0;
+        }
         // we can use 0 as the checkpoint version if there is no checkpoint since `end_version - 0`
         // is the correct number of commits since a checkpoint if there are no checkpoints
         let checkpoint_version = self.checkpoint_version.unwrap_or(0);
@@ -743,8 +768,12 @@
         self.end_version - checkpoint_version
     }
 
-    /// How many commits since a log-compaction or checkpoint, according to this log segment
+    /// How many commits since a log-compaction or checkpoint, according to this log segment.
+    /// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
     pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
+        if self.end_version == PRE_COMMIT_VERSION {
+            return 0;
+        }
         // Annoyingly we have to search all the compaction files to determine this, because we only
         // sort by start version, so technically the max end version could be anywhere in the vec.
         // We can return 0 in the case there is no compaction since end_version - 0 is the correct
diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs
index 195b64f31d..c48dc619d7 100644
--- a/kernel/src/snapshot.rs
+++ b/kernel/src/snapshot.rs
@@ -510,7 +510,7 @@ impl Snapshot {
 
     /// Create a [`Transaction`] for this `SnapshotRef`. With the specified [`Committer`].
     pub fn transaction(self: Arc<Self>, committer: Box<dyn Committer>) -> DeltaResult<Transaction> {
-        Transaction::try_new(self, committer)
+        Transaction::try_new_existing_table(self, committer)
     }
 
     /// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated
diff --git a/kernel/src/table_features/mod.rs b/kernel/src/table_features/mod.rs
index 4bb563488e..0c947325d4 100644
--- a/kernel/src/table_features/mod.rs
+++ b/kernel/src/table_features/mod.rs
@@ -16,6 +16,14 @@ pub(crate) use timestamp_ntz::validate_timestamp_ntz_feature_support;
 mod column_mapping;
 mod timestamp_ntz;
 
+/// Minimum reader version for tables that use table features.
+/// When set to 3, the protocol requires an explicit `readerFeatures` array.
+pub const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;
+
+/// Minimum writer version for tables that use table features.
+/// When set to 7, the protocol requires an explicit `writerFeatures` array.
+pub const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;
+
 /// Table features represent protocol capabilities required to correctly read or write a given table.
 /// - Readers must implement all features required for correct table reads.
 /// - Writers must implement all features required for correct table writes.
diff --git a/kernel/src/transaction/create_table.rs b/kernel/src/transaction/create_table.rs
new file mode 100644
index 0000000000..617cd2c2ab
--- /dev/null
+++ b/kernel/src/transaction/create_table.rs
@@ -0,0 +1,266 @@
+//! Create table transaction implementation (internal API).
+//!
+//! This module provides a type-safe API for creating Delta tables.
+//! Use the [`create_table`] function to get a [`CreateTableTransactionBuilder`] that can be
+//! configured before building the [`Transaction`].
+
+// Allow `pub` items in this module even though the module itself may be `pub(crate)`.
+// The module visibility controls external access; items are `pub` for use within the crate
+// and for tests. Also allow dead_code since these are used by integration tests.
+#![allow(unreachable_pub, dead_code)]
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use url::Url;
+
+use crate::actions::{Metadata, Protocol};
+use crate::committer::Committer;
+use crate::log_segment::LogSegment;
+use crate::schema::SchemaRef;
+use crate::snapshot::Snapshot;
+use crate::table_configuration::TableConfiguration;
+use crate::table_features::{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION};
+use crate::transaction::Transaction;
+use crate::utils::{current_time_ms, try_parse_uri};
+use crate::{DeltaResult, Engine, Error, StorageHandler, PRE_COMMIT_VERSION};
+
+/// Ensures that no Delta table exists at the given path.
+///
+/// This function checks the `_delta_log` directory to determine if a table already exists.
+/// It handles various storage backend behaviors gracefully:
+/// - If the directory doesn't exist (FileNotFound), returns Ok (new table can be created)
+/// - If the directory exists but is empty, returns Ok (new table can be created)
+/// - If the directory contains files, returns an error (table already exists)
+/// - For other errors (permissions, network), propagates the error
+///
+/// # Arguments
+/// * `storage` - The storage handler to use for listing
+/// * `delta_log_url` - URL to the `_delta_log` directory
+/// * `table_path` - Original table path (for error messages)
+fn ensure_table_does_not_exist(
+    storage: &dyn StorageHandler,
+    delta_log_url: &Url,
+    table_path: &str,
+) -> DeltaResult<()> {
+    match storage.list_from(delta_log_url) {
+        Ok(mut files) => {
+            // files.next() returns Option<DeltaResult<FileMeta>>
+            // - Some(Ok(_)) means a file exists -> table exists
+            // - Some(Err(FileNotFound)) means path doesn't exist -> OK for new table
+            // - Some(Err(other)) means real error -> propagate
+            // - None means empty iterator -> OK for new table
+            match files.next() {
+                Some(Ok(_)) => Err(Error::generic(format!(
+                    "Table already exists at path: {table_path}"
+                ))),
+                Some(Err(Error::FileNotFound(_))) | None => {
+                    // Path doesn't exist or empty - OK for new table
+                    Ok(())
+                }
+                Some(Err(e)) => {
+                    // Real error (permissions, network, etc.) - propagate
+                    Err(e)
+                }
+            }
+        }
+        Err(Error::FileNotFound(_)) => {
+            // Directory doesn't exist - this is expected for a new table.
+            // The storage layer will create the full path (including _delta_log/)
+            // when the commit writes the first log file via write_json_file().
+            Ok(())
+        }
+        Err(e) => {
+            // Real error - propagate
+            Err(e)
+        }
+    }
+}
+
+/// Creates a builder for creating a new Delta table.
+///
+/// This function returns a [`CreateTableTransactionBuilder`] that can be configured with table
+/// properties and other options before building the transaction.
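+///
+/// Validation is deferred: an invalid path, an existing table, or an empty schema is reported by
+/// [`CreateTableTransactionBuilder::build`], not by this function.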
+///
+/// # Arguments
+///
+/// * `path` - The file system path where the Delta table will be created
+/// * `schema` - The schema for the new table
+/// * `engine_info` - Information about the engine creating the table (e.g., "MyApp/1.0")
+///
+/// # Example
+///
+/// ```no_run
+/// use std::sync::Arc;
+/// use delta_kernel::transaction::create_table::create_table;
+/// use delta_kernel::schema::{DataType, StructField, StructType};
+/// use delta_kernel::committer::FileSystemCommitter;
+/// use delta_kernel::engine::default::DefaultEngineBuilder;
+/// use delta_kernel::engine::default::storage::store_from_url;
+///
+/// # fn main() -> delta_kernel::DeltaResult<()> {
+/// let schema = Arc::new(StructType::new_unchecked(vec![
+///     StructField::new("id", DataType::INTEGER, false),
+///     StructField::new("name", DataType::STRING, true),
+/// ]));
+///
+/// let url = url::Url::parse("file:///tmp/my_table")?;
+/// let engine = DefaultEngineBuilder::new(store_from_url(&url)?).build();
+///
+/// let transaction = create_table("/tmp/my_table", schema, "MyApp/1.0")
+///     .build(&engine, Box::new(FileSystemCommitter::new()))?;
+///
+/// // Commit the transaction to create the table
+/// transaction.commit(&engine)?;
+/// # Ok(())
+/// # }
+/// ```
+pub fn create_table(
+    path: impl AsRef<str>,
+    schema: SchemaRef,
+    engine_info: impl Into<String>,
+) -> CreateTableTransactionBuilder {
+    CreateTableTransactionBuilder::new(path, schema, engine_info)
+}
+
+/// Builder for configuring a new Delta table.
+///
+/// Use this to configure table properties before building a [`Transaction`].
+/// If the build fails, no transaction is created.
+///
+/// Created via [`create_table`].
+pub struct CreateTableTransactionBuilder {
+    path: String,
+    schema: SchemaRef,
+    engine_info: String,
+    table_properties: HashMap<String, String>,
+}
+
+impl CreateTableTransactionBuilder {
+    /// Creates a new CreateTableTransactionBuilder.
+    ///
+    /// This is typically called via [`create_table`] rather than directly.
+    pub fn new(path: impl AsRef<str>, schema: SchemaRef, engine_info: impl Into<String>) -> Self {
+        Self {
+            path: path.as_ref().to_string(),
+            schema,
+            engine_info: engine_info.into(),
+            table_properties: HashMap::new(),
+        }
+    }
+
+    /// Builds a [`Transaction`] that can be committed to create the table.
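+    ///
+    /// The returned transaction wraps a pre-commit snapshot at `PRE_COMMIT_VERSION`; nothing is
+    /// written to storage until [`Transaction::commit`] is called.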
+    ///
+    /// This method performs validation:
+    /// - Checks that the table path is valid
+    /// - Verifies the table doesn't already exist
+    /// - Validates the schema is non-empty
+    ///
+    /// # Arguments
+    ///
+    /// * `engine` - The engine instance to use for validation
+    /// * `committer` - The committer to use for the transaction
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The table path is invalid
+    /// - A table already exists at the given path
+    /// - The schema is empty
+    pub fn build(
+        self,
+        engine: &dyn Engine,
+        committer: Box<dyn Committer>,
+    ) -> DeltaResult<Transaction> {
+        // Validate path
+        let table_url = try_parse_uri(&self.path)?;
+
+        // Validate schema is non-empty
+        if self.schema.fields().len() == 0 {
+            return Err(Error::generic("Schema cannot be empty"));
+        }
+
+        // Check if table already exists by looking for _delta_log directory
+        let delta_log_url = table_url.join("_delta_log/")?;
+        let storage = engine.storage_handler();
+        ensure_table_does_not_exist(storage.as_ref(), &delta_log_url, &self.path)?;
+
+        // Create Protocol action with table features support
+        let protocol = Protocol::try_new(
+            TABLE_FEATURES_MIN_READER_VERSION,
+            TABLE_FEATURES_MIN_WRITER_VERSION,
+            Some(Vec::<String>::new()), // readerFeatures (empty for now)
+            Some(Vec::<String>::new()), // writerFeatures (empty for now)
+        )?;
+
+        // Create Metadata action
+        let metadata = Metadata::try_new(
+            None, // name
+            None, // description
+            (*self.schema).clone(),
+            Vec::new(), // partition_columns - added with data layout support
+            current_time_ms()?,
+            self.table_properties,
+        )?;
+
+        // Create the pre-commit snapshot from protocol/metadata, reusing the _delta_log URL
+        // computed above as the log root.
+        let log_segment = LogSegment::for_pre_commit(delta_log_url);
+
+        // We validate that the table properties are empty. Supported
+        // table properties for create table will be allowlisted in the future.
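+        // The builder exposes no setter for table properties yet, so a non-empty configuration
+        // here is an internal invariant violation rather than a user error; hence the assert!
+        // below instead of a returned Error.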
+        assert!(
+            metadata.configuration().is_empty(),
+            "Base create table API does not support table properties"
+        );
+        let table_configuration =
+            TableConfiguration::try_new(metadata, protocol, table_url, PRE_COMMIT_VERSION)?;
+
+        // Create Transaction with pre-commit snapshot
+        Transaction::try_new_create_table(
+            Arc::new(Snapshot::new(log_segment, table_configuration)),
+            self.engine_info,
+            committer,
+            vec![], // system_domain_metadata - not supported in base API
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::schema::{DataType, StructField, StructType};
+    use std::sync::Arc;
+
+    fn test_schema() -> SchemaRef {
+        Arc::new(StructType::new_unchecked(vec![StructField::new(
+            "id",
+            DataType::INTEGER,
+            false,
+        )]))
+    }
+
+    #[test]
+    fn test_basic_builder_creation() {
+        let schema = test_schema();
+        let builder =
+            CreateTableTransactionBuilder::new("/path/to/table", schema.clone(), "TestApp/1.0");
+
+        assert_eq!(builder.path, "/path/to/table");
+        assert_eq!(builder.engine_info, "TestApp/1.0");
+        assert!(builder.table_properties.is_empty());
+    }
+
+    #[test]
+    fn test_nested_path_builder_creation() {
+        let schema = test_schema();
+        let builder = CreateTableTransactionBuilder::new(
+            "/path/to/table/nested",
+            schema.clone(),
+            "TestApp/1.0",
+        );
+
+        assert_eq!(builder.path, "/path/to/table/nested");
+    }
+}
diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs
index a28775909e..687ea429b5 100644
--- a/kernel/src/transaction/mod.rs
+++ b/kernel/src/transaction/mod.rs
@@ -8,9 +8,10 @@ use url::Url;
 use crate::actions::deletion_vector::DeletionVectorDescriptor;
 use crate::actions::deletion_vector::DeletionVectorPath;
 use crate::actions::{
-    as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_add_schema,
-    get_log_commit_info_schema, get_log_domain_metadata_schema, get_log_remove_schema,
-    get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction, INTERNAL_DOMAIN_PREFIX,
+    as_log_add_schema, domain_metadata::scan_domain_metadatas, get_commit_schema,
+    get_log_add_schema, get_log_commit_info_schema, get_log_domain_metadata_schema,
+    get_log_remove_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
+    INTERNAL_DOMAIN_PREFIX, METADATA_NAME, PROTOCOL_NAME,
 };
 use crate::committer::{CommitMetadata, CommitResponse, Committer};
 use crate::engine_data::FilteredEngineData;
@@ -35,10 +36,15 @@ use crate::utils::{current_time_ms, require};
 use crate::FileMeta;
 use crate::{
     DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData,
-    RowVisitor, SchemaTransform, Version,
+    RowVisitor, SchemaTransform, Version, PRE_COMMIT_VERSION,
 };
 use delta_kernel_derive::internal_api;
 
+#[cfg(feature = "internal-api")]
+pub mod create_table;
+#[cfg(not(feature = "internal-api"))]
+pub(crate) mod create_table;
+
 /// Type alias for an iterator of [`EngineData`] results.
 pub(crate) type EngineDataResultIterator<'a> =
     Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;
 
@@ -238,6 +244,8 @@ fn new_dv_column_schema() -> &'static SchemaRef {
 ///   txn.commit(&engine)?;
 /// ```
 pub struct Transaction {
+    // The snapshot this transaction is based on. For create-table transactions,
+    // this is a pre-commit snapshot with PRE_COMMIT_VERSION.
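+    // For such transactions, `read_snapshot.version()` returns the PRE_COMMIT_VERSION sentinel
+    // (u64::MAX) rather than a version that exists on disk.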
     read_snapshot: SnapshotRef,
     committer: Box<dyn Committer>,
     operation: Option<String>,
@@ -267,22 +275,27 @@ pub struct Transaction {
 
 impl std::fmt::Debug for Transaction {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let version_info = if self.is_create_table() {
+            "create_table".to_string()
+        } else {
+            self.read_snapshot.version().to_string()
+        };
         f.write_str(&format!(
             "Transaction {{ read_snapshot version: {}, engine_info: {} }}",
-            self.read_snapshot.version(),
+            version_info,
             self.engine_info.is_some()
         ))
     }
 }
 
 impl Transaction {
-    /// Create a new transaction from a snapshot. The snapshot will be used to read the current
-    /// state of the table (e.g. to read the current version).
+    /// Create a new transaction from a snapshot for an existing table. The snapshot will be used
+    /// to read the current state of the table (e.g. to read the current version).
     ///
     /// Instead of using this API, the more typical (user-facing) API is
     /// [Snapshot::transaction](crate::snapshot::Snapshot::transaction) to create a transaction from
     /// a snapshot.
-    pub(crate) fn try_new(
+    pub(crate) fn try_new_existing_table(
         snapshot: impl Into<SnapshotRef>,
         committer: Box<dyn Committer>,
     ) -> DeltaResult<Self> {
@@ -296,7 +309,7 @@ impl Transaction {
         let commit_timestamp = current_time_ms()?;
 
         Ok(Transaction {
-            read_snapshot: read_snapshot.clone(),
+            read_snapshot,
             committer,
             operation: None,
             engine_info: None,
@@ -311,6 +324,41 @@ impl Transaction {
         })
     }
 
+    /// Create a new transaction for creating a new table. This is used when the table doesn't
+    /// exist yet and we need to create it with Protocol and Metadata actions.
+    ///
+    /// The `pre_commit_snapshot` is a synthetic snapshot created from the protocol and metadata
+    /// that will be committed. It uses `PRE_COMMIT_VERSION` as a sentinel to indicate no
+    /// version exists yet on disk.
+    ///
+    /// This is typically called via `CreateTableTransactionBuilder::build()` rather than directly.
+    #[allow(dead_code)] // Used by create_table module
+    pub(crate) fn try_new_create_table(
+        pre_commit_snapshot: SnapshotRef,
+        engine_info: String,
+        committer: Box<dyn Committer>,
+        system_domain_metadata: Vec<DomainMetadata>,
+    ) -> DeltaResult<Self> {
+        // TODO(sanuj) Today transactions expect a read snapshot to be passed in and we pass
+        // in the pre_commit_snapshot for CREATE. To support other operations such as ALTERs
+        // there might be cleaner alternatives which can clearly disambiguate between a snapshot
+        // that was read vs the effective snapshot we will use for the commit.
+        Ok(Transaction {
+            read_snapshot: pre_commit_snapshot,
+            committer,
+            operation: Some("CREATE TABLE".to_string()),
+            engine_info: Some(engine_info),
+            add_files_metadata: vec![],
+            remove_files_metadata: vec![],
+            set_transactions: vec![],
+            commit_timestamp: current_time_ms()?,
+            domain_metadata_additions: system_domain_metadata,
+            domain_removals: vec![],
+            data_change: true,
+            dv_matched_files: vec![],
+        })
+    }
+
     /// Set the committer that will be used to commit this transaction. If not set, the default
     /// filesystem-based committer will be used. Note that the default committer is only allowed
     /// for non-catalog-managed tables. That is, you _must_ provide a committer via this API in
@@ -348,11 +396,13 @@ impl Transaction {
             )));
         }
 
+        // CDF check only applies to existing tables (not create table)
         // If there are add and remove files with data change in the same transaction, we block it.
         // This is because kernel does not yet have a way to discern DML operations. For DML
        // operations that perform updates on rows, ChangeDataFeed requires that a `cdc` file be
        // written to the delta log.
-        if !self.add_files_metadata.is_empty()
+        if !self.is_create_table()
+            && !self.add_files_metadata.is_empty()
            && !self.remove_files_metadata.is_empty()
            && self.data_change
        {
@@ -380,41 +430,54 @@ impl Transaction {
             .map(|txn| txn.into_engine_data(get_log_txn_schema().clone(), engine));
 
         // Step 2: Construct commit info with ICT if enabled
-        let in_commit_timestamp =
-            self.read_snapshot
-                .get_in_commit_timestamp(engine)?
-                .map(|prev_ict| {
-                    // The Delta protocol requires the timestamp to be "the larger of two values":
-                    // - The time at which the writer attempted the commit (current_time)
-                    // - One millisecond later than the previous commit's inCommitTimestamp (last_commit_timestamp + 1)
-                    self.commit_timestamp.max(prev_ict + 1)
-                });
         let commit_info = CommitInfo::new(
             self.commit_timestamp,
-            in_commit_timestamp,
+            self.get_in_commit_timestamp(engine)?,
             self.operation.clone(),
             self.engine_info.clone(),
         );
         let commit_info_action =
             commit_info.into_engine_data(get_log_commit_info_schema().clone(), engine);
 
-        // Step 3: Generate add actions and get data for domain metadata actions (e.g. row tracking high watermark)
-        let commit_version = self.read_snapshot.version() + 1;
+        // Step 3: Generate Protocol and Metadata actions for create-table
+        let (protocol_action, metadata_action) = if self.is_create_table() {
+            let table_config = self.read_snapshot.table_configuration();
+            let protocol = table_config.protocol().clone();
+            let metadata = table_config.metadata().clone();
+
+            let protocol_schema = get_commit_schema().project(&[PROTOCOL_NAME])?;
+            let metadata_schema = get_commit_schema().project(&[METADATA_NAME])?;
+
+            let protocol_data = protocol.into_engine_data(protocol_schema, engine)?;
+            let metadata_data = metadata.into_engine_data(metadata_schema, engine)?;
+
+            (Some(protocol_data), Some(metadata_data))
+        } else {
+            (None, None)
+        };
+
+        // Step 4: Generate add actions and get data for domain metadata actions (e.g. row tracking high watermark)
+        let commit_version = self.get_commit_version();
         let (add_actions, row_tracking_domain_metadata) =
             self.generate_adds(engine, commit_version)?;
 
-        // Step 3b: Generate DV update actions (remove/add pairs) if any DV updates are present
-        let dv_update_actions = self.generate_dv_update_actions(engine)?;
-
-        // Step 4: Generate all domain metadata actions (user and system domains)
+        // Step 4b: Generate all domain metadata actions (user and system domains)
         let domain_metadata_actions =
             self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?;
 
-        // Step 5: Generate remove actions (collect to avoid borrowing self)
+        // Step 5: Generate DV update actions (remove/add pairs) if any DV updates are present
+        let dv_update_actions = self.generate_dv_update_actions(engine)?;
+
+        // Step 6: Generate remove actions (collect to avoid borrowing self)
         let remove_actions =
             self.generate_remove_actions(engine, self.remove_files_metadata.iter(), &[])?;
 
+        // Build the action chain
+        // For create-table: CommitInfo -> Protocol -> Metadata -> adds -> txns -> domain_metadata -> removes
+        // For existing table: CommitInfo -> adds -> txns -> domain_metadata -> removes
         let actions = iter::once(commit_info_action)
+            .chain(protocol_action.map(Ok))
+            .chain(metadata_action.map(Ok))
             .chain(add_actions)
             .chain(set_transaction_actions)
             .chain(domain_metadata_actions);
@@ -424,7 +487,8 @@ impl Transaction {
             .chain(remove_actions)
             .chain(dv_update_actions);
 
-        // Step 6: Commit via the committer
+        // Step 7: Commit via the committer
+        // Block FileSystemCommitter for catalog-managed tables (including create-table with catalog features)
         #[cfg(feature = "catalog-managed")]
         if !self.committer.is_catalog_committer()
             && self
@@ -536,6 +600,53 @@ impl Transaction {
         self
     }
 
+    /// Returns true if this is a create-table transaction.
+    /// A create-table transaction has operation "CREATE TABLE" and a pre-commit snapshot
+    /// with PRE_COMMIT_VERSION.
+    fn is_create_table(&self) -> bool {
+        let is_create = self.operation.as_deref() == Some("CREATE TABLE");
+        debug_assert!(
+            !is_create || self.read_snapshot.version() == PRE_COMMIT_VERSION,
+            "CREATE TABLE transaction must have PRE_COMMIT_VERSION snapshot"
+        );
+        is_create
+    }
+
+    /// Computes the in-commit timestamp for this transaction if ICT is enabled.
+    /// Returns `None` if ICT is not enabled on the table.
+    fn get_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult<Option<i64>> {
+        let has_ict = self
+            .read_snapshot
+            .table_configuration()
+            .is_feature_supported(&TableFeature::InCommitTimestamp);
+
+        if has_ict && !self.is_create_table() {
+            Ok(self
+                .read_snapshot
+                .get_in_commit_timestamp(engine)?
+                .map(|prev_ict| {
+                    // The Delta protocol requires the timestamp to be "the larger of two values":
+                    // - The time at which the writer attempted the commit (current_time)
+                    // - One millisecond later than the previous commit's inCommitTimestamp (last_commit_timestamp + 1)
+                    self.commit_timestamp.max(prev_ict + 1)
+                }))
+        } else if has_ict && self.is_create_table() {
+            // ICT is enabled but this is a create-table transaction - not yet supported
+            Err(Error::unsupported(
+                "InCommitTimestamp is not yet supported for create table",
+            ))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Returns the commit version for this transaction.
+    /// For existing-table transactions, this is snapshot.version() + 1.
+    /// For create-table transactions (PRE_COMMIT_VERSION + 1 wraps to 0), this is 0.
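+    /// Illustrative arithmetic: `u64::MAX.wrapping_add(1) == 0`, so the first commit of a
+    /// brand-new table lands at version 0.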
+    fn get_commit_version(&self) -> Version {
+        // PRE_COMMIT_VERSION (u64::MAX) + 1 wraps to 0, which is the correct first version
+        self.read_snapshot.version().wrapping_add(1)
+    }
+
     /// Validate that user domains don't conflict with system domains or each other.
     fn validate_user_domain_operations(&self) -> DeltaResult<()> {
         let mut seen_domains = HashSet::new();
@@ -645,6 +756,11 @@ impl Transaction {
         new_dv_descriptors: HashMap,
         existing_data_files: impl Iterator>,
     ) -> DeltaResult<()> {
+        if self.is_create_table() {
+            return Err(Error::generic(
+                "Deletion vector operations require an existing table",
+            ));
+        }
         if !self
             .read_snapshot
             .table_configuration()
@@ -718,7 +834,18 @@ impl Transaction {
         &'a self,
         engine: &'a dyn Engine,
         row_tracking_high_watermark: Option<i64>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a> {
+    ) -> DeltaResult<EngineDataResultIterator<'a>> {
+        // For create-table transactions, domain metadata will be added in
+        // a subsequent code commit
+        if self.is_create_table() {
+            if !self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty() {
+                return Err(Error::unsupported(
+                    "Domain metadata operations are not supported in create-table transactions",
+                ));
+            }
+            return Ok(Box::new(iter::empty()));
+        }
+
         // Validate feature support for user domain operations
         if (!self.domain_metadata_additions.is_empty() || !self.domain_removals.is_empty())
             && !self
@@ -762,13 +889,14 @@ impl Transaction {
             .into_iter();
 
         // Chain all domain actions and convert to EngineData
-        Ok(self
-            .domain_metadata_additions
-            .clone()
-            .into_iter()
-            .chain(removal_actions)
-            .chain(system_domain_actions)
-            .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
+        Ok(Box::new(
+            self.domain_metadata_additions
+                .clone()
+                .into_iter()
+                .chain(removal_actions)
+                .chain(system_domain_actions)
+                .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)),
+        ))
     }
 
     /// The schema that the [`Engine`]'s [`ParquetHandler`] is expected to use when reporting information about
@@ -794,28 +922,29 @@ impl Transaction {
     // Generate the logical-to-physical transform expression which must be evaluated on every data
     // chunk before writing. At the moment, this is a transaction-wide expression.
     fn generate_logical_to_physical(&self) -> Expression {
-        let partition_columns = self
+        let partition_cols = self
             .read_snapshot
             .table_configuration()
             .metadata()
-            .partition_columns();
-        let schema = self.read_snapshot.schema();
-
+            .partition_columns()
+            .to_vec();
         // Check if materializePartitionColumns feature is enabled
         let materialize_partition_columns = self
             .read_snapshot
             .table_configuration()
             .is_feature_enabled(&TableFeature::MaterializePartitionColumns);
+        let schema = self.read_snapshot.schema();
 
         // If the materialize partition columns feature is enabled, pass through all columns in the
         // schema. Otherwise, exclude partition columns.
         let fields = schema
            .fields()
-            .filter(|f| materialize_partition_columns || !partition_columns.contains(f.name()))
+            .filter(|f| {
+                materialize_partition_columns || !partition_cols.contains(&f.name().to_string())
+            })
            .map(|f| Expression::column([f.name()]));
         Expression::struct_from(fields)
     }
-
     /// Get the write context for this transaction. At the moment, this is constant for the whole
// Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure @@ -827,14 +956,15 @@ impl Transaction { let logical_to_physical = self.generate_logical_to_physical(); // Compute physical schema: exclude partition columns since they're stored in the path - let partition_columns = self + let partition_columns: Vec = self .read_snapshot .table_configuration() .metadata() - .partition_columns(); + .partition_columns() + .to_vec(); let physical_fields = snapshot_schema .fields() - .filter(|f| !partition_columns.contains(f.name())) + .filter(|f| !partition_columns.contains(&f.name().to_string())) .cloned(); let physical_schema = Arc::new(StructType::new_unchecked(physical_fields)); @@ -912,6 +1042,13 @@ impl Transaction { .table_configuration() .should_write_row_tracking(); + // Row tracking is not yet supported for create-table with data + if needs_row_tracking && self.is_create_table() { + return Err(Error::unsupported( + "Row tracking is not yet supported for create table with data", + )); + } + if needs_row_tracking { // Read the current rowIdHighWaterMark from the snapshot's row tracking domain metadata let row_id_high_water_mark = @@ -1083,6 +1220,14 @@ impl Transaction { remove_files_metadata: impl Iterator + Send + 'a, columns_to_drop: &'a [&str], ) -> DeltaResult> + Send + 'a> { + // Create-table transactions should not have any remove actions. + // Only error if there are actually files queued for removal. + if self.is_create_table() && !self.remove_files_metadata.is_empty() { + return Err(Error::internal_error( + "CREATE TABLE transaction cannot have remove actions", + )); + } + let input_schema = scan_row_schema(); let target_schema = NullableStatsTransform .transform_struct(get_log_remove_schema()) @@ -1156,6 +1301,13 @@ impl Transaction { &'a self, engine: &'a dyn Engine, ) -> DeltaResult> + Send + 'a> { + // Create-table transactions should not have any DV update actions + if self.is_create_table() && !self.dv_matched_files.is_empty() { + return Err(Error::internal_error( + "CREATE TABLE transaction cannot have DV update actions", + )); + } + static COLUMNS_TO_DROP: &[&str] = &[NEW_DELETION_VECTOR_NAME]; let remove_actions = self.generate_remove_actions(engine, self.dv_matched_files.iter(), COLUMNS_TO_DROP)?; @@ -1420,13 +1572,11 @@ impl CommitResult { } /// This is the result of a successfully committed [Transaction]. One can retrieve the -/// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot -/// can be obtained as well. +/// [PostCommitStats] and [commit version] from this struct. /// /// [commit version]: Self::commit_version #[derive(Debug)] pub struct CommittedTransaction { - // TODO: remove after post-commit snapshot #[allow(dead_code)] transaction: Transaction, /// the version of the table that was just committed @@ -1445,8 +1595,6 @@ impl CommittedTransaction { pub fn post_commit_stats(&self) -> &PostCommitStats { &self.post_commit_stats } - - // TODO(#916): post-commit snapshot } /// This is the result of a conflicted [Transaction]. One can retrieve the [conflict version] from diff --git a/kernel/tests/create_table.rs b/kernel/tests/create_table.rs new file mode 100644 index 0000000000..ffd2d2807c --- /dev/null +++ b/kernel/tests/create_table.rs @@ -0,0 +1,265 @@ +//! 
+
+use std::sync::Arc;
+
+use delta_kernel::committer::FileSystemCommitter;
+use delta_kernel::schema::{DataType, StructField, StructType};
+use delta_kernel::snapshot::Snapshot;
+use delta_kernel::table_features::{
+    TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
+};
+use delta_kernel::transaction::create_table::create_table;
+use delta_kernel::DeltaResult;
+use serde_json::Value;
+use tempfile::tempdir;
+use test_utils::{assert_result_error_with_message, create_default_engine};
+
+#[tokio::test]
+async fn test_create_simple_table() -> DeltaResult<()> {
+    // Setup
+    let temp_dir = tempdir().expect("Failed to create temp dir");
+    let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
+
+    let engine =
+        create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
+
+    // Create schema for an events table
+    let schema = Arc::new(StructType::try_new(vec![
+        StructField::new("event_id", DataType::LONG, false),
+        StructField::new("user_id", DataType::LONG, false),
+        StructField::new("event_type", DataType::STRING, false),
+        StructField::new("timestamp", DataType::TIMESTAMP, false),
+        StructField::new("properties", DataType::STRING, true),
+    ])?);
+
+    // Create table using new API
+    let _result = create_table(&table_path, schema.clone(), "DeltaKernel-RS/0.17.0")
+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
+        .commit(engine.as_ref())?;
+
+    // Verify table was created
+    let table_url = delta_kernel::try_parse_uri(&table_path)?;
+    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
+
+    assert_eq!(snapshot.version(), 0);
+    assert_eq!(snapshot.schema().fields().len(), 5);
+
+    // Verify protocol versions are (3, 7) by reading the log file
+    let log_file_path = format!("{}/_delta_log/00000000000000000000.json", table_path);
+    let log_contents = std::fs::read_to_string(&log_file_path).expect("Failed to read log file");
+    let actions: Vec<Value> = log_contents
+        .lines()
+        .map(|line| serde_json::from_str(line).expect("Failed to parse JSON"))
+        .collect();
+
+    let protocol_action = actions
+        .iter()
+        .find(|a| a.get("protocol").is_some())
+        .expect("Protocol action not found");
+    let protocol = protocol_action.get("protocol").unwrap();
+    assert_eq!(
+        protocol["minReaderVersion"],
+        TABLE_FEATURES_MIN_READER_VERSION
+    );
+    assert_eq!(
+        protocol["minWriterVersion"],
+        TABLE_FEATURES_MIN_WRITER_VERSION
+    );
+    // Verify no reader/writer features are set (empty arrays for table features mode)
+    assert_eq!(protocol["readerFeatures"], Value::Array(vec![]));
+    assert_eq!(protocol["writerFeatures"], Value::Array(vec![]));
+
+    // Verify no table properties are set via public API
+    use delta_kernel::table_properties::TableProperties;
+    assert_eq!(snapshot.table_properties(), &TableProperties::default());
+
+    // Verify schema field names
+    let field_names: Vec<_> = snapshot
+        .schema()
+        .fields()
+        .map(|f| f.name().to_string())
+        .collect();
+    assert!(field_names.contains(&"event_id".to_string()));
+    assert!(field_names.contains(&"user_id".to_string()));
+    assert!(field_names.contains(&"event_type".to_string()));
+    assert!(field_names.contains(&"timestamp".to_string()));
+    assert!(field_names.contains(&"properties".to_string()));
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_create_table_already_exists() -> DeltaResult<()> {
+    // Setup
+    let temp_dir = tempdir().expect("Failed to create temp dir");
+    let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
path").to_string(); + + let engine = + create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?; + + // Create schema for a user profiles table + let schema = Arc::new(StructType::try_new(vec![ + StructField::new("user_id", DataType::LONG, false), + StructField::new("username", DataType::STRING, false), + StructField::new("email", DataType::STRING, false), + StructField::new("created_at", DataType::TIMESTAMP, false), + StructField::new("is_active", DataType::BOOLEAN, false), + ])?); + + // Create table first time + let _result = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0") + .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))? + .commit(engine.as_ref())?; + + // Try to create again - should fail at build time (table already exists) + let result = create_table(&table_path, schema.clone(), "UserManagementService/1.2.0") + .build(engine.as_ref(), Box::new(FileSystemCommitter::new())); + + assert_result_error_with_message(result, "already exists"); + + Ok(()) +} + +#[tokio::test] +async fn test_create_table_empty_schema_not_supported() -> DeltaResult<()> { + // Setup + let temp_dir = tempdir().expect("Failed to create temp dir"); + let table_path = temp_dir.path().to_str().expect("Invalid path").to_string(); + + let engine = + create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?; + + // Create empty schema + let schema = Arc::new(StructType::try_new(vec![])?); + + // Try to create table with empty schema - should fail at build time + let result = create_table(&table_path, schema, "InvalidApp/0.1.0") + .build(engine.as_ref(), Box::new(FileSystemCommitter::new())); + + assert_result_error_with_message(result, "cannot be empty"); + + Ok(()) +} + +#[tokio::test] +async fn test_create_table_log_actions() -> DeltaResult<()> { + // Setup + let temp_dir = tempdir().expect("Failed to create temp dir"); + let table_path = temp_dir.path().to_str().expect("Invalid path").to_string(); + + let engine = + create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?; + + // Create schema + let schema = Arc::new(StructType::try_new(vec![ + StructField::new("user_id", DataType::LONG, false), + StructField::new("action", DataType::STRING, false), + ])?); + + let engine_info = "AuditService/2.1.0"; + + // Create table + let _ = create_table(&table_path, schema, engine_info) + .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))? 
+        .commit(engine.as_ref())?;
+
+    // Read the actual Delta log file
+    let log_file_path = format!("{}/_delta_log/00000000000000000000.json", table_path);
+    let log_contents = std::fs::read_to_string(&log_file_path).expect("Failed to read log file");
+
+    // Parse each line (each line is a separate JSON action)
+    let actions: Vec<Value> = log_contents
+        .lines()
+        .map(|line| serde_json::from_str(line).expect("Failed to parse JSON"))
+        .collect();
+
+    // Verify we have exactly 3 actions: CommitInfo, Protocol, Metadata
+    // CommitInfo is first to comply with ICT (In-Commit Timestamps) protocol requirements
+    assert_eq!(
+        actions.len(),
+        3,
+        "Expected 3 actions (commitInfo, protocol, metaData), found {}",
+        actions.len()
+    );
+
+    // Verify CommitInfo action (first for ICT compliance)
+    let commit_info_action = &actions[0];
+    assert!(
+        commit_info_action.get("commitInfo").is_some(),
+        "First action should be commitInfo"
+    );
+    let commit_info = commit_info_action.get("commitInfo").unwrap();
+    assert!(
+        commit_info.get("timestamp").is_some(),
+        "CommitInfo should have timestamp"
+    );
+    assert!(
+        commit_info.get("engineInfo").is_some(),
+        "CommitInfo should have engineInfo"
+    );
+    assert!(
+        commit_info.get("operation").is_some(),
+        "CommitInfo should have operation"
+    );
+    assert_eq!(
+        commit_info["operation"], "CREATE TABLE",
+        "Operation should be CREATE TABLE"
+    );
+
+    // Verify Protocol action
+    let protocol_action = &actions[1];
+    assert!(
+        protocol_action.get("protocol").is_some(),
+        "Second action should be protocol"
+    );
+    let protocol = protocol_action.get("protocol").unwrap();
+    assert_eq!(
+        protocol["minReaderVersion"],
+        TABLE_FEATURES_MIN_READER_VERSION
+    );
+    assert_eq!(
+        protocol["minWriterVersion"],
+        TABLE_FEATURES_MIN_WRITER_VERSION
+    );
+
+    // Verify Metadata action
+    let metadata_action = &actions[2];
+    assert!(
+        metadata_action.get("metaData").is_some(),
+        "Third action should be metaData"
+    );
+    let metadata = metadata_action.get("metaData").unwrap();
+    assert!(metadata.get("id").is_some(), "Metadata should have id");
+    assert!(
+        metadata.get("schemaString").is_some(),
+        "Metadata should have schemaString"
+    );
+    assert!(
+        metadata.get("createdTime").is_some(),
+        "Metadata should have createdTime"
+    );
+
+    // Additional CommitInfo verification (commit_info was already extracted from actions[0] above)
+    assert_eq!(
+        commit_info["engineInfo"], engine_info,
+        "CommitInfo should contain the engine info we provided"
+    );
+
+    assert!(
+        commit_info.get("txnId").is_some(),
+        "CommitInfo should have txnId"
+    );
+
+    // Verify kernelVersion is present
+    let kernel_version = commit_info.get("kernelVersion");
+    assert!(
+        kernel_version.is_some(),
+        "CommitInfo should have kernelVersion"
+    );
+    assert!(
+        kernel_version.unwrap().as_str().unwrap().starts_with("v"),
+        "Kernel version should start with 'v'"
+    );
+
+    Ok(())
+}
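+
+// A sketch of a possible follow-up test (hypothetical; not part of the suite above): creating a
+// table at a nested path whose parent directories do not exist yet. It leans on the behavior
+// noted in `ensure_table_does_not_exist`: the storage layer creates the full path when the
+// first log file is written.
+#[tokio::test]
+async fn test_create_table_at_nested_path() -> DeltaResult<()> {
+    let temp_dir = tempdir().expect("Failed to create temp dir");
+    let table_path = temp_dir
+        .path()
+        .join("nested")
+        .join("table")
+        .to_str()
+        .expect("Invalid path")
+        .to_string();
+
+    let engine =
+        create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
+
+    let schema = Arc::new(StructType::try_new(vec![StructField::new(
+        "id",
+        DataType::LONG,
+        false,
+    )])?);
+
+    // Build and commit; the nested directories should be created on first write.
+    let _ = create_table(&table_path, schema, "NestedPathTest/0.1.0")
+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
+        .commit(engine.as_ref())?;
+
+    let table_url = delta_kernel::try_parse_uri(&table_path)?;
+    let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
+    assert_eq!(snapshot.version(), 0);
+
+    Ok(())
+}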