Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement row group skipping for the default engine parquet readers #362

Merged
merged 33 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
715f233
WIP - first pass at the code
ryan-johnson-databricks Sep 25, 2024
ef71f1a
split out a trait, add more type support
ryan-johnson-databricks Sep 25, 2024
39b8927
support short circuit junction eval
ryan-johnson-databricks Sep 25, 2024
b5c3a52
Merge remote-tracking branch 'oss/main' into row-group-skipping
scovich Sep 25, 2024
e71571e
add tests, fix bugs
scovich Sep 26, 2024
cbca3b3
support SQL WHERE semantics, finished adding tests for skipping logic
scovich Sep 27, 2024
e7d87eb
Mark block text as not rust code doctest should run
scovich Sep 27, 2024
beeb6e8
add missing tests identified by codecov
scovich Sep 27, 2024
519acbd
Wire up row group skipping
scovich Sep 27, 2024
18b33cf
delete for split - parquet reader uses row group skipping
scovich Sep 27, 2024
6c98441
parquet reader now uses row group skipping
scovich Sep 27, 2024
0fdaf0a
add stats-getter test; review comments
scovich Oct 3, 2024
8ac33f8
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 3, 2024
1cf03dc
improve test coverage; clippy
scovich Oct 3, 2024
bc8b344
yet more test coverage
scovich Oct 3, 2024
0971002
improve test coverage even more
scovich Oct 4, 2024
375a380
Add a query level test as well
scovich Oct 4, 2024
6236874
Fix broken sync json parsing and harmonize file reading
scovich Oct 4, 2024
9efcbf7
fmt
scovich Oct 4, 2024
46d19e3
remove spurious TODO
scovich Oct 7, 2024
7666512
Revert "Fix broken sync json parsing and harmonize file reading"
scovich Oct 7, 2024
f3865d0
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 7, 2024
a4dc3da
review comments
scovich Oct 7, 2024
40131db
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 8, 2024
bf65904
Infer null count stat for missing columns; add more tests
scovich Oct 8, 2024
cce762d
One last test
scovich Oct 8, 2024
c7d6bb0
test cleanup
scovich Oct 8, 2024
4f92ed7
code comment tweak
scovich Oct 8, 2024
08a305b
remove unneeded test
scovich Oct 8, 2024
e8a947e
Merge remote-tracking branch 'oss' into use-row-group-skipping
scovich Oct 9, 2024
bf1e3a8
fix two nullcount stat bugs
scovich Oct 9, 2024
9d632e7
Merge remote-tracking branch 'oss/main' into use-row-group-skipping
scovich Oct 9, 2024
4a77f3a
review nits
scovich Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ffi/src/engine_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ fn read_parquet_file_impl(
last_modified: file.last_modified,
size: file.size,
};
// TODO: Plumb the predicate through the FFI?
scovich marked this conversation as resolved.
Show resolved Hide resolved
let data = parquet_handler.read_parquet_files(&[delta_fm], physical_schema, None)?;
let res = Box::new(FileReadResultIterator {
data,
Expand Down
62 changes: 59 additions & 3 deletions kernel/src/engine/arrow_utils.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
//! Some utilities for working with arrow data types

use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, io::BufReader, sync::Arc};

use crate::{
engine::arrow_data::ArrowEngineData,
schema::{DataType, PrimitiveType, Schema, SchemaRef, StructField, StructType},
utils::require,
DeltaResult, Error,
DeltaResult, EngineData, Error,
};

use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
StructArray,
RecordBatch, StringArray, StructArray,
};
use arrow_json::ReaderBuilder;
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
SchemaRef as ArrowSchemaRef,
};
use arrow_select::concat::concat_batches;
use itertools::Itertools;
use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor};
use tracing::debug;
Expand Down Expand Up @@ -757,6 +760,59 @@ fn reorder_list<O: OffsetSizeTrait>(
}
}

fn hack_parse(
stats_schema: &ArrowSchemaRef,
json_string: Option<&str>,
) -> DeltaResult<RecordBatch> {
match json_string {
Some(s) => Ok(ReaderBuilder::new(stats_schema.clone())
.build(BufReader::new(s.as_bytes()))?
.next()
.transpose()?
.ok_or(Error::missing_data("Expected data"))?),
None => Ok(RecordBatch::try_new(
stats_schema.clone(),
stats_schema
.fields
.iter()
.map(|field| new_null_array(field.data_type(), 1))
.collect(),
)?),
}
}

/// Arrow lacks the functionality to json-parse a string column into a struct column -- even tho the
/// JSON file reader does exactly the same thing. This function is a hack to work around that gap.
pub(crate) fn parse_json(
json_strings: Box<dyn EngineData>,
output_schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>> {
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
// TODO(nick): this is pretty terrible
let struct_array: StructArray = json_strings.into();
let json_strings = struct_array
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::generic("Expected json_strings to be a StringArray, found something else")
})?;
let output_schema: ArrowSchemaRef = Arc::new(output_schema.as_ref().try_into()?);
if json_strings.is_empty() {
return Ok(Box::new(ArrowEngineData::new(RecordBatch::new_empty(
output_schema,
))));
}
let output: Vec<_> = json_strings
.iter()
.map(|json_string| hack_parse(&output_schema, json_string))
.try_collect()?;
Ok(Box::new(ArrowEngineData::new(concat_batches(
&output_schema,
output.iter(),
)?)))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
56 changes: 5 additions & 51 deletions kernel/src/engine/default/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@ use std::ops::Range;
use std::sync::Arc;
use std::task::{ready, Poll};

use arrow_array::{new_null_array, Array, RecordBatch, StringArray, StructArray};
use arrow_json::ReaderBuilder;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use arrow_select::concat::concat_batches;
use bytes::{Buf, Bytes};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::path::Path;
use object_store::{DynObjectStore, GetResultPayload};

use super::executor::TaskExecutor;
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::parse_json as arrow_parse_json;
use crate::schema::SchemaRef;
use crate::{
DeltaResult, EngineData, Error, Expression, FileDataReadResultIterator, FileMeta, JsonHandler,
Expand Down Expand Up @@ -62,57 +59,13 @@ impl<E: TaskExecutor> DefaultJsonHandler<E> {
}
}

/// Parses one optional JSON string into a [`RecordBatch`] with schema `stats_schema`.
///
/// `Some(s)` is decoded with an arrow-json reader and the first batch it yields is returned;
/// a reader that yields nothing results in a "missing data" error. `None` produces a batch with
/// one row in which every column is null.
///
/// NOTE(review): `crate::engine::arrow_utils` carries an identical helper behind its
/// `parse_json`, which this file imports as `arrow_parse_json` -- confirm this copy is still needed.
fn hack_parse(
stats_schema: &ArrowSchemaRef,
json_string: Option<&str>,
) -> DeltaResult<RecordBatch> {
match json_string {
// Decode the string and keep only the first batch the reader produces.
Some(s) => Ok(ReaderBuilder::new(stats_schema.clone())
.build(BufReader::new(s.as_bytes()))?
.next()
.transpose()?
.ok_or(Error::missing_data("Expected data"))?),
// Null input: emit a single row of nulls, one null array of length 1 per schema field.
None => Ok(RecordBatch::try_new(
stats_schema.clone(),
stats_schema
.fields
.iter()
.map(|field| new_null_array(field.data_type(), 1))
.collect(),
)?),
}
}

impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn parse_json(
&self,
json_strings: Box<dyn EngineData>,
output_schema: SchemaRef,
) -> DeltaResult<Box<dyn EngineData>> {
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
// TODO(nick): this is pretty terrible
let struct_array: StructArray = json_strings.into();
let json_strings = struct_array
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
Error::generic("Expected json_strings to be a StringArray, found something else")
})?;
let output_schema: ArrowSchemaRef = Arc::new(output_schema.as_ref().try_into()?);
if json_strings.is_empty() {
return Ok(Box::new(ArrowEngineData::new(RecordBatch::new_empty(
output_schema,
))));
}
let output: Vec<_> = json_strings
.iter()
.map(|json_string| hack_parse(&output_schema, json_string))
.try_collect()?;
Ok(Box::new(ArrowEngineData::new(concat_batches(
&output_schema,
output.iter(),
)?)))
arrow_parse_json(json_strings, output_schema)
}

fn read_json_files(
Expand Down Expand Up @@ -220,14 +173,15 @@ impl FileOpener for JsonOpener {
mod tests {
use std::path::PathBuf;

use arrow::array::AsArray;
use arrow::array::{AsArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};

use super::*;
use crate::{
actions::get_log_schema, engine::default::executor::tokio::TokioBackgroundExecutor,
actions::get_log_schema, engine::arrow_data::ArrowEngineData,
engine::default::executor::tokio::TokioBackgroundExecutor,
};

fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
Expand Down
28 changes: 24 additions & 4 deletions kernel/src/engine/default/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStream
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler};

Expand Down Expand Up @@ -47,7 +48,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
_predicate: Option<Expression>,
predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(Box::new(std::iter::empty()));
Expand All @@ -62,10 +63,15 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
"http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
"http" | "https" => Box::new(PresignedUrlOpener::new(
1024,
physical_schema.clone(),
predicate,
)),
_ => Box::new(ParquetOpener::new(
1024,
physical_schema.clone(),
predicate,
self.store.clone(),
)),
};
Expand All @@ -83,20 +89,23 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
limit: Option<usize>,
table_schema: SchemaRef,
predicate: Option<Expression>,
limit: Option<usize>,
store: Arc<DynObjectStore>,
}

impl ParquetOpener {
pub(crate) fn new(
batch_size: usize,
table_schema: SchemaRef,
predicate: Option<Expression>,
store: Arc<DynObjectStore>,
) -> Self {
Self {
batch_size,
table_schema,
predicate,
limit: None,
store,
}
Expand All @@ -111,6 +120,7 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
// let projection = self.projection.clone();
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;

Ok(Box::pin(async move {
Expand All @@ -133,6 +143,9 @@ impl FileOpener for ParquetOpener {
builder = builder.with_projection(mask)
}

if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate);
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
Expand All @@ -153,16 +166,18 @@ impl FileOpener for ParquetOpener {
/// Implements [`FileOpener`] for opening a parquet file from a presigned URL
struct PresignedUrlOpener {
/// Batch size captured by `open` for each file -- presumably the number of rows per record
/// batch handed to the parquet reader builder; TODO confirm against the full `open` body.
batch_size: usize,
/// Optional predicate; when present, `open` passes it to the reader builder's
/// `with_row_group_filter`.
predicate: Option<Expression>,
/// Optional limit; when present, `open` passes it to the reader builder's `with_limit`.
/// The constructor always sets this to `None`.
limit: Option<usize>,
/// Schema supplied by the caller at construction time; cloned into every `open` call.
table_schema: SchemaRef,
/// HTTP client created once in the constructor and cloned for every `open` call.
client: reqwest::Client,
}

impl PresignedUrlOpener {
pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
Self {
batch_size,
table_schema: schema,
predicate,
limit: None,
client: reqwest::Client::new(),
}
Expand All @@ -173,6 +188,7 @@ impl FileOpener for PresignedUrlOpener {
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let batch_size = self.batch_size;
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;
let client = self.client.clone(); // uses Arc internally according to reqwest docs

Expand All @@ -196,6 +212,9 @@ impl FileOpener for PresignedUrlOpener {
builder = builder.with_projection(mask)
}

if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate);
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
Expand Down Expand Up @@ -261,6 +280,7 @@ mod tests {
size: meta.size,
}];

// TODO: add a test that uses predicate skipping?
scovich marked this conversation as resolved.
Show resolved Hide resolved
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let data: Vec<RecordBatch> = handler
.read_parquet_files(files, Arc::new(physical_schema.try_into().unwrap()), None)
Expand Down
3 changes: 3 additions & 0 deletions kernel/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub mod arrow_expression;
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod arrow_data;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod parquet_row_group_skipping;

#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
pub mod parquet_stats_skipping;

Expand Down
Loading
Loading