delete for split - parquet reader uses row group skipping
scovich committed Sep 27, 2024
1 parent 519acbd commit 18b33cf
Showing 10 changed files with 20 additions and 241 deletions.
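
This commit splits the row-group-skipping work out of its PR: the `predicate` parameter stays on the handler traits, but the engines stop acting on it. A minimal sketch of the call shape after this commit — `engine`, `meta`, and `schema` are assumed to be in scope; only the `read_parquet_files` signature comes from the diff:

use delta_kernel::schema::SchemaRef;
use delta_kernel::{DeltaResult, Engine, FileMeta};

fn read_one(engine: &dyn Engine, meta: FileMeta, schema: SchemaRef) -> DeltaResult<()> {
    // After this commit the predicate argument is accepted but ignored, so
    // passing None loses nothing; no row groups are skipped either way.
    let batches = engine.get_parquet_handler().read_parquet_files(&[meta], schema, None)?;
    for batch in batches {
        let _data = batch?; // one engine-data chunk per iterator item
    }
    Ok(())
}
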
1 change: 0 additions & 1 deletion ffi/src/engine_funcs.rs
@@ -122,7 +122,6 @@ fn read_parquet_file_impl(
         last_modified: file.last_modified,
         size: file.size,
     };
-    // TODO: Plumb the predicate through the FFI?
     let data = parquet_handler.read_parquet_files(&[delta_fm], physical_schema, None)?;
     let res = Box::new(FileReadResultIterator {
         data,

28 changes: 4 additions & 24 deletions kernel/src/engine/default/parquet.rs
@@ -14,7 +14,6 @@ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
 use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
 use crate::engine::default::executor::TaskExecutor;
-use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
 use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};

@@ -48,7 +47,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
         &self,
         files: &[FileMeta],
         physical_schema: SchemaRef,
-        predicate: Option<Expression>,
+        _predicate: Option<Expression>,
     ) -> DeltaResult<FileDataReadResultIterator> {
         if files.is_empty() {
             return Ok(Box::new(std::iter::empty()));
@@ -63,15 +62,10 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
         // -> parse to parquet
         // SAFETY: we did is_empty check above, this is ok.
         let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
-            "http" | "https" => Box::new(PresignedUrlOpener::new(
-                1024,
-                physical_schema.clone(),
-                predicate,
-            )),
+            "http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
             _ => Box::new(ParquetOpener::new(
                 1024,
                 physical_schema.clone(),
-                predicate,
                 self.store.clone(),
             )),
         };
@@ -89,23 +83,20 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
 struct ParquetOpener {
     // projection: Arc<[usize]>,
     batch_size: usize,
-    table_schema: SchemaRef,
-    predicate: Option<Expression>,
     limit: Option<usize>,
+    table_schema: SchemaRef,
     store: Arc<DynObjectStore>,
 }

 impl ParquetOpener {
     pub(crate) fn new(
         batch_size: usize,
         table_schema: SchemaRef,
-        predicate: Option<Expression>,
         store: Arc<DynObjectStore>,
     ) -> Self {
         Self {
             batch_size,
             table_schema,
-            predicate,
             limit: None,
             store,
         }
@@ -120,7 +111,6 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
         // let projection = self.projection.clone();
         let table_schema = self.table_schema.clone();
-        let predicate = self.predicate.clone();
         let limit = self.limit;

         Ok(Box::pin(async move {
@@ -143,9 +133,6 @@
                 builder = builder.with_projection(mask)
             }

-            if let Some(ref predicate) = predicate {
-                builder = builder.with_row_group_filter(predicate);
-            }
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
@@ -166,18 +153,16 @@ impl FileOpener for ParquetOpener {
 /// Implements [`FileOpener`] for a opening a parquet file from a presigned URL
 struct PresignedUrlOpener {
     batch_size: usize,
-    predicate: Option<Expression>,
     limit: Option<usize>,
     table_schema: SchemaRef,
     client: reqwest::Client,
 }

 impl PresignedUrlOpener {
-    pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
+    pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
         Self {
             batch_size,
             table_schema: schema,
-            predicate,
             limit: None,
             client: reqwest::Client::new(),
         }
@@ -188,7 +173,6 @@ impl FileOpener for PresignedUrlOpener {
     fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
         let batch_size = self.batch_size;
         let table_schema = self.table_schema.clone();
-        let predicate = self.predicate.clone();
         let limit = self.limit;
         let client = self.client.clone(); // uses Arc internally according to reqwest docs

@@ -212,9 +196,6 @@
                 builder = builder.with_projection(mask)
             }

-            if let Some(ref predicate) = predicate {
-                builder = builder.with_row_group_filter(predicate);
-            }
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
             }
@@ -280,7 +261,6 @@ mod tests {
             size: meta.size,
         }];

-        // TODO: add a test that uses predicate skipping?
         let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
         let data: Vec<RecordBatch> = handler
             .read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
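
For context: `with_row_group_filter` was provided by the just-deleted `ParquetRowGroupSkipping` extension trait, not by arrow-rs itself. A rough sketch of what such pruning looks like over the stock arrow-rs builder API — `row_group_matches` is a hypothetical stand-in for the deleted stats evaluation:

use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::metadata::RowGroupMetaData;

// Hypothetical: decide from the row group's statistics whether it could
// contain rows matching the predicate.
fn row_group_matches(rg: &RowGroupMetaData) -> bool {
    rg.column(0).statistics().is_some() // placeholder check only
}

fn prune(builder: ParquetRecordBatchReaderBuilder<File>) -> ParquetRecordBatchReaderBuilder<File> {
    // Collect the indexes of surviving row groups, then hand them to the
    // builder; with_row_groups is the real arrow-rs selection hook.
    let keep: Vec<usize> = builder
        .metadata()
        .row_groups()
        .iter()
        .enumerate()
        .filter(|(_, rg)| row_group_matches(rg))
        .map(|(i, _)| i)
        .collect();
    builder.with_row_groups(keep)
}
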
3 changes: 0 additions & 3 deletions kernel/src/engine/mod.rs
@@ -11,9 +11,6 @@ pub mod arrow_expression;
 #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
 pub mod arrow_data;

-#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
-pub mod parquet_row_group_skipping;
-
 #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
 pub mod parquet_stats_skipping;

177 changes: 0 additions & 177 deletions kernel/src/engine/parquet_row_group_skipping.rs

This file was deleted.

1 change: 1 addition & 0 deletions kernel/src/engine/parquet_stats_skipping.rs
@@ -12,6 +12,7 @@ use std::cmp::Ordering;
 /// a SET of rows -- has different semantics than row-based predicate evaluation. The provided
 /// methods of this class convert various supported expressions into data skipping predicates, and
 /// then return the result of evaluating the translated filter.
+#[allow(unused)] // temporary, until we wire up the parquet reader to actually use this
 pub(crate) trait ParquetStatsSkippingFilter {
     /// Retrieves the minimum value of a column, if it exists and has the requested type.
     fn get_min_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar>;
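
The doc comment above is the crux of stats-based skipping: the translated predicate asks "could any row in this group match?", so it may only prune groups that provably contain no matches, and missing stats must never prune. A tiny illustration of that asymmetry (hypothetical helper, not part of the trait):

// For the row predicate `x < 10`: every row satisfies x >= min(x), so when
// min(x) >= 10 no row can possibly match and the whole group is skippable.
fn can_skip_x_lt_10(min_x: Option<i64>) -> bool {
    match min_x {
        Some(min) => min >= 10,
        None => false, // unknown stats: the group might contain matches
    }
}

fn main() {
    assert!(can_skip_x_lt_10(Some(42))); // all x >= 42, so no x < 10
    assert!(!can_skip_x_lt_10(Some(3))); // some rows may have x < 10
    assert!(!can_skip_x_lt_10(None));    // never skip without stats
}
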
13 changes: 2 additions & 11 deletions kernel/src/engine/sync/parquet.rs
@@ -6,17 +6,12 @@ use url::Url;

 use crate::engine::arrow_data::ArrowEngineData;
 use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
-use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
 use crate::schema::SchemaRef;
 use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};

 pub(crate) struct SyncParquetHandler;

-fn try_create_from_parquet(
-    schema: SchemaRef,
-    location: Url,
-    predicate: Option<&Expression>,
-) -> DeltaResult<ArrowEngineData> {
+fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult<ArrowEngineData> {
     let file = File::open(
         location
             .to_file_path()
@@ -30,9 +25,6 @@
     {
         builder = builder.with_projection(mask);
     }
-    if let Some(predicate) = predicate {
-        builder = builder.with_row_group_filter(predicate);
-    }
     let mut reader = builder.build()?;
     let data = reader
         .next()
@@ -54,8 +46,7 @@ impl ParquetHandler for SyncParquetHandler {
         }
         let locations: Vec<_> = files.iter().map(|file| file.location.clone()).collect();
         Ok(Box::new(locations.into_iter().map(move |location| {
-            try_create_from_parquet(schema.clone(), location, predicate.as_ref())
-                .map(|d| Box::new(d) as _)
+            try_create_from_parquet(schema.clone(), location).map(|d| Box::new(d) as _)
         })))
     }
 }
2 changes: 2 additions & 0 deletions kernel/src/lib.rs
@@ -193,6 +193,8 @@ pub trait JsonHandler: Send + Sync {
         &self,
         files: &[FileMeta],
         physical_schema: SchemaRef,
+        // TODO: This should really be an Option<Arc<Expression>>, because otherwise we have to
+        // clone the (potentially large) expression every time we call this function.
         predicate: Option<Expression>,
     ) -> DeltaResult<FileDataReadResultIterator>;
 }
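
The new TODO is about clone cost, not semantics: `Expression` is an owned tree, so `Option<Expression>` forces a deep clone at every call site, whereas an `Arc` clone is a reference-count bump. A small self-contained sketch (the `Expression` below is a stand-in, not the kernel's type):

use std::sync::Arc;

#[derive(Clone)]
struct Expression(Vec<Expression>); // stand-in for an owned expression tree

fn main() {
    let expr = Expression(vec![Expression(vec![]), Expression(vec![])]);
    let deep = expr.clone();         // copies the whole tree: O(tree size)
    let shared = Arc::new(expr);
    let cheap = Arc::clone(&shared); // bumps a refcount: O(1)
    let _ = (deep, cheap);
}
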
6 changes: 2 additions & 4 deletions kernel/src/scan/mod.rs
@@ -199,13 +199,11 @@ impl Scan {
         let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
         let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;

-        // NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
-        // when ~every checkpoint file will contain the adds and removes we are looking for.
         let log_iter = self.snapshot.log_segment.replay(
             engine,
             commit_read_schema,
             checkpoint_read_schema,
-            None,
+            self.predicate.clone(),
         )?;

         Ok(scan_action_iter(
@@ -287,7 +285,7 @@ impl Scan {
                 let read_result_iter = engine.get_parquet_handler().read_parquet_files(
                     &[meta],
                     global_state.read_schema.clone(),
-                    self.predicate().clone(),
+                    None,
                 )?;
                 let gs = global_state.clone(); // Arc clone
                 Ok(read_result_iter.into_iter().map(move |read_result| {
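
Passing `None` here is safe because row-group skipping was always best-effort pruning: even with a predicate, surviving row groups can still contain non-matching rows, so exact results require evaluating the predicate on the returned batches anyway. Conceptually, with arrow-rs (the boolean `mask` would come from the kernel's expression evaluator, elided here as an assumption):

use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// `mask` holds the row-level predicate result for `batch`.
fn apply_predicate(batch: &RecordBatch, mask: &BooleanArray) -> Result<RecordBatch, ArrowError> {
    // Skipping (when enabled) only prunes whole row groups; per-row
    // filtering still has to happen on whatever the reader returns.
    filter_record_batch(batch, mask)
}
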