Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ mod tests {
add_commit(
storage.as_ref(),
0,
actions_to_string(vec![TestAction::Metadata]),
actions_to_string(vec![TestAction::AllMetadata]),
)
.await?;
let engine = DefaultEngineBuilder::new(storage.clone()).build();
Expand Down Expand Up @@ -1091,7 +1091,7 @@ mod tests {
add_commit(
storage.as_ref(),
0,
actions_to_string_partitioned(vec![TestAction::Metadata]),
actions_to_string_partitioned(vec![TestAction::AllMetadata]),
)
.await?;
let engine = DefaultEngineBuilder::new(storage.clone()).build();
Expand Down Expand Up @@ -1127,7 +1127,7 @@ mod tests {
add_commit(
storage.as_ref(),
0,
actions_to_string(vec![TestAction::Metadata]),
actions_to_string(vec![TestAction::AllMetadata]),
)
.await?;
let engine = DefaultEngineBuilder::new(storage.clone()).build();
Expand Down
4 changes: 2 additions & 2 deletions ffi/src/table_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ mod tests {
storage,
version,
actions_to_string_with_metadata(
vec![TestAction::Metadata, TestAction::Add(file)],
vec![TestAction::AllMetadata, TestAction::Add(file)],
METADATA,
),
)
Expand All @@ -430,7 +430,7 @@ mod tests {
storage,
version,
actions_to_string_with_metadata(
vec![TestAction::Metadata, TestAction::Remove(file)],
vec![TestAction::AllMetadata, TestAction::Remove(file)],
METADATA,
),
)
Expand Down
1 change: 1 addition & 0 deletions kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ walkdir = { version = "2.5.0" }
async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation
paste = "1.0"
test-log = { version = "0.2", default-features = false, features = ["trace"] }
rstest = "0.26.1"
tempfile = "3"
tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
Expand Down
67 changes: 60 additions & 7 deletions kernel/tests/log_tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

use object_store::memory::InMemory;
use object_store::path::Path;
use rstest::rstest;
use url::Url;

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::{DefaultEngine, DefaultEngineBuilder};
use delta_kernel::{FileMeta, LogPath, Snapshot};

use test_utils::{
actions_to_string, add_commit, add_staged_commit, delta_path_for_version, TestAction,
actions_to_string, add_commit, add_crc, add_staged_commit, delta_path_for_version, TestAction,
};

/// Helper function to create a LogPath for a commit at the given version
Expand Down Expand Up @@ -47,7 +48,7 @@
// _delta_log/_staged_commits/1.uuid.json
// _delta_log/_staged_commits/1.uuid.json // add an unused staged commit at version 1
// _delta_log/_staged_commits/2.uuid.json
let actions = vec![TestAction::Metadata];
let actions = vec![TestAction::AllMetadata];
add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?;
let path1 = add_staged_commit(storage.as_ref(), 1, String::from("{}")).await?;
let _ = add_staged_commit(storage.as_ref(), 1, String::from("{}")).await?;
Expand Down Expand Up @@ -158,7 +159,7 @@
// _delta_log/0.json
// _delta_log/1.json
// _delta_log/2.json
let actions = vec![TestAction::Metadata];
let actions = vec![TestAction::AllMetadata];
add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?;
let actions = vec![TestAction::Add("file_1.parquet".to_string())];
add_commit(storage.as_ref(), 1, actions_to_string(actions)).await?;
Expand All @@ -184,7 +185,7 @@
let (storage, engine, table_root) = setup_test();

// Create commits 0, 1, 2 in storage
let actions = vec![TestAction::Metadata];
let actions = vec![TestAction::AllMetadata];
add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?;
let actions = vec![TestAction::Add("file_1.parquet".to_string())];
add_commit(storage.as_ref(), 1, actions_to_string(actions)).await?;
Expand Down Expand Up @@ -215,7 +216,7 @@
let (storage, engine, table_root) = setup_test();

// commits 0, 1, 2 in storage
let actions = vec![TestAction::Metadata];
let actions = vec![TestAction::AllMetadata];
add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?;
let actions = vec![TestAction::Add("file_1.parquet".to_string())];
add_commit(storage.as_ref(), 1, actions_to_string(actions)).await?;
Expand Down Expand Up @@ -257,7 +258,7 @@
let (storage, engine, table_root) = setup_test();

// commits 0, 1, 2, 3, 4 in storage
let actions = vec![TestAction::Metadata];
let actions = vec![TestAction::AllMetadata];
add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?;
let actions = vec![TestAction::Add("file_1.parquet".to_string())];
add_commit(storage.as_ref(), 1, actions_to_string(actions)).await?;
Expand Down Expand Up @@ -292,7 +293,7 @@
let (storage, engine, table_root) = setup_test();

// create commits 0, 1, 2, 3, 4 in storage
let actions = vec![TestAction::Metadata];
let actions = vec![TestAction::AllMetadata];
add_commit(storage.as_ref(), 0, actions_to_string(actions)).await?;
let actions = vec![TestAction::Add("file_1.parquet".to_string())];
add_commit(storage.as_ref(), 1, actions_to_string(actions)).await?;
Expand Down Expand Up @@ -324,3 +325,55 @@

Ok(())
}

// Builds a table with published commits 0..=5, a checksum (CRC) file at `crc_target_version`,
// and staged commits 6..=10 supplied through the log tail. The snapshot must resolve to
// version 10 and report the CRC file at `crc_target_version` as its latest CRC file.
//
// The cases place the CRC file at version 10, version 7 and version 5.
#[rstest]
#[case::crc_unpublished_latest(10)]
#[case::crc_unpublished_not_latest(7)]
#[case::crc_published_not_latest(5)]
#[tokio::test]
async fn snapshot_with_staged_commits_log_tail_with_crc(
    #[case] crc_target_version: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    let (storage, engine, table_root) = setup_test();

    // Create published commit 0.json in storage
    let metadata_actions = vec![TestAction::AllMetadata];
    add_commit(storage.as_ref(), 0, actions_to_string(metadata_actions)).await?;

    // Create published commits 1 to 5 in storage
    for i in 1..=5 {
        let add_actions = vec![TestAction::Add(format!("file_{}.parquet", i))];
        add_commit(storage.as_ref(), i, actions_to_string(add_actions)).await?;
    }

    // Create the checksum (CRC) file
    let crc_actions = vec![TestAction::Protocol, TestAction::Metadata];
    add_crc(
        storage.as_ref(),
        crc_target_version,
        actions_to_string(crc_actions),
    )
    .await?;

    // Create staged commits 6 to 10 in storage, and then add to log_tail
    let mut log_tail = Vec::new();
    for i in 6..=10 {
        let add_actions = vec![TestAction::Add(format!("file_{}.parquet", i))];
        let path = add_staged_commit(storage.as_ref(), i, actions_to_string(add_actions)).await?;
        log_tail.push(create_log_path(&table_root, path));
    }

    // Load our snapshot
    let snapshot = Snapshot::builder_for(table_root.clone())
        .with_log_tail(log_tail)
        .build(engine.as_ref())?;

    // The staged commits in the log tail determine the snapshot version.
    assert_eq!(snapshot.version(), 10);
    assert_eq!(
        snapshot
            .log_segment()
            .latest_crc_file
            .as_ref()
            .map(|p| p.version),
        Some(crc_target_version)
    );

    Ok(())
}
18 changes: 9 additions & 9 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
TestAction::Add(PARQUET_FILE2.to_string()),
]),
Expand Down Expand Up @@ -88,7 +88,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
]),
)
Expand Down Expand Up @@ -140,7 +140,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
]),
)
Expand Down Expand Up @@ -188,10 +188,10 @@
fn generate_commit2(actions: Vec<TestAction>) -> String {
actions
.into_iter()
.map(|test_action| match test_action {

Check failure on line 191 in kernel/tests/read.rs

View workflow job for this annotation

GitHub Actions / msrv-run-tests

non-exhaustive patterns: `TestAction::Protocol` and `TestAction::Metadata` not covered

Check failure on line 191 in kernel/tests/read.rs

View workflow job for this annotation

GitHub Actions / coverage

non-exhaustive patterns: `TestAction::Protocol` and `TestAction::Metadata` not covered

Check failure on line 191 in kernel/tests/read.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

non-exhaustive patterns: `test_utils::TestAction::Protocol` and `test_utils::TestAction::Metadata` not covered

Check failure on line 191 in kernel/tests/read.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

non-exhaustive patterns: `test_utils::TestAction::Protocol` and `test_utils::TestAction::Metadata` not covered

Check failure on line 191 in kernel/tests/read.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

non-exhaustive patterns: `TestAction::Protocol` and `TestAction::Metadata` not covered
TestAction::Add(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 5}},\"maxValues\":{{\"id\":7}}}}"}}}}"#, action = "add", path = path),
TestAction::Remove(path) => format!(r#"{{"{action}":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#, action = "remove", path = path),
TestAction::Metadata => METADATA.into(),
TestAction::AllMetadata => METADATA.into(),
})
.fold(String::new(), |a, b| a + &b + "\n")
}
Expand All @@ -207,7 +207,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
]),
)
Expand Down Expand Up @@ -1335,7 +1335,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
TestAction::Add(PARQUET_FILE2.to_string()),
TestAction::Add(PARQUET_FILE3.to_string()),
Expand Down Expand Up @@ -1430,7 +1430,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
TestAction::Add(PARQUET_FILE2.to_string()),
]),
Expand Down Expand Up @@ -1544,7 +1544,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
]),
)
Expand Down Expand Up @@ -1603,7 +1603,7 @@
storage.as_ref(),
0,
actions_to_string(vec![
TestAction::Metadata,
TestAction::AllMetadata,
TestAction::Add(PARQUET_FILE1.to_string()),
TestAction::Add(PARQUET_FILE2.to_string()),
]),
Expand Down
30 changes: 29 additions & 1 deletion test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,20 @@ pub const METADATA_WITH_PARTITION_COLS: &str = r#"{"commitInfo":{"timestamp":158
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["val"],"configuration":{},"createdTime":1587968585495}}"#;

/// Just the protocol action: a single JSON line declaring reader version 1 and writer
/// version 2, with no `commitInfo` or `metaData` line alongside it.
pub const PROTOCOL_ACTION: &str = r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#;

/// Just the metadata action: a single JSON line whose schema has a nullable integer column
/// `id` and a nullable string column `val`, with an empty `partitionColumns` list.
pub const METADATA_ACTION: &str = r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#;

/// Log actions that the test helpers can render as JSON lines when building commit contents.
pub enum TestAction {
/// An `add` action; the string is used as the file `path`.
Add(String),
/// A `remove` action; the string is used as the file `path`.
Remove(String),
/// Includes commitInfo, protocol, and metadata actions (full table initialization)
AllMetadata,
/// Just the protocol action
Protocol,
/// Just the metadata action
Metadata,
}

Expand All @@ -113,7 +124,9 @@ pub fn actions_to_string_with_metadata(actions: Vec<TestAction>, metadata: &str)
.map(|test_action| match test_action {
TestAction::Add(path) => format!(r#"{{"add":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":2,\"nullCount\":{{\"id\":0}},\"minValues\":{{\"id\": 1}},\"maxValues\":{{\"id\":3}}}}"}}}}"#),
TestAction::Remove(path) => format!(r#"{{"remove":{{"path":"{path}","partitionValues":{{}},"size":262,"modificationTime":1587968586000,"dataChange":true}}}}"#),
TestAction::Metadata => metadata.into(),
TestAction::AllMetadata => metadata.into(),
TestAction::Protocol => PROTOCOL_ACTION.into(),
TestAction::Metadata => METADATA_ACTION.into(),
})
.join("\n")
}
Expand Down Expand Up @@ -198,6 +211,11 @@ pub fn staged_commit_path_for_version(version: u64) -> Path {
Path::from(path.as_str())
}

/// get an ObjectStore path for a checksum (CRC) file, based on the version. The version is
/// zero-padded to 20 digits and the file lives directly under `_delta_log/`.
pub fn crc_path_for_version(version: u64) -> Path {
    Path::from(format!("_delta_log/{version:020}.crc").as_str())
}

/// get an ObjectStore path for a compressed log file, based on the start/end versions
pub fn compacted_log_path_for_versions(start_version: u64, end_version: u64, suffix: &str) -> Path {
let path = format!("_delta_log/{start_version:020}.{end_version:020}.compacted.{suffix}");
Expand Down Expand Up @@ -225,6 +243,16 @@ pub async fn add_staged_commit(
Ok(path)
}

/// Write `data` to the store as the checksum (CRC) file for `version`.
///
/// Returns the path that was written, or the error reported by the store's `put`.
pub async fn add_crc(
    store: &dyn ObjectStore,
    version: u64,
    data: String,
) -> Result<Path, Box<dyn std::error::Error>> {
    let crc_path = crc_path_for_version(version);
    store.put(&crc_path, data.into()).await?;
    Ok(crc_path)
}

/// Try to convert an `EngineData` into a `RecordBatch`. Panics if not using `ArrowEngineData` from
/// the default module
pub fn into_record_batch(engine_data: Box<dyn EngineData>) -> RecordBatch {
Expand Down
Loading