diff --git a/parquet/examples/external_metadata.rs b/parquet/examples/external_metadata.rs
index 5ca322ee2890..2c3250782c0f 100644
--- a/parquet/examples/external_metadata.rs
+++ b/parquet/examples/external_metadata.rs
@@ -112,7 +112,7 @@ async fn get_metadata_from_remote_parquet_file(
     // tell the reader to read the page index
     ParquetMetaDataReader::new()
         .with_page_indexes(true)
-        .load_and_finish(remote_file, file_size as usize)
+        .load_and_finish(remote_file, file_size)
         .await
         .unwrap()
 }
diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs
index 52b3d112274d..5d1ff0770f9e 100644
--- a/parquet/examples/read_with_rowgroup.rs
+++ b/parquet/examples/read_with_rowgroup.rs
@@ -166,9 +166,7 @@ impl InMemoryRowGroup {
         for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
             if self.mask.leaf_included(leaf_idx) {
                 let (start, len) = meta.byte_range();
-                let data = reader
-                    .get_bytes(start as usize..(start + len) as usize)
-                    .await?;
+                let data = reader.get_bytes(start..(start + len)).await?;
 
                 vs[leaf_idx] = Some(Arc::new(ColumnChunkData {
                     offset: start as usize,
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index ffcf39df0e23..c53d47be2e56 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -162,8 +162,8 @@
     /// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
     /// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
     /// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
-    pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<usize>> {
-        let mut ranges = vec![];
+    pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<u64>> {
+        let mut ranges: Vec<Range<u64>> = vec![];
         let mut row_offset = 0;
 
         let mut pages = page_locations.iter().peekable();
@@ -175,8 +175,8 @@
         while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
             if !(selector.skip || current_page_included) {
-                let start = page.offset as usize;
-                let end = start + page.compressed_page_size as usize;
+                let start = page.offset as u64;
+                let end = start + page.compressed_page_size as u64;
                 ranges.push(start..end);
                 current_page_included = true;
             }
 
@@ -200,8 +200,8 @@
             }
         } else {
             if !(selector.skip || current_page_included) {
-                let start = page.offset as usize;
-                let end = start + page.compressed_page_size as usize;
+                let start = page.offset as u64;
+                let end = start + page.compressed_page_size as u64;
                 ranges.push(start..end);
             }
             current_selector = selectors.next()
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index ff183f418538..e0f7bdbbe902 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -48,12 +48,13 @@ use std::ops::Range;
 ///     file: tokio::fs::File,
 /// }
 /// impl MetadataFetch for TokioFileMetadata {
-///     fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+///     fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
 ///         // return a future that fetches data in range
 ///         async move {
-///             let mut buf = vec![0; range.len()]; // target buffer
+///             let len = (range.end - range.start).try_into().unwrap();
+///             let mut buf = vec![0; len]; // target buffer
 ///             // seek to the start of the range and read the data
-///             self.file.seek(SeekFrom::Start(range.start as u64)).await?;
+///             self.file.seek(SeekFrom::Start(range.start)).await?;
 ///             self.file.read_exact(&mut buf).await?;
 ///             Ok(Bytes::from(buf)) // convert to Bytes
 ///         }
@@ -66,11 +67,11 @@ pub trait MetadataFetch {
     ///
     /// Note the returned type is a boxed future, often created by
     /// [FutureExt::boxed]. See the trait documentation for an example
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
 }
 
 impl<T: AsyncFileReader> MetadataFetch for &mut T {
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         self.get_bytes(range)
     }
 }
@@ -117,7 +118,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
             file_size - FOOTER_SIZE
         };
 
-        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?;
         let suffix_len = suffix.len();
 
         let mut footer = [0; FOOTER_SIZE];
@@ -137,7 +138,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         // Did not fetch the entire file metadata in the initial read, need to make a second request
         let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
             let metadata_start = file_size - length - FOOTER_SIZE;
-            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
+            let meta = fetch
+                .fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64)
+                .await?;
             (ParquetMetaDataReader::decode_metadata(&meta)?, None)
         } else {
             let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
@@ -187,16 +190,18 @@ impl<F: MetadataFetch> MetadataLoader<F> {
         };
 
         let data = match &self.remainder {
-            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
-                let offset = range.start - *remainder_start;
-                remainder.slice(offset..range.end - *remainder_start + offset)
+            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
+                let remainder_start = *remainder_start as u64;
+                let range_start = usize::try_from(range.start - remainder_start)?;
+                let range_end = usize::try_from(range.end - remainder_start)?;
+                remainder.slice(range_start..range_end)
             }
             // Note: this will potentially fetch data already in remainder, this keeps things simple
             _ => self.fetch.fetch(range.start..range.end).await?,
         };
 
         // Sanity check
-        assert_eq!(data.len(), range.end - range.start);
+        assert_eq!(data.len(), (range.end - range.start) as usize);
 
         let offset = range.start;
 
         if column_index {
@@ -208,10 +213,11 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                 x.columns()
                     .iter()
                     .map(|c| match c.column_index_range() {
-                        Some(r) => decode_column_index(
-                            &data[r.start - offset..r.end - offset],
-                            c.column_type(),
-                        ),
+                        Some(r) => {
+                            let r_start = usize::try_from(r.start - offset)?;
+                            let r_end = usize::try_from(r.end - offset)?;
+                            decode_column_index(&data[r_start..r_end], c.column_type())
+                        }
                         None => Ok(Index::NONE),
                     })
                     .collect::<Result<Vec<_>>>()
@@ -230,7 +236,11 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                 x.columns()
                     .iter()
                     .map(|c| match c.offset_index_range() {
-                        Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
+                        Some(r) => {
+                            let r_start = usize::try_from(r.start - offset)?;
+                            let r_end = usize::try_from(r.end - offset)?;
+                            decode_offset_index(&data[r_start..r_end])
+                        }
                         None => Err(general_err!("missing offset index")),
                     })
                     .collect::<Result<Vec<_>>>()
@@ -256,8 +266,8 @@ where
     F: FnMut(Range<usize>) -> Fut + Send,
     Fut: Future<Output = Result<Bytes>> + Send,
 {
-    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-        async move { self.0(range).await }.boxed()
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+        async move { self.0(range.start.try_into()?..range.end.try_into()?).await }.boxed()
     }
 }
@@ -287,6 +297,7 @@ where
     F: FnMut(Range<usize>) -> Fut + Send,
     Fut: Future<Output = Result<Bytes>> + Send,
 {
+    let file_size = u64::try_from(file_size)?;
     let fetch = MetadataFetchFn(fetch);
     ParquetMetaDataReader::new()
         .with_prefetch_hint(prefetch)
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index cbbb6c415086..fc33608a4c21 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -80,10 +80,10 @@ pub use store::*;
 /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
 pub trait AsyncFileReader: Send {
     /// Retrieve the bytes in `range`
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
 
     /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
         async move {
             let mut result = Vec::with_capacity(ranges.len());
 
@@ -121,11 +121,11 @@ pub trait AsyncFileReader: Send {
 
 /// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
 impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         self.as_mut().get_bytes(range)
     }
 
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
         self.as_mut().get_byte_ranges(ranges)
     }
 
@@ -150,14 +150,14 @@ impl Metadat
 }
 
 impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         async move {
-            self.seek(SeekFrom::Start(range.start as u64)).await?;
+            self.seek(SeekFrom::Start(range.start)).await?;
 
             let to_read = range.end - range.start;
-            let mut buffer = Vec::with_capacity(to_read);
-            let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
-            if read != to_read {
+            let mut buffer = Vec::with_capacity(to_read.try_into()?);
+            let read = self.take(to_read).read_to_end(&mut buffer).await?;
+            if read as u64 != to_read {
                 return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
             }
@@ -424,7 +424,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         let metadata = self.metadata.row_group(row_group_idx);
         let column_metadata = metadata.column(column_idx);
 
-        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
+        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
             offset
                 .try_into()
                 .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
@@ -433,16 +433,16 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         };
 
         let buffer = match column_metadata.bloom_filter_length() {
-            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
+            Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
             None => self
                 .input
                 .0
-                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
+                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
         }
         .await?;
 
         let (header, bitset_offset) =
-            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;
+            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
 
         match header.algorithm {
             BloomFilterAlgorithm::BLOCK(_) => {
@@ -461,14 +461,17 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
         }
 
         let bitset = match column_metadata.bloom_filter_length() {
-            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
+            Some(_) => buffer.slice(
+                (TryInto::<usize>::try_into(bitset_offset).unwrap()
+                    - TryInto::<usize>::try_into(offset).unwrap())..,
+            ),
             None => {
-                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
+                let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
                     ParquetError::General("Bloom filter length is invalid".to_string())
                 })?;
                 self.input
                     .0
-                    .get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
+                    .get_bytes(bitset_offset..bitset_offset + bitset_length)
                     .await?
             }
         };
@@ -880,7 +883,7 @@ impl InMemoryRowGroup<'_> {
         if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
             // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
             // `RowSelection`
-            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
+            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
 
             let fetch_ranges = self
                 .column_chunks
@@ -893,11 +896,11 @@ impl InMemoryRowGroup<'_> {
                 .flat_map(|(idx, (_chunk, chunk_meta))| {
                     // If the first page does not start at the beginning of the column,
                     // then we need to also fetch a dictionary page.
-                    let mut ranges = vec![];
+                    let mut ranges: Vec<Range<u64>> = vec![];
                     let (start, _len) = chunk_meta.byte_range();
                     match offset_index[idx].page_locations.first() {
                         Some(first) if first.offset as u64 != start => {
-                            ranges.push(start as usize..first.offset as usize);
+                            ranges.push(start..first.offset as u64);
                         }
                         _ => (),
                     }
@@ -925,7 +928,11 @@ impl InMemoryRowGroup<'_> {
 
                     *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                         length: metadata.column(idx).byte_range().1 as usize,
-                        data: offsets.into_iter().zip(chunks.into_iter()).collect(),
+                        data: offsets
+                            .into_iter()
+                            .map(|x| x as usize)
+                            .zip(chunks.into_iter())
+                            .collect(),
                     }))
                 }
             }
@@ -938,7 +945,7 @@ impl InMemoryRowGroup<'_> {
                 .map(|(idx, _chunk)| {
                     let column = metadata.column(idx);
                     let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+                    start..(start + length)
                 })
                 .collect();
@@ -1108,9 +1115,16 @@ mod tests {
     }
 
     impl AsyncFileReader for TestReader {
-        fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-            self.requests.lock().unwrap().push(range.clone());
-            futures::future::ready(Ok(self.data.slice(range))).boxed()
+        fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
+            let range = range.clone();
+            self.requests
+                .lock()
+                .unwrap()
+                .push(range.start as usize..range.end as usize);
+            futures::future::ready(Ok(self
+                .data
+                .slice(range.start as usize..range.end as usize)))
+            .boxed()
         }
 
         fn get_metadata<'a>(
@@ -2238,7 +2252,7 @@ mod tests {
         let file_size = file.metadata().await.unwrap().len();
         let mut metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .load_and_finish(&mut file, file_size as usize)
+            .load_and_finish(&mut file, file_size)
             .await
             .unwrap();
@@ -2263,7 +2277,7 @@ mod tests {
         let file_size = file.metadata().await.unwrap().len();
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .load_and_finish(&mut file, file_size as usize)
+            .load_and_finish(&mut file, file_size)
             .await
             .unwrap();
@@ -2309,7 +2323,7 @@ mod tests {
         let file_size = file.metadata().await.unwrap().len();
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .load_and_finish(&mut file, file_size as usize)
+            .load_and_finish(&mut file, file_size)
             .await
             .unwrap();
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index d5595a83be6e..8eaf7183e822 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -46,7 +46,7 @@ use tokio::runtime::Handle;
 ///     println!("Found Blob with {}B at {}", meta.size, meta.location);
 ///
 ///     // Show Parquet metadata
-///     let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size.try_into().unwrap());
+///     let reader = ParquetObjectReader::new(storage_container, meta.location).with_file_size(meta.size);
 ///     let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
 ///     print_parquet_metadata(&mut stdout(), builder.metadata());
 /// # }
@@ -55,7 +55,7 @@ use tokio::runtime::Handle;
 pub struct ParquetObjectReader {
     store: Arc<dyn ObjectStore>,
     path: Path,
-    file_size: Option<usize>,
+    file_size: Option<u64>,
     metadata_size_hint: Option<usize>,
     preload_column_index: bool,
     preload_offset_index: bool,
@@ -94,7 +94,7 @@ impl ParquetObjectReader {
     /// underlying store does not support suffix range requests.
     ///
     /// The file size can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`].
-    pub fn with_file_size(self, file_size: usize) -> Self {
+    pub fn with_file_size(self, file_size: u64) -> Self {
         Self {
             file_size: Some(file_size),
             ..self
@@ -177,19 +177,14 @@ impl MetadataSuffixFetch for &mut ParquetObjectReader {
 }
 
 impl AsyncFileReader for ParquetObjectReader {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
-        let range = range.start as u64..range.end as u64;
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
         self.spawn(|store, path| store.get_range(path, range))
     }
 
-    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
     where
         Self: Send,
     {
-        let ranges = ranges
-            .into_iter()
-            .map(|range| range.start as u64..range.end as u64)
-            .collect::<Vec<_>>();
         self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
     }
 
@@ -259,8 +254,8 @@ mod tests {
     #[tokio::test]
     async fn test_simple() {
         let (meta, store) = get_meta_store().await;
-        let object_reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap());
+        let object_reader =
+            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
 
         let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
             .await
@@ -290,8 +285,8 @@ mod tests {
         let (mut meta, store) = get_meta_store().await;
         meta.location = Path::from("I don't exist.parquet");
 
-        let object_reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap());
+        let object_reader =
+            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
         // Cannot use unwrap_err as ParquetRecordBatchStreamBuilder: !Debug
         match ParquetRecordBatchStreamBuilder::new(object_reader).await {
             Ok(_) => panic!("expected failure"),
@@ -325,7 +320,7 @@ mod tests {
         let initial_actions = num_actions.load(Ordering::Relaxed);
 
         let reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap())
+            .with_file_size(meta.size)
             .with_runtime(rt.handle().clone());
 
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
@@ -353,7 +348,7 @@ mod tests {
         let (meta, store) = get_meta_store().await;
 
         let reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap())
+            .with_file_size(meta.size)
             .with_runtime(rt.handle().clone());
 
         let current_id = std::thread::current().id();
@@ -378,7 +373,7 @@ mod tests {
         let (meta, store) = get_meta_store().await;
 
         let mut reader = ParquetObjectReader::new(store, meta.location)
-            .with_file_size(meta.size.try_into().unwrap())
+            .with_file_size(meta.size)
             .with_runtime(rt.handle().clone());
 
         rt.shutdown_background();
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 233a55778721..ea9976f3a746 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -1103,9 +1103,9 @@ impl ColumnChunkMetaData {
     }
 
     /// Returns the range for the offset index if any
-    pub(crate) fn column_index_range(&self) -> Option<Range<usize>> {
-        let offset = usize::try_from(self.column_index_offset?).ok()?;
-        let length = usize::try_from(self.column_index_length?).ok()?;
+    pub(crate) fn column_index_range(&self) -> Option<Range<u64>> {
+        let offset = u64::try_from(self.column_index_offset?).ok()?;
+        let length = u64::try_from(self.column_index_length?).ok()?;
         Some(offset..(offset + length))
     }
@@ -1120,9 +1120,9 @@ impl ColumnChunkMetaData {
     }
 
     /// Returns the range for the offset index if any
-    pub(crate) fn offset_index_range(&self) -> Option<Range<usize>> {
-        let offset = usize::try_from(self.offset_index_offset?).ok()?;
-        let length = usize::try_from(self.offset_index_length?).ok()?;
+    pub(crate) fn offset_index_range(&self) -> Option<Range<u64>> {
+        let offset = u64::try_from(self.offset_index_offset?).ok()?;
+        let length = u64::try_from(self.offset_index_length?).ok()?;
         Some(offset..(offset + length))
     }
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 7203c3a00522..aebf1a890621 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -209,7 +209,7 @@ impl ParquetMetaDataReader {
     /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
     /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
     pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
-        self.try_parse_sized(reader, reader.len() as usize)
+        self.try_parse_sized(reader, reader.len())
     }
 
     /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
@@ -232,10 +232,10 @@ impl ParquetMetaDataReader {
     /// # use parquet::file::metadata::ParquetMetaDataReader;
     /// # use parquet::errors::ParquetError;
     /// # use crate::parquet::file::reader::Length;
-    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
+    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
     /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
     /// let file = open_parquet_file("some_path.parquet");
-    /// let len = file.len() as usize;
+    /// let len = file.len();
     /// // Speculatively read 1 kilobyte from the end of the file
     /// let bytes = get_bytes(&file, len - 1024..len);
     /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
@@ -243,7 +243,7 @@ impl ParquetMetaDataReader {
     ///     Ok(_) => (),
     ///     Err(ParquetError::NeedMoreData(needed)) => {
     ///         // Read the needed number of bytes from the end of the file
-    ///         let bytes = get_bytes(&file, len - needed..len);
+    ///         let bytes = get_bytes(&file, len - needed as u64..len);
    ///         reader.try_parse_sized(&bytes, len).unwrap();
     ///     }
     ///     _ => panic!("unexpected error")
@@ -259,10 +259,10 @@ impl ParquetMetaDataReader {
     /// # use parquet::file::metadata::ParquetMetaDataReader;
     /// # use parquet::errors::ParquetError;
     /// # use crate::parquet::file::reader::Length;
-    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
+    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
     /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
     /// let file = open_parquet_file("some_path.parquet");
-    /// let len = file.len() as usize;
+    /// let len = file.len();
     /// // Speculatively read 1 kilobyte from the end of the file
     /// let mut bytes = get_bytes(&file, len - 1024..len);
     /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
@@ -272,7 +272,7 @@ impl ParquetMetaDataReader {
     ///         Ok(_) => break,
     ///         Err(ParquetError::NeedMoreData(needed)) => {
     ///             // Read the needed number of bytes from the end of the file
-    ///             bytes = get_bytes(&file, len - needed..len);
+    ///             bytes = get_bytes(&file, len - needed as u64..len);
     ///             // If file metadata was read only read page indexes, otherwise continue loop
     ///             if reader.has_metadata() {
     ///                 reader.read_page_indexes_sized(&bytes, len);
@@ -284,13 +284,13 @@ impl ParquetMetaDataReader {
     /// }
     /// let metadata = reader.finish().unwrap();
     /// ```
-    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
+    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
         self.metadata = match self.parse_metadata(reader) {
             Ok(metadata) => Some(metadata),
             Err(ParquetError::NeedMoreData(needed)) => {
                 // If reader is the same length as `file_size` then presumably there is no more to
                 // read, so return an EOF error.
-                if file_size == reader.len() as usize || needed > file_size {
+                if file_size == reader.len() || needed as u64 > file_size {
                     return Err(eof_err!(
                         "Parquet file too small. Size is {} but need {}",
                         file_size,
@@ -315,7 +315,7 @@ impl ParquetMetaDataReader {
     /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
     /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
     pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
-        self.read_page_indexes_sized(reader, reader.len() as usize)
+        self.read_page_indexes_sized(reader, reader.len())
     }
 
@@ -326,7 +326,7 @@ impl ParquetMetaDataReader {
     pub fn read_page_indexes_sized<R: ChunkReader>(
         &mut self,
         reader: &R,
-        file_size: usize,
+        file_size: u64,
     ) -> Result<()> {
         if self.metadata.is_none() {
             return Err(general_err!(
@@ -350,7 +350,7 @@ impl ParquetMetaDataReader {
 
         // Check to see if needed range is within `file_range`. Checking `range.end` seems
         // redundant, but it guards against `range_for_page_index()` returning garbage.
-        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
+        let file_range = file_size.saturating_sub(reader.len())..file_size;
         if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
             // Requested range starts beyond EOF
             if range.end > file_size {
@@ -360,14 +360,16 @@ impl ParquetMetaDataReader {
                 ));
             } else {
                 // Ask for a larger buffer
-                return Err(ParquetError::NeedMoreData(file_size - range.start));
+                return Err(ParquetError::NeedMoreData(
+                    (file_size - range.start).try_into()?,
+                ));
             }
         }
 
         // Perform extra sanity check to make sure `range` and the footer metadata don't
         // overlap.
         if let Some(metadata_size) = self.metadata_size {
-            let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
+            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
             if range.end > metadata_range.start {
                 return Err(eof_err!(
                     "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
@@ -377,8 +379,8 @@ impl ParquetMetaDataReader {
             }
         }
 
-        let bytes_needed = range.end - range.start;
-        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
+        let bytes_needed = usize::try_from(range.end - range.start)?;
+        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
 
         let offset = range.start;
         self.parse_column_index(&bytes, offset)?;
@@ -397,7 +399,7 @@ impl ParquetMetaDataReader {
     pub async fn load_and_finish<F: MetadataFetch>(
         mut self,
         fetch: F,
-        file_size: usize,
+        file_size: u64,
     ) -> Result<ParquetMetaData> {
         self.try_load(fetch, file_size).await?;
         self.finish()
@@ -423,11 +425,7 @@ impl ParquetMetaDataReader {
     /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
     /// performed by this function.
     #[cfg(all(feature = "async", feature = "arrow"))]
-    pub async fn try_load<F: MetadataFetch>(
-        &mut self,
-        mut fetch: F,
-        file_size: usize,
-    ) -> Result<()> {
+    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
         let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
 
         self.metadata = Some(metadata);
@@ -487,9 +485,10 @@ impl ParquetMetaDataReader {
         };
 
         let bytes = match &remainder {
-            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
-                let offset = range.start - *remainder_start;
-                let end = offset + range.end - range.start;
+            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
+                let remainder_start = *remainder_start as u64;
+                let offset = usize::try_from(range.start - remainder_start)?;
+                let end = usize::try_from(range.end - remainder_start)?;
                 assert!(end <= remainder.len());
                 remainder.slice(offset..end)
             }
@@ -498,16 +497,15 @@ impl ParquetMetaDataReader {
         };
 
         // Sanity check
-        assert_eq!(bytes.len(), range.end - range.start);
-        let offset = range.start;
+        assert_eq!(bytes.len() as u64, range.end - range.start);
 
-        self.parse_column_index(&bytes, offset)?;
-        self.parse_offset_index(&bytes, offset)?;
+        self.parse_column_index(&bytes, range.start)?;
+        self.parse_offset_index(&bytes, range.start)?;
 
         Ok(())
     }
 
-    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
+    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
         let metadata = self.metadata.as_mut().unwrap();
         if self.column_index {
             let index = metadata
@@ -517,10 +515,11 @@ impl ParquetMetaDataReader {
                 x.columns()
                     .iter()
                     .map(|c| match c.column_index_range() {
-                        Some(r) => decode_column_index(
-                            &bytes[r.start - start_offset..r.end - start_offset],
-                            c.column_type(),
-                        ),
+                        Some(r) => {
+                            let r_start = usize::try_from(r.start - start_offset)?;
+                            let r_end = usize::try_from(r.end - start_offset)?;
+                            decode_column_index(&bytes[r_start..r_end], c.column_type())
+                        }
                         None => Ok(Index::NONE),
                     })
                     .collect::<Result<Vec<_>>>()
@@ -531,7 +530,7 @@ impl ParquetMetaDataReader {
         Ok(())
     }
 
-    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
+    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
         let metadata = self.metadata.as_mut().unwrap();
         if self.offset_index {
             let index = metadata
@@ -541,9 +540,11 @@ impl ParquetMetaDataReader {
                 x.columns()
                     .iter()
                     .map(|c| match c.offset_index_range() {
-                        Some(r) => decode_offset_index(
-                            &bytes[r.start - start_offset..r.end - start_offset],
-                        ),
+                        Some(r) => {
+                            let r_start = usize::try_from(r.start - start_offset)?;
+                            let r_end = usize::try_from(r.end - start_offset)?;
+                            decode_offset_index(&bytes[r_start..r_end])
+                        }
                         None => Err(general_err!("missing offset index")),
                     })
                     .collect::<Result<Vec<_>>>()
@@ -555,7 +556,7 @@ impl ParquetMetaDataReader {
         Ok(())
     }
 
-    fn range_for_page_index(&self) -> Option<Range<usize>> {
+    fn range_for_page_index(&self) -> Option<Range<u64>> {
         // sanity check
         self.metadata.as_ref()?;
@@ -592,7 +593,7 @@ impl ParquetMetaDataReader {
         let footer_metadata_len = FOOTER_SIZE + metadata_len;
         self.metadata_size = Some(footer_metadata_len);
 
-        if footer_metadata_len > file_size as usize {
+        if footer_metadata_len as u64 > file_size {
             return Err(ParquetError::NeedMoreData(footer_metadata_len));
         }
@@ -620,11 +621,11 @@ impl ParquetMetaDataReader {
     async fn load_metadata<F: MetadataFetch>(
         &self,
         fetch: &mut F,
-        file_size: usize,
+        file_size: u64,
     ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
-        let prefetch = self.get_prefetch_size();
+        let prefetch = self.get_prefetch_size() as u64;
 
-        if file_size < FOOTER_SIZE {
+        if file_size < FOOTER_SIZE as u64 {
             return Err(eof_err!("file size of {} is less than footer", file_size));
         }
@@ -635,7 +636,9 @@ impl ParquetMetaDataReader {
         let suffix = fetch.fetch(footer_start..file_size).await?;
         let suffix_len = suffix.len();
 
-        let fetch_len = file_size - footer_start;
+        let fetch_len = (file_size - footer_start)
+            .try_into()
+            .expect("footer size should never be larger than u32");
         if suffix_len < fetch_len {
             return Err(eof_err!(
                 "metadata requires {} bytes, but could only read {}",
@@ -650,7 +653,7 @@ impl ParquetMetaDataReader {
         let footer = Self::decode_footer_tail(&footer)?;
         let length = footer.metadata_length();
 
-        if file_size < length + FOOTER_SIZE {
+        if file_size < (length + FOOTER_SIZE) as u64 {
             return Err(eof_err!(
                 "file size of {} is less than footer + metadata {}",
                 file_size,
@@ -660,15 +663,19 @@ impl ParquetMetaDataReader {
 
         // Did not fetch the entire file metadata in the initial read, need to make a second request
         if length > suffix_len - FOOTER_SIZE {
-            let metadata_start = file_size - length - FOOTER_SIZE;
-            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
+            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
+            let meta = fetch
+                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
+                .await?;
             Ok((self.decode_footer_metadata(&meta, &footer)?, None))
         } else {
-            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
+            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
+                .try_into()
+                .expect("metadata length should never be larger than u32");
             let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
             Ok((
                 self.decode_footer_metadata(slice, &footer)?,
-                Some((footer_start, suffix.slice(..metadata_start))),
+                Some((footer_start as usize, suffix.slice(..metadata_start))),
             ))
         }
     }
@@ -680,7 +687,7 @@ impl ParquetMetaDataReader {
     ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
         let prefetch = self.get_prefetch_size();
 
-        let suffix = fetch.fetch_suffix(prefetch).await?;
+        let suffix = fetch.fetch_suffix(prefetch as _).await?;
         let suffix_len = suffix.len();
 
         if suffix_len < FOOTER_SIZE {
@@ -1089,12 +1096,12 @@ mod tests {
     #[test]
     fn test_try_parse() {
         let file = get_test_file("alltypes_tiny_pages.parquet");
-        let len = file.len() as usize;
+        let len = file.len();
         let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
 
-        let bytes_for_range = |range: Range<usize>| {
-            file.get_bytes(range.start as u64, range.end - range.start)
+        let bytes_for_range = |range: Range<u64>| {
+            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                 .unwrap()
         };
@@ -1125,7 +1132,7 @@ mod tests {
         match reader.try_parse_sized(&bytes, len).unwrap_err() {
             // expected error, try again with provided bounds
             ParquetError::NeedMoreData(needed) => {
-                let bytes = bytes_for_range(len - needed..len);
+                let bytes = bytes_for_range(len - needed as u64..len);
                 reader.try_parse_sized(&bytes, len).unwrap();
                 let metadata = reader.finish().unwrap();
                 assert!(metadata.column_index.is_some());
@@ -1141,7 +1148,7 @@ mod tests {
         match reader.try_parse_sized(&bytes, len) {
             Ok(_) => break,
             Err(ParquetError::NeedMoreData(needed)) => {
-                bytes = bytes_for_range(len - needed..len);
+                bytes = bytes_for_range(len - needed as u64..len);
                 if reader.has_metadata() {
                     reader.read_page_indexes_sized(&bytes, len).unwrap();
                     break;
@@ -1169,7 +1176,7 @@ mod tests {
         match reader.try_parse_sized(&bytes, len).unwrap_err() {
             // expected error, try again with provided bounds
             ParquetError::NeedMoreData(needed) => {
-                let bytes = bytes_for_range(len - needed..len);
+                let bytes = bytes_for_range(len - needed as u64..len);
                 reader.try_parse_sized(&bytes, len).unwrap();
                 reader.finish().unwrap();
             }
@@ -1220,10 +1227,10 @@ mod async_tests {
 
     impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
     where
-        F: FnMut(Range<usize>) -> Fut + Send,
+        F: FnMut(Range<u64>) -> Fut + Send,
         Fut: Future<Output = Result<Bytes>> + Send,
     {
-        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
             async move { self.0(range).await }.boxed()
         }
     }
@@ -1232,18 +1239,18 @@ mod async_tests {
 
     impl<F1, F2, Fut> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
     where
-        F1: FnMut(Range<usize>) -> Fut + Send,
+        F1: FnMut(Range<u64>) -> Fut + Send,
         Fut: Future<Output = Result<Bytes>> + Send,
         F2: Send,
     {
-        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
             async move { self.0(range).await }.boxed()
         }
     }
 
     impl<F1, F2, Fut> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
     where
-        F1: FnMut(Range<usize>) -> Fut + Send,
+        F1: FnMut(Range<u64>) -> Fut + Send,
         F2: FnMut(usize) -> Fut + Send,
         Fut: Future<Output = Result<Bytes>> + Send,
     {
@@ -1252,10 +1259,10 @@ mod async_tests {
         }
     }
 
-    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
+    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
         file.seek(SeekFrom::Start(range.start as _))?;
         let len = range.end - range.start;
-        let mut buf = Vec::with_capacity(len);
+        let mut buf = Vec::with_capacity(len.try_into().unwrap());
         file.take(len as _).read_to_end(&mut buf)?;
         Ok(buf.into())
     }
@@ -1263,7 +1270,7 @@ mod async_tests {
     fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
         let file_len = file.len();
         // Don't seek before beginning of file
-        file.seek(SeekFrom::End(0 - suffix.min(file_len as usize) as i64))?;
+        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
         let mut buf = Vec::with_capacity(suffix);
         file.take(suffix as _).read_to_end(&mut buf)?;
         Ok(buf.into())
@@ -1272,7 +1279,7 @@ mod async_tests {
     #[tokio::test]
     async fn test_simple() {
         let mut file = get_test_file("nulls.snappy.parquet");
-        let len = file.len() as usize;
+        let len = file.len();
 
         let expected = ParquetMetaDataReader::new()
             .parse_and_finish(&file)
@@ -1466,7 +1473,7 @@ mod async_tests {
     #[tokio::test]
     async fn test_page_index() {
         let mut file = get_test_file("alltypes_tiny_pages.parquet");
-        let len = file.len() as usize;
+        let len = file.len();
         let fetch_count = AtomicUsize::new(0);
         let mut fetch = |range| {
             fetch_count.fetch_add(1, Ordering::SeqCst);
@@ -1519,7 +1526,7 @@ mod async_tests {
         let f = MetadataFetchFn(&mut fetch);
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .with_prefetch_hint(Some(len - 1000)) // prefetch entire file
+            .with_prefetch_hint(Some((len - 1000) as usize)) // prefetch entire file
             .load_and_finish(f, len)
             .await
             .unwrap();
@@ -1531,7 +1538,7 @@ mod async_tests {
         let f = MetadataFetchFn(&mut fetch);
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .with_prefetch_hint(Some(len)) // prefetch entire file
+            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
            .load_and_finish(f, len)
             .await
             .unwrap();
@@ -1543,7 +1550,7 @@ mod async_tests {
         let f = MetadataFetchFn(&mut fetch);
         let metadata = ParquetMetaDataReader::new()
             .with_page_indexes(true)
-            .with_prefetch_hint(Some(len + 1000)) // prefetch entire file
+            .with_prefetch_hint(Some((len + 1000) as usize)) // prefetch entire file
             .load_and_finish(f, len)
             .await
             .unwrap();
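The reader.rs changes above are presumably motivated by 32-bit targets such as wasm32, where `usize` is 32 bits and an `as usize` cast silently truncates byte offsets beyond 4 GiB; keeping offsets in `u64` and converting fallibly only at buffer boundaries avoids that. A minimal sketch of the failure mode the new `usize::try_from` conversions guard against (illustrative only, not part of the patch):

    // On a 32-bit target, a 5 GiB byte offset wraps when cast with `as`:
    let offset: u64 = 5 * 1024 * 1024 * 1024;
    #[cfg(target_pointer_width = "32")]
    assert_eq!(offset as usize, 1024 * 1024 * 1024); // silently truncated to 1 GiB
    // `usize::try_from` surfaces the overflow as an error instead:
    #[cfg(target_pointer_width = "32")]
    assert!(usize::try_from(offset).is_err());
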
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index fd3639ac3069..c472ceb29128 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -31,7 +31,7 @@ use std::ops::Range;
 /// Computes the covering range of two optional ranges
 ///
 /// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
-pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Option<Range<usize>> {
+pub(crate) fn acc_range(a: Option<Range<u64>>, b: Option<Range<u64>>) -> Option<Range<u64>> {
     match (a, b) {
         (Some(a), Some(b)) => Some(a.start.min(b.start)..a.end.max(b.end)),
         (None, x) | (x, None) => x,
@@ -61,14 +61,17 @@ pub fn read_columns_indexes<R: ChunkReader>(
         None => return Ok(None),
     };
 
-    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
-    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
 
     Some(
         chunks
             .iter()
             .map(|c| match c.column_index_range() {
-                Some(r) => decode_column_index(get(r), c.column_type()),
+                Some(r) => decode_column_index(
+                    &bytes[usize::try_from(r.start - fetch.start)?
+                        ..usize::try_from(r.end - fetch.start)?],
+                    c.column_type(),
+                ),
                 None => Ok(Index::NONE),
             })
             .collect(),
@@ -101,13 +104,15 @@ pub fn read_pages_locations<R: ChunkReader>(
         None => return Ok(vec![]),
     };
 
-    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
-    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
 
     chunks
         .iter()
         .map(|c| match c.offset_index_range() {
-            Some(r) => decode_page_locations(get(r)),
+            Some(r) => decode_page_locations(
+                &bytes[usize::try_from(r.start - fetch.start)?
+                    ..usize::try_from(r.end - fetch.start)?],
+            ),
             None => Err(general_err!("missing offset index")),
         })
        .collect()
@@ -136,14 +141,16 @@ pub fn read_offset_indexes<R: ChunkReader>(
         None => return Ok(None),
     };
 
-    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
-    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+    let bytes = reader.get_bytes(fetch.start as _, (fetch.end - fetch.start).try_into()?)?;
 
     Some(
         chunks
             .iter()
             .map(|c| match c.offset_index_range() {
-                Some(r) => decode_offset_index(get(r)),
+                Some(r) => decode_offset_index(
+                    &bytes[usize::try_from(r.start - fetch.start)?
+                        ..usize::try_from(r.end - fetch.start)?],
+                ),
                 None => Err(general_err!("missing offset index")),
             })
             .collect(),
diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs
index 7de5d7e346d6..b427bd4302e2 100644
--- a/parquet/tests/arrow_reader/bad_data.rs
+++ b/parquet/tests/arrow_reader/bad_data.rs
@@ -156,7 +156,7 @@ async fn bad_metadata_err() {
     let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin"));
 
-    let metadata_length = metadata_buffer.len();
+    let metadata_length = metadata_buffer.len() as u64;
 
     let mut reader = std::io::Cursor::new(&metadata_buffer);
     let mut loader = ParquetMetaDataReader::new();
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 9deadece9544..11448207c6fc 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -310,8 +310,7 @@ async fn test_read_encrypted_file_from_object_store() {
         .unwrap();
     let options = ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);
 
-    let mut reader = ParquetObjectReader::new(store, meta.location)
-        .with_file_size(meta.size.try_into().unwrap());
+    let mut reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
     let metadata = reader.get_metadata(Some(&options)).await.unwrap();
     let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
         .await
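
End to end, the new signatures keep byte offsets in `u64` from the file-size metadata down to the fetch calls, narrowing to `usize` only where a buffer is actually indexed. A minimal caller-side sketch against the updated `MetadataFetch` trait and `load_and_finish` (assumes the `parquet::arrow::async_reader::MetadataFetch` re-export and an in-memory buffer; illustrative only, not part of the patch):

    use std::ops::Range;

    use bytes::Bytes;
    use futures::future::{BoxFuture, FutureExt};
    use parquet::arrow::async_reader::MetadataFetch;
    use parquet::errors::Result;
    use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

    /// Serves metadata reads from a buffer already in memory.
    struct InMemoryFetch(Bytes);

    impl MetadataFetch for InMemoryFetch {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            // Offsets arrive as u64; narrow to usize only when indexing the local buffer.
            let slice = self.0.slice(range.start as usize..range.end as usize);
            futures::future::ready(Ok(slice)).boxed()
        }
    }

    async fn read_metadata(data: Bytes) -> Result<ParquetMetaData> {
        // file_size is now u64, so object store sizes flow through without a lossy cast
        let file_size = data.len() as u64;
        ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .load_and_finish(InMemoryFetch(data), file_size)
            .await
    }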