feat: Snapshot::try_new_from() API #549
base: main

Conversation
Curious if anyone has naming thoughts! EDIT: landed on try_new_from.
Codecov Report

@@           Coverage Diff            @@
##             main     #549      +/-   ##
==========================================
+ Coverage   84.64%   84.80%   +0.15%
==========================================
  Files          82       82
  Lines       19735    20159     +424
  Branches    19735    20159     +424
==========================================
+ Hits        16705    17096     +391
- Misses       2214     2218       +4
- Partials      816      845      +29
A general question I have: What should a listing optimization even look like for a snapshot refresh? If the snapshot is not very old, then we should just LIST to find new commit .json after the end of the current segment, and not even try to find new checkpoints. Quick, easy.
Also, the "append new deltas" approach is friendly to the "partial P&M query" optimization, which is only applicable if we have a contiguous chain of commits back to the previous snapshot version -- a newer checkpoint would actually force us to do the full P&M query all over, which for a large checkpoint could be annoying.
On the other hand, if there is a newer checkpoint available, then data skipping will be more efficient if we use it (fewer jsons to replay serially and keep track of). This is especially true if a lot of versions have landed since the original snapshot was taken.
Problem is, there's no way to know in advance whether the snapshot is "stale", because staleness is measured by the number of versions that have landed since it was taken, not by elapsed time.
Complicated stuff...
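A rough way to picture the two approaches being weighed (illustrative only; neither of these names exists in the kernel):

// Hypothetical sketch of the trade-off above, not a kernel API.
enum RefreshListingStrategy {
    /// LIST only for commit .json files after the end of the current segment and append
    /// them. Quick and easy, and keeps the contiguous commit chain needed for the
    /// partial P&M query.
    AppendNewCommits,
    /// Also look for a newer checkpoint and rebuild the segment from it. More work up
    /// front (possibly a full P&M query), but fewer .json files to replay serially, so
    /// data skipping is cheaper when many versions have landed since the snapshot.
    PreferNewerCheckpoint,
}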
kernel/src/snapshot.rs (Outdated)
    existing_snapshot: &Snapshot,
    engine: &dyn Engine,
    version: Option<Version>,
) -> DeltaResult<Self> {
Seems like the method should take and return Arc<Snapshot>, so we have the option to return the same snapshot if we determine it is still fresh?
Maybe even do

pub fn refresh(self: &Arc<Self>, ...) -> DeltaResult<Arc<Self>>

(This would have slightly different intuition than new_from -- refresh specifically assumes I want a newer snapshot, if available, and attempting to request an older version may not even be legal; I'm not sure it would even make sense to pass an upper bound version for a refresh operation.)
I've modified it to take and return Arc<Snapshot>, but I've avoided calling it refresh since that feels like it implies mutability. I'm in favor of new_from since that says you get a new snapshot, just 'from' an older one. Let me know if you agree with that thinking!
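For reference, a minimal sketch of the shape being discussed (a sketch only; the exact parameter form in the final diff may differ, e.g. Arc by value vs. by reference):

// Sketch of the API shape from this thread; body elided, not the PR's actual implementation.
impl Snapshot {
    /// Create a new [`Snapshot`] from an existing one, doing the minimal work needed to
    /// 'update' it to `version` (or to the latest version if `None`).
    pub fn try_new_from(
        existing_snapshot: Arc<Snapshot>,
        engine: &dyn Engine,
        version: Option<Version>,
    ) -> DeltaResult<Arc<Snapshot>> {
        // ... incremental listing and log segment construction elided ...
        todo!()
    }
}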
kernel/src/table_changes/mod.rs (Outdated)
@@ -90,7 +90,7 @@ impl TableChanges {
         // supported for every protocol action in the CDF range.
         let start_snapshot =
             Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
-        let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;
+        let end_snapshot = Snapshot::new_from(&start_snapshot, engine, end_version)?;
This opens an interesting question... if we knew that new_from would reuse the log checkpoint and just "append" any new commit .json files to the log segment, then we could almost (**) reuse that log segment for the CDF replay by just stripping out its checkpoint files? But that's pretty CDF-specific; in the normal case we want a refresh to use the newest checkpoint available, because it makes data skipping log replay cheaper. Maybe the CDF case needs a completely different way of creating the end_snapshot, unrelated to this optimization here.
(**) Almost, because the start version might have a checkpoint, in which case stripping the checkpoint out of the log segment would also remove the start version. But then again, do we actually want the older snapshot to be the start version? Or the previous version which the start version is making changes to? Or, maybe we should just restrict the checkpoint search to versions before the start version, specifically so that this optimization can work.
"do we actually want the older snapshot to be the start version?"

It would be sufficient to have the older snapshot be start_version-1, as long as we also have access to the commit at start_version. With these, we would start P&M at start_version and then continue it on the older snapshot if we don't find anything.
I guess this would look like: snapshot(start_version-1).refresh_with_commits(end_version)
After all, the goal of the start_snapshot is just to ensure that CDF is enabled.
@@ -71,6 +72,26 @@ impl Snapshot {
         Self::try_new_from_log_segment(table_root, log_segment, engine)
     }

+    /// Create a new [`Snapshot`] instance from an existing [`Snapshot`]. This is useful when you
+    /// already have a [`Snapshot`] lying around and want to do the minimal work to 'update' the
+    /// snapshot to a later version.
Just to clarify, is this API only for versions later than the existing snapshot?
Yep, for now I'm proposing that we allow an older snapshot but just return a new snapshot (no incrementalization), and maybe warn! in that case? Or I suppose we could disallow that..?
Is there any valid scenario where a caller could legitimately pass a newer snapshot than the one they're asking for? I guess time travel? But if they know they're time traveling why would they pass a newer snapshot in the first place?
Either way, we should publicly document whether a too-new starting snapshot is an error or merely a useless hint, so callers don't have to wonder.
I can't think of any optimization that's available (let alone useful) if the caller passes in a too-new snapshot as the hint.
If that's true, then the question is: do we prohibit this behavior or just let it degenerate to the usual try_new the client should have done anyways?
I would vote for returning an error in that case. It's unlikely the engine meant to get into that situation, so let's let them know they are doing something wrong
Updated to be an error now! I agree :)
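Roughly what the agreed behavior looks like (a sketch assuming the kernel's generic error constructor; the PR may use a different error variant or message):

// Sketch of the version check agreed on above; the error type and message are illustrative.
let old_version = existing_snapshot.version();
match version {
    // Same version requested: the existing snapshot is already what the caller wants.
    Some(v) if v == old_version => Ok(existing_snapshot.clone()),
    // Older version requested: the hint is too new, so there is nothing to incrementalize.
    Some(v) if v < old_version => Err(Error::generic(format!(
        "Requested version {v} is older than existing snapshot version {old_version}"
    ))),
    // Newer (or unspecified) version: fall through to the incremental listing path.
    _ => todo!("list new log files and build the updated snapshot"),
}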
Force-pushed the Snapshot::new_from() API branch from 309f1ad to 5480711.
@scovich for now (after a brief chat with @roeap) I propose doing a simple heuristic based on the presence of a checkpoint, and we can take on further optimization in the future. The heuristic is:
1. if the new version == existing version, just return the existing snapshot
2. if the new version < existing version, error: there is no optimization to do here
3. list from (existing snapshot version + 1) onward: if a new checkpoint is present, create a new snapshot from it; otherwise incrementally apply the new commits
I believe we may need to merge LogSegments in the case when no checkpoint is contained in the incremental log slice.
kernel/src/snapshot.rs (Outdated)
Some(v) if v < existing_snapshot.version() => {
    Self::try_new(existing_snapshot.table_root().clone(), engine, version).map(Arc::new)
}
Some(v) if v == existing_snapshot.version() => Ok(existing_snapshot.clone()),
Tiny nit: I'd put this one first?
Actually, I wonder if a match is really that helpful here, especially given that LogSegment::for_versions needs to handle the no-change case?
let old_version = existing_snapshot.version();
if let Some(new_version) = version {
    if new_version == old_version {
        // Re-requesting the same version
        return Ok(existing_snapshot.clone());
    }
    if new_version < old_version {
        // Hint is too new, just create a new snapshot the normal way
        return Self::try_new(...).map(Arc::new);
    }
}

// Check for new commits
let (new_ascending_commit_files, checkpoint_parts) =
    list_log_files_with_version(fs_client, &log_root, Some(start_version), end_version)?;
if new_ascending_commit_files.is_empty() {
    // No new commits, just return the same snapshot
    return Ok(existing_snapshot.clone());
}
if !checkpoint_parts.is_empty() {
    // We found a checkpoint, so just create a new snapshot the normal way
    return Self::try_new(...).map(Arc::new);
}

// Append the new commits to the existing LogSegment
let checkpoint_parts = existing_snapshot.log_segment.checkpoint_parts.clone();
let mut ascending_commit_files = existing_snapshot.log_segment.ascending_commit_files.clone();
ascending_commit_files.extend(new_ascending_commit_files);
let new_log_segment = LogSegment::try_new(
    ascending_commit_files,
    checkpoint_parts,
    log_root,
    version,
);
Avoids the indirection and complexity of building a suffix log segment... but then we don't have an easy way to do the incremental P&M :(
I played around with this some and refactored. Critically, I'm still leveraging a LogSegment, but we have a new Error::EmptyLogSegment that we can specifically check for. I like the idea of (1) still using LogSegment and (2) having this error capture the empty case without having to modify the semantics of LogSegment. BUT I dislike having to introduce a new pub Error variant. I didn't do the legwork to have a private error here - wanted to gather some feedback on the overall approach first.
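A sketch of that control flow, assuming the error variant is matchable from snapshot.rs (names as described above; the argument list mirrors the for_versions call shown later in this thread):

// Sketch: build the incremental log slice and treat "empty" as "table unchanged".
// Error::EmptyLogSegment is the (possibly non-pub) variant proposed above.
let new_log_segment = match LogSegment::for_versions(
    fs_client.as_ref(),
    log_root,
    existing_snapshot.version() + 1,
    version,
) {
    Ok(segment) => segment,
    // No new commits or checkpoints landed: reuse the snapshot we already have.
    Err(Error::EmptyLogSegment) => return Ok(existing_snapshot.clone()),
    Err(e) => return Err(e),
};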
kernel/src/log_segment.rs (Outdated)
    }
}
let (mut ascending_commit_files, checkpoint_parts) =
    list_log_files_with_version(fs_client, &log_root, Some(start_version), end_version)?;
What happens if the table has not changed? I somehow doubt LogSegment::try_new would like the empty file listing that results?
New EmptyLogSegment error that we can explicitly leverage (instead of having to change LogSegment semantics). I just dislike it being pub... see the other comment above.
kernel/src/snapshot.rs (Outdated)
let new_log_segment = LogSegment::for_versions(
    fs_client.as_ref(),
    log_root,
    existing_snapshot.version() + 1,
If we incrementally update frequently, we would never see any new checkpoints because those always land with some delay after the commit. Should we LIST from the existing snapshot's checkpoint version + 1 in order to detect and take advantage of new checkpoints that landed in the existing log segment? It would be pretty easy to construct the new log segment in that case, just take the existing log segment's checkpoint and the listing's commit files.
We could also get fancy and only look for checkpoints in some recent window, so that we only pick up a new checkpoint if the one we know about is too many commits behind?
We could still do the same incremental P&M either way, it just takes a bit more care handling the commit lists.
Yea, this definitely sounds reasonable, though I'm inclined to do something 'simple'(ish) here (as long as we aren't blocking such an optimization) and track this as a follow-up?
Title changed: Snapshot::new_from() API → Snapshot::try_new_from() API
kernel/src/log_segment.rs (Outdated)
// Commit file versions must be greater than the most recent checkpoint version if it exists
if let Some(checkpoint_file) = checkpoint_parts.first() {
    ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}
The only call site did this right before try_new; we should probably just bake it in here, and then when we use this in Snapshot::try_new_from we get the guarantee.
// interesting cases for testing Snapshot::new_from:
// 1. new version < existing version
// 2. new version == existing version
// 3. new version > existing version AND
//    a. log segment hasn't changed
//    b. log segment for old..=new version has a checkpoint (with new protocol/metadata)
//    c. log segment for old..=new version has no checkpoint
//       i. commits have (new protocol, new metadata)
//       ii. commits have (new protocol, no metadata)
//       iii. commits have (no protocol, new metadata)
//       iv. commits have (no protocol, no metadata)
I've tried to roughly outline the tests here. I think we have a bit of work to do to make testing easier, but instead of investing a lot of time here I'm proposing we ensure we have coverage and then take a follow-up to clean up testing (share code, etc.), just so we can introduce this API and not have it linger for too much longer. A sketch of one of these cases is below.
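For example, case 2 (new version == existing version) could look something like this, assuming the sync engine and one of the repo's local test tables (the table path and setup are illustrative, not the PR's actual test code):

// Sketch of test case 2: asking for the snapshot's own version should hand back the same Arc.
#[test]
fn try_new_from_same_version_returns_existing_snapshot() {
    let path = std::fs::canonicalize("./tests/data/table-without-dv-small/").unwrap();
    let url = url::Url::from_directory_path(path).unwrap();
    let engine = SyncEngine::new();

    let existing = Arc::new(Snapshot::try_new(url, &engine, None).unwrap());
    let refreshed =
        Snapshot::try_new_from(existing.clone(), &engine, Some(existing.version())).unwrap();

    // No listing work should be needed; we expect the very same snapshot back.
    assert!(Arc::ptr_eq(&existing, &refreshed));
}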
Overall looking really good, just one question.
lgtm! thanks
kernel/src/snapshot.rs (Outdated)
/// We implement a simple heuristic:
/// 1. if the new version == existing version, just return the existing snapshot
/// 2. if the new version < existing version, error: there is no optimization to do here
/// 3. list from (existing snapshot version + 1) onward
Rescuing #549 (comment) from github oblivion...
If we incrementally update frequently, we would never see any new checkpoints because those always land with some delay after the commit.
With the current code arrangement, I believe it would be quite simple to reliably pick up new checkpoints:
- LIST from existing checkpoint version + 1 instead of existing snapshot version + 1
- If we find a checkpoint, the existing new-snapshot code should Just Work (thanks to https://github.com/delta-io/delta-kernel-rs/pull/549/files#r2013073514)
- Otherwise, we just need to retain commit files above the existing snapshot version before continuing
The difference in file counts returned by the LIST should be small enough that the overall cost is still dominated by the network round trip.
NOTE: In its DeltaLog::update method, delta-spark has always listed from the existing checkpoint version, and it lacks the incremental P&M optimization.
Yea, sounds good - this ended up being a reasonable change. In the case where we don't have a checkpoint, I list from the end of the existing snapshot's commits (like we used to). Note that this means:
- we check for commit files being the same as well as being empty (same = case of list from checkpoint, empty = list from end of commits and there are no new commits)
- I introduced a simple checkpoint_version API for LogSegment (we could just leverage checkpoint_parts.first().version etc., but this seemed cleaner/generally useful); a sketch of it is below
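Something along these lines (a sketch only; the field name follows how log_segment is used elsewhere in this thread, and the visibility may differ in the actual PR):

// Sketch of the helper described above: expose the log segment's checkpoint version, if any.
impl LogSegment {
    /// Version of the checkpoint this log segment is rooted at, if it has one.
    pub(crate) fn checkpoint_version(&self) -> Option<Version> {
        self.checkpoint_parts.first().map(|part| part.version)
    }
}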
(need to add some more tests though)
LGTM. We should probably just address https://github.com/delta-io/delta-kernel-rs/pull/549/files#r2018928167 before merging, since it should be very few LoC change?
// NB: we need to check both checkpoints and commits since we filter commits at and below
// the checkpoint version. Example: if we have a checkpoint + commit at version 1, the log
// listing above will only return the checkpoint and not the commit.
Is this (still) true? What code does the filtering? I thought the filtering happens in the call to LogSegment::try_new, which didn't happen yet? Or does the log listing also filter, and the log segment filtering is just a safety check?
Yup, it looks like in list_log_files_with_version we have:
// [snip]
checkpoint_parts = complete_checkpoint;
commit_files.clear(); // Log replay only uses commits after a complete checkpoint
// [snip]
and again in LogSegment::try_new() (which I moved around a little, but it was there in the control flow anyways):
// Commit file versions must be greater than the most recent checkpoint version if it exists
let checkpoint_version = checkpoint_parts.first().map(|checkpoint_file| {
    ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
    checkpoint_file.version
});
I made an issue for my TODO comment, but this seems related, adding a bit there: #778
My GitHub wasn't properly resolving that link, but I think this refers to the comment above on list_from(last checkpoint version)? That ended up being a reasonable change and maybe just snuck in before you had refreshed for your last review!
This PR enables incremental snapshot updates. This is done with a new Snapshot::try_new_from(...) which takes an Arc<Snapshot> and an optional version (None = latest version) to incrementally create a new snapshot from the existing one. The heuristic is as follows:
1. if the new version == existing version, just return the existing snapshot
2. if the new version < existing version, error: there is no optimization to do here
3. list new log files after the existing snapshot (this becomes the incremental LogSegment)
   a. if a new checkpoint is found: just create a new snapshot from that checkpoint (and commits after it)
   b. if no new checkpoint is found: do lightweight P+M replay on the latest commits
In addition to the 'main' Snapshot::try_new_from() API, the following incremental APIs were introduced to support the above implementation:
- TableConfiguration::try_new_from(...)
- split LogSegment::read_metadata() into LogSegment::read_metadata() and LogSegment::protocol_and_metadata()
- a new LogSegment::has_checkpoint helper
resolves #489