Skip to content

Commit

Permalink
revert HashMap<_, Option<_>> changes for partition values
Browse files Browse the repository at this point in the history
  • Loading branch information
hntd187 committed Sep 26, 2024
1 parent fe57b9b commit 4921f19
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 194 deletions.
7 changes: 2 additions & 5 deletions derive-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use syn::{parse_macro_input, Data, DataStruct, DeriveInput, Fields, Meta, PathAr
/// delta schema camelCase version).
///
/// If a field sets `drop_null_container_values`, it means the underlying data can contain null in
/// the values of the container (i.e. a `key` -> `null` in a `HashMap`). Therefore the schema should
/// the values of the container (i.e. a `key` -> `null` in a `HashMap`). Therefore, the schema should
/// mark the value field as nullable, but those mappings will be dropped when converting to an
/// actual rust `HashMap`. Currently this can _only_ be set on `HashMap` fields.
/// actual rust `HashMap`. Currently, this can _only_ be set on `HashMap` fields.
#[proc_macro_derive(Schema, attributes(drop_null_container_values))]
pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
let input = parse_macro_input!(input as DeriveInput);
Expand All @@ -30,9 +30,6 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream
#schema_fields
]).into()
}
fn nullable() -> bool {
false
}
}
};
proc_macro::TokenStream::from(output)
Expand Down
63 changes: 18 additions & 45 deletions kernel/src/actions/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,79 +6,52 @@ use crate::schema::{ArrayType, DataType, MapType, StructField};

/// Maps a Rust type onto the kernel schema `DataType` that represents it.
pub(crate) trait ToDataType {
/// Returns the kernel `DataType` corresponding to the implementing Rust type.
fn to_data_type() -> DataType;
/// Reports whether values of the implementing type may be null. The visible impls return
/// `false` for primitive types and `true` for `Option<T>`; container impls forward the
/// answer of their element/value type.
fn nullable() -> bool;
}

/// Produces a kernel `DataType` for a container type.
/// NOTE(review): judging by the name, this is the variant used for containers whose values may
/// be null (see `drop_null_container_values` on the derive macro) — confirm against the impls.
pub(crate) trait ToNullableContainerType {
/// Returns the kernel `DataType` describing the implementing container type.
fn to_nullable_container_type() -> DataType;
}

macro_rules! impl_to_data_type {
( $(($rust_type: ty, $kernel_type: expr, $nullable: expr)), * ) => {
( $(($rust_type: ty, $kernel_type: expr)), * ) => {
$(
impl ToDataType for $rust_type {
fn to_data_type() -> DataType {
$kernel_type
}

fn nullable() -> bool {
$nullable
}
}
)*
};
}

impl_to_data_type!(
(String, DataType::STRING, false),
(i64, DataType::LONG, false),
(i32, DataType::INTEGER, false),
(i16, DataType::SHORT, false),
(char, DataType::BYTE, false),
(f32, DataType::FLOAT, false),
(f64, DataType::DOUBLE, false),
(bool, DataType::BOOLEAN, false)
(String, DataType::STRING),
(i64, DataType::LONG),
(i32, DataType::INTEGER),
(i16, DataType::SHORT),
(char, DataType::BYTE),
(f32, DataType::FLOAT),
(f64, DataType::DOUBLE),
(bool, DataType::BOOLEAN)
);

// ToDataType impl for non-nullable array types
impl<T: ToDataType> ToDataType for Vec<T> {
fn to_data_type() -> DataType {
ArrayType::new(T::to_data_type(), T::nullable()).into()
}

fn nullable() -> bool {
T::nullable()
ArrayType::new(T::to_data_type(), false).into()
}
}

impl<T: ToDataType> ToDataType for HashSet<T> {
fn to_data_type() -> DataType {
ArrayType::new(T::to_data_type(), T::nullable()).into()
}

fn nullable() -> bool {
T::nullable()
ArrayType::new(T::to_data_type(), false).into()
}
}

// ToDataType impl for non-nullable map types
impl<K: ToDataType, V: ToDataType> ToDataType for HashMap<K, V> {
fn to_data_type() -> DataType {
MapType::new(K::to_data_type(), V::to_data_type(), V::nullable()).into()
}

fn nullable() -> bool {
V::nullable()
}
}

impl<T: ToDataType> ToDataType for Option<T> {
fn to_data_type() -> DataType {
T::to_data_type()
}

fn nullable() -> bool {
true
MapType::new(K::to_data_type(), V::to_data_type(), false).into()
}
}

Expand Down Expand Up @@ -108,13 +81,13 @@ impl<T: ToNullableContainerType> GetNullableContainerStructField for T {
// Normal types produce non-nullable fields
impl<T: ToDataType> GetStructField for T {
fn get_struct_field(name: impl Into<String>) -> StructField {
StructField::new(name, T::to_data_type(), T::nullable())
StructField::new(name, T::to_data_type(), false)
}
}

// Option types produce nullable fields
// impl<T: ToDataType> GetStructField for Option<T> {
// fn get_struct_field(name: impl Into<String>) -> StructField {
// StructField::new(name, T::to_data_type(), T::nullable())
// }
// }
impl<T: ToDataType> GetStructField for Option<T> {
    /// Builds the schema field for an optional value: the data type comes from the wrapped
    /// type `T`, and the field itself is always marked nullable.
    fn get_struct_field(name: impl Into<String>) -> StructField {
        let data_type = T::to_data_type();
        let nullable = true;
        StructField::new(name, data_type, nullable)
    }
}
15 changes: 7 additions & 8 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ impl AddVisitor {
path: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Add> {
let partition_values: HashMap<_, Option<String>> =
getters[1].get(row_index, "add.partitionValues")?;
let partition_values: HashMap<_, _> = getters[1].get(row_index, "add.partitionValues")?;
let size: i64 = getters[2].get(row_index, "add.size")?;
let modification_time: i64 = getters[3].get(row_index, "add.modificationTime")?;
let data_change: bool = getters[4].get(row_index, "add.dataChange")?;
Expand Down Expand Up @@ -427,8 +426,8 @@ mod tests {
let add1 = Add {
path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(),
partition_values: HashMap::from([
("c1".to_string(), Some("4".to_string())),
("c2".to_string(), Some("c".to_string())),
("c1".to_string(), "4".to_string()),
("c2".to_string(), "c".to_string()),
]),
size: 452,
modification_time: 1670892998135,
Expand All @@ -443,8 +442,8 @@ mod tests {
let add2 = Add {
path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(),
partition_values: HashMap::from([
("c1".to_string(), Some("5".to_string())),
("c2".to_string(), Some("b".to_string())),
("c1".to_string(), "5".to_string()),
("c2".to_string(), "b".to_string()),
]),
modification_time: 1670892998136,
stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}".into()),
Expand All @@ -453,8 +452,8 @@ mod tests {
let add3 = Add {
path: "c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet".into(),
partition_values: HashMap::from([
("c1".to_string(), Some("6".to_string())),
("c2".to_string(), Some("a".to_string())),
("c1".to_string(), "6".to_string()),
("c2".to_string(), "a".to_string()),
]),
modification_time: 1670892998137,
stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}".into()),
Expand Down
15 changes: 1 addition & 14 deletions kernel/src/engine/arrow_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where
fn materialize(&self, row_index: usize) -> Vec<String> {
let mut result = vec![];
for i in 0..EngineList::len(self, row_index) {
result.push(EngineList::get(self, row_index, i));
result.push(self.get(row_index, i));
}
result
}
Expand Down Expand Up @@ -150,19 +150,6 @@ impl EngineMap for MapArray {
}
ret
}

/// Copies the map stored at `row_index` into an owned `HashMap`.
///
/// Entries with a null key are skipped; entries with a null value are kept and mapped to
/// `None`. If a key occurs more than once, the last occurrence wins.
fn materialize_opt(&self, row_index: usize) -> HashMap<String, Option<String>> {
    let entries = self.value(row_index);
    let key_column = entries.column(0).as_string::<i32>();
    let value_column = entries.column(1).as_string::<i32>();
    key_column
        .iter()
        .zip(value_column.iter())
        .filter_map(|(key, value)| Some((String::from(key?), value.map(String::from))))
        .collect()
}
}

impl ArrowEngineData {
Expand Down
11 changes: 8 additions & 3 deletions kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ mod tests {
let output = evaluate_expression(&map_access, &batch, None)?;

assert_eq!(output.len(), 1);
assert_eq!(output.as_ref(), expected_array.as_ref());
assert_eq!(*output, *expected_array);

Ok(())
}
Expand All @@ -863,10 +863,15 @@ mod tests {
let array = setup_map_array(map_values)?;

let batch = RecordBatch::try_new(schema.clone(), vec![array])?;
let output = evaluate_expression(&predicate_expr, &batch, None)?;
let output = evaluate_expression(
&predicate_expr,
&batch,
Some(&crate::schema::DataType::BOOLEAN),
)?;
let expected = Arc::new(BooleanArray::from(vec![false, true]));

assert_eq!(output.len(), 2);
assert_eq!(output.as_ref(), expected.as_ref());
assert_eq!(*output, *expected);

Ok(())
}
Expand Down
85 changes: 6 additions & 79 deletions kernel/src/engine/arrow_get_data.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use crate::engine_data::{EngineMap, OptMapItem};
use crate::{
engine_data::{GetData, ListItem, MapItem},
DeltaResult,
};
use arrow_array::cast::AsArray;
use arrow_array::{
types::{GenericStringType, Int32Type, Int64Type},
Array, BooleanArray, GenericByteArray, GenericListArray, MapArray, OffsetSizeTrait,
PrimitiveArray,
};
use std::collections::HashMap;

use crate::{
engine_data::{GetData, ListItem, MapItem},
DeltaResult,
};

// actual impls (todo: could macro these)

impl<'a> GetData<'a> for BooleanArray {
Expand Down Expand Up @@ -67,67 +66,6 @@ where
Ok(None)
}
}

/// Returns an `OptMapItem` view over the entry at `row_index`, or `None` when
/// `is_valid(row_index)` reports that the row holds no value. Never returns an error.
fn get_map_opt(
    &'a self,
    row_index: usize,
    _field_name: &str,
) -> DeltaResult<Option<OptMapItem<'a>>> {
    let item = self
        .is_valid(row_index)
        .then(|| OptMapItem::new(self, row_index));
    Ok(item)
}
}

// Maps are internally represented as a struct of key/value columns, so this impl assumes the list
// has those structs for its members
impl<OffsetSize: OffsetSizeTrait> EngineMap for GenericListArray<OffsetSize> {
    /// Looks up `key` among the entries of the map stored at `row_index`.
    ///
    /// Returns the value paired with the first non-null key equal to `key`, or `None` when no
    /// entry in that row matches.
    ///
    /// # Panics
    /// Panics if the list's child array is not a struct whose first two columns are `i32`-offset
    /// string arrays, or if `row_index` is out of bounds.
    fn get(&self, row_index: usize, key: &str) -> Option<&str> {
        let offsets = self.offsets();
        let start_offset = offsets[row_index].as_usize();
        let count = offsets[row_index + 1].as_usize() - start_offset;
        // The child of the list is the struct of (key, value) entries. Both columns must be read
        // from that struct: downcasting the struct array itself to a string array panics.
        // Columns are addressed positionally, the same way `materialize` does below.
        let entries = self.values().as_struct();
        let keys = entries.column(0).as_string::<i32>();
        let vals = entries.column(1).as_string::<i32>();
        keys.iter()
            .enumerate()
            .skip(start_offset)
            .take(count)
            .find(|(_, map_key)| *map_key == Some(key))
            .map(|(idx, _)| vals.value(idx))
    }

    /// Copies the map stored at `row_index` into an owned `HashMap`, dropping every entry whose
    /// key or value is null. If a key occurs more than once, the last occurrence wins.
    fn materialize(&self, row_index: usize) -> HashMap<String, String> {
        let map_val = self.value(row_index);
        let map_struct = map_val.as_struct();
        let keys = map_struct.column(0).as_string::<i32>();
        let values = map_struct.column(1).as_string::<i32>();
        keys.iter()
            .zip(values.iter())
            .filter_map(|(key, value)| Some((String::from(key?), String::from(value?))))
            .collect()
    }

    /// Copies the map stored at `row_index` into an owned `HashMap`, keeping entries whose value
    /// is null as `None`. Entries with a null key are dropped. If a key occurs more than once,
    /// the last occurrence wins.
    fn materialize_opt(&self, row_index: usize) -> HashMap<String, Option<String>> {
        let map_val = self.value(row_index);
        let map_struct = map_val.as_struct();
        let keys = map_struct.column(0).as_string::<i32>();
        let values = map_struct.column(1).as_string::<i32>();
        keys.iter()
            .zip(values.iter())
            .filter_map(|(key, value)| Some((String::from(key?), value.map(String::from))))
            .collect()
    }
}

impl<'a> GetData<'a> for MapArray {
Expand All @@ -138,15 +76,4 @@ impl<'a> GetData<'a> for MapArray {
Ok(None)
}
}
/// Returns an `OptMapItem` view over the map at `row_index`, or `None` when
/// `is_valid(row_index)` reports that the row holds no value. Never returns an error.
fn get_map_opt(
    &'a self,
    row_index: usize,
    _field_name: &str,
) -> DeltaResult<Option<OptMapItem<'a>>> {
    let item = self
        .is_valid(row_index)
        .then(|| OptMapItem::new(self, row_index));
    Ok(item)
}
}
Loading

0 comments on commit 4921f19

Please sign in to comment.