Skip to content

Commit 9dd3367

Browse files
committed
propogate error and add test
1 parent 39f34d3 commit 9dd3367

File tree

2 files changed

+67
-31
lines changed

2 files changed

+67
-31
lines changed

kernel/src/log_segment.rs

+37-31
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::utils::require;
99
use crate::{
1010
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileSystemClient, Version,
1111
};
12-
use itertools::Itertools;
12+
use itertools::{process_results, Itertools};
1313
use std::convert::identity;
1414
use std::sync::{Arc, LazyLock};
1515
use tracing::warn;
@@ -310,36 +310,25 @@ fn list_log_files_with_version(
310310
// on config at some point
311311
let mut commit_files = Vec::with_capacity(10);
312312
let mut checkpoint_parts = vec![];
313-
let mut max_checkpoint_version = start_version;
314313

315314
let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;
316315

317-
for (version, files) in &log_files
318-
.filter_map(|res| match res {
319-
Ok(path) => Some(path),
320-
Err(e) => {
321-
warn!("Error processing path: {:?}", e);
322-
None
316+
process_results(log_files, |iter| {
317+
let log_files = iter.chunk_by(move |x| x.version);
318+
for (version, files) in &log_files {
319+
let mut new_checkpoint_parts = vec![];
320+
for file in files {
321+
if file.is_commit() {
322+
commit_files.push(file);
323+
} else if file.is_checkpoint() {
324+
new_checkpoint_parts.push(file);
325+
}
323326
}
324-
})
325-
.chunk_by(|path| path.version)
326-
{
327-
let mut new_checkpoint_parts = vec![];
328-
329-
for file in files {
330-
if file.is_commit() {
331-
commit_files.push(file);
332-
} else if file.is_checkpoint() {
333-
new_checkpoint_parts.push(file);
327+
if validate_checkpoint_parts(version, &new_checkpoint_parts) {
328+
checkpoint_parts = new_checkpoint_parts;
334329
}
335330
}
336-
if validate_checkpoint_parts(version, &new_checkpoint_parts)
337-
&& (max_checkpoint_version.is_none() || Some(version) >= max_checkpoint_version)
338-
{
339-
max_checkpoint_version = Some(version);
340-
checkpoint_parts = new_checkpoint_parts;
341-
}
342-
}
331+
})?;
343332

344333
Ok((commit_files, checkpoint_parts))
345334
}
@@ -385,10 +374,6 @@ fn list_log_files_with_checkpoint(
385374
/// Validates that all the checkpoint parts belong to the same checkpoint version and that all parts
386375
/// are present. Returns `true` if we have a complete checkpoint, `false` otherwise.
387376
fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -> bool {
388-
if checkpoint_parts.is_empty() {
389-
return false;
390-
}
391-
392377
match checkpoint_parts.last().map(|file| &file.file_type) {
393378
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
394379
if *num_parts as usize != checkpoint_parts.len() {
@@ -411,8 +396,29 @@ fn validate_checkpoint_parts(version: u64, checkpoint_parts: &[ParsedLogPath]) -
411396
return false;
412397
}
413398
}
414-
// TODO: Include UuidCheckpoint once we actually support v2 checkpoints
415-
_ => {}
399+
Some(LogPathFileType::UuidCheckpoint(_)) => {
400+
warn!(
401+
"Found a UUID checkpoint at version {} when it is not supported",
402+
version
403+
);
404+
return false;
405+
}
406+
Some(LogPathFileType::Commit) | Some(LogPathFileType::CompactedCommit { .. }) => {
407+
warn!(
408+
"Found a commit file at version {} when expecting a checkpoint",
409+
version
410+
);
411+
return false;
412+
}
413+
Some(LogPathFileType::Unknown) => {
414+
warn!(
415+
"Found an unknown file type at version {} when expecting a checkpoint",
416+
version
417+
);
418+
return false;
419+
}
420+
// No checkpoint parts
421+
None => return false,
416422
}
417423

418424
true

kernel/src/log_segment/tests.rs

+30
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,35 @@ fn build_log_with_paths_and_checkpoint(
107107
(Box::new(client), log_root)
108108
}
109109

110+
#[test]
111+
fn build_snapshot_with_unsupported_uuid_checkpoint() {
112+
let (client, log_root) = build_log_with_paths_and_checkpoint(
113+
&[
114+
delta_path_for_version(0, "json"),
115+
delta_path_for_version(1, "checkpoint.parquet"),
116+
delta_path_for_version(2, "json"),
117+
delta_path_for_version(3, "checkpoint.parquet"),
118+
delta_path_for_version(4, "json"),
119+
delta_path_for_version(5, "json"),
120+
delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"),
121+
delta_path_for_version(6, "json"),
122+
delta_path_for_version(7, "json"),
123+
],
124+
None,
125+
);
126+
127+
let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
128+
let commit_files = log_segment.ascending_commit_files;
129+
let checkpoint_parts = log_segment.checkpoint_parts;
130+
131+
assert_eq!(checkpoint_parts.len(), 1);
132+
assert_eq!(checkpoint_parts[0].version, 3);
133+
134+
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
135+
let expected_versions = vec![4, 5, 6, 7];
136+
assert_eq!(versions, expected_versions);
137+
}
138+
110139
#[test]
111140
fn build_snapshot_with_out_of_date_last_checkpoint() {
112141
let checkpoint_metadata = CheckpointMetadata {
@@ -331,6 +360,7 @@ fn build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpo
331360
let checkpoint_parts = log_segment.checkpoint_parts;
332361

333362
assert_eq!(checkpoint_parts.len(), 1);
363+
assert_eq!(checkpoint_parts[0].version, 3);
334364

335365
let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
336366
let expected_versions = vec![4, 5, 6, 7];

0 commit comments

Comments
 (0)