[WIP] feat: scan from previous result #829
base: main
Conversation
Codecov Report

Attention: Patch coverage is …
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main     #829      +/-   ##
==========================================
+ Coverage   85.07%   85.09%   +0.01%
==========================================
  Files          84       84
  Lines       20797    20916     +119
  Branches    20797    20916     +119
==========================================
+ Hits        17694    17798     +104
- Misses       2226     2234       +8
- Partials      877      884       +7
```
Interesting approach. I think it should be relatively inexpensive because it's only shuffling columns around rather than rewriting data?
kernel/src/scan/mod.rs (Outdated)
```rust
// back into shape as we read it from the log. Since it is already reconciled data,
// we treat it as if it originated from a checkpoint.
let transform = engine.evaluation_handler().new_expression_evaluator(
    Arc::new(scan_row_schema()),
```
Should `scan_row_schema` return a `SchemaRef`, so we can create it once and reuse it?
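A minimal sketch of that suggestion, assuming `SchemaRef` is `Arc<StructType>` and that `scan_row_schema` currently rebuilds the struct type on every call; `build_scan_row_struct_type` is a hypothetical stand-in for today's construction logic:

```rust
use std::sync::{Arc, LazyLock};

// Hypothetical: construct the scan-row struct type exactly once and hand
// out cheap Arc clones afterwards.
static SCAN_ROW_SCHEMA: LazyLock<SchemaRef> =
    LazyLock::new(|| Arc::new(build_scan_row_struct_type()));

pub fn scan_row_schema() -> SchemaRef {
    // Cloning an Arc is a refcount bump, not a re-allocation of the schema.
    SCAN_ROW_SCHEMA.clone()
}
```

Callers like the evaluator above could then drop the extra `Arc::new(...)` wrapper.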
```rust
Expression::Struct(vec![Expression::Struct(vec![
    column_expr!("path"),
    column_expr!("fileConstantValues.partitionValues"),
    column_expr!("size"),
    column_expr!("modificationTime"),
    column_expr!("stats"),
    column_expr!("deletionVector"),
])])
```
Since all of these are just column extracts, it should be pretty simple for an engine to rewire the corresponding columns without actually examining their bytes. I believe our default arrow evaluation would be cheap this way?
I think you are right. Even if clones are needed, it should be very cheap.
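To illustrate why this is cheap with the default Arrow evaluation: Arrow columns are `Arc`'d, so re-assembling a batch reuses buffers instead of copying them. A self-contained sketch using the `arrow` crate (the column name here is illustrative):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("size", DataType::Int64, false)]));
    let col: ArrayRef = Arc::new(Int64Array::from(vec![100_i64, 200, 300]));
    let batch = RecordBatch::try_new(schema.clone(), vec![col])?;

    // "Rewiring" a column into a new batch clones an Arc, not the data:
    // both batches share the same underlying buffers afterwards.
    let rewired = RecordBatch::try_new(schema, vec![batch.column(0).clone()])?;
    assert_eq!(rewired.num_rows(), 3);
    Ok(())
}
```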
```rust
    hint_version: Version,
    hint_data: impl IntoIterator<Item = Box<dyn EngineData>> + 'static,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>>>> {
    static RESTORED_ADD_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
```
How is this different from the original schema? Is it just a subset of fields? Asking because:
- How do we keep them from diverging accidentally?
- If the original schema included unnecessary fields, should we be projecting those out in the original scan as well?
I think we need all the fields we currently extract. We could not use the add schema due to the nullability of the fields, IIRC, and the scan row schema is in the wrong order to match the indices in the dedup visitor.
We can reuse the deletion vector schema at least, though.
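One way to get that reuse, sketched with hypothetical names (the exact constructors and where the DV fields live would come from the kernel's `DeletionVectorDescriptor`; this is not the current API, and the field set follows the Delta protocol's DV descriptor):

```rust
// Hypothetical shared helper: both the log add-file schema and the
// restored scan-row schema would pull the DV fields from here, so the
// two definitions cannot drift apart.
fn deletion_vector_type() -> DataType {
    StructType::new(vec![
        StructField::new("storageType", DataType::STRING, true),
        StructField::new("pathOrInlineDv", DataType::STRING, true),
        StructField::new("offset", DataType::INTEGER, true),
        StructField::new("sizeInBytes", DataType::INTEGER, true),
        StructField::new("cardinality", DataType::LONG, true),
    ])
    .into()
}

static RESTORED_ADD_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
    StructType::new(vec![StructField::new(
        "add",
        StructType::new(vec![
            // path, partitionValues, size, modificationTime, stats, ...
            StructField::new("deletionVector", deletion_vector_type(), true),
        ]),
        true,
    )])
    .into()
});
```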
```rust
    RESTORED_ADD_SCHEMA.clone(),
);
let apply_transform =
    move |data: Box<dyn EngineData>| Ok((transform.evaluate(data.as_ref())?, false));
```
The existing scan is equivalent to a checkpoint, because it already deduplicated everything, correct?
That was my reasoning at least - the constraint being predicates, but we deferred that to the engine...
kernel/src/scan/mod.rs (Outdated)
```rust
// If the current log segment contains a checkpoint newer than the hint version
// we disregard the existing data hint, and perform a full scan.
```
Hmm. In the incremental snapshot API, a newer checkpoint matters because downstream use sites (like scan) could be quite expensive as the number of deltas grows -- even if the immediate incremental P&M is cheap. Here though, we already paid the cost of a full scan previously, effectively giving us a checkpoint as-of the hint version, and there are no further "downstream" operations to worry about if we aggressively pursue incrementality in our scan reuse.
But I guess that ultimately doesn't matter. The log segment only has deltas after the checkpoint, so a checkpoint after the hint version blocks any hope of an incremental scan.
Maybe a quick code comment could be helpful?
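Something like this, perhaps (the wording is just a sketch of the reasoning above):

```rust
// The log segment only retains commits newer than its most recent
// checkpoint. If that checkpoint is newer than the hint version, the
// commits between the hint and the checkpoint are gone from the segment,
// so an incremental replay is impossible and we fall back to a full scan.
```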
kernel/src/scan/mod.rs (Outdated)
```rust
        let scan_iter = self.scan_metadata(engine)?;
        return Ok(Box::new(scan_iter));
    };
};
```
I don't think we need a `;` after an `if let` block? (Surprised clippy/fmt didn't notice.)
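For reference, an `if let` used as a statement is already complete without a trailing semicolon (`maybe_value` and `consume` are illustrative names):

```rust
if let Some(value) = maybe_value {
    consume(value);
} // no `;` needed after the block
```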
kernel/src/scan/mod.rs (Outdated)
```rust
let mut ascending_commit_files = self.snapshot.log_segment().ascending_commit_files.clone();
ascending_commit_files.retain(|f| f.version > hint_version);
let log_segment = LogSegment::try_new(
    ascending_commit_files,
    vec![],
    self.snapshot.log_segment().log_root.clone(),
    Some(self.snapshot.log_segment().end_version),
)?;
```
nit: pull out a local variable:

```rust
let log_segment = self.snapshot.log_segment();
```

to simplify L522 above as well as this block here:
```diff
-let mut ascending_commit_files = self.snapshot.log_segment().ascending_commit_files.clone();
-ascending_commit_files.retain(|f| f.version > hint_version);
-let log_segment = LogSegment::try_new(
-    ascending_commit_files,
-    vec![],
-    self.snapshot.log_segment().log_root.clone(),
-    Some(self.snapshot.log_segment().end_version),
-)?;
+let mut ascending_commit_files = log_segment.ascending_commit_files.clone();
+ascending_commit_files.retain(|f| f.version > hint_version);
+let log_segment = LogSegment::try_new(
+    ascending_commit_files,
+    vec![],
+    log_segment.log_root.clone(),
+    Some(log_segment.end_version),
+)?;
```
What changes are proposed in this pull request?
While integrating kernel in delta-rs, we ended up exposing quite a few internal functions to make state management and incremental log updates work. A hopefully cleaner approach might be to expose an API that allows engines to reuse existing scan results to facilitate scans.
see also: #825
Since I expect there might be some discussion around if, when, and how this should land, I'm putting up an initial draft to gather some feedback before cleaning things up.
The basic idea is to transform existing scan metadata back into the shape expected by the visitors and treat it as if it originated from a checkpoint. To keep things simple, it is the engine's responsibility to provide data that does not conflict with the current scan's predicate.
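A hedged sketch of the intended flow, using the signature from the diff; the engine setup and how the engine extracts `Box<dyn EngineData>` batches from its retained results (`extract_engine_data` here) are illustrative, not part of this PR:

```rust
// Initial full scan at some version: the engine materializes and keeps
// the returned scan metadata alongside the version it corresponds to.
let scan = snapshot.scan_builder().build()?;
let mut retained: Vec<Box<dyn EngineData>> = Vec::new();
for res in scan.scan_metadata(engine.as_ref())? {
    let metadata = res?;
    retained.push(extract_engine_data(metadata)); // engine-specific, hypothetical
}

// Later, against a newer snapshot of the same table: feed the retained
// data back in, and only the commits after `hint_version` get replayed.
let incremental = newer_scan.scan_metadata_from_exisiting(
    engine.as_ref(),
    hint_version, // the version the retained results were produced at
    retained,
)?;
for res in incremental {
    let scan_metadata = res?; // same shape as a regular scan_metadata result
}
```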
This PR affects the following public APIs:

New: `Scan::scan_metadata_from_exisiting` method that consumes a hint version and previously returned scan data.

How was this change tested?
Additional unit tests for the new APIs.