Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
.vscode/
.vim
.zed
.cache/
.clangd

# Rust
target/
Expand Down
74 changes: 10 additions & 64 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::fs::{create_dir_all, write};
use std::fs::create_dir_all;
use std::path::Path;
use std::process::ExitCode;
use std::sync::Arc;
Expand All @@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
use clap::Parser;
use common::{LocationArgs, ParseWithExamples};
use itertools::Itertools;
use serde_json::{json, to_vec};
use url::Url;
use uuid::Uuid;

use delta_kernel::arrow::array::TimestampMicrosecondArray;
use delta_kernel::committer::FileSystemCommitter;
Expand All @@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::transaction::create_table::create_table as create_delta_table;
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};

Expand Down Expand Up @@ -152,7 +151,7 @@ async fn create_or_get_base_snapshot(
// Create new table
println!("Creating new Delta table...");
let schema = parse_schema(schema_str)?;
create_table(url, &schema).await?;
create_table(url, &schema, engine).await?;
Snapshot::builder_for(url.clone()).build(engine)
}
}
Expand Down Expand Up @@ -192,66 +191,13 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
Ok(Arc::new(StructType::try_new(fields)?))
}

/// Create a new Delta table with the given schema.
///
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
/// initial transaction log.
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
let table_id = Uuid::new_v4().to_string();
let schema_str = serde_json::to_string(&schema)?;

let (reader_features, writer_features) = {
let reader_features: Vec<&'static str> = vec![];
let writer_features: Vec<&'static str> = vec![];

// TODO: Support adding specific table features
(reader_features, writer_features)
};

let protocol = json!({
"protocol": {
"minReaderVersion": 3,
"minWriterVersion": 7,
"readerFeatures": reader_features,
"writerFeatures": writer_features,
}
});
let partition_columns: Vec<String> = vec![];
let metadata = json!({
"metaData": {
"id": table_id,
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": schema_str,
"partitionColumns": partition_columns,
"configuration": {},
"createdTime": 1677811175819u64
}
});

let data = [
to_vec(&protocol).unwrap(),
b"\n".to_vec(),
to_vec(&metadata).unwrap(),
]
.concat();

// Write the initial transaction with protocol and metadata to 0.json
let delta_log_path = table_url
.join("_delta_log/")?
.to_file_path()
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
let file_path = delta_log_path.join("00000000000000000000.json");

// Create the _delta_log directory if it doesn't exist
create_dir_all(&delta_log_path)
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;

// Write the file using standard filesystem operations
write(&file_path, data)
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
/// Create a new Delta table with the given schema using the official CreateTable API.
async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
// Use the create_table API to create the table
let table_path = table_url.as_str();
let _result = create_delta_table(table_path, schema.clone(), "write-table-example/1.0")
.build(engine, Box::new(FileSystemCommitter::new()))?
.commit(engine)?;

println!("✓ Created Delta table with schema: {schema:#?}");
Ok(())
Expand Down
5 changes: 5 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ pub mod engine;

/// Delta table version is 8 byte unsigned int
pub type Version = u64;

/// Sentinel version indicating a pre-commit state (table does not exist yet).
/// Used for create-table transactions before the first commit.
pub const PRE_COMMIT_VERSION: Version = u64::MAX;

pub type FileSize = u64;
pub type FileIndex = u64;

Expand Down
35 changes: 32 additions & 3 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _}
use crate::utils::require;
use crate::{
DeltaResult, Engine, Error, Expression, FileMeta, Predicate, PredicateRef, RowVisitor,
StorageHandler, Version,
StorageHandler, Version, PRE_COMMIT_VERSION,
};
use delta_kernel_derive::internal_api;

Expand Down Expand Up @@ -77,6 +77,27 @@ pub(crate) struct LogSegment {
}

impl LogSegment {
/// Creates a synthetic LogSegment for pre-commit transactions (e.g., create-table).
/// The sentinel version PRE_COMMIT_VERSION indicates no version exists yet on disk.
/// This is used to construct a pre-commit snapshot that provides table configuration
/// (protocol, metadata, schema) for operations like CTAS.
#[allow(dead_code)] // Used by create_table module
pub(crate) fn for_pre_commit(log_root: Url) -> Self {
use crate::PRE_COMMIT_VERSION;
Self {
end_version: PRE_COMMIT_VERSION,
checkpoint_version: None,
log_root,
ascending_commit_files: vec![],
ascending_compaction_files: vec![],
checkpoint_parts: vec![],
latest_crc_file: None,
latest_commit_file: None,
checkpoint_schema: None,
max_published_version: None,
}
}

#[internal_api]
pub(crate) fn try_new(
listed_files: ListedLogFiles,
Expand Down Expand Up @@ -734,17 +755,25 @@ impl LogSegment {
self.read_actions(engine, schema, META_PREDICATE.clone())
}

/// How many commits since a checkpoint, according to this log segment
/// How many commits since a checkpoint, according to this log segment.
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
pub(crate) fn commits_since_checkpoint(&self) -> u64 {
if self.end_version == PRE_COMMIT_VERSION {
return 0;
}
// we can use 0 as the checkpoint version if there is no checkpoint since `end_version - 0`
// is the correct number of commits since a checkpoint if there are no checkpoints
let checkpoint_version = self.checkpoint_version.unwrap_or(0);
debug_assert!(checkpoint_version <= self.end_version);
self.end_version - checkpoint_version
}

/// How many commits since a log-compaction or checkpoint, according to this log segment
/// How many commits since a log-compaction or checkpoint, according to this log segment.
/// Returns 0 for pre-commit snapshots (where end_version is PRE_COMMIT_VERSION).
pub(crate) fn commits_since_log_compaction_or_checkpoint(&self) -> u64 {
if self.end_version == PRE_COMMIT_VERSION {
return 0;
}
// Annoyingly we have to search all the compaction files to determine this, because we only
// sort by start version, so technically the max end version could be anywhere in the vec.
// We can return 0 in the case there is no compaction since end_version - 0 is the correct
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ impl Snapshot {

/// Create a [`Transaction`] for this `SnapshotRef`. With the specified [`Committer`].
pub fn transaction(self: Arc<Self>, committer: Box<dyn Committer>) -> DeltaResult<Transaction> {
Transaction::try_new(self, committer)
Transaction::try_new_existing_table(self, committer)
}

/// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated
Expand Down
8 changes: 8 additions & 0 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ pub(crate) use timestamp_ntz::validate_timestamp_ntz_feature_support;
mod column_mapping;
mod timestamp_ntz;

/// Minimum reader version for tables that use table features.
/// When set to 3, the protocol requires an explicit `readerFeatures` array.
pub const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;

/// Minimum writer version for tables that use table features.
/// When set to 7, the protocol requires an explicit `writerFeatures` array.
pub const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;

/// Table features represent protocol capabilities required to correctly read or write a given table.
/// - Readers must implement all features required for correct table reads.
/// - Writers must implement all features required for correct table writes.
Expand Down
Loading
Loading