diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs
index a4c5fdc04d36..e2f021d1c97f 100644
--- a/parquet/examples/read_with_rowgroup.rs
+++ b/parquet/examples/read_with_rowgroup.rs
@@ -129,6 +129,10 @@ impl RowGroups for InMemoryRowGroup {
             }
         }
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(std::iter::once(&self.metadata))
+    }
 }
 
 impl InMemoryRowGroup {
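The hunk above shows what the new `RowGroups::row_groups` method (introduced in `parquet/src/arrow/array_reader/mod.rs` below) looks like for the example's single buffered row group. For an implementor holding several row groups the same pattern generalizes; a minimal sketch, assuming the `RowGroups` re-export in `parquet::arrow::arrow_reader` and with all other names illustrative, not part of this patch:

```rust
use parquet::arrow::arrow_reader::RowGroups;
use parquet::column::page::PageIterator;
use parquet::errors::Result;
use parquet::file::metadata::RowGroupMetaData;

/// Hypothetical container buffering several row groups in file order.
struct MultiRowGroup {
    metadata: Vec<RowGroupMetaData>,
}

impl RowGroups for MultiRowGroup {
    fn num_rows(&self) -> usize {
        self.metadata.iter().map(|rg| rg.num_rows() as usize).sum()
    }

    fn column_chunks(&self, _i: usize) -> Result<Box<dyn PageIterator>> {
        unimplemented!("page access elided in this sketch")
    }

    // The new required method: yield metadata in file order so that
    // RowNumberReader can recover each group's starting row number.
    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(self.metadata.iter())
    }
}
```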
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 6107ba4f2575..da6d0df90fdf 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
 use std::sync::{Arc, Mutex};
 
-use arrow_schema::{DataType, Fields, SchemaBuilder};
-
 use crate::arrow::ProjectionMask;
 use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
 use crate::arrow::array_reader::cached_array_reader::CacheRole;
@@ -26,6 +25,7 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::row_group_cache::RowGroupCache;
+use crate::arrow::array_reader::row_number::RowNumberReader;
 use crate::arrow::array_reader::{
     ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
     PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
@@ -113,25 +113,39 @@ impl<'a> ArrayReaderBuilder<'a> {
         &self,
         field: Option<&ParquetField>,
         mask: &ProjectionMask,
-    ) -> Result<Box<dyn ArrayReader>> {
+        row_number_column: Option<&str>,
+    ) -> Result<Box<dyn ArrayReader>> {
         let reader = field
-            .and_then(|field| self.build_reader(field, mask).transpose())
-            .transpose()?
-            .unwrap_or_else(|| make_empty_array_reader(self.num_rows()));
+            .and_then(|field| self.build_reader(field, mask, row_number_column).transpose())
+            .or_else(|| {
+                row_number_column.map(|column| {
+                    let row_number_reader = self.build_row_number_reader()?;
+                    let reader: Box<dyn ArrayReader> = Box::new(StructArrayReader::new(
+                        DataType::Struct(Fields::from(vec![Field::new(
+                            column,
+                            row_number_reader.get_data_type().clone(),
+                            false,
+                        )])),
+                        vec![row_number_reader],
+                        0,
+                        0,
+                        false,
+                    ));
+                    Ok(reader)
+                })
+            })
+            .transpose()?
+            .unwrap_or_else(|| make_empty_array_reader(self.row_groups.num_rows()));
 
         Ok(reader)
     }
 
-    /// Return the total number of rows
-    fn num_rows(&self) -> usize {
-        self.row_groups.num_rows()
-    }
-
     fn build_reader(
         &self,
         field: &ParquetField,
         mask: &ProjectionMask,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        row_number_column: Option<&str>,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
         match field.field_type {
             ParquetFieldType::Primitive { col_idx, .. } => {
                 let Some(reader) = self.build_primitive_reader(field, mask)? else {
@@ -155,7 +169,7 @@ impl<'a> ArrayReaderBuilder<'a> {
             }
             ParquetFieldType::Group { .. } => match &field.arrow_type {
                 DataType::Map(_, _) => self.build_map_reader(field, mask),
-                DataType::Struct(_) => self.build_struct_reader(field, mask),
+                DataType::Struct(_) => self.build_struct_reader(field, mask, row_number_column),
                 DataType::List(_) => self.build_list_reader(field, mask, false),
                 DataType::LargeList(_) => self.build_list_reader(field, mask, true),
                 DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask),
@@ -164,6 +178,10 @@ impl<'a> ArrayReaderBuilder<'a> {
         }
     }
 
+    fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
+        Ok(Box::new(RowNumberReader::try_new(self.row_groups.row_groups())?))
+    }
+
     /// Build array reader for map type.
     fn build_map_reader(
         &self,
@@ -173,8 +191,8 @@ impl<'a> ArrayReaderBuilder<'a> {
         let children = field.children().unwrap();
         assert_eq!(children.len(), 2);
 
-        let key_reader = self.build_reader(&children[0], mask)?;
-        let value_reader = self.build_reader(&children[1], mask)?;
+        let key_reader = self.build_reader(&children[0], mask, None)?;
+        let value_reader = self.build_reader(&children[1], mask, None)?;
 
         match (key_reader, value_reader) {
             (Some(key_reader), Some(value_reader)) => {
@@ -225,7 +243,7 @@ impl<'a> ArrayReaderBuilder<'a> {
         let children = field.children().unwrap();
         assert_eq!(children.len(), 1);
 
-        let reader = match self.build_reader(&children[0], mask)? {
+        let reader = match self.build_reader(&children[0], mask, None)? {
             Some(item_reader) => {
                 // Need to retrieve underlying data type to handle projection
                 let item_type = item_reader.get_data_type().clone();
@@ -271,7 +289,7 @@ impl<'a> ArrayReaderBuilder<'a> {
         let children = field.children().unwrap();
         assert_eq!(children.len(), 1);
 
-        let reader = match self.build_reader(&children[0], mask)? {
+        let reader = match self.build_reader(&children[0], mask, None)? {
             Some(item_reader) => {
                 let item_type = item_reader.get_data_type().clone();
                 let reader = match &field.arrow_type {
@@ -401,7 +419,8 @@ impl<'a> ArrayReaderBuilder<'a> {
         &self,
         field: &ParquetField,
         mask: &ProjectionMask,
-    ) -> Result<Option<Box<dyn ArrayReader>>> {
+        row_number_column: Option<&str>,
+    ) -> Result<Option<Box<dyn ArrayReader>>> {
         let arrow_fields = match &field.arrow_type {
             DataType::Struct(children) => children,
             _ => unreachable!(),
@@ -413,7 +432,7 @@ impl<'a> ArrayReaderBuilder<'a> {
         let mut builder = SchemaBuilder::with_capacity(children.len());
 
         for (arrow, parquet) in arrow_fields.iter().zip(children) {
-            if let Some(reader) = self.build_reader(parquet, mask)? {
+            if let Some(reader) = self.build_reader(parquet, mask, None)? {
                 // Need to retrieve underlying data type to handle projection
                 let child_type = reader.get_data_type().clone();
                 builder.push(arrow.as_ref().clone().with_data_type(child_type));
@@ -421,6 +440,16 @@ impl<'a> ArrayReaderBuilder<'a> {
             }
         }
 
+        if let Some(row_number_column) = row_number_column {
+            let reader = self.build_row_number_reader()?;
+            builder.push(Field::new(
+                row_number_column,
+                reader.get_data_type().clone(),
+                false,
+            ));
+            readers.push(reader);
+        }
+
         if readers.is_empty() {
             return Ok(None);
         }
@@ -460,7 +489,7 @@ mod tests {
 
         let metrics = ArrowReaderMetrics::disabled();
         let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
-            .build_array_reader(fields.as_ref(), &mask)
+            .build_array_reader(fields.as_ref(), &mask, None)
             .unwrap();
 
         // Create arrow types
@@ -472,4 +501,36 @@ mod tests {
 
         assert_eq!(array_reader.get_data_type(), &arrow_type);
     }
+
+    #[test]
+    fn test_create_array_reader_with_row_numbers() {
+        let file = get_test_file("nulls.snappy.parquet");
+        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());
+
+        let file_metadata = file_reader.metadata().file_metadata();
+        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
+            file_metadata.schema_descr(),
+            ProjectionMask::all(),
+            file_metadata.key_value_metadata(),
+        )
+        .unwrap();
+
+        let metrics = ArrowReaderMetrics::disabled();
+        let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+            .build_array_reader(fields.as_ref(), &mask, Some("row_number"))
+            .unwrap();
+
+        // Create arrow types
+        let arrow_type = DataType::Struct(Fields::from(vec![
+            Field::new(
+                "b_struct",
+                DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
+                true,
+            ),
+            Field::new("row_number", DataType::Int64, false),
+        ]));
+
+        assert_eq!(array_reader.get_data_type(), &arrow_type);
+    }
 }
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 487c2bdd56cd..3e2bfaa506cc 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -566,7 +566,7 @@ mod tests {
 
         let metrics = ArrowReaderMetrics::disabled();
         let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
-            .build_array_reader(fields.as_ref(), &mask)
+            .build_array_reader(fields.as_ref(), &mask, None)
             .unwrap();
 
         let batch = array_reader.next_batch(100).unwrap();
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index b3595e58d695..7fef792a8722 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -42,12 +42,13 @@ mod map_array;
 mod null_array;
 mod primitive_array;
 mod row_group_cache;
+mod row_number;
 mod struct_array;
 
 #[cfg(test)]
 mod test_util;
 
-// Note that this crate is public under the `experimental` feature flag.
+use crate::file::metadata::RowGroupMetaData;
 pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
@@ -139,6 +140,9 @@ pub trait RowGroups {
     /// Returns a [`PageIterator`] for all pages in the specified column chunk
     /// across all row groups in this collection.
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
+
+    /// Returns an iterator over the row groups in this collection
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;
 }
 
 impl RowGroups for Arc<dyn FileReader> {
@@ -150,6 +154,10 @@ impl RowGroups for Arc<dyn FileReader> {
     fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
         let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
         Ok(Box::new(iterator))
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(self.metadata().row_groups().iter())
+    }
 }
 
 /// Uses `record_reader` to read up to `batch_size` records from `pages`
diff --git a/parquet/src/arrow/array_reader/row_number.rs b/parquet/src/arrow/array_reader/row_number.rs
new file mode 100644
index 000000000000..5484f8d14186
--- /dev/null
+++ b/parquet/src/arrow/array_reader/row_number.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::RowGroupMetaData;
+use arrow_array::{ArrayRef, Int64Array};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::sync::Arc;
+
+pub(crate) struct RowNumberReader {
+    buffered_row_numbers: Vec<i64>,
+    remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
+}
+
+impl RowNumberReader {
+    pub(crate) fn try_new<'a>(
+        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
+    ) -> Result<Self> {
+        let ranges = row_groups
+            .map(|rg| {
+                let first_row_index = rg.first_row_index();
+                Ok(first_row_index..first_row_index + rg.num_rows())
+            })
+            .collect::<Result<Vec<_>>>()?;
+        Ok(Self {
+            buffered_row_numbers: Vec::new(),
+            remaining_row_numbers: ranges.into_iter().flatten(),
+        })
+    }
+}
+
+impl ArrayReader for RowNumberReader {
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let starting_len = self.buffered_row_numbers.len();
+        self.buffered_row_numbers
+            .extend((&mut self.remaining_row_numbers).take(batch_size));
+        Ok(self.buffered_row_numbers.len() - starting_len)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        // TODO: Use advance_by when it stabilizes to improve performance
+        Ok((&mut self.remaining_row_numbers).take(num_records).count())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &DataType::Int64
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        Ok(Arc::new(Int64Array::from_iter(
+            self.buffered_row_numbers.drain(..),
+        )))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
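`RowNumberReader` above is the heart of the feature: one `Range<i64>` per row group (`first_row_index..first_row_index + num_rows`), flattened lazily, with `read_records`/`skip_records` consuming numbers and `consume_batch` draining the buffer. A self-contained sketch of the same two-phase contract using only std types (everything here is illustrative, not part of the patch):

```rust
use std::ops::Range;

/// Toy analogue of RowNumberReader: buffer on read, hand off on consume.
struct RowNumbers {
    buffered: Vec<i64>,
    remaining: std::iter::Flatten<std::vec::IntoIter<Range<i64>>>,
}

impl RowNumbers {
    fn new(ranges: Vec<Range<i64>>) -> Self {
        Self {
            buffered: Vec::new(),
            remaining: ranges.into_iter().flatten(),
        }
    }

    /// Like read_records: buffer up to `n` row numbers, report how many.
    fn read(&mut self, n: usize) -> usize {
        let before = self.buffered.len();
        self.buffered.extend((&mut self.remaining).take(n));
        self.buffered.len() - before
    }

    /// Like consume_batch: drain everything buffered so far.
    fn consume(&mut self) -> Vec<i64> {
        self.buffered.drain(..).collect()
    }
}

fn main() {
    // Two row groups: rows 0..4 and 4..7, as first_row_index would describe them.
    let mut r = RowNumbers::new(vec![0..4, 4..7]);
    assert_eq!(r.read(5), 5); // crosses the group boundary transparently
    assert_eq!(r.consume(), vec![0, 1, 2, 3, 4]);
    assert_eq!(r.read(5), 2); // only two rows remain in the file
    assert_eq!(r.consume(), vec![5, 6]);
}
```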
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 4e91685519f3..1ab2b8bab4a0 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -38,7 +38,7 @@ use crate::column::page::{PageIterator, PageReader};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
@@ -122,6 +122,8 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) metrics: ArrowReaderMetrics,
 
     pub(crate) max_predicate_cache_size: usize,
+
+    pub(crate) row_number_column: Option<String>,
 }
 
 impl<T> Debug for ArrowReaderBuilder<T> {
@@ -139,6 +141,7 @@ impl<T> Debug for ArrowReaderBuilder<T> {
             .field("limit", &self.limit)
             .field("offset", &self.offset)
             .field("metrics", &self.metrics)
+            .field("row_number_column", &self.row_number_column)
             .finish()
     }
 }
@@ -159,6 +162,7 @@ impl<T> ArrowReaderBuilder<T> {
             offset: None,
             metrics: ArrowReaderMetrics::Disabled,
             max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
+            row_number_column: None,
         }
     }
 
@@ -368,6 +372,16 @@ impl<T> ArrowReaderBuilder<T> {
             ..self
         }
     }
+
+    /// Include file row numbers in the output with the given column name
+    ///
+    /// This will add a column to the output record batch with the file row number
+    pub fn with_row_number_column(self, row_number_column: impl Into<String>) -> Self {
+        Self {
+            row_number_column: Some(row_number_column.into()),
+            ..self
+        }
+    }
 }
 
 /// Options that control how metadata is read for a parquet file
@@ -869,6 +883,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
             metrics,
             // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
             max_predicate_cache_size: _,
+            row_number_column,
         } = self;
 
         // Try to avoid allocate large buffer
@@ -898,14 +913,14 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
                 cache_projection.intersect(&projection);
 
                 let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
-                    .build_array_reader(fields.as_deref(), predicate.projection())?;
+                    .build_array_reader(fields.as_deref(), predicate.projection(), row_number_column.as_deref())?;
 
                 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
             }
         }
 
         let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
-            .build_array_reader(fields.as_deref(), &projection)?;
+            .build_array_reader(fields.as_deref(), &projection, row_number_column.as_deref())?;
 
         let read_plan = plan_builder
             .limited(reader.num_rows())
@@ -943,6 +958,14 @@ impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
             row_groups: self.row_groups.clone().into_iter(),
         }))
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(
+            self.row_groups
+                .iter()
+                .map(move |i| self.metadata.row_group(*i)),
+        )
+    }
 }
 
 struct ReaderPageIterator<T: ChunkReader> {
@@ -1108,7 +1131,7 @@ impl ParquetRecordBatchReader {
         // note metrics are not supported in this API
         let metrics = ArrowReaderMetrics::disabled();
         let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
-            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
+            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), None)?;
 
         let read_plan = ReadPlanBuilder::new(batch_size)
             .with_selection(selection)
@@ -1144,7 +1167,7 @@ impl ParquetRecordBatchReader {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::cmp::min;
     use std::collections::{HashMap, VecDeque};
     use std::fmt::Formatter;
@@ -1153,6 +1176,10 @@ mod tests {
     use std::path::PathBuf;
     use std::sync::Arc;
 
+    use rand::rngs::StdRng;
+    use rand::{random, rng, Rng, RngCore, SeedableRng};
+    use tempfile::tempfile;
+
     use arrow_array::builder::*;
     use arrow_array::cast::AsArray;
     use arrow_array::types::{
@@ -1170,12 +1197,12 @@ mod tests {
     use bytes::Bytes;
     use half::f16;
     use num_traits::PrimInt;
-    use rand::{Rng, RngCore, rng};
-    use tempfile::tempfile;
 
     use crate::arrow::arrow_reader::{
-        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
-        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
+        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
+        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
+        RowSelector,
     };
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
     use crate::arrow::{ArrowWriter, ProjectionMask};
@@ -1189,6 +1215,7 @@ mod tests {
     use crate::file::metadata::ParquetMetaData;
     use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
     use crate::file::writer::SerializedFileWriter;
+    use crate::format::FileMetaData;
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{Type, TypePtr};
     use crate::util::test_common::rand_gen::RandGen;
@@ -2928,7 +2955,7 @@ mod tests {
             assert_eq!(end - total_read, batch.num_rows());
 
             let a = converter(&expected_data[total_read..end]);
-            let b = Arc::clone(batch.column(0));
+            let b = batch.column(0);
 
             assert_eq!(a.data_type(), b.data_type());
             assert_eq!(a.to_data(), b.to_data());
@@ -5003,4 +5030,231 @@ mod tests {
         assert!(sbbf.check(&"Hello"));
         assert!(!sbbf.check(&"Hello_Not_Exists"));
     }
+
+    #[test]
+    fn test_read_row_numbers() {
+        let file = write_parquet_from_iter(vec![(
+            "value",
+            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
+        )]);
+        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
+
+        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
+        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options,
+        )
+        .expect("reader builder with schema")
+        .with_row_number_column("row_number")
+        .build()
+        .expect("reader with schema");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("value", ArrowDataType::Int64, false),
+            Field::new("row_number", ArrowDataType::Int64, false),
+        ]));
+
+        assert_eq!(batch.schema(), schema);
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(1), Some(2), Some(3)]
+        );
+        assert_eq!(
+            batch
+                .column(1)
+                .as_primitive::<Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(1), Some(2)]
+        );
+    }
+
+    #[test]
+    fn test_read_only_row_numbers() {
+        let file = write_parquet_from_iter(vec![(
+            "value",
+            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
+        )]);
+        let mut metadata = ArrowReaderMetadata::load(&file, Default::default()).unwrap();
+        metadata.fields = None;
+
+        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
+            .with_row_number_column("row_number")
+            .build()
+            .expect("reader with schema");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "row_number",
+            ArrowDataType::Int64,
+            false,
+        )]));
+
+        assert_eq!(batch.schema(), schema);
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(1), Some(2)]
+        );
+    }
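The multi-row-group tests that follow exercise a subtle point: row numbers stay file-relative under row selections and filters, because a skipped selector consumes numbers from the iterator without emitting them (that is what `skip_records` does). A toy illustration with assumed group sizes, not taken from the patch:

```rust
fn main() {
    // Two row groups of 3 rows each, as first_row_index/num_rows would describe.
    let mut numbers = vec![0i64..3, 3..6].into_iter().flatten();

    // RowSelector { skip: true, row_count: 2 } maps to skip_records(2).
    assert_eq!(numbers.by_ref().take(2).count(), 2);

    // RowSelector { skip: false, row_count: 3 } maps to read_records(3).
    let read: Vec<i64> = numbers.by_ref().take(3).collect();
    assert_eq!(read, vec![2, 3, 4]); // file positions, not output positions
}
```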
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            false,
+            |path, selection, _row_filter, batch_size| {
+                let file = File::open(path).unwrap();
+                let reader = ParquetRecordBatchReaderBuilder::try_new(file)
+                    .unwrap()
+                    .with_row_selection(selection)
+                    .with_batch_size(batch_size)
+                    .with_row_number_column("row_number")
+                    .build()
+                    .expect("Could not create reader");
+                reader
+                    .collect::<Result<Vec<_>, _>>()
+                    .expect("Could not read")
+            },
+        );
+    }
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups_and_filter() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            true,
+            |path, selection, row_filter, batch_size| {
+                let file = File::open(path).unwrap();
+                let reader = ParquetRecordBatchReaderBuilder::try_new(file)
+                    .unwrap()
+                    .with_row_selection(selection)
+                    .with_batch_size(batch_size)
+                    .with_row_filter(row_filter.expect("No filter"))
+                    .with_row_number_column("row_number")
+                    .build()
+                    .expect("Could not create reader");
+                reader
+                    .collect::<Result<Vec<_>, _>>()
+                    .expect("Could not read")
+            },
+        );
+    }
+
+    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
+        use_filter: bool,
+        test_case: F,
+    ) where
+        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
+    {
+        let seed: u64 = random();
+        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
+        let mut rng = StdRng::seed_from_u64(seed);
+
+        use tempfile::TempDir;
+        let tempdir = TempDir::new().expect("Could not create temp dir");
+
+        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
+
+        let path = tempdir.path().join("test.parquet");
+        std::fs::write(&path, bytes).expect("Could not write file");
+
+        let mut case = vec![];
+        let mut remaining = metadata.file_metadata().num_rows();
+        while remaining > 0 {
+            let row_count = rng.gen_range(1..=remaining);
+            remaining -= row_count;
+            case.push(RowSelector {
+                row_count: row_count as usize,
+                skip: rng.gen_bool(0.5),
+            });
+        }
+
+        let filter = use_filter.then(|| {
+            let filter = (0..metadata.file_metadata().num_rows())
+                .map(|_| rng.gen_bool(0.99))
+                .collect::<Vec<_>>();
+            let mut filter_offset = 0;
+            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
+                ProjectionMask::all(),
+                move |b| {
+                    let array = BooleanArray::from_iter(
+                        filter
+                            .iter()
+                            .skip(filter_offset)
+                            .take(b.num_rows())
+                            .map(|x| Some(*x)),
+                    );
+                    filter_offset += b.num_rows();
+                    Ok(array)
+                },
+            ))])
+        });
+
+        let selection = RowSelection::from(case);
+        let batches = test_case(path, selection.clone(), filter, rng.gen_range(1..4096));
+
+        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
+            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
+            return;
+        }
+        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
+            .expect("Failed to concatenate");
+        // assert_eq!(selection.row_count(), actual.num_rows());
+        let values = actual
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .collect::<Vec<_>>();
+        let row_numbers = actual
+            .column(1)
+            .as_primitive::<Int64Type>()
+            .iter()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            row_numbers
+                .into_iter()
+                .map(|number| number.map(|number| number + 1))
+                .collect::<Vec<_>>(),
+            values
+        );
+    }
+
+    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
+        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
+            "value",
+            ArrowDataType::Int64,
+            false,
+        )])));
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
+
+        let mut values = 1..=rng.gen_range(1..4096);
+        while !values.is_empty() {
+            let batch_values = values
+                .by_ref()
+                .take(rng.gen_range(1..4096))
+                .collect::<Vec<_>>();
+            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
+            let batch =
+                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
+            writer.write(&batch).expect("Could not write batch");
+            writer.flush().expect("Could not flush");
+        }
+        let metadata = writer.close().expect("Could not close writer");
+
+        (Bytes::from(buf), metadata)
+    }
 }
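End to end, the synchronous API addition reads as below; a usage sketch against the public builder, where the file path and column name are illustrative:

```rust
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::errors::Result;
use std::fs::File;

fn read_with_row_numbers() -> Result<()> {
    let file = File::open("data.parquet")?; // illustrative path
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        // Appends a non-nullable Int64 column named "row_number" to each batch.
        .with_row_number_column("row_number")
        .build()?;

    for batch in reader {
        let batch = batch?;
        // The row-number column is appended after the projected columns.
        let row_numbers = batch.column(batch.num_columns() - 1);
        println!("{row_numbers:?}");
    }
    Ok(())
}
```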
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 27f90e3d7bc6..0f9df2f77509 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -53,7 +53,7 @@ use crate::bloom_filter::{
 };
 use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 
@@ -516,6 +516,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             offset: self.offset,
             metrics: self.metrics,
             max_predicate_cache_size: self.max_predicate_cache_size,
+            row_number_column: self.row_number_column,
         };
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
@@ -561,7 +562,7 @@ struct ReaderFactory<T> {
     /// Optional filter
     filter: Option<RowFilter>,
 
-    /// Limit to apply to remaining row groups. 
+    /// Limit to apply to remaining row groups.
     limit: Option<usize>,
 
     /// Offset to apply to the next
@@ -572,6 +573,8 @@ struct ReaderFactory<T> {
 
     /// Maximum size of the predicate cache
     max_predicate_cache_size: usize,
+
+    row_number_column: Option<String>,
 }
 
 impl<T> ReaderFactory<T>
@@ -649,7 +652,7 @@ where
 
                 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
                     .with_cache_options(Some(&cache_options))
-                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+                    .build_array_reader(self.fields.as_deref(), predicate.projection(), self.row_number_column.as_deref())?;
 
                 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
             }
@@ -706,7 +709,7 @@ where
         let cache_options = cache_options_builder.consumer();
         let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
             .with_cache_options(Some(&cache_options))
-            .build_array_reader(self.fields.as_deref(), &projection)?;
+            .build_array_reader(self.fields.as_deref(), &projection, self.row_number_column.as_deref())?;
 
         let reader = ParquetRecordBatchReader::new(array_reader, plan);
 
@@ -1130,6 +1133,10 @@ impl RowGroups for InMemoryRowGroup<'_> {
             }
         }
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(std::iter::once(self.metadata.row_group(self.row_group_idx)))
+    }
 }
 
 /// An in-memory column chunk
@@ -1210,6 +1217,7 @@ impl PageIterator for ColumnChunkIterator {}
 mod tests {
     use super::*;
     use crate::arrow::ArrowWriter;
+    use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
    use crate::arrow::arrow_reader::{
         ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
     };
@@ -1990,6 +1998,7 @@ mod tests {
             offset: None,
             metrics: ArrowReaderMetrics::disabled(),
             max_predicate_cache_size: 0,
+            row_number_column: None,
         };
 
         let mut skip = true;
@@ -2455,6 +2464,7 @@ mod tests {
             offset: None,
             metrics: ArrowReaderMetrics::disabled(),
             max_predicate_cache_size: 0,
+            row_number_column: None,
         };
 
         // Provide an output projection that also selects the same nested leaf
@@ -2708,4 +2718,55 @@ mod tests {
             92
         );
     }
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            false,
+            |path, selection, _row_filter, batch_size| {
+                let runtime = tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .expect("Could not create runtime");
+                runtime.block_on(async move {
+                    let file = tokio::fs::File::open(path).await.unwrap();
+                    let reader = ParquetRecordBatchStreamBuilder::new(file)
+                        .await
+                        .unwrap()
+                        .with_row_selection(selection)
+                        .with_batch_size(batch_size)
+                        .with_row_number_column("row_number")
+                        .build()
+                        .expect("Could not create reader");
+                    reader.try_collect::<Vec<_>>().await.unwrap()
+                })
+            },
+        );
+    }
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups_and_filter() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            true,
+            |path, selection, row_filter, batch_size| {
+                let runtime = tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .expect("Could not create runtime");
+                runtime.block_on(async move {
+                    let file = tokio::fs::File::open(path).await.unwrap();
+                    let reader = ParquetRecordBatchStreamBuilder::new(file)
+                        .await
+                        .unwrap()
+                        .with_row_selection(selection)
+                        .with_row_filter(row_filter.expect("No row filter"))
+                        .with_batch_size(batch_size)
+                        .with_row_number_column("row_number")
+                        .build()
+                        .expect("Could not create reader");
+                    reader.try_collect::<Vec<_>>().await.unwrap()
+                })
+            },
+        );
+    }
 }
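The same option wired through the async path; a sketch assuming the `async` feature and tokio, with the path illustrative:

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;

async fn read_with_row_numbers_async() -> Result<()> {
    let file = tokio::fs::File::open("data.parquet").await?; // illustrative path
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_row_number_column("row_number")
        .build()?;

    // Every emitted RecordBatch carries the extra Int64 "row_number" column.
    let batches = stream.try_collect::<Vec<_>>().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```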
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 763025fe142b..7afc8045c40d 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -616,6 +616,7 @@ pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
 pub struct RowGroupMetaData {
     columns: Vec<ColumnChunkMetaData>,
     num_rows: i64,
+    first_row_index: i64,
     sorting_columns: Option<Vec<SortingColumn>>,
     total_byte_size: i64,
     schema_descr: SchemaDescPtr,
@@ -656,6 +657,11 @@ impl RowGroupMetaData {
         self.num_rows
     }
 
+    /// Returns the global index number for the first row in this row group.
+    pub fn first_row_index(&self) -> i64 {
+        self.first_row_index
+    }
+
     /// Returns the sort ordering of the rows in this RowGroup if any
     pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
         self.sorting_columns.as_ref()
@@ -713,6 +719,7 @@ impl RowGroupMetaDataBuilder {
             schema_descr,
             file_offset: None,
             num_rows: 0,
+            first_row_index: 0,
             sorting_columns: None,
             total_byte_size: 0,
             ordinal: None,
@@ -725,6 +732,12 @@ impl RowGroupMetaDataBuilder {
         self
     }
 
+    /// Sets the first row number in this row group.
+    pub fn set_first_row_index(mut self, value: i64) -> Self {
+        self.0.first_row_index = value;
+        self
+    }
+
     /// Sets the sorting order for columns
     pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
         self.0.sorting_columns = value;
@@ -1875,10 +1888,10 @@ mod tests {
             .build();
 
         #[cfg(not(feature = "encryption"))]
-        let base_expected_size = 2248;
+        let base_expected_size = 2256;
         #[cfg(feature = "encryption")]
         // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472
-        let base_expected_size = 2416;
+        let base_expected_size = 2424;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
@@ -1907,10 +1920,10 @@ mod tests {
             .build();
 
         #[cfg(not(feature = "encryption"))]
-        let bigger_expected_size = 2674;
+        let bigger_expected_size = 2682;
         #[cfg(feature = "encryption")]
         // Not as accurate as it should be: https://github.com/apache/arrow-rs/issues/8472
-        let bigger_expected_size = 2842;
+        let bigger_expected_size = 2850;
 
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
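Both decode paths below (the encrypted path in `thrift/encryption.rs` and the plain path in `thrift/mod.rs`) populate the new field the same way: `first_row_index` is a running sum of `num_rows` over the row groups in file order. A sketch of that invariant in isolation:

```rust
/// first_row_index of group i == sum of num_rows of groups 0..i.
fn assign_first_row_indices(num_rows_per_group: &[i64]) -> Vec<i64> {
    let mut next = 0i64;
    num_rows_per_group
        .iter()
        .map(|&n| {
            let index = next;
            next += n;
            index
        })
        .collect()
}

fn main() {
    assert_eq!(assign_first_row_indices(&[3, 5, 2]), vec![0, 3, 8]);
}
```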
diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs
index 9744f0f7a6b5..240071292680 100644
--- a/parquet/src/file/metadata/thrift/encryption.rs
+++ b/parquet/src/file/metadata/thrift/encryption.rs
@@ -113,6 +113,7 @@ pub(crate) struct FileCryptoMetaData<'a> {
 fn row_group_from_encrypted_thrift(
     mut rg: RowGroupMetaData,
     decryptor: Option<&FileDecryptor>,
+    first_row_index: i64,
 ) -> Result<RowGroupMetaData> {
     let schema_descr = rg.schema_descr;
 
@@ -192,6 +193,7 @@ fn row_group_from_encrypted_thrift(
     Ok(RowGroupMetaData {
         columns,
         num_rows,
+        first_row_index,
         sorting_columns,
         total_byte_size,
         schema_descr,
@@ -294,10 +296,14 @@ pub(crate) fn parquet_metadata_with_encryption(
     }
 
     // decrypt column chunk info
-    let row_groups = row_groups
-        .into_iter()
-        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
-        .collect::<Result<Vec<_>>>()?;
+    let mut first_row_index = 0i64;
+    let mut decrypted_row_groups = Vec::with_capacity(row_groups.len());
+    for rg in row_groups {
+        let decrypted_rg = row_group_from_encrypted_thrift(rg, file_decryptor.as_ref(), first_row_index)?;
+        first_row_index += decrypted_rg.num_rows();
+        decrypted_row_groups.push(decrypted_rg);
+    }
+    let row_groups = decrypted_row_groups;
 
     let metadata = ParquetMetaDataBuilder::new(file_metadata)
         .set_row_groups(row_groups)
diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs
index 14774910961f..b157c5fe45be 100644
--- a/parquet/src/file/metadata/thrift/mod.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -585,9 +585,11 @@ fn read_column_chunk<'a>(
 fn read_row_group(
     prot: &mut ThriftSliceInputProtocol,
     schema_descr: &Arc<SchemaDescriptor>,
+    first_row_index: i64,
 ) -> Result<RowGroupMetaData> {
     // create default initialized RowGroupMetaData
     let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
+    row_group.first_row_index = first_row_index;
 
     // mask values for required fields
     const RG_COLUMNS: u8 = 1 << 1;
@@ -726,8 +728,11 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData>
             let schema_descr = schema_descr.as_ref().unwrap();
             let list_ident = prot.read_list_begin()?;
             let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
+            let mut first_row_index = 0i64;
             for _ in 0..list_ident.size {
-                rg_vec.push(read_row_group(&mut prot, schema_descr)?);
+                let rg = read_row_group(&mut prot, schema_descr, first_row_index)?;
+                first_row_index += rg.num_rows();
+                rg_vec.push(rg);
             }
             row_groups = Some(rg_vec);
         }
@@ -1586,7 +1591,7 @@ pub(crate) mod tests {
         schema_descr: Arc<SchemaDescriptor>,
     ) -> Result<RowGroupMetaData> {
         let mut reader = ThriftSliceInputProtocol::new(buf);
-        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
+        crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr, 0)
     }
 
     pub(crate) fn read_column_chunk(