4 changes: 4 additions & 0 deletions parquet/examples/read_with_rowgroup.rs
@@ -129,6 +129,10 @@ impl RowGroups for InMemoryRowGroup {
}
}
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(&self.metadata))
}
}

impl InMemoryRowGroup {
75 changes: 60 additions & 15 deletions parquet/src/arrow/array_reader/builder.rs
Contributor:

Thrilled to see this!
Please let me know if I can help in any way. I can make it my top priority to work on this, as we need to make use of it in the next few weeks.
Our use case is to leverage this from iceberg-rust, which uses ParquetRecordBatchStreamBuilder. The API seems to work for that, but I understand from other comments that it may not be the most desirable one; happy to help either with research and a proposal or with the implementation of the chosen option.
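For context, reading with row numbers from the stream builder might look roughly like the sketch below; `with_row_number_column` is a made-up name standing in for whatever builder option this PR ends up exposing to forward the `row_number_column` argument to build_array_reader:

// Hypothetical usage sketch only: `with_row_number_column` is not a confirmed
// API of this PR; everything else uses the existing parquet async reader.
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

async fn scan_with_row_numbers(file: tokio::fs::File) -> parquet::errors::Result<()> {
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_row_number_column("row_number".to_string()) // hypothetical API
        .build()?;
    while let Some(batch) = stream.try_next().await? {
        // Each batch would carry an extra non-null Int64 "row_number" column
        // holding the absolute position of each row in the file.
        let _ = batch.column_by_name("row_number");
    }
    Ok(())
}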

Contributor Author:

Hey @vustef! I'd be very happy if you want to help get row number support into the Parquet reader, either with this PR or through other alternatives. If you want to pick up this PR I can give you commit rights to the branch? Sadly, I don't have capacity to work on this PR at the moment.

Contributor:

Hey @jkylling, yes, please do that if you can; happy to continue where you left off.
I'd also need some guidance from @scovich and @alamb on the preferred path forward, and potentially help from @etseidl if I hit a wall merging the metadata changes that have happened in the meantime (but more on that once I try it out).

@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

+ use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
use std::sync::Arc;

- use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::row_number::RowNumberReader;
use crate::arrow::array_reader::{
make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -39,9 +39,29 @@ pub fn build_array_reader(
field: Option<&ParquetField>,
mask: &ProjectionMask,
row_groups: &dyn RowGroups,
row_number_column: Option<String>,
) -> Result<Box<dyn ArrayReader>> {
let reader = field
- .and_then(|field| build_reader(field, mask, row_groups).transpose())
+ .and_then(|field| {
+ build_reader(field, mask, row_groups, row_number_column.clone()).transpose()
+ })
.or_else(|| {
row_number_column.map(|column| {
let row_number_reader = build_row_number_reader(row_groups)?;
let reader: Box<dyn ArrayReader> = Box::new(StructArrayReader::new(
DataType::Struct(Fields::from(vec![Field::new(
column,
row_number_reader.get_data_type().clone(),
false,
)])),
vec![row_number_reader],
0,
0,
false,
));
Ok(reader)
})
})
.transpose()?
.unwrap_or_else(|| make_empty_array_reader(row_groups.num_rows()));

@@ -52,12 +72,13 @@ fn build_reader(
field: &ParquetField,
mask: &ProjectionMask,
row_groups: &dyn RowGroups,
row_number_column: Option<String>,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups),
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
- DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
+ DataType::Struct(_) => build_struct_reader(field, mask, row_groups, row_number_column),
DataType::List(_) => build_list_reader(field, mask, false, row_groups),
DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups),
@@ -66,6 +87,10 @@
}
}

fn build_row_number_reader(row_groups: &dyn RowGroups) -> Result<Box<dyn ArrayReader>> {
Ok(Box::new(RowNumberReader::try_new(row_groups.row_groups())?))
}

/// Build array reader for map type.
fn build_map_reader(
field: &ParquetField,
@@ -75,8 +100,8 @@ fn build_map_reader(
let children = field.children().unwrap();
assert_eq!(children.len(), 2);

- let key_reader = build_reader(&children[0], mask, row_groups)?;
- let value_reader = build_reader(&children[1], mask, row_groups)?;
+ let key_reader = build_reader(&children[0], mask, row_groups, None)?;
+ let value_reader = build_reader(&children[1], mask, row_groups, None)?;

match (key_reader, value_reader) {
(Some(key_reader), Some(value_reader)) => {
@@ -127,7 +152,7 @@ fn build_list_reader(
let children = field.children().unwrap();
assert_eq!(children.len(), 1);

- let reader = match build_reader(&children[0], mask, row_groups)? {
+ let reader = match build_reader(&children[0], mask, row_groups, None)? {
let reader = match build_reader(&children[0], mask, row_groups, None)? {
Some(item_reader) => {
// Need to retrieve underlying data type to handle projection
let item_type = item_reader.get_data_type().clone();
@@ -173,7 +198,7 @@ fn build_fixed_size_list_reader(
let children = field.children().unwrap();
assert_eq!(children.len(), 1);

let reader = match build_reader(&children[0], mask, row_groups)? {
let reader = match build_reader(&children[0], mask, row_groups, None)? {
Some(item_reader) => {
let item_type = item_reader.get_data_type().clone();
let reader = match &field.arrow_type {
@@ -300,6 +325,7 @@ fn build_struct_reader(
field: &ParquetField,
mask: &ProjectionMask,
row_groups: &dyn RowGroups,
row_number_column: Option<String>,
) -> Result<Option<Box<dyn ArrayReader>>> {
let arrow_fields = match &field.arrow_type {
DataType::Struct(children) => children,
@@ -312,14 +338,24 @@
let mut builder = SchemaBuilder::with_capacity(children.len());

for (arrow, parquet) in arrow_fields.iter().zip(children) {
- if let Some(reader) = build_reader(parquet, mask, row_groups)? {
+ if let Some(reader) = build_reader(parquet, mask, row_groups, None)? {
// Need to retrieve underlying data type to handle projection
let child_type = reader.get_data_type().clone();
builder.push(arrow.as_ref().clone().with_data_type(child_type));
readers.push(reader);
}
}

if let Some(row_number_column) = row_number_column {
let reader = build_row_number_reader(row_groups)?;
builder.push(Field::new(
row_number_column,
reader.get_data_type().clone(),
false,
));
readers.push(reader);
}

if readers.is_empty() {
return Ok(None);
}
@@ -356,14 +392,23 @@ mod tests {
)
.unwrap();

- let array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();
+ let array_reader = build_array_reader(
+ fields.as_ref(),
+ &mask,
+ &file_reader,
+ Some("row_number".to_string()),
+ )
+ .unwrap();

// Create arrow types
- let arrow_type = DataType::Struct(Fields::from(vec![Field::new(
- "b_struct",
- DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
- true,
- )]));
+ let arrow_type = DataType::Struct(Fields::from(vec![
+ Field::new(
+ "b_struct",
+ DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
+ true,
+ ),
+ Field::new("row_number", DataType::Int64, false),
+ ]));

assert_eq!(array_reader.get_data_type(), &arrow_type);
}
3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/list_array.rs
@@ -563,7 +563,8 @@ mod tests {
)
.unwrap();

- let mut array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();
+ let mut array_reader =
+ build_array_reader(fields.as_ref(), &mask, &file_reader, None).unwrap();

let batch = array_reader.next_batch(100).unwrap();
assert_eq!(batch.data_type(), array_reader.get_data_type());
9 changes: 9 additions & 0 deletions parquet/src/arrow/array_reader/mod.rs
@@ -40,11 +40,13 @@ mod list_array;
mod map_array;
mod null_array;
mod primitive_array;
mod row_number;
mod struct_array;

#[cfg(test)]
mod test_util;

use crate::file::metadata::RowGroupMetaData;
pub use builder::build_array_reader;
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
@@ -113,6 +115,9 @@ pub trait RowGroups {

/// Returns a [`PageIterator`] for the column chunks with the given leaf column index
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

/// Returns an iterator over the row groups in this collection
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;
}

impl RowGroups for Arc<dyn FileReader> {
@@ -124,6 +129,10 @@ impl RowGroups for Arc<dyn FileReader> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(self.metadata().row_groups().iter())
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
154 changes: 154 additions & 0 deletions parquet/src/arrow/array_reader/row_number.rs
@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::ArrayReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaData;
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

/// An `ArrayReader` that produces the absolute row number of each row read
/// from a sequence of row groups, as a non-nullable Int64 column.
pub(crate) struct RowNumberReader {
row_numbers: Vec<i64>,
row_groups: RowGroupSizeIterator,
}

impl RowNumberReader {
pub(crate) fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self>
where
I: TryInto<RowGroupSize, Error = ParquetError>,
{
let row_groups = RowGroupSizeIterator::try_new(row_groups)?;
Ok(Self {
row_numbers: Vec::new(),
row_groups,
})
}
}

impl ArrayReader for RowNumberReader {
fn as_any(&self) -> &dyn Any {
self
}

fn get_data_type(&self) -> &DataType {
&DataType::Int64
}

fn read_records(&mut self, batch_size: usize) -> Result<usize> {
let read = self
.row_groups
.read_records(batch_size, &mut self.row_numbers);
Ok(read)
}

fn consume_batch(&mut self) -> Result<ArrayRef> {
Ok(Arc::new(Int64Array::from_iter(self.row_numbers.drain(..))))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let skipped = self.row_groups.skip_records(num_records);
Ok(skipped)
}

fn get_def_levels(&self) -> Option<&[i16]> {
None
}

fn get_rep_levels(&self) -> Option<&[i16]> {
None
}
}

struct RowGroupSizeIterator {
row_groups: VecDeque<RowGroupSize>,
}

impl RowGroupSizeIterator {
fn try_new<I>(row_groups: impl IntoIterator<Item = I>) -> Result<Self>
where
I: TryInto<RowGroupSize, Error = ParquetError>,
Contributor:

It seems like this whole RowGroupSizeIterator is a complicated and error-prone way of chaining several Range<i64> iterators. Can we use standard iterator machinery instead?

pub(crate) struct RowNumberReader {
    buffered_row_numbers: Vec<i64>,
    remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
}

impl RowNumberReader {
    pub(crate) fn try_new<'a>(
        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
    ) -> Result<Self> {
        let ranges = row_groups
            .map(|rg| {
                let first_row_number = rg.first_row_index().ok_or(ParquetError::General(
                    "Row group missing row number".to_string(),
                ))?;
                Ok(first_row_number..first_row_number + rg.num_rows())
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            buffered_row_numbers: Vec::new(),
            remaining_row_numbers: ranges.into_iter().flatten(),
        })
    }
    
    // Use `take` on a `&mut Iterator` to consume a number of elements without consuming the iterator.
    fn take(&mut self, batch_size: usize) -> impl Iterator<Item = i64> + '_ {
        (&mut self.remaining_row_numbers).take(batch_size)
    }
}

impl ArrayReader for RowNumberReader {
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let starting_len = self.buffered_row_numbers.len();
        self.buffered_row_numbers.extend(self.take(batch_size));
        Ok(self.buffered_row_numbers.len() - starting_len)
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        Ok(self.take(num_records).count())
    }

    // ... remaining ArrayReader methods elided in the suggestion ...
}

Contributor Author:

This is much simpler. Thank you! I suspect we are missing out on some performance in skip_records with this, since the flattened iterator advances one row at a time while the deque-based version skips whole row groups at once, but the bulk of the data pruning will likely have happened by pruning Parquet row groups already.

Contributor Author:

@scovich I see you are involved in the maintenance of delta-kernel-rs. If you are interested, I've started on an implementation of deletion vector read support in delta-rs in this branch, based on a backport of an early version of this PR to arrow 54.2.1. The PR is still very rough, but the read path has okay test coverage and it's able to read tables with deletion vectors produced by Spark correctly. The write support for deletion vectors is rudimentary (deletion vectors are only used for deletes when configured, and deleting from the same file twice is unsupported), and is mostly there to be able to unit test the read support. Unfortunately, I've not had time to work on this lately.

Contributor:

FWIW @zhuqi-lucas and I are working on improvements to the filter application here, which may result in some additional API churn.

{
Ok(Self {
row_groups: VecDeque::from(
row_groups
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?,
),
})
Contributor:

I'm finding myself a bit uneasy with adding the first row number to the RowGroupMetaData. Rather than that, could this bit here instead be changed to keep track of the first row number while populating the deque? Is there some wrinkle I'm missing? Might the row groups be filtered before instantiating the RowNumberReader?
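Something like the sketch below, say (hypothetical; it assumes every row group from the start of the file reaches this constructor, so nothing was filtered out earlier):

// Derive each first row number from a running total of row counts while
// populating the deque, instead of storing it in RowGroupMetaData.
// (Assumes the iterator yields &RowGroupMetaData.)
let mut next_row_number: i64 = 0;
let row_groups: VecDeque<RowGroupSize> = row_groups
    .into_iter()
    .map(|rg| {
        let size = RowGroupSize {
            first_row_number: next_row_number,
            num_rows: rg.num_rows(),
        };
        next_row_number += rg.num_rows();
        size
    })
    .collect();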

Contributor:

Answered my own question: it seems there's some complexity here, at least when using the async reader.

Contributor Author (@jkylling, Mar 28, 2025):

Yes, I believe we don't have access to all row groups when creating the array readers.

I took a quick look at the corresponding Parquet reader implementations for Trino and parquet-java. Their approaches are rather similar to ours.

One takeaway is that the above implementations do not keep the full RowGroupMetaData around, as we do by requiring an iterator over RowGroupMetaData in the RowGroups trait. This is likely a good idea, as this struct can be quite large. What do you think about changing the RowGroups trait to something like below?

/// A collection of row groups
pub trait RowGroups {
    /// Get the number of rows in this collection
    fn num_rows(&self) -> usize {
        self.row_group_infos().map(|info| info.num_rows).sum()
    }

    /// Returns a [`PageIterator`] for the column chunks with the given leaf column index
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

    /// Returns an iterator over the row groups in this collection
    fn row_group_infos(&self) -> Box<dyn Iterator<Item = &RowGroupInfo> + '_>;
}

struct RowGroupInfo {
    num_rows: usize,
    row_index: i64,
}
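
The row number reader could then be built from these lightweight infos. A rough sketch (from_ranges is a hypothetical constructor taking plain row-number ranges; today's RowNumberReader is built from RowGroupMetaData instead):

fn build_row_number_reader(row_groups: &dyn RowGroups) -> Result<Box<dyn ArrayReader>> {
    // Turn each RowGroupInfo into the absolute row-number range it covers.
    let ranges: Vec<std::ops::Range<i64>> = row_groups
        .row_group_infos()
        .map(|info| info.row_index..info.row_index + info.num_rows as i64)
        .collect();
    Ok(Box::new(RowNumberReader::from_ranges(ranges))) // hypothetical
}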

Contributor:

I don't think this is necessary. The full ParquetMetaData is already available everywhere this trait is implemented, so I don't see a need to worry about adding another metadata structure here.

}
}

impl RowGroupSizeIterator {
fn read_records(&mut self, mut batch_size: usize, row_numbers: &mut Vec<i64>) -> usize {
let mut read = 0;
while batch_size > 0 {
let Some(front) = self.row_groups.front_mut() else {
return read as usize;
};
let to_read = std::cmp::min(front.num_rows, batch_size as i64);
row_numbers.extend(front.first_row_number..front.first_row_number + to_read);
front.num_rows -= to_read;
front.first_row_number += to_read;
if front.num_rows == 0 {
self.row_groups.pop_front();
}
batch_size -= to_read as usize;
read += to_read;
}
read as usize
}

fn skip_records(&mut self, mut num_records: usize) -> usize {
let mut skipped = 0;
while num_records > 0 {
let Some(front) = self.row_groups.front_mut() else {
return skipped as usize;
};
let to_skip = std::cmp::min(front.num_rows, num_records as i64);
front.num_rows -= to_skip;
front.first_row_number += to_skip;
if front.num_rows == 0 {
self.row_groups.pop_front();
}
skipped += to_skip;
num_records -= to_skip as usize;
}
skipped as usize
}
}

/// The absolute row number of a row group's first row, together with its row count.
pub(crate) struct RowGroupSize {
first_row_number: i64,
num_rows: i64,
}

impl TryFrom<&RowGroupMetaData> for RowGroupSize {
type Error = ParquetError;

fn try_from(rg: &RowGroupMetaData) -> Result<Self, Self::Error> {
Ok(Self {
first_row_number: rg
.first_row_number()
.ok_or(ParquetError::RowGroupMetaDataMissingRowNumber)?,
num_rows: rg.num_rows(),
})
}
}