Map access for expressions #352

Open
wants to merge 16 commits into base: main
Changes from 4 commits
12 changes: 6 additions & 6 deletions Cargo.toml
@@ -1,11 +1,11 @@
[workspace]
members = [
"acceptance",
"derive-macros",
"ffi",
"kernel",
"kernel/examples/read-table-single-threaded", # todo: put back to `examples/*` when inspect-table is fixed
"kernel/examples/read-table-multi-threaded",
"acceptance",
"derive-macros",
"ffi",
"kernel",
"kernel/examples/read-table-single-threaded", # todo: put back to `examples/*` when inspect-table is fixed
"kernel/examples/read-table-multi-threaded",
]
# Only check / build main crates by default (check all with `--workspace`)
default-members = ["acceptance", "kernel"]
4 changes: 2 additions & 2 deletions derive-macros/src/lib.rs
@@ -12,9 +12,9 @@ use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, Meta, PathAr
/// delta schema camelCase version).
///
/// If a field sets `drop_null_container_values`, it means the underlying data can contain null in
/// the values of the container (i.e. a `key` -> `null` in a `HashMap`). Therefore the schema should
/// the values of the container (i.e. a `key` -> `null` in a `HashMap`). Therefore, the schema should
/// mark the value field as nullable, but those mappings will be dropped when converting to an
/// actual rust `HashMap`. Currently this can _only_ be set on `HashMap` fields.
/// actual rust `HashMap`. Currently, this can _only_ be set on `HashMap` fields.
#[proc_macro_derive(Schema, attributes(drop_null_container_values))]
pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
let input = parse_macro_input!(input as DeriveInput);
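To make the doc comment above concrete, here is a hedged usage sketch. It assumes the derive is exposed as `delta_kernel_derive::Schema`; the struct and field names are illustrative and not taken from this PR.

```rust
// Hypothetical sketch of the attribute described above; the crate path,
// struct, and field names are assumptions for illustration only.
use std::collections::HashMap;
use delta_kernel_derive::Schema;

#[derive(Schema)]
struct ExampleAction {
    // Delta schema side: map<string, string> whose value field is nullable.
    // Rust side: any `key -> null` entry is dropped when building the HashMap.
    #[drop_null_container_values]
    tags: HashMap<String, String>,
}
```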
10 changes: 5 additions & 5 deletions ffi/Cargo.toml
@@ -16,7 +16,7 @@ crate-type = ["lib", "cdylib", "staticlib"]
tracing = "0.1"
url = "2"
delta_kernel = { path = "../kernel", default-features = false, features = [
"developer-visibility",
"developer-visibility",
] }
delta_kernel_ffi_macros = { path = "../ffi-proc-macros", version = "0.3.1" }

@@ -40,10 +40,10 @@ trybuild = "1.0"
[features]
default = ["default-engine"]
default-engine = [
"delta_kernel/default-engine",
"arrow-array",
"arrow-data",
"arrow-schema",
"delta_kernel/default-engine",
"arrow-array",
"arrow-data",
"arrow-schema",
]
sync-engine = ["delta_kernel/sync-engine"]
developer-visibility = []
64 changes: 32 additions & 32 deletions kernel/Cargo.toml
@@ -9,7 +9,7 @@ repository.workspace = true
readme.workspace = true
version.workspace = true
# exclude golden tests + golden test data since they push us over 10MB crate size limit
exclude = ["tests/golden_tables.rs", "tests/golden_data/" ]
exclude = ["tests/golden_tables.rs", "tests/golden_data/"]

[package.metadata.docs.rs]
all-features = true
@@ -68,44 +68,44 @@ walkdir = { workspace = true, optional = true }
arrow-conversion = ["arrow-schema"]
arrow-expression = ["arrow-arith", "arrow-array", "arrow-buffer", "arrow-ord", "arrow-schema"]
cloud = [
"object_store/aws",
"object_store/azure",
"object_store/gcp",
"object_store/http",
"hdfs-native-object-store",
"object_store/aws",
"object_store/azure",
"object_store/gcp",
"object_store/http",
"hdfs-native-object-store",
]
default = ["sync-engine"]
default-engine = [
"arrow-conversion",
"arrow-expression",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-json",
"arrow-schema",
"arrow-select",
"futures",
"object_store",
"parquet/async",
"parquet/object_store",
"reqwest",
"tokio",
"arrow-conversion",
"arrow-expression",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-json",
"arrow-schema",
"arrow-select",
"futures",
"object_store",
"parquet/async",
"parquet/object_store",
"reqwest",
"tokio",
]

developer-visibility = []
sync-engine = [
"arrow-cast",
"arrow-conversion",
"arrow-expression",
"arrow-array",
"arrow-json",
"arrow-select",
"parquet",
"arrow-cast",
"arrow-conversion",
"arrow-expression",
"arrow-array",
"arrow-json",
"arrow-select",
"parquet",
]
integration-test = [
"hdfs-native-object-store/integration-test",
"hdfs-native",
"walkdir",
"hdfs-native-object-store/integration-test",
"hdfs-native",
"walkdir",
]

[build-dependencies]
@@ -120,6 +120,6 @@ tempfile = "3"
tar = "0.4"
zstd = "0.13"
tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"fmt",
"env-filter",
"fmt",
] }
2 changes: 1 addition & 1 deletion kernel/src/engine/arrow_data.rs
@@ -121,7 +121,7 @@ where
}

impl EngineMap for MapArray {
fn get<'a>(&'a self, row_index: usize, key: &str) -> Option<&'a str> {
fn get(&self, row_index: usize, key: &str) -> Option<&str> {
let offsets = self.offsets();
let start_offset = offsets[row_index] as usize;
let count = offsets[row_index + 1] as usize - start_offset;
159 changes: 142 additions & 17 deletions kernel/src/engine/arrow_expression.rs
@@ -7,7 +7,7 @@ use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, MapArray, RecordBatch,
StringArray, StructArray, TimestampMicrosecondArray,
};
use arrow_buffer::OffsetBuffer;
@@ -18,9 +18,10 @@ use arrow_schema::{
Schema as ArrowSchema, TimeUnit,
};
use arrow_select::concat::concat;
use arrow_select::filter::filter;
use itertools::Itertools;

use super::arrow_conversion::LIST_ARRAY_ROOT;
use super::arrow_conversion::{LIST_ARRAY_ROOT, MAP_ROOT_DEFAULT};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::ensure_data_types;
use crate::engine::arrow_utils::prim_array_cmp;
@@ -50,6 +51,14 @@ impl Scalar {
Double(val) => Arc::new(Float64Array::from_value(*val, num_rows)),
String(val) => Arc::new(StringArray::from(vec![val.clone(); num_rows])),
Boolean(val) => Arc::new(BooleanArray::from(vec![*val; num_rows])),
// Integer(val) => Arc::new(Int32Array::new_scalar(*val).into_inner()),
// Long(val) => Arc::new(Int64Array::new_scalar(*val).into_inner()),
// Short(val) => Arc::new(Int16Array::new_scalar(*val).into_inner()),
// Byte(val) => Arc::new(Int8Array::new_scalar(*val).into_inner()),
// Float(val) => Arc::new(Float32Array::new_scalar(*val).into_inner()),
// Double(val) => Arc::new(Float64Array::new_scalar(*val).into_inner()),
// String(val) => Arc::new(StringArray::new_scalar(val).into_inner()),
// Boolean(val) => Arc::new(BooleanArray::new_scalar(*val).into_inner()),
Timestamp(val) => {
Arc::new(TimestampMicrosecondArray::from_value(*val, num_rows).with_timezone("UTC"))
}
@@ -120,7 +129,16 @@ impl Scalar {
ArrowField::new(LIST_ARRAY_ROOT, t.element_type().try_into()?, true);
Arc::new(ListArray::new_null(Arc::new(field), num_rows))
}
DataType::Map { .. } => unimplemented!(),
DataType::Map(m) => {
let field = ArrowField::new(MAP_ROOT_DEFAULT, m.key_type().try_into()?, true);
Arc::new(MapArray::new(
Arc::new(field),
OffsetBuffer::new_empty(),
StructArray::new_empty_fields(num_rows, None),
None,
false,
))
}
},
};
Ok(arr)
@@ -175,14 +193,18 @@ fn column_as_struct<'a>(
name: &str,
column: &Option<&'a Arc<dyn Array>>,
) -> Result<&'a StructArray, ArrowError> {
column
.ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?
.as_any()
.downcast_ref::<StructArray>()
.ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
let c = column.ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))?;
if let arrow_schema::DataType::Map(_, _) = c.data_type() {
Ok(c.as_map_opt()
.ok_or(ArrowError::SchemaError(format!("{} is not a map", name)))?
.entries())
} else {
c.as_struct_opt()
.ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
}
}

fn evaluate_expression(
pub(crate) fn evaluate_expression(
expression: &Expression,
batch: &RecordBatch,
result_type: Option<&DataType>,
@@ -319,8 +341,14 @@ fn evaluate_expression(
.map_err(Error::generic_err)
}
(BinaryOperation { op, left, right }, _) => {
let left_arr = evaluate_expression(left.as_ref(), batch, None)?;
let right_arr = evaluate_expression(right.as_ref(), batch, None)?;
let mut left_arr = evaluate_expression(left.as_ref(), batch, None)?;
let mut right_arr = evaluate_expression(right.as_ref(), batch, None)?;

if matches!(**left, MapAccess { .. }) {
// Only modify arrays for maps...
left_arr = left_arr.as_map().values().clone(); // Compare against values since we already filtered to only matching keys
right_arr = right_arr.slice(0, left_arr.len()); // Since we don't use a scalar array, modify the array to be of equal length
}

type Operation = fn(&dyn Datum, &dyn Datum) -> Result<Arc<dyn Array>, ArrowError>;
let eval: Operation = match op {
@@ -363,6 +391,17 @@ fn evaluate_expression(
"Variadic {expression:?} is expected to return boolean results, got {result_type:?}"
)))
}
(MapAccess { source, key }, _) => {
let source = evaluate_expression(source, batch, None)?;
if let Some(key) = key {
let entries = source.as_map();
let key_array = StringArray::new_scalar(key); // Keys shouldn't ever be null by definition in arrow, but doing this second filter to be careful
let key_mask = eq(entries.keys(), &key_array)?;
filter(entries, &key_mask).map_err(Error::generic_err)
} else {
Ok(source)
}
}
}
}
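
The `MapAccess` arm and the `BinaryOperation` special case above come down to a handful of arrow-rs calls. Below is a standalone sketch of that flow, independent of the kernel's `Expression` types; the function name and the assumption that values sit at child index 1 of the entries struct are illustrative only.

```rust
// Standalone sketch (not part of this diff) of the arrow-rs calls the new map
// handling relies on: narrow a MapArray's entries to one key with `eq` +
// `filter`, then compare the surviving values against a string literal.
use arrow_array::cast::AsArray;
use arrow_array::{BooleanArray, MapArray, StringArray};
use arrow_ord::cmp::eq;
use arrow_schema::ArrowError;
use arrow_select::filter::filter;

fn map_value_equals(map: &MapArray, key: &str, expected: &str) -> Result<BooleanArray, ArrowError> {
    // Keep only the entries whose key matches, mirroring the `MapAccess` arm.
    let key_mask = eq(map.keys(), &StringArray::new_scalar(key))?;
    let matching = filter(map.entries(), &key_mask)?;
    // Entries are `{key, value}` structs, so child 1 holds the values; compare
    // them against the literal, mirroring the `BinaryOperation` special case.
    let values = matching.as_struct().column(1).clone();
    eq(&values, &StringArray::new_scalar(expected))
}
```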

@@ -424,16 +463,50 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {

#[cfg(test)]
mod tests {
use std::ops::{Add, Div, Mul, Sub};

use arrow_array::{GenericStringArray, Int32Array};
use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field, Fields, Schema};

use super::*;
use crate::expressions::*;
use crate::schema::ArrayType;
use crate::DataType as DeltaDataTypes;
use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use arrow_array::{GenericStringArray, Int32Array};
use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field, Fields, Schema};
use std::collections::HashMap;
use std::ops::{Add, Div, Mul, Sub};

fn setup_map_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![Field::new_map(
"test_map",
MAP_ROOT_DEFAULT,
Field::new("keys", DataType::Utf8, false),
Field::new("values", DataType::Utf8, true),
false,
true,
)]))
}

fn setup_map_array(map: HashMap<String, Option<String>>) -> DeltaResult<Arc<MapArray>> {
let mut array_builder = MapBuilder::new(
Some(MapFieldNames {
entry: MAP_ROOT_DEFAULT.to_string(),
..Default::default()
}),
StringBuilder::new(),
StringBuilder::new(),
);

for (k, v) in map {
array_builder.keys().append_value(k);
if let Some(v) = v {
array_builder.values().append_value(v);
} else {
array_builder.values().append_null();
}
array_builder.append(true)?;
}

Ok(Arc::new(array_builder.finish()))
}

#[test]
fn test_array_column() {
@@ -750,4 +823,56 @@ mod tests {
let expected = Arc::new(BooleanArray::from(vec![true, false]));
assert_eq!(results.as_ref(), expected.as_ref());
}

#[test]
fn test_map_expression_access() -> DeltaResult<()> {
let map_access = Expression::map(Expression::column("test_map"), Some("first_key"));

let schema = setup_map_schema();
let map_values = HashMap::from_iter([
("first_key".to_string(), Some("first".to_string())),
("second_key".to_string(), Some("second_value".to_string())),
]);
let array = setup_map_array(map_values)?;
let expected = HashMap::from_iter([("first_key".to_string(), Some("first".to_string()))]);
let expected_array = setup_map_array(expected)?;

let batch = RecordBatch::try_new(schema.clone(), vec![array])?;
let output = evaluate_expression(&map_access, &batch, None)?;

assert_eq!(output.len(), 1);
assert_eq!(*output, *expected_array);

Ok(())
}

#[test]
fn test_map_expression_eq() -> DeltaResult<()> {
let map_access = Expression::map(Expression::column("test_map"), None);
let predicate_expr = Expression::binary(
BinaryOperator::Equal,
map_access.clone(),
Expression::literal("second_value"),
);

let schema = setup_map_schema();
let map_values = HashMap::from_iter([
("first_key".to_string(), Some("first".to_string())),
("second_key".to_string(), Some("second_value".to_string())),
]);
let array = setup_map_array(map_values)?;

let batch = RecordBatch::try_new(schema.clone(), vec![array])?;
let output = evaluate_expression(
&predicate_expr,
&batch,
Some(&crate::schema::DataType::BOOLEAN),
)?;
let expected = Arc::new(BooleanArray::from(vec![false, true]));

assert_eq!(output.len(), 2);
assert_eq!(*output, *expected);

Ok(())
}
}