
feat: Snapshot::try_new_from() API #549


Merged · 32 commits · Apr 7, 2025
Changes from 3 commits
f8bc074
new Snapshot::new_from() API
zachschuermann Nov 27, 2024
3d37288
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 18, 2025
5480711
incremental snapshot update, log segment, table config
zachschuermann Mar 18, 2025
2686fb3
incremental log segment
zachschuermann Mar 20, 2025
0daf1e9
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 20, 2025
692c9d5
tests
zachschuermann Mar 21, 2025
f18e380
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 21, 2025
8d3357b
refactor
zachschuermann Mar 21, 2025
3bf3d67
nits
zachschuermann Mar 21, 2025
453db1b
not pleased with these tests
zachschuermann Mar 25, 2025
ce31322
fix
zachschuermann Mar 25, 2025
f1578fb
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 25, 2025
ee61b75
docs
zachschuermann Mar 25, 2025
fea0f76
incremental table config test
zachschuermann Mar 26, 2025
81f61ae
comment
zachschuermann Mar 26, 2025
7b0bd1c
fix to include old checkpoint parts
zachschuermann Mar 26, 2025
691b23a
Merge branch 'main' into snapshot-from-snapshot
zachschuermann Mar 26, 2025
27c4a95
Merge branch 'main' into snapshot-from-snapshot
zachschuermann Mar 27, 2025
66e44d2
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Mar 28, 2025
46ab944
do list_from last checkpoint version
zachschuermann Mar 28, 2025
5a582d6
quick nit
zachschuermann Mar 28, 2025
592667b
few nits
zachschuermann Apr 4, 2025
75b6178
fix
zachschuermann Apr 4, 2025
5cdde76
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Apr 4, 2025
a94ddef
clippy
zachschuermann Apr 4, 2025
e8e327d
redo the 'no new commits' check
zachschuermann Apr 7, 2025
d5bcb67
match instead of if chain
zachschuermann Apr 7, 2025
be88b88
nits and use tableconfig::try_new in try_new_from
zachschuermann Apr 7, 2025
d396953
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Apr 7, 2025
6e49bc2
fix merging main
zachschuermann Apr 7, 2025
032d72b
fix deref
zachschuermann Apr 7, 2025
cb06927
Merge remote-tracking branch 'upstream/main' into snapshot-from-snapshot
zachschuermann Apr 7, 2025
52 changes: 49 additions & 3 deletions kernel/src/log_segment.rs
@@ -146,6 +146,36 @@ impl LogSegment
)
}

pub(crate) fn for_versions(
fs_client: &dyn FileSystemClient,
log_root: Url,
start_version: Version,
end_version: impl Into<Option<Version>>,
) -> DeltaResult<Self> {
let end_version = end_version.into();
if let Some(end_version) = end_version {
if start_version > end_version {
return Err(Error::generic(
"Failed to build LogSegment: start_version cannot be greater than end_version",
));
}
}
let (mut ascending_commit_files, checkpoint_parts) =
list_log_files_with_version(fs_client, &log_root, Some(start_version), end_version)?;
Collaborator:

What happens if the table has not changed? I somehow doubt LogSegment::try_new would like the empty file listing that results?

Collaborator (author):

New EmptyLogSegment error that we can explicitly leverage (instead of having to change LogSegment semantics). I just dislike it being pub... see other comment above.


// Commit file versions must be greater than the most recent checkpoint version if it exists
if let Some(checkpoint_file) = checkpoint_parts.first() {
ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}

LogSegment::try_new(
ascending_commit_files,
checkpoint_parts,
log_root,
end_version,
)
}
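The version-bounds check and the checkpoint filtering in `for_versions` can be sketched in isolation. This is a simplified illustration with plain integer versions and with the file listing itself omitted; the function name and types are hypothetical, not the kernel's API:

```rust
// Simplified sketch of for_versions' validation and filtering (hypothetical
// plain types): reject an inverted version range, then drop commit versions
// at or below the newest checkpoint part, mirroring the `retain` above.
fn segment_commit_versions(
    mut commit_versions: Vec<u64>,
    checkpoint_version: Option<u64>,
    start_version: u64,
    end_version: Option<u64>,
) -> Result<Vec<u64>, String> {
    if let Some(end) = end_version {
        if start_version > end {
            return Err(
                "Failed to build LogSegment: start_version cannot be greater than end_version"
                    .to_string(),
            );
        }
    }
    // Commit file versions must be greater than the most recent checkpoint version.
    if let Some(cp) = checkpoint_version {
        commit_versions.retain(|v| cp < *v);
    }
    Ok(commit_versions)
}
```

With a checkpoint at version 4, commits 3 and 4 are already covered by the checkpoint, so only commit 5 survives the filter.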

/// Constructs a [`LogSegment`] to be used for `TableChanges`. For a TableChanges between versions
/// `start_version` and `end_version`: Its LogSegment is made of zero checkpoints and all commits
/// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version`
@@ -186,6 +216,7 @@ impl LogSegment
);
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
}

/// Read a stream of log data from this log segment.
///
/// The log files will be read from most recent to oldest.
@@ -360,8 +391,12 @@ impl LogSegment
)?))
}

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
// Do a lightweight protocol+metadata log replay to find the latest Protocol and Metadata in
// the LogSegment
pub(crate) fn protocol_and_metadata(
&self,
engine: &dyn Engine,
) -> DeltaResult<(Option<Metadata>, Option<Protocol>)> {
let data_batches = self.replay_for_metadata(engine)?;
let (mut metadata_opt, mut protocol_opt) = (None, None);
for batch in data_batches {
@@ -377,7 +412,12 @@
break;
}
}
match (metadata_opt, protocol_opt) {
Ok((metadata_opt, protocol_opt))
}
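The newest-first, first-wins scan in `protocol_and_metadata` can be sketched on its own. This sketch uses hypothetical `&str` stand-ins for the Metadata and Protocol actions, not the kernel's real action types:

```rust
// Sketch of the first-wins replay scan: batches arrive newest-to-oldest, the
// first Metadata and the first Protocol seen are kept, and the walk stops
// early once both are found (hypothetical &str stand-ins for real actions).
fn latest_protocol_and_metadata(
    batches: &[(Option<&str>, Option<&str>)],
) -> (Option<String>, Option<String>) {
    let (mut metadata_opt, mut protocol_opt) = (None, None);
    for (metadata, protocol) in batches.iter().copied() {
        if metadata_opt.is_none() {
            metadata_opt = metadata.map(str::to_string);
        }
        if protocol_opt.is_none() {
            protocol_opt = protocol.map(str::to_string);
        }
        if metadata_opt.is_some() && protocol_opt.is_some() {
            break; // found both, stop the replay early
        }
    }
    (metadata_opt, protocol_opt)
}
```

Because newer batches are visited first, an action found earlier in the walk always shadows older occurrences, which is exactly why returning `(Option, Option)` lets `read_metadata` decide which missing action is an error.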

// Get the most up-to-date Protocol and Metadata actions
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
match self.protocol_and_metadata(engine)? {
(Some(m), Some(p)) => Ok((m, p)),
(None, Some(_)) => Err(Error::MissingMetadata),
(Some(_), None) => Err(Error::MissingProtocol),
@@ -401,6 +441,11 @@ impl LogSegment
// read the same protocol and metadata schema for both commits and checkpoints
self.replay(engine, schema.clone(), schema, META_PREDICATE.clone())
}

/// Return whether or not the LogSegment contains a checkpoint.
pub(crate) fn has_checkpoint(&self) -> bool {
!self.checkpoint_parts.is_empty()
Collaborator:

nit

Suggested change
!self.checkpoint_parts.is_empty()
self.checkpoint_version.is_some()

Collaborator (@scovich, Mar 29, 2025):

(or, just do that at the one call site, instead of defining a helper at all)

Collaborator (author):

yea, realized it's probably not necessary, though I've kept the `!is_empty()` since it's a vec

}
}

/// Returns a fallible iterator of [`ParsedLogPath`] that are between the provided `start_version` (inclusive)
@@ -430,6 +475,7 @@ fn list_log_files(
Err(_) => true,
}))
}

/// List all commit and checkpoint files with versions above the provided `start_version` (inclusive).
/// If successful, this returns a tuple `(ascending_commit_files, checkpoint_parts)` of type
/// `(Vec<ParsedLogPath>, Vec<ParsedLogPath>)`. The commit files are guaranteed to be sorted in
94 changes: 93 additions & 1 deletion kernel/src/snapshot.rs
@@ -43,13 +43,21 @@ impl std::fmt::Debug for Snapshot
}

impl Snapshot {
fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self {
Self {
log_segment,
table_configuration,
}
}

/// Create a new [`Snapshot`] instance for the given version.
///
/// # Parameters
///
/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located)
/// - `engine`: Implementation of [`Engine`] apis.
/// - `version`: target version of the [`Snapshot`]
/// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest
/// version of the table.
pub fn try_new(
table_root: Url,
engine: &dyn Engine,
@@ -67,6 +75,70 @@ impl Snapshot
Self::try_new_from_log_segment(table_root, log_segment, engine)
}

/// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you
/// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the
/// snapshot to a later version.
Collaborator:

Just to clarify, is this api only for versions later than the existing snapshot?

Collaborator (author):

yep, for now proposing that we allow an old snapshot but just return an entirely new snapshot (no incrementalization). Maybe `warn!` in that case? Or I suppose we could disallow that?

Collaborator:

Is there any valid scenario where a caller could legitimately pass a newer snapshot than the one they're asking for? I guess time travel? But if they know they're time traveling why would they pass a newer snapshot in the first place?

Either way, we should publicly document whether a too-new starting snapshot is an error or merely a useless hint, so callers don't have to wonder.

Collaborator (author):

I can't think of any optimization that's available (let alone useful) if the caller passes in a new snapshot as the hint.

If that's true, then the question is: do we prohibit this behavior or just let it degenerate to the usual try_new the client should have done anyways?

Collaborator:

I would vote for returning an error in that case. It's unlikely the engine meant to get into that situation, so let's let them know they are doing something wrong

Collaborator (author):

updated to be an error now! i agree :)

///
/// # Parameters
///
/// - `existing_snapshot`: reference to an existing [`Snapshot`]
/// - `engine`: Implementation of [`Engine`] apis.
/// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest
/// version of the table.
pub fn new_from(
existing_snapshot: Arc<Snapshot>,
engine: &dyn Engine,
version: Option<Version>,
) -> DeltaResult<Arc<Self>> {
// simple heuristic for now:
// 1. if the new version < existing version, just return an entirely new snapshot
// 2. if the new version == existing version, just return the existing snapshot
// 3. list from existing snapshot version
// 4a. if new checkpoint is found: just create a new snapshot from that checkpoint (and
// commits after it)
// 4b. if no new checkpoint is found: do lightweight P+M replay on the latest commits
match version {
Some(v) if v < existing_snapshot.version() => {
Self::try_new(existing_snapshot.table_root().clone(), engine, version).map(Arc::new)
}
Some(v) if v == existing_snapshot.version() => Ok(existing_snapshot.clone()),
Collaborator:

Tiny nit: I'd put this one first?

Collaborator:

Actually, I wonder if a match is really that helpful here, especially given that LogSegment::for_versions needs to handle the no-change case?

let old_version = existing_snapshot.version();
if let Some(new_version) = version {
    if new_version == old_version {
        // Re-requesting the same version
        return Ok(existing_snapshot.clone());
    }
    if new_version < old_version {
        // Hint is too new, just create a new snapshot the normal way
        return Self::try_new(...).map(Arc::new);
    }
    }
}
    
// Check for new commits
let (mut new_ascending_commit_files, checkpoint_parts) =
    list_log_files_with_version(fs_client, &log_root, Some(start_version), end_version)?;

if new_ascending_commit_files.is_empty() {
    // No new commits, just return the same snapshot
    return Ok(existing_snapshot.clone());
}

if !checkpoint_parts.is_empty() {
    // We found a checkpoint, so just create a new snapshot the normal way
    return Self::try_new(...).map(Arc::new);
}    

// Append the new commits to the existing LogSegment
let checkpoint_parts = existing_snapshot.log_segment.checkpoint_parts.clone();
let mut ascending_commit_files = existing_snapshot.log_segment.ascending_commit_files.clone();
ascending_commit_files.extend(new_ascending_commit_files);
let new_log_segment = LogSegment::try_new(
    ascending_commit_files, 
    checkpoint_parts, 
    log_root,
    version,
);

Avoids the indirection and complexity of building a suffix log segment... but then we don't have an easy way to do the incremental P&M :(

Collaborator (author):

I played around with this some and refactored. critically, I'm still leveraging a LogSegment, but we have a new Error::EmptyLogSegment that we can specifically check for. I like the idea of (1) still using LogSegment and (2) having this error capture the empty case without having to modify semantics of LogSegment. BUT i dislike having to introduce a new pub Error variant. I didn't do the leg work to have a private error here - wanted to gather some feedback on overall approach first

new_version => {
debug!(
"new version: {new_version:?}, existing version: {}",
existing_snapshot.version()
);
let log_root = existing_snapshot.log_segment.log_root.clone();
let fs_client = engine.get_file_system_client();

// create a log segment just from existing_snapshot.version -> new_version
let log_segment = LogSegment::for_versions(
fs_client.as_ref(),
log_root,
existing_snapshot.version(),
new_version,
)?;

if log_segment.has_checkpoint() {
Self::try_new_from_log_segment(
existing_snapshot.table_root().clone(),
log_segment,
engine,
)
.map(Arc::new)
} else {
let (new_metadata, new_protocol) = log_segment.protocol_and_metadata(engine)?;
let table_configuration = TableConfiguration::new_from(
existing_snapshot.table_configuration(),
new_metadata,
new_protocol,
log_segment.end_version,
)?;
Ok(Arc::new(Snapshot::new(log_segment, table_configuration)))
}
}
}
}
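The four-step heuristic in the comment at the top of `new_from` can be boiled down to a standalone decision function. This sketch uses plain `u64` versions and a hypothetical enum; note it mirrors this revision, where an older requested version falls back to a full rebuild (the review thread later turns that case into an error):

```rust
// Standalone sketch of new_from's update heuristic (hypothetical enum and
// plain u64 versions; an older requested version falls back to a full
// rebuild in this revision rather than erroring).
#[derive(Debug, PartialEq)]
enum UpdatePlan {
    ReuseExisting,            // same version requested: return the hint as-is
    FullRebuild,              // requested version older than the hint
    RebuildFromNewCheckpoint, // listing found a newer checkpoint
    IncrementalReplay,        // no new checkpoint: lightweight P+M replay
}

fn plan_update(existing: u64, requested: Option<u64>, found_new_checkpoint: bool) -> UpdatePlan {
    match requested {
        Some(v) if v < existing => UpdatePlan::FullRebuild,
        Some(v) if v == existing => UpdatePlan::ReuseExisting,
        // None (latest) or a newer version: list from the existing version
        _ if found_new_checkpoint => UpdatePlan::RebuildFromNewCheckpoint,
        _ => UpdatePlan::IncrementalReplay,
    }
}
```

The guard order matters: the strictly-less and equal cases are decided purely from the versions, and only the remaining cases pay for a log listing to learn whether a new checkpoint exists.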

/// Create a new [`Snapshot`] instance.
pub(crate) fn try_new_from_log_segment(
location: Url,
@@ -241,6 +313,26 @@ mod tests {
assert_eq!(snapshot.schema(), &expected);
}

// TODO(zach)
#[test]
fn test_snapshot_new_from() {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();

let engine = SyncEngine::new();
let old_snapshot = Arc::new(Snapshot::try_new(url, &engine, Some(0)).unwrap());
let snapshot = Snapshot::new_from(old_snapshot, &engine, Some(0)).unwrap();

let expected =
Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap();
assert_eq!(snapshot.protocol(), &expected);

let schema_string = r#"{"type":"struct","fields":[{"name":"value","type":"integer","nullable":true,"metadata":{}}]}"#;
let expected: StructType = serde_json::from_str(schema_string).unwrap();
assert_eq!(snapshot.schema(), &expected);
}

#[test]
fn test_read_table_with_last_checkpoint() {
let path = std::fs::canonicalize(PathBuf::from(
11 changes: 7 additions & 4 deletions kernel/src/table_changes/mod.rs
@@ -111,7 +111,7 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
pub struct TableChanges {
pub(crate) log_segment: LogSegment,
table_root: Url,
end_snapshot: Snapshot,
end_snapshot: Arc<Snapshot>,
start_version: Version,
schema: Schema,
}
@@ -149,9 +149,12 @@ impl TableChanges {
// Both snapshots ensure that reading is supported at the start and end version using
// `ensure_read_supported`. Note that we must still verify that reading is
// supported for every protocol action in the CDF range.
let start_snapshot =
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;
let start_snapshot = Arc::new(Snapshot::try_new(
table_root.as_url().clone(),
engine,
Some(start_version),
)?);
let end_snapshot = Snapshot::new_from(start_snapshot.clone(), engine, end_version)?;

// Verify CDF is enabled at the beginning and end of the interval using
// [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is
54 changes: 53 additions & 1 deletion kernel/src/table_configuration.rs
@@ -33,7 +33,7 @@ use crate::{DeltaResult, Error, Version};
/// `try_new` successfully returns `TableConfiguration`, it is also guaranteed that reading the
/// table is supported.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct TableConfiguration {
metadata: Metadata,
protocol: Protocol,
@@ -89,6 +89,58 @@ impl TableConfiguration {
})
}

pub(crate) fn new_from(
table_configuration: &Self,
new_metadata: Option<Metadata>,
new_protocol: Option<Protocol>,
new_version: Version,
) -> DeltaResult<Self> {
// simplest case: no new P/M, just return the existing table configuration with new version
if new_metadata.is_none() && new_protocol.is_none() {
return Ok(Self {
version: new_version,
..(*table_configuration).clone()
});
}

// if there's new metadata: have to parse schema, table properties
let (metadata, schema, table_properties) = match new_metadata {
Some(metadata) => {
let schema = Arc::new(metadata.parse_schema()?);
let table_properties = metadata.parse_table_properties();
(metadata, schema, table_properties)
}
None => (
table_configuration.metadata.clone(),
table_configuration.schema.clone(),
table_configuration.table_properties.clone(),
),
};

// if there's new protocol: have to ensure read supported
let protocol = match new_protocol {
Some(protocol) => {
protocol.ensure_read_supported()?;
Collaborator:

Given that we're "supporting" some table features (like check constraints) only if not actually used... I think we also need to ensure_read_supported if metadata changes?

Collaborator (author):

Actually our ensure_read_supported only depends on the protocol right now it looks like. We have TableConfiguration.ensure_write_supported which is a function of Protocol and Metadata but we don't yet have that for 'read supported' - I wonder if we should go ahead and introduce that abstraction and for now just pass through to protocol.ensure_read_supported?

Collaborator:

Oh, because column constraints are only a writer feature... do we not have any reader-writer features whose validity checks depend on metadata? Seems like column mapping needs to validate the schema annotations for example?

Collaborator (author):

not that I can tell: even for column mapping we just check that the feature is enabled to say 'reads are supported'; I think if there are incorrect schema annotations it would fail downstream.

Collaborator:

I'm pretty sure that validation happens in the TableConfiguration constructor? At least, that's where we originally planned to put it.

Collaborator (author):

yep you're right - my mistake, lol i already included that in the new code (just forgot about it oops)

in both TableConfiguration::try_new and try_new_from (new code) we do a protocol.ensure_read_supported and a validate_schema_column_mapping - I wonder if we could do better here by modifying the constructor so that we can (1) have someone upstream do the parsing leg work and (2) leverage the constructor directly in try_new_from?

protocol
}
None => table_configuration.protocol.clone(),
};

// if either change, have to validate column mapping mode
let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
validate_schema_column_mapping(&schema, column_mapping_mode)?;

Ok(Self {
schema,
metadata,
protocol,
table_properties,
column_mapping_mode,
table_root: table_configuration.table_root.clone(),
version: new_version,
})
}
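The fast path at the top of `TableConfiguration::new_from` relies on `Clone` (newly derived in this diff) plus struct update syntax. The pattern can be shown with a toy type; the fields here are hypothetical stand-ins, not the real `TableConfiguration`:

```rust
// Toy illustration of the "no new P/M" fast path: clone the old configuration
// and bump only the version via struct update syntax (hypothetical fields).
#[derive(Clone, Debug, PartialEq)]
struct ToyConfig {
    version: u64,
    table_name: String,
}

impl ToyConfig {
    fn new_from(old: &Self, new_version: u64) -> Self {
        Self {
            version: new_version,
            ..old.clone()
        }
    }
}
```

This is why the `#[derive(Debug, Clone)]` change above is load-bearing: `..old.clone()` needs an owned value to move the remaining fields from, leaving the original configuration untouched.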

/// The [`Metadata`] for this table at this version.
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn metadata(&self) -> &Metadata {