Skip to content

Commit bd3dc17

Browse files
sanujbasu (Sanuj Basu)
authored and committed
feat: Add CreateTable API with simplified single-stage flow
Implement create-table functionality where `CreateTableTransactionBuilder::build()` returns a `Transaction` with stored actions to be used for a commit.

API usage:

    let result = create_table(path, schema, engine_info)
        .build(engine, committer)?
        .commit(engine)?;

This change does not yet allow table properties or table features to be set. Validations in the transaction module return an error if unsupported features or properties, such as row tracking and in-commit timestamps (ICT), are set in the table configuration being pushed down.

Key changes:
- `CreateTableTransactionBuilder::build()` takes a committer and returns a `Transaction` with commit info, protocol, and metadata actions.
- The `Transaction` struct holds optional protocol/metadata actions for create-table.
- Adds a `try_new_create_table()` constructor alongside `try_new_existing_table()`.
- `commit()` handles both the existing-table and create-table flows.
- `get_write_context()` now returns `DeltaResult<WriteContext>` for proper error handling.

This aligns the Rust Kernel's create-table flow with the Java Kernel's approach, where `Transaction` is the single unit for all commit operations.

Testing: unit tests.
1 parent bc9a5d8 commit bd3dc17

File tree

14 files changed

+524
-153
lines changed

14 files changed

+524
-153
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
.vscode/
88
.vim
99
.zed
10+
.cache/
11+
.clangd
1012

1113
# Rust
1214
target/

ffi/src/transaction/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,9 @@ mod tests {
288288
))
289289
};
290290

291-
let write_context = unsafe { get_write_context(txn_with_engine_info.shallow_copy()) };
291+
let write_context = ok_or_panic(unsafe {
292+
get_write_context(txn_with_engine_info.shallow_copy(), engine.shallow_copy())
293+
});
292294

293295
// Ensure we get the correct schema
294296
let write_schema = unsafe { get_write_schema(write_context.shallow_copy()) };

ffi/src/transaction/write_context.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
use crate::error::{ExternResult, IntoExternResult};
12
use crate::handle::Handle;
2-
use crate::{kernel_string_slice, AllocateStringFn, NullableCvoid, SharedSchema};
3+
use crate::{
4+
kernel_string_slice, AllocateStringFn, NullableCvoid, SharedExternEngine, SharedSchema,
5+
};
36
use delta_kernel::transaction::WriteContext;
47
use delta_kernel_ffi_macros::handle_descriptor;
58

@@ -19,13 +22,18 @@ pub struct SharedWriteContext;
1922
///
2023
/// # Safety
2124
///
22-
/// Caller is responsible for passing a [valid][Handle#Validity] transaction handle.
25+
/// Caller is responsible for passing valid transaction and engine handles.
2326
#[no_mangle]
2427
pub unsafe extern "C" fn get_write_context(
2528
txn: Handle<ExclusiveTransaction>,
26-
) -> Handle<SharedWriteContext> {
29+
engine: Handle<SharedExternEngine>,
30+
) -> ExternResult<Handle<SharedWriteContext>> {
2731
let txn = unsafe { txn.as_ref() };
28-
Arc::new(txn.get_write_context()).into()
32+
let engine = unsafe { engine.as_ref() };
33+
let write_context = txn.get_write_context();
34+
write_context
35+
.map(|wc| Arc::new(wc).into())
36+
.into_extern_result(&engine)
2937
}
3038

3139
#[no_mangle]

kernel/examples/write-table/src/main.rs

Lines changed: 11 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::HashMap;
2-
use std::fs::{create_dir_all, write};
2+
use std::fs::create_dir_all;
33
use std::path::Path;
44
use std::process::ExitCode;
55
use std::sync::Arc;
@@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
99
use clap::Parser;
1010
use common::{LocationArgs, ParseWithExamples};
1111
use itertools::Itertools;
12-
use serde_json::{json, to_vec};
1312
use url::Url;
14-
use uuid::Uuid;
1513

1614
use delta_kernel::arrow::array::TimestampMicrosecondArray;
1715
use delta_kernel::committer::FileSystemCommitter;
@@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt};
2018
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
2119
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
2220
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
21+
use delta_kernel::transaction::create_table::create_table as create_delta_table;
2322
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
2423
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
2524

@@ -94,7 +93,7 @@ async fn try_main() -> DeltaResult<()> {
9493
.with_data_change(true);
9594

9695
// Write the data using the engine
97-
let write_context = Arc::new(txn.get_write_context());
96+
let write_context = Arc::new(txn.get_write_context()?);
9897
let file_metadata = engine
9998
.write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
10099
.await?;
@@ -152,7 +151,7 @@ async fn create_or_get_base_snapshot(
152151
// Create new table
153152
println!("Creating new Delta table...");
154153
let schema = parse_schema(schema_str)?;
155-
create_table(url, &schema).await?;
154+
create_table(url, &schema, engine).await?;
156155
Snapshot::builder_for(url.clone()).build(engine)
157156
}
158157
}
@@ -192,66 +191,13 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
192191
Ok(Arc::new(StructType::try_new(fields)?))
193192
}
194193

195-
/// Create a new Delta table with the given schema.
196-
///
197-
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
198-
/// initial transaction log.
199-
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
200-
let table_id = Uuid::new_v4().to_string();
201-
let schema_str = serde_json::to_string(&schema)?;
202-
203-
let (reader_features, writer_features) = {
204-
let reader_features: Vec<&'static str> = vec![];
205-
let writer_features: Vec<&'static str> = vec![];
206-
207-
// TODO: Support adding specific table features
208-
(reader_features, writer_features)
209-
};
210-
211-
let protocol = json!({
212-
"protocol": {
213-
"minReaderVersion": 3,
214-
"minWriterVersion": 7,
215-
"readerFeatures": reader_features,
216-
"writerFeatures": writer_features,
217-
}
218-
});
219-
let partition_columns: Vec<String> = vec![];
220-
let metadata = json!({
221-
"metaData": {
222-
"id": table_id,
223-
"format": {
224-
"provider": "parquet",
225-
"options": {}
226-
},
227-
"schemaString": schema_str,
228-
"partitionColumns": partition_columns,
229-
"configuration": {},
230-
"createdTime": 1677811175819u64
231-
}
232-
});
233-
234-
let data = [
235-
to_vec(&protocol).unwrap(),
236-
b"\n".to_vec(),
237-
to_vec(&metadata).unwrap(),
238-
]
239-
.concat();
240-
241-
// Write the initial transaction with protocol and metadata to 0.json
242-
let delta_log_path = table_url
243-
.join("_delta_log/")?
244-
.to_file_path()
245-
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
246-
let file_path = delta_log_path.join("00000000000000000000.json");
247-
248-
// Create the _delta_log directory if it doesn't exist
249-
create_dir_all(&delta_log_path)
250-
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;
251-
252-
// Write the file using standard filesystem operations
253-
write(&file_path, data)
254-
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
194+
/// Create a new Delta table with the given schema using the official CreateTable API.
195+
async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
196+
// Use the create_table API to create the table
197+
let table_path = table_url.as_str();
198+
let _result = create_delta_table(table_path, schema.clone(), "write-table-example/1.0")
199+
.build(engine, Box::new(FileSystemCommitter::new()))?
200+
.commit(engine)?;
255201

256202
println!("✓ Created Delta table with schema: {schema:#?}");
257203
Ok(())

kernel/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ pub mod engine;
175175

176176
/// Delta table version is 8 byte unsigned int
177177
pub type Version = u64;
178+
179+
/// Sentinel version indicating a pre-commit state (table does not exist yet).
180+
/// Used for create-table transactions before the first commit.
181+
pub const PRE_COMMIT_VERSION: Version = u64::MAX;
182+
178183
pub type FileSize = u64;
179184
pub type FileIndex = u64;
180185

kernel/src/log_segment.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,27 @@ pub(crate) struct LogSegment {
7777
}
7878

7979
impl LogSegment {
80+
/// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table).
81+
/// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk.
82+
/// This is used to construct a pre-commit snapshot that provides table configuration
83+
/// (protocol, metadata, schema) for operations like CTAS.
84+
#[allow(dead_code)] // Used by create_table module
85+
pub(crate) fn for_pre_commit(log_root: Url) -> Self {
86+
use crate::PRE_COMMIT_VERSION;
87+
Self {
88+
end_version: PRE_COMMIT_VERSION,
89+
checkpoint_version: None,
90+
log_root,
91+
ascending_commit_files: vec![],
92+
ascending_compaction_files: vec![],
93+
checkpoint_parts: vec![],
94+
latest_crc_file: None,
95+
latest_commit_file: None,
96+
checkpoint_schema: None,
97+
max_published_version: None,
98+
}
99+
}
100+
80101
#[internal_api]
81102
pub(crate) fn try_new(
82103
listed_files: ListedLogFiles,

kernel/src/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ impl Snapshot {
443443

444444
/// Create a [`Transaction`] for this `SnapshotRef` with the specified [`Committer`].
445445
pub fn transaction(self: Arc<Self>, committer: Box<dyn Committer>) -> DeltaResult<Transaction> {
446-
Transaction::try_new(self, committer)
446+
Transaction::try_new_existing_table(self, committer)
447447
}
448448

449449
/// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated

kernel/src/table_features/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ pub(crate) use timestamp_ntz::validate_timestamp_ntz_feature_support;
1515
mod column_mapping;
1616
mod timestamp_ntz;
1717

18+
/// Minimum reader version for tables that use table features.
19+
/// When set to 3, the protocol requires an explicit `readerFeatures` array.
20+
#[internal_api]
21+
#[allow(dead_code)] // Used by create_table module
22+
pub(crate) const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;
23+
24+
/// Minimum writer version for tables that use table features.
25+
/// When set to 7, the protocol requires an explicit `writerFeatures` array.
26+
#[internal_api]
27+
#[allow(dead_code)] // Used by create_table module
28+
pub(crate) const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;
29+
1830
/// Table features represent protocol capabilities required to correctly read or write a given table.
1931
/// - Readers must implement all features required for correct table reads.
2032
/// - Writers must implement all features required for correct table writes.

0 commit comments

Comments
 (0)