feat: Snapshot::try_new_from() API #549

Merged: 32 commits, merged Apr 7, 2025

Changes from 17 commits

Commits (32)
f8bc074
new Snapshot::new_from() API
zachschuermann Nov 27, 2024
3d37288
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 18, 2025
5480711
incremental snapshot update, log segment, table config
zachschuermann Mar 18, 2025
2686fb3
incremental log segment
zachschuermann Mar 20, 2025
0daf1e9
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 20, 2025
692c9d5
tests
zachschuermann Mar 21, 2025
f18e380
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 21, 2025
8d3357b
refactor
zachschuermann Mar 21, 2025
3bf3d67
nits
zachschuermann Mar 21, 2025
453db1b
not pleased with these tests
zachschuermann Mar 25, 2025
ce31322
fix
zachschuermann Mar 25, 2025
f1578fb
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 25, 2025
ee61b75
docs
zachschuermann Mar 25, 2025
fea0f76
incremental table config test
zachschuermann Mar 26, 2025
81f61ae
comment
zachschuermann Mar 26, 2025
7b0bd1c
fix to include old checkpoint parts
zachschuermann Mar 26, 2025
691b23a
Merge branch 'main' into snapshot-from-snapshot
zachschuermann Mar 26, 2025
27c4a95
Merge branch 'main' into snapshot-from-snapshot
zachschuermann Mar 27, 2025
66e44d2
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 28, 2025
46ab944
do list_from last checkpoint version
zachschuermann Mar 28, 2025
5a582d6
quick nit
zachschuermann Mar 28, 2025
592667b
few nits
zachschuermann Apr 4, 2025
75b6178
fix
zachschuermann Apr 4, 2025
5cdde76
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Apr 4, 2025
a94ddef
clippy
zachschuermann Apr 4, 2025
e8e327d
redo the 'no new commits' check
zachschuermann Apr 7, 2025
d5bcb67
match instead of if chain
zachschuermann Apr 7, 2025
be88b88
nits and use tableconfig::try_new in try_new_from
zachschuermann Apr 7, 2025
d396953
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Apr 7, 2025
6e49bc2
fix merging main
zachschuermann Apr 7, 2025
032d72b
fix deref
zachschuermann Apr 7, 2025
cb06927
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Apr 7, 2025
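
The headline change in this PR is a new `Snapshot::try_new_from()` constructor for incrementally refreshing an existing snapshot rather than rebuilding it from scratch. The sketch below is a rough usage illustration only; the exact signature and return type are assumptions based on the PR title and commit messages, not taken from the diff shown here.

```rust
use std::sync::Arc;

use delta_kernel::{DeltaResult, Engine, Snapshot};

/// Refresh `existing` to the latest table version, reusing the log listing and
/// protocol/metadata work already done for it.
/// (Signature assumed; check the merged API docs.)
fn refresh(existing: Arc<Snapshot>, engine: &dyn Engine) -> DeltaResult<Arc<Snapshot>> {
    // Passing `None` asks for the latest version; when no new commits exist,
    // the existing snapshot can be returned as-is.
    Snapshot::try_new_from(existing, engine, None)
}
```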
50 changes: 33 additions & 17 deletions kernel/src/log_segment.rs
@@ -37,7 +37,7 @@ mod tests;
/// and in `TableChanges` when built with [`LogSegment::for_table_changes`].
///
/// [`Snapshot`]: crate::snapshot::Snapshot
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct LogSegment {
pub end_version: Version,
@@ -49,12 +49,17 @@ pub(crate) struct LogSegment {
}

impl LogSegment {
fn try_new(
ascending_commit_files: Vec<ParsedLogPath>,
pub(crate) fn try_new(
mut ascending_commit_files: Vec<ParsedLogPath>,
checkpoint_parts: Vec<ParsedLogPath>,
log_root: Url,
end_version: Option<Version>,
) -> DeltaResult<Self> {
// Commit file versions must be greater than the most recent checkpoint version if it exists
if let Some(checkpoint_file) = checkpoint_parts.first() {
ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}
Review comment (Collaborator, Author):

only call site did this right before try_new: we should probably just bake it in here and then when we use this in Snapshot::try_new_from we get the guarantee
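
To make the comment above concrete, here is a minimal self-contained sketch of the invariant being baked into `try_new` (illustrative names and plain version numbers, not the kernel's types): commit files at or below the newest checkpoint version are dropped inside the constructor, so every caller, including the new `Snapshot::try_new_from`, gets the guarantee for free.

```rust
/// Keep only commit versions strictly greater than the checkpoint version,
/// mirroring the `retain` call added to `LogSegment::try_new` above.
fn retain_commits_after_checkpoint(
    mut ascending_commit_versions: Vec<u64>,
    checkpoint_version: Option<u64>,
) -> Vec<u64> {
    if let Some(cp) = checkpoint_version {
        ascending_commit_versions.retain(|v| *v > cp);
    }
    ascending_commit_versions
}

fn main() {
    // Checkpoint at version 2: commits 1 and 2 are already covered by it.
    assert_eq!(
        retain_commits_after_checkpoint(vec![1, 2, 3, 4], Some(2)),
        vec![3, 4]
    );
    // No checkpoint: every commit is kept.
    assert_eq!(retain_commits_after_checkpoint(vec![0, 1], None), vec![0, 1]);
}
```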


// We require that commits are contiguous. In other words, there must be no gap between commit versions.
require!(
ascending_commit_files
@@ -81,22 +86,23 @@ impl LogSegment {
}

// Get the effective version from chosen files
let version_eff = ascending_commit_files
let effective_version = ascending_commit_files
.last()
.or(checkpoint_parts.first())
.ok_or(Error::generic("No files in log segment"))?
.version;
if let Some(end_version) = end_version {
require!(
version_eff == end_version,
effective_version == end_version,
Error::generic(format!(
"LogSegment end version {} not the same as the specified end version {}",
version_eff, end_version
effective_version, end_version
))
);
}

Ok(LogSegment {
end_version: version_eff,
end_version: effective_version,
log_root,
ascending_commit_files,
checkpoint_parts,
@@ -122,7 +128,7 @@
) -> DeltaResult<Self> {
let time_travel_version = time_travel_version.into();

let (mut ascending_commit_files, checkpoint_parts) =
let (ascending_commit_files, checkpoint_parts) =
match (checkpoint_hint.into(), time_travel_version) {
(Some(cp), None) => {
list_log_files_with_checkpoint(&cp, fs_client, &log_root, None)?
@@ -133,11 +139,6 @@
_ => list_log_files_with_version(fs_client, &log_root, None, time_travel_version)?,
};

// Commit file versions must be greater than the most recent checkpoint version if it exists
if let Some(checkpoint_file) = checkpoint_parts.first() {
ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}

LogSegment::try_new(
ascending_commit_files,
checkpoint_parts,
@@ -362,8 +363,12 @@
)?))
}

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
// Do a lightweight protocol+metadata log replay to find the latest Protocol and Metadata in
// the LogSegment
pub(crate) fn protocol_and_metadata(
&self,
engine: &dyn Engine,
) -> DeltaResult<(Option<Metadata>, Option<Protocol>)> {
let data_batches = self.replay_for_metadata(engine)?;
let (mut metadata_opt, mut protocol_opt) = (None, None);
for batch in data_batches {
@@ -379,7 +384,12 @@
break;
}
}
match (metadata_opt, protocol_opt) {
Ok((metadata_opt, protocol_opt))
}

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
match self.protocol_and_metadata(engine)? {
(Some(m), Some(p)) => Ok((m, p)),
(None, Some(_)) => Err(Error::MissingMetadata),
(Some(_), None) => Err(Error::MissingProtocol),
@@ -403,6 +413,11 @@ impl LogSegment {
// read the same protocol and metadata schema for both commits and checkpoints
self.read_actions(engine, schema.clone(), schema, META_PREDICATE.clone())
}

/// Return whether or not the LogSegment contains a checkpoint.
pub(crate) fn has_checkpoint(&self) -> bool {
!self.checkpoint_parts.is_empty()
Review comment (Collaborator):

nit. Suggested change: `!self.checkpoint_parts.is_empty()` → `self.checkpoint_version.is_some()`

@scovich (Collaborator), Mar 29, 2025:

(or, just do that at the one call site, instead of defining a helper at all)

Review comment (Collaborator, Author):

yeah, realized it's probably not necessary, though I've kept the `!is_empty()` since it's a vec

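As a tiny standalone illustration of the reply above (toy types, not the kernel's): a multi-part checkpoint contributes several part files at the same version, so `checkpoint_parts` is a `Vec` and "has a checkpoint" is naturally an emptiness check on it.

```rust
/// Toy model of a log segment whose checkpoint at version 5 was written in 3 parts.
struct ToyLogSegment {
    checkpoint_parts: Vec<u64>, // version of each checkpoint part file
}

impl ToyLogSegment {
    fn has_checkpoint(&self) -> bool {
        !self.checkpoint_parts.is_empty()
    }
}

fn main() {
    let with_checkpoint = ToyLogSegment { checkpoint_parts: vec![5, 5, 5] };
    let without_checkpoint = ToyLogSegment { checkpoint_parts: vec![] };
    assert!(with_checkpoint.has_checkpoint());
    assert!(!without_checkpoint.has_checkpoint());
}
```
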
}
}

/// Returns a fallible iterator of [`ParsedLogPath`] that are between the provided `start_version` (inclusive)
Expand Down Expand Up @@ -432,12 +447,13 @@ fn list_log_files(
Err(_) => true,
}))
}

/// List all commit and checkpoint files with versions above the provided `start_version` (inclusive).
/// If successful, this returns a tuple `(ascending_commit_files, checkpoint_parts)` of type
/// `(Vec<ParsedLogPath>, Vec<ParsedLogPath>)`. The commit files are guaranteed to be sorted in
/// ascending order by version. The elements of `checkpoint_parts` are all the parts of the same
/// checkpoint. Checkpoint parts share the same version.
fn list_log_files_with_version(
pub(crate) fn list_log_files_with_version(
fs_client: &dyn FileSystemClient,
log_root: &Url,
start_version: Option<Version>,
2 changes: 1 addition & 1 deletion kernel/src/log_segment/tests.rs
@@ -154,7 +154,7 @@ fn write_parquet_to_store(

/// Writes all actions to a _delta_log parquet checkpoint file in the store.
/// This function formats the provided filename into the _delta_log directory.
fn add_checkpoint_to_store(
pub(crate) fn add_checkpoint_to_store(
store: &Arc<InMemory>,
data: Box<dyn EngineData>,
filename: &str,
4 changes: 2 additions & 2 deletions kernel/src/path.rs
@@ -14,7 +14,7 @@ const MULTIPART_PART_LEN: usize = 10;
/// The number of characters in the uuid part of a uuid checkpoint
const UUID_PART_LEN: usize = 36;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
enum LogPathFileType {
@@ -37,7 +37,7 @@ enum LogPathFileType {
Unknown,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ParsedLogPath<Location: AsUrl = FileMeta> {
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
@@ -683,7 +683,7 @@ pub(crate) mod test_utils {
use super::{state::ScanCallback, Transform};

// TODO(nick): Merge all copies of this into one "test utils" thing
fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
pub(crate) fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
let string_field = Arc::new(Field::new("a", DataType::Utf8, true));
let schema = Arc::new(ArrowSchema::new(vec![string_field]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(string_array)])