diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index ed088b5792..6b78d13387 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -970,7 +970,7 @@ mod tests { add_commit( storage.as_ref(), 0, - actions_to_string(vec![TestAction::Metadata]), + actions_to_string(vec![TestAction::AllMetadata]), ) .await?; let engine = DefaultEngineBuilder::new(storage.clone()).build(); @@ -1091,7 +1091,7 @@ mod tests { add_commit( storage.as_ref(), 0, - actions_to_string_partitioned(vec![TestAction::Metadata]), + actions_to_string_partitioned(vec![TestAction::AllMetadata]), ) .await?; let engine = DefaultEngineBuilder::new(storage.clone()).build(); @@ -1127,7 +1127,7 @@ mod tests { add_commit( storage.as_ref(), 0, - actions_to_string(vec![TestAction::Metadata]), + actions_to_string(vec![TestAction::AllMetadata]), ) .await?; let engine = DefaultEngineBuilder::new(storage.clone()).build(); diff --git a/ffi/src/table_changes.rs b/ffi/src/table_changes.rs index 4f7d06be35..811bf942d1 100644 --- a/ffi/src/table_changes.rs +++ b/ffi/src/table_changes.rs @@ -414,7 +414,7 @@ mod tests { storage, version, actions_to_string_with_metadata( - vec![TestAction::Metadata, TestAction::Add(file)], + vec![TestAction::AllMetadata, TestAction::Add(file)], METADATA, ), ) @@ -430,7 +430,7 @@ mod tests { storage, version, actions_to_string_with_metadata( - vec![TestAction::Metadata, TestAction::Remove(file)], + vec![TestAction::AllMetadata, TestAction::Remove(file)], METADATA, ), ) diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 567b9ca4f3..479633ff2a 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -142,6 +142,7 @@ walkdir = { version = "2.5.0" } async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation paste = "1.0" test-log = { version = "0.2", default-features = false, features = ["trace"] } +rstest = "0.26.1" tempfile = "3" tracing-subscriber = { version = "0.3", default-features = false, features = [ "env-filter", diff --git a/kernel/tests/log_tail.rs b/kernel/tests/log_tail.rs index abf893e7f6..e436811611 100644 --- a/kernel/tests/log_tail.rs +++ b/kernel/tests/log_tail.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use object_store::memory::InMemory; use object_store::path::Path; +use rstest::rstest; use url::Url; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; @@ -9,7 +10,7 @@ use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder}; use delta_kernel::{FileMeta, LogPath, Snapshot}; use test_utils::{ - actions_to_string, add_commit, add_staged_commit, delta_path_for_version, TestAction, + actions_to_string, add_commit, add_crc, add_staged_commit, delta_path_for_version, TestAction, }; /// Helper function to create a LogPath for a commit at the given version @@ -47,7 +48,7 @@ async fn basic_snapshot_with_log_tail_staged_commits() -> Result<(), Box Result<(), Box // _delta_log/0.json // _delta_log/1.json // _delta_log/2.json - let actions = vec![TestAction::Metadata]; + let actions = vec![TestAction::AllMetadata]; add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?; let actions = vec![TestAction::Add("file_1.parquet".to_string())]; add_commit(storage.as_ref(), 1, actions_to_string(actions)).await?; @@ -184,7 +185,7 @@ async fn log_tail_behind_filesystem() -> Result<(), Box> let (storage, engine, table_root) = setup_test(); // Create commits 0, 1, 2 in storage - let actions = vec![TestAction::Metadata]; + let actions = vec![TestAction::AllMetadata]; add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?; let actions = vec![TestAction::Add("file_1.parquet".to_string())]; add_commit(storage.as_ref(), 1, actions_to_string(actions)).await?; @@ -215,7 +216,7 @@ async fn incremental_snapshot_with_log_tail() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { + let (storage, engine, table_root) = setup_test(); + + // Create published commits 0.json in storage + let metadata_actions = vec![TestAction::AllMetadata]; + add_commit(storage.as_ref(), 0, actions_to_string(metadata_actions)).await?; + + // Create published commits 1 to 5 in storage + for i in 1..=5 { + let add_actions = vec![TestAction::Add(format!("file_{}.parquet", i))]; + add_commit(storage.as_ref(), i, actions_to_string(add_actions)).await?; + } + + // Create the checksum (CRC) file + let crc_actions = vec![TestAction::Protocol, TestAction::Metadata]; + add_crc( + storage.as_ref(), + crc_target_version, + actions_to_string(crc_actions), + ) + .await?; + + // Create staged commits 6 to 10 in storage, and then add to log_tail + let mut log_tail = Vec::new(); + for i in 6..=10 { + let add_actions = vec![TestAction::Add(format!("file_{}.parquet", i))]; + let path = add_staged_commit(storage.as_ref(), i, actions_to_string(add_actions)).await?; + log_tail.push(create_log_path(&table_root, path)); + } + + // Load our snapshot + let snapshot = Snapshot::builder_for(table_root.clone()) + .with_log_tail(log_tail) + .build(engine.as_ref())?; + + + assert_eq!(snapshot.version(), 10); + assert_eq!( + snapshot.log_segment().latest_crc_file.as_ref().map(|p| p.version), + Some(crc_target_version) + ); + + Ok(()) +} diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index ef1372713e..1ae3a083ff 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -42,7 +42,7 @@ async fn single_commit_two_add_files() -> Result<(), Box> storage.as_ref(), 0, actions_to_string(vec![ - TestAction::Metadata, + TestAction::AllMetadata, TestAction::Add(PARQUET_FILE1.to_string()), TestAction::Add(PARQUET_FILE2.to_string()), ]), @@ -88,7 +88,7 @@ async fn two_commits() -> Result<(), Box> { storage.as_ref(), 0, actions_to_string(vec![ - TestAction::Metadata, + TestAction::AllMetadata, TestAction::Add(PARQUET_FILE1.to_string()), ]), ) @@ -140,7 +140,7 @@ async fn remove_action() -> Result<(), Box> { storage.as_ref(), 0, actions_to_string(vec![ - TestAction::Metadata, + TestAction::AllMetadata, TestAction::Add(PARQUET_FILE1.to_string()), ]), ) @@ -191,7 +191,7 @@ async fn stats() -> Result<(), Box> { .map(|test_action| match test_action { TestAction::Add(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path), TestAction::Remove(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path), - TestAction::Metadata => METADATA.into(), + TestAction::AllMetadata => METADATA.into(), }) .fold(String::new(), |a, b| a + &b + "\n") } @@ -207,7 +207,7 @@ async fn stats() -> Result<(), Box> { storage.as_ref(), 0, actions_to_string(vec![ - TestAction::Metadata, + TestAction::AllMetadata, TestAction::Add(PARQUET_FILE1.to_string()), ]), ) @@ -1335,7 +1335,7 @@ async fn test_row_index_metadata_column() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box, metadata: &str) .map(|test_action| match test_action { TestAction::Add(path) => format!(r#"{{"add":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 1}},\"maxValues\":{{\"id\":3}}}}"}}}}"#), TestAction::Remove(path) => format!(r#"{{"remove":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#), - TestAction::Metadata => metadata.into(), + TestAction::AllMetadata => metadata.into(), + TestAction::Protocol => PROTOCOL_ACTION.into(), + TestAction::Metadata => METADATA_ACTION.into(), }) .join("\n") } @@ -198,6 +211,11 @@ pub fn staged_commit_path_for_version(version: u64) -> Path { Path::from(path.as_str()) } +pub fn crc_path_for_version(version: u64) -> Path { + let path = format!("_delta_log/{version:020}.crc"); + Path::from(path.as_str()) +} + /// get an ObjectStore path for a compressed log file, based on the start/end versions pub fn compacted_log_path_for_versions(start_version: u64, end_version: u64, suffix: &str) -> Path { let path = format!("_delta_log/{start_version:020}.{end_version:020}.compacted.{suffix}"); @@ -225,6 +243,16 @@ pub async fn add_staged_commit( Ok(path) } +pub async fn add_crc( + store: &dyn ObjectStore, + version: u64, + data: String, +) -> Result> { + let path = crc_path_for_version(version); + store.put(&path, data.into()).await?; + Ok(path) +} + /// Try to convert an `EngineData` into a `RecordBatch`. Panics if not using `ArrowEngineData` from /// the default module pub fn into_record_batch(engine_data: Box) -> RecordBatch {