-
Notifications
You must be signed in to change notification settings - Fork 81
feat: Add log segment constructor for timestamp to version conversion #895
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #895 +/- ##
==========================================
- Coverage 84.68% 84.66% -0.02%
==========================================
Files 92 92
Lines 23025 23074 +49
Branches 23025 23074 +49
==========================================
+ Hits 19499 19536 +37
- Misses 2564 2574 +10
- Partials 962 964 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
74c1ee6
to
deb1026
Compare
kernel/src/log_segment.rs
Outdated
log_root: Url, | ||
end_version: Option<Version>, | ||
) -> DeltaResult<Self> { | ||
let ascending_commit_files: Vec<_> = list_log_files(storage, &log_root, None, end_version)? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Listing without a hint can be really painfully slow on a big table. But guess that's unavoidable, since the resulting log segment is supposed to cover the entire reachable history of the table? That said, an optimization in delta-spark lets callers limit the amount of history they want to retrieve. Then, use either an existing snapshot or the _last_checkpoint hint to get a guess of where the end of the table is, and start listing from max(0, hint.version - limit)
. It might produce a few extra versions if the hint was stale, but is generally quite effective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aha interesting! The LogHistoryManager is supposed to take a Snapshot anyway (for TableConfig), so I can totally get that hint.version
. I'll try to incorporate limit 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to use limit and require the end_version.
kernel/src/log_segment.rs
Outdated
let mut contiguous_commits = ascending_commit_files | ||
.into_iter() | ||
// Start from the last commit | ||
.rev() | ||
.fold_while(vec![], |mut out, commit| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be simpler to process a non-materialized listing in arrival order:
let ascending_commit_files = list_log_files(storage, &log_root, None, end_version)?
.filter_ok(ParsedLogPath::is_commit);
let mut contiguous_commits = vec![];
for commit in ascending_commit_files {
// Should we actually blow up on error? Or just skip it and let the next commit
// treat it like a gap, if it was trying to be a commit file? (if it was not trying
// to be a commit file, we probably don't even care?)
let commit = commit?;
if contiguous_commits
.last()
.is_some_and(|prev_commit| prev_commit.version != commit.version - 1)
{
// We found a gap, so throw away all earlier versions
contiguous_commits.clear();
}
contiguous_commits.push(commit);
}
// If we have a non-empty commit list and a requested end version, verify they match.
//
// NOTE: No need to check for an empty commit list, `LogSegment::try_new` fails in that case.
match contiguous_commits.last().zip(end_version) {
Some((last_commit, end_version)) if last_commit.version != end_version => {
// warning or error?
}
_ => (),
}
LogSegment::try_new(contiguous_commits, vec![], log_root, end_version)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Should we actually blow up on error? Or just skip it and let the next commit // treat it like a gap, if it was trying to be a commit file? (if it was not trying // to be a commit file, we probably don't even care?)
This is a place where better error types would make these decisions easier 😔 I'll take a look and see what kinds of errors we're returning there and see if we can ignore some.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For parsed file names, specifically, we have four cases:
- The file is a valid commit (happy path)
- It's a valid file type, but not a commit (filter it out and move on)
- The file is just plain invalid (unrecognized type, version can't parse, etc). Not relevant to commit gap searches, we should ignore it.
- The file attempts to be a commit, but is invalid (e.g. version couldn't parse). This one is slightly tricky, but we should probably treat it like a gap. Nobody will read it as a commit later, but they may still be able to read whatever came after it so we should keep going.
deb1026
to
52fab21
Compare
c651ec6
to
a9a9e51
Compare
a9a9e51
to
280e2ff
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few comments but generally looking good!
kernel/src/log_segment.rs
Outdated
/// Constructs a [`LogSegment`] to be used for timestamp conversion. This [`LogSegment`] will consist | ||
/// only of contiguous commit files. If an `end_version` is specified, the commit range will | ||
/// include commits up to the `end_version` commit (inclusive). If present, `limit` specifies the | ||
/// maximum length of the returned LogSegment. | ||
pub(crate) fn for_timestamp_conversion( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should make this specific like for_timestamp_conversion
or make slightly more general to just be a LogSegment::new_continuous_commits(...)
I realize we have the existing pattern of for_snapshot or for_table_changes, but I think moving more towards less specific constructors may be useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The distinction is that table_changes requires that the LogSegment contains [start_version,end_version].
On the other hand, timestamp conversion usually cares about an end version, and a maximum log segment size (limit). It's not necessary for the logsegment to have everything in [(end_version-limit), end_version].
kernel/src/log_segment.rs
Outdated
storage: &dyn StorageHandler, | ||
log_root: Url, | ||
end_version: Version, | ||
limit: Option<usize>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why usize
? should we just do u64
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logically, the limit is the "max size of the log segment". I'm also using it as the default length of VecDeque, which demands a usize instead of u64. I also compare limit against the length down belowcontiguous_commits.len() > limit
, and this also demands a usize.
usize works better more often in this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, should we consider limit: Option<NonZero<usize>>
? A limit of zero is nonsensical and would anyway break the log segment constructor. Downside is, NonZero
is kind of a pain to use in practice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'm trying NonZero
out and it's a bit annoying. But given that it is an internal api, I think it's fine to use for that extra safety.
kernel/src/log_segment.rs
Outdated
/// Constructs a [`LogSegment`] to be used for timestamp conversion. This [`LogSegment`] will consist | ||
/// only of contiguous commit files. If an `end_version` is specified, the commit range will | ||
/// include commits up to the `end_version` commit (inclusive). If present, `limit` specifies the | ||
/// maximum length of the returned LogSegment. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does limit
reduce the length we 'look back'?
is the log segment always one of:
[0, latest]
no limit, no end(latest-limit, latest]
limit, no end(end-limit, end]
limit, end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh just realized end_version
isn't optional - should it be? or we just think the caller will be the one to always decide 'latest'? would that necessitate another file listing that we could avoid if we push the Optional version down here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When doing timestamp conversion, the end version will be fixed. This is because you always need to know the P&M at that end version.
We may do further listings back, in which case we also have a fixed end version.
Ex:
- Create LogHistoryManager for snapshot at version x.
- List back 1000 versions from x.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To give better intuition:
(end-limit, end] is the largest possible LogSegment that can be produced.
But we may get a shorter one if the commit at (end-limit) is not present, or (end-limit +1), etc.
kernel/src/log_segment.rs
Outdated
let Ok(limit) = TryInto::<u64>::try_into(limit) else { | ||
return Err(Error::generic( | ||
"Failed to convert limit into u64 | ||
when building log segment in timestamp conversion", | ||
)); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just make it u64
to start? (related comment above)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that end_version
is Version
, we should use that for start_from
as well, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently start_from is an Option<Version>
.
kernel/src/log_segment.rs
Outdated
// List the commits greater than or equal to `start_from`. For large tables, listing with | ||
// `start_from` can be a significant speedup over listing _all_ the files in the log. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe move the 'listing from start_from
recommendation to above (for docs on limit) where callers will interact with this
kernel/src/log_segment.rs
Outdated
// If limit is not specified, then use 10 as a default. Typically, there are 10 commits per | ||
// checkpoint, so start with that. | ||
let mut contiguous_commits: VecDeque<ParsedLogPath> = | ||
VecDeque::with_capacity(limit.unwrap_or(10)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is necessarily true (but we actually have this same const elsewhere in the code). We should probably either: (1) consolidate these consts to a central place where we can make our best guess at a useful number or (2) don't guess and remove entirely (just relying on the normal reallocations that Vec(Deque)s use to grow)
related comment: #936 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool. Set it to the default if no limit is defined.
kernel/src/log_segment.rs
Outdated
// If the number of commits exceeds the limit, remove the earliest one. | ||
if limit.is_some_and(|limit| contiguous_commits.len() > limit) { | ||
contiguous_commits.pop_front(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so limit specifies when to start the listing but also how many we will store at once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this any more, now that the upper bound is required?
Under what circumstances could we end up with more commit files than the limit allows?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: Because listing start/end bounds are inclusive, we could end up with limit+1 commits.
Seems like we should just fix that off-by-one error when computing start_from
?
Then we could also use a normal Vec
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
59ed19b
to
fc8cb58
Compare
kernel/src/log_segment.rs
Outdated
let mut contiguous_commits: VecDeque<ParsedLogPath> = match limit { | ||
Some(limit) => VecDeque::with_capacity(limit), | ||
None => VecDeque::new(), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
let mut contiguous_commits: VecDeque<ParsedLogPath> = match limit { | |
Some(limit) => VecDeque::with_capacity(limit), | |
None => VecDeque::new(), | |
}; | |
let mut contiguous_commits: VecDeque<ParsedLogPath> = | |
limit.map_or_else(VecDeque::new, VecDeque::with_capacity); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice 👌
kernel/src/log_segment.rs
Outdated
let start_from = limit | ||
.map(|limit| { | ||
let Ok(limit) = TryInto::<u64>::try_into(limit) else { | ||
return Err(Error::generic( | ||
"Failed to convert limit into u64 | ||
when building log segment in timestamp conversion", | ||
)); | ||
}; | ||
match (limit.cmp(&end_version)) { | ||
Ordering::Less | Ordering::Equal => Ok(end_version - limit), | ||
Ordering::Greater => Ok(0), | ||
} | ||
}) | ||
.transpose()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let start_from = limit | |
.map(|limit| { | |
let Ok(limit) = TryInto::<u64>::try_into(limit) else { | |
return Err(Error::generic( | |
"Failed to convert limit into u64 | |
when building log segment in timestamp conversion", | |
)); | |
}; | |
match (limit.cmp(&end_version)) { | |
Ordering::Less | Ordering::Equal => Ok(end_version - limit), | |
Ordering::Greater => Ok(0), | |
} | |
}) | |
.transpose()?; | |
let start_from = limit | |
.map(|limit| match Version::try_from(limit) { | |
Ok(limit) if limit > 0 => Ok(Version::saturating_sub(end_version, limit - 1)), | |
_ => Err(Error::generic(format!( | |
"Invalid limit {limit} when building log segment in timestamp conversion", | |
))), | |
}) | |
.transpose()?; |
(fixes an off-by-one error as well)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't familiar with all the sub varieties. Ty!
kernel/src/log_segment.rs
Outdated
let Ok(limit) = TryInto::<u64>::try_into(limit) else { | ||
return Err(Error::generic( | ||
"Failed to convert limit into u64 | ||
when building log segment in timestamp conversion", | ||
)); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that end_version
is Version
, we should use that for start_from
as well, no?
kernel/src/log_segment.rs
Outdated
// If the number of commits exceeds the limit, remove the earliest one. | ||
if limit.is_some_and(|limit| contiguous_commits.len() > limit) { | ||
contiguous_commits.pop_front(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this any more, now that the upper bound is required?
Under what circumstances could we end up with more commit files than the limit allows?
kernel/src/log_segment.rs
Outdated
// If the number of commits exceeds the limit, remove the earliest one. | ||
if limit.is_some_and(|limit| contiguous_commits.len() > limit) { | ||
contiguous_commits.pop_front(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: Because listing start/end bounds are inclusive, we could end up with limit+1 commits.
Seems like we should just fix that off-by-one error when computing start_from
?
Then we could also use a normal Vec
here.
kernel/src/log_segment.rs
Outdated
storage: &dyn StorageHandler, | ||
log_root: Url, | ||
end_version: Version, | ||
limit: Option<usize>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, should we consider limit: Option<NonZero<usize>>
? A limit of zero is nonsensical and would anyway break the log segment constructor. Downside is, NonZero
is kind of a pain to use in practice.
2b58cb3
to
5a64d76
Compare
5a64d76
to
a4318bf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm with one nit
|
||
for commit_res in list_log_files(storage, &log_root, start_from, Some(end_version))? { | ||
let commit = match commit_res { | ||
Ok(file) if file.is_commit() => file, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to get rid of the is_commit
method, so we don't have multiple ways of doing the same thing. So maybe:
Ok(file) if file.is_commit() => file, | |
Ok(file) if matches!(file.file_type, LogPathFileType::Commit) => file, |
What changes are proposed in this pull request?
This is a stacked PR. Please look at the LAST commit of each PR for the files changed.
This PR adds a LogSegment constructor that will be used in timestamp to version conversion. The function
LogSegment::for_timestamp_conversion
creates a LogSegment made entirely of commits up to an optional end version.LogSegment::for_timestamp_conversion
goes back to the earliest possible commit while maintaining contiguity of the LogSegment.How was this change tested?
This checks that we construct a log segment in the following cases: