Map access for expressions #352

Open · wants to merge 16 commits into base: main
Changes from 2 commits
12 changes: 6 additions & 6 deletions Cargo.toml
@@ -1,11 +1,11 @@
 [workspace]
 members = [
-    "acceptance",
-    "derive-macros",
-    "ffi",
-    "kernel",
-    "kernel/examples/read-table-single-threaded", # todo: put back to `examples/*` when inspect-table is fixed
-    "kernel/examples/read-table-multi-threaded",
+  "acceptance",
+  "derive-macros",
+  "ffi",
+  "kernel",
+  "kernel/examples/read-table-single-threaded", # todo: put back to `examples/*` when inspect-table is fixed
+  "kernel/examples/read-table-multi-threaded",
 ]
 # Only check / build main crates by default (check all with `--workspace`)
 default-members = ["acceptance", "kernel"]
3 changes: 3 additions & 0 deletions derive-macros/src/lib.rs
@@ -25,6 +25,9 @@ pub fn derive_schema(input: proc_macro::TokenStream) -> proc_macro::TokenStream
                     #schema_fields
                 ]).into()
             }
+            fn nullable() -> bool {
+                false
+            }
         }
     };
     proc_macro::TokenStream::from(output)
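Read as: every type whose schema is derived now also answers nullable(), and a plain struct answers false. A hand-written sketch of roughly the shape the derive emits, using stand-in types rather than the macro's literal output:

// Stand-ins so the sketch compiles on its own; the real ToDataType trait
// and DataType live in the kernel crate.
#[derive(Debug)]
struct DataType;

trait ToDataType {
    fn to_data_type() -> DataType;
    fn nullable() -> bool;
}

// Roughly what the derive generates for an action struct: to_data_type()
// assembles the struct's fields (elided here), and the new nullable() hook
// returns false, since a struct type is only nullable when a use site wraps
// it in Option<_>.
struct Format;

impl ToDataType for Format {
    fn to_data_type() -> DataType {
        // Real expansion: crate::schema::StructType::new(vec![#schema_fields]).into()
        DataType
    }
    fn nullable() -> bool {
        false
    }
}

fn main() {
    let _ = Format::to_data_type();
    assert!(!Format::nullable());
}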
10 changes: 5 additions & 5 deletions ffi/Cargo.toml
@@ -16,7 +16,7 @@ crate-type = ["lib", "cdylib", "staticlib"]
 tracing = "0.1"
 url = "2"
 delta_kernel = { path = "../kernel", default-features = false, features = [
-    "developer-visibility",
+  "developer-visibility",
 ] }
 delta_kernel_ffi_macros = { path = "../ffi-proc-macros", version = "0.3.1" }

@@ -40,10 +40,10 @@ trybuild = "1.0"
 [features]
 default = ["default-engine"]
 default-engine = [
-    "delta_kernel/default-engine",
-    "arrow-array",
-    "arrow-data",
-    "arrow-schema",
+  "delta_kernel/default-engine",
+  "arrow-array",
+  "arrow-data",
+  "arrow-schema",
 ]
 sync-engine = ["delta_kernel/sync-engine"]
 developer-visibility = []
64 changes: 32 additions & 32 deletions kernel/Cargo.toml
@@ -9,7 +9,7 @@ repository.workspace = true
 readme.workspace = true
 version.workspace = true
 # exclude golden tests + golden test data since they push us over 10MB crate size limit
-exclude = ["tests/golden_tables.rs", "tests/golden_data/" ]
+exclude = ["tests/golden_tables.rs", "tests/golden_data/"]

 [package.metadata.docs.rs]
 all-features = true

@@ -68,44 +68,44 @@ walkdir = { workspace = true, optional = true }
 arrow-conversion = ["arrow-schema"]
 arrow-expression = ["arrow-arith", "arrow-array", "arrow-buffer", "arrow-ord", "arrow-schema"]
 cloud = [
-    "object_store/aws",
-    "object_store/azure",
-    "object_store/gcp",
-    "object_store/http",
-    "hdfs-native-object-store",
+  "object_store/aws",
+  "object_store/azure",
+  "object_store/gcp",
+  "object_store/http",
+  "hdfs-native-object-store",
 ]
 default = ["sync-engine"]
 default-engine = [
-    "arrow-conversion",
-    "arrow-expression",
-    "arrow-array",
-    "arrow-buffer",
-    "arrow-cast",
-    "arrow-json",
-    "arrow-schema",
-    "arrow-select",
-    "futures",
-    "object_store",
-    "parquet/async",
-    "parquet/object_store",
-    "reqwest",
-    "tokio",
+  "arrow-conversion",
+  "arrow-expression",
+  "arrow-array",
+  "arrow-buffer",
+  "arrow-cast",
+  "arrow-json",
+  "arrow-schema",
+  "arrow-select",
+  "futures",
+  "object_store",
+  "parquet/async",
+  "parquet/object_store",
+  "reqwest",
+  "tokio",
 ]

 developer-visibility = []
 sync-engine = [
-    "arrow-cast",
-    "arrow-conversion",
-    "arrow-expression",
-    "arrow-array",
-    "arrow-json",
-    "arrow-select",
-    "parquet",
+  "arrow-cast",
+  "arrow-conversion",
+  "arrow-expression",
+  "arrow-array",
+  "arrow-json",
+  "arrow-select",
+  "parquet",
 ]
 integration-test = [
-    "hdfs-native-object-store/integration-test",
-    "hdfs-native",
-    "walkdir",
+  "hdfs-native-object-store/integration-test",
+  "hdfs-native",
+  "walkdir",
 ]

 [build-dependencies]

@@ -120,6 +120,6 @@ tempfile = "3"
 tar = "0.4"
 zstd = "0.13"
 tracing-subscriber = { version = "0.3", default-features = false, features = [
-    "env-filter",
-    "fmt",
+  "env-filter",
+  "fmt",
 ] }
4 changes: 2 additions & 2 deletions kernel/src/actions/mod.rs
@@ -47,7 +47,7 @@ pub(crate) fn get_log_schema() -> &'static StructType {
 pub struct Format {
     /// Name of the encoding for files in this table
     pub provider: String,
-    /// A map containingconfiguration options for the format
+    /// A map containing configuration options for the format
     pub options: HashMap<String, String>,
 }

@@ -141,7 +141,7 @@ pub struct Add {
     pub path: String,

     /// A map from partition column to value for this logical file.
-    pub partition_values: HashMap<String, String>,
+    pub partition_values: HashMap<String, Option<String>>,

     /// The size of this data file in bytes
     pub size: i64,
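The Option<String> change is the heart of the PR for Add: the Delta log can record a literal null as a partition value, which is different from the key being absent. A minimal sketch of the distinction (the column names are made up for illustration):

use std::collections::HashMap;

fn main() {
    // With HashMap<String, String> a NULL partition value could not be
    // represented; with Option<String> it maps cleanly onto None.
    let partition_values: HashMap<String, Option<String>> = HashMap::from([
        // Partition column whose value is NULL for this file.
        ("date".to_string(), None),
        // Ordinary partition value.
        ("region".to_string(), Some("eu".to_string())),
    ]);
    for (column, value) in &partition_values {
        match value {
            Some(v) => println!("{column} = {v}"),
            None => println!("{column} IS NULL"),
        }
    }
}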
63 changes: 45 additions & 18 deletions kernel/src/actions/schemas.rs
@@ -6,48 +6,75 @@ use crate::schema::{ArrayType, DataType, MapType, StructField};

 pub(crate) trait ToDataType {
     fn to_data_type() -> DataType;
+    fn nullable() -> bool;
 }

 macro_rules! impl_to_data_type {
-    ( $(($rust_type: ty, $kernel_type: expr)), * ) => {
+    ( $(($rust_type: ty, $kernel_type: expr, $nullable: expr)), * ) => {
         $(
             impl ToDataType for $rust_type {
                 fn to_data_type() -> DataType {
                     $kernel_type
                 }
+
+                fn nullable() -> bool {
+                    $nullable
+                }
             }
         )*
     };
 }

 impl_to_data_type!(
-    (String, DataType::STRING),
-    (i64, DataType::LONG),
-    (i32, DataType::INTEGER),
-    (i16, DataType::SHORT),
-    (char, DataType::BYTE),
-    (f32, DataType::FLOAT),
-    (f64, DataType::DOUBLE),
-    (bool, DataType::BOOLEAN)
+    (String, DataType::STRING, false),
+    (i64, DataType::LONG, false),
+    (i32, DataType::INTEGER, false),
+    (i16, DataType::SHORT, false),
+    (char, DataType::BYTE, false),
+    (f32, DataType::FLOAT, false),
+    (f64, DataType::DOUBLE, false),
+    (bool, DataType::BOOLEAN, false)
 );

 // ToDataType impl for non-nullable array types
 impl<T: ToDataType> ToDataType for Vec<T> {
     fn to_data_type() -> DataType {
-        ArrayType::new(T::to_data_type(), false).into()
+        ArrayType::new(T::to_data_type(), T::nullable()).into()
     }
+
+    fn nullable() -> bool {
+        T::nullable()
+    }
 }

 impl<T: ToDataType> ToDataType for HashSet<T> {
     fn to_data_type() -> DataType {
-        ArrayType::new(T::to_data_type(), false).into()
+        ArrayType::new(T::to_data_type(), T::nullable()).into()
     }
+
+    fn nullable() -> bool {
+        T::nullable()
+    }
 }

 // ToDataType impl for non-nullable map types
 impl<K: ToDataType, V: ToDataType> ToDataType for HashMap<K, V> {
     fn to_data_type() -> DataType {
-        MapType::new(K::to_data_type(), V::to_data_type(), false).into()
+        MapType::new(K::to_data_type(), V::to_data_type(), V::nullable()).into()
     }
+
+    fn nullable() -> bool {
+        V::nullable()

Review comment (Collaborator): This doesn't look right? The nullability of a list (or map) should be independent of whether the list elements (mapped values) are nullable, i.e. Option<Vec<T>> and Vec<Option<T>> (or Option<HashMap> and HashMap<K, Option<V>>) should be orthogonal?

Reply (Collaborator, Author): I ended up removing this based on Nick's feedback.

+    }
 }

+impl<T: ToDataType> ToDataType for Option<T> {
+    fn to_data_type() -> DataType {
+        T::to_data_type()
+    }
+
+    fn nullable() -> bool {
+        true
+    }
+}

@@ -58,13 +85,13 @@ pub(crate) trait GetStructField {
 // Normal types produce non-nullable fields
 impl<T: ToDataType> GetStructField for T {
     fn get_struct_field(name: impl Into<String>) -> StructField {
-        StructField::new(name, T::to_data_type(), false)
+        StructField::new(name, T::to_data_type(), T::nullable())
     }
 }

 // Option types produce nullable fields
-impl<T: ToDataType> GetStructField for Option<T> {
-    fn get_struct_field(name: impl Into<String>) -> StructField {
-        StructField::new(name, T::to_data_type(), true)
-    }
-}
+// impl<T: ToDataType> GetStructField for Option<T> {
+//     fn get_struct_field(name: impl Into<String>) -> StructField {
+//         StructField::new(name, T::to_data_type(), T::nullable())
+//     }
+// }
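Putting the pieces together: the value type's nullable() now flows into the map's value-contains-null flag. A self-contained sketch, with simplified stand-ins for the kernel's schema types (not the real delta_kernel::schema API), showing how HashMap<String, Option<String>> yields a map whose values may be null; per the review thread above, the container's own nullable() is kept independent of its values here:

use std::collections::HashMap;

// Simplified stand-ins for the kernel's schema types; the real DataType and
// MapType live in delta_kernel::schema.
#[derive(Debug)]
enum DataType {
    String,
    Map { value_contains_null: bool },
}

trait ToDataType {
    fn to_data_type() -> DataType;
    fn nullable() -> bool;
}

impl ToDataType for String {
    fn to_data_type() -> DataType {
        DataType::String
    }
    fn nullable() -> bool {
        false
    }
}

// Option<T> keeps T's data type but reports itself as nullable.
impl<T: ToDataType> ToDataType for Option<T> {
    fn to_data_type() -> DataType {
        T::to_data_type()
    }
    fn nullable() -> bool {
        true
    }
}

impl<K: ToDataType, V: ToDataType> ToDataType for HashMap<K, V> {
    fn to_data_type() -> DataType {
        // The value type's nullability becomes the map's value-contains-null
        // flag, which is the point of this change.
        DataType::Map {
            value_contains_null: V::nullable(),
        }
    }
    // Kept independent of V, per the review comment above; the PR as shown
    // forwarded V::nullable() here, and the author later removed that.
    fn nullable() -> bool {
        false
    }
}

fn main() {
    let _key = String::to_data_type(); // DataType::String
    let dt = <HashMap<String, Option<String>>>::to_data_type();
    println!("{dt:?}"); // Map { value_contains_null: true }
}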
15 changes: 8 additions & 7 deletions kernel/src/actions/visitors.rs
@@ -129,7 +129,8 @@ impl AddVisitor {
         path: String,
         getters: &[&'a dyn GetData<'a>],
     ) -> DeltaResult<Add> {
-        let partition_values: HashMap<_, _> = getters[1].get(row_index, "add.partitionValues")?;
+        let partition_values: HashMap<_, Option<String>> =
+            getters[1].get(row_index, "add.partitionValues")?;
         let size: i64 = getters[2].get(row_index, "add.size")?;
         let modification_time: i64 = getters[3].get(row_index, "add.modificationTime")?;
         let data_change: bool = getters[4].get(row_index, "add.dataChange")?;

@@ -426,8 +427,8 @@ mod tests {
         let add1 = Add {
             path: "c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet".into(),
             partition_values: HashMap::from([
-                ("c1".to_string(), "4".to_string()),
-                ("c2".to_string(), "c".to_string()),
+                ("c1".to_string(), Some("4".to_string())),
+                ("c2".to_string(), Some("c".to_string())),
             ]),
             size: 452,
             modification_time: 1670892998135,

@@ -442,8 +443,8 @@
         let add2 = Add {
             path: "c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet".into(),
             partition_values: HashMap::from([
-                ("c1".to_string(), "5".to_string()),
-                ("c2".to_string(), "b".to_string()),
+                ("c1".to_string(), Some("5".to_string())),
+                ("c2".to_string(), Some("b".to_string())),
             ]),
             modification_time: 1670892998136,
             stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}".into()),

@@ -452,8 +453,8 @@
         let add3 = Add {
             path: "c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet".into(),
             partition_values: HashMap::from([
-                ("c1".to_string(), "6".to_string()),
-                ("c2".to_string(), "a".to_string()),
+                ("c1".to_string(), Some("6".to_string())),
+                ("c2".to_string(), Some("a".to_string())),
             ]),
             modification_time: 1670892998137,
             stats: Some("{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}".into()),
17 changes: 15 additions & 2 deletions kernel/src/engine/arrow_data.rs
@@ -114,14 +114,14 @@ where
     fn materialize(&self, row_index: usize) -> Vec<String> {
         let mut result = vec![];
         for i in 0..EngineList::len(self, row_index) {
-            result.push(self.get(row_index, i));
+            result.push(EngineList::get(self, row_index, i));
        }
         result
     }
 }

 impl EngineMap for MapArray {
-    fn get<'a>(&'a self, row_index: usize, key: &str) -> Option<&'a str> {
+    fn get(&self, row_index: usize, key: &str) -> Option<&str> {
         let offsets = self.offsets();
         let start_offset = offsets[row_index] as usize;
         let count = offsets[row_index + 1] as usize - start_offset;

@@ -150,6 +150,19 @@ impl EngineMap for MapArray {
         }
         ret
     }
+
+    fn materialize_opt(&self, row_index: usize) -> HashMap<String, Option<String>> {
+        let mut ret = HashMap::new();
+        let map_val = self.value(row_index);
+        let keys = map_val.column(0).as_string::<i32>();
+        let values = map_val.column(1).as_string::<i32>();
+        for (key, value) in keys.iter().zip(values.iter()) {
+            if let (Some(key), value) = (key, value) {
+                ret.insert(key.into(), value.map(Into::into));
+            }
+        }
+        ret
+    }
 }

 impl ArrowEngineData {
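For the new materialize_opt, here is a standalone sketch in plain arrow-rs (mirroring the loop above rather than calling the kernel trait) of a map entry with a null value round-tripping as None:

use std::collections::HashMap;

use arrow_array::builder::{MapBuilder, StringBuilder};
use arrow_array::cast::AsArray;

fn main() {
    // Build a one-row map column: {"c1": "4", "c2": null}.
    let mut builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
    builder.keys().append_value("c1");
    builder.values().append_value("4");
    builder.keys().append_value("c2");
    builder.values().append_null();
    builder.append(true).unwrap();
    let map_array = builder.finish();

    // Same walk as materialize_opt: keep every entry with a non-null key,
    // mapping a null value to None instead of dropping the entry.
    let entries = map_array.value(0);
    let keys = entries.column(0).as_string::<i32>();
    let values = entries.column(1).as_string::<i32>();
    let mut ret: HashMap<String, Option<String>> = HashMap::new();
    for (key, value) in keys.iter().zip(values.iter()) {
        if let Some(key) = key {
            ret.insert(key.into(), value.map(Into::into));
        }
    }
    assert_eq!(ret["c2"], None);
    println!("{ret:?}"); // {"c1": Some("4"), "c2": None}
}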