Skip to content

Commit

Permalink
Set any missing partition values to NULL during predicate evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
timsaucer committed Jan 11, 2025
1 parent 2cd634b commit 65f00b4
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 20 deletions.
12 changes: 6 additions & 6 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ impl LogReplayScanner {
fn new(
engine: &dyn Engine,
physical_predicate: Option<(ExpressionRef, SchemaRef)>,
have_partition_cols: bool,
partition_columns: &[String],
) -> Self {
let partition_filter = match have_partition_cols {
true => PartitionSkippingFilter::new(engine, physical_predicate.clone()),
false => None,
let partition_filter = match partition_columns.is_empty() {
false => PartitionSkippingFilter::new(engine, physical_predicate.clone(), partition_columns),
true => None,
};
Self {
partition_filter,
Expand Down Expand Up @@ -265,9 +265,9 @@ pub fn scan_action_iter(
engine: &dyn Engine,
action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
physical_predicate: Option<(ExpressionRef, SchemaRef)>,
have_partition_cols: bool,
partition_columns: &[String],
) -> impl Iterator<Item = DeltaResult<ScanData>> {
let mut log_scanner = LogReplayScanner::new(engine, physical_predicate, have_partition_cols);
let mut log_scanner = LogReplayScanner::new(engine, physical_predicate, partition_columns);
let add_transform = engine.get_expression_handler().get_evaluator(
get_log_add_schema().clone(),
get_add_transform_expr(),
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl Scan {
engine,
self.replay_for_scan_data(engine)?,
physical_predicate,
self.have_partition_cols,
&self.snapshot.metadata().partition_columns,
);
Ok(Some(it).into_iter().flatten())
}
Expand Down Expand Up @@ -766,7 +766,7 @@ pub(crate) mod test_utils {
&SyncEngine::new(),
batch.into_iter().map(|batch| Ok((batch as _, true))),
None,
false,
&[],
);
let mut batch_count = 0;
for res in iter {
Expand Down
27 changes: 15 additions & 12 deletions kernel/src/scan/partition_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use tracing::debug;

use crate::expressions::column_expr;
use crate::{expressions::column_expr, schema::StructType};
use crate::schema::column_name;
use crate::{
engine_data::GetData,
Expand All @@ -26,13 +26,20 @@ impl PartitionSkippingFilter {
pub(crate) fn new(
engine: &dyn Engine,
physical_predicate: Option<(ExpressionRef, SchemaRef)>,
partition_columns: &[String],
) -> Option<Self> {
static PARITIONS_EXPR: LazyLock<Expression> =
LazyLock::new(|| column_expr!("add.partitionValues"));

let (predicate, schema) = physical_predicate?;
debug!("Creating a partition skipping filter for {:#?}", predicate);

// Limit the schema passed to the row visitor of only the fields that are included
// in the predicate and are also partition columns. The data skipping columns will
// be handled elsewhere.
let partition_fields = schema.fields().filter(|f| partition_columns.contains(f.name())).cloned();
let schema = Arc::new(StructType::new(partition_fields));

let partitions_map_type = MapType::new(DataType::STRING, DataType::STRING, true);

let evaluator = engine.get_expression_handler().get_evaluator(
Expand Down Expand Up @@ -96,23 +103,19 @@ impl RowVisitor for PartitionVisitor {
for i in 0..row_count {
let val = getter.get_map(i, "output")?.map(|m| {
let partition_values = m.materialize();
let resolver = partition_values
.iter()
.filter(|(k, _v)| self.schema.field(k).is_some())
.map(|(k, v)| {
// The schema we are evaluating only contains the fields of interest,
// not all partition fields. The unwrap is safe because we have already
// checked in the filter above.
let data_type = self.schema.field(k).unwrap().data_type();
let resolver = self.schema.fields()
.map(|field| {
let data_type = field.data_type();

let DataType::Primitive(primitive_type) = data_type else {
return Err(crate::Error::unsupported(
format!("Partition filtering only supported for primitive types. Found type: {}", data_type)
format!("Partition filtering only supported for primitive types. Found type {} for field {}", data_type, field.name())
));
};

let scalar = primitive_type.parse_scalar(v)?;
Ok((ColumnName::new([k]), scalar))
let scalar = partition_values.get(field.name()).map(|v| primitive_type.parse_scalar(v)).transpose()?.unwrap_or(Scalar::Null(data_type.clone()));

Ok((ColumnName::new([field.name()]), scalar))
})
.collect::<DeltaResult<HashMap<ColumnName, Scalar>>>()?;

Expand Down

0 comments on commit 65f00b4

Please sign in to comment.