-
Notifications
You must be signed in to change notification settings - Fork 139
feat: read parsed-stats from checkpoint #1638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: read parsed-stats from checkpoint #1638
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1638 +/- ##
========================================
Coverage 84.65% 84.65%
========================================
Files 123 124 +1
Lines 34109 34363 +254
Branches 34109 34363 +254
========================================
+ Hits 28875 29091 +216
- Misses 3905 3939 +34
- Partials 1329 1333 +4 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
a73cb03 to
28e1054
Compare
e92ca69 to
ebc62cd
Compare
cea3c64 to
f840878
Compare
) ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta-kernel-rs/pull/1635/files) to review incremental changes. - [**stack/propagate-nulls**](#1635) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1635/files)] - [stack/nullable-transform](#1636) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1636/files/b3b511b3b11aada328cf613b92bc851b3095efaf..0568a9b8b1ffaa6d6128fd05abff3f6820e1fd76)] - [stack/has_compatible_parsed_stats](#1638) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1638/files/0568a9b8b1ffaa6d6128fd05abff3f6820e1fd76..f840878ae0ca1db97d51512dca0ba7dff1b8371e)] - [stack/read-parsed-stats](#1639) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1639/files/f840878ae0ca1db97d51512dca0ba7dff1b8371e..f0e4680cab79fb7d6878cda709fc55119449d680)] --------- ## What changes are proposed in this pull request? Change apply_schema to propagate top-level struct nulls to child columns instead of erroring - Remove the error check for top-level nulls in apply_schema - Document that child columns are expected to already have nulls propagated (Arrow's JSON reader does this automatically, and parquet data goes through fix_nested_null_masks) - Add comprehensive test case test_apply_schema_handles_top_level_null ## How was this change tested? Edited unit tests Added unit test to show new behavior
f840878 to
a9b9b11
Compare
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta-kernel-rs/pull/1636/files) to review incremental changes. - [**stack/nullable-transform**](#1636) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1636/files)] - [stack/has_compatible_parsed_stats](#1638) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1638/files/16e428c2fe7a256de0ddb852a11475d7ea131769..a9b9b115b2dd7289447b94bf7646872ee049fec6)] - [stack/read-parsed-stats](#1639) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1639/files/a9b9b115b2dd7289447b94bf7646872ee049fec6..dff4b57e26b9819a18b29d85d2860283c8ad04c0)] --------- ## What changes are proposed in this pull request? This PR consolidates duplicated NullableStatsTransform and NullCountStatsTransform implementations into a single shared location. <!-- **Uncomment** this section if there are any changes affecting public APIs. Else, **delete** this section. ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested?
62144e8 to
377105a
Compare
| /// Information about checkpoint reading for data skipping optimization. | ||
| /// | ||
| /// Returned alongside the actions iterator from checkpoint reading functions. | ||
| #[derive(Debug, Clone)] | ||
| pub(crate) struct CheckpointReadInfo { | ||
| /// Whether the checkpoint has compatible pre-parsed stats for data skipping. | ||
| /// When `true`, checkpoint batches can use stats_parsed directly instead of parsing JSON. | ||
| #[allow(unused)] | ||
| pub has_stats_parsed: bool, | ||
| /// The schema used to read checkpoint files, potentially including stats_parsed. | ||
| #[allow(unused)] | ||
| pub checkpoint_read_schema: SchemaRef, | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if we really want to store has_stats_parsed seperately here, guess we can add a method to compute it? e.g. sth like
pub fn has_stats_parsed(&self) -> bool {
self.checkpoint_read_schema
.field("add")
.and_then(|add| add.data_type().as_struct())
.is_some_and(|s| s.field("stats_parsed").is_some())
}
Feel like if we store it seperately, when we update checkpoint_read_schema somehow, we need to remember to update has_stats_parsed as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can leave that as a followup if we think necessary. I'd rather not recompute it in the DataSkipping module, seems like a waste if we already know if we have stats_parsed here. It is a valid concern though!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense! SGTM on this then.
| Some(true) => { | ||
| // Hint says V2 checkpoint, extract sidecars | ||
| let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?; | ||
| // For V2, read first sidecar's schema | ||
| // For V2, read first sidecar's schema if sidecars exist, | ||
| // otherwise fall back to hint schema (for empty V2 checkpoints) | ||
| let file_actions_schema = match sidecar_files.first() { | ||
| Some(first) => { | ||
| Some(engine.parquet_handler().read_parquet_footer(first)?.schema) | ||
| } | ||
| None => None, | ||
| None => hint_schema.cloned(), | ||
| }; | ||
| Ok((file_actions_schema, sidecar_files)) | ||
| } | ||
| None => { | ||
| // No hint, need to read parquet footer | ||
| let footer = engine | ||
| .parquet_handler() | ||
| .read_parquet_footer(&checkpoint.location)?; | ||
|
|
||
| if footer.schema.field(SIDECAR_NAME).is_some() { | ||
| // V2 parquet checkpoint | ||
| let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?; | ||
| // For V2, read first sidecar's schema if sidecars exist, | ||
| // otherwise fall back to footer schema (for empty V2 checkpoints) | ||
| let file_actions_schema = match sidecar_files.first() { | ||
| Some(first) => Some( | ||
| engine.parquet_handler().read_parquet_footer(first)?.schema, | ||
| ), | ||
| None => None, | ||
| None => Some(footer.schema), | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why we change these two return values, are there some special reasons here? I think for empty V2 checkpoints, the footer schema or hint_schema will not contain action schema. Returning footer.schema or hint_schema as file_actions_schema seems a small mix-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So i actually caught this on a test. We can have a V2 checkpoint with the following:
- Has sidecar columns
- Has no sidecar
- Has add actions
In that case, we should just return the V2 checkpoint manifest schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, just double-checked the protocol, it's valid case
| Note: A V2 spec Checkpoint can either have all the add and remove file actions embedded inside itself or all of them should be in sidecar files.
As it's valid that all add/remove inside the checkpoint, it may not be empty. A NIT is to change the origin comment "otherwise fall back to footer schema (for empty V2 checkpoints)"? E.g. just "otherwise fall back to footer schema "
nicklan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generally looks reasonable. had a few detail comments
kernel/src/log_segment.rs
Outdated
| let DataType::Struct(values_struct) = values_field.data_type() else { | ||
| let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else { | ||
| debug!( | ||
| "stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
| "stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}", | |
| "stats_parsed not compatible: stats_parsed. {} is not a Struct, got {:?}", |
| .fields() | ||
| .map(|f| { | ||
| if f.name() == "add" { | ||
| new_add_field.clone() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't think you should need to clone this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't work without it, something about .map() not knowing if there are multiple add fields in a schema or not
377105a to
9619ed9
Compare
9619ed9 to
fdd657e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM assuming the new comments resolved
kernel/src/log_segment.rs
Outdated
| impl Iterator<Item = DeltaResult<ActionsBatch>> + Send, | ||
| CheckpointReadInfo, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May want to use ActionsWithCheckpointInfo here?
🥞 Stacked PR
Use this link to review incremental changes.
What changes are proposed in this pull request?
This PR adds infrastructure to detect when checkpoints have compatible pre-parsed statistics (stats_parsed) that can be used for data skipping without JSON parsing.
Added CheckpointReadInfo struct containing:
How was this change tested?
New and existing unit tests