Skip to content

Commit 474f192

Browse files
kylebarron, alchemist51, etseidl, and alamb
authored
Change Parquet API interaction to use u64 (support files larger than 4GB in WASM) (#7371)
* Change AsyncFileReader trait for u64 — Signed-off-by: Arpit Bandejiya <[email protected]>
* Update MetadataFetch trait — Signed-off-by: Arpit Bandejiya <[email protected]>
* Fix lint issue
* Fix tests for latest main
* Address comments by @mbrobbel from #7252
* Fix compile
* Revert suffix length back to usize
* Use `u64` for `file_size`
* Address comments
* Fix calculation of metadata_start
* Change file_size type to u64
* Change _sized functions to take u64 file_size
* Clippy
* Remove some potential panics
* Use u64 for page index ranges
* Revert change to deprecated method

---------

Signed-off-by: Arpit Bandejiya <[email protected]>
Co-authored-by: Arpit Bandejiya <[email protected]>
Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 56e8208 commit 474f192

File tree

11 files changed

+191
-160
lines changed

11 files changed

+191
-160
lines changed

Diff for: parquet/examples/external_metadata.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ async fn get_metadata_from_remote_parquet_file(
112112
// tell the reader to read the page index
113113
ParquetMetaDataReader::new()
114114
.with_page_indexes(true)
115-
.load_and_finish(remote_file, file_size as usize)
115+
.load_and_finish(remote_file, file_size)
116116
.await
117117
.unwrap()
118118
}

Diff for: parquet/examples/read_with_rowgroup.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,7 @@ impl InMemoryRowGroup {
166166
for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
167167
if self.mask.leaf_included(leaf_idx) {
168168
let (start, len) = meta.byte_range();
169-
let data = reader
170-
.get_bytes(start as usize..(start + len) as usize)
171-
.await?;
169+
let data = reader.get_bytes(start..(start + len)).await?;
172170

173171
vs[leaf_idx] = Some(Arc::new(ColumnChunkData {
174172
offset: start as usize,

Diff for: parquet/src/arrow/arrow_reader/selection.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ impl RowSelection {
162162
/// Note: this method does not make any effort to combine consecutive ranges, nor coalesce
163163
/// ranges that are close together. This is instead delegated to the IO subsystem to optimise,
164164
/// e.g. [`ObjectStore::get_ranges`](object_store::ObjectStore::get_ranges)
165-
pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<usize>> {
166-
let mut ranges = vec![];
165+
pub fn scan_ranges(&self, page_locations: &[crate::format::PageLocation]) -> Vec<Range<u64>> {
166+
let mut ranges: Vec<Range<u64>> = vec![];
167167
let mut row_offset = 0;
168168

169169
let mut pages = page_locations.iter().peekable();
@@ -175,8 +175,8 @@ impl RowSelection {
175175

176176
while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
177177
if !(selector.skip || current_page_included) {
178-
let start = page.offset as usize;
179-
let end = start + page.compressed_page_size as usize;
178+
let start = page.offset as u64;
179+
let end = start + page.compressed_page_size as u64;
180180
ranges.push(start..end);
181181
current_page_included = true;
182182
}
@@ -200,8 +200,8 @@ impl RowSelection {
200200
}
201201
} else {
202202
if !(selector.skip || current_page_included) {
203-
let start = page.offset as usize;
204-
let end = start + page.compressed_page_size as usize;
203+
let start = page.offset as u64;
204+
let end = start + page.compressed_page_size as u64;
205205
ranges.push(start..end);
206206
}
207207
current_selector = selectors.next()

Diff for: parquet/src/arrow/async_reader/metadata.rs

+29-18
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ use std::ops::Range;
4848
/// file: tokio::fs::File,
4949
/// }
5050
/// impl MetadataFetch for TokioFileMetadata {
51-
/// fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
51+
/// fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
5252
/// // return a future that fetches data in range
5353
/// async move {
54-
/// let mut buf = vec![0; range.len()]; // target buffer
54+
/// let len = (range.end - range.start).try_into().unwrap();
55+
/// let mut buf = vec![0; len]; // target buffer
5556
/// // seek to the start of the range and read the data
56-
/// self.file.seek(SeekFrom::Start(range.start as u64)).await?;
57+
/// self.file.seek(SeekFrom::Start(range.start)).await?;
5758
/// self.file.read_exact(&mut buf).await?;
5859
/// Ok(Bytes::from(buf)) // convert to Bytes
5960
/// }
@@ -66,11 +67,11 @@ pub trait MetadataFetch {
6667
///
6768
/// Note the returned type is a boxed future, often created by
6869
/// [FutureExt::boxed]. See the trait documentation for an example
69-
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
70+
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
7071
}
7172

7273
impl<T: AsyncFileReader> MetadataFetch for &mut T {
73-
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
74+
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
7475
self.get_bytes(range)
7576
}
7677
}
@@ -117,7 +118,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
117118
file_size - FOOTER_SIZE
118119
};
119120

120-
let suffix = fetch.fetch(footer_start..file_size).await?;
121+
let suffix = fetch.fetch(footer_start as u64..file_size as u64).await?;
121122
let suffix_len = suffix.len();
122123

123124
let mut footer = [0; FOOTER_SIZE];
@@ -137,7 +138,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
137138
// Did not fetch the entire file metadata in the initial read, need to make a second request
138139
let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
139140
let metadata_start = file_size - length - FOOTER_SIZE;
140-
let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
141+
let meta = fetch
142+
.fetch(metadata_start as u64..(file_size - FOOTER_SIZE) as u64)
143+
.await?;
141144
(ParquetMetaDataReader::decode_metadata(&meta)?, None)
142145
} else {
143146
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
@@ -187,16 +190,18 @@ impl<F: MetadataFetch> MetadataLoader<F> {
187190
};
188191

189192
let data = match &self.remainder {
190-
Some((remainder_start, remainder)) if *remainder_start <= range.start => {
191-
let offset = range.start - *remainder_start;
192-
remainder.slice(offset..range.end - *remainder_start + offset)
193+
Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
194+
let remainder_start = *remainder_start as u64;
195+
let range_start = usize::try_from(range.start - remainder_start)?;
196+
let range_end = usize::try_from(range.end - remainder_start)?;
197+
remainder.slice(range_start..range_end)
193198
}
194199
// Note: this will potentially fetch data already in remainder, this keeps things simple
195200
_ => self.fetch.fetch(range.start..range.end).await?,
196201
};
197202

198203
// Sanity check
199-
assert_eq!(data.len(), range.end - range.start);
204+
assert_eq!(data.len(), (range.end - range.start) as usize);
200205
let offset = range.start;
201206

202207
if column_index {
@@ -208,10 +213,11 @@ impl<F: MetadataFetch> MetadataLoader<F> {
208213
x.columns()
209214
.iter()
210215
.map(|c| match c.column_index_range() {
211-
Some(r) => decode_column_index(
212-
&data[r.start - offset..r.end - offset],
213-
c.column_type(),
214-
),
216+
Some(r) => {
217+
let r_start = usize::try_from(r.start - offset)?;
218+
let r_end = usize::try_from(r.end - offset)?;
219+
decode_column_index(&data[r_start..r_end], c.column_type())
220+
}
215221
None => Ok(Index::NONE),
216222
})
217223
.collect::<Result<Vec<_>>>()
@@ -230,7 +236,11 @@ impl<F: MetadataFetch> MetadataLoader<F> {
230236
x.columns()
231237
.iter()
232238
.map(|c| match c.offset_index_range() {
233-
Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
239+
Some(r) => {
240+
let r_start = usize::try_from(r.start - offset)?;
241+
let r_end = usize::try_from(r.end - offset)?;
242+
decode_offset_index(&data[r_start..r_end])
243+
}
234244
None => Err(general_err!("missing offset index")),
235245
})
236246
.collect::<Result<Vec<_>>>()
@@ -256,8 +266,8 @@ where
256266
F: FnMut(Range<usize>) -> Fut + Send,
257267
Fut: Future<Output = Result<Bytes>> + Send,
258268
{
259-
fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
260-
async move { self.0(range).await }.boxed()
269+
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
270+
async move { self.0(range.start.try_into()?..range.end.try_into()?).await }.boxed()
261271
}
262272
}
263273

@@ -287,6 +297,7 @@ where
287297
F: FnMut(Range<usize>) -> Fut + Send,
288298
Fut: Future<Output = Result<Bytes>> + Send,
289299
{
300+
let file_size = u64::try_from(file_size)?;
290301
let fetch = MetadataFetchFn(fetch);
291302
ParquetMetaDataReader::new()
292303
.with_prefetch_hint(prefetch)

Diff for: parquet/src/arrow/async_reader/mod.rs

+41-27
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ pub use store::*;
8080
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
8181
pub trait AsyncFileReader: Send {
8282
/// Retrieve the bytes in `range`
83-
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
83+
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>>;
8484

8585
/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
86-
fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
86+
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
8787
async move {
8888
let mut result = Vec::with_capacity(ranges.len());
8989

@@ -121,11 +121,11 @@ pub trait AsyncFileReader: Send {
121121

122122
/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
123123
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
124-
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
124+
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
125125
self.as_mut().get_bytes(range)
126126
}
127127

128-
fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
128+
fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
129129
self.as_mut().get_byte_ranges(ranges)
130130
}
131131

@@ -150,14 +150,14 @@ impl<T: AsyncFileReader + MetadataFetch + AsyncRead + AsyncSeek + Unpin> Metadat
150150
}
151151

152152
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
153-
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
153+
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
154154
async move {
155-
self.seek(SeekFrom::Start(range.start as u64)).await?;
155+
self.seek(SeekFrom::Start(range.start)).await?;
156156

157157
let to_read = range.end - range.start;
158-
let mut buffer = Vec::with_capacity(to_read);
159-
let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
160-
if read != to_read {
158+
let mut buffer = Vec::with_capacity(to_read.try_into()?);
159+
let read = self.take(to_read).read_to_end(&mut buffer).await?;
160+
if read as u64 != to_read {
161161
return Err(eof_err!("expected to read {} bytes, got {}", to_read, read));
162162
}
163163

@@ -424,7 +424,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
424424
let metadata = self.metadata.row_group(row_group_idx);
425425
let column_metadata = metadata.column(column_idx);
426426

427-
let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
427+
let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
428428
offset
429429
.try_into()
430430
.map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
@@ -433,16 +433,16 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
433433
};
434434

435435
let buffer = match column_metadata.bloom_filter_length() {
436-
Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
436+
Some(length) => self.input.0.get_bytes(offset..offset + length as u64),
437437
None => self
438438
.input
439439
.0
440-
.get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
440+
.get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE as u64),
441441
}
442442
.await?;
443443

444444
let (header, bitset_offset) =
445-
chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;
445+
chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
446446

447447
match header.algorithm {
448448
BloomFilterAlgorithm::BLOCK(_) => {
@@ -461,14 +461,17 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
461461
}
462462

463463
let bitset = match column_metadata.bloom_filter_length() {
464-
Some(_) => buffer.slice((bitset_offset as usize - offset)..),
464+
Some(_) => buffer.slice(
465+
(TryInto::<usize>::try_into(bitset_offset).unwrap()
466+
- TryInto::<usize>::try_into(offset).unwrap())..,
467+
),
465468
None => {
466-
let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
469+
let bitset_length: u64 = header.num_bytes.try_into().map_err(|_| {
467470
ParquetError::General("Bloom filter length is invalid".to_string())
468471
})?;
469472
self.input
470473
.0
471-
.get_bytes(bitset_offset as usize..bitset_offset as usize + bitset_length)
474+
.get_bytes(bitset_offset..bitset_offset + bitset_length)
472475
.await?
473476
}
474477
};
@@ -880,7 +883,7 @@ impl InMemoryRowGroup<'_> {
880883
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
881884
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
882885
// `RowSelection`
883-
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
886+
let mut page_start_offsets: Vec<Vec<u64>> = vec![];
884887

885888
let fetch_ranges = self
886889
.column_chunks
@@ -893,11 +896,11 @@ impl InMemoryRowGroup<'_> {
893896
.flat_map(|(idx, (_chunk, chunk_meta))| {
894897
// If the first page does not start at the beginning of the column,
895898
// then we need to also fetch a dictionary page.
896-
let mut ranges = vec![];
899+
let mut ranges: Vec<Range<u64>> = vec![];
897900
let (start, _len) = chunk_meta.byte_range();
898901
match offset_index[idx].page_locations.first() {
899902
Some(first) if first.offset as u64 != start => {
900-
ranges.push(start as usize..first.offset as usize);
903+
ranges.push(start..first.offset as u64);
901904
}
902905
_ => (),
903906
}
@@ -925,7 +928,11 @@ impl InMemoryRowGroup<'_> {
925928

926929
*chunk = Some(Arc::new(ColumnChunkData::Sparse {
927930
length: metadata.column(idx).byte_range().1 as usize,
928-
data: offsets.into_iter().zip(chunks.into_iter()).collect(),
931+
data: offsets
932+
.into_iter()
933+
.map(|x| x as usize)
934+
.zip(chunks.into_iter())
935+
.collect(),
929936
}))
930937
}
931938
}
@@ -938,7 +945,7 @@ impl InMemoryRowGroup<'_> {
938945
.map(|(idx, _chunk)| {
939946
let column = metadata.column(idx);
940947
let (start, length) = column.byte_range();
941-
start as usize..(start + length) as usize
948+
start..(start + length)
942949
})
943950
.collect();
944951

@@ -1108,9 +1115,16 @@ mod tests {
11081115
}
11091116

11101117
impl AsyncFileReader for TestReader {
1111-
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
1112-
self.requests.lock().unwrap().push(range.clone());
1113-
futures::future::ready(Ok(self.data.slice(range))).boxed()
1118+
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1119+
let range = range.clone();
1120+
self.requests
1121+
.lock()
1122+
.unwrap()
1123+
.push(range.start as usize..range.end as usize);
1124+
futures::future::ready(Ok(self
1125+
.data
1126+
.slice(range.start as usize..range.end as usize)))
1127+
.boxed()
11141128
}
11151129

11161130
fn get_metadata<'a>(
@@ -2238,7 +2252,7 @@ mod tests {
22382252
let file_size = file.metadata().await.unwrap().len();
22392253
let mut metadata = ParquetMetaDataReader::new()
22402254
.with_page_indexes(true)
2241-
.load_and_finish(&mut file, file_size as usize)
2255+
.load_and_finish(&mut file, file_size)
22422256
.await
22432257
.unwrap();
22442258

@@ -2263,7 +2277,7 @@ mod tests {
22632277
let file_size = file.metadata().await.unwrap().len();
22642278
let metadata = ParquetMetaDataReader::new()
22652279
.with_page_indexes(true)
2266-
.load_and_finish(&mut file, file_size as usize)
2280+
.load_and_finish(&mut file, file_size)
22672281
.await
22682282
.unwrap();
22692283

@@ -2309,7 +2323,7 @@ mod tests {
23092323
let file_size = file.metadata().await.unwrap().len();
23102324
let metadata = ParquetMetaDataReader::new()
23112325
.with_page_indexes(true)
2312-
.load_and_finish(&mut file, file_size as usize)
2326+
.load_and_finish(&mut file, file_size)
23132327
.await
23142328
.unwrap();
23152329

0 commit comments

Comments (0)