Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions parquet/examples/read_with_rowgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ impl RowGroups for InMemoryRowGroup {
}
}
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(&self.metadata))
}
}

impl InMemoryRowGroup {
Expand Down
101 changes: 81 additions & 20 deletions parquet/src/arrow/array_reader/builder.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thrilled to see this!
Please let me know if I can help in any way. I can make it my top priority to work on this, as we need to make use of it in the next few weeks.
Our use-case is to leverage this from iceberg-rust, which uses ParquetRecordBatchStreamBuilder. The API seems to work for that, but I understand from other comments that it may not be the most desirable one - happy to help either with research/proposal or with the implementation of the chosen option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @vustef! I'd be very happy if you want to help get row number support into the Parquet reader, either with this PR or through other alternatives. If you want to pick up this PR I can give you commit rights to the branch? Sadly, I don't have capacity to work on this PR at the moment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @jkylling, yes, please do that if you can — I'm happy to continue where you left off.
I'd also need some guidance from @scovich and @alamb on the preferred path forward. And potentially help from @etseidl if I hit a wall with merging metadata changes that happened in the meanwhile (but more on that once I try it out).

Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
use std::sync::{Arc, Mutex};

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::cached_array_reader::CacheRole;
use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
use crate::arrow::array_reader::row_number::RowNumberReader;
use crate::arrow::array_reader::{
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
Expand Down Expand Up @@ -113,25 +113,39 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: Option<&ParquetField>,
mask: &ProjectionMask,
) -> Result<Box<dyn ArrayReader>> {
row_number_column: Option<&str>,
) -> Result<Box<dyn ArrayReader>> {
let reader = field
.and_then(|field| self.build_reader(field, mask).transpose())
.transpose()?
.unwrap_or_else(|| make_empty_array_reader(self.num_rows()));
.and_then(|field| self.build_reader(field, mask, row_number_column).transpose())
.or_else(|| {
row_number_column.map(|column| {
let row_number_reader = self.build_row_number_reader()?;
let reader: Box<dyn ArrayReader> = Box::new(StructArrayReader::new(
DataType::Struct(Fields::from(vec![Field::new(
column,
row_number_reader.get_data_type().clone(),
false,
)])),
vec![row_number_reader],
0,
0,
false,
));
Ok(reader)
})
})
.transpose()?
.unwrap_or_else(|| make_empty_array_reader(self.row_groups.num_rows()));

Ok(reader)
}

/// Return the total number of rows
fn num_rows(&self) -> usize {
self.row_groups.num_rows()
}

fn build_reader(
&self,
field: &ParquetField,
mask: &ProjectionMask,
) -> Result<Option<Box<dyn ArrayReader>>> {
row_number_column: Option<&str>,
) -> Result<Option<Box<dyn ArrayReader>>> {
match field.field_type {
ParquetFieldType::Primitive { col_idx, .. } => {
let Some(reader) = self.build_primitive_reader(field, mask)? else {
Expand All @@ -155,7 +169,7 @@ impl<'a> ArrayReaderBuilder<'a> {
}
ParquetFieldType::Group { .. } => match &field.arrow_type {
DataType::Map(_, _) => self.build_map_reader(field, mask),
DataType::Struct(_) => self.build_struct_reader(field, mask),
DataType::Struct(_) => self.build_struct_reader(field, mask, row_number_column),
DataType::List(_) => self.build_list_reader(field, mask, false),
DataType::LargeList(_) => self.build_list_reader(field, mask, true),
DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask),
Expand All @@ -164,6 +178,10 @@ impl<'a> ArrayReaderBuilder<'a> {
}
}

fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
Ok(Box::new(RowNumberReader::try_new(self.row_groups.row_groups())?))
}

/// Build array reader for map type.
fn build_map_reader(
&self,
Expand All @@ -173,8 +191,8 @@ impl<'a> ArrayReaderBuilder<'a> {
let children = field.children().unwrap();
assert_eq!(children.len(), 2);

let key_reader = self.build_reader(&children[0], mask)?;
let value_reader = self.build_reader(&children[1], mask)?;
let key_reader = self.build_reader(&children[0], mask, None)?;
let value_reader = self.build_reader(&children[1], mask, None)?;

match (key_reader, value_reader) {
(Some(key_reader), Some(value_reader)) => {
Expand Down Expand Up @@ -225,7 +243,7 @@ impl<'a> ArrayReaderBuilder<'a> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);

let reader = match self.build_reader(&children[0], mask)? {
let reader = match self.build_reader(&children[0], mask, None)? {
Some(item_reader) => {
// Need to retrieve underlying data type to handle projection
let item_type = item_reader.get_data_type().clone();
Expand Down Expand Up @@ -271,7 +289,7 @@ impl<'a> ArrayReaderBuilder<'a> {
let children = field.children().unwrap();
assert_eq!(children.len(), 1);

let reader = match self.build_reader(&children[0], mask)? {
let reader = match self.build_reader(&children[0], mask, None)? {
Some(item_reader) => {
let item_type = item_reader.get_data_type().clone();
let reader = match &field.arrow_type {
Expand Down Expand Up @@ -401,7 +419,8 @@ impl<'a> ArrayReaderBuilder<'a> {
&self,
field: &ParquetField,
mask: &ProjectionMask,
) -> Result<Option<Box<dyn ArrayReader>>> {
row_number_column: Option<&str>,
) -> Result<Option<Box<dyn ArrayReader>>> {
let arrow_fields = match &field.arrow_type {
DataType::Struct(children) => children,
_ => unreachable!(),
Expand All @@ -413,14 +432,24 @@ impl<'a> ArrayReaderBuilder<'a> {
let mut builder = SchemaBuilder::with_capacity(children.len());

for (arrow, parquet) in arrow_fields.iter().zip(children) {
if let Some(reader) = self.build_reader(parquet, mask)? {
if let Some(reader) = self.build_reader(parquet, mask, None)? {
// Need to retrieve underlying data type to handle projection
let child_type = reader.get_data_type().clone();
builder.push(arrow.as_ref().clone().with_data_type(child_type));
readers.push(reader);
}
}

if let Some(row_number_column) = row_number_column {
let reader = self.build_row_number_reader()?;
builder.push(Field::new(
row_number_column,
reader.get_data_type().clone(),
false,
));
readers.push(reader);
}

if readers.is_empty() {
return Ok(None);
}
Expand Down Expand Up @@ -460,7 +489,7 @@ mod tests {

let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
.build_array_reader(fields.as_ref(), &mask)
.build_array_reader(fields.as_ref(), &mask, None)
.unwrap();

// Create arrow types
Expand All @@ -472,4 +501,36 @@ mod tests {

assert_eq!(array_reader.get_data_type(), &arrow_type);
}

#[test]
fn test_create_array_reader_with_row_numbers() {
let file = get_test_file("nulls.snappy.parquet");
let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
let (_, fields) = parquet_to_arrow_schema_and_fields(
file_metadata.schema_descr(),
ProjectionMask::all(),
file_metadata.key_value_metadata(),
)
.unwrap();

let metrics = ArrowReaderMetrics::disabled();
let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
.build_array_reader(fields.as_ref(), &mask, Some("row_number"))
.unwrap();

// Create arrow types
let arrow_type = DataType::Struct(Fields::from(vec![
Field::new(
"b_struct",
DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
true,
),
Field::new("row_number", DataType::Int64, false),
]));

assert_eq!(array_reader.get_data_type(), &arrow_type);
}
}
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ mod tests {

let metrics = ArrowReaderMetrics::disabled();
let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
.build_array_reader(fields.as_ref(), &mask)
.build_array_reader(fields.as_ref(), &mask, None)
.unwrap();

let batch = array_reader.next_batch(100).unwrap();
Expand Down
10 changes: 9 additions & 1 deletion parquet/src/arrow/array_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ mod map_array;
mod null_array;
mod primitive_array;
mod row_group_cache;
mod row_number;
mod struct_array;

#[cfg(test)]
mod test_util;

// Note that this crate is public under the `experimental` feature flag.
use crate::file::metadata::RowGroupMetaData;
pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
Expand Down Expand Up @@ -139,6 +140,9 @@ pub trait RowGroups {
/// Returns a [`PageIterator`] for all pages in the specified column chunk
/// across all row groups in this collection.
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

/// Returns an iterator over the row groups in this collection
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;
}

impl RowGroups for Arc<dyn FileReader> {
Expand All @@ -150,6 +154,10 @@ impl RowGroups for Arc<dyn FileReader> {
let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
Ok(Box::new(iterator))
}

fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(self.metadata().row_groups().iter())
}
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
Expand Down
82 changes: 82 additions & 0 deletions parquet/src/arrow/array_reader/row_number.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::ArrayReader;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaData;
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;
use std::any::Any;
use std::sync::Arc;

/// [`ArrayReader`] that synthesizes file-absolute row numbers (as `Int64`)
/// for the rows of a sequence of row groups, instead of reading a column
/// from the file.
pub(crate) struct RowNumberReader {
    // Row numbers produced by `read_records` but not yet emitted by
    // `consume_batch`; drained on each `consume_batch` call.
    buffered_row_numbers: Vec<i64>,
    // Lazily yields the remaining row numbers: one contiguous
    // `first_row_index..first_row_index + num_rows` range per row group,
    // flattened into a single stream.
    remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
}

impl RowNumberReader {
    /// Creates a reader yielding the absolute row number of every row in
    /// `row_groups`, in iteration order.
    ///
    /// Each row group contributes the contiguous range
    /// `first_row_index .. first_row_index + num_rows`.
    pub(crate) fn try_new<'a>(
        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
    ) -> Result<Self> {
        let mut ranges = Vec::new();
        for group in row_groups {
            let start = group.first_row_index();
            ranges.push(start..start + group.num_rows());
        }
        Ok(Self {
            buffered_row_numbers: Vec::new(),
            remaining_row_numbers: ranges.into_iter().flatten(),
        })
    }
}

impl ArrayReader for RowNumberReader {
    /// Buffers up to `batch_size` further row numbers; returns how many were
    /// actually buffered (fewer once the row groups are exhausted).
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let before = self.buffered_row_numbers.len();
        let batch = self.remaining_row_numbers.by_ref().take(batch_size);
        self.buffered_row_numbers.extend(batch);
        Ok(self.buffered_row_numbers.len() - before)
    }

    /// Discards up to `num_records` row numbers without buffering them;
    /// returns the number actually skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        // TODO: Use advance_by when it stabilizes to improve performance
        let skipped = self.remaining_row_numbers.by_ref().take(num_records).count();
        Ok(skipped)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Row numbers are always exposed as a non-nullable `Int64` column.
    fn get_data_type(&self) -> &DataType {
        &DataType::Int64
    }

    /// Emits everything buffered by `read_records` as an `Int64Array`,
    /// leaving the buffer empty (its allocation is retained for reuse).
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let drained = self.buffered_row_numbers.drain(..);
        let array = Int64Array::from_iter(drained);
        Ok(Arc::new(array))
    }

    // Row numbers are synthesized, not read from pages, so there are no
    // definition or repetition levels.
    fn get_def_levels(&self) -> Option<&[i16]> {
        None
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        None
    }
}
Loading