Skip to content

Commit 4987a5a

Browse files
Add tests in to validate predicate filtering
1 parent f2c15e3 commit 4987a5a

File tree

1 file changed

+75
-10
lines changed

1 file changed

+75
-10
lines changed

kernel/src/engine/default/parquet.rs

+75-10
Original file line numberDiff line numberDiff line change
@@ -234,14 +234,17 @@ impl FileOpener for PresignedUrlOpener {
234234

235235
#[cfg(test)]
236236
mod tests {
237+
use std::ops::Not;
237238
use std::path::PathBuf;
238239

239240
use arrow_array::RecordBatch;
240241
use object_store::{local::LocalFileSystem, ObjectStore};
241242

243+
use crate::actions::{Metadata, Protocol};
242244
use crate::engine::arrow_data::ArrowEngineData;
243245
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
244246
use crate::EngineData;
247+
use Expression as Expr;
245248

246249
use itertools::Itertools;
247250

@@ -255,13 +258,10 @@ mod tests {
255258
.map(Into::into)
256259
}
257260

258-
#[tokio::test]
259-
async fn test_read_parquet_files() {
261+
async fn get_record_batch(path: &str, predicate: Option<Expression>) -> Vec<RecordBatch> {
260262
let store = Arc::new(LocalFileSystem::new());
261263

262-
let path = std::fs::canonicalize(PathBuf::from(
263-
"./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
264-
)).unwrap();
264+
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
265265
let url = url::Url::from_file_path(path).unwrap();
266266
let location = Path::from(url.path());
267267
let meta = store.head(&location).await.unwrap();
@@ -280,14 +280,79 @@ mod tests {
280280
}];
281281

282282
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
283-
let data: Vec<RecordBatch> = handler
284-
.read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
283+
handler
284+
.read_parquet_files(
285+
files,
286+
Arc::new(physical_schema.try_into().unwrap()),
287+
predicate,
288+
)
285289
.unwrap()
286290
.map(into_record_batch)
287291
.try_collect()
288-
.unwrap();
292+
.unwrap()
293+
}
294+
295+
#[tokio::test]
296+
async fn test_read_parquet_files_with_empty_output() {
297+
let predicate = Some(Expr::lt(Expr::column("value"), Expr::literal(0)));
298+
let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
299+
let data = get_record_batch(path, predicate).await;
300+
assert_eq!(data.len(), 0);
301+
}
302+
303+
#[tokio::test]
304+
async fn test_read_parquet_files_filter_data() {
305+
let cases = [
306+
(
307+
5,
308+
Some(Expr::gt_eq(Expr::column("value"), Expr::literal(5))),
309+
),
310+
(1, Some(Expr::gt(Expr::column("value"), Expr::literal(8)))),
311+
(10, None),
312+
];
313+
let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
314+
for (num_rows, predicate) in cases {
315+
let data = get_record_batch(path, predicate).await;
316+
assert_eq!(data.len(), 1);
317+
assert_eq!(data[0].num_rows(), num_rows);
318+
}
319+
}
320+
321+
#[tokio::test]
322+
async fn test_parquet_protocol_metadata_filter() {
323+
let predicate = Some(Expr::or(
324+
Expr::not(Expr::is_null(Expr::column("metaData.id"))),
325+
Expr::not(Expr::is_null(Expr::column("protocol.minReaderVersion"))),
326+
));
327+
328+
let path = "./tests/data/app-txn-checkpoint/_delta_log/00000000000000000001.checkpoint.parquet";
329+
let mut protocol_filtered: Vec<Protocol> = vec![];
330+
let data_filtered = get_record_batch(path, predicate).await;
331+
let data = get_record_batch(path, None).await;
332+
333+
let mut metadata_filtered: Vec<Metadata> = vec![];
334+
let mut metadata_expected: Vec<Metadata> = vec![];
335+
let mut protocol_filtered: Vec<Protocol> = vec![];
336+
let mut protocol_expected: Vec<Protocol> = vec![];
337+
338+
for batch in data_filtered.into_iter().map(Into::<ArrowEngineData>::into) {
339+
if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
340+
metadata_filtered.push(metadata);
341+
}
342+
if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
343+
protocol_filtered.push(protocol);
344+
}
345+
}
346+
for batch in data.into_iter().map(Into::<ArrowEngineData>::into) {
347+
if let Some(metadata) = Metadata::try_new_from_data(&batch).unwrap() {
348+
metadata_expected.push(metadata);
349+
}
350+
if let Some(protocol) = Protocol::try_new_from_data(&batch).unwrap() {
351+
protocol_expected.push(protocol);
352+
}
353+
}
289354

290-
assert_eq!(data.len(), 1);
291-
assert_eq!(data[0].num_rows(), 10);
355+
assert_eq!(metadata_filtered, metadata_expected);
356+
assert_eq!(protocol_expected, protocol_expected);
292357
}
293358
}

0 commit comments

Comments
 (0)