-
Notifications
You must be signed in to change notification settings - Fork 122
Implement row group skipping for the default engine parquet readers #362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 27 commits
715f233
ef71f1a
39b8927
b5c3a52
e71571e
cbca3b3
e7d87eb
beeb6e8
519acbd
18b33cf
6c98441
0fdaf0a
8ac33f8
1cf03dc
bc8b344
0971002
375a380
6236874
9efcbf7
46d19e3
7666512
f3865d0
a4dc3da
40131db
bf65904
cce762d
c7d6bb0
4f92ed7
08a305b
e8a947e
bf1e3a8
9d632e7
4a77f3a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,229 @@ | ||
| //! An implementation of parquet row group skipping using data skipping predicates over footer stats. | ||
| use crate::engine::parquet_stats_skipping::{col_name_to_path, ParquetStatsSkippingFilter}; | ||
| use crate::expressions::{Expression, Scalar}; | ||
| use crate::schema::{DataType, PrimitiveType}; | ||
| use chrono::{DateTime, Days}; | ||
| use parquet::arrow::arrow_reader::ArrowReaderBuilder; | ||
| use parquet::file::metadata::RowGroupMetaData; | ||
| use parquet::file::statistics::Statistics; | ||
| use parquet::schema::types::{ColumnDescPtr, ColumnPath}; | ||
| use std::collections::{HashMap, HashSet}; | ||
| use tracing::debug; | ||
|
|
||
| #[cfg(test)] | ||
| mod tests; | ||
|
|
||
/// An extension trait for [`ArrowReaderBuilder`] that injects row group skipping capability.
pub(crate) trait ParquetRowGroupSkipping {
    /// Instructs the parquet reader to perform row group skipping, eliminating any row group whose
    /// stats prove that none of the group's rows can satisfy the given `predicate`.
    ///
    /// NOTE: Skipping is conservative -- a row group is only eliminated when the predicate
    /// provably evaluates to FALSE for every row; groups with missing or indeterminate stats
    /// are kept.
    fn with_row_group_filter(self, predicate: &Expression) -> Self;
}
| impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> { | ||
| fn with_row_group_filter(self, predicate: &Expression) -> Self { | ||
| let indices = self | ||
| .metadata() | ||
| .row_groups() | ||
| .iter() | ||
| .enumerate() | ||
| .filter_map(|(index, row_group)| { | ||
| // If the group survives the filter, return Some(index) so filter_map keeps it. | ||
| RowGroupFilter::apply(row_group, predicate).then_some(index) | ||
| }) | ||
| .collect(); | ||
| debug!("with_row_group_filter({predicate:#?}) = {indices:?})"); | ||
| self.with_row_groups(indices) | ||
| } | ||
| } | ||
|
|
||
/// A ParquetStatsSkippingFilter for row group skipping. It obtains stats from a parquet
/// [`RowGroupMetaData`] and pre-computes the mapping of each referenced column path to its
/// corresponding field index, for O(1) stats lookups.
struct RowGroupFilter<'a> {
    // The row group whose footer stats back all min/max/nullcount lookups.
    row_group: &'a RowGroupMetaData,
    // Maps each column path the predicate references to its field index in the row group schema.
    // Columns the predicate never mentions are deliberately absent (see `compute_field_indices`).
    field_indices: HashMap<ColumnPath, usize>,
}
|
|
||
| impl<'a> RowGroupFilter<'a> { | ||
| /// Creates a new row group filter for the given row group and predicate. | ||
| fn new(row_group: &'a RowGroupMetaData, predicate: &Expression) -> Self { | ||
| Self { | ||
| row_group, | ||
| field_indices: compute_field_indices(row_group.schema_descr().columns(), predicate), | ||
| } | ||
| } | ||
|
|
||
| /// Applies a filtering predicate to a row group. Return value false means to skip it. | ||
| fn apply(row_group: &'a RowGroupMetaData, predicate: &Expression) -> bool { | ||
| let result = RowGroupFilter::new(row_group, predicate).apply_sql_where(predicate); | ||
| !matches!(result, Some(false)) | ||
scovich marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| fn get_stats(&self, col: &ColumnPath) -> Option<&Statistics> { | ||
| let field_index = self.field_indices.get(col)?; | ||
| self.row_group.column(*field_index).statistics() | ||
| } | ||
|
|
||
| fn decimal_from_bytes(bytes: Option<&[u8]>, precision: u8, scale: u8) -> Option<Scalar> { | ||
| // WARNING: The bytes are stored in big-endian order; reverse and then 0-pad to 16 bytes. | ||
| let bytes = bytes.filter(|b| b.len() <= 16)?; | ||
| let mut bytes = Vec::from(bytes); | ||
| bytes.reverse(); | ||
| bytes.resize(16, 0u8); | ||
| let bytes: [u8; 16] = bytes.try_into().ok()?; | ||
| Some(Scalar::Decimal( | ||
| i128::from_le_bytes(bytes), | ||
| precision, | ||
| scale, | ||
| )) | ||
| } | ||
|
|
||
| fn timestamp_from_date(days: Option<&i32>) -> Option<Scalar> { | ||
| let days = u64::try_from(*days?).ok()?; | ||
| let timestamp = DateTime::UNIX_EPOCH.checked_add_days(Days::new(days))?; | ||
| let timestamp = timestamp.signed_duration_since(DateTime::UNIX_EPOCH); | ||
| Some(Scalar::TimestampNtz(timestamp.num_microseconds()?)) | ||
| } | ||
| } | ||
|
|
||
impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> {
    // Extracts a stat value, converting from its physical type to the requested logical type.
    //
    // NOTE: This code is highly redundant with [`get_max_stat_value`] below, but parquet
    // ValueStatistics<T> requires T to impl a private trait, so we can't factor out any kind of
    // helper method. And macros are hard enough to read that it's not worth defining one.
    fn get_min_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
        use PrimitiveType::*;
        // Match the requested logical type against the physical stats type. Any unsupported
        // pairing falls through to `return None`, which callers treat as "stat unavailable"
        // (so no skipping happens on this column).
        let value = match (data_type.as_primitive_opt()?, self.get_stats(col)?) {
            (String, Statistics::ByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
            (String, Statistics::FixedLenByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
            (String, _) => return None,
            // Integer types: widen or narrow from the physical i32/i64 representation.
            (Long, Statistics::Int64(s)) => s.min_opt()?.into(),
            (Long, Statistics::Int32(s)) => (*s.min_opt()? as i64).into(),
            (Long, _) => return None,
            (Integer, Statistics::Int32(s)) => s.min_opt()?.into(),
            (Integer, _) => return None,
            (Short, Statistics::Int32(s)) => (*s.min_opt()? as i16).into(),
            (Short, _) => return None,
            (Byte, Statistics::Int32(s)) => (*s.min_opt()? as i8).into(),
            (Byte, _) => return None,
            (Float, Statistics::Float(s)) => s.min_opt()?.into(),
            (Float, _) => return None,
            (Double, Statistics::Double(s)) => s.min_opt()?.into(),
            (Double, Statistics::Float(s)) => (*s.min_opt()? as f64).into(),
            (Double, _) => return None,
            (Boolean, Statistics::Boolean(s)) => s.min_opt()?.into(),
            (Boolean, _) => return None,
            (Binary, Statistics::ByteArray(s)) => s.min_opt()?.data().into(),
            (Binary, Statistics::FixedLenByteArray(s)) => s.min_opt()?.data().into(),
            (Binary, _) => return None,
            // Parquet dates are physically i32 days since the unix epoch.
            (Date, Statistics::Int32(s)) => Scalar::Date(*s.min_opt()?),
            (Date, _) => return None,
            (Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.min_opt()?),
            (Timestamp, _) => return None, // TODO: Int96 timestamps
            (TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.min_opt()?),
            // A date column can also satisfy a TimestampNtz request (midnight of that day).
            (TimestampNtz, Statistics::Int32(s)) => Self::timestamp_from_date(s.min_opt())?,
            (TimestampNtz, _) => return None, // TODO: Int96 timestamps
            // Decimals may be physically stored as i32, i64, or fixed-len binary.
            (Decimal(p, s), Statistics::Int32(i)) => Scalar::Decimal(*i.min_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::Int64(i)) => Scalar::Decimal(*i.min_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::FixedLenByteArray(b)) => {
                Self::decimal_from_bytes(b.min_bytes_opt(), *p, *s)?
            }
            (Decimal(..), _) => return None,
        };
        Some(value)
    }

    // Mirror image of [`get_min_stat_value`] above, reading max stats instead of min stats.
    // See the NOTE there for why the two cannot share a helper.
    fn get_max_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
        use PrimitiveType::*;
        let value = match (data_type.as_primitive_opt()?, self.get_stats(col)?) {
            (String, Statistics::ByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
            (String, Statistics::FixedLenByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
            (String, _) => return None,
            (Long, Statistics::Int64(s)) => s.max_opt()?.into(),
            (Long, Statistics::Int32(s)) => (*s.max_opt()? as i64).into(),
            (Long, _) => return None,
            (Integer, Statistics::Int32(s)) => s.max_opt()?.into(),
            (Integer, _) => return None,
            (Short, Statistics::Int32(s)) => (*s.max_opt()? as i16).into(),
            (Short, _) => return None,
            (Byte, Statistics::Int32(s)) => (*s.max_opt()? as i8).into(),
            (Byte, _) => return None,
            (Float, Statistics::Float(s)) => s.max_opt()?.into(),
            (Float, _) => return None,
            (Double, Statistics::Double(s)) => s.max_opt()?.into(),
            (Double, Statistics::Float(s)) => (*s.max_opt()? as f64).into(),
            (Double, _) => return None,
            (Boolean, Statistics::Boolean(s)) => s.max_opt()?.into(),
            (Boolean, _) => return None,
            (Binary, Statistics::ByteArray(s)) => s.max_opt()?.data().into(),
            (Binary, Statistics::FixedLenByteArray(s)) => s.max_opt()?.data().into(),
            (Binary, _) => return None,
            (Date, Statistics::Int32(s)) => Scalar::Date(*s.max_opt()?),
            (Date, _) => return None,
            (Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.max_opt()?),
            (Timestamp, _) => return None, // TODO: Int96 timestamps
            (TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.max_opt()?),
            (TimestampNtz, Statistics::Int32(s)) => Self::timestamp_from_date(s.max_opt())?,
            (TimestampNtz, _) => return None, // TODO: Int96 timestamps
            (Decimal(p, s), Statistics::Int32(i)) => Scalar::Decimal(*i.max_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::Int64(i)) => Scalar::Decimal(*i.max_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::FixedLenByteArray(b)) => {
                Self::decimal_from_bytes(b.max_bytes_opt(), *p, *s)?
            }
            (Decimal(..), _) => return None,
        };
        Some(value)
    }

    // Parquet nullcount stats always have the same type (u64), so we can directly return the value
    // instead of wrapping it in a Scalar. We can safely cast it from u64 to i64, because the
    // nullcount can never be larger than the rowcount, and the parquet rowcount stat is i64.
    //
    // NOTE: Stats for any given column are optional, which may produce a NULL nullcount. But if
    // the column itself is missing, then we know all values are implied to be NULL.
    fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> {
        let nullcount = match self.get_stats(col) {
            Some(s) => s.null_count_opt()? as i64,
            // Column absent from the row group: every row implicitly has a NULL value.
            None => self.get_rowcount_stat_value(),
        };
        Some(nullcount)
    }

    // The row group's total row count, always available from the parquet footer.
    fn get_rowcount_stat_value(&self) -> i64 {
        self.row_group.num_rows()
    }
}
|
|
||
| /// Given a filter expression of interest and a set of parquet column descriptors, build a column -> | ||
| /// index mapping for columns the expression references. This ensures O(1) lookup times, for an | ||
| /// overall O(n) cost to evaluate an expression tree with n nodes. | ||
| pub(crate) fn compute_field_indices( | ||
| fields: &[ColumnDescPtr], | ||
| expression: &Expression, | ||
| ) -> HashMap<ColumnPath, usize> { | ||
| fn do_recurse(expression: &Expression, cols: &mut HashSet<ColumnPath>) { | ||
| use Expression::*; | ||
| let mut recurse = |expr| do_recurse(expr, cols); // simplifies the call sites below | ||
| match expression { | ||
| Literal(_) => {} | ||
| Column(name) => cols.extend([col_name_to_path(name)]), // returns `()`, unlike `insert` | ||
| Struct(fields) => fields.iter().for_each(recurse), | ||
| UnaryOperation { expr, .. } => recurse(expr), | ||
| BinaryOperation { left, right, .. } => [left, right].iter().for_each(|e| recurse(e)), | ||
| VariadicOperation { exprs, .. } => exprs.iter().for_each(recurse), | ||
| } | ||
| } | ||
|
|
||
| // Build up a set of requested column paths, then take each found path as the corresponding map | ||
| // key (avoids unnecessary cloning). | ||
| // | ||
| // NOTE: If a requested column was not available, it is silently ignored. | ||
zachschuermann marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let mut requested_columns = HashSet::new(); | ||
| do_recurse(expression, &mut requested_columns); | ||
| fields | ||
| .iter() | ||
| .enumerate() | ||
| .filter_map(|(i, f)| requested_columns.take(f.path()).map(|path| (path, i))) | ||
| .collect() | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.