-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ColumnName tracks a path of field names instead of a simple string #445
Changes from 9 commits
baa75ca
2d9c0b8
cf2cbf2
e00fb28
6741b3a
ba1ad77
f3514df
521035f
ff4c6f8
1717de7
9c4c2d3
9a22f5e
b9c543e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,8 +41,8 @@ pub struct EnginePredicate { | |
extern "C" fn(predicate: *mut c_void, state: &mut KernelExpressionVisitorState) -> usize, | ||
} | ||
|
||
fn wrap_expression(state: &mut KernelExpressionVisitorState, expr: Expression) -> usize { | ||
state.inflight_expressions.insert(expr) | ||
fn wrap_expression(state: &mut KernelExpressionVisitorState, expr: impl Into<Expression>) -> usize { | ||
state.inflight_expressions.insert(expr.into()) | ||
} | ||
|
||
pub fn unwrap_kernel_expression( | ||
|
@@ -149,7 +149,7 @@ fn visit_expression_column_impl( | |
name: DeltaResult<&str>, | ||
) -> DeltaResult<usize> { | ||
// TODO: FIXME: This is incorrect if any field name in the column path contains a period. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Tracked as #423 |
||
let name = ColumnName::new(name?.split('.')).into(); | ||
let name = ColumnName::from_naive_str_split(name?); | ||
Ok(wrap_expression(state, name)) | ||
} | ||
|
||
|
@@ -184,7 +184,7 @@ fn visit_expression_literal_string_impl( | |
state: &mut KernelExpressionVisitorState, | ||
value: DeltaResult<String>, | ||
) -> DeltaResult<usize> { | ||
Ok(wrap_expression(state, Expression::literal(value?))) | ||
Ok(wrap_expression(state, value?)) | ||
} | ||
|
||
// We need to get parse.expand working to be able to macro everything below, see issue #255 | ||
|
@@ -194,53 +194,53 @@ pub extern "C" fn visit_expression_literal_int( | |
state: &mut KernelExpressionVisitorState, | ||
value: i32, | ||
) -> usize { | ||
wrap_expression(state, Expression::literal(value)) | ||
wrap_expression(state, value) | ||
} | ||
|
||
#[no_mangle] | ||
pub extern "C" fn visit_expression_literal_long( | ||
state: &mut KernelExpressionVisitorState, | ||
value: i64, | ||
) -> usize { | ||
wrap_expression(state, Expression::literal(value)) | ||
wrap_expression(state, value) | ||
} | ||
|
||
#[no_mangle] | ||
pub extern "C" fn visit_expression_literal_short( | ||
state: &mut KernelExpressionVisitorState, | ||
value: i16, | ||
) -> usize { | ||
wrap_expression(state, Expression::literal(value)) | ||
wrap_expression(state, value) | ||
} | ||
|
||
#[no_mangle] | ||
pub extern "C" fn visit_expression_literal_byte( | ||
state: &mut KernelExpressionVisitorState, | ||
value: i8, | ||
) -> usize { | ||
wrap_expression(state, Expression::literal(value)) | ||
wrap_expression(state, value) | ||
} | ||
|
||
#[no_mangle] | ||
pub extern "C" fn visit_expression_literal_float( | ||
state: &mut KernelExpressionVisitorState, | ||
value: f32, | ||
) -> usize { | ||
wrap_expression(state, Expression::literal(value)) | ||
wrap_expression(state, value) | ||
} | ||
|
||
#[no_mangle] | ||
pub extern "C" fn visit_expression_literal_double( | ||
state: &mut KernelExpressionVisitorState, | ||
value: f64, | ||
) -> usize { | ||
wrap_expression(state, Expression::literal(value)) | ||
wrap_expression(state, value) | ||
} | ||
|
||
#[no_mangle] | ||
pub extern "C" fn visit_expression_literal_bool( | ||
state: &mut KernelExpressionVisitorState, | ||
value: bool, | ||
) -> usize { | ||
wrap_expression(state, Expression::literal(value)) | ||
wrap_expression(state, value) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,8 @@ use std::sync::{Arc, LazyLock}; | |
|
||
use crate::actions::visitors::SetTransactionVisitor; | ||
use crate::actions::{get_log_schema, SetTransaction, SET_TRANSACTION_NAME}; | ||
use crate::expressions::column_expr; | ||
use crate::snapshot::Snapshot; | ||
use crate::{DeltaResult, Engine, EngineData, ExpressionRef, SchemaRef}; | ||
use crate::{DeltaResult, Engine, EngineData, Expression as Expr, ExpressionRef, SchemaRef}; | ||
|
||
pub use crate::actions::visitors::SetTransactionMap; | ||
pub struct SetTransactionScanner { | ||
|
@@ -53,8 +52,11 @@ impl SetTransactionScanner { | |
// checkpoint part when partitioned by `add.path` like the Delta spec requires. There's no | ||
// point filtering by a particular app id, even if we have one, because app ids are all in | ||
// a single checkpoint part having a large min/max range (because they're usually uuids). | ||
static META_PREDICATE: LazyLock<Option<ExpressionRef>> = | ||
LazyLock::new(|| Some(Arc::new(column_expr!("txn.appId").is_not_null()))); | ||
static META_PREDICATE: LazyLock<Option<ExpressionRef>> = LazyLock::new(|| { | ||
Some(Arc::new( | ||
Expr::column([SET_TRANSACTION_NAME, "appId"]).is_not_null(), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is a bit bulkier than the original code, but it relies less on "magic constant" code. Do we prefer this new way as somewhat safer? Or prefer the old way as more compact? (several more below) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I prefer safer myself. |
||
)) | ||
}); | ||
self.snapshot | ||
.log_segment | ||
.replay(engine, schema.clone(), schema, META_PREDICATE.clone()) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,12 @@ | ||
//! An implementation of parquet row group skipping using data skipping predicates over footer stats. | ||
use crate::engine::parquet_stats_skipping::{col_name_to_path, ParquetStatsSkippingFilter}; | ||
use crate::expressions::{Expression, Scalar}; | ||
use crate::engine::parquet_stats_skipping::ParquetStatsSkippingFilter; | ||
use crate::expressions::{ColumnName, Expression, Scalar}; | ||
use crate::schema::{DataType, PrimitiveType}; | ||
use chrono::{DateTime, Days}; | ||
use parquet::arrow::arrow_reader::ArrowReaderBuilder; | ||
use parquet::file::metadata::RowGroupMetaData; | ||
use parquet::file::statistics::Statistics; | ||
use parquet::schema::types::{ColumnDescPtr, ColumnPath}; | ||
use parquet::schema::types::ColumnDescPtr; | ||
use std::collections::{HashMap, HashSet}; | ||
use tracing::debug; | ||
|
||
|
@@ -41,7 +41,7 @@ impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> { | |
/// corresponding field index, for O(1) stats lookups. | ||
struct RowGroupFilter<'a> { | ||
row_group: &'a RowGroupMetaData, | ||
field_indices: HashMap<ColumnPath, usize>, | ||
field_indices: HashMap<ColumnName, usize>, | ||
} | ||
|
||
impl<'a> RowGroupFilter<'a> { | ||
|
@@ -59,7 +59,7 @@ impl<'a> RowGroupFilter<'a> { | |
} | ||
|
||
/// Returns `None` if the column doesn't exist and `Some(None)` if the column has no stats. | ||
fn get_stats(&self, col: &ColumnPath) -> Option<Option<&Statistics>> { | ||
fn get_stats(&self, col: &ColumnName) -> Option<Option<&Statistics>> { | ||
self.field_indices | ||
.get(col) | ||
.map(|&i| self.row_group.column(i).statistics()) | ||
|
@@ -93,7 +93,7 @@ impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> { | |
// NOTE: This code is highly redundant with [`get_max_stat_value`] below, but parquet | ||
// ValueStatistics<T> requires T to impl a private trait, so we can't factor out any kind of | ||
// helper method. And macros are hard enough to read that it's not worth defining one. | ||
fn get_min_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> { | ||
fn get_min_stat_value(&self, col: &ColumnName, data_type: &DataType) -> Option<Scalar> { | ||
use PrimitiveType::*; | ||
let value = match (data_type.as_primitive_opt()?, self.get_stats(col)??) { | ||
(String, Statistics::ByteArray(s)) => s.min_opt()?.as_utf8().ok()?.into(), | ||
|
@@ -135,7 +135,7 @@ impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> { | |
Some(value) | ||
} | ||
|
||
fn get_max_stat_value(&self, col: &ColumnPath, data_type: &DataType) -> Option<Scalar> { | ||
fn get_max_stat_value(&self, col: &ColumnName, data_type: &DataType) -> Option<Scalar> { | ||
use PrimitiveType::*; | ||
let value = match (data_type.as_primitive_opt()?, self.get_stats(col)??) { | ||
(String, Statistics::ByteArray(s)) => s.max_opt()?.as_utf8().ok()?.into(), | ||
|
@@ -177,7 +177,7 @@ impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> { | |
Some(value) | ||
} | ||
|
||
fn get_nullcount_stat_value(&self, col: &ColumnPath) -> Option<i64> { | ||
fn get_nullcount_stat_value(&self, col: &ColumnName) -> Option<i64> { | ||
// NOTE: Stats for any given column are optional, which may produce a NULL nullcount. But if | ||
// the column itself is missing, then we know all values are implied to be NULL. | ||
// | ||
|
@@ -221,13 +221,13 @@ impl<'a> ParquetStatsSkippingFilter for RowGroupFilter<'a> { | |
pub(crate) fn compute_field_indices( | ||
fields: &[ColumnDescPtr], | ||
expression: &Expression, | ||
) -> HashMap<ColumnPath, usize> { | ||
fn do_recurse(expression: &Expression, cols: &mut HashSet<ColumnPath>) { | ||
) -> HashMap<ColumnName, usize> { | ||
fn do_recurse(expression: &Expression, cols: &mut HashSet<ColumnName>) { | ||
use Expression::*; | ||
let mut recurse = |expr| do_recurse(expr, cols); // simplifies the call sites below | ||
match expression { | ||
Literal(_) => {} | ||
Column(name) => cols.extend([col_name_to_path(name)]), // returns `()`, unlike `insert` | ||
Column(name) => cols.extend([name.clone()]), // returns `()`, unlike `insert` | ||
Struct(fields) => fields.iter().for_each(recurse), | ||
UnaryOperation { expr, .. } => recurse(expr), | ||
BinaryOperation { left, right, .. } => [left, right].iter().for_each(|e| recurse(e)), | ||
|
@@ -245,6 +245,10 @@ pub(crate) fn compute_field_indices( | |
fields | ||
.iter() | ||
.enumerate() | ||
.filter_map(|(i, f)| requested_columns.take(f.path()).map(|path| (path, i))) | ||
.filter_map(|(i, f)| { | ||
requested_columns | ||
.take(f.path().parts()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sneaky: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice :) |
||
.map(|path| (path, i)) | ||
}) | ||
.collect() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an opportunistic change that makes the visitor logic simpler
(I noticed it while updating the column name visitor below).
Happy to pull it out as a separate (prefactor) PR if it's unwelcome here, just holler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is okay here, makes sense for the simplification