Commit cd60057
fix(reader): filter row groups when FileScanTask contains byte ranges (#1779)
1 parent 044b45b commit cd60057
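
The gist of the change: when a FileScanTask carries a non-zero byte range (task.start, task.length), the reader keeps only the row groups whose byte spans intersect that half-open range; a task with start and length both 0 still reads the whole file. A minimal sketch of the overlap rule, using plain integers and a hypothetical overlaps helper rather than the reader code itself:

/// Half-open interval overlap: a row group occupying bytes [rg_start, rg_end)
/// is selected for a task covering bytes [start, start + length).
fn overlaps(rg_start: u64, rg_end: u64, start: u64, length: u64) -> bool {
    let end = start + length;
    rg_start < end && start < rg_end
}

fn main() {
    // A row group at bytes [4, 104) overlaps a split starting at byte 50...
    assert!(overlaps(4, 104, 50, 100));
    // ...but not a split that begins exactly where the row group ends.
    assert!(!overlaps(4, 104, 104, 100));
}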

1 file changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 248 additions & 6 deletions
@@ -224,15 +224,16 @@ impl ArrowReader {
             }
         };
 
-        // There are two possible sources both for potential lists of selected RowGroup indices,
-        // and for `RowSelection`s.
-        // Selected RowGroup index lists can come from two sources:
+        // There are three possible sources for potential lists of selected RowGroup indices,
+        // and two for `RowSelection`s.
+        // Selected RowGroup index lists can come from three sources:
+        // * When task.start and task.length specify a byte range (file splitting);
         // * When there are equality delete files that are applicable;
         // * When there is a scan predicate and row_group_filtering_enabled = true.
         // `RowSelection`s can be created in either or both of the following cases:
         // * When there are positional delete files that are applicable;
         // * When there is a scan predicate and row_selection_enabled = true
-        // Note that, in the former case we only perform row group filtering when
+        // Note that row group filtering from predicates only happens when
         // there is a scan predicate AND row_group_filtering_enabled = true,
         // but we perform row selection filtering if there are applicable
         // equality delete files OR (there is a scan predicate AND row_selection_enabled),
@@ -241,6 +242,17 @@ impl ArrowReader {
         let mut selected_row_group_indices = None;
         let mut row_selection = None;
 
+        // Filter row groups based on byte range from task.start and task.length.
+        // If both start and length are 0, read the entire file (backwards compatibility).
+        if task.start != 0 || task.length != 0 {
+            let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range(
+                record_batch_stream_builder.metadata(),
+                task.start,
+                task.length,
+            )?;
+            selected_row_group_indices = Some(byte_range_filtered_row_groups);
+        }
+
         if let Some(predicate) = final_predicate {
             let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
                 record_batch_stream_builder.parquet_schema(),
@@ -256,14 +268,26 @@ impl ArrowReader {
             record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);
 
             if row_group_filtering_enabled {
-                let result = Self::get_selected_row_group_indices(
+                let predicate_filtered_row_groups = Self::get_selected_row_group_indices(
                     &predicate,
                     record_batch_stream_builder.metadata(),
                     &field_id_map,
                     &task.schema,
                 )?;
 
-                selected_row_group_indices = Some(result);
+                // Merge predicate-based filtering with byte range filtering (if present)
+                // by taking the intersection of both filters
+                selected_row_group_indices = match selected_row_group_indices {
+                    Some(byte_range_filtered) => {
+                        // Keep only row groups that are in both filters
+                        let intersection: Vec<usize> = byte_range_filtered
+                            .into_iter()
+                            .filter(|idx| predicate_filtered_row_groups.contains(idx))
+                            .collect();
+                        Some(intersection)
+                    }
+                    None => Some(predicate_filtered_row_groups),
+                };
             }
 
             if row_selection_enabled {
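
For illustration only, the merge above amounts to an intersection of two index lists; a standalone sketch with made-up row-group indices, not code from the diff:

fn main() {
    // Hypothetical results: the byte range selected row groups 1, 2 and 3,
    // while the predicate pruned everything except 2 and 5.
    let byte_range_filtered: Vec<usize> = vec![1, 2, 3];
    let predicate_filtered_row_groups: Vec<usize> = vec![2, 5];

    // Same strategy as the merge above: keep only indices present in both lists.
    let intersection: Vec<usize> = byte_range_filtered
        .into_iter()
        .filter(|idx| predicate_filtered_row_groups.contains(idx))
        .collect();

    assert_eq!(intersection, vec![2]);
}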
@@ -717,6 +741,36 @@ impl ArrowReader {
 
         Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
     }
+
+    /// Filters row groups by byte range to support Iceberg's file splitting.
+    ///
+    /// Iceberg splits large files at row group boundaries, so we only read row groups
+    /// whose byte ranges overlap with [start, start+length).
+    fn filter_row_groups_by_byte_range(
+        parquet_metadata: &Arc<ParquetMetaData>,
+        start: u64,
+        length: u64,
+    ) -> Result<Vec<usize>> {
+        let row_groups = parquet_metadata.row_groups();
+        let mut selected = Vec::new();
+        let end = start + length;
+
+        // Row groups are stored sequentially after the 4-byte magic header.
+        let mut current_byte_offset = 4u64;
+
+        for (idx, row_group) in row_groups.iter().enumerate() {
+            let row_group_size = row_group.compressed_size() as u64;
+            let row_group_end = current_byte_offset + row_group_size;
+
+            if current_byte_offset < end && start < row_group_end {
+                selected.push(idx);
+            }
+
+            current_byte_offset = row_group_end;
+        }
+
+        Ok(selected)
+    }
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
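
As a worked example of the offset arithmetic in filter_row_groups_by_byte_range, under the same assumption that row groups sit back to back after the 4-byte PAR1 magic (the sizes and the split below are made up):

fn main() {
    // Hypothetical compressed sizes of three row groups, in bytes.
    // Their byte spans are then rg0 = [4, 124), rg1 = [124, 254), rg2 = [254, 364).
    let sizes: [u64; 3] = [120, 130, 110];

    // A split covering bytes [124, 364) should select row groups 1 and 2 only.
    let (start, length) = (124u64, 240u64);
    let end = start + length;

    let mut offset = 4u64;
    let mut selected = Vec::new();
    for (idx, size) in sizes.iter().enumerate() {
        let rg_end = offset + size;
        if offset < end && start < rg_end {
            selected.push(idx);
        }
        offset = rg_end;
    }

    assert_eq!(selected, vec![1, 2]);
}

Because the intervals are half-open, a split that starts exactly at a row group's end byte does not pick that row group up again, so adjacent splits never read the same row group twice.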
@@ -1949,6 +2003,194 @@ message schema {
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
 
+    /// Verifies that file splits respect byte ranges and only read specific row groups.
+    #[tokio::test]
+    async fn test_file_splits_respect_byte_ranges() {
+        use arrow_array::Int32Array;
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_path = format!("{}/multi_row_group.parquet", &table_location);
+
+        // Force each batch into its own row group for testing byte range filtering.
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (0..100).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+        let batch2 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (100..200).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+        let batch3 = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(Int32Array::from(
+            (200..300).collect::<Vec<i32>>(),
+        ))])
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .build();
+
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+        writer.write(&batch1).expect("Writing batch 1");
+        writer.write(&batch2).expect("Writing batch 2");
+        writer.write(&batch3).expect("Writing batch 3");
+        writer.close().unwrap();
+
+        // Read the file metadata to get row group byte positions
+        let file = File::open(&file_path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let metadata = reader.metadata();
+
+        println!("File has {} row groups", metadata.num_row_groups());
+        assert_eq!(metadata.num_row_groups(), 3, "Expected 3 row groups");
+
+        // Get byte positions for each row group
+        let row_group_0 = metadata.row_group(0);
+        let row_group_1 = metadata.row_group(1);
+        let row_group_2 = metadata.row_group(2);
+
+        let rg0_start = 4u64; // Parquet files start with 4-byte magic "PAR1"
+        let rg1_start = rg0_start + row_group_0.compressed_size() as u64;
+        let rg2_start = rg1_start + row_group_1.compressed_size() as u64;
+        let file_end = rg2_start + row_group_2.compressed_size() as u64;
+
+        println!(
+            "Row group 0: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_0.num_rows(),
+            rg0_start,
+            row_group_0.compressed_size()
+        );
+        println!(
+            "Row group 1: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_1.num_rows(),
+            rg1_start,
+            row_group_1.compressed_size()
+        );
+        println!(
+            "Row group 2: {} rows, starts at byte {}, {} bytes compressed",
+            row_group_2.num_rows(),
+            rg2_start,
+            row_group_2.compressed_size()
+        );
+
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        // Task 1: read only the first row group
+        let task1 = FileScanTask {
+            start: rg0_start,
+            length: row_group_0.compressed_size() as u64,
+            record_count: Some(100),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        // Task 2: read the second and third row groups
+        let task2 = FileScanTask {
+            start: rg1_start,
+            length: file_end - rg1_start,
+            record_count: Some(200),
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: None,
+            deletes: vec![],
+        };
+
+        let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
+        let result1 = reader
+            .clone()
+            .read(tasks1)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let total_rows_task1: usize = result1.iter().map(|b| b.num_rows()).sum();
+        println!(
+            "Task 1 (bytes {}-{}) returned {} rows",
+            rg0_start,
+            rg0_start + row_group_0.compressed_size() as u64,
+            total_rows_task1
+        );
+
+        let tasks2 = Box::pin(futures::stream::iter(vec![Ok(task2)])) as FileScanTaskStream;
+        let result2 = reader
+            .read(tasks2)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
+        println!(
+            "Task 2 (bytes {}-{}) returned {} rows",
+            rg1_start, file_end, total_rows_task2
+        );
+
+        assert_eq!(
+            total_rows_task1, 100,
+            "Task 1 should read only the first row group (100 rows), but got {} rows",
+            total_rows_task1
+        );
+
+        assert_eq!(
+            total_rows_task2, 200,
+            "Task 2 should read only the second+third row groups (200 rows), but got {} rows",
+            total_rows_task2
+        );
+
+        // Verify the actual data values are correct (not just the row count)
+        if total_rows_task1 > 0 {
+            let first_batch = &result1[0];
+            let id_col = first_batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let first_val = id_col.value(0);
+            let last_val = id_col.value(id_col.len() - 1);
+            println!("Task 1 data range: {} to {}", first_val, last_val);
+
+            assert_eq!(first_val, 0, "Task 1 should start with id=0");
+            assert_eq!(last_val, 99, "Task 1 should end with id=99");
+        }
+
+        if total_rows_task2 > 0 {
+            let first_batch = &result2[0];
+            let id_col = first_batch
+                .column(0)
+                .as_primitive::<arrow_array::types::Int32Type>();
+            let first_val = id_col.value(0);
+            println!("Task 2 first value: {}", first_val);
+
+            assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
+        }
+    }
+
     /// Test schema evolution: reading old Parquet file (with only column 'a')
     /// using a newer table schema (with columns 'a' and 'b').
     /// This tests that:
