Commit 6c98441

parquet reader now uses row group skipping
scovich committed Sep 27, 2024
1 parent 18b33cf commit 6c98441
Showing 10 changed files with 241 additions and 20 deletions.
1 change: 1 addition & 0 deletions ffi/src/engine_funcs.rs
@@ -122,6 +122,7 @@ fn read_parquet_file_impl(
last_modified: file.last_modified,
size: file.size,
};
// TODO: Plumb the predicate through the FFI?
let data = parquet_handler.read_parquet_files(&[delta_fm], physical_schema, None)?;
let res = Box::new(FileReadResultIterator {
data,
28 changes: 24 additions & 4 deletions kernel/src/engine/default/parquet.rs
@@ -14,6 +14,7 @@ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};

@@ -47,7 +48,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
_predicate: Option<Expression>,
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(Box::new(std::iter::empty()));
@@ -62,10 +63,15 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
"http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
"http" | "https" => Box::new(PresignedUrlOpener::new(
1024,
physical_schema.clone(),
predicate,
)),
_ => Box::new(ParquetOpener::new(
1024,
physical_schema.clone(),
predicate,
self.store.clone(),
)),
};
@@ -83,20 +89,23 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
limit: Option<usize>,
table_schema: SchemaRef,
predicate: Option<Expression>,
limit: Option<usize>,
store: Arc<DynObjectStore>,
}

impl ParquetOpener {
pub(crate) fn new(
batch_size: usize,
table_schema: SchemaRef,
predicate: Option<Expression>,
store: Arc<DynObjectStore>,
) -> Self {
Self {
batch_size,
table_schema,
predicate,
limit: None,
store,
}
@@ -111,6 +120,7 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
// let projection = self.projection.clone();
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;

Ok(Box::pin(async move {
@@ -133,6 +143,9 @@ impl FileOpener for ParquetOpener {
builder = builder.with_projection(mask)
}

if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate);
}
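// NOTE: this only prunes entire row groups using footer stats; rows within the
// surviving groups are not filtered here, so the predicate still needs to be
// applied downstream (the returned batches are a superset of the matching rows).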
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
@@ -153,16 +166,18 @@ impl FileOpener for ParquetOpener {
/// Implements [`FileOpener`] for opening a parquet file from a presigned URL
struct PresignedUrlOpener {
batch_size: usize,
predicate: Option<Expression>,
limit: Option<usize>,
table_schema: SchemaRef,
client: reqwest::Client,
}

impl PresignedUrlOpener {
pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
Self {
batch_size,
table_schema: schema,
predicate,
limit: None,
client: reqwest::Client::new(),
}
@@ -173,6 +188,7 @@ impl FileOpener for PresignedUrlOpener {
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let batch_size = self.batch_size;
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;
let client = self.client.clone(); // uses Arc internally according to reqwest docs

@@ -196,6 +212,9 @@ impl FileOpener for PresignedUrlOpener {
builder = builder.with_projection(mask)
}

if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate);
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
@@ -261,6 +280,7 @@ mod tests {
size: meta.size,
}];

// TODO: add a test that uses predicate skipping?
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let data: Vec<RecordBatch> = handler
.read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
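The TODO above asks for a test that exercises predicate skipping. A minimal sketch of what such a call could look like, assuming the kernel's Expression type offers builder helpers along the lines of `column`, `literal`, and `gt` (the exact constructor names are illustrative, not confirmed by this diff):

    // Hypothetical: prune row groups whose footer stats prove no row has id > 1000.
    let predicate = Expression::column("id").gt(Expression::literal(1000i64));
    let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
    let batches: Vec<_> = handler
        .read_parquet_files(
            files,
            Arc::new(physical_schema.try_into().unwrap()),
            Some(predicate),
        )?
        .collect();
    // Row groups ruled out by their min/max stats are never fetched or decoded.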
3 changes: 3 additions & 0 deletions kernel/src/engine/mod.rs
@@ -11,6 +11,9 @@ pub mod arrow_expression;
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod arrow_data;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod parquet_row_group_skipping;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod parquet_stats_skipping;

177 changes: 177 additions & 0 deletions kernel/src/engine/parquet_row_group_skipping.rs
@@ -0,0 +1,177 @@
//! An implementation of parquet row group skipping using data skipping predicates over footer stats.
use crate::engine::parquet_stats_skipping::{col_name_to_path, ParquetStatsSkippingFilter};
use crate::expressions::{Expression, Scalar};
use crate::schema::{DataType, PrimitiveType};
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;
use parquet::schema::types::{ColumnDescPtr, ColumnPath};
use std::collections::{HashMap, HashSet};

/// An extension trait for [`ArrowReaderBuilder`] that injects row group skipping capability.
pub(crate) trait ParquetRowGroupSkipping {
/// Instructs the parquet reader to perform row group skipping, eliminating any row group whose
/// stats prove that none of the group's rows can satisfy the given `predicate`.
fn with_row_group_filter(self, predicate: &Expression) -> Self;
}
impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> {
fn with_row_group_filter(self, predicate: &Expression) -> Self {
let indices = self
.metadata()
.row_groups()
.iter()
.enumerate()
.filter_map(|(index, row_group)| {
RowGroupFilter::apply(predicate, row_group).then_some(index)
})
.collect();
self.with_row_groups(indices)
}
}

/// A ParquetStatsSkippingFilter for row group skipping. It obtains stats from a parquet
/// [`RowGroupMetaData`] and pre-computes the mapping of each referenced column path to its
/// corresponding field index, for O(1) stats lookups.
struct RowGroupFilter<'a> {
row_group: &'a RowGroupMetaData,
field_indices: HashMap<ColumnPath, usize>,
}

impl<'a> RowGroupFilter<'a> {
/// Applies a filtering expression to a row group. A return value of false means the row group
/// should be skipped.
fn apply(filter: &Expression, row_group: &'a RowGroupMetaData) -> bool {
let field_indices = compute_field_indices(row_group.schema_descr().columns(), filter);
let result = Self {
row_group,
field_indices,
}
.apply_sql_where(filter);
!matches!(result, Some(false))
}
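// For example (illustrative): with predicate `x > 25` and a row group whose
// stats for `x` are min = 10 / max = 20, the stats prove no row can match, so
// apply_sql_where returns Some(false) and the group is skipped. With `x > 15`
// the stats cannot rule out a match, and a column with missing stats yields
// None; in both cases the group is kept. Skipping is only allowed when the
// stats *prove* that no row can satisfy the predicate.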

fn get_stats(&self, col: &ColumnPath) -> Option<&Statistics> {
let field_index = self.field_indices.get(col)?;
self.row_group.column(*field_index).statistics()
}
}

impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> {
// Extracts a stat value, converting from its physical type to the requested logical type.
//
// NOTE: This code is highly redundant with [`get_max_stat_value`], but parquet
// ValueStatistics<T> requires T to impl a private trait, so we can't factor out any kind of
// helper method. And macros are hard enough to read that it's not worth defining one.
fn get_min_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
use PrimitiveType::*;
let value = match (data_type.as_primitive_opt()?, self.get_stats(col)?) {
(String, Statistics::ByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
(String, Statistics::FixedLenByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
(String, _) => None?,
(Long, Statistics::Int64(s)) => s.min_opt()?.into(),
(Long, Statistics::Int32(s)) => (*s.min_opt()? as i64).into(),
(Long, _) => None?,
(Integer, Statistics::Int32(s)) => s.min_opt()?.into(),
(Integer, _) => None?,
(Short, Statistics::Int32(s)) => (*s.min_opt()? as i16).into(),
(Short, _) => None?,
(Byte, Statistics::Int32(s)) => (*s.min_opt()? as i8).into(),
(Byte, _) => None?,
(Float, Statistics::Float(s)) => s.min_opt()?.into(),
(Float, _) => None?,
(Double, Statistics::Double(s)) => s.min_opt()?.into(),
(Double, _) => None?,
(Boolean, Statistics::Boolean(s)) => s.min_opt()?.into(),
(Boolean, _) => None?,
(Binary, Statistics::ByteArray(s)) => s.min_opt()?.data().into(),
(Binary, Statistics::FixedLenByteArray(s)) => s.min_opt()?.data().into(),
(Binary, _) => None?,
(Date, Statistics::Int32(s)) => Scalar::Date(*s.min_opt()?),
(Date, _) => None?,
(Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.min_opt()?),
(Timestamp, _) => None?, // TODO: Int96 timestamps
(TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.min_opt()?),
(TimestampNtz, _) => None?, // TODO: Int96 timestamps
(Decimal(..), _) => None?, // TODO: Decimal (Int32, Int64, FixedLenByteArray)
};
Some(value)
}

fn get_max_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
use PrimitiveType::*;
let value = match (data_type.as_primitive_opt()?, self.get_stats(col)?) {
(String, Statistics::ByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
(String, Statistics::FixedLenByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
(String, _) => None?,
(Long, Statistics::Int64(s)) => s.max_opt()?.into(),
(Long, Statistics::Int32(s)) => (*s.max_opt()? as i64).into(),
(Long, _) => None?,
(Integer, Statistics::Int32(s)) => s.max_opt()?.into(),
(Integer, _) => None?,
(Short, Statistics::Int32(s)) => (*s.max_opt()? as i16).into(),
(Short, _) => None?,
(Byte, Statistics::Int32(s)) => (*s.max_opt()? as i8).into(),
(Byte, _) => None?,
(Float, Statistics::Float(s)) => s.max_opt()?.into(),
(Float, _) => None?,
(Double, Statistics::Double(s)) => s.max_opt()?.into(),
(Double, _) => None?,
(Boolean, Statistics::Boolean(s)) => s.max_opt()?.into(),
(Boolean, _) => None?,
(Binary, Statistics::ByteArray(s)) => s.max_opt()?.data().into(),
(Binary, Statistics::FixedLenByteArray(s)) => s.max_opt()?.data().into(),
(Binary, _) => None?,
(Date, Statistics::Int32(s)) => Scalar::Date(*s.max_opt()?),
(Date, _) => None?,
(Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.max_opt()?),
(Timestamp, _) => None?, // TODO: Int96 timestamps
(TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.max_opt()?),
(TimestampNtz, _) => None?, // TODO: Int96 timestamps
(Decimal(..), _) => None?, // TODO: Decimal (Int32, Int64, FixedLenByteArray)
};
Some(value)
}

// Parquet nullcount stats always have the same type (u64), so we can directly return the value
// instead of wrapping it in a Scalar. We can safely cast it from u64 to i64, because the
// nullcount can never be larger than the rowcount, and the parquet rowcount stat is i64.
fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> {
Some(self.get_stats(col)?.null_count_opt()? as i64)
}

fn get_rowcount_stat_value(&self) -> i64 {
self.row_group.num_rows()
}
}

/// Given a filter expression of interest and a set of parquet column descriptors, build a column ->
/// index mapping for columns the expression references. This ensures O(1) lookup times, for an
/// overall O(n) cost to evaluate an expression tree with n nodes.
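///
/// For example (illustrative): for a predicate like `x > 25` over a parquet file whose leaf
/// columns are `x` and `y`, the returned map contains a single entry mapping the column path
/// for `x` to index 0; `y` is not referenced by the predicate and gets no entry.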
pub(crate) fn compute_field_indices(
fields: &[ColumnDescPtr],
expression: &Expression,
) -> HashMap<ColumnPath, usize> {
fn do_recurse(expression: &Expression, cols: &mut HashSet<ColumnPath>) {
use Expression::*;
let mut recurse = |expr| do_recurse(expr, cols); // less arg passing below
match expression {
Literal(_) => {}
Column(name) => drop(cols.insert(col_name_to_path(name))),
Struct(fields) => fields.iter().for_each(recurse),
UnaryOperation { expr, .. } => recurse(expr),
BinaryOperation { left, right, .. } => [left, right].iter().for_each(|e| recurse(e)),
VariadicOperation { exprs, .. } => exprs.iter().for_each(recurse),
}
}

// Build up a set of requested column paths, then take each found path as the corresponding map
// key (avoids unnecessary cloning).
//
// NOTE: If a requested column was not available, it is silently ignored.
let mut requested_columns = HashSet::new();
do_recurse(expression, &mut requested_columns);
fields
.iter()
.enumerate()
.filter_map(|(i, f)| requested_columns.take(f.path()).map(|path| (path, i)))
.collect()
}
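The new trait is crate-private, but inside the crate it applies to any ArrowReaderBuilder, including the synchronous ParquetRecordBatchReaderBuilder used by the sync handler below. A rough standalone sketch, reusing the same illustrative Expression helpers and a hypothetical file path:

    use std::fs::File;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    let file = File::open("part-00000.parquet")?;  // hypothetical local file
    let predicate = Expression::column("x").gt(Expression::literal(25i64));
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_row_group_filter(&predicate)  // drops row groups ruled out by footer stats
        .build()?;
    for batch in reader {
        let batch = batch?;  // only batches from the surviving row groups
        // ...
    }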
1 change: 0 additions & 1 deletion kernel/src/engine/parquet_stats_skipping.rs
@@ -12,7 +12,6 @@ use std::cmp::Ordering;
/// a SET of rows -- has different semantics than row-based predicate evaluation. The provided
/// methods of this class convert various supported expressions into data skipping predicates, and
/// then return the result of evaluating the translated filter.
#[allow(unused)] // temporary, until we wire up the parquet reader to actually use this
pub(crate) trait ParquetStatsSkippingFilter {
/// Retrieves the minimum value of a column, if it exists and has the requested type.
fn get_min_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar>;
13 changes: 11 additions & 2 deletions kernel/src/engine/sync/parquet.rs
@@ -6,12 +6,17 @@ use url::Url;

use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};

pub(crate) struct SyncParquetHandler;

fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult<ArrowEngineData> {
fn try_create_from_parquet(
schema: SchemaRef,
location: Url,
predicate: Option<&Expression>,
) -> DeltaResult<ArrowEngineData> {
let file = File::open(
location
.to_file_path()
@@ -25,6 +30,9 @@ fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult<Arro
{
builder = builder.with_projection(mask);
}
if let Some(predicate) = predicate {
builder = builder.with_row_group_filter(predicate);
}
let mut reader = builder.build()?;
let data = reader
.next()
@@ -46,7 +54,8 @@ impl ParquetHandler for SyncParquetHandler {
}
let locations: Vec<_> = files.iter().map(|file| file.location.clone()).collect();
Ok(Box::new(locations.into_iter().map(move |location| {
try_create_from_parquet(schema.clone(), location).map(|d| Box::new(d) as _)
try_create_from_parquet(schema.clone(), location, predicate.as_ref())
.map(|d| Box::new(d) as _)
})))
}
}
2 changes: 0 additions & 2 deletions kernel/src/lib.rs
@@ -193,8 +193,6 @@ pub trait JsonHandler: Send + Sync {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
// TODO: This should really be an Option<Arc<Expression>>, because otherwise we have to
// clone the (potentially large) expression every time we call this function.
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator>;
}
6 changes: 4 additions & 2 deletions kernel/src/scan/mod.rs
@@ -199,11 +199,13 @@ impl Scan {
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;

// NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping:
// nearly every checkpoint file will contain the adds and removes we are looking for.
let log_iter = self.snapshot.log_segment.replay(
engine,
commit_read_schema,
checkpoint_read_schema,
self.predicate.clone(),
None,
)?;

Ok(scan_action_iter(
@@ -285,7 +287,7 @@ impl Scan {
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[meta],
global_state.read_schema.clone(),
None,
self.predicate().clone(),
)?;
let gs = global_state.clone(); // Arc clone
Ok(read_result_iter.into_iter().map(move |read_result| {
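Taken together, a predicate supplied when building a scan now reaches the parquet handler for data file reads, while log replay deliberately gets no meta-predicate (per the NOTE above). A rough end-to-end sketch, assuming the snapshot exposes a scan builder with a `with_predicate` method (method names approximate, not confirmed by this diff):

    // Hypothetical: scan-building API names are approximate.
    let predicate = Expression::column("x").gt(Expression::literal(25i64));
    let scan = snapshot.scan_builder().with_predicate(predicate).build()?;
    // Data file reads issued by the scan now call read_parquet_files(.., Some(predicate)),
    // so both the default and sync parquet handlers can skip row groups using footer stats.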