feat: Add row index support to the default parquet reader #920
Conversation
@@ -188,7 +236,7 @@ where
 #[derive(Debug, PartialEq)]
 pub(crate) struct ReorderIndex {
     pub(crate) index: usize,
-    transform: ReorderIndexTransform,
+    pub(crate) transform: ReorderIndexTransform,
A consequence of removing the `ReorderIndex::needs_transform` method.
fn needs_transform(&self) -> bool {
    match self.transform {
        // if we're casting or inserting null, we need to transform
        ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true,
        // if our nested ordering needs a transform, we need a transform
        ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
        // no transform needed
        ReorderIndexTransform::Identity => false,
    }
}
The indirection and mutual recursion of this method plus `ordering_needs_transform` made the implementation more complex than it needed to be. See below.
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

@@            Coverage Diff             @@
##             main     #920      +/-   ##
==========================================
+ Coverage   85.11%   85.19%   +0.07%
==========================================
  Files          87       87
  Lines       21874    22049     +175
  Branches    21874    22049     +175
==========================================
+ Hits        18619    18784     +165
- Misses       2298     2306       +8
- Partials      957      959       +2
kernel/src/engine/default/parquet.rs (Outdated)
let mut metas = vec![];
for url in urls {
    println!("url: {}", url);
    let location = Path::from_url_path(url.path()).unwrap();
    let meta = store.head(&location).await.unwrap();
    metas.push(FileMeta {
        location: url.clone(),
        last_modified: meta.last_modified.timestamp(),
        size: meta.size.try_into().unwrap(),
    });
}
My async-foo was too weak to collect this from a stream... meh.
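For reference, one stream-based shape that might work here. This is only a sketch: it assumes the `futures` crate's `StreamExt`, keeps the original `unwrap`s, and borrows `store` rather than moving it so each iteration can reuse it.

use futures::{stream, StreamExt};

// Sketch: same sequential behavior as the loop above, expressed as a stream.
let metas: Vec<FileMeta> = stream::iter(urls)
    .then(|url| {
        let store = &store; // borrow so the closure can be called repeatedly
        async move {
            let location = Path::from_url_path(url.path()).unwrap();
            let meta = store.head(&location).await.unwrap();
            FileMeta {
                location: url.clone(),
                last_modified: meta.last_modified.timestamp(),
                size: meta.size.try_into().unwrap(),
            }
        }
    })
    .collect()
    .await;

Since `then` still awaits one `head` at a time, this matches the loop's behavior exactly; swapping `then` for `map` plus `buffered(n)` would overlap the requests.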
Force-pushed from c284607 to ac33ce2.
Looks great.
The only issue I see is that I don't think we actually test reading a parquet file where we skip any row groups.
I searched our whole repo and I don't think we actually have a test parquet file with more than one row group 🤦🏽. Might be worth adding something since we'll want that as a test for skipping as well.
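For what it's worth, arrow-rs can produce such a file by capping the row group size via `WriterProperties`. A sketch (file name and column are made up; error handling elided into `?`, so assume a fallible test fn):

use std::{fs::File, sync::Arc};

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

// Six rows with a 2-row cap per row group yields three row groups.
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let batch = RecordBatch::try_new(
    schema.clone(),
    vec![Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4, 5]))],
)?;
let props = WriterProperties::builder()
    .set_max_row_group_size(2)
    .build();
let mut writer =
    ArrowWriter::try_new(File::create("three_row_groups.parquet")?, schema, Some(props))?;
writer.write(&batch)?;
writer.close()?;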
@@ -66,17 +69,63 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
         .with_backtrace()
 }

+/// Prepares to enumerate row indexes of rows in a parquet file, accounting for row group skipping.
+pub(crate) struct RowIndexBuilder {
+    row_group_starting_row_offsets: Vec<Range<i64>>,
nit: I think `row_group_row_indexes`, or `row_group_row_index_ranges`, or similar, would make more sense as a name. This isn't really the offsets; it's the ranges of rows that each group covers.
let starting_offsets = match self.row_group_ordinals {
    Some(ordinals) => ordinals
        .iter()
        .map(|i| self.row_group_starting_row_offsets[*i].clone())
        .collect(),
    None => self.row_group_starting_row_offsets,
};
starting_offsets.into_iter().flatten()
We could avoid the clone with:

-let starting_offsets = match self.row_group_ordinals {
-    Some(ordinals) => ordinals
-        .iter()
-        .map(|i| self.row_group_starting_row_offsets[*i].clone())
-        .collect(),
-    None => self.row_group_starting_row_offsets,
-};
-starting_offsets.into_iter().flatten()
+if let Some(ordinals) = self.row_group_ordinals {
+    let mut keep = vec![false; self.row_group_starting_row_offsets.len()];
+    for i in ordinals.iter() {
+        keep[*i] = true;
+    }
+    let mut iter = keep.iter();
+    self.row_group_starting_row_offsets.retain(|_| *iter.next().unwrap());
+}
+self.row_group_starting_row_offsets.into_iter().flatten()
Trades against allocating the vec, so not sure it's actually much better.
Clone of a `Range<usize>` should be super cheap -- no worse than a fat pointer. IMO it should be Copy, but 🤷
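(For context, a tiny illustration: `std::ops::Range` deliberately doesn't implement `Copy`, since ranges are also iterators and implicitly copied iterators invite bugs, so the explicit `clone()` really is just a copy of two integers.)

use std::ops::Range;

// Range is Clone but intentionally not Copy (it's an Iterator, and
// implicitly copied iterators are a classic source of bugs).
let r: Range<i64> = 0..10;
let r2 = r.clone(); // cheap: duplicates two i64s
assert_eq!(r, r2);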
        num_rows
    ))
);
let field = field.clone(); // cheap aArc clone
nit:

-let field = field.clone(); // cheap aArc clone
+let field = field.clone(); // cheap Arc clone
"./tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet", | ||
"./tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet", | ||
].map(|p| { | ||
//println!("p: {:?}", std::fs::canonicalize(PathBuf::from(p)).unwrap()); |
nit: remove
});
let mut metas = vec![];
for url in urls {
    println!("url: {}", url);
nit: remove
@@ -86,6 +88,56 @@ pub enum ColumnMetadataKey {
     Invariants,
 }

+/// When present in a [`StructField::metadata`], identifies which internal Delta metadata column the
+/// field represents.
+pub(crate) const INTERNAL_METADATA_COLUMN_KEY: &str = "delta.__internal__.metadata_column";
just a note, none of this `delta.__internal__.` stuff is actually in the protocol :)
Correct. It's purely internal to kernel and the parquet reader.

Whether that's a good thing is a whole different question. I'm not fully convinced this is the right approach to requesting a row index from the parquet reader, but I don't know a better way. Approaches I've seen in the wild are:

1. Define special annotations for otherwise "normal" columns, which the reader is expected to interpret as a special request if it sees them in the read schema. Spark takes this approach (by special struct field metadata entries). Iceberg also does (by defining special field ids for metadata columns). Downsides are:
   - A reader can accidentally ignore the annotation, likely producing an all-NULL column because nothing with that name is physically present in the file. In the worst case, the file contains a column with that name, and we return file data instead.
   - It's easy to accidentally propagate the annotation beyond the parquet reader and cause confusion downstream. In the worst case, a writer could even produce a parquet file that contains those annotations, or a table schema that includes them.
2. Define specially-named columns, which the reader is expected to interpret as a special request if it sees them in the read schema. Delta does this with the CDC columns in data files. Downsides are:
   - Same as 1/ above, with the additional problem that it's vulnerable to name collisions (what if the table actually has a column named `_row_index` for some reason).
3. Define a special API call on the parquet reader that injects the metadata column into the output schema (usually as the last column), even though the read schema doesn't actually mention it. Avoids any ambiguity about what was requested. Downsides:
   - The reader has to be ready for a result that contains columns the read schema didn't mention. And it has to be ordinal-based, not name-based (names could collide).

Note that special Delta columns such as row ids will pose the same problem for the kernel <-> engine interface that file row indexes pose for the kernel <-> parquet interface here. Ideally whatever solution we come up with can handle both of those interfaces.
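To make approach 1/ concrete, here is a minimal standalone sketch of the annotation mechanism. The metadata key is the constant from this PR; the types and the marker value are simplified stand-ins, not the kernel's actual definitions.

use std::collections::HashMap;

// The metadata key introduced in this PR; everything else is simplified.
const INTERNAL_METADATA_COLUMN_KEY: &str = "delta.__internal__.metadata_column";

struct StructField {
    name: String,
    metadata: HashMap<String, String>,
}

// A reader-chosen column name, annotated as a row-index request.
fn row_index_field(name: &str) -> StructField {
    StructField {
        name: name.to_string(),
        metadata: HashMap::from([(
            INTERNAL_METADATA_COLUMN_KEY.to_string(),
            "rowIndex".to_string(), // hypothetical marker value
        )]),
    }
}

// The parquet reader keys off the annotation, never the column name,
// which is what makes the name collisions of approach 2/ moot.
fn is_internal_metadata_column(field: &StructField) -> bool {
    field.metadata.contains_key(INTERNAL_METADATA_COLUMN_KEY)
}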
What changes are proposed in this pull request?
Deletion vectors (and row tracking, eventually) rely on accurate file-level row indexes, but the kernel's default parquet reader does not provide them. That means we must rely on the position of rows in data batches returned by each read, and we cannot apply optimizations such as stats-based row group skipping (see #860).
Add row index support to the default parquet reader, in the form of a new `RowIndex` variant of `ReorderIndexTransform`. Also start stubbing in a framework for internal metadata columns, in the form of `InternalMetadataColumn`, which can be converted to a specially annotated `StructField`. Readers can add a row index column to their schema by passing the column name of their choosing to `InternalMetadataColumn::RowIndex.as_struct_field`. The default parquet reader recognizes that column and injects a transform to generate row indexes (with appropriate adjustments for any row group skipping that might occur).

Fixes #919
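As a usage sketch: the `as_struct_field` call is the PR's API per the description above, while the surrounding schema-building calls are assumed/simplified.

// A reader requests row indexes by appending a specially annotated field,
// under a name of its own choosing, to its read schema.
let read_schema = StructType::new(vec![
    StructField::new("id", DataType::LONG, true),
    InternalMetadataColumn::RowIndex.as_struct_field("my_row_index"),
]);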
NOTE: If/when the arrow-rs parquet reader gains native support for row indexes, e.g. apache/arrow-rs#7307, we should switch to using that. Our solution here is not robust to advanced parquet reader features like page-level skipping, row-level predicate pushdown, etc.
How was this change tested?
New unit tests.

TODO: I couldn't find any parquet files in our test set that have multiple row groups, so there is incomplete coverage of the code that adjusts for row group skipping. Ideally, we would have a test that skips the middle of three row groups, to verify that the sequence of row indexes correctly skips the row group as well (the sketch below illustrates the expected output).
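To pin down the behavior that missing test would cover, here is a self-contained sketch of the row-index computation. Field names follow the review snippets above; the simplified builder stands in for the real type, which is assembled from parquet file metadata.

use std::ops::Range;

// Simplified stand-in for the builder added in this PR.
struct RowIndexBuilder {
    row_group_starting_row_offsets: Vec<Range<i64>>,
    row_group_ordinals: Option<Vec<usize>>, // row groups kept after skipping
}

impl RowIndexBuilder {
    fn into_iter(self) -> impl Iterator<Item = i64> {
        // Keep only the ranges of surviving row groups, then flatten them
        // into one sequence of file-level row indexes.
        let starting_offsets: Vec<_> = match self.row_group_ordinals {
            Some(ordinals) => ordinals
                .iter()
                .map(|i| self.row_group_starting_row_offsets[*i].clone())
                .collect(),
            None => self.row_group_starting_row_offsets,
        };
        starting_offsets.into_iter().flatten()
    }
}

#[test]
fn skips_middle_row_group() {
    // Three row groups of 4 rows each; the middle one is skipped, so the
    // emitted file-level row indexes must jump from 3 straight to 8.
    let builder = RowIndexBuilder {
        row_group_starting_row_offsets: vec![0..4, 4..8, 8..12],
        row_group_ordinals: Some(vec![0, 2]),
    };
    let indexes: Vec<i64> = builder.into_iter().collect();
    assert_eq!(indexes, vec![0, 1, 2, 3, 8, 9, 10, 11]);
}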