diff --git a/ffi/src/domain_metadata.rs b/ffi/src/domain_metadata.rs index 58f39e38dc..d6ed7609d4 100644 --- a/ffi/src/domain_metadata.rs +++ b/ffi/src/domain_metadata.rs @@ -92,7 +92,7 @@ mod tests { allocate_err, allocate_str, assert_extern_result_error_with_message, ok_or_panic, recover_string, }; - use crate::{engine_to_handle, free_engine, free_snapshot, kernel_string_slice, snapshot}; + use crate::{engine_to_handle, free_engine, free_snapshot, kernel_string_slice, snapshot, OptionalValue}; use delta_kernel::engine::default::DefaultEngineBuilder; use delta_kernel::DeltaResult; use object_store::memory::InMemory; @@ -184,8 +184,14 @@ mod tests { add_commit(storage.as_ref(), 1, commit).await.unwrap(); - let snapshot = - unsafe { ok_or_panic(snapshot(kernel_string_slice!(path), engine.shallow_copy())) }; + let snapshot = unsafe { + ok_or_panic(snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::None, + OptionalValue::None, + )) + }; let get_domain_metadata_helper = |domain: &str| unsafe { get_domain_metadata( diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 46f2f315a1..744ecec62a 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -599,96 +599,132 @@ pub struct SharedSchema; #[handle_descriptor(target=Snapshot, mutable=false, sized=true)] pub struct SharedSnapshot; -/// Get the latest snapshot from the specified table +/// Get a snapshot from the specified table. /// -/// # Safety -/// -/// Caller is responsible for passing valid handles and path pointer. -#[no_mangle] -pub unsafe extern "C" fn snapshot( - path: KernelStringSlice, - engine: Handle, -) -> ExternResult> { - let url = unsafe { unwrap_and_parse_path_as_url(path) }; - let engine = unsafe { engine.as_ref() }; - snapshot_impl(url, engine, None, Vec::new()).into_extern_result(&engine) -} - -/// Get the latest snapshot from the specified table with optional log tail +/// Parameters: +/// - `path`: Table path (required when `old_snapshot` is not provided, ignored otherwise) +/// - `engine`: The engine handle +/// - `old_snapshot`: Pass an existing snapshot for optimized creation (avoids re-reading the entire log) +/// - `version`: Specify a version to get a snapshot at a specific table version /// /// # Safety /// -/// Caller is responsible for passing valid handles and path pointer. -/// The log_paths array and its contents must remain valid for the duration of this call. -#[cfg(feature = "catalog-managed")] +/// Caller is responsible for passing valid handles. Path must be valid when `old_snapshot` is None. #[no_mangle] -pub unsafe extern "C" fn snapshot_with_log_tail( - path: KernelStringSlice, +pub unsafe extern "C" fn snapshot( + path: OptionalValue, engine: Handle, - log_paths: log_path::LogPathArray, + old_snapshot: OptionalValue>, + version: OptionalValue, ) -> ExternResult> { - let url = unsafe { unwrap_and_parse_path_as_url(path) }; let engine_ref = unsafe { engine.as_ref() }; - // Convert LogPathArray to Vec - let log_tail = match unsafe { log_paths.log_paths() } { - Ok(paths) => paths, - Err(err) => return Err(err).into_extern_result(&engine_ref), + let old_snapshot_handle = match old_snapshot { + OptionalValue::Some(handle) => Some(handle), + OptionalValue::None => None, }; - snapshot_impl(url, engine_ref, None, log_tail).into_extern_result(&engine_ref) + let version_opt = match version { + OptionalValue::Some(v) => Some(v), + OptionalValue::None => None, + }; + + // Only parse URL if we don't have an old snapshot + let url = if old_snapshot_handle.is_none() { + match path { + OptionalValue::Some(p) => Some(unsafe { unwrap_and_parse_path_as_url(p) }), + OptionalValue::None => { + return Err(delta_kernel::Error::generic( + "Path is required when not using an old snapshot", + )) + .into_extern_result(&engine_ref) + } + } + } else { + None + }; + + snapshot_impl(url, engine_ref, old_snapshot_handle, version_opt, Vec::new()) + .into_extern_result(&engine_ref) } -/// Get the snapshot from the specified table at a specific version. Note this is only safe for -/// non-catalog-managed tables. +/// Get a snapshot with log tail support for catalog-managed tables. /// -/// # Safety +/// This version includes all the parameters from `snapshot` plus the ability to +/// provide a log tail for catalog-managed tables. /// -/// Caller is responsible for passing valid handles and path pointer. -#[no_mangle] -pub unsafe extern "C" fn snapshot_at_version( - path: KernelStringSlice, - engine: Handle, - version: Version, -) -> ExternResult> { - let url = unsafe { unwrap_and_parse_path_as_url(path) }; - let engine = unsafe { engine.as_ref() }; - snapshot_impl(url, engine, version.into(), Vec::new()).into_extern_result(&engine) -} - -/// Get the snapshot from the specified table at a specific version with log tail. +/// Parameters: +/// - `path`: Table path (required when `old_snapshot` is not provided, ignored otherwise) +/// - `engine`: The engine handle +/// - `old_snapshot`: Pass an existing snapshot for optimized creation (avoids re-reading the entire log) +/// - `version`: Specify a version to get a snapshot at a specific table version +/// - `log_tail`: Log tail for catalog-managed tables /// /// # Safety /// -/// Caller is responsible for passing valid handles and path pointer. +/// Caller is responsible for passing valid handles. Path must be valid when `old_snapshot` is None. /// The log_tail array and its contents must remain valid for the duration of this call. #[cfg(feature = "catalog-managed")] #[no_mangle] -pub unsafe extern "C" fn snapshot_at_version_with_log_tail( - path: KernelStringSlice, +pub unsafe extern "C" fn snapshot_with_log_tail( + path: OptionalValue, engine: Handle, - version: Version, - log_tail: log_path::LogPathArray, + old_snapshot: OptionalValue>, + version: OptionalValue, + log_tail: OptionalValue, ) -> ExternResult> { - let url = unsafe { unwrap_and_parse_path_as_url(path) }; let engine_ref = unsafe { engine.as_ref() }; - // Convert LogPathArray to Vec - let log_tail = match unsafe { log_tail.log_paths() } { - Ok(paths) => paths, - Err(err) => return Err(err).into_extern_result(&engine_ref), + let old_snapshot_handle = match old_snapshot { + OptionalValue::Some(handle) => Some(handle), + OptionalValue::None => None, + }; + + let version_opt = match version { + OptionalValue::Some(v) => Some(v), + OptionalValue::None => None, + }; + + let log_tail_vec = match log_tail { + OptionalValue::Some(paths) => match unsafe { paths.log_paths() } { + Ok(paths) => paths, + Err(err) => return Err(err).into_extern_result(&engine_ref), + }, + OptionalValue::None => Vec::new(), + }; + + // Only parse URL if we don't have an old snapshot + let url = if old_snapshot_handle.is_none() { + match path { + OptionalValue::Some(p) => Some(unsafe { unwrap_and_parse_path_as_url(p) }), + OptionalValue::None => { + return Err(delta_kernel::Error::generic( + "Path is required when not using an old snapshot", + )) + .into_extern_result(&engine_ref) + } + } + } else { + None }; - snapshot_impl(url, engine_ref, version.into(), log_tail).into_extern_result(&engine_ref) + snapshot_impl(url, engine_ref, old_snapshot_handle, version_opt, log_tail_vec) + .into_extern_result(&engine_ref) } fn snapshot_impl( - url: DeltaResult, + url: Option>, extern_engine: &dyn ExternEngine, + old_snapshot_handle: Option>, version: Option, #[allow(unused_variables)] log_tail: Vec, ) -> DeltaResult> { - let mut builder = Snapshot::builder_for(url?); + let mut builder = if let Some(handle) = old_snapshot_handle { + let snapshot_arc = unsafe { handle.clone_as_arc() }; + Snapshot::builder_from(snapshot_arc) + } else { + Snapshot::builder_for(url.ok_or_else(|| delta_kernel::Error::generic("URL is required when not using an old snapshot"))??) + }; if let Some(v) = version { builder = builder.at_version(v); @@ -947,25 +983,38 @@ mod tests { let path = "memory:///"; // Test getting latest snapshot - let snapshot1 = - unsafe { ok_or_panic(snapshot(kernel_string_slice!(path), engine.shallow_copy())) }; + let snapshot1 = unsafe { + ok_or_panic(snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::None, + OptionalValue::None, + )) + }; let version1 = unsafe { version(snapshot1.shallow_copy()) }; assert_eq!(version1, 0); // Test getting snapshot at version let snapshot2 = unsafe { - ok_or_panic(snapshot_at_version( - kernel_string_slice!(path), + ok_or_panic(snapshot( + OptionalValue::Some(kernel_string_slice!(path)), engine.shallow_copy(), - 0, + OptionalValue::None, + OptionalValue::Some(0), )) }; let version2 = unsafe { version(snapshot2.shallow_copy()) }; assert_eq!(version2, 0); // Test getting non-existent snapshot - let snapshot_at_non_existent_version = - unsafe { snapshot_at_version(kernel_string_slice!(path), engine.shallow_copy(), 1) }; + let snapshot_at_non_existent_version = unsafe { + snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::None, + OptionalValue::Some(1), + ) + }; assert_extern_result_error_with_message(snapshot_at_non_existent_version, KernelError::GenericError, "Generic delta kernel error: LogSegment end version 0 not the same as the specified end version 1"); let table_root = unsafe { snapshot_table_root(snapshot1.shallow_copy(), allocate_str) }; @@ -992,8 +1041,14 @@ mod tests { let engine = engine_to_handle(Arc::new(engine), allocate_err); let path = "memory:///"; - let snapshot = - unsafe { ok_or_panic(snapshot(kernel_string_slice!(path), engine.shallow_copy())) }; + let snapshot = unsafe { + ok_or_panic(snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::None, + OptionalValue::None, + )) + }; let partition_count = unsafe { get_partition_column_count(snapshot.shallow_copy()) }; assert_eq!(partition_count, 1, "Should have one partition"); @@ -1029,8 +1084,14 @@ mod tests { let path = "memory:///"; // Get a non-existent snapshot, this will call allocate_null_err - let snapshot_at_non_existent_version = - unsafe { snapshot_at_version(kernel_string_slice!(path), engine.shallow_copy(), 1) }; + let snapshot_at_non_existent_version = unsafe { + snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::None, + OptionalValue::Some(1), + ) + }; assert!(snapshot_at_non_existent_version.is_err()); unsafe { free_engine(engine) } @@ -1074,7 +1135,9 @@ mod tests { ok_or_panic(snapshot_with_log_tail( kernel_string_slice!(path), engine.shallow_copy(), - log_tail.clone(), + OptionalValue::None, + OptionalValue::None, + OptionalValue::Some(log_tail.clone()), )) }; let snapshot_version = unsafe { version(snapshot.shallow_copy()) }; @@ -1082,14 +1145,15 @@ mod tests { // Test getting snapshot at version let snapshot2 = unsafe { - ok_or_panic(snapshot_at_version_with_log_tail( + ok_or_panic(snapshot_with_log_tail( kernel_string_slice!(path), engine.shallow_copy(), - 1, - log_tail, + OptionalValue::None, + OptionalValue::Some(1), + OptionalValue::Some(log_tail), )) }; - let snapshot_version = unsafe { version(snapshot.shallow_copy()) }; + let snapshot_version = unsafe { version(snapshot2.shallow_copy()) }; assert_eq!(snapshot_version, 1); unsafe { free_snapshot(snapshot) } @@ -1097,4 +1161,172 @@ mod tests { unsafe { free_engine(engine) } Ok(()) } + + #[tokio::test] + async fn test_snapshot_with_old_snapshot() -> Result<(), Box> { + let storage = Arc::new(InMemory::new()); + // Create initial commit (version 0) + add_commit( + storage.as_ref(), + 0, + actions_to_string(vec![TestAction::Metadata]), + ) + .await?; + let engine = DefaultEngineBuilder::new(storage.clone()).build(); + let engine = engine_to_handle(Arc::new(engine), allocate_err); + let path = "memory:///"; + + // Create initial snapshot at version 0 + let old_snapshot = unsafe { + ok_or_panic(snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::None, + OptionalValue::None, + )) + }; + let old_version = unsafe { version(old_snapshot.shallow_copy()) }; + assert_eq!(old_version, 0); + + // Add more commits to the table (version 1 and 2) + add_commit( + storage.as_ref(), + 1, + actions_to_string(vec![TestAction::Add("file1.parquet".into())]), + ) + .await?; + add_commit( + storage.as_ref(), + 2, + actions_to_string(vec![TestAction::Add("file2.parquet".into())]), + ) + .await?; + + // Create new snapshot using old snapshot for optimization (latest version) + let new_snapshot = unsafe { + ok_or_panic(snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::Some(old_snapshot.shallow_copy()), + OptionalValue::None, + )) + }; + let new_version = unsafe { version(new_snapshot.shallow_copy()) }; + assert_eq!(new_version, 2); + + // Create snapshot at specific version using old snapshot for optimization + let snapshot_at_v1 = unsafe { + ok_or_panic(snapshot( + OptionalValue::Some(kernel_string_slice!(path)), + engine.shallow_copy(), + OptionalValue::Some(old_snapshot.shallow_copy()), + OptionalValue::Some(1), + )) + }; + let v1_version = unsafe { version(snapshot_at_v1.shallow_copy()) }; + assert_eq!(v1_version, 1); + + unsafe { free_snapshot(old_snapshot) } + unsafe { free_snapshot(new_snapshot) } + unsafe { free_snapshot(snapshot_at_v1) } + unsafe { free_engine(engine) } + Ok(()) + } + + #[cfg(feature = "catalog-managed")] + #[tokio::test] + async fn test_snapshot_with_log_tail_and_old_snapshot() -> Result<(), Box> { + use test_utils::add_staged_commit; + let storage = Arc::new(InMemory::new()); + + // Create initial commit (version 0) + add_commit( + storage.as_ref(), + 0, + actions_to_string(vec![TestAction::Metadata]), + ) + .await?; + let engine = DefaultEngineBuilder::new(storage.clone()).build(); + let engine = engine_to_handle(Arc::new(engine), allocate_err); + let path = "memory:///"; + + // Create initial snapshot at version 0 + let old_snapshot = unsafe { + ok_or_panic(snapshot( + kernel_string_slice!(path), + engine.shallow_copy(), + OptionalValue::None, + OptionalValue::None, + )) + }; + let old_version = unsafe { version(old_snapshot.shallow_copy()) }; + assert_eq!(old_version, 0); + + // Add staged commit (version 1) + let commit1 = add_staged_commit( + storage.as_ref(), + 1, + actions_to_string(vec![TestAction::Add("path1.parquet".into())]), + ) + .await?; + + // Add another staged commit (version 2) + let commit2 = add_staged_commit( + storage.as_ref(), + 2, + actions_to_string(vec![TestAction::Add("path2.parquet".into())]), + ) + .await?; + + // Build log tail with both commits + let commit1_path = format!( + "{}_delta_log/_staged_commits/{}", + path, + commit1.filename().unwrap() + ); + let commit2_path = format!( + "{}_delta_log/_staged_commits/{}", + path, + commit2.filename().unwrap() + ); + let log_path1 = log_path::FfiLogPath::new(kernel_string_slice!(commit1_path), 123456789, 100); + let log_path2 = log_path::FfiLogPath::new(kernel_string_slice!(commit2_path), 123456790, 101); + let log_tail = [log_path1, log_path2]; + let log_tail_array = log_path::LogPathArray { + ptr: log_tail.as_ptr(), + len: log_tail.len(), + }; + + // Create new snapshot using old snapshot for optimization with log tail + let new_snapshot = unsafe { + ok_or_panic(snapshot_with_log_tail( + kernel_string_slice!(path), + engine.shallow_copy(), + OptionalValue::Some(old_snapshot.shallow_copy()), + OptionalValue::None, + OptionalValue::Some(log_tail_array.clone()), + )) + }; + let new_version = unsafe { version(new_snapshot.shallow_copy()) }; + assert_eq!(new_version, 2); + + // Create snapshot at specific version using old snapshot with log tail + let snapshot_at_v1 = unsafe { + ok_or_panic(snapshot_with_log_tail( + kernel_string_slice!(path), + engine.shallow_copy(), + OptionalValue::Some(old_snapshot.shallow_copy()), + OptionalValue::Some(1), + OptionalValue::Some(log_tail_array), + )) + }; + let v1_version = unsafe { version(snapshot_at_v1.shallow_copy()) }; + assert_eq!(v1_version, 1); + + unsafe { free_snapshot(old_snapshot) } + unsafe { free_snapshot(new_snapshot) } + unsafe { free_snapshot(snapshot_at_v1) } + unsafe { free_engine(engine) } + Ok(()) + } }