Merge pull request #83 from roeap/partition-values
Leverage more engine capabilities in data skipping 2/n
roeap authored Feb 15, 2024
2 parents d8d3eb3 + 6cb4fe4 commit 5f48dea
Showing 24 changed files with 672 additions and 251 deletions.
2 changes: 1 addition & 1 deletion acceptance/Cargo.toml
@@ -26,7 +26,7 @@ tar = "0.4"

[dev-dependencies]
arrow = { version = "^49.0", features = ["json", "prettyprint"] }
datatest-stable = "0.1.3"
datatest-stable = "0.2"
test-log = { version = "0.2", default-features = false, features = ["trace"] }
tempfile = "3"
test-case = { version = "3.1.0" }
22 changes: 15 additions & 7 deletions kernel/Cargo.toml
@@ -10,18 +10,14 @@ version.workspace = true

[dependencies]
arrow-array = { version = "^49.0" }
arrow-arith = { version = "^49.0" }
arrow-json = { version = "^49.0" }
arrow-ord = { version = "^49.0" }
arrow-schema = { version = "^49.0" }
arrow-select = { version = "^49.0" }
bytes = "1.4"
chrono = { version = "0.4", optional = true }
chrono = { version = "0.4" }
either = "1.8"
fix-hidden-lifetime-bug = "0.2"
indexmap = "2.2.1"
itertools = "0.12"
lazy_static = "1.4"
# need to generalize over arrow, arrow2 and diff parquet etc. (BYOP)
regex = "1.8"
roaring = "0.10.1"
serde = { version = "1", features = ["derive"] }
@@ -37,6 +33,10 @@ z85 = "3.0.5"
visibility = "0.1.0"

# Used in default client
arrow-arith = { version = "^49.0", optional = true }
arrow-json = { version = "^49.0", optional = true }
arrow-ord = { version = "^49.0", optional = true }
arrow-schema = { version = "^49.0", optional = true }
futures = { version = "0.3", optional = true }
object_store = { version = "^0.8.0", optional = true }
parquet = { version = "^49.0", optional = true, features = [
@@ -49,7 +49,15 @@ tokio = { version = "1", optional = true, features = ["rt-multi-thread"] }

[features]
default = ["default-client"]
default-client = ["chrono", "futures", "object_store", "parquet"]
default-client = [
"arrow-arith",
"arrow-json",
"arrow-ord",
"arrow-schema",
"futures",
"object_store",
"parquet",
]
developer-visibility = []

[dev-dependencies]
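A note on what making these arrow crates optional implies downstream: code that uses them must now be compiled only when the `default-client` feature (which re-enables them) is active. A minimal, hypothetical sketch of that gating — only the feature name comes from the manifest above; the module and function are illustrative, not from this repository:

```rust
// Hypothetical sketch: gating code behind the `default-client` feature.
// Only the feature name is taken from the Cargo.toml above.
#[cfg(feature = "default-client")]
pub mod default_client {
    use arrow_schema::{DataType, Field};

    /// Compiles only when `default-client` (and thus the optional
    /// arrow-schema dependency) is enabled.
    pub fn value_field() -> Field {
        Field::new("value", DataType::Int32, true)
    }
}
```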
2 changes: 1 addition & 1 deletion kernel/examples/dump-table/src/main.rs
@@ -112,7 +112,7 @@ fn main() {
let scan = ScanBuilder::new(snapshot).build();

let schema = scan.schema();
let header_names = schema.fields.iter().map(|field| {
let header_names = schema.fields().map(|field| {
let cell = Cell::new(field.name());
if cli.ascii {
cell
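The one-line change above swaps direct struct-field access (`schema.fields.iter()`) for the kernel's new `fields()` accessor, which yields the fields as an iterator directly. A minimal sketch of the same pattern, written against arrow-schema (where `fields()` returns `&Fields`, so an explicit `.iter()` is still needed) rather than the kernel's own `Schema` type:

```rust
// Sketch of iterator-style field access, using arrow-schema as a
// stand-in for the kernel's Schema type.
use arrow_schema::{DataType, Field, Schema};

fn header_names(schema: &Schema) -> Vec<String> {
    // Go through the accessor instead of reaching into the struct.
    schema
        .fields()
        .iter()
        .map(|field| field.name().clone())
        .collect()
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Utf8, true),
    ]);
    assert_eq!(header_names(&schema), ["c1", "c2"]);
}
```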
11 changes: 5 additions & 6 deletions kernel/src/actions/mod.rs
@@ -521,6 +521,7 @@ fn struct_array_to_map(arr: &StructArray) -> DeltaResult<HashMap<String, Option<
mod tests {
use std::sync::Arc;

use arrow_array::ArrayRef;
use object_store::local::LocalFileSystem;

use super::*;
@@ -534,13 +535,12 @@ mod tests {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
]
.into();
]));
let output_schema = Arc::new(log_schema().clone());
handler.parse_json(json_strings, output_schema).unwrap()
}
@@ -597,15 +597,14 @@ mod tests {
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

let json_strings: StringArray = vec![
let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":"b"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#,
]
.into();
]));
let output_schema = Arc::new(log_schema().clone());
let batch = handler.parse_json(json_strings, output_schema).unwrap();

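Both test updates above follow the same pattern: the typed `StringArray` is wrapped in an `Arc` to produce the type-erased `ArrayRef` that `parse_json` now accepts. A self-contained sketch of just that wrapping, assuming arrow-array ^49 as in the manifest above (the sample JSON line is borrowed from the test data):

```rust
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, StringArray};

fn main() {
    // Wrap a concrete StringArray as a type-erased ArrayRef (Arc<dyn Array>),
    // the shape the engine-agnostic JSON handler expects.
    let json_strings: ArrayRef = Arc::new(StringArray::from(vec![
        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
    ]));
    assert_eq!(json_strings.len(), 1);
}
```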
37 changes: 8 additions & 29 deletions kernel/src/client/conversion.rs
@@ -4,6 +4,7 @@ use arrow_schema::{
ArrowError, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef, TimeUnit,
};
use itertools::Itertools;

use crate::actions::ActionType;
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType};
@@ -20,12 +21,7 @@ impl TryFrom<&StructType> for ArrowSchema {
type Error = ArrowError;

fn try_from(s: &StructType) -> Result<Self, ArrowError> {
let fields = s
.fields()
.iter()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.collect::<Result<Vec<ArrowField>, ArrowError>>()?;

let fields: Vec<ArrowField> = s.fields().map(TryInto::try_into).try_collect()?;
Ok(ArrowSchema::new(fields))
}
}
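The rewritten impl leans on `Itertools::try_collect` to gather an iterator of `Result`s into a `Vec`, short-circuiting on the first error, without spelling out the full `collect::<Result<Vec<_>, _>>()` turbofish. A small standalone sketch of the idiom (the string-parsing example is illustrative):

```rust
use itertools::Itertools;

fn parse_all(inputs: &[&str]) -> Result<Vec<i32>, std::num::ParseIntError> {
    // try_collect stops at the first Err and propagates it.
    let values: Vec<i32> = inputs.iter().map(|s| s.parse()).try_collect()?;
    Ok(values)
}

fn main() {
    assert_eq!(parse_all(&["1", "2", "3"]).unwrap(), vec![1, 2, 3]);
    assert!(parse_all(&["1", "x"]).is_err());
}
```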
@@ -103,23 +99,12 @@ impl TryFrom<&DataType> for ArrowDataType {
PrimitiveType::Boolean => Ok(ArrowDataType::Boolean),
PrimitiveType::Binary => Ok(ArrowDataType::Binary),
PrimitiveType::Decimal(precision, scale) => {
let precision = u8::try_from(*precision).map_err(|_| {
ArrowError::SchemaError(format!(
"Invalid precision for decimal: {}",
precision
))
})?;
let scale = i8::try_from(*scale).map_err(|_| {
ArrowError::SchemaError(format!("Invalid scale for decimal: {}", scale))
})?;

if precision <= 38 {
Ok(ArrowDataType::Decimal128(precision, scale))
} else if precision <= 76 {
Ok(ArrowDataType::Decimal256(precision, scale))
if precision <= &38 {
Ok(ArrowDataType::Decimal128(*precision, *scale))
} else {
// NOTE: since we are converting from delta, we should never get here.
Err(ArrowError::SchemaError(format!(
"Precision too large to be represented in Arrow: {}",
"Precision too large to be represented as Delta type: {} > 38",
precision
)))
}
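The simplification above drops the `Decimal256` branch and the lossy integer casts: Delta caps decimal precision at 38, so Arrow's `Decimal128` always suffices, and the kernel now stores precision and scale as `u8`/`i8` directly. A hedged sketch of the resulting mapping as a free function (the signature is illustrative; the kernel does this inside the `TryFrom` impl):

```rust
use arrow_schema::{ArrowError, DataType as ArrowDataType};

// Map a Delta decimal to the matching Arrow type. Precision > 38 is
// impossible for a valid Delta type, so it becomes an error.
fn delta_decimal_to_arrow(precision: u8, scale: i8) -> Result<ArrowDataType, ArrowError> {
    if precision <= 38 {
        Ok(ArrowDataType::Decimal128(precision, scale))
    } else {
        Err(ArrowError::SchemaError(format!(
            "Precision too large to be represented as Delta type: {} > 38",
            precision
        )))
    }
}

fn main() {
    assert!(matches!(
        delta_decimal_to_arrow(10, 2),
        Ok(ArrowDataType::Decimal128(10, 2))
    ));
    assert!(delta_decimal_to_arrow(76, 0).is_err());
}
```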
@@ -137,8 +122,7 @@
}
DataType::Struct(s) => Ok(ArrowDataType::Struct(
s.fields()
.iter()
.map(|f| <ArrowField as TryFrom<&StructField>>::try_from(*f))
.map(TryInto::try_into)
.collect::<Result<Vec<ArrowField>, ArrowError>>()?
.into(),
)),
@@ -226,12 +210,7 @@ impl TryFrom<&ArrowDataType> for DataType {
ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)),
ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)),
ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)),
ArrowDataType::Decimal128(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal(
*p as i32, *s as i32,
))),
ArrowDataType::Decimal256(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal(
*p as i32, *s as i32,
))),
ArrowDataType::Decimal128(p, s) => Ok(DataType::decimal(*p, *s)),
ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)),
ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {