[WIP] feat: handle partition filters via kernel expressions #3099

Draft
wants to merge 10 commits into base: main

3 changes: 2 additions & 1 deletion .gitignore
@@ -10,6 +10,7 @@ tlaplus/*.toolbox/*/MC.cfg
tlaplus/*.toolbox/*/[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*/
/.idea
.vscode
.zed/
.env
.venv
venv
@@ -32,4 +33,4 @@ Cargo.lock

justfile
site
__pycache__
__pycache__
10 changes: 7 additions & 3 deletions Cargo.toml
@@ -26,8 +26,13 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.6.0", features = ["default-engine"] }
#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }
# delta_kernel = { version = "0.6.0", features = ["default-engine"] }
delta_kernel = { path = "../delta-kernel-rs/kernel", features = [
"default-engine",
] }
# delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "fcc43b50dafdc5e6b84c206492bbde8ed1115529", features = [
# "default-engine",
# ] }

# arrow
arrow = { version = "53" }
@@ -75,4 +80,3 @@ async-trait = { version = "0.1" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }

6 changes: 2 additions & 4 deletions crates/core/Cargo.toml
@@ -29,10 +29,7 @@ arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-select = { workspace = true }
parquet = { workspace = true, features = [
"async",
"object_store",
] }
parquet = { workspace = true, features = ["async", "object_store"] }
pin-project-lite = "^0.2.7"

# datafusion
@@ -76,6 +73,7 @@ tokio = { workspace = true, features = [
] }

# other deps (these should be organized and pulled into workspace.dependencies as necessary)
convert_case = "0.6"
cfg-if = "1"
dashmap = "6"
errno = "0.3"
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/expr.rs
@@ -36,6 +36,7 @@ use datafusion_expr::expr::InList;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
// Needed for MakeParquetArray
use datafusion_expr::planner::{PlannerResult, RawBinaryExpr};
use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_functions::core::planner::CoreFunctionPlanner;
use datafusion_sql::planner::{ContextProvider, SqlToRel};
@@ -156,7 +157,6 @@ impl Default for CustomNestedFunctionPlanner {
}
}

use datafusion_expr::planner::{PlannerResult, RawBinaryExpr};
impl ExprPlanner for CustomNestedFunctionPlanner {
fn plan_array_literal(
&self,
10 changes: 3 additions & 7 deletions crates/core/src/delta_datafusion/logical.rs
@@ -22,7 +22,7 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
"MetricObserver"
}

fn inputs(&self) -> Vec<&datafusion_expr::LogicalPlan> {
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
}

@@ -50,19 +50,15 @@ impl UserDefinedLogicalNodeCore for MetricObserver {
write!(f, "MetricObserver id={}", &self.id)
}

fn from_template(
&self,
exprs: &[datafusion_expr::Expr],
inputs: &[datafusion_expr::LogicalPlan],
) -> Self {
fn from_template(&self, exprs: &[datafusion_expr::Expr], inputs: &[LogicalPlan]) -> Self {
self.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec())
.unwrap()
}

fn with_exprs_and_inputs(
&self,
_exprs: Vec<datafusion_expr::Expr>,
inputs: Vec<datafusion_expr::LogicalPlan>,
inputs: Vec<LogicalPlan>,
) -> datafusion_common::Result<Self> {
Ok(MetricObserver {
id: self.id.clone(),
12 changes: 6 additions & 6 deletions crates/core/src/delta_datafusion/mod.rs
@@ -118,7 +118,7 @@ impl From<DataFusionError> for DeltaTableError {
}
}

/// Convience trait for calling common methods on snapshot heirarchies
/// Convenience trait for calling common methods on snapshot hierarchies
pub trait DataFusionMixins {
/// The physical datafusion schema of a table
fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef>;
@@ -557,7 +557,7 @@ impl<'a> DeltaScanBuilder<'a> {
let mut files_pruned = 0usize;
let files = self
.snapshot
.file_actions_iter()?
.file_actions()?
.zip(files_to_prune.into_iter())
.filter_map(|(action, keep)| {
if keep {
@@ -572,7 +572,7 @@ impl<'a> DeltaScanBuilder<'a> {
let files_scanned = files.len();
(files, files_scanned, files_pruned)
} else {
let files = self.snapshot.file_actions()?;
let files = self.snapshot.file_actions()?.collect_vec();
let files_scanned = files.len();
(files, files_scanned, 0)
}
@@ -1563,7 +1563,7 @@ pub(crate) async fn find_files_scan(
expression: Expr,
) -> DeltaResult<Vec<Add>> {
let candidate_map: HashMap<String, Add> = snapshot
.file_actions_iter()?
.file_actions()?
.map(|add| (add.path.clone(), add.to_owned()))
.collect();

@@ -1706,7 +1706,7 @@ pub async fn find_files(
}
}
None => Ok(FindFiles {
candidates: snapshot.file_actions()?,
candidates: snapshot.file_actions()?.collect_vec(),
partition_scan: true,
}),
}
@@ -2637,7 +2637,7 @@ mod tests {
#[tokio::test]
async fn passes_sanity_checker_when_all_files_filtered() {
// Run a query that filters out all files and sorts.
// Verify that it returns an empty set of rows without panicing.
// Verify that it returns an empty set of rows without panicking.
//
// Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
// datafusion rejected.
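
Note on the `file_actions_iter()` → `file_actions()` changes in this file: the snapshot now hands back an iterator of `Add` actions, so call sites that only zip, filter, or map can stay lazy, while the places that still need an owned `Vec<Add>` materialize it with itertools' `collect_vec()`. A minimal sketch of that pattern outside the crate — the `Add` struct and the pruning mask below are hypothetical stand-ins for the snapshot API:

```rust
use itertools::Itertools;

// Hypothetical stand-in for the kernel `Add` action.
#[derive(Clone, Debug)]
struct Add {
    path: String,
}

fn main() {
    // Pretend this is what `snapshot.file_actions()?` now yields: a lazy iterator.
    let file_actions = (0..4).map(|i| Add { path: format!("part-{i}.parquet") });

    // Lazy use: zip with a pruning mask and keep only surviving files without
    // ever materializing the full action list (mirrors the DeltaScanBuilder path).
    let keep_mask = [true, false, true, false];
    let surviving = file_actions
        .clone()
        .zip(keep_mask)
        .filter_map(|(add, keep)| keep.then_some(add))
        .collect_vec();
    assert_eq!(surviving.len(), 2);

    // Eager use: collect_vec() is just shorthand for collect::<Vec<_>>(),
    // for call sites that still need a Vec<Add> (e.g. FindFiles::candidates).
    let all = file_actions.collect_vec();
    assert_eq!(all.len(), 4);
    assert_eq!(all[0].path, "part-0.parquet");
}
```
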
4 changes: 2 additions & 2 deletions crates/core/src/kernel/arrow/extract.rs
@@ -1,4 +1,4 @@
//! Utilties to extract columns from a record batch or nested / complex arrays.
//! Utilities to extract columns from a record batch or nested / complex arrays.

use std::sync::Arc;

@@ -70,7 +70,7 @@ pub(crate) fn extract_column<'a>(
if let Some(next_path_step) = remaining_path_steps.next() {
match child.data_type() {
DataType::Map(_, _) => {
// NOTE a map has exatly one child, but we wnat to be agnostic of its name.
// NOTE a map has exactly one child, but we want to be agnostic of its name.
// so we case the current array as map, and use the entries accessor.
let maparr = cast_column_as::<MapArray>(path_step, &Some(child))?;
if let Some(next_path) = remaining_path_steps.next() {
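
Aside on the comment fixed above: the extraction code stays agnostic of the map entry struct's field name by casting the column to `MapArray` and going through its accessors. A small standalone illustration of that idea using arrow-rs builders (not the crate's `pub(crate)` `extract_column` itself):

```rust
use arrow_array::builder::{MapBuilder, StringBuilder};
use arrow_array::{Array, MapArray};

fn main() {
    // Build a tiny map<string, string> array. The entry struct's field name is
    // whatever the builder picked; downstream code should not depend on it.
    let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
    builder.keys().append_value("date");
    builder.values().append_value("2021-01-01");
    builder.append(true).unwrap();
    let map: MapArray = builder.finish();

    // entries(), keys() and values() expose the single child struct and its two
    // columns without ever naming them, which is the point of the fixed comment.
    assert_eq!(map.entries().len(), 1);
    assert_eq!(map.keys().len(), 1);
    assert_eq!(map.values().len(), 1);
}
```
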
112 changes: 40 additions & 72 deletions crates/core/src/kernel/arrow/json.rs
@@ -3,12 +3,13 @@
use std::io::{BufRead, BufReader, Cursor};
use std::task::Poll;

use arrow_array::{new_null_array, Array, RecordBatch, StringArray};
use arrow_array::{Array, RecordBatch, StringArray};
use arrow_json::{reader::Decoder, ReaderBuilder};
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_select::concat::concat_batches;
use bytes::{Buf, Bytes};
use futures::{ready, Stream, StreamExt};
use itertools::Itertools;
use object_store::Result as ObjectStoreResult;

use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
@@ -28,18 +28,42 @@ pub(crate) fn get_decoder(
.build_decoder()?)
}

fn insert_nulls(
batches: &mut Vec<RecordBatch>,
null_count: usize,
// Raw arrow implementation of the json parsing. Separate from the public function for testing.
//
// NOTE: This code is really inefficient because arrow lacks the native capability to perform robust
// StringArray -> StructArray JSON parsing. See https://github.com/apache/arrow-rs/issues/6522. If
// that shortcoming gets fixed upstream, this method can simplify or hopefully even disappear.
//
// NOTE: this function is hoisted from delta-kernel-rs to support transitioning to kernel.
fn parse_json_impl(
json_strings: &StringArray,
schema: ArrowSchemaRef,
) -> Result<(), ArrowError> {
let columns = schema
.fields
.iter()
.map(|field| new_null_array(field.data_type(), null_count))
.collect();
batches.push(RecordBatch::try_new(schema, columns)?);
Ok(())
) -> Result<RecordBatch, delta_kernel::Error> {
if json_strings.is_empty() {
return Ok(RecordBatch::new_empty(schema));
}

// Use batch size of 1 to force one record per string input
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(1)
.build_decoder()?;
let parse_one = |json_string: Option<&str>| -> Result<RecordBatch, delta_kernel::Error> {
let mut reader = BufReader::new(json_string.unwrap_or("{}").as_bytes());
let buf = reader.fill_buf()?;
let read = buf.len();
if !(decoder.decode(buf)? == read) {
return Err(delta_kernel::Error::missing_data("Incomplete JSON string"));
}
let Some(batch) = decoder.flush()? else {
return Err(delta_kernel::Error::missing_data("Expected data"));
};
if !(batch.num_rows() == 1) {
return Err(delta_kernel::Error::generic("Expected one row"));
}
Ok(batch)
};
let output: Vec<_> = json_strings.iter().map(parse_one).try_collect()?;
Ok(concat_batches(&schema, output.iter())?)
}

/// Parse an array of JSON strings into a record batch.
@@ -48,65 +73,8 @@ pub(crate) fn parse_json(
pub(crate) fn parse_json(
json_strings: &StringArray,
output_schema: ArrowSchemaRef,
config: &DeltaTableConfig,
) -> DeltaResult<RecordBatch> {
let mut decoder = ReaderBuilder::new(output_schema.clone())
.with_batch_size(config.log_batch_size)
.build_decoder()?;
let mut batches = Vec::new();

let mut null_count = 0;
let mut value_count = 0;
let mut value_start = 0;

for it in 0..json_strings.len() {
if json_strings.is_null(it) {
if value_count > 0 {
let slice_data = get_nonnull_slice_data(json_strings, value_start, value_count);
let batch =
decode_reader(&mut decoder, get_reader(&slice_data))
.collect::<Result<Vec<_>, _>>()?;
batches.extend(batch);
value_count = 0;
}
null_count += 1;
continue;
}
if value_count == 0 {
value_start = it;
}
if null_count > 0 {
insert_nulls(&mut batches, null_count, output_schema.clone())?;
null_count = 0;
}
value_count += 1;
}

if null_count > 0 {
insert_nulls(&mut batches, null_count, output_schema.clone())?;
}

if value_count > 0 {
let slice_data = get_nonnull_slice_data(json_strings, value_start, value_count);
let batch =
decode_reader(&mut decoder, get_reader(&slice_data)).collect::<Result<Vec<_>, _>>()?;
batches.extend(batch);
}

Ok(concat_batches(&output_schema, &batches)?)
}

/// Get the data of a slice of non-null JSON strings.
fn get_nonnull_slice_data(
json_strings: &StringArray,
value_start: usize,
value_count: usize,
) -> Vec<u8> {
let slice = json_strings.slice(value_start, value_count);
slice.iter().fold(Vec::new(), |mut acc, s| {
acc.extend_from_slice(s.unwrap().as_bytes());
acc
})
Ok(parse_json_impl(json_strings, output_schema)?)
}

/// Decode a stream of bytes into a stream of record batches.
@@ -184,7 +152,7 @@ mod tests {
Field::new("b", DataType::Utf8, true),
]));
let config = DeltaTableConfig::default();
let result = parse_json(&json_strings, struct_schema.clone(), &config).unwrap();
let result = parse_json(&json_strings, struct_schema.clone()).unwrap();
let expected = RecordBatch::try_new(
struct_schema,
vec![
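
The rewritten `parse_json` above delegates to `parse_json_impl`, which decodes each JSON string as its own single-row batch (treating null inputs as `{}`) and concatenates the results, so output rows stay aligned with the input array. A self-contained sketch of the same technique against plain arrow-json, outside the crate; the schema and input values are made up for illustration:

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::concat::concat_batches;

fn parse_json_strings(
    json_strings: &StringArray,
    schema: Arc<Schema>,
) -> Result<RecordBatch, ArrowError> {
    // Batch size 1 forces exactly one output row per input string, preserving
    // row alignment with the input array even across null entries.
    let mut decoder = ReaderBuilder::new(schema.clone())
        .with_batch_size(1)
        .build_decoder()?;
    let mut batches = Vec::with_capacity(json_strings.len());
    for json in json_strings.iter() {
        // Null input rows are decoded as `{}`, i.e. an all-null output row.
        let bytes = json.unwrap_or("{}").as_bytes();
        let read = decoder.decode(bytes)?;
        debug_assert_eq!(read, bytes.len(), "incomplete JSON value");
        let batch = decoder
            .flush()?
            .ok_or_else(|| ArrowError::JsonError("expected one decoded row".into()))?;
        batches.push(batch);
    }
    concat_batches(&schema, batches.iter())
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let input = StringArray::from(vec![Some(r#"{"a": 1, "b": "x"}"#), None]);
    let batch = parse_json_strings(&input, schema)?;
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```

As the hoisted comment notes, this per-row decoding is inefficient and should simplify or disappear if arrow-rs gains native `StringArray` → `StructArray` JSON parsing (apache/arrow-rs#6522).
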
1 change: 1 addition & 0 deletions crates/core/src/kernel/arrow/mod.rs
@@ -11,6 +11,7 @@ use lazy_static::lazy_static;
pub(crate) mod extract;
pub(crate) mod json;

pub(crate) const LIST_ARRAY_ROOT: &str = "element";
const MAP_ROOT_DEFAULT: &str = "key_value";
const MAP_KEY_DEFAULT: &str = "key";
const MAP_VALUE_DEFAULT: &str = "value";
6 changes: 3 additions & 3 deletions crates/core/src/kernel/models/actions.rs
@@ -166,14 +166,14 @@ impl Protocol {
mut self,
writer_features: impl IntoIterator<Item = impl Into<WriterFeatures>>,
) -> Self {
let all_writer_feautures = writer_features
let all_writer_features = writer_features
.into_iter()
.map(|c| c.into())
.collect::<HashSet<_>>();
if !all_writer_feautures.is_empty() {
if !all_writer_features.is_empty() {
self.min_writer_version = 7
}
self.writer_features = Some(all_writer_feautures);
self.writer_features = Some(all_writer_features);
self
}

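
For context on this hunk (which only fixes the `feautures` spelling): the builder collects the requested writer features into a set and, if any are present, bumps `min_writer_version` to 7, the protocol version that introduces writer table features. A simplified standalone sketch of that logic with hypothetical stand-in types:

```rust
use std::collections::HashSet;

// Hypothetical, trimmed-down stand-ins for the kernel's Protocol and WriterFeatures.
#[derive(Clone, Copy, Hash, PartialEq, Eq, Debug)]
enum WriterFeatures {
    AppendOnly,
}

#[derive(Default, Debug)]
struct Protocol {
    min_writer_version: i32,
    writer_features: Option<HashSet<WriterFeatures>>,
}

impl Protocol {
    // Mirrors the hunk: collect features into a set; any feature forces writer version 7.
    fn with_writer_features(
        mut self,
        writer_features: impl IntoIterator<Item = impl Into<WriterFeatures>>,
    ) -> Self {
        let all_writer_features: HashSet<WriterFeatures> =
            writer_features.into_iter().map(Into::into).collect();
        if !all_writer_features.is_empty() {
            self.min_writer_version = 7;
        }
        self.writer_features = Some(all_writer_features);
        self
    }
}

fn main() {
    let protocol = Protocol::default().with_writer_features([WriterFeatures::AppendOnly]);
    assert_eq!(protocol.min_writer_version, 7);
    assert!(protocol
        .writer_features
        .unwrap()
        .contains(&WriterFeatures::AppendOnly));
}
```
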
9 changes: 1 addition & 8 deletions crates/core/src/kernel/models/schema.rs
@@ -1,7 +1,5 @@
//! Delta table schema

use std::sync::Arc;

pub use delta_kernel::schema::{
ArrayType, ColumnMetadataKey, DataType, MapType, MetadataValue, PrimitiveType, StructField,
StructType,
@@ -11,11 +9,6 @@ use serde_json::Value;
use crate::kernel::error::Error;
use crate::kernel::DataCheck;

/// Type alias for a top level schema
pub type Schema = StructType;
/// Schema reference type
pub type SchemaRef = Arc<StructType>;

/// An invariant for a column that is enforced on all writes to a Delta table.
#[derive(Eq, PartialEq, Debug, Default, Clone)]
pub struct Invariant {
@@ -45,7 +38,7 @@ impl DataCheck for Invariant {
}
}

/// Trait to add convenince functions to struct type
/// Trait to add convenience functions to struct type
pub trait StructTypeExt {
/// Get all invariants in the schemas
fn get_invariants(&self) -> Result<Vec<Invariant>, Error>;