Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kernel/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub mod arrow_expression;
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod arrow_data;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod parquet_stats_skipping;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub(crate) mod arrow_get_data;

Expand Down
1,267 changes: 1,267 additions & 0 deletions kernel/src/engine/parquet_stats_skipping.rs

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions kernel/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use std::fmt::{Display, Formatter};
use itertools::Itertools;

pub use self::scalars::{ArrayData, Scalar, StructData};
use crate::DataType;

mod scalars;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// A binary operator.
pub enum BinaryOperator {
/// Arithmetic Plus
Expand Down Expand Up @@ -49,7 +50,7 @@ impl BinaryOperator {
GreaterThanOrEqual => Some(LessThanOrEqual),
LessThan => Some(GreaterThan),
LessThanOrEqual => Some(GreaterThanOrEqual),
Equal | NotEqual | Plus | Multiply => Some(self.clone()),
Equal | NotEqual | Plus | Multiply => Some(*self),
_ => None,
}
}
Expand All @@ -72,7 +73,7 @@ impl BinaryOperator {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum VariadicOperator {
And,
Or,
Expand Down Expand Up @@ -111,7 +112,7 @@ impl Display for BinaryOperator {
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
/// A unary operator.
pub enum UnaryOperator {
/// Unary Not
Expand Down Expand Up @@ -228,6 +229,10 @@ impl Expression {
Self::Literal(value.into())
}

pub fn null_literal(data_type: DataType) -> Self {
Self::Literal(Scalar::Null(data_type))
}
Comment on lines +232 to +234
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah found it - I can use this in the write PR :)


/// Create a new struct expression
pub fn struct_expr(exprs: impl IntoIterator<Item = Self>) -> Self {
Self::Struct(exprs.into_iter().collect())
Expand Down
50 changes: 50 additions & 0 deletions kernel/src/expressions/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,44 @@ impl Display for Scalar {
}
}

impl PartialOrd for Scalar {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
use Scalar::*;
match (self, other) {
// NOTE: We intentionally do two match arms for each variant to avoid a catch-all, so
// that new variants trigger compilation failures instead of being silently ignored.
(Integer(a), Integer(b)) => a.partial_cmp(b),
(Integer(_), _) => None,
(Long(a), Long(b)) => a.partial_cmp(b),
(Long(_), _) => None,
(Short(a), Short(b)) => a.partial_cmp(b),
(Short(_), _) => None,
(Byte(a), Byte(b)) => a.partial_cmp(b),
(Byte(_), _) => None,
(Float(a), Float(b)) => a.partial_cmp(b),
(Float(_), _) => None,
(Double(a), Double(b)) => a.partial_cmp(b),
(Double(_), _) => None,
(String(a), String(b)) => a.partial_cmp(b),
(String(_), _) => None,
(Boolean(a), Boolean(b)) => a.partial_cmp(b),
(Boolean(_), _) => None,
(Timestamp(a), Timestamp(b)) => a.partial_cmp(b),
(Timestamp(_), _) => None,
(TimestampNtz(a), TimestampNtz(b)) => a.partial_cmp(b),
(TimestampNtz(_), _) => None,
(Date(a), Date(b)) => a.partial_cmp(b),
(Date(_), _) => None,
(Binary(a), Binary(b)) => a.partial_cmp(b),
(Binary(_), _) => None,
(Decimal(_, _, _), _) => None, // TODO: Support Decimal
(Null(_), _) => None, // NOTE: NULL values are incomparable by definition
(Struct(_), _) => None, // TODO: Support Struct?
(Array(_), _) => None, // TODO: Support Array?
}
}
}

impl From<i8> for Scalar {
fn from(i: i8) -> Self {
Self::Byte(i)
Expand Down Expand Up @@ -268,6 +306,18 @@ impl From<String> for Scalar {
}
}

impl<T: Into<Scalar> + Copy> From<&T> for Scalar {
fn from(t: &T) -> Self {
(*t).into()
}
}

impl From<&[u8]> for Scalar {
fn from(b: &[u8]) -> Self {
Self::Binary(b.into())
}
}

// TODO: add more From impls

impl PrimitiveType {
Expand Down
4 changes: 4 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ pub trait JsonHandler: Send + Sync {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
// TODO: This should really be an Option<Arc<Expression>>, because otherwise we have to
// clone the (potentially large) expression every time we call this function.
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator>;
}
Expand All @@ -215,6 +217,8 @@ pub trait ParquetHandler: Send + Sync {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
// TODO: This should really be an Option<Arc<Expression>>, because otherwise we have to
// clone the (potentially large) expression every time we call this function.
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator>;
}
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn as_data_skipping_predicate(expr: &Expr) -> Option<Expr> {
match expr {
BinaryOperation { op, left, right } => {
let (op, col, val) = match (left.as_ref(), right.as_ref()) {
(Column(col), Literal(val)) => (op.clone(), col, val),
(Column(col), Literal(val)) => (*op, col, val),
(Literal(val), Column(col)) => (op.commute()?, col, val),
_ => return None, // unsupported combination of operands
};
Expand Down
12 changes: 5 additions & 7 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,11 @@ pub fn scan_row_schema() -> Schema {
}

fn parse_partition_value(raw: Option<&String>, data_type: &DataType) -> DeltaResult<Scalar> {
match raw {
Some(v) => match data_type {
DataType::Primitive(primitive) => primitive.parse_scalar(v),
_ => Err(Error::generic(format!(
"Unexpected partition column type: {data_type:?}"
))),
},
match (raw, data_type.as_primitive_opt()) {
(Some(v), Some(primitive)) => primitive.parse_scalar(v),
(Some(_), None) => Err(Error::generic(format!(
"Unexpected partition column type: {data_type:?}"
))),
_ => Ok(Scalar::Null(data_type.clone())),
}
}
Expand Down
7 changes: 7 additions & 0 deletions kernel/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,13 @@ impl DataType {
pub fn array_type(elements: ArrayType) -> Self {
DataType::Array(Box::new(elements))
}

pub fn as_primitive_opt(&self) -> Option<&PrimitiveType> {
match self {
DataType::Primitive(ptype) => Some(ptype),
_ => None,
}
}
}

impl Display for DataType {
Expand Down