Skip to content

Commit

Permalink
use chunk_by instead of new iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Jan 13, 2025
1 parent ad85ff7 commit 059c2c3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 166 deletions.
35 changes: 19 additions & 16 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! files.
use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::log_segment::delta_log_group_iterator::DeltaLogGroupingIterator;
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
Expand All @@ -16,7 +15,6 @@ use std::sync::{Arc, LazyLock};
use tracing::warn;
use url::Url;

mod delta_log_group_iterator;
#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -314,22 +312,27 @@ fn list_log_files_with_version(
let mut checkpoint_parts = vec![];
let mut max_checkpoint_version = start_version;

let log_iterator = DeltaLogGroupingIterator::new(list_log_files(
fs_client,
log_root,
start_version,
end_version,
)?);
let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;

for (version, files) in log_iterator {
let mut new_checkpoint_parts = Vec::new();

files.into_iter().for_each(|file| match file {
f if f.is_commit() => commit_files.push(f),
f if f.is_checkpoint() => new_checkpoint_parts.push(f),
_ => {}
});
for (version, files) in &log_files
.filter_map(|res| match res {
Ok(path) => Some(path),
Err(e) => {
warn!("Error processing path: {:?}", e);
None
}
})
.chunk_by(|path| path.version)
{
let mut new_checkpoint_parts = vec![];

for file in files {
if file.is_commit() {
commit_files.push(file);
} else if file.is_checkpoint() {
new_checkpoint_parts.push(file);
}
}
if validate_checkpoint_parts(version, &new_checkpoint_parts)
&& (max_checkpoint_version.is_none() || Some(version) >= max_checkpoint_version)
{
Expand Down
85 changes: 0 additions & 85 deletions kernel/src/log_segment/delta_log_group_iterator.rs

This file was deleted.

66 changes: 1 addition & 65 deletions kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use url::Url;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::log_segment::{list_log_files, DeltaLogGroupingIterator, LogSegment};
use crate::log_segment::LogSegment;
use crate::snapshot::CheckpointMetadata;
use crate::{FileSystemClient, Table};
use test_utils::delta_path_for_version;
Expand Down Expand Up @@ -107,70 +107,6 @@ fn build_log_with_paths_and_checkpoint(
(Box::new(client), log_root)
}

#[test]
fn test_delta_log_group_iterator() {
    // Checks that `DeltaLogGroupingIterator` yields one group per version, in
    // order, with every file of that version collected into the group.
    let (client, log_root) = build_log_with_paths_and_checkpoint(
        &[
            delta_path_for_multipart_checkpoint(1, 1, 3),
            delta_path_for_version(1, "json"),
            delta_path_for_multipart_checkpoint(1, 2, 3),
            delta_path_for_version(2, "checkpoint.parquet"),
            delta_path_for_version(2, "json"),
            delta_path_for_multipart_checkpoint(3, 1, 3),
            delta_path_for_multipart_checkpoint(3, 2, 3),
            delta_path_for_version(4, "json"),
        ],
        None,
    );

    let listed: Vec<_> = list_log_files(client.as_ref(), &log_root, None, None)
        .unwrap()
        .collect();
    let mut groups = DeltaLogGroupingIterator::new(listed.into_iter());

    // (version, number of files listed for that version), in the order the
    // groups are expected to be produced.
    let expected_groups = [(1, 3), (2, 2), (3, 2), (4, 1)];

    for (expected_version, expected_len) in expected_groups {
        let Some((version, files)) = groups.next() else {
            panic!(
                "Expected group for version {}, but none was found",
                expected_version
            );
        };
        assert_eq!(
            version, expected_version,
            "Expected version {} but got {}",
            expected_version, version
        );
        assert_eq!(files.len(), expected_len);
        assert!(files.iter().all(|file| file.version == expected_version));
    }

    // The iterator must be exhausted once the version 4 group has been yielded.
    if let Some((version, _)) = groups.next() {
        panic!(
            "Expected no more groups, but found group for version {}",
            version
        );
    }
}

#[test]
fn build_snapshot_with_out_of_date_last_checkpoint() {
let checkpoint_metadata = CheckpointMetadata {
Expand Down

0 comments on commit 059c2c3

Please sign in to comment.