Commit 2e0b646 (parent: 03f0889)

feat: eagerly project the arrow schema to scope out non-selected fields (apache#785)

File tree: 1 file changed (+116, -17 lines)

crates/iceberg/src/arrow/reader.rs

Lines changed: 116 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use std::sync::Arc;
2525
use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
2626
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
2727
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
28-
use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
28+
use arrow_schema::{
29+
ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
30+
};
2931
use arrow_string::like::starts_with;
3032
use bytes::Bytes;
3133
use fnv::FnvHashSet;
@@ -328,22 +330,27 @@ impl ArrowReader {
328330
let mut column_map = HashMap::new();
329331

330332
let fields = arrow_schema.fields();
331-
let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
332-
fields.filter_leaves(|idx, field| {
333-
let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
334-
if field_id.is_none() {
335-
return false;
336-
}
337-
338-
let field_id = i32::from_str(field_id.unwrap());
339-
if field_id.is_err() {
340-
return false;
341-
}
342-
let field_id = field_id.unwrap();
333+
// Pre-project only the fields that have been selected, possibly avoiding converting
334+
// some Arrow types that are not yet supported.
335+
let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
336+
let projected_arrow_schema = ArrowSchema::new_with_metadata(
337+
fields.filter_leaves(|_, f| {
338+
f.metadata()
339+
.get(PARQUET_FIELD_ID_META_KEY)
340+
.and_then(|field_id| i32::from_str(field_id).ok())
341+
.map_or(false, |field_id| {
342+
projected_fields.insert((*f).clone(), field_id);
343+
field_ids.contains(&field_id)
344+
})
345+
}),
346+
arrow_schema.metadata().clone(),
347+
);
348+
let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
343349

344-
if !field_ids.contains(&field_id) {
350+
fields.filter_leaves(|idx, field| {
351+
let Some(field_id) = projected_fields.get(field).cloned() else {
345352
return false;
346-
}
353+
};
347354

348355
let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
349356
let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);
@@ -1128,13 +1135,20 @@ impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
11281135

11291136
#[cfg(test)]
11301137
mod tests {
1131-
use std::collections::HashSet;
1138+
use std::collections::{HashMap, HashSet};
11321139
use std::sync::Arc;
11331140

1134-
use crate::arrow::reader::CollectFieldIdVisitor;
1141+
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
1142+
use parquet::arrow::ProjectionMask;
1143+
use parquet::schema::parser::parse_message_type;
1144+
use parquet::schema::types::SchemaDescriptor;
1145+
1146+
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
1147+
use crate::arrow::ArrowReader;
11351148
use crate::expr::visitors::bound_predicate_visitor::visit;
11361149
use crate::expr::{Bind, Reference};
11371150
use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};
1151+
use crate::ErrorKind;
11381152

11391153
fn table_schema_simple() -> SchemaRef {
11401154
Arc::new(
@@ -1208,4 +1222,89 @@ mod tests {
12081222

12091223
assert_eq!(visitor.field_ids, expected);
12101224
}
1225+
1226+
#[test]
fn test_arrow_projection_mask() {
    // Iceberg table schema: c1 (required string), c2 (optional int),
    // c3 (optional decimal(38, 3)).
    let table_schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_identifier_field_ids(vec![1])
            .with_fields(vec![
                NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(),
                NestedField::optional(2, "c2", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::optional(
                    3,
                    "c3",
                    Type::Primitive(PrimitiveType::Decimal {
                        precision: 38,
                        scale: 3,
                    }),
                )
                .into(),
            ])
            .build()
            .unwrap(),
    );

    // Matching Arrow schema where c2 and c3 carry Arrow types that the
    // Arrow -> Iceberg conversion cannot handle.
    let arrow_schema = Arc::new(ArrowSchema::new(vec![
        Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "1".to_string(),
        )])),
        // Type not supported
        Field::new("c2", DataType::Duration(TimeUnit::Microsecond), true).with_metadata(
            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
        ),
        // Precision is beyond the supported range
        Field::new("c3", DataType::Decimal128(39, 3), true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            "3".to_string(),
        )])),
    ]));

    // Physical Parquet schema for the same three columns.
    let message_type = "
message schema {
  required binary c1 (STRING) = 1;
  optional int32 c2 (INTEGER(8,true)) = 2;
  optional fixed_len_byte_array(17) c3 (DECIMAL(39,3)) = 3;
}
    ";
    let parsed_type = parse_message_type(message_type).expect("should parse schema");
    let parquet_schema = SchemaDescriptor::new(Arc::new(parsed_type));

    // Try projecting the fields c2 and c3 with the unsupported data types
    let duration_err = ArrowReader::get_arrow_projection_mask(
        &[1, 2, 3],
        &table_schema,
        &parquet_schema,
        &arrow_schema,
    )
    .unwrap_err();

    assert_eq!(duration_err.kind(), ErrorKind::DataInvalid);
    assert_eq!(
        duration_err.to_string(),
        "DataInvalid => Unsupported Arrow data type: Duration(Microsecond)".to_string()
    );

    // Omitting field c2, we still get an error due to c3 being selected
    let decimal_err = ArrowReader::get_arrow_projection_mask(
        &[1, 3],
        &table_schema,
        &parquet_schema,
        &arrow_schema,
    )
    .unwrap_err();

    assert_eq!(decimal_err.kind(), ErrorKind::DataInvalid);
    assert_eq!(
        decimal_err.to_string(),
        "DataInvalid => Failed to create decimal type, source: DataInvalid => Decimals with precision larger than 38 are not supported: 39".to_string()
    );

    // Finally avoid selecting fields with unsupported data types
    let mask = ArrowReader::get_arrow_projection_mask(
        &[1],
        &table_schema,
        &parquet_schema,
        &arrow_schema,
    )
    .expect("Some ProjectionMask");
    assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
}
12111310
}

Comments (0)