Skip to content

Commit

Permalink
Move RowFilter creation to a helper function
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Sep 19, 2024
1 parent cef20e3 commit a8c7ca4
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 48 deletions.
26 changes: 20 additions & 6 deletions kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use arrow_schema::{
};
use arrow_select::concat::concat;
use itertools::Itertools;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;

use super::arrow_conversion::LIST_ARRAY_ROOT;
use crate::engine::arrow_data::ArrowEngineData;
Expand All @@ -29,9 +31,24 @@ use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, Vari
use crate::schema::{DataType, PrimitiveType, SchemaRef};
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};

pub fn expression_to_row_filter(predicate: Expression) -> RowFilter {
let arrow_predicate = ArrowPredicateFn::new(
ProjectionMask::all(),
move |batch| {
downcast_to_bool(
&evaluate_expression(&predicate, &batch, None)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))?,
)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))
.cloned()
},
);
RowFilter::new(vec![Box::new(arrow_predicate)])
}

// TODO leverage scalars / Datum

pub fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
arr.as_any()
.downcast_ref::<BooleanArray>()
.ok_or(Error::generic("expected boolean array"))
Expand Down Expand Up @@ -182,7 +199,7 @@ fn column_as_struct<'a>(
.ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
}

pub fn evaluate_expression(
fn evaluate_expression(
expression: &Expression,
batch: &RecordBatch,
result_type: Option<&DataType>,
Expand All @@ -197,10 +214,7 @@ pub fn evaluate_expression(
if name.contains('.') {
let mut path = name.split('.');
// Safety: we know that the first path step exists, because we checked for '.'
let x = extract_column(batch, path.next().unwrap(), &mut path)
.cloned()
.expect(&format!("Failed on a name: {:?}", name).to_owned());
Ok(x)
Ok(extract_column(batch, path.next().unwrap(), &mut path).cloned()?)
} else {
batch
.column_by_name(name)
Expand Down
33 changes: 6 additions & 27 deletions kernel/src/engine/default/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,16 @@
use std::ops::Range;
use std::sync::Arc;

use arrow_schema::ArrowError;
use futures::StreamExt;
use object_store::path::Path;
use object_store::DynObjectStore;
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
RowFilter,
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use parquet::arrow::ProjectionMask;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_expression::{downcast_to_bool, evaluate_expression};
use crate::engine::arrow_expression::expression_to_row_filter;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::default::executor::TaskExecutor;
use crate::schema::SchemaRef;
Expand Down Expand Up @@ -136,18 +133,9 @@ impl FileOpener for ParquetOpener {
let options = ArrowReaderOptions::new(); //.with_page_index(enable_page_index);
let mut builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
if let Some(predicate) = predicate {
builder = builder.with_row_filter(RowFilter::new(vec![Box::new(
ArrowPredicateFn::new(ProjectionMask::all(), move |batch| {
downcast_to_bool(
&evaluate_expression(&predicate, &batch, None)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))?,
)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))
.cloned()
}),
)]));
}
if let Some(predicate) = predicate {
builder = builder.with_row_filter(expression_to_row_filter(predicate));
}
if let Some(mask) = generate_mask(
&table_schema,
parquet_schema,
Expand Down Expand Up @@ -224,16 +212,7 @@ impl FileOpener for PresignedUrlOpener {
}

if let Some(predicate) = predicate {
builder = builder.with_row_filter(RowFilter::new(vec![Box::new(
ArrowPredicateFn::new(ProjectionMask::all(), move |batch| {
downcast_to_bool(
&evaluate_expression(&predicate, &batch, None)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))?,
)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))
.cloned()
}),
)]));
builder = builder.with_row_filter(expression_to_row_filter(predicate));
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
Expand Down
18 changes: 3 additions & 15 deletions kernel/src/engine/sync/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use std::fs::File;

use arrow_schema::ArrowError;
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderMetadata, ParquetRecordBatchReaderBuilder, RowFilter,
ArrowReaderMetadata, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::ProjectionMask;
use tracing::debug;
use url::Url;

use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_expression::{downcast_to_bool, evaluate_expression};
use crate::engine::arrow_expression::{ expression_to_row_filter};
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::schema::SchemaRef;
use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};
Expand All @@ -35,17 +33,7 @@ fn try_create_from_parquet(
builder = builder.with_projection(mask);
}
if let Some(predicate) = predicate {
builder = builder.with_row_filter(RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
ProjectionMask::all(),
move |batch| {
downcast_to_bool(
&evaluate_expression(&predicate, &batch, None)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))?,
)
.map_err(|err| ArrowError::ExternalError(Box::new(err)))
.cloned()
},
))]));
builder = builder.with_row_filter(expression_to_row_filter(predicate));
}
let mut reader = builder.build()?;
let data = reader
Expand Down
Empty file added profile.log
Empty file.

0 comments on commit a8c7ca4

Please sign in to comment.