Skip to content

Commit 9b2e7e3

Browse files
authored
Paquet and JSON readers use Arc<Expression> to avoid deep copies (#364)
Today, the engine parquet/json file handler APIs take an `Expression` arg for predicate pushdown. They cannot take a reference, because the iterator they return will likely depend on (but outlive) that reference. Worse, they need to do it for every file the query reads. Unfortunately, data skipping predicates can be arbitrarily large, and thus annoying/expensive to copy so much. We already use Arc to protect schemas (some of the time, at least), and we can start using Arc to protect expressions as well.
1 parent edc85e5 commit 9b2e7e3

File tree

14 files changed

+83
-81
lines changed

14 files changed

+83
-81
lines changed

ffi/src/scan.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,9 @@ fn scan_impl(
135135
if let Some(predicate) = predicate {
136136
let mut visitor_state = KernelExpressionVisitorState::new();
137137
let exprid = (predicate.visitor)(predicate.predicate, &mut visitor_state);
138-
if let Some(predicate) = unwrap_kernel_expression(&mut visitor_state, exprid) {
139-
debug!("Got predicate: {}", predicate);
140-
scan_builder = scan_builder.with_predicate(predicate);
141-
}
138+
let predicate = unwrap_kernel_expression(&mut visitor_state, exprid);
139+
debug!("Got predicate: {:#?}", predicate);
140+
scan_builder = scan_builder.with_predicate(predicate.map(Arc::new));
142141
}
143142
Ok(Arc::new(scan_builder.build()?).into())
144143
}

kernel/src/actions/set_transaction.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, LazyLock};
22

33
use crate::actions::visitors::SetTransactionVisitor;
44
use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME};
55
use crate::snapshot::Snapshot;
6-
use crate::{DeltaResult, Engine, EngineData, Expression as Expr, SchemaRef};
6+
use crate::{DeltaResult, Engine, EngineData, Expression, ExpressionRef, SchemaRef};
77

88
pub use crate::actions::visitors::SetTransactionMap;
99
pub struct SetTransactionScanner {
@@ -52,10 +52,11 @@ impl SetTransactionScanner {
5252
// checkpoint part when patitioned by `add.path` like the Delta spec requires. There's no
5353
// point filtering by a particular app id, even if we have one, because app ids are all in
5454
// the a single checkpoint part having large min/max range (because they're usually uuids).
55-
let meta_predicate = Expr::column("txn.appId").is_not_null();
55+
static META_PREDICATE: LazyLock<Option<ExpressionRef>> =
56+
LazyLock::new(|| Some(Arc::new(Expression::column("txn.appId").is_not_null())));
5657
self.snapshot
5758
.log_segment
58-
.replay(engine, schema.clone(), schema, Some(meta_predicate))
59+
.replay(engine, schema.clone(), schema, META_PREDICATE.clone())
5960
}
6061

6162
/// Scan the Delta Log for the latest transaction entry of an application

kernel/src/engine/default/json.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
1717
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
1818
use crate::schema::SchemaRef;
1919
use crate::{
20-
DeltaResult, EngineData, Error, Expression, FileDataReadResultIterator, FileMeta, JsonHandler,
20+
DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta,
21+
JsonHandler,
2122
};
2223

2324
#[derive(Debug)]
@@ -72,7 +73,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
7273
&self,
7374
files: &[FileMeta],
7475
physical_schema: SchemaRef,
75-
_predicate: Option<Expression>,
76+
_predicate: Option<ExpressionRef>,
7677
) -> DeltaResult<FileDataReadResultIterator> {
7778
if files.is_empty() {
7879
return Ok(Box::new(std::iter::empty()));

kernel/src/engine/default/parquet.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_s
1616
use crate::engine::default::executor::TaskExecutor;
1717
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
1818
use crate::schema::SchemaRef;
19-
use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};
19+
use crate::{
20+
DeltaResult, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler,
21+
};
2022

2123
#[derive(Debug)]
2224
pub struct DefaultParquetHandler<E: TaskExecutor> {
@@ -48,7 +50,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
4850
&self,
4951
files: &[FileMeta],
5052
physical_schema: SchemaRef,
51-
predicate: Option<Expression>,
53+
predicate: Option<ExpressionRef>,
5254
) -> DeltaResult<FileDataReadResultIterator> {
5355
if files.is_empty() {
5456
return Ok(Box::new(std::iter::empty()));
@@ -90,7 +92,7 @@ struct ParquetOpener {
9092
// projection: Arc<[usize]>,
9193
batch_size: usize,
9294
table_schema: SchemaRef,
93-
predicate: Option<Expression>,
95+
predicate: Option<ExpressionRef>,
9496
limit: Option<usize>,
9597
store: Arc<DynObjectStore>,
9698
}
@@ -99,7 +101,7 @@ impl ParquetOpener {
99101
pub(crate) fn new(
100102
batch_size: usize,
101103
table_schema: SchemaRef,
102-
predicate: Option<Expression>,
104+
predicate: Option<ExpressionRef>,
103105
store: Arc<DynObjectStore>,
104106
) -> Self {
105107
Self {
@@ -166,14 +168,18 @@ impl FileOpener for ParquetOpener {
166168
/// Implements [`FileOpener`] for a opening a parquet file from a presigned URL
167169
struct PresignedUrlOpener {
168170
batch_size: usize,
169-
predicate: Option<Expression>,
171+
predicate: Option<ExpressionRef>,
170172
limit: Option<usize>,
171173
table_schema: SchemaRef,
172174
client: reqwest::Client,
173175
}
174176

175177
impl PresignedUrlOpener {
176-
pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
178+
pub(crate) fn new(
179+
batch_size: usize,
180+
schema: SchemaRef,
181+
predicate: Option<ExpressionRef>,
182+
) -> Self {
177183
Self {
178184
batch_size,
179185
table_schema: schema,

kernel/src/engine/sync/json.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::engine::arrow_data::ArrowEngineData;
77
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
88
use crate::schema::SchemaRef;
99
use crate::{
10-
DeltaResult, EngineData, Expression, FileDataReadResultIterator, FileMeta, JsonHandler,
10+
DeltaResult, EngineData, ExpressionRef, FileDataReadResultIterator, FileMeta, JsonHandler,
1111
};
1212

1313
pub(crate) struct SyncJsonHandler;
@@ -16,7 +16,7 @@ fn try_create_from_json(
1616
file: File,
1717
_schema: SchemaRef,
1818
arrow_schema: ArrowSchemaRef,
19-
_predicate: Option<&Expression>,
19+
_predicate: Option<ExpressionRef>,
2020
) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
2121
let json = arrow_json::ReaderBuilder::new(arrow_schema)
2222
.build(BufReader::new(file))?
@@ -29,7 +29,7 @@ impl JsonHandler for SyncJsonHandler {
2929
&self,
3030
files: &[FileMeta],
3131
schema: SchemaRef,
32-
predicate: Option<Expression>,
32+
predicate: Option<ExpressionRef>,
3333
) -> DeltaResult<FileDataReadResultIterator> {
3434
read_files(files, schema, predicate, try_create_from_json)
3535
}

kernel/src/engine/sync/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use super::arrow_expression::ArrowExpressionHandler;
44
use crate::engine::arrow_data::ArrowEngineData;
55
use crate::{
6-
DeltaResult, Engine, Error, Expression, ExpressionHandler, FileDataReadResultIterator,
6+
DeltaResult, Engine, Error, ExpressionHandler, ExpressionRef, FileDataReadResultIterator,
77
FileMeta, FileSystemClient, JsonHandler, ParquetHandler, SchemaRef,
88
};
99

@@ -60,12 +60,12 @@ impl Engine for SyncEngine {
6060
fn read_files<F, I>(
6161
files: &[FileMeta],
6262
schema: SchemaRef,
63-
predicate: Option<Expression>,
63+
predicate: Option<ExpressionRef>,
6464
mut try_create_from_file: F,
6565
) -> DeltaResult<FileDataReadResultIterator>
6666
where
6767
I: Iterator<Item = DeltaResult<ArrowEngineData>> + Send + 'static,
68-
F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<&Expression>) -> DeltaResult<I>
68+
F: FnMut(File, SchemaRef, ArrowSchemaRef, Option<ExpressionRef>) -> DeltaResult<I>
6969
+ Send
7070
+ 'static,
7171
{
@@ -88,7 +88,7 @@ where
8888
File::open(path)?,
8989
schema.clone(),
9090
arrow_schema.clone(),
91-
predicate.as_ref(),
91+
predicate.clone(),
9292
)
9393
})
9494
// Flatten to Iterator<DeltaResult<DeltaResult<ArrowEngineData>>>

kernel/src/engine/sync/parquet.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ use crate::engine::arrow_data::ArrowEngineData;
88
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
99
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
1010
use crate::schema::SchemaRef;
11-
use crate::{DeltaResult, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};
11+
use crate::{DeltaResult, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler};
1212

1313
pub(crate) struct SyncParquetHandler;
1414

1515
fn try_create_from_parquet(
1616
file: File,
1717
schema: SchemaRef,
1818
_arrow_schema: ArrowSchemaRef,
19-
predicate: Option<&Expression>,
19+
predicate: Option<ExpressionRef>,
2020
) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
2121
let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
2222
let parquet_schema = metadata.schema();
@@ -27,7 +27,7 @@ fn try_create_from_parquet(
2727
builder = builder.with_projection(mask);
2828
}
2929
if let Some(predicate) = predicate {
30-
builder = builder.with_row_group_filter(predicate);
30+
builder = builder.with_row_group_filter(predicate.as_ref());
3131
}
3232
Ok(builder.build()?.map(move |data| {
3333
let reordered = reorder_struct_array(data?.into(), &requested_ordering)?;
@@ -40,7 +40,7 @@ impl ParquetHandler for SyncParquetHandler {
4040
&self,
4141
files: &[FileMeta],
4242
schema: SchemaRef,
43-
predicate: Option<Expression>,
43+
predicate: Option<ExpressionRef>,
4444
) -> DeltaResult<FileDataReadResultIterator> {
4545
read_files(files, schema, predicate, try_create_from_parquet)
4646
}

kernel/src/expressions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ pub enum UnaryOperator {
121121
IsNull,
122122
}
123123

124+
pub type ExpressionRef = std::sync::Arc<Expression>;
125+
124126
/// A SQL expression.
125127
///
126128
/// These expressions do not track or validate data types, other than the type

kernel/src/lib.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub(crate) mod utils;
7777

7878
pub use engine_data::{DataVisitor, EngineData};
7979
pub use error::{DeltaResult, Error};
80-
pub use expressions::Expression;
80+
pub use expressions::{Expression, ExpressionRef};
8181
pub use table::Table;
8282

8383
#[cfg(any(
@@ -192,9 +192,7 @@ pub trait JsonHandler: Send + Sync {
192192
&self,
193193
files: &[FileMeta],
194194
physical_schema: SchemaRef,
195-
// TODO: This should really be an Option<Arc<Expression>>, because otherwise we have to
196-
// clone the (potentially large) expression every time we call this function.
197-
predicate: Option<Expression>,
195+
predicate: Option<ExpressionRef>,
198196
) -> DeltaResult<FileDataReadResultIterator>;
199197
}
200198

@@ -216,9 +214,7 @@ pub trait ParquetHandler: Send + Sync {
216214
&self,
217215
files: &[FileMeta],
218216
physical_schema: SchemaRef,
219-
// TODO: This should really be an Option<Arc<Expression>>, because otherwise we have to
220-
// clone the (potentially large) expression every time we call this function.
221-
predicate: Option<Expression>,
217+
predicate: Option<ExpressionRef>,
222218
) -> DeltaResult<FileDataReadResultIterator>;
223219
}
224220

kernel/src/scan/data_skipping.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use std::borrow::Cow;
22
use std::collections::HashSet;
3-
use std::ops::Not;
43
use std::sync::{Arc, LazyLock};
54

65
use tracing::debug;
76

87
use crate::actions::visitors::SelectionVectorVisitor;
98
use crate::actions::{get_log_schema, ADD_NAME};
109
use crate::error::DeltaResult;
11-
use crate::expressions::{BinaryOperator, Expression as Expr, UnaryOperator, VariadicOperator};
10+
use crate::expressions::{
11+
BinaryOperator, Expression as Expr, ExpressionRef, UnaryOperator, VariadicOperator,
12+
};
1213
use crate::schema::{DataType, PrimitiveType, SchemaRef, SchemaTransform, StructField, StructType};
1314
use crate::{Engine, EngineData, ExpressionEvaluator, JsonHandler};
1415

@@ -80,7 +81,7 @@ fn as_inverted_data_skipping_predicate(expr: &Expr) -> Option<Expr> {
8081
as_data_skipping_predicate(&expr)
8182
}
8283
VariadicOperation { op, exprs } => {
83-
let expr = Expr::variadic(op.invert(), exprs.iter().cloned().map(Expr::not));
84+
let expr = Expr::variadic(op.invert(), exprs.iter().cloned().map(|e| !e));
8485
as_data_skipping_predicate(&expr)
8586
}
8687
_ => None,
@@ -179,7 +180,7 @@ impl DataSkippingFilter {
179180
pub(crate) fn new(
180181
engine: &dyn Engine,
181182
table_schema: &SchemaRef,
182-
predicate: &Option<Expr>,
183+
predicate: Option<ExpressionRef>,
183184
) -> Option<Self> {
184185
static PREDICATE_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
185186
DataType::struct_type([StructField::new("predicate", DataType::BOOLEAN, true)])
@@ -188,11 +189,7 @@ impl DataSkippingFilter {
188189
static FILTER_EXPR: LazyLock<Expr> =
189190
LazyLock::new(|| Expr::column("predicate").distinct(Expr::literal(false)));
190191

191-
let predicate = match predicate {
192-
Some(predicate) => predicate,
193-
None => return None,
194-
};
195-
192+
let predicate = predicate.as_deref()?;
196193
debug!("Creating a data skipping filter for {}", &predicate);
197194
let field_names: HashSet<_> = predicate.references();
198195

0 commit comments

Comments
 (0)