Skip to content

Commit 158657e

Browse files
sanujbasuSanuj Basu
andauthored
feat: Add CreateTable API with simplified single-stage flow (#1629)
Implement create-table functionality where CreateTableTransactionBuilder::build() returns a Transaction with stored actions to be used for a commit.

API usage:

```
let result = create_table(path, schema, engine_info)
    .build(engine, committer)?
    .commit(engine)?;
```

This change does not yet allow table properties or features to be set. Validations in the transaction module return an error if unsupported features or properties, such as row tracking and in-commit timestamps (ICT), are set in the table configuration being pushed down.

Key changes:

1. CreateTableTransactionBuilder::build() takes a committer and returns a Transaction with commit info, protocol, and metadata actions.
2. The Transaction struct holds optional protocol/metadata actions for create-table.
3. Adds a try_new_create_table() constructor alongside try_new_existing_table().
4. commit() handles both the existing-table and create-table flows.
5. get_write_context() now returns DeltaResult for proper error handling.

This aligns the Rust Kernel's create-table flow with the Java Kernel's approach, where Transaction is the single unit for all commit operations.

Testing: Adds comprehensive unit and functional tests validating the basic create_table() functionality introduced here.

Co-authored-by: Sanuj Basu <[email protected]>
1 parent eb42bc6 commit 158657e

File tree

9 files changed

+787
-118
lines changed

9 files changed

+787
-118
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
.vscode/
88
.vim
99
.zed
10+
.cache/
11+
.clangd
1012

1113
# Rust
1214
target/

kernel/examples/write-table/src/main.rs

Lines changed: 10 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::HashMap;
2-
use std::fs::{create_dir_all, write};
2+
use std::fs::create_dir_all;
33
use std::path::Path;
44
use std::process::ExitCode;
55
use std::sync::Arc;
@@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
99
use clap::Parser;
1010
use common::{LocationArgs, ParseWithExamples};
1111
use itertools::Itertools;
12-
use serde_json::{json, to_vec};
1312
use url::Url;
14-
use uuid::Uuid;
1513

1614
use delta_kernel::arrow::array::TimestampMicrosecondArray;
1715
use delta_kernel::committer::FileSystemCommitter;
@@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt};
2018
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
2119
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
2220
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
21+
use delta_kernel::transaction::create_table::create_table as create_delta_table;
2322
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
2423
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
2524

@@ -152,7 +151,7 @@ async fn create_or_get_base_snapshot(
152151
// Create new table
153152
println!("Creating new Delta table...");
154153
let schema = parse_schema(schema_str)?;
155-
create_table(url, &schema).await?;
154+
create_table(url, &schema, engine).await?;
156155
Snapshot::builder_for(url.clone()).build(engine)
157156
}
158157
}
@@ -192,66 +191,13 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
192191
Ok(Arc::new(StructType::try_new(fields)?))
193192
}
194193

195-
/// Create a new Delta table with the given schema.
196-
///
197-
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
198-
/// initial transaction log.
199-
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
200-
let table_id = Uuid::new_v4().to_string();
201-
let schema_str = serde_json::to_string(&schema)?;
202-
203-
let (reader_features, writer_features) = {
204-
let reader_features: Vec<&'static str> = vec![];
205-
let writer_features: Vec<&'static str> = vec![];
206-
207-
// TODO: Support adding specific table features
208-
(reader_features, writer_features)
209-
};
210-
211-
let protocol = json!({
212-
"protocol": {
213-
"minReaderVersion": 3,
214-
"minWriterVersion": 7,
215-
"readerFeatures": reader_features,
216-
"writerFeatures": writer_features,
217-
}
218-
});
219-
let partition_columns: Vec<String> = vec![];
220-
let metadata = json!({
221-
"metaData": {
222-
"id": table_id,
223-
"format": {
224-
"provider": "parquet",
225-
"options": {}
226-
},
227-
"schemaString": schema_str,
228-
"partitionColumns": partition_columns,
229-
"configuration": {},
230-
"createdTime": 1677811175819u64
231-
}
232-
});
233-
234-
let data = [
235-
to_vec(&protocol).unwrap(),
236-
b"\n".to_vec(),
237-
to_vec(&metadata).unwrap(),
238-
]
239-
.concat();
240-
241-
// Write the initial transaction with protocol and metadata to 0.json
242-
let delta_log_path = table_url
243-
.join("_delta_log/")?
244-
.to_file_path()
245-
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
246-
let file_path = delta_log_path.join("00000000000000000000.json");
247-
248-
// Create the _delta_log directory if it doesn't exist
249-
create_dir_all(&delta_log_path)
250-
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;
251-
252-
// Write the file using standard filesystem operations
253-
write(&file_path, data)
254-
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
194+
/// Create a new Delta table with the given schema using the official CreateTable API.
195+
async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
196+
// Use the create_table API to create the table
197+
let table_path = table_url.as_str();
198+
let _result = create_delta_table(table_path, schema.clone(), "write-table-example/1.0")
199+
.build(engine, Box::new(FileSystemCommitter::new()))?
200+
.commit(engine)?;
255201

256202
println!("✓ Created Delta table with schema: {schema:#?}");
257203
Ok(())

kernel/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ pub mod engine;
175175

176176
/// Delta table version is 8 byte unsigned int
177177
pub type Version = u64;
178+
179+
/// Sentinel version indicating a pre-commit state (table does not exist yet).
180+
/// Used for create-table transactions before the first commit.
181+
pub const PRE_COMMIT_VERSION: Version = u64::MAX;
182+
178183
pub type FileSize = u64;
179184
pub type FileIndex = u64;
180185

kernel/src/log_segment.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _}
1919
use crate::utils::require;
2020
use crate::{
2121
DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
22-
StorageHandler, Version,
22+
StorageHandler, Version, PRE_COMMIT_VERSION,
2323
};
2424
use delta_kernel_derive::internal_api;
2525

@@ -77,6 +77,27 @@ pub(crate) struct LogSegment {
7777
}
7878

7979
impl LogSegment {
80+
/// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table).
81+
/// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk.
82+
/// This is used to construct a pre-commit snapshot that provides table configuration
83+
/// (protocol, metadata, schema) for operations like CTAS.
84+
#[allow(dead_code)] // Used by create_table module
85+
pub(crate) fn for_pre_commit(log_root: Url) -> Self {
86+
use crate::PRE_COMMIT_VERSION;
87+
Self {
88+
end_version: PRE_COMMIT_VERSION,
89+
checkpoint_version: None,
90+
log_root,
91+
ascending_commit_files: vec![],
92+
ascending_compaction_files: vec![],
93+
checkpoint_parts: vec![],
94+
latest_crc_file: None,
95+
latest_commit_file: None,
96+
checkpoint_schema: None,
97+
max_published_version: None,
98+
}
99+
}
100+
80101
#[internal_api]
81102
pub(crate) fn try_new(
82103
listed_files: ListedLogFiles,
@@ -734,17 +755,25 @@ impl LogSegment {
734755
self.read_actions(engine, schema, META_PREDICATE.clone())
735756
}
736757

737-
/// How many commits since a checkpoint, according to this log segment
758+
/// How many commits since a checkpoint, according to this log segment.
759+
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
738760
pub(crate) fn commits_since_checkpoint(&self) -> u64 {
761+
if self.end_version == PRE_COMMIT_VERSION {
762+
return 0;
763+
}
739764
// we can use 0 as the checkpoint version if there is no checkpoint since `end_version - 0`
740765
// is the correct number of commits since a checkpoint if there are no checkpoints
741766
let checkpoint_version = self.checkpoint_version.unwrap_or(0);
742767
debug_assert!(checkpoint_version <= self.end_version);
743768
self.end_version - checkpoint_version
744769
}
745770

746-
/// How many commits since a log-compaction or checkpoint, according to this log segment
771+
/// How many commits since a log-compaction or checkpoint, according to this log segment.
772+
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
747773
pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
774+
if self.end_version == PRE_COMMIT_VERSION {
775+
return 0;
776+
}
748777
// Annoyingly we have to search all the compaction files to determine this, because we only
749778
// sort by start version, so technically the max end version could be anywhere in the vec.
750779
// We can return 0 in the case there is no compaction since end_version - 0 is the correct

kernel/src/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ impl Snapshot {
510510

511511
/// Create a [`Transaction`] for this `SnapshotRef` with the specified [`Committer`].
512512
pub fn transaction(self: Arc<Self>, committer: Box<dyn Committer>) -> DeltaResult<Transaction> {
513-
Transaction::try_new(self, committer)
513+
Transaction::try_new_existing_table(self, committer)
514514
}
515515

516516
/// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated

kernel/src/table_features/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ pub(crate) use timestamp_ntz::validate_timestamp_ntz_feature_support;
1616
mod column_mapping;
1717
mod timestamp_ntz;
1818

19+
/// Minimum reader version for tables that use table features.
20+
/// When set to 3, the protocol requires an explicit `readerFeatures` array.
21+
pub const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;
22+
23+
/// Minimum writer version for tables that use table features.
24+
/// When set to 7, the protocol requires an explicit `writerFeatures` array.
25+
pub const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;
26+
1927
/// Table features represent protocol capabilities required to correctly read or write a given table.
2028
/// - Readers must implement all features required for correct table reads.
2129
/// - Writers must implement all features required for correct table writes.

0 commit comments

Comments
 (0)