add nested nullable field support for CachedArrayReader #9066
Conversation
Weijun-H
left a comment
LGTM, Thanks @adriangb
It would be better to also add this test:
#[test]
fn test_level_propagation_empty_after_skip() {
    let metrics = ArrowReaderMetrics::disabled();
    let cache = Arc::new(Mutex::new(RowGroupCache::new(4, usize::MAX)));

    // Producer populates cache with levels
    let data = vec![1, 2, 3, 4];
    let def_levels = vec![1, 0, 1, 1];
    let rep_levels = vec![0, 1, 1, 0];
    let mock_reader =
        MockArrayReaderWithLevels::new(data, def_levels.clone(), rep_levels.clone());
    let mut producer = CachedArrayReader::new(
        Box::new(mock_reader),
        cache.clone(),
        0,
        CacheRole::Producer,
        metrics.clone(),
    );
    producer.read_records(4).unwrap();
    producer.consume_batch().unwrap();

    // Consumer skips all rows, resulting in an empty output batch
    let mock_reader2 = MockArrayReaderWithLevels::new(
        vec![10, 20, 30, 40],
        vec![0, 0, 0, 0],
        vec![0, 0, 0, 0],
    );
    let mut consumer = CachedArrayReader::new(
        Box::new(mock_reader2),
        cache,
        0,
        CacheRole::Consumer,
        metrics,
    );
    let skipped = consumer.skip_records(4).unwrap();
    assert_eq!(skipped, 4);

    let array = consumer.consume_batch().unwrap();
    assert_eq!(array.len(), 0);
    assert_eq!(consumer.get_def_levels().unwrap(), &[]);
    assert_eq!(consumer.get_rep_levels().unwrap(), &[]);
}
Added in a495d4d
@XiangpengHao any chance you could take a look at this?
Looks good to me, thank you @adriangb!
Thank you for reviewing! I think this needs approval from someone with write permissions and for them to hit merge; I don't have those permissions in this repo.
Me neither, asking for @alamb's help!
alamb
left a comment
Thanks -- the high-level idea of this looks good. However, I think we need some end-to-end tests prior to merging this.
Specifically, an end-to-end test that actually reads a StructArray that uses repetition and definition levels, and then applies a row filter that filters out some of the rows.
I looked around in arrow-rs/parquet/tests/arrow_reader/mod.rs (line 1136 in 02fa779):

    builder = builder.set_statistics_truncate_length(DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
I am imagining something like arrow-rs/parquet/tests/arrow_reader/io/async_reader.rs (lines 295 to 297 in 02fa779):

    let builder = builder
        .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
        .with_row_filter(filter_b_false(&schema_descr));
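To make the request concrete, here is a rough, hedged sketch of the kind of end-to-end test being asked for, not the PR's actual test: it writes a nullable struct column to an in-memory Parquet file and reads it back with a row filter on a top-level column. The schema, the field names (id, s, v), and the use of the synchronous ParquetRecordBatchReaderBuilder are illustrative assumptions; the pointer above is to the async reader tests, so the real integration tests would likely live there.

use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field, Fields, Schema};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::{ArrowWriter, ProjectionMask};

#[test]
fn nested_nullable_struct_with_row_filter() -> Result<(), Box<dyn std::error::Error>> {
    // Schema: a plain "id" column plus a nullable struct with a nullable child,
    // so the struct leaf carries non-trivial definition levels.
    let struct_fields = Fields::from(vec![Field::new("v", DataType::Int32, true)]);
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("s", DataType::Struct(struct_fields.clone()), true),
    ]));

    let id: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2, 3]));
    let v: ArrayRef = Arc::new(Int32Array::from(vec![Some(10), None, Some(30), None]));
    // The struct itself is null for row 1, giving a mix of definition levels.
    let s: ArrayRef = Arc::new(StructArray::new(
        struct_fields,
        vec![v],
        Some(NullBuffer::from(vec![true, false, true, true])),
    ));
    let batch = RecordBatch::try_new(schema.clone(), vec![id, s])?;

    // Write the batch to an in-memory Parquet file.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Read it back, keeping only rows with an even id (rows 0 and 2).
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?;
    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
    let predicate = ArrowPredicateFn::new(
        ProjectionMask::columns(&schema_descr, ["id"]),
        |batch| {
            let id = batch.column(0).as_primitive::<Int32Type>();
            let keep: Vec<bool> = id.iter().map(|x| x.unwrap_or(0) % 2 == 0).collect();
            Ok(BooleanArray::from(keep))
        },
    );
    let reader = builder
        .with_projection(ProjectionMask::all())
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;

    let batches: Vec<RecordBatch> = reader.collect::<Result<_, _>>()?;
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2);
    Ok(())
}

Whether a given shape actually exercises the predicate cache is something the PR's real tests verify via cache-read metrics; this sketch only shows the data layout and the filter wiring.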
    let array = self.inner.consume_batch()?;

    // Capture definition and repetition levels from inner reader before they are cleared
    let def_levels = self.inner.get_def_levels().map(|l| l.to_vec());
This copies the levels, right? It seems like we should:
- Only copy the levels when needed (caching nested types)
- Look into potentially being able to take the levels rather than having to copy them (see the sketch below)
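A hedged sketch of what "taking" the levels could look like; the trait and method names are hypothetical and not part of the parquet crate, since get_def_levels() only hands out a borrowed slice today:

    /// Hypothetical extension (not an existing parquet API): a reader that can
    /// hand ownership of its level buffers to the cache, avoiding the copy.
    trait TakeLevels {
        /// Move the definition levels out of the reader, leaving it empty.
        fn take_def_levels(&mut self) -> Option<Vec<i16>>;
        /// Move the repetition levels out of the reader, leaving it empty.
        fn take_rep_levels(&mut self) -> Option<Vec<i16>>;
    }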
adriangb
left a comment
@alamb I've reduced some allocations and added pretty extensive integration tests
    let def_levels = if self.needs_def_levels {
        self.inner.get_def_levels().map(|l| l.to_vec())
    } else {
        None
    };
@alamb this was the best I could easily do: get_def_levels() returns an Option<&[i16]>, and we're storing this on structures that can't easily have a lifetime (without a major refactor, at least), so we have to allocate an owned Vec.
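A minimal sketch of that constraint, with hypothetical CachedLevels/capture names standing in for the cache structures: the borrowed slice from get_def_levels() is tied to the reader, so storing it would push a lifetime parameter through the cache types, and an owned Vec<i16> sidesteps that at the cost of a copy.

    // Hypothetical names, for illustration only.
    struct CachedLevels {
        // Owned copy: storing a `&[i16]` here would force a lifetime parameter
        // onto every structure that holds a cache entry.
        def_levels: Option<Vec<i16>>,
    }

    fn capture(def_levels: Option<&[i16]>) -> CachedLevels {
        CachedLevels {
            def_levels: def_levels.map(|l| l.to_vec()),
        }
    }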
    col_idx,
    cache_options.role,
    self.metrics.clone(), // cheap clone
    field.def_level > 0, // needs_def_levels: true if has nullable ancestors
Alternatively, we could pass field: &ParquetField or def_level: i16 and have CachedArrayReader::new figure out how to use it.
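A hedged sketch of the def_level variant of that idea; the helper name is hypothetical, and only the def_level > 0 check mirrors the diff quoted above:

    /// Hypothetical helper: if CachedArrayReader::new took the leaf's def_level
    /// instead of a precomputed flag, it could derive the flag itself.
    /// def_level > 0 means the leaf or one of its ancestors is optional (or repeated),
    /// so definition levels must be cached alongside the values.
    fn needs_def_levels(def_level: i16) -> bool {
        def_level > 0
    }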
Add tests demonstrating that nested fields inside List structures (with rep_level > 0) are NOT cached, while top-level columns work correctly:
- test_list_struct_fields_not_cached_filter_on_id: filter on id (rep_level=0); shows id is cached but list struct fields are not
- test_list_struct_fields_not_cached_filter_on_struct_field: filter on struct_field_b inside the list; shows 0 cache reads for all projections since filter mask leaves have rep_level > 0

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Needed to unblock apache/datafusion#19556.
This was mostly written by Claude with guidance on how to fix the following MRE (reproducible on the branch for apache/datafusion#19556):