Implement row group skipping for the default engine parquet readers #362
Merged

Commits (33):
- 715f233 WIP - first pass at the code (ryan-johnson-databricks)
- ef71f1a split out a trait, add more type support (ryan-johnson-databricks)
- 39b8927 support short circuit junction eval (ryan-johnson-databricks)
- b5c3a52 Merge remote-tracking branch 'oss/main' into row-group-skipping (scovich)
- e71571e add tests, fix bugs (scovich)
- cbca3b3 support SQL WHERE semantics, finished adding tests for skipping logic (scovich)
- e7d87eb Mark block text as not rust code doctest should run (scovich)
- beeb6e8 add missing tests identified by codecov (scovich)
- 519acbd Wire up row group skipping (scovich)
- 18b33cf delete for split - parquet reader uses row group skipping (scovich)
- 6c98441 parquet reader now uses row group skipping (scovich)
- 0fdaf0a add stats-getter test; review comments (scovich)
- 8ac33f8 Merge remote-tracking branch 'oss/main' into use-row-group-skipping (scovich)
- 1cf03dc improve test coverage; clippy (scovich)
- bc8b344 yet more test coverage (scovich)
- 0971002 improve test coverage even more (scovich)
- 375a380 Add a query level test as well (scovich)
- 6236874 Fix broken sync json parsing and harmonize file reading (scovich)
- 9efcbf7 fmt (scovich)
- 46d19e3 remove spurious TODO (scovich)
- 7666512 Revert "Fix broken sync json parsing and harmonize file reading" (scovich)
- f3865d0 Merge remote-tracking branch 'oss/main' into use-row-group-skipping (scovich)
- a4dc3da review comments (scovich)
- 40131db Merge remote-tracking branch 'oss/main' into use-row-group-skipping (scovich)
- bf65904 Infer null count stat for missing columns; add more tests (scovich)
- cce762d One last test (scovich)
- c7d6bb0 test cleanup (scovich)
- 4f92ed7 code comment tweak (scovich)
- 08a305b remove unneeded test (scovich)
- e8a947e Merge remote-tracking branch 'oss' into use-row-group-skipping (scovich)
- bf1e3a8 fix two nullcount stat bugs (scovich)
- 9d632e7 Merge remote-tracking branch 'oss/main' into use-row-group-skipping (scovich)
- 4a77f3a review nits (scovich)
New file (244 lines):
//! An implementation of parquet row group skipping using data skipping predicates over footer stats.
use crate::engine::parquet_stats_skipping::{col_name_to_path, ParquetStatsSkippingFilter};
use crate::expressions::{Expression, Scalar};
use crate::schema::{DataType, PrimitiveType};
use chrono::{DateTime, Days};
use parquet::arrow::arrow_reader::ArrowReaderBuilder;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;
use parquet::schema::types::{ColumnDescPtr, ColumnPath};
use std::collections::{HashMap, HashSet};
use tracing::debug;

#[cfg(test)]
mod tests;

/// An extension trait for [`ArrowReaderBuilder`] that injects row group skipping capability.
pub(crate) trait ParquetRowGroupSkipping {
    /// Instructs the parquet reader to perform row group skipping, eliminating any row group whose
    /// stats prove that none of the group's rows can satisfy the given `predicate`.
    fn with_row_group_filter(self, predicate: &Expression) -> Self;
}
impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> {
    fn with_row_group_filter(self, predicate: &Expression) -> Self {
        let indices = self
            .metadata()
            .row_groups()
            .iter()
            .enumerate()
            .filter_map(|(index, row_group)| {
                // If the group survives the filter, return Some(index) so filter_map keeps it.
                RowGroupFilter::apply(row_group, predicate).then_some(index)
            })
            .collect();
        debug!("with_row_group_filter({predicate:#?}) = {indices:?}");
        self.with_row_groups(indices)
    }
}
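
// --- Illustrative sketch, not part of the original diff ---
// How a caller might wire this up. `open_filtered` is a hypothetical helper;
// `ParquetRecordBatchReaderBuilder` is the parquet crate's synchronous reader
// builder, which is an `ArrowReaderBuilder` and so picks up the trait above.
#[allow(dead_code)]
fn open_filtered(
    file: std::fs::File,
    predicate: &Expression,
) -> Result<(), parquet::errors::ParquetError> {
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_row_group_filter(predicate) // prune row groups the stats rule out
        .build()?;
    for batch in reader {
        let _ = batch; // batches come only from the surviving row groups
    }
    Ok(())
}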

/// A ParquetStatsSkippingFilter for row group skipping. It obtains stats from a parquet
/// [`RowGroupMetaData`] and pre-computes the mapping of each referenced column path to its
/// corresponding field index, for O(1) stats lookups.
struct RowGroupFilter<'a> {
    row_group: &'a RowGroupMetaData,
    field_indices: HashMap<ColumnPath, usize>,
}

impl<'a> RowGroupFilter<'a> {
    /// Creates a new row group filter for the given row group and predicate.
    fn new(row_group: &'a RowGroupMetaData, predicate: &Expression) -> Self {
        Self {
            row_group,
            field_indices: compute_field_indices(row_group.schema_descr().columns(), predicate),
        }
    }

    /// Applies a filtering predicate to a row group. A return value of `false` means the group
    /// should be skipped.
    fn apply(row_group: &'a RowGroupMetaData, predicate: &Expression) -> bool {
        RowGroupFilter::new(row_group, predicate).apply_sql_where(predicate) != Some(false)
    }
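
    // Illustration (not in the diff): `apply_sql_where` is tri-state, so only a
    // provable `false` can skip a row group:
    //   Some(true)  => keep -- some rows may satisfy the predicate
    //   None        => keep -- stats are missing or insufficient to decide
    //   Some(false) => skip -- no row in the group can possibly satisfy it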

    /// Returns `None` if the column doesn't exist and `Some(None)` if the column has no stats.
    fn get_stats(&self, col: &ColumnPath) -> Option<Option<&Statistics>> {
        self.field_indices
            .get(col)
            .map(|&i| self.row_group.column(i).statistics())
    }

    fn decimal_from_bytes(bytes: Option<&[u8]>, precision: u8, scale: u8) -> Option<Scalar> {
        // WARNING: The bytes are stored in big-endian order; reverse them and then sign-extend to
        // 16 bytes, so that negative values survive the trip through `i128::from_le_bytes`.
        let bytes = bytes.filter(|b| b.len() <= 16)?;
        let mut bytes = Vec::from(bytes);
        bytes.reverse();
        // After the reversal the sign byte is last; pad with 0xFF when the value is negative.
        let pad = if bytes.last().is_some_and(|&b| b & 0x80 != 0) { 0xff } else { 0u8 };
        bytes.resize(16, pad);
        let bytes: [u8; 16] = bytes.try_into().ok()?;
        Some(Scalar::Decimal(
            i128::from_le_bytes(bytes),
            precision,
            scale,
        ))
    }

    fn timestamp_from_date(days: Option<&i32>) -> Option<Scalar> {
        let days = u64::try_from(*days?).ok()?;
        let timestamp = DateTime::UNIX_EPOCH.checked_add_days(Days::new(days))?;
        let timestamp = timestamp.signed_duration_since(DateTime::UNIX_EPOCH);
        Some(Scalar::TimestampNtz(timestamp.num_microseconds()?))
    }
}
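
// --- Illustrative sketch, not part of the original diff ---
// Hypothetical unit tests for the two conversion helpers above, assuming
// `Scalar` implements `PartialEq` and `Debug` (required by `assert_eq!`):
#[cfg(test)]
mod conversion_examples {
    use super::*;

    #[test]
    fn decimal_from_bytes_examples() {
        // Big-endian [0x01, 0xE2] is 0x01E2 == 482; with precision 10, scale 2 => 4.82
        assert_eq!(
            RowGroupFilter::decimal_from_bytes(Some(&[0x01, 0xE2]), 10, 2),
            Some(Scalar::Decimal(482, 10, 2))
        );
        // Big-endian [0xFF] is -1 in two's complement; the sign extension preserves it
        assert_eq!(
            RowGroupFilter::decimal_from_bytes(Some(&[0xFF]), 10, 2),
            Some(Scalar::Decimal(-1, 10, 2))
        );
    }

    #[test]
    fn timestamp_from_date_example() {
        // Two days after the epoch, expressed in microseconds
        assert_eq!(
            RowGroupFilter::timestamp_from_date(Some(&2)),
            Some(Scalar::TimestampNtz(2 * 86_400 * 1_000_000))
        );
    }
}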

impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> {
    // Extracts a stat value, converting from its physical type to the requested logical type.
    //
    // NOTE: This code is highly redundant with [`get_max_stat_value`] below, but parquet
    // ValueStatistics<T> requires T to impl a private trait, so we can't factor out any kind of
    // helper method. And macros are hard enough to read that it's not worth defining one.
    fn get_min_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
        use PrimitiveType::*;
        let value = match (data_type.as_primitive_opt()?, self.get_stats(col)??) {
            (String, Statistics::ByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
            (String, Statistics::FixedLenByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(),
            (String, _) => return None,
            (Long, Statistics::Int64(s)) => s.min_opt()?.into(),
            (Long, Statistics::Int32(s)) => (*s.min_opt()? as i64).into(),
            (Long, _) => return None,
            (Integer, Statistics::Int32(s)) => s.min_opt()?.into(),
            (Integer, _) => return None,
            (Short, Statistics::Int32(s)) => (*s.min_opt()? as i16).into(),
            (Short, _) => return None,
            (Byte, Statistics::Int32(s)) => (*s.min_opt()? as i8).into(),
            (Byte, _) => return None,
            (Float, Statistics::Float(s)) => s.min_opt()?.into(),
            (Float, _) => return None,
            (Double, Statistics::Double(s)) => s.min_opt()?.into(),
            (Double, Statistics::Float(s)) => (*s.min_opt()? as f64).into(),
            (Double, _) => return None,
            (Boolean, Statistics::Boolean(s)) => s.min_opt()?.into(),
            (Boolean, _) => return None,
            (Binary, Statistics::ByteArray(s)) => s.min_opt()?.data().into(),
            (Binary, Statistics::FixedLenByteArray(s)) => s.min_opt()?.data().into(),
            (Binary, _) => return None,
            (Date, Statistics::Int32(s)) => Scalar::Date(*s.min_opt()?),
            (Date, _) => return None,
            (Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.min_opt()?),
            (Timestamp, _) => return None, // TODO: Int96 timestamps
            (TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.min_opt()?),
            (TimestampNtz, Statistics::Int32(s)) => Self::timestamp_from_date(s.min_opt())?,
            (TimestampNtz, _) => return None, // TODO: Int96 timestamps
            (Decimal(p, s), Statistics::Int32(i)) => Scalar::Decimal(*i.min_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::Int64(i)) => Scalar::Decimal(*i.min_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::FixedLenByteArray(b)) => {
                Self::decimal_from_bytes(b.min_bytes_opt(), *p, *s)?
            }
            (Decimal(..), _) => return None,
        };
        Some(value)
    }

    fn get_max_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> {
        use PrimitiveType::*;
        let value = match (data_type.as_primitive_opt()?, self.get_stats(col)??) {
            (String, Statistics::ByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
            (String, Statistics::FixedLenByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(),
            (String, _) => return None,
            (Long, Statistics::Int64(s)) => s.max_opt()?.into(),
            (Long, Statistics::Int32(s)) => (*s.max_opt()? as i64).into(),
            (Long, _) => return None,
            (Integer, Statistics::Int32(s)) => s.max_opt()?.into(),
            (Integer, _) => return None,
            (Short, Statistics::Int32(s)) => (*s.max_opt()? as i16).into(),
            (Short, _) => return None,
            (Byte, Statistics::Int32(s)) => (*s.max_opt()? as i8).into(),
            (Byte, _) => return None,
            (Float, Statistics::Float(s)) => s.max_opt()?.into(),
            (Float, _) => return None,
            (Double, Statistics::Double(s)) => s.max_opt()?.into(),
            (Double, Statistics::Float(s)) => (*s.max_opt()? as f64).into(),
            (Double, _) => return None,
            (Boolean, Statistics::Boolean(s)) => s.max_opt()?.into(),
            (Boolean, _) => return None,
            (Binary, Statistics::ByteArray(s)) => s.max_opt()?.data().into(),
            (Binary, Statistics::FixedLenByteArray(s)) => s.max_opt()?.data().into(),
            (Binary, _) => return None,
            (Date, Statistics::Int32(s)) => Scalar::Date(*s.max_opt()?),
            (Date, _) => return None,
            (Timestamp, Statistics::Int64(s)) => Scalar::Timestamp(*s.max_opt()?),
            (Timestamp, _) => return None, // TODO: Int96 timestamps
            (TimestampNtz, Statistics::Int64(s)) => Scalar::TimestampNtz(*s.max_opt()?),
            (TimestampNtz, Statistics::Int32(s)) => Self::timestamp_from_date(s.max_opt())?,
            (TimestampNtz, _) => return None, // TODO: Int96 timestamps
            (Decimal(p, s), Statistics::Int32(i)) => Scalar::Decimal(*i.max_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::Int64(i)) => Scalar::Decimal(*i.max_opt()? as i128, *p, *s),
            (Decimal(p, s), Statistics::FixedLenByteArray(b)) => {
                Self::decimal_from_bytes(b.max_bytes_opt(), *p, *s)?
            }
            (Decimal(..), _) => return None,
        };
        Some(value)
    }

    fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> {
        // NOTE: Stats for any given column are optional, which may produce a NULL nullcount. But
        // if the column itself is missing, then we know all of its values are implied to be NULL.
        let Some(stats) = self.get_stats(col) else {
            return Some(self.get_rowcount_stat_value());
        };

        // WARNING: [`Statistics::null_count_opt`] returns Some(0) when the underlying stat is
        // missing, causing an IS NULL predicate to wrongly skip the file if it contains any NULL
        // values. Manually drill into each arm's [`ValueStatistics`] for the stat's true value.
        let nullcount = match stats? {
            Statistics::Boolean(s) => s.null_count_opt(),
            Statistics::Int32(s) => s.null_count_opt(),
            Statistics::Int64(s) => s.null_count_opt(),
            Statistics::Int96(s) => s.null_count_opt(),
            Statistics::Float(s) => s.null_count_opt(),
            Statistics::Double(s) => s.null_count_opt(),
            Statistics::ByteArray(s) => s.null_count_opt(),
            Statistics::FixedLenByteArray(s) => s.null_count_opt(),
        };

        // Parquet nullcount stats are always u64, so we can directly return the value instead of
        // wrapping it in a Scalar. We can safely cast it from u64 to i64 because the nullcount can
        // never be larger than the rowcount and the parquet rowcount stat is i64.
        Some(nullcount? as i64)
    }

    fn get_rowcount_stat_value(&self) -> i64 {
        self.row_group.num_rows()
    }
}
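
// Illustration (not in the diff): the missing-column inference above is what makes
// predicates like `WHERE missing_col IS NULL` behave correctly. A column absent from
// the file is implicitly all-NULL, so nullcount == rowcount and the row group is kept;
// conversely, `missing_col IS NOT NULL` can prove that no rows match and skip the group.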

/// Given a filter expression of interest and a set of parquet column descriptors, build a column ->
/// index mapping for columns the expression references. This ensures O(1) lookup times, for an
/// overall O(n) cost to evaluate an expression tree with n nodes.
pub(crate) fn compute_field_indices(
    fields: &[ColumnDescPtr],
    expression: &Expression,
) -> HashMap<ColumnPath, usize> {
    fn do_recurse(expression: &Expression, cols: &mut HashSet<ColumnPath>) {
        use Expression::*;
        let mut recurse = |expr| do_recurse(expr, cols); // simplifies the call sites below
        match expression {
            Literal(_) => {}
            Column(name) => cols.extend([col_name_to_path(name)]), // returns `()`, unlike `insert`
            Struct(fields) => fields.iter().for_each(recurse),
            UnaryOperation { expr, .. } => recurse(expr),
            BinaryOperation { left, right, .. } => [left, right].iter().for_each(|e| recurse(e)),
            VariadicOperation { exprs, .. } => exprs.iter().for_each(recurse),
        }
    }

    // Build up a set of requested column paths, then take each found path as the corresponding map
    // key (avoids unnecessary cloning).
    //
    // NOTE: If a requested column was not available, it is silently ignored. These missing columns
    // are implied all-null, so we will infer their min/max stats as NULL and nullcount == rowcount.
    let mut requested_columns = HashSet::new();
    do_recurse(expression, &mut requested_columns);
    fields
        .iter()
        .enumerate()
        .filter_map(|(i, f)| requested_columns.take(f.path()).map(|path| (path, i)))
        .collect()
}
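
To make the mapping concrete, here is a rough usage sketch; the column names, the in-scope `schema_descr`, and the `column`/`lt`/`literal` expression constructors are assumptions, not taken from this diff:

// Leaf columns in file order: ["a", "b.c", "d"]
let predicate = Expression::column("a").lt(Expression::literal(10i64));
let indices = compute_field_indices(schema_descr.columns(), &predicate);
// => {ColumnPath(["a"]): 0}. Column "d" is unreferenced, and a referenced column
//    missing from the file is silently dropped (treated as all-NULL upstream).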
Review discussion (on the `Option<Option<&Statistics>>` return type of `get_stats`):

Comment: We could define an enum for this rather than `Some(None)`. Just a thought; I'm okay with both ways, and `Some(None)` makes for cleaner code in the method (at the cost of a more confusing return type).
Comment: Oh nice, I think I commented on this too. Maybe just `Result`? We could have `Err(MissingColumn)` for a more understandable return type.
Comment: I avoided `Result` because that would be exceptions as control flow (this isn't actually an error, it's just a situation). If this were public code, the enum might make sense. But for a private method, I don't think it's worth the cognitive overhead (both to define and to use it) when one line of code comment can fully explain what's going on.
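
For reference, the enum alternative under discussion might have looked something like this (hypothetical sketch, not part of the PR):

enum ColumnStats<'a> {
    /// The column doesn't exist in this row group (implied all-NULL).
    Missing,
    /// The column exists but recorded no stats.
    NoStats,
    /// Stats are available.
    Stats(&'a Statistics),
}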
Comment: Yeah, I think it's fine as is since both the method and the call site have a comment.
Comment: sgtm!