Skip to content

Commit

Permalink
add map types
Browse files: browse the repository at this point in the history
  • Loading branch information
Nick Lanham committed Sep 12, 2024
1 parent f497907 commit a0eeae5
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 3 deletions.
66 changes: 65 additions & 1 deletion kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene};
use arrow_arith::numeric::{add, div, mul, sub};
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{types::*, MapArray};
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch,
Expand Down Expand Up @@ -440,6 +440,70 @@ fn apply_to_col(
ListArray::try_new(transformed_field, offset_buffer, transformed_values, nulls)?;
Ok(Some(Arc::new(transformed_array)))
}
(DataType::Map(kernel_map_type), ArrowDataType::Map(arrow_map_type, _)) => {
let ma = col.as_map_opt().ok_or(make_arrow_error(
"Arrow claimed to be a map but isn't a MapArray".to_string(),
))?;
let (map_field, offset_buffer, map_struct_array, nulls, ordered) =
ma.clone().into_parts();
if let ArrowDataType::Struct(_) = arrow_map_type.data_type() {
let (fields, msa_cols, msa_nulls) = map_struct_array.clone().into_parts();
let mut fields = fields.into_iter();
let key_field = fields.next().ok_or(make_arrow_error(
"Arrow map struct didn't have a key field".to_string(),
))?;
let value_field = fields.next().ok_or(make_arrow_error(
"Arrow map struct didn't have a value field".to_string(),
))?;
if fields.next().is_some() {
return Err(Error::generic("map fields had more than 2 members"));
}
let transformed_key = apply_to_col(
key_field.data_type(),
msa_cols[0].as_ref(),
&kernel_map_type.key_type,
)?.unwrap_or(msa_cols[0].clone());
let transformed_values = apply_to_col(
value_field.data_type(),
msa_cols[1].as_ref(),
&kernel_map_type.value_type,
)?.unwrap_or(msa_cols[1].clone());
let transformed_struct_fields = vec![
key_field
.as_ref()
.clone()
.with_data_type(transformed_key.data_type().clone()),
value_field
.as_ref()
.clone()
.with_data_type(transformed_values.data_type().clone()),
];
let transformed_struct_cols = vec![transformed_key, transformed_values];
let transformed_map_struct_array = StructArray::try_new(
transformed_struct_fields.into(),
transformed_struct_cols,
msa_nulls,
)?;
let transformed_map_field = Arc::new(
map_field
.as_ref()
.clone()
.with_data_type(transformed_map_struct_array.data_type().clone()),
);
let transformed_map = MapArray::try_new(
transformed_map_field,
offset_buffer,
transformed_map_struct_array,
nulls,
ordered,
)?;
Ok(Some(Arc::new(transformed_map)))
} else {
return Err(make_arrow_error(
"Arrow map type wasn't a struct.".to_string(),
));
}
}
_ => {
ensure_data_types(kernel_type, arrow_type)?;
Ok(None)
Expand Down
15 changes: 13 additions & 2 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::expressions::{Expression, Scalar};
use crate::features::ColumnMappingMode;
use crate::scan::state::{DvInfo, Stats};
use crate::schema::{ArrayType, DataType, Schema, SchemaRef, StructField, StructType};
use crate::schema::{ArrayType, DataType, MapType, Schema, SchemaRef, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};

Expand Down Expand Up @@ -399,6 +399,17 @@ fn make_data_type_physical(
array_type.contains_null,
))))
}
DataType::Map(map_type) => {
let new_key_type =
make_data_type_physical(&map_type.key_type, column_mapping_mode)?;
let new_value_type =
make_data_type_physical(&map_type.value_type, column_mapping_mode)?;
Ok(DataType::Map(Box::new(MapType::new(
new_key_type,
new_value_type,
map_type.value_contains_null,
))))
}
DataType::Struct(struct_type) => {
// build up the mapped child fields
let children = struct_type
Expand Down Expand Up @@ -447,7 +458,7 @@ fn get_state_info(
// Add to read schema, store field so we can build a `Column` expression later
// if needed (i.e. if we have partition columns)
let physical_field = make_field_physical(logical_field, column_mapping_mode)?;
println!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
let name = physical_field.name.clone();
read_fields.push(physical_field);
Ok(ColumnType::Selected(name))
Expand Down

0 comments on commit a0eeae5

Please sign in to comment.